主要内容

本页采用了机器翻译。点击此处可查看最新英文版本。

将流式分析函数部署到 MATLAB Production Server

您可以打包使用 Streaming Data Framework for MATLAB® Production Server™ 开发的流处理分析函数,并将打包后的存档(CTF 文件)部署到 MATLAB Production Server。已部署的存档希望接收流处理数据。Kafka® 连接器可执行文件从 Kafka 主机提取数据并将其推送到已部署的流处理存档。在 MATLAB 桌面中,由 Streaming Data Framework for MATLAB Production Server 管理 Kafka 连接器。在服务器实例上,您必须管理启动和停止 Kafka。

本主题介绍了 Kafka 连接器,并提供了使用部署到服务器的状态流处理分析函数处理流处理数据的示例。

Kafka 连接器规格

Kafka 连接器是一个 Java® 程序,至少需要 Java 8。要使用 Kafka 连接器,必须将服务器计算机上的 JAVA_HOME 环境变量设置为 Java 8 安装的路径。

每个包含流处理分析函数的部署存档都需要自己的 Kafka 连接器。例如,如果您有两个存档,则需要两个连接器。您不必安装 Kafka 连接器两次,但必须运行它两次,并且每个存档中只有一个 Kafka 连接器配置文件。

Kafka 连接器的生命周期管理取决于您的生产环境。Streaming Data Framework for MATLAB Production Server 提供了一些工具,使启动、停止和控制 Kafka 连接器更加容易。

运行示例的先决条件

下面的示例提供了一个有状态流处理分析函数的示例,展示了如何将其打包并部署到 MATLAB Production Server,并展示了如何在服务器上管理 Kafka 连接器。

要运行该示例,您需要示例流处理数据和正在运行的 MATLAB Production Server 实例以及正在运行的持久性服务。

创建示例流数据

创建示例流处理数据并将数据写入 Kafka 流。对于此示例,您创建一个 1000 个元素的 Recamán 序列并将其写入 Kafka 主题 recamanSum_data。有关创建流处理数据的详细信息,请参阅创建示例流数据

创建服务器实例

创建一个 MATLAB Production Server 实例来托管流处理可部署存档。有关使用命令行创建服务器实例的详细信息,请参阅 使用命令行设置 MATLAB Production Server。有关使用仪表板创建服务器实例的详细信息,请参阅使用仪表板创建服务器实例

启动持久服务

在服务器实例上创建持久性服务,并将持久性连接命名为 RR。启动持久性服务。稍后,当您将流处理函数打包到可部署存档中时,您将使用 RR 连接名称。有关创建和启动持久性服务的详细信息,请参阅数据缓存基础知识

启动服务器实例

启动您创建的服务器实例。有关使用命令行启动服务器实例的详细信息,请参阅 使用命令行启动服务器实例。有关使用仪表板启动服务器实例的详细信息,请参阅使用仪表板启动服务器实例

编写流式分析 MATLAB 函数

对于此示例,使用示例 MATLAB 函数 recamanSuminitRecamanSum,它们位于 support_package_root\toolbox\mps\streaming\Examples\Numeric 文件夹中,其中 support_package_root 是系统上支持包的根文件夹。要获取此文件夹的路径,请使用以下命令:

fullfile(matlabshared.supportpkg.getSupportPackageRoot,'toolbox','mps','streaming','Examples','Numeric')

稍后,您打包 recamanSum 函数并将其部署到 MATLAB Production Server。有关 recamanSuminitRecamanSum 函数的详细信息以及访问代码,请参阅 编写状态函数写入状态初始化函数

封装流式分析函数

要将 recamanSum 流处理分析打包到可部署存档中,您可以运行以下脚本。该脚本创建了一个连接到 recamanSum_data 主题的输入 KafkaStream 对象 dataKS,以及一个连接到 recamanSum_results 主题的输出 KafkaStream 对象 resultKS。然后,脚本使用 streamingDataCompiler 函数启动 Production Server Compiler (MATLAB Compiler SDK)。使用该应用程序,您可以创建一个可部署存档 recamanSum.ctf,适合部署到 MATLAB Production Server。在对 streamingDatacompiler 的调用中提供 StateStore 输入参量并将其值设置为 RRRR 是您在 启动持久服务 中创建的持久连接名称。

kafkaHost = "kafka.host.com";
kafkaPort = 9092;

dataKS = kafkaStream(kafkaHost, kafkaPort, "recamanSum_data", Rows=100);

resultKS = kafkaStream(kafkaHost, kafkaPort, "recamanSum_results", ...
    Rows=100);

archive = streamingDataCompiler("recamanSum", dataKS, resultKS, ...
    InitialState="initRecamanSum", StateStore="RR");

Production Server Compiler 中,点击 Package 以创建 recamanSum 存档。打包过程完成后,打开输出文件夹。在输出文件夹中,导航到 for_distribution 文件夹。for_distribution 文件夹包含您稍后使用的 recamanSum.ctf 可部署存档和 Kafka 连接器脚本。

Package dialog box with option to open output folder selected

将流分析函数部署到服务器

recamanSum 存档部署到正在运行的 MATLAB Production Server 实例。如果您使用命令行管理服务器,请将 recamanSum 存档复制到服务器实例的 auto_deploy 文件夹。有关其他部署方式,请参阅将存档部署至 MATLAB Production Server

启动 Kafka 连接器

根据服务器实例的操作系统,在系统提示符下输入以下命令以启动 Kafka 连接器脚本 kafka-connector-start。Kafka 连接器从 Kafka 主机提取数据并将其推送到已部署的流处理存档。

启动脚本的输出是一个进程 ID(PID)。保存 PID 的值。您稍后可以使用此 ID 来停止 Kafka 连接器进程。

Windows

powershell -executionPolicy bypass -File kafka-connector-start.ps1 -out out.log -err error.log -c collector.properties -k kafka.properties

Linux

 chmod +x kafka-connector-start.sh
./kafka-connector-start.sh -out out.log -err error.log -c collector.properties -k kafka.properties

从输出流读取处理后的数据

启动 Kafka 连接器后,服务器开始接收多个请求。部署的 recamanSum 存档接收来自输入的 Kafka 流的流处理数据作为输入,并计算 Recamán 序列的累积和。等待几秒钟,让服务器完成处理这些请求。

创建另一个 KafkaStream 对象来从输出主题读取结果。

readStream = kafkaStream("kafka.host.com", 9092, "recamanSum_results");

调用 readtimetable 读取输出数据。

result = readtimetable(readStream)

停止 Kafka 连接器

根据服务器实例的操作系统,在系统提示符下输入以下命令以停止 Kafka 连接器脚本 kafka-connector-stop。将 PID 替换为您启动连接器时保存的进程 ID。

Windows

powershell -executionPolicy bypass -File kafka-connector-stop.ps1 PID

Linux

 chmod +x kafka-connector-stop.sh
 ./kafka-connector-stop.sh PID

另请参阅

|

主题