使用 MATLAB 处理 Kafka 事件
此示例展示如何使用 Streaming Data Framework for MATLAB® Production Server™ 处理来自 Kafka® 流的事件。该示例提供并解释了处理事件流的 recamanSum
和 initRecamanSum
流处理分析函数,以及创建事件流、验证事件流创建、使用流处理分析函数处理事件流并将结果写入输出流的 demoRecaman
脚本。
示例函数和脚本位于
文件夹中,其中 support_package_root
\mps\streaming\Examples\Numeric
是系统上支持包的根文件夹。要获取此文件夹的路径,请使用以下命令:support_package_root
fullfile(matlabshared.supportpkg.getSupportPackageRoot,'toolbox','mps','streaming','Examples','Numeric')
前提条件
您的系统上必须安装 Streaming Data Framework for MATLAB Production Server。有关详细信息,请参阅安装 Streaming Data Framework for MATLAB Production Server。
您必须拥有一个正在运行的 Kafka 服务器,并且您具有创建主题所需的权限。该示例假设您的 Kafka 主机的网络地址是
kafka.host.com:9092
。
编写流式分析 MATLAB 函数
对于此示例,使用示例 MATLAB 函数 recamanSum
和 initRecamanSum
。稍后,您可以在多个事件上迭代 recamanSum
流处理函数来计算结果。
编写状态函数
recamanSum
函数是有状态的。在有状态函数中,数据状态在事件之间共享,过去的事件可以影响当前事件的处理方式。recamanSum
计算流变量 R
中数值序列的累积和,并返回表 cSum
和结构体 state
。表 cSum
包含 R
中元素的累积和以及时间戳。结构体 state
在其字段 cumsum
中包含该序列的最终值。
function [cSum, state] = recamanSum(data, state) timestamp = data.Properties.RowTimes; key = data.key; sum = cumsum(data.R) + state.cumsum; state.cumsum = sum(end); cSum = timetable(timestamp, key, sum); end
写入状态初始化函数
initRecamanSum
函数初始化 recamanSum
函数第一次迭代的状态。
function state = initRecamanSum(config) state.cumsum = 0; end
创建示例流事件
要运行该示例,您需要示例流处理数据。demoRecaman
脚本包含以下代码,用于创建由 Recamán 序列的前 1000 个元素组成的流处理数据,还包含将序列写入 Kafka 主题 recamanSum_data
的代码。
设置 Kafka 主机名和端口号。
kafkaHost = "kafka.host.com"; kafkaPort = 9092;
创建 Recamán 序列的前 1000 个元素。
要创建序列,您可以使用位于
recamanTimeTable
文件夹中的以下\Examples\Numeric
函数。recamanTimeTable
创建一个时间表,其中包含 Recamán 序列的前N
元素。function tt = recamanTimeTable(N) rs = zeros(1,N); for k=2:N n = k-1; subtract = rs(k-1) - n; if subtract > 0 && any(rs == subtract) == false rs(k) = subtract; else rs(k) = rs(k-1) + n; end end incr = seconds(1:N); thisVeryInstant = ... convertTo(datetime, "epochtime", "Epoch", "1970-1-1"); thisVeryInstant = datetime(thisVeryInstant, "ConvertFrom",... "epochtime", "Epoch", "1970-1-1"); thisVeryInstant.TimeZone = "UTC"; timestamp = (thisVeryInstant - seconds(N)) + incr'; key = (0:N-1)'; key = string(key); R = rs'; tt = timetable(timestamp,R,key); end
将
recamanTimeTable
的结果存储在时间表中。tt0 = recamanTimeTable(1000);
创建一个连接到
recamanSum_data
主题的流对象。然后,您将包含 Recamán 序列的时间表写入recamanSum_data
。dataKS = kafkaStream(kafkaHost, kafkaPort, "recamanSum_data", Rows=100);
如果
recamanSum_data
主题已经存在,请将其删除。try deleteTopic(dataKS); catch, end
将整个 Recamán 序列写入
recamanSum_data
主题。writetimetable(dataKS, tt0);
验证示例数据创建
要验证您创建的示例流事件,请确认您从 recamanSum_data
主题读取的前 100 行与您创建并写入 recamanSum_data
主题的示例数据相同。demoRecaman
脚本包含以下代码。
将
recamanSum_data
主题中的一个数据窗口(100 行)读入时间表tt1
。tt1 = readtimetable(dataKS);
检查读入
tt1
的数据是否等于您写入的 Recamán 序列的前 100 个元素。if isequal(tt0(1:height(tt1),:), tt1) fprintf(1,"Success writing data to topic %s.\n", dataKS.Name); end
停止从
dataKS
流读取,因为稍后您将使用dataKS
再次从recamanSum_data
主题读取。不允许使用多个流读取同一主题。stop(dataKS);
使用流分析函数处理流事件
多次迭代 recamanSum
流处理分析函数,从输入流中读取数值序列,计算其累积和,并将结果写入输出流。demoRecaman
脚本包含以下代码。
创建连接到
recamanSum_results
主题的输出流。使用recamanSum_results
存储recamanSum
流处理函数的输出。resultKS = kafkaStream(kafkaHost,kafkaPort,"recamanSum_results", ... Rows=100);
创建一个事件流处理器,以在连接到流
recamanSum
的输入主题上迭代dataKS
流处理函数。将结果写入与流resultKS
连接的输出主题。使用名为RR
的持久存储连接来存储迭代之间的数据状态。rsp = eventStreamProcessor(dataKS,@recamanSum,@initRecamanSum,... StateStore="RR",OutputStream=resultKS);
执行流函数十次。由于窗口大小(即一次读取的行数)为 100,因此十次迭代将消耗整个 1000 个元素的序列。
fprintf(1,"Computing cumulative sum of Recaman sequence.\n"); execute(rsp, 10);
删除事件流处理器。这将关闭
StateStore
,这是连续多次运行该脚本所必需的。clear rsp;
从输出流中读取结果。
fprintf(1,"Reading results from %s.\n", resultKS.Name); tt2 = timetable.empty; for n = 1:10 tt2 = [ tt2 ; readtimetable(resultKS) ]; end cSum = cumsum(tt0.R); if tt2(end,:).sum == cSum(end) fprintf(1,"Cumulative sum computed successfully: %d.\n", ... tt2(end,:).sum); else fprintf(1,"Expected cumulative sum %d. Computed %d instead.\n", ... cSum(end), tt2(end,:).sum); end
当您运行整个 demoRecaman
脚本时,您会看到以下输出。
Success writing data to topic recamanSum_data. Computing cumulative sum of Recaman sequence. Reading results from recamanSum_results. Cumulative sum computed successfully: 837722.
另请参阅
readtimetable
| writetimetable
| kafkaStream
| eventStreamProcessor
| execute
| inMemoryStream
| testStream