技术文章

使用 Apache Kafka 通过 MATLAB System object 流式传输数据

作者 : Peter Webb,MathWorks


随着 Apache® Kafka® 和类似事件流处理平台的应用日益增加,越来越多的工程团队都在想方设法使用 MATLAB® 来实现流式分析。如果应用可从 Kafka 主题获取流数据,并在 MATLAB 中处理这些数据,然后将其发布到另一个主题,则该应用可广泛用于各种用例和工作流,包括对来自传感器、物联网设备或其他来源的数据进行筛选、变换和分析。

在 MATLAB 中工作时,System objectStreaming Data Framework for MATLAB Production Server™ 可简化读写 Kafka 的应用的开发。System object 针对分段处理数据流的迭代计算进行了优化,因此可与 Streaming Data Framework 配合使用,后者将 Kafka 数据分段提供给 MATLAB 应用。同样重要的是,System object 会保持状态,从而可以平滑或消除人为瞬变。此类瞬变可能会出现在分段信号或数据流的框架边界处。

本文介绍了开发使用 System object 处理 Kafka 流数据的 MATLAB 应用并将这些应用部署到 MATLAB Production Server 的工作流。该工作流包含下面三个阶段:

  1. 开发和测试数据处理算法。
  2. 将算法嵌入流式分析函数中,并使用本地测试服务器进行验证。
  3. 将算法打包并部署到 MATLAB Production Server。

为了更好地理解上述每个阶段的关键步骤,在应用示例的上下文中查看它们会有所帮助。此处描述的示例基于 DSP System Toolbox™ 中的一个信号处理示例,该示例使用 dsp.LowpassFilter System object™ 将一个低通滤波器应用于含噪正弦波(图 1)。本文中的示例代码(可供下载)经过修改适用于 Kafka,可从一个 Kafka 主题中将由 400 万个采样组成的正弦波信号读取为 1,000 个片段,其中每个片段包含 4,000 个采样。这里,需要注意的是,此 System object 和 Streaming Data Framework 工作流并不局限于信号处理应用:该工作流普遍适用于具有时变输入的动态系统。

两个含噪信号的频率图截图,其中一个信号未应用低通滤波器,另一个信号应用了低通滤波器。

图 1. 频谱分析仪窗口显示原始含噪信号(黄色)和低通滤波信号(蓝色)。

要求

要为该工作流的所有三个阶段运行示例代码,您需要:

  • MATLAB(22b 版本或更高版本)
  • DSP System Toolbox
  • Streaming Data Framework for MATLAB Production Server
  • MATLAB Compiler™ 和 MATLAB Compiler SDK™
  • MATLAB Production Server

要仅对第一阶段(涉及在 MATLAB 中开发处理 Kafka 流数据的算法)运行示例代码,您只需 MATLAB、DSP System Toolbox 和 Streaming Data Framework for MATLAB Production Server 即可。此外,如果您要对实时 Kafka 流进行测试,则必然需要 Kafka 集群。您也可以使用 Streaming Data Framework 的 TestStream 对象在没有 Kafka 集群的情况下进行开发和测试。示例代码假设您可以访问 Kafka 集群。

开发和测试算法

在该工作流中,第一阶段的重点是开发和测试使用 Kafka 数据的 MATLAB 算法。我们特意选择了一个非常简单的算法,以便此示例可以重点介绍如何将 System object 连接到 Kafka 流。

在示例文件 filterNoise.m 中,以下代码行创建了一个 dsp.LowpassFilter System object,以及一个用于从 Kafka 主题中读取的流对象:

filterNoise.m

FIRLowPass = dsp.LowpassFilter('PassbandFrequency',5000,'StopbandFrequency',8000); % Set parameters for connection to Kafka server host = "mpskafka2936glnxa64.mathworks.com"; port = 9092; topic = “NoisySineWave” inKS = kafkaStream(host, port, topic, Rows=frameSize, Order="IngestTime", BodyFormat="Binary");

