主要内容

本页采用了机器翻译。点击此处可查看最新英文版本。

使用本地测试服务器测试流分析功能

此示例展示了如何使用 MATLAB® Production Server™ 的开发版本在部署到 MATLAB Production Server 之前测试流处理分析函数。MATLAB Compiler SDK™ 包含 MATLAB Production Server 的开发版本,您可以将其用作本地测试服务器,在将应用程序代码部署到企业系统之前对其进行测试和调试。

前提条件

  • 您的系统上必须安装 Streaming Data Framework for MATLAB Production Server。有关详细信息,请参阅安装 Streaming Data Framework for MATLAB Production Server

  • 您必须拥有一个正在运行的 Kafka® 服务器,并且您具有创建主题所需的权限。该示例假设您的 Kafka 主机的网络地址是 kafka.host.com:9092

编写流式分析 MATLAB 函数

为了测试目的,请使用位于 recamanSum 文件夹中的示例 MATLAB 函数 initRecamanSumsupport_package_root\mps\streaming\Examples\Numeric,其中 support_package_root 是系统上支持包的根文件夹。要获取此文件夹的路径,请使用以下命令:

fullfile(matlabshared.supportpkg.getSupportPackageRoot,'toolbox','mps','streaming','Examples','Numeric')

稍后,您使用本地测试服务器测试 recamanSum 可部署存档。

有关 recamanSuminitRecamanSum 函数的详细信息以及访问代码,请参阅 编写状态函数写入状态初始化函数

创建示例流数据

通过创建示例数据并将数据写入 Kafka 流来准备测试。对于此示例,您创建一个 1000 个元素的 Recamán 序列并将其写入 Kafka 主题 recamanSum_data

要创建 Recamán 序列,您可以使用以下 recamanTimeTable 函数,该函数也位于 \Examples\Numeric 文件夹中。recamanTimeTable 创建一个时间表,其中包含 Recamán 序列的前 N 个元素。

function tt = recamanTimeTable(N)
    rs = zeros(1,N);
    for k=2:N
        n = k-1;
        subtract = rs(k-1) - n;
        if  subtract > 0 && any(rs == subtract) == false
            rs(k) = subtract;
        else
            rs(k) = rs(k-1) + n;
        end
    end

    incr = seconds(1:N);

    thisVeryInstant = ...
        convertTo(datetime, "epochtime", "Epoch", "1970-1-1");
    thisVeryInstant = datetime(thisVeryInstant, "ConvertFrom",...
        "epochtime", "Epoch", "1970-1-1");

    thisVeryInstant.TimeZone = "UTC";
    timestamp = (thisVeryInstant - seconds(N)) + incr';

    key = (0:N-1)';
    key = string(key);
    R = rs';
    tt = timetable(timestamp,R,key);

end

您可以使用以下代码使用 recamanTimeTable 函数创建一个 1000 个元素的 Recamán 序列并将其写入 recamanSum_data Kafka 主题。该示例假设您在网络地址 kafka.host.com:9092 上运行 Kafka 主机,并且您具有在 Kafka 集群中创建主题所需的权限。

kafkaHost = "kafka.host.com";
kafkaPort = 9092;

tt0 = recamanTimeTable(1000);

dataKS = kafkaStream(kafkaHost, kafkaPort, "recamanSum_data", Rows=100);

try deleteTopic(dataKS); catch, end

writetimetable(dataKS, tt0);

tt1 = readtimetable(dataKS);

if isequal(tt0(1:height(tt1),:), tt1)
    fprintf(1,"Success writing data to topic %s.\n", dataKS.Name);
end

stop(dataKS);

使用本地测试服务器仿真生产

为了在生产环境中仿真流处理数据处理,您可以使用 MATLAB Production Server 的开发版本运行 recamanSum 可部署存档并处理来自 recamanSum_data 主题的数据。

  1. 创建一个连接到 KafkaStream 主题的 recamanSum_data 对象。

    ks = kafkaStream("kafka.host.com",9092,"recamanSum_data");
  2. 创建另一个 KafkaStream 对象,将 recamanSum 解析函数的结果写入名为 recaman_results 的不同主题。

    outKS = kafkaStream("kafka.host.com",9092,"recamanSum_results");
  3. 创建一个运行 EventStreamProcessor 函数的 recamanSum 对象,并使用 initRecamanSum 函数初始化持久状态。

    提供持久数据存储连接名称作为输入参量是可选的。如果您不提供,EventStreamProcessor 将创建一个具有唯一名称的连接以缓存迭代之间的数据状态。

    esp = eventStreamProcessor(ks,@recamanSum,@initRecamanSum,OutputStream=outKS);
  4. 使用 MATLAB 编辑器,您可以在 recamanSum 函数中设置断点,以在启动服务器时检查传入的流处理数据。

  5. 启动测试服务器。

    startServer(esp);

    这样做会打开 Production Server Compiler,其中包含流处理函数 recamanSum、入口点函数 streamfcn 和可部署存档 recamanSum 的值。Production Server Compiler app interface with recamanSum

  6. 通过点击 Test Client,然后点击 Start 从应用程序启动测试服务器。

  7. 导航回 MATLAB 命令提示符以开始处理事件。

    start(esp);

    Production Server Compiler 中,您可以看到测试服务器接收数据。

    Production Server Compiler app interface with recamanSum

  8. 从 MATLAB 编辑器中,如果您设置了断点,则可以使用调试器检查函数处理的数据、状态和结果。点击 Continue 继续调试,或者在完成调试后点击 Stop

  9. 从 MATLAB 命令提示符处,停止事件处理并关闭服务器。

    stop(esp);
    stopServer(esp);
  10. 从输出流中读取处理的结果。

    results = readtimetable(outKS);
    results =
    
      50×2 timetable
    
             timestamp          key     sum 
        ____________________    ____    ____
    
        20-Jan-1970 04:55:08    "0"        0
        20-Jan-1970 04:55:08    "1"        1
        20-Jan-1970 04:55:08    "2"        4
        20-Jan-1970 04:55:08    "3"       10
        20-Jan-1970 04:55:08    "4"       12
    
                 :               :       :  
    
        20-Jan-1970 04:55:08    "45"    1697
        20-Jan-1970 04:55:08    "46"    1732
        20-Jan-1970 04:55:08    "47"    1814
        20-Jan-1970 04:55:08    "48"    1848
        20-Jan-1970 04:55:08    "49"    1931
    
    	Display all 50 rows.

另请参阅

| | | | | | | |

主题