Deploy Streaming Analytic Function to MATLAB Production Server

You can package streaming analytic functions developed using Streaming Data Framework for MATLAB® Production Server™ and deploy the packaged archive (CTF file) to MATLAB Production Server. The deployed archive expects to receive streaming data. The Kafka® connector executable pulls data from a Kafka host and pushes it to the deployed streaming archive. In the MATLAB desktop, Streaming Data Framework for MATLAB Production Server manages the Kafka connector. On a server instance, you must start and stop the Kafka connector yourself.

This topic describes the Kafka connector and provides an example that processes streaming data using a stateful streaming analytic function deployed to the server.

Kafka Connector Specifications

The Kafka connector is a Java® program that requires at least Java 8. To use the Kafka connector, the JAVA_HOME environment variable on your server machine must be set to the path of your Java 8 installation.
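Before starting the connector on a Linux server, it can help to confirm that JAVA_HOME is set and points at a usable Java installation. The following is a minimal sketch, not part of the product; the function name check_java_home is hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: return 0 only if JAVA_HOME is set and contains
# an executable bin/java. Prints the Java version banner on success.
check_java_home() {
    if [ -z "${JAVA_HOME:-}" ]; then
        echo "JAVA_HOME is not set" >&2
        return 1
    fi
    if [ ! -x "$JAVA_HOME/bin/java" ]; then
        echo "No executable found at $JAVA_HOME/bin/java" >&2
        return 1
    fi
    # java -version writes to stderr, so redirect it to stdout.
    "$JAVA_HOME/bin/java" -version 2>&1
}

# Example: check_java_home || exit 1
```

The helper only checks that a Java executable exists. Read the printed version yourself to confirm that it is Java 8 or later.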

Each deployed archive that contains a streaming analytic function requires its own Kafka connector. For example, if you have two archives, you require two connectors. You do not have to install the Kafka connector twice, but you must run it twice and have exactly one Kafka connector configuration file per archive.
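As an illustration of running one connector per archive on Linux, the sketch below starts the same start script once for each archive name. The function name start_connectors and the per-archive file naming scheme (for example, recamanSum.collector.properties) are assumptions made for this sketch, not product conventions. The sketch also assumes that the start script prints the connector PID to standard output.

```shell
#!/bin/sh
# Hypothetical helper: start one Kafka connector per archive name.
# Assumes per-archive files named <archive>.collector.properties and
# <archive>.kafka.properties exist in the current folder (assumed naming).
start_connectors() {
    start_script="$1"
    shift
    for archive in "$@"; do
        # Each run gets its own logs and configuration, and its PID is
        # written to <archive>.pid so that it can be stopped separately.
        "$start_script" \
            -out "$archive.out.log" -err "$archive.error.log" \
            -c "$archive.collector.properties" \
            -k "$archive.kafka.properties" > "$archive.pid" || return 1
    done
}

# Example: start_connectors ./kafka-connector-start.sh recamanSum otherArchive
```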

The lifecycle management of the Kafka connector depends on your production environment. Streaming Data Framework for MATLAB Production Server provides tools to make starting, stopping, and controlling the Kafka connector easier.

Prerequisites for Running Example

The following example provides a sample stateful streaming analytic function, shows how to package and deploy it to MATLAB Production Server, and shows how to manage the Kafka connector on the server.

To run the example, you require sample streaming data and a running MATLAB Production Server instance with a running persistence service.

Create Sample Streaming Data

Create sample streaming data and write the data to a Kafka stream. For this example, you create a 1000-element Recamán sequence and write it to a Kafka topic recamanSum_data. For details about creating the streaming data, see Create Sample Streaming Data.

Create Server Instance

Create a MATLAB Production Server instance to host the streaming deployable archive. For details about creating a server instance using the command line, see Set Up MATLAB Production Server Using the Command Line. For details about creating a server instance using the dashboard, see Create Server Instance Using Dashboard.

Start Persistence Service

Create a persistence service on the server instance and name the persistence connection RR. Start the persistence service. Later, when you package the streaming function into a deployable archive, you use the RR connection name. For details about creating and starting a persistence service, see Data Caching Basics.

Start Server Instance

Start the server instance that you created. For details about starting a server instance using the command line, see Start Server Instance Using Command Line. For details about starting a server instance using the dashboard, see Start Server Instance Using Dashboard.

Write Streaming Analytic MATLAB Function

For this example, use the sample MATLAB functions recamanSum and initRecamanSum, which are located in the support_package_root\toolbox\mps\streaming\Examples\Numeric folder, where support_package_root is the root folder of support packages on your system. To get the path to this folder, use this command:

fullfile(matlabshared.supportpkg.getSupportPackageRoot,'toolbox','mps','streaming','Examples','Numeric')

Later, you package the recamanSum function and deploy it to MATLAB Production Server. For details about the recamanSum and initRecamanSum functions and to access the code, see Write Stateful Function and Write State Initialization Function.

Package Streaming Analytic Function

To package the recamanSum streaming analytic function into a deployable archive, you can run the following script. The script creates an input KafkaStream object dataKS connected to the recamanSum_data topic and an output KafkaStream object resultKS connected to the recamanSum_results topic. Then, the script uses the streamingDataCompiler function to launch the Production Server Compiler (MATLAB Compiler SDK) app. Using the app, you create a deployable archive recamanSum.ctf suitable for deployment to MATLAB Production Server. The call to streamingDataCompiler sets the StateStore input argument to RR, the persistence connection name that you created in Start Persistence Service.

kafkaHost = "kafka.host.com";
kafkaPort = 9092;

dataKS = kafkaStream(kafkaHost, kafkaPort, "recamanSum_data", Rows=100);

resultKS = kafkaStream(kafkaHost, kafkaPort, "recamanSum_results", ...
    Rows=100);

archive = streamingDataCompiler("recamanSum", dataKS, resultKS, ...
    InitialState="initRecamanSum", StateStore="RR");

From the Production Server Compiler app, click Package to create the recamanSum archive. When the packaging process finishes, open the output folder. In the output folder, navigate to the for_distribution folder. The for_distribution folder contains the recamanSum.ctf deployable archive and Kafka connector scripts that you use later.

Figure: Package dialog box with the option to open the output folder selected.

Deploy Streaming Analytic Function to Server

Deploy the recamanSum archive to a running MATLAB Production Server instance. If you manage the server using the command line, copy the recamanSum archive to the auto_deploy folder of your server instance. For other ways to deploy, see Deploy Archive to MATLAB Production Server.
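On a Linux server managed from the command line, the copy step could look like the following sketch. The function name deploy_archive and the instance path in the example comment are hypothetical; substitute the root folder of your own server instance.

```shell
#!/bin/sh
# Hypothetical helper: copy a deployable archive into the auto_deploy
# folder of a server instance, after checking that both exist.
deploy_archive() {
    archive="$1"
    instance_root="$2"
    target="$instance_root/auto_deploy"
    if [ ! -f "$archive" ]; then
        echo "Archive not found: $archive" >&2
        return 1
    fi
    if [ ! -d "$target" ]; then
        echo "No auto_deploy folder under: $instance_root" >&2
        return 1
    fi
    cp "$archive" "$target/" && echo "Deployed $(basename "$archive") to $target"
}

# Example (hypothetical instance path):
# deploy_archive for_distribution/recamanSum.ctf /opt/mps_instances/prod_server_1
```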

Start Kafka Connector

Depending on the operating system of your server instance, enter the following commands at the system prompt to run the kafka-connector-start script, which starts the Kafka connector. The Kafka connector pulls data from the Kafka host and pushes it to the deployed streaming archive.

The output of the start script is a process ID (PID). Save the value of the PID. You use this ID to stop the Kafka connector process later.

Windows

powershell -executionPolicy bypass -File kafka-connector-start.ps1 -out out.log -err error.log -c collector.properties -k kafka.properties

Linux

chmod +x kafka-connector-start.sh
./kafka-connector-start.sh -out out.log -err error.log -c collector.properties -k kafka.properties
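One way to save the PID on Linux is to capture the output of the start script in a file. The following sketch assumes that the script prints only the PID to standard output; the function name start_connector and the file name connector.pid are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: run the start script with the given options and
# save the PID it prints. Assumes the script prints only the PID on stdout.
start_connector() {
    start_script="$1"
    pid_file="$2"
    shift 2
    pid=$("$start_script" "$@") || return 1
    # Reject empty or non-numeric output instead of saving a bad PID.
    case "$pid" in
        ''|*[!0-9]*)
            echo "Unexpected start script output: $pid" >&2
            return 1
            ;;
    esac
    printf '%s\n' "$pid" > "$pid_file"
}

# Example:
# start_connector ./kafka-connector-start.sh connector.pid \
#     -out out.log -err error.log -c collector.properties -k kafka.properties
```

Keeping the PID in a file means it is still available after you close the terminal session in which you started the connector.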

Read Processed Data from Output Stream

After you start the Kafka connector, the server starts receiving requests. The deployed recamanSum archive receives streaming data from the input Kafka stream and calculates the cumulative sum of the Recamán sequence. Wait a few seconds for the server to finish processing these requests.
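While you wait, you might want to confirm on a Linux server that the connector process is still alive and that nothing has been written to its error log. The sketch below assumes that the PID printed by the start script is an ordinary operating system process ID and that you passed -err error.log when starting the connector. Both function names are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: return 0 if a process with the given PID exists
# and the current user is allowed to signal it. Signal 0 sends nothing.
connector_running() {
    kill -0 "$1" 2>/dev/null
}

# Hypothetical helper: print the last lines of the connector error log,
# or a short message if the log is missing or empty.
show_connector_errors() {
    err_log="${1:-error.log}"
    if [ -s "$err_log" ]; then
        tail -n 20 "$err_log"
    else
        echo "No errors logged in $err_log"
    fi
}

# Example: connector_running 12345 && show_connector_errors error.log
```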

Create another KafkaStream object to read the result from the output topic.

readStream = kafkaStream("kafka.host.com", 9092, "recamanSum_results");

Call readtimetable to read the output data.

result = readtimetable(readStream)

Stop Kafka Connector

Depending on the operating system of your server instance, enter the following commands at the system prompt to run the kafka-connector-stop script, which stops the Kafka connector. Replace PID with the process ID that you saved when you started the connector.

Windows

powershell -executionPolicy bypass -File kafka-connector-stop.ps1 PID

Linux

chmod +x kafka-connector-stop.sh
./kafka-connector-stop.sh PID
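If you saved the PID to a file when you started the connector on Linux, you can pass it to the stop script without retyping it. This is a minimal sketch; the function name stop_connector and the file name connector.pid are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: read the saved PID from a file, pass it to the
# stop script, and delete the file once the stop script succeeds.
stop_connector() {
    stop_script="$1"
    pid_file="$2"
    if [ ! -s "$pid_file" ]; then
        echo "No saved PID in $pid_file" >&2
        return 1
    fi
    pid=$(cat "$pid_file")
    "$stop_script" "$pid" && rm -f "$pid_file"
}

# Example: stop_connector ./kafka-connector-stop.sh connector.pid
```

Removing the PID file only after a successful stop prevents a later call from trying to stop a connector that is no longer running.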
