主要内容

本页采用了机器翻译。点击此处可查看最新英文版本。

使用 MATLAB 处理 Kafka 事件

此示例展示如何使用 Streaming Data Framework for MATLAB® Production Server™ 处理来自 Kafka® 流的事件。该示例提供并解释了处理事件流的 recamanSuminitRecamanSum 流处理分析函数,以及创建事件流、验证事件流创建、使用流处理分析函数处理事件流并将结果写入输出流的 demoRecaman 脚本。

示例函数和脚本位于 support_package_root\mps\streaming\Examples\Numeric 文件夹中,其中 support_package_root 是系统上支持包的根文件夹。要获取此文件夹的路径,请使用以下命令:

fullfile(matlabshared.supportpkg.getSupportPackageRoot,'toolbox','mps','streaming','Examples','Numeric')

前提条件

  • 您的系统上必须安装 Streaming Data Framework for MATLAB Production Server。有关详细信息,请参阅安装 Streaming Data Framework for MATLAB Production Server

  • 您必须拥有一个正在运行的 Kafka 服务器,并且您具有创建主题所需的权限。该示例假设您的 Kafka 主机的网络地址是 kafka.host.com:9092

编写流式分析 MATLAB 函数

对于此示例,使用示例 MATLAB 函数 recamanSuminitRecamanSum。稍后,您可以在多个事件上迭代 recamanSum 流处理函数来计算结果。

编写状态函数

recamanSum 函数是有状态的。在有状态函数中,数据状态在事件之间共享,过去的事件可以影响当前事件的处理方式。recamanSum 计算流变量 R 中数值序列的累积和,并返回表 cSum 和结构体 state。表 cSum 包含 R 中元素的累积和以及时间戳。结构体 state 在其字段 cumsum 中包含该序列的最终值。

function [cSum, state] = recamanSum(data, state)
    timestamp = data.Properties.RowTimes; 
    key = data.key;
    
    sum = cumsum(data.R) + state.cumsum;
    
    state.cumsum = sum(end);

    cSum = timetable(timestamp, key, sum);
end

写入状态初始化函数

initRecamanSum 函数初始化 recamanSum 函数第一次迭代的状态。

function state = initRecamanSum(config)
    state.cumsum = 0;
end

创建示例流事件

要运行该示例,您需要示例流处理数据。demoRecaman 脚本包含以下代码,用于创建由 Recamán 序列的前 1000 个元素组成的流处理数据,还包含将序列写入 Kafka 主题 recamanSum_data 的代码。

  1. 设置 Kafka 主机名和端口号。

    kafkaHost = "kafka.host.com";
    kafkaPort = 9092;

  2. 创建 Recamán 序列的前 1000 个元素。

    要创建序列,您可以使用位于 recamanTimeTable 文件夹中的以下 \Examples\Numeric 函数。recamanTimeTable 创建一个时间表,其中包含 Recamán 序列的前 N 元素。

    function tt = recamanTimeTable(N)
        rs = zeros(1,N);
        for k=2:N
            n = k-1;
            subtract = rs(k-1) - n;
            if  subtract > 0 && any(rs == subtract) == false
                rs(k) = subtract;
            else
                rs(k) = rs(k-1) + n;
            end
        end
    
        incr = seconds(1:N);
    
        thisVeryInstant = ...
            convertTo(datetime, "epochtime", "Epoch", "1970-1-1");
        thisVeryInstant = datetime(thisVeryInstant, "ConvertFrom",...
            "epochtime", "Epoch", "1970-1-1");
    
        thisVeryInstant.TimeZone = "UTC";
        timestamp = (thisVeryInstant - seconds(N)) + incr';
    
        key = (0:N-1)';
        key = string(key);
        R = rs';
        tt = timetable(timestamp,R,key);
    
    end

  3. recamanTimeTable 的结果存储在时间表中。

    tt0 = recamanTimeTable(1000);

  4. 创建一个连接到 recamanSum_data 主题的流对象。然后,您将包含 Recamán 序列的时间表写入 recamanSum_data

    dataKS = kafkaStream(kafkaHost, kafkaPort, "recamanSum_data", Rows=100);

  5. 如果 recamanSum_data 主题已经存在,请将其删除。

    try deleteTopic(dataKS); catch, end

  6. 将整个 Recamán 序列写入 recamanSum_data 主题。

    writetimetable(dataKS, tt0);

验证示例数据创建

要验证您创建的示例流事件,请确认您从 recamanSum_data 主题读取的前 100 行与您创建并写入 recamanSum_data 主题的示例数据相同。demoRecaman 脚本包含以下代码。

  1. recamanSum_data 主题中的一个数据窗口(100 行)读入时间表 tt1

    tt1 = readtimetable(dataKS);

  2. 检查读入 tt1 的数据是否等于您写入的 Recamán 序列的前 100 个元素。

    if isequal(tt0(1:height(tt1),:), tt1)
        fprintf(1,"Success writing data to topic %s.\n", dataKS.Name);
    end

  3. 停止从 dataKS 流读取,因为稍后您将使用 dataKS 再次从 recamanSum_data 主题读取。不允许使用多个流读取同一主题。

    stop(dataKS);

使用流分析函数处理流事件

多次迭代 recamanSum 流处理分析函数,从输入流中读取数值序列,计算其累积和,并将结果写入输出流。demoRecaman 脚本包含以下代码。

  1. 创建连接到 recamanSum_results 主题的输出流。使用 recamanSum_results 存储 recamanSum 流处理函数的输出。

    resultKS = kafkaStream(kafkaHost,kafkaPort,"recamanSum_results", ...
        Rows=100);

  2. 创建一个事件流处理器,以在连接到流 recamanSum 的输入主题上迭代 dataKS 流处理函数。将结果写入与流 resultKS 连接的输出主题。使用名为 RR 的持久存储连接来存储迭代之间的数据状态。

    rsp = eventStreamProcessor(dataKS,@recamanSum,@initRecamanSum,...
        StateStore="RR",OutputStream=resultKS);
  3. 执行流函数十次。由于窗口大小(即一次读取的行数)为 100,因此十次迭代将消耗整个 1000 个元素的序列。

    fprintf(1,"Computing cumulative sum of Recaman sequence.\n");
    execute(rsp, 10);

  4. 删除事件流处理器。这将关闭 StateStore,这是连续多次运行该脚本所必需的。

    clear rsp;

  5. 从输出流中读取结果。

    fprintf(1,"Reading results from %s.\n", resultKS.Name);
    tt2 = timetable.empty;
    for n = 1:10
        tt2 = [ tt2 ; readtimetable(resultKS) ];
    end
    
    cSum = cumsum(tt0.R);
    if tt2(end,:).sum == cSum(end)
        fprintf(1,"Cumulative sum computed successfully: %d.\n", ...
            tt2(end,:).sum);
    else
        fprintf(1,"Expected cumulative sum %d. Computed %d instead.\n", ...
            cSum(end), tt2(end,:).sum);
    end

当您运行整个 demoRecaman 脚本时,您会看到以下输出。

Success writing data to topic recamanSum_data.
Computing cumulative sum of Recaman sequence.
Reading results from recamanSum_results.
Cumulative sum computed successfully: 837722.

另请参阅

| | | | | |

主题