请注意,上面的代码适用于名为 NoisySineWave 的 Kafka 主题。您需要创建自己的主题来测试您的算法。您可以使用 signalToStream.m 中的代码,用含噪正弦波数据填充 Kafka 主题。稍后,在 filterNoise.m 中,有一个 for 循环从该主题中读取一个帧,并用 System object 处理该帧,然后在频谱分析仪中显示结果。readtimetable 函数将 Kafka 事件流中的每个事件转换为时间表中的一行。原始的 DSP System Toolbox 示例代码在循环的每次迭代时都会生成一帧正弦波数据。在以下代码中,您可将该行替换为对 readtimetable 的调用。下面这种方式很重要,需要牢记:要与流数据集成,请将从文件读取或生成合成数据的代码更改为调用 readtimetable

filterNoise.m

for f = 1:frameCount ... % Read one entire frame from the Kafka topic tt = readtimetable(inKS); % Process the frame with the low pass filter. y = FIRLowPass(tt.x); % Display the results with the spectrum analyzer. SpecAna(tt.x,y) end

System object 使用内部状态来存储过去的行为,因此,在每次调用 FIRLowPass 函数时,它都会处理当前帧的数据,同时考虑所有已处理的帧。因此,与帧相关的瞬变得到了最小化。

如果您运行以下代码,则会发现随着时间的推移,频谱分析仪逐渐显示出与图 1 所示图像相同的结果,这与原始 DSP System Toolbox 示例代码的结果相符。关键区别在于实现。针对此示例修改的代码从 Kafka 主题而非 MATLAB 工作区中的变量读取数据。

将算法嵌入流式分析函数中

在 MATLAB 中测试了您的算法后,下一步就是将它实现为流式分析函数,以备在生产环境中部署。在开发和测试期间,MATLAB 代码会主动调用了一个函数 (readtimetable) 来使用 Kafka 主题中的数据,但在生产环境中,只要数据可供使用,系统就会调用该流式分析函数。您可以从拉取和推送模式方面思考这一问题。在前一阶段,MATLAB 函数从 Kafka 中拉取数据,而在此阶段,数据从 Kafka 推送到 MATLAB 函数中。

对于流式分析函数,值得注意的另一个要点是,它们不直接显示输出,而是将结果写入日志文件、数据库或其他目标 - 在本例中为 Kafka 主题。

流式分析函数可以是无状态函数,也可以是有状态函数。有状态函数要求在数据帧之间保留一些程序数据。无状态函数则无此要求。由于 MATLAB System object 使用状态数据对数据帧之间的过渡进行平滑处理,因此,它们需要有状态流式分析函数。将基于 MATLAB System object 的算法变换为流式分析需要五个步骤:

  1. 创建一个具有两个输入参数和两个输出参数的流式分析函数。
  2. 从状态输入变量中提取算法所需的状态。
  3. 从输入时间表中提取时间序列数据。
  4. 调用您的算法。
  5. 返回计算结果和下一个数据帧所需的状态。

Streaming Data Framework 可帮助您测试和部署流式分析函数。

要了解在示例应用中这是如何工作的,请先查看 testFilterStream.m。在设置了输入和输出流后,此文件中的下一个关键步骤是创建一个 EventStreamProcessor 对象,该对象将流式分析函数 filterStream 应用于 Kafka 事件流。

testFilterStream.m

% Use an EventStreamProcessor to apply the streaming analytic to the input % Kafka topic and write the result to the output Kafka topic. esp = eventStreamProcessor(inKS,@filterStream,@initFilterStream,OutputStream=outKS);

在示例代码中,您可以看到传递给 eventStreamProcessor 的前三个参数是输入事件流和两个函数:filterStreaminitFilterStream

第二个函数 initFilterStream 只调用一次,用于初始化流式分析函数的持久状态。在该示例中,此函数(在 initFilterStream.m 中定义)只创建将用于处理 Kafka 信号数据的 dsp.LowpassFilter System object。请注意,这与 filterNoise.m 中用于创建低通滤波器的代码完全相同。

initFilterStream.m

% Create a DSP LowPassFilter, which is a MATLAB system object. state.FIRLowPass = dsp.LowpassFilter('PassbandFrequency',5000,'StopbandFrequency',8000);

