Streaming Data Framework for MATLAB Production Server 基础知识
使用 Streaming Data Framework for MATLAB® Production Server™ 从事件流处理平台(例如 Kafka®)读取和写入。使用此框架,您可以:
在 MATLAB 中开发流处理分析函数,用于过滤、转换、记录或处理事件流处理数据。
连接到流处理源并通过使用 Streaming Data Framework for MATLAB Production Server 的函数来测试分析函数如何读取和写入事件流。
仿真生产环境来测试您的流处理分析算法(需要 MATLAB Compiler SDK™)。
打包解析函数(需要 MATLAB Compiler SDK)并部署到 MATLAB Production Server。
安装 Streaming Data Framework for MATLAB Production Server
从 MATLAB 附加功能资源管理器安装 Streaming Data Framework for MATLAB Production Server 支持包。有关安装附加功能的信息,请参阅获取和管理附加功能 (MATLAB)。
安装完成后,在
中查找示例,其中 support_package_root
\toolbox\mps\streaming\Examples
是系统上支持包的根文件夹。要获取此文件夹的路径,请使用以下命令:support_package_root
fullfile(matlabshared.supportpkg.getSupportPackageRoot,'toolbox','mps','streaming','Examples')
系统需求
Streaming Data Framework for MATLAB Production Server 具有与 MATLAB 相同的系统要求。欲了解更多信息,请参阅 MATLAB 的系统要求。
编写流式分析 MATLAB 函数
事件流分析函数通常消耗输入事件流并可以产生输出事件流。它可以使用任何可部署到 MATLAB Production Server 的 MATLAB 功能来过滤、转换、记录或处理事件流。
事件流分析函数处理事件窗口或批次。一个事件由三部分组成:
键 - 标识事件源
时间戳 - 表示事件发生的时间
正文 - 包含事件数据,指定为一组无序的(名称,值)对
分析函数将事件读入时间表。时间表的每一行代表一个流处理事件,通常按时间顺序排列。如果分析函数产生结果,那么它们也必须是时间表。
处理流时,您可以多次调用分析函数,因为窗口大小通常比流中的消息数量小得多。MATLAB Production Server 的无状态执行模型隔离了各个窗口的处理,因此一个窗口的处理不会影响下一个窗口的处理。需要在连续窗口处理之间进行交互的有状态函数指定在窗口之间保存并传递给分析函数的下一次调用的 MATLAB 结构体。
分析函数可以具有以下三种签名之一:
函数签名 | 描述 |
---|---|
results = analyticFcn (data) | 发出结果流的无状态分析函数 |
[ results, state ] = analyticFcn (data, state) | 状态分析函数,保留批次之间的状态并发出结果流 |
analyticFcn (data) | 无状态分析函数,不发出结果流 |
无状态解析函数
下面的 plotSierpinski
函数是无状态分析函数的一个示例。plotSierpinski
绘制输入时间表的 X 和 Y 列。该函数的源代码和运行它的脚本位于 \Examples\ExportOptions
文件夹中。
function howMany = plotSierpinski(xyData) hold on arrayfun(@(x,y)plot(x,y,'ro-', 'MarkerSize', 2), [xyData.X], [xyData.Y]); hold off drawnow count = height(xyData); howMany = timetable(xyData.Properties.RowTimes(end), count); end
状态分析功能
下面的 recamanSum
函数是有状态分析函数的一个示例。在状态函数中,数据状态在事件之间共享,过去的事件可以影响当前事件的处理方式。recamanSum
计算数值序列的累积和。返回两个值:
cSum
- 包含流中元素累计总和的表state
- 包含序列最终值的结构
recamanSum
函数的源代码、它的初始化函数 initRecamanSum
以及运行解析函数所用的脚本位于 \Examples\Numeric
文件夹中。
function [cSum, state] = recamanSum(data, state) timestamp = data.Properties.RowTimes; key = data.key; sum = cumsum(data.R) + state.cumsum; state.cumsum = sum(end); cSum = timetable(timestamp, key, sum); end
使用 MATLAB 处理 Kafka 事件
要处理来自流的事件,您需要创建一个对象来连接到流,从流中读取事件,迭代流处理分析函数来处理几个事件窗口,并且,如果分析函数产生结果,则创建不同的流对象将结果写入流。
以下代码示例概述了如何使用框架处理一个事件窗口。假设您有一个在网络地址 kafka.host.com:9092
上运行的 Kafka 主机,该主机有一个主题 recamanSum_data
。另外,假设 recamanSum_data
主题包含 Recamán 序列的前 1000 个元素。
创建一个
KafkaStream
对象,用于读取和写入recamanSum_data
主题。inKS = kafkaStream("kafka.host.com",9092,"recamanSum_data");
将“
recamanSum_data
”主题中的事件读入时间表“tt
”。tt = readtimetable(inKS);
调用
recamanSum
函数,计算tt
中 Recamán 序列一部分的累计和。由于recamanSum
是一个有状态的函数,首先调用initRecamSum
函数,该函数初始化状态。state = initRecamanSum(); [results, state] = recamanSum(tt,state);
有关如何处理多个事件窗口的详细示例,请参阅使用 MATLAB 处理 Kafka 事件。
使用 MATLAB Production Server 的开发版本仿真生产
在部署到 MATLAB Production Server 之前,您可以使用 MATLAB Production Server 的开发版本作为本地测试服务器来测试流处理分析函数。有关详细示例,请参阅使用本地测试服务器测试流分析功能。
将流分析部署到 MATLAB Production Server
您还可以将解析函数打包并部署到 MATLAB Production Server。有关详细示例,请参阅将流式分析函数部署到 MATLAB Production Server。
另请参阅
readtimetable
| writetimetable
| kafkaStream
| inMemoryStream
| testStream