Process Kafka Events Using MATLAB
This example shows how to use the Streaming Data Framework for MATLAB® Production Server™ to process events from a Kafka® stream. The example provides and explains the recamanSum and initRecamanSum streaming analytic functions that process event streams, and the demoRecaman script that creates event streams, validates event stream creation, uses the streaming analytic function to process event streams, and writes the results to an output stream.
The example functions and script are located in the support_package_root\mps\streaming\Examples\Numeric folder, where support_package_root is the root folder of support packages on your system. To get the path to this folder, use this command:
fullfile(matlabshared.supportpkg.getSupportPackageRoot,'toolbox','mps','streaming','Examples','Numeric')
Prerequisites
You must have Streaming Data Framework for MATLAB Production Server installed on your system. For more information, see Install Streaming Data Framework for MATLAB Production Server.
You must have a running Kafka server where you have the necessary permissions to create topics. The example assumes that the network address of your Kafka host is kafka.host.com:9092.
Write Streaming Analytic MATLAB Function
For this example, use the sample MATLAB functions recamanSum and initRecamanSum. Later, you iterate the recamanSum streaming function over several events to compute results.
Write Stateful Function
The recamanSum function is stateful. In stateful functions, the data state is shared between events, and past events can influence the way current events are processed. recamanSum computes the cumulative sum of a numeric sequence in stream variable R and returns a timetable cSum and a structure state. The timetable cSum contains the cumulative sum of the elements in R, along with their timestamps and keys. The structure state stores the last value of the cumulative sum in its field cumsum, so that processing of the next window of events continues the sum from where the previous window ended.
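function [cSum, state] = recamanSum(data, state)
    % Row times and keys of the events in the current window
    timestamp = data.Properties.RowTimes;
    key = data.key;
    % Running total: cumulative sum of this window plus the total carried
    % over from earlier windows (the name sum shadows the built-in sum)
    sum = cumsum(data.R) + state.cumsum;
    % Save the latest running total so the next window continues from it
    state.cumsum = sum(end);
    % Output timetable with variables key and sum
    cSum = timetable(timestamp, key, sum);
end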
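To see how state carries the running total from one window to the next, the following minimal sketch repeats the arithmetic of recamanSum on two hypothetical five-row windows. It uses plain column vectors instead of stream timetables, so it runs in base MATLAB without any stream connection.

```matlab
% Sketch: the per-window arithmetic of recamanSum on plain vectors.
% The two 5-row windows are hypothetical; in the example, windows have 100 rows.
state.cumsum = 0;                      % value that initRecamanSum sets
window1 = [0; 1; 3; 6; 2];             % first five terms of Recaman's sequence
window2 = [7; 13; 20; 12; 21];         % next five terms

s1 = cumsum(window1) + state.cumsum;   % running totals for window 1
state.cumsum = s1(end);                % carry the last total into window 2

s2 = cumsum(window2) + state.cumsum;   % running totals continue from there
state.cumsum = s2(end);

% Window-by-window processing matches one cumulative sum over all terms
assert(isequal([s1; s2], cumsum([window1; window2])))
```

Because only the last running total is kept in state, the amount of state stays constant no matter how many windows are processed.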
Write State Initialization Function
The initRecamanSum function initializes the state for the first iteration of the recamanSum function.
function state = initRecamanSum(config)
    % Start the running total at zero (config is not used by this function)
    state.cumsum = 0;
end
Create Sample Stream Events
To run the example, you need sample streaming data. The demoRecaman script contains the following code, which creates streaming data consisting of the first 1000 elements of Recamán's sequence and writes the sequence to the Kafka topic recamanSum_data.
Set the Kafka hostname and port number.
kafkaHost = "kafka.host.com";
kafkaPort = 9092;
Create the first 1000 elements of Recamán's sequence.
To create the sequence, you can use the following recamanTimeTable function, also located in the \Examples\Numeric folder. recamanTimeTable creates a timetable containing the first N elements of Recamán's sequence.
function tt = recamanTimeTable(N)
    % Compute the first N terms of Recaman's sequence, starting with rs(1) = 0
    rs = zeros(1,N);
    for k = 2:N
        n = k-1;
        subtract = rs(k-1) - n;
        % Step back by n if the result is positive and not already in the
        % sequence; otherwise step forward by n
        if subtract > 0 && ~any(rs == subtract)
            rs(k) = subtract;
        else
            rs(k) = rs(k-1) + n;
        end
    end
    % One timestamp per term, spaced 1 second apart and ending at the current time (UTC)
    incr = seconds(1:N);
    thisVeryInstant = ...
        convertTo(datetime, "epochtime", "Epoch", "1970-1-1");
    thisVeryInstant = datetime(thisVeryInstant, "ConvertFrom", ...
        "epochtime", "Epoch", "1970-1-1");
    thisVeryInstant.TimeZone = "UTC";
    timestamp = (thisVeryInstant - seconds(N)) + incr';
    % Key each term by its zero-based index, stored as a string
    key = (0:N-1)';
    key = string(key);
    R = rs';
    tt = timetable(timestamp,R,key);
end
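To check the update rule in recamanTimeTable without generating timestamps, you can run the same loop for a small N and compare the result with the well-known first ten terms of Recamán's sequence (0, 1, 3, 6, 2, 7, 13, 20, 12, 21). This is a minimal sketch; it restricts the membership test to the terms computed so far, which gives the same result as any(rs == subtract) because subtract is positive and the unfilled entries of rs are zero.

```matlab
% Sketch: the Recaman update rule from recamanTimeTable, values only
N = 10;
rs = zeros(1,N);
for k = 2:N
    n = k-1;
    subtract = rs(k-1) - n;
    % Compare only against the terms already computed, rs(1:k-1)
    if subtract > 0 && ~any(rs(1:k-1) == subtract)
        rs(k) = subtract;
    else
        rs(k) = rs(k-1) + n;
    end
end
assert(isequal(rs, [0 1 3 6 2 7 13 20 12 21]))
```

For N = 1000 the linear search in any is still fast; for much larger sequences, a logical lookup array indexed by value would avoid rescanning rs on every step.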
Store the results of recamanTimeTable in a timetable.
tt0 = recamanTimeTable(1000);
Create a stream object connected to the recamanSum_data topic. Later, you write the timetable that contains the Recamán sequence to recamanSum_data. Rows=100 sets the window size, that is, the number of rows read from the topic at a time, to 100.
dataKS = kafkaStream(kafkaHost, kafkaPort, "recamanSum_data", Rows=100);
If the recamanSum_data topic already exists, delete it.
try
    deleteTopic(dataKS);
catch
    % Ignore the error, for example when the topic does not exist yet
end
Write the entire Recamán sequence to the recamanSum_data topic.
writetimetable(dataKS, tt0);
Validate Sample Data Creation
To validate the sample stream events that you created, confirm that the first 100 rows that you read from the recamanSum_data topic are the same as the sample data you created and wrote to the recamanSum_data topic. The demoRecaman script contains the following code.
Read one window of data (100 rows) from the recamanSum_data topic into a timetable tt1.
tt1 = readtimetable(dataKS);
Check whether the data read into tt1 is equal to the first 100 elements of the Recamán sequence you wrote.
if isequal(tt0(1:height(tt1),:), tt1)
    fprintf(1,"Success writing data to topic %s.\n", dataKS.Name);
end
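The expression tt0(1:height(tt1),:) selects only as many rows of the original data as were read back in one window, so the comparison works whatever the window size. The following sketch shows the same idiom on small in-memory timetables with no Kafka connection; the data and the three-row window are hypothetical stand-ins.

```matlab
% Hypothetical stand-ins: tt0 plays the written data, tt1 one window read back
timestamp = datetime(2024,1,1,0,0,0,"TimeZone","UTC") + seconds((1:5)');
R = [0; 1; 3; 6; 2];
key = string((0:4)');
tt0 = timetable(timestamp, R, key);
tt1 = tt0(1:3,:);    % pretend a 3-row window was read from the topic

% Compare only the first height(tt1) rows of tt0 with the window
ok = isequal(tt0(1:height(tt1),:), tt1);
assert(ok)
```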
Stop reading from the dataKS stream, because later you use dataKS to read again from the recamanSum_data topic. Reading from the same topic using multiple streams is not permitted.
stop(dataKS);
Process Stream Events with Streaming Analytic Function
Iterate the recamanSum streaming analytic function multiple times to read the numeric sequence from the input stream, compute its cumulative sum, and write the results to the output stream. The demoRecaman script contains the following code.
Create an output stream connected to the recamanSum_results topic. Use recamanSum_results to store the output of the recamanSum streaming function.
resultKS = kafkaStream(kafkaHost,kafkaPort,"recamanSum_results", ...
    Rows=100);
Create an event stream processor to iterate the recamanSum streaming function over the input topic connected to the stream dataKS. Write the results to the output topic connected to the stream resultKS. Use a persistent storage connection named RR to store data state between iterations.
rsp = eventStreamProcessor(dataKS,@recamanSum,@initRecamanSum,...
    StateStore="RR",OutputStream=resultKS);
Execute the stream function ten times. Because the window size, or the number of rows read at a time, is 100, ten iterations consume the entire sequence of 1000 elements.
fprintf(1,"Computing cumulative sum of Recaman sequence.\n");
execute(rsp, 10);
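The iteration count follows from dividing the number of rows by the window size. The following base-MATLAB sketch checks that ten windows of 100 rows cover all 1000 rows exactly once. The variable names are hypothetical, and the sketch assumes, as this step states, that each iteration consumes one window.

```matlab
% Hypothetical sketch: windows needed to consume the topic, assuming each
% execute iteration reads one window of rowsPerWindow rows
nRows = 1000;          % elements written to recamanSum_data
rowsPerWindow = 100;   % Rows=100 in the kafkaStream call
nIterations = ceil(nRows / rowsPerWindow);

% Count how many times each row would be read
timesRead = zeros(nRows, 1);
for k = 1:nIterations
    firstRow = (k-1)*rowsPerWindow + 1;
    lastRow = min(k*rowsPerWindow, nRows);
    timesRead(firstRow:lastRow) = timesRead(firstRow:lastRow) + 1;
end
assert(nIterations == 10)
assert(all(timesRead == 1))   % every row is read exactly once
```

Using ceil rather than plain division would cover a row count that is not a multiple of the window size, under the assumption that the last window is then read as a partial window.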
Delete the event stream processor. Deleting it shuts down the StateStore connection, which is required to run this script more than once in a row.
clear rsp;
Read the results from the output stream.
fprintf(1,"Reading results from %s.\n", resultKS.Name);
% Read ten windows of results and append them to tt2
tt2 = timetable.empty;
for n = 1:10
    tt2 = [ tt2 ; readtimetable(resultKS) ];
end
% Compare the last running total with the cumulative sum of the original data
cSum = cumsum(tt0.R);
if tt2(end,:).sum == cSum(end)
    fprintf(1,"Cumulative sum computed successfully: %d.\n", ...
        tt2(end,:).sum);
else
    fprintf(1,"Expected cumulative sum %d. Computed %d instead.\n", ...
        cSum(end), tt2(end,:).sum);
end
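The reading loop appends each window to tt2 by vertical concatenation and then takes the last running total with tt2(end,:).sum. This sketch applies the same two operations to two small in-memory result windows; the row times, keys, and sums are hypothetical values that mirror the key and sum variables that recamanSum writes.

```matlab
% Hypothetical result windows with the same variables recamanSum outputs
t = datetime(2024,1,1,"TimeZone","UTC") + seconds((1:4)');
w1 = timetable(t(1:2), ["0"; "1"], [0; 1], 'VariableNames', {'key','sum'});
w2 = timetable(t(3:4), ["2"; "3"], [4; 10], 'VariableNames', {'key','sum'});

% Append windows in read order, as the demo loop does
tt2 = [w1; w2];

% The last row holds the running total over all rows read so far
lastTotal = tt2(end,:).sum;
assert(lastTotal == sum([0 1 3 6]))   % 0, 1, 3, 6 are the underlying terms
```

Because each row of the output already holds a running total, only the final row needs to be compared with cumsum over the original data.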
When you run the entire demoRecaman script, you see the following output.
Success writing data to topic recamanSum_data.
Computing cumulative sum of Recaman sequence.
Reading results from recamanSum_results.
Cumulative sum computed successfully: 837722.
See Also
readtimetable | writetimetable | kafkaStream | eventStreamProcessor | execute | inMemoryStream | testStream