使用本地测试服务器测试流分析功能
此示例展示了如何使用 MATLAB® Production Server™ 的开发版本在部署到 MATLAB Production Server 之前测试流处理分析函数。MATLAB Compiler SDK™ 包含 MATLAB Production Server 的开发版本,您可以将其用作本地测试服务器,在将应用程序代码部署到企业系统之前对其进行测试和调试。
前提条件
您的系统上必须安装 Streaming Data Framework for MATLAB Production Server。有关详细信息,请参阅安装 Streaming Data Framework for MATLAB Production Server。
您必须拥有一个正在运行的 Kafka® 服务器,并且您具有创建主题所需的权限。该示例假设您的 Kafka 主机的网络地址是
kafka.host.com:9092。
编写流式分析 MATLAB 函数
为了测试目的,请使用位于 recamanSum 文件夹中的示例 MATLAB 函数 initRecamanSum 和 ,其中 support_package_root\mps\streaming\Examples\Numeric 是系统上支持包的根文件夹。要获取此文件夹的路径,请使用以下命令:support_package_root
fullfile(matlabshared.supportpkg.getSupportPackageRoot,'toolbox','mps','streaming','Examples','Numeric')
稍后,您使用本地测试服务器测试 recamanSum 可部署存档。
有关 recamanSum 和 initRecamanSum 函数的详细信息以及访问代码,请参阅 编写状态函数 和 写入状态初始化函数。
创建示例流数据
通过创建示例数据并将数据写入 Kafka 流来准备测试。对于此示例,您创建一个 1000 个元素的 Recamán 序列并将其写入 Kafka 主题 recamanSum_data。
要创建 Recamán 序列,您可以使用以下 recamanTimeTable 函数,该函数也位于 \Examples\Numeric 文件夹中。recamanTimeTable 创建一个时间表,其中包含 Recamán 序列的前 N 个元素。
function tt = recamanTimeTable(N) rs = zeros(1,N); for k=2:N n = k-1; subtract = rs(k-1) - n; if subtract > 0 && any(rs == subtract) == false rs(k) = subtract; else rs(k) = rs(k-1) + n; end end incr = seconds(1:N); thisVeryInstant = ... convertTo(datetime, "epochtime", "Epoch", "1970-1-1"); thisVeryInstant = datetime(thisVeryInstant, "ConvertFrom",... "epochtime", "Epoch", "1970-1-1"); thisVeryInstant.TimeZone = "UTC"; timestamp = (thisVeryInstant - seconds(N)) + incr'; key = (0:N-1)'; key = string(key); R = rs'; tt = timetable(timestamp,R,key); end
您可以使用以下代码使用 recamanTimeTable 函数创建一个 1000 个元素的 Recamán 序列并将其写入 recamanSum_data Kafka 主题。该示例假设您在网络地址 kafka.host.com:9092 上运行 Kafka 主机,并且您具有在 Kafka 集群中创建主题所需的权限。
kafkaHost = "kafka.host.com"; kafkaPort = 9092; tt0 = recamanTimeTable(1000); dataKS = kafkaStream(kafkaHost, kafkaPort, "recamanSum_data", Rows=100); try deleteTopic(dataKS); catch, end writetimetable(dataKS, tt0); tt1 = readtimetable(dataKS); if isequal(tt0(1:height(tt1),:), tt1) fprintf(1,"Success writing data to topic %s.\n", dataKS.Name); end stop(dataKS);
使用本地测试服务器仿真生产
为了在生产环境中仿真流处理数据处理,您可以使用 MATLAB Production Server 的开发版本运行 recamanSum 可部署存档并处理来自 recamanSum_data 主题的数据。
创建一个连接到
KafkaStream主题的recamanSum_data对象。ks = kafkaStream("kafka.host.com",9092,"recamanSum_data");
创建另一个
KafkaStream对象,将recamanSum解析函数的结果写入名为recaman_results的不同主题。outKS = kafkaStream("kafka.host.com",9092,"recamanSum_results");
创建一个运行
EventStreamProcessor函数的recamanSum对象,并使用initRecamanSum函数初始化持久状态。提供持久数据存储连接名称作为输入参量是可选的。如果您不提供,
EventStreamProcessor将创建一个具有唯一名称的连接以缓存迭代之间的数据状态。esp = eventStreamProcessor(ks,@recamanSum,@initRecamanSum,OutputStream=outKS);
使用 MATLAB 编辑器,您可以在
recamanSum函数中设置断点,以在启动服务器时检查传入的流处理数据。启动测试服务器。
startServer(esp);
这样做会打开 Production Server Compiler,其中包含流处理函数
recamanSum、入口点函数streamfcn和可部署存档recamanSum的值。
通过点击 Test Client,然后点击 Start 从应用程序启动测试服务器。
导航回 MATLAB 命令提示符以开始处理事件。
start(esp);
在 Production Server Compiler 中,您可以看到测试服务器接收数据。

从 MATLAB 编辑器中,如果您设置了断点,则可以使用调试器检查函数处理的数据、状态和结果。点击 Continue 继续调试,或者在完成调试后点击 Stop。
从 MATLAB 命令提示符处,停止事件处理并关闭服务器。
stop(esp); stopServer(esp);
从输出流中读取处理的结果。
results = readtimetable(outKS);
results = 50×2 timetable timestamp key sum ____________________ ____ ____ 20-Jan-1970 04:55:08 "0" 0 20-Jan-1970 04:55:08 "1" 1 20-Jan-1970 04:55:08 "2" 4 20-Jan-1970 04:55:08 "3" 10 20-Jan-1970 04:55:08 "4" 12 : : : 20-Jan-1970 04:55:08 "45" 1697 20-Jan-1970 04:55:08 "46" 1732 20-Jan-1970 04:55:08 "47" 1814 20-Jan-1970 04:55:08 "48" 1848 20-Jan-1970 04:55:08 "49" 1931 Display all 50 rows.
另请参阅
kafkaStream | eventStreamProcessor | execute | package | seek | start | startServer | stop | stopServer