第一个函数是流式分析函数 filterStream。针对每个数据帧,都会调用一次该函数。由于 filterStream 是有状态函数,因此 EventStreamProcessor 对象会在该函数的调用之间保持状态。在调用时,该函数会被传递给当前帧的数据,以及当前状态(此处的状态是 dsp.LowpassFilter System object)。在处理(在本例中是通过应用低通滤波器)了数据后,流式分析函数返回滤波后的信号作为第一个参数,并返回更新后的状态作为第二个输出参数。EventStreamProcessor 将滤波后的信号写入输出主题,并为下一次函数迭代保留状态更改。

filterStream.m

function [result,state] = filterStream(signal,state) % filterStream Pass the input signal through a low pass filter. Capture the % resulting filtered signal in a timetable with the same timestamps as the % input signal. % Retrieve the DSP LowPassFilter object from inter-frame persistent % state. FIRLowPass = state.FIRLowPass; % Apply the low pass filter to the signal y = FIRLowPass(signal.x); % Create a timetable from the filtered signal and the input signal % timestamps. ts = signal.Properties.RowTimes; result = timetable(ts,y); % Preserve the low pass filter in inter-frame persistent state. This % ensures the state is loaded by whichever MPS worker processes the % next frame of the signal. state.FIRLowPass = FIRLowPass; end

返回到 testFilterStream.m 时,您可以看到,在设置了 EventStreamProcessor 后,代码会测试流式分析函数,具体方法是使用 EventStreamProcessor 对象的 execute 函数处理 10 帧数据。然后,该代码从相应的 Kafka 主题中读取输入信号和新生成的输出信号,并通过频谱分析仪显示结果。

testFilterStream.m

% Test the streaming analytic by processing 10 frames. This produces 10 % frames in the output topic. N = 10; execute(esp,N);

通过运行测试(比如在 testFilterStream.m 中实现的测试),您可以先评估流式分析函数的性能,然后再将其部署到 MATLAB Production Server。您还可以使用 MATLAB Production Server 的开发版本测试您的流式分析函数,该开发版本可用作本地测试服务器,用于在将您的代码部署到企业系统之前对其进行调试。

将算法打包并部署到 MATLAB Production Server

该工作流的第三个阶段是打包和部署。该阶段有完善的配套文档,并且相对简单。

示例文件 deployFilterStream.m 显示了如下主要步骤:

  1. 创建输入和输出流。
  2. 设置 EventStreamProcessor 对象。
  3. 调用 package 方法以启动 Production Server Compiler。

deployFilterStream.m

% Input and output streams inKS = kafkaStream("mpskafka2936glnxa64.mathworks.com", 9092,"NoisySineWave", Rows=frameSize); outKS = kafkaStream("mpskafka2936glnxa64.mathworks.com", 9092, "LowPassSineWave"); esp = eventStreamProcessor(inKS,@filterStream,@initFilterStream,OutputStream=outKS); prj = package(esp,StateStore="LocalRedis");

package 函数用您的流式分析函数的信息预填 Production Server Compiler(图 2)。当您点击 Package 时,MATLAB Compiler 会生成可部署的存档(CTF 文件),用于部署到 MATLAB Production Server。接下来,按照将流式分析函数部署到 MATLAB Production Server 中的步骤部署您的存档,并启动 Kafka Connector 可执行文件,以从您的 Kafka 主机中拉取数据并将其推送到您部署的存档。

Production Server Compiler 的截图,其中已预填流式分析函数的信息。

图 2. Production Server Compiler。

结束语

使用 Streaming Data Framework,您可以轻松地修改基于 MATLAB System object 的算法,以访问 Kafka 主题中的数据流。若要扩展您的解决方案,使其能够处理大量数据,您可以将流式分析函数部署到 MATLAB Production Server。

部署可能是一个耗时的过程,需要 IT 团队的参与及其时间安排,还需要考虑治理限制。为了最大限度地减少不必要的部署,您需要部署已经过全面测试和调试的应用。能够在部署之前进行本地调试和测试,是本文所述的 MATLAB 应用构建工作流的一项重要优势,这些应用可使用 System object 和 Streaming Data Framework for MATLAB Production Server 无缝访问 Kafka 数据。

2023 年发布