主要内容

本页采用了机器翻译。点击此处可查看最新英文版本。

eventStreamProcessor

将流分析函数应用于事件流

自 R2022b 起

    此对象需要 Streaming Data Framework for MATLAB® Production Server™

    说明

    使用 EventStreamProcessor 对象将流分析函数应用于事件流。使用 EventStreamProcessor 对象函数,您可以自动将事件从事件流引导到流处理分析函数,从而可以处理事件流中的大量数据。

    您可以对已知数量的事件窗口同步运行流处理分析函数,类似于 for 循环。您还可以使用桌面托管服务器运行它,以仿真生产环境中的异步部署。

    EventStreamProcessor 函数可以通过将事件收集到可配置大小的窗口中,按批顺序处理流处理数据。当窗口充满了请求的事件数量时,事件窗口将传递给流处理分析函数。然后,您可以保存分析函数产生的任何结果,并可以选择将其发布到不同的流。

    流处理函数可以是有状态的,也可以是无状态的。对于有状态函数,EventStreamProcessor 对象在对流处理函数的调用之间保持状态。如果流处理函数改变状态,则该函数可以将状态作为第二个输出参量返回。EventStreamProcessor 对象保留这些更改以供下一次函数迭代使用。

    创建对象

    描述

    esp = eventStreamProcessor(inputStream,streamFcn) 创建一个 EventStreamProcessor 对象,它将流函数 streamFcn 应用于事件流 inputStream,并分别设置该对象的 InputStreamStreamFunction 属性。

    esp = eventStreamProcessor(inputStream,streamFcn,initialState) 创建一个 EventStreamProcessor 对象,该对象另外使用函数 initialState 初始化持久状态并设置 InitialState 属性。如果 streamFcn 是有状态的,那么就需要 initialState

    示例

    esp = eventStreamProcessor(___,Name=Value) 使用一个或多个名称值参量设置对象属性。Name 是属性名称,Value 是相应的值。您可以按任意顺序指定多个名称-值参量,如 Name1=Value1,...,NameN=ValueN

    示例

    属性

    全部展开

    package 函数生成的可部署存档的名称,指定为字符串。默认存档名称是流处理函数的名称。

    数据类型: string

    用于对事件进行分组的事件变量的名称,指定为字符串数组或字符向量。

    如果 GroupVariable 非空,则每个事件窗口将被分成几组,其中事件变量具有相同的值。然后将每个组分别发送到流处理函数。GroupVariable 通常设置为事件键,以便独立处理来自每个事件源的事件。

    数据类型: string | char

    为流处理分析函数创建初始状态的函数,指定为函数句柄。如果流处理分析函数是有状态的,则在创建对象时必须设置此属性。

    流处理分析函数从中读取事件的事件流,指定为 KafkaStreamInMemoryStreamTestStream 对象。

    流处理分析函数将事件写入的事件流,指定为 KafkaStreamInMemoryStreamTestStream 对象。

    注意

    如果您使用 package 函数将流处理函数打包到可部署存档中,请不要将 OutputStream 设置为 InMemoryStream 对象。package 函数不支持该对象作为输出流。

    持久存储连接名称,指定为字符串或字符向量。当使用 InitialState 或使用有状态流函数时,必须指定 StateStore。将部署存档的 MATLAB Production Server 实例必须知道该连接名称。有关使用数据缓存进行持久存储的更多信息,请参阅 数据缓存基础知识

    数据类型: char | string

    流分析函数,指定为函数句柄。

    数据类型: function handle

    要读取的事件流中的位置,指定为以下值之一:

    • "Beginning" - 流中第一个可用事件

    • "End" - 刚刚过了信息流中的最后一个事件

    • "Current" - 刚刚经过流中的当前事件

    数据类型: string

    调用 seek 函数后清除持久状态的标志,指定为逻辑标量。

    数据类型: logical

    对象函数

    execute在特定数量的事件窗口上执行事件流处理函数
    package将流处理函数打包到 EventStreamProcessor 配置的可部署存档中
    seek设置事件流中的位置以开始处理事件
    start开始使用本地测试服务器处理事件流
    startServer启动本地测试服务器
    stop停止使用本地测试服务器处理事件流
    stopServer关闭本地测试服务器

    示例

    全部折叠

    假设您有一个在网络地址 kafka.host.com:9092 上运行的 Kafka® 服务器,该服务器有一个主题 RecamanSequence

    创建一个连接到 RecamanSequence 主题的对象。

    ks = kafkaStream("kafka.host.com",9092,"RecamanSequence");

    假设您有一个流处理分析函数 recamanSum 和一个名为 initRecamanSum 的初始化持久状态的函数。

    创建一个 EventStreamProcessor 对象来运行 recamanSum 函数并使用 initRecamanSum 函数初始化持久状态。

    esp = eventStreamProcessor(ks,@recamanSum,@initRecamanSum);
    esp = 
    
      EventStreamProcessor with properties:
    
          StreamFunction: @recamanSum
             InputStream: [1×1 matlab.io.stream.event.KafkaStream]
            OutputStream: [1×1 matlab.io.stream.event.InMemoryStream]
            InitialState: @initRecamanSum
           GroupVariable: [0×0 string]
            ReadPosition: Beginning
             ArchiveName: "recamanSum"
        ResetStateOnSeek: 1

    对流进行十次流处理分析函数迭代。

    execute(esp,10);

    检查结果。

    result = readtimetable(esp.OutputStream)

    假设您有一个在网络地址 kafka.host.com:9092 上运行的 Kafka 服务器,该服务器有一个主题 RecamanSequence

    还假设您有一个流处理分析函数 recamanSum 和一个函数 initRecamanSum 来初始化持久状态。

    创建一个连接到 RecamanSequence 主题的 KafkaStream 对象。

    ks = kafkaStream("kafka.host.com",9092,"RecamanSequence");

    创建另一个 KafkaStream 对象,将流处理分析函数的结果写入名为 RecamanSequenceResults 的不同主题。

    outKS = kafkaStream("kafka.host.com",9092,"RecamanSequenceResults");

    创建一个运行 recamanSum 函数的 EventStreamProcessor 对象,并使用 initRecamanSum 函数初始化持久状态。

    esp = eventStreamProcessor(ks,@recamanSum,@initRecamanSum,OutputStream=outKS);
    esp = 
    
      EventStreamProcessor with properties:
    
          StreamFunction: @recamanSum
             InputStream: [1×1 matlab.io.stream.event.KafkaStream]
            OutputStream: [1×1 matlab.io.stream.event.KafkaStream]
            InitialState: @initRecamanSum
           GroupVariable: [0×0 string]
            ReadPosition: Beginning
             ArchiveName: "recamanSum"
        ResetStateOnSeek: 1

    使用 MATLAB 编辑器,您可以在 recamanSum 函数中设置断点,以在启动服务器时检查传入的流处理数据。

    启动测试服务器。

    注意

    要使用测试服务器,您需要 MATLAB Compiler SDK™

    startServer(esp);

    这会打开 Production Server Compiler。当应用程序打开时,您必须手动启动服务器。

    要从应用程序启动测试服务器,点击 Test Client,然后点击 Start。有关如何使用该应用程序的示例,请参阅针对 MATLAB 测试客户端数据集成 (MATLAB Compiler SDK)

    导航回 MATLAB 命令提示符以开始处理事件。

    start(esp);

    Production Server Compiler 中,测试服务器接收数据。

    从 MATLAB 编辑器中,如果您设置了断点,则可以使用调试器检查函数处理的数据、状态和结果。点击 Continue 继续调试,或者点击 Stop 完成调试。

    从 MATLAB 命令提示符处停止服务器。

    stop(esp);

    从输出流读取结果。

    results = readtimetable(outKS);

    版本历史记录

    在 R2022b 中推出