使用 Apache Kafka 通过 MATLAB System object 流式传输数据
作者 : Peter Webb,MathWorks
随着 Apache® Kafka® 和类似事件流处理平台的应用日益增加,越来越多的工程团队都在想方设法使用 MATLAB® 来实现流式分析。如果应用可从 Kafka 主题获取流数据,并在 MATLAB 中处理这些数据,然后将其发布到另一个主题,则该应用可广泛用于各种用例和工作流,包括对来自传感器、物联网设备或其他来源的数据进行筛选、变换和分析。
在 MATLAB 中工作时,System object 和 Streaming Data Framework for MATLAB Production Server™ 可简化读写 Kafka 的应用的开发。System object 针对分段处理数据流的迭代计算进行了优化,因此可与 Streaming Data Framework 配合使用,后者将 Kafka 数据分段提供给 MATLAB 应用。同样重要的是,System object 会保持状态,从而可以平滑或消除人为瞬变。此类瞬变可能会出现在分段信号或数据流的框架边界处。
本文介绍了开发使用 System object 处理 Kafka 流数据的 MATLAB 应用并将这些应用部署到 MATLAB Production Server 的工作流。该工作流包含下面三个阶段:
- 开发和测试数据处理算法。
- 将算法嵌入流式分析函数中,并使用本地测试服务器进行验证。
- 将算法打包并部署到 MATLAB Production Server。
为了更好地理解上述每个阶段的关键步骤,在应用示例的上下文中查看它们会有所帮助。此处描述的示例基于 DSP System Toolbox™ 中的一个信号处理示例,该示例使用 dsp.LowpassFilter
System object™ 将一个低通滤波器应用于含噪正弦波(图 1)。本文中的示例代码(可供下载)经过修改适用于 Kafka,可从一个 Kafka 主题中将由 400 万个采样组成的正弦波信号读取为 1,000 个片段,其中每个片段包含 4,000 个采样。这里,需要注意的是,此 System object 和 Streaming Data Framework 工作流并不局限于信号处理应用:该工作流普遍适用于具有时变输入的动态系统。
要求
要为该工作流的所有三个阶段运行示例代码,您需要:
- MATLAB(22b 版本或更高版本)
- DSP System Toolbox
- Streaming Data Framework for MATLAB Production Server
- MATLAB Compiler™ 和 MATLAB Compiler SDK™
- MATLAB Production Server
要仅对第一阶段(涉及在 MATLAB 中开发处理 Kafka 流数据的算法)运行示例代码,您只需 MATLAB、DSP System Toolbox 和 Streaming Data Framework for MATLAB Production Server 即可。此外,如果您要对实时 Kafka 流进行测试,则必然需要 Kafka 集群。您也可以使用 Streaming Data Framework 的 TestStream
对象在没有 Kafka 集群的情况下进行开发和测试。示例代码假设您可以访问 Kafka 集群。
开发和测试算法
在该工作流中,第一阶段的重点是开发和测试使用 Kafka 数据的 MATLAB 算法。我们特意选择了一个非常简单的算法,以便此示例可以重点介绍如何将 System object 连接到 Kafka 流。
在示例文件 filterNoise.m
中,以下代码行创建了一个 dsp.LowpassFilter
System object,以及一个用于从 Kafka 主题中读取的流对象:
FIRLowPass = dsp.LowpassFilter('PassbandFrequency',5000,'StopbandFrequency',8000); % Set parameters for connection to Kafka server host = "mpskafka2936glnxa64.mathworks.com"; port = 9092; topic = “NoisySineWave” inKS = kafkaStream(host, port, topic, Rows=frameSize, Order="IngestTime", BodyFormat="Binary");
filterNoise.m
请注意,上面的代码适用于名为 NoisySineWave
的 Kafka 主题。您需要创建自己的主题来测试您的算法。您可以使用 signalToStream.m
中的代码,用含噪正弦波数据填充 Kafka 主题。稍后,在 filterNoise.m
中,有一个 for 循环从该主题中读取一个帧,并用 System object 处理该帧,然后在频谱分析仪中显示结果。readtimetable
函数将 Kafka 事件流中的每个事件转换为时间表中的一行。原始的 DSP System Toolbox 示例代码在循环的每次迭代时都会生成一帧正弦波数据。在以下代码中,您可将该行替换为对 readtimetable
的调用。下面这种方式很重要,需要牢记:要与流数据集成,请将从文件读取或生成合成数据的代码更改为调用 readtimetable
。
for f = 1:frameCount ... % Read one entire frame from the Kafka topic tt = readtimetable(inKS); % Process the frame with the low pass filter. y = FIRLowPass(tt.x); % Display the results with the spectrum analyzer. SpecAna(tt.x,y) end
filterNoise.m
System object 使用内部状态来存储过去的行为,因此,在每次调用 FIRLowPass
函数时,它都会处理当前帧的数据,同时考虑所有已处理的帧。因此,与帧相关的瞬变得到了最小化。
如果您运行以下代码,则会发现随着时间的推移,频谱分析仪逐渐显示出与图 1 所示图像相同的结果,这与原始 DSP System Toolbox 示例代码的结果相符。关键区别在于实现。针对此示例修改的代码从 Kafka 主题而非 MATLAB 工作区中的变量读取数据。
将算法嵌入流式分析函数中
在 MATLAB 中测试了您的算法后,下一步就是将它实现为流式分析函数,以备在生产环境中部署。在开发和测试期间,MATLAB 代码会主动调用了一个函数 (readtimetable
) 来使用 Kafka 主题中的数据,但在生产环境中,只要数据可供使用,系统就会调用该流式分析函数。您可以从拉取和推送模式方面思考这一问题。在前一阶段,MATLAB 函数从 Kafka 中拉取数据,而在此阶段,数据从 Kafka 推送到 MATLAB 函数中。
对于流式分析函数,值得注意的另一个要点是,它们不直接显示输出,而是将结果写入日志文件、数据库或其他目标 - 在本例中为 Kafka 主题。
流式分析函数可以是无状态函数,也可以是有状态函数。有状态函数要求在数据帧之间保留一些程序数据。无状态函数则无此要求。由于 MATLAB System object 使用状态数据对数据帧之间的过渡进行平滑处理,因此,它们需要有状态流式分析函数。将基于 MATLAB System object 的算法变换为流式分析需要五个步骤:
- 创建一个具有两个输入参数和两个输出参数的流式分析函数。
- 从状态输入变量中提取算法所需的状态。
- 从输入时间表中提取时间序列数据。
- 调用您的算法。
- 返回计算结果和下一个数据帧所需的状态。
Streaming Data Framework 可帮助您测试和部署流式分析函数。
要了解在示例应用中这是如何工作的,请先查看 testFilterStream.m
。在设置了输入和输出流后,此文件中的下一个关键步骤是创建一个 EventStreamProcessor
对象,该对象将流式分析函数 filterStream
应用于 Kafka 事件流。
% Use an EventStreamProcessor to apply the streaming analytic to the input % Kafka topic and write the result to the output Kafka topic. esp = eventStreamProcessor(inKS,@filterStream,@initFilterStream,OutputStream=outKS);
testFilterStream.m
在示例代码中,您可以看到传递给 eventStreamProcessor
的前三个参数是输入事件流和两个函数:filterStream
和 initFilterStream
。
第二个函数 initFilterStream
只调用一次,用于初始化流式分析函数的持久状态。在该示例中,此函数(在 initFilterStream.m
中定义)只创建将用于处理 Kafka 信号数据的 dsp.LowpassFilter
System object。请注意,这与 filterNoise.m
中用于创建低通滤波器的代码完全相同。
% Create a DSP LowPassFilter, which is a MATLAB system object. state.FIRLowPass = dsp.LowpassFilter('PassbandFrequency',5000,'StopbandFrequency',8000);
initFilterStream.m
第一个函数是流式分析函数 filterStream
。针对每个数据帧,都会调用一次该函数。由于 filterStream
是有状态函数,因此 EventStreamProcessor
对象会在该函数的调用之间保持状态。在调用时,该函数会被传递给当前帧的数据,以及当前状态(此处的状态是 dsp.LowpassFilter
System object)。在处理(在本例中是通过应用低通滤波器)了数据后,流式分析函数返回滤波后的信号作为第一个参数,并返回更新后的状态作为第二个输出参数。EventStreamProcessor
将滤波后的信号写入输出主题,并为下一次函数迭代保留状态更改。
function [result,state] = filterStream(signal,state) % filterStream Pass the input signal through a low pass filter. Capture the % resulting filtered signal in a timetable with the same timestamps as the % input signal. % Retrieve the DSP LowPassFilter object from inter-frame persistent % state. FIRLowPass = state.FIRLowPass; % Apply the low pass filter to the signal y = FIRLowPass(signal.x); % Create a timetable from the filtered signal and the input signal % timestamps. ts = signal.Properties.RowTimes; result = timetable(ts,y); % Preserve the low pass filter in inter-frame persistent state. This % ensures the state is loaded by whichever MPS worker processes the % next frame of the signal. state.FIRLowPass = FIRLowPass; end
filterStream.m
返回到 testFilterStream.m
时,您可以看到,在设置了 EventStreamProcessor
后,代码会测试流式分析函数,具体方法是使用 EventStreamProcessor
对象的 execute 函数处理 10 帧数据。然后,该代码从相应的 Kafka 主题中读取输入信号和新生成的输出信号,并通过频谱分析仪显示结果。
% Test the streaming analytic by processing 10 frames. This produces 10 % frames in the output topic. N = 10; execute(esp,N);
testFilterStream.m
通过运行测试(比如在 testFilterStream.m
中实现的测试),您可以先评估流式分析函数的性能,然后再将其部署到 MATLAB Production Server。您还可以使用 MATLAB Production Server 的开发版本测试您的流式分析函数,该开发版本可用作本地测试服务器,用于在将您的代码部署到企业系统之前对其进行调试。
将算法打包并部署到 MATLAB Production Server
该工作流的第三个阶段是打包和部署。该阶段有完善的配套文档,并且相对简单。
示例文件 deployFilterStream.m
显示了如下主要步骤:
- 创建输入和输出流。
- 设置
EventStreamProcessor
对象。 - 调用 package 方法以启动 Production Server Compiler。
% Input and output streams inKS = kafkaStream("mpskafka2936glnxa64.mathworks.com", 9092,"NoisySineWave", Rows=frameSize); outKS = kafkaStream("mpskafka2936glnxa64.mathworks.com", 9092, "LowPassSineWave"); esp = eventStreamProcessor(inKS,@filterStream,@initFilterStream,OutputStream=outKS); prj = package(esp,StateStore="LocalRedis");
deployFilterStream.m
package
函数用您的流式分析函数的信息预填 Production Server Compiler(图 2)。当您点击 Package 时,MATLAB Compiler 会生成可部署的存档(CTF 文件),用于部署到 MATLAB Production Server。接下来,按照将流式分析函数部署到 MATLAB Production Server 中的步骤部署您的存档,并启动 Kafka Connector 可执行文件,以从您的 Kafka 主机中拉取数据并将其推送到您部署的存档。
结束语
使用 Streaming Data Framework,您可以轻松地修改基于 MATLAB System object 的算法,以访问 Kafka 主题中的数据流。若要扩展您的解决方案,使其能够处理大量数据,您可以将流式分析函数部署到 MATLAB Production Server。
部署可能是一个耗时的过程,需要 IT 团队的参与及其时间安排,还需要考虑治理限制。为了最大限度地减少不必要的部署,您需要部署已经过全面测试和调试的应用。能够在部署之前进行本地调试和测试,是本文所述的 MATLAB 应用构建工作流的一项重要优势,这些应用可使用 System object 和 Streaming Data Framework for MATLAB Production Server 无缝访问 Kafka 数据。
2023 年发布