主要内容

本页采用了机器翻译。点击此处可查看最新英文版本。

kafkaStream

创建与 Kafka 主题中的事件流的连接

自 R2022b 起

    此对象需要 Streaming Data Framework for MATLAB® Production Server™

    说明

    kafkaStream 函数创建一个 KafkaStream 对象,该对象连接到 Kafka® 主题并从该主题读取和写入事件流。

    一个事件由三部分组成:

    • 键 - 识别事件来源

    • 时间戳 - 表示事件发生的时间

    • 正文 - 包含以无序的(名称,值)对形式指定的事件数据

    创建 KafkaStream 对象后,使用 readtimetable 函数将事件读入时间表或使用 writetimetable 函数将时间表写入流。

    readtimetable 将事件转换为时间表的行。事件主体中的名称成为时间表列名,与每个名称关联的值成为事件行中的列值,事件时间戳成为行时间戳。writetimetable 将时间表的行转换为流中的事件。

    创建对象

    描述

    基于行的事件窗口

    ks = kafkaStream(host,port,topic) 创建一个默认的 KafkaStream 对象,该对象连接到指定主机名和端口的 Kafka 主题。此语法将 HostPortTopic 属性分别设置为 hostporttopic。该对象一次读取 50 个流事件行。

    ks = kafkaStream(host,port,topic,Rows=numevents) 创建一个 KafkaStream 对象,该对象一次读取 numevents 流事件行。

    示例

    基于持续时间的事件窗口

    ks = kafkaStream(host,port,topic,Duration=timespan) 创建一个 KafkaStream 对象,该对象读取在指定时间戳跨度 timespan 内发生的流事件。

    附加选项

    ks = kafkaStream(___,propname1,propval1,...,propnameN,propvalN) 使用任何前面的语法设置 Kafka 提供程序属性。

    示例

    ks = kafkaStream(___,Name=Value) 使用一个或多个名称值参量指定事件流选项。您还可以使用名称值参量设置属性。您可以使用这些名称-值参量和属性来指定如何将事件转换为时间表或从时间表转换。

    输入参量

    全部展开

    事件窗口中的事件数,指定为正整数。Rows=numevents 指定对 readtimetable 函数的调用返回的行数。如果可供读取的行数少于指定数,则 readtimetable 将超时并返回一个空的时间表。

    readtimetable 直到处理完窗口中的所有事件才会返回,因此具有较大行值的窗口可能会阻止其他进程继续运行。要配置超时时间以防止阻塞,请使用 ReadLimit 属性。

    示例: Rows=500 指定每次调用 readtimetable 都会返回一个包含 500 行的时间表。

    数据类型: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64

    事件窗口中的时间戳跨度,指定为持续时间标量。Duration=timespan 根据时间戳确定 readtimetable 函数返回的事件。timespan 指定事件窗口中事件的最后一个时间戳和第一个时间戳之间的差值。

    readtimetable 直到处理完窗口中的所有事件才会返回,因此持续时间较长的窗口可能会阻止其他进程继续运行。要配置超时时间以防止阻塞,请使用 ReadLimit 属性。

    示例: Duration=minutes(1) 指定每次调用 readtimetable 都会返回一个包含一分钟事件的时间表,其中最后一个事件的时间戳不比第一个事件的时间戳晚一分钟以上。

    数据类型: duration

    Kafka 提供程序属性的名称,指定为字符向量或字符串标量。使用单引号或双引号括住 propname。Kafka 属性名称始终至少包含一个点字符,例如 retention.ms。有关 Kafka 属性的列表,请参阅 Kafka 文档:https://kafka.apache.org/documentation/#configuration

    属性的值 propval 必须遵循属性名称。将属性名称及其对应的值指定为以逗号分隔的对。

    示例: kafkaStream(host,port,topic,"security.protocol","SASL_SSL") 将 Kafka 配置属性 security.protocol 设置为 SASL_SSL

    Kafka 提供程序属性的值。有关 Kafka 属性及其值的列表,请参阅 Kafka 文档:https://kafka.apache.org/documentation/#configuration

    该属性的值必须遵循属性名称 propname。将属性名称及其对应的值指定为以逗号分隔的对。您可以将 propval 指定为任何受支持的 MATLAB 数据类型,但必须能够将该值转换为字符串。

    示例: kafkaStream(host,port,topic,"sasl.mechanism","SCRAM-SHA-512") 将 Kafka 配置属性 sasl.mechanism 的值设置为 SCRAM-SHA-512

    名称-值参数

    全部展开

    Name1=Value1,...,NameN=ValueN 的形式指定可选参量对组,其中 Name 是参量名称,Value 是对应的值。名称-值参量必须出现在其他参量之后,但对各个参量对组的顺序没有要求。

    宽限期

    全部展开

    GraceUnit 为单位等待请求的事件窗口中的消息的时间长度,指定为实数标量或持续时间标量。KafkaStream 对象等待宽限期结束才返回从流中读取的事件。GracePeriodGraceUnit 参量一起设置 GracePeriod 属性。

    此参量仅适用于具有基于持续时间的事件窗口的对象,即使用 timespan 参量创建的 KafkaStream 对象。对于使用 numevents 参量创建的对象,宽限期将被忽略。

    示例: 10

    示例: minutes(10)

    数据类型: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64 | duration

    GracePeriod 名称-值参量指定的宽限期的时间单位,指定为以下值之一:

    • "Milliseconds"

    • "Seconds"

    • "Minutes"

    • "Hours"

    • "Days"

    GracePeriodGraceUnit 参量一起设置 GracePeriod 属性。

    KafkaStream 对象将 GracePeriod 持续时间标量转换为 GraceUnit 指定的单位。例如,假设您使用 minutes 函数指定两分钟的宽限期,但将单位设置为秒。GracePeriod 属性显示宽限期(以秒为单位)。

    ks = kafkaStream(host,port,topic,Duration=minutes(10), ...
                     GracePeriod=minutes(2),GraceUnit="Seconds")
    
    ks = 
    
      KafkaStream with properties:
                ...
                GracePeriod: "120 Seconds"
                ...

    此参量仅适用于具有基于持续时间的事件窗口的对象,即使用 timespan 参量创建的 KafkaStream 对象。对于使用 numevents 参量创建的对象,宽限期将被忽略。

    数据类型: string | char

    架构

    全部展开

    将事件数据转换为 MATLAB 数据类型的规则,指定为事件架构格式的 JSON 字符串。您可以使用 ImportOptions 属性更轻松地指定事件模式。

    将 MATLAB 数据类型转换为事件数据的规则,指定为事件架构格式的 JSON 字符串。您可以使用 ExportOptions 属性更轻松地指定事件模式。

    属性

    全部展开

    Kafka 服务器的主机名,指定为字符向量或字符串标量。

    示例: '144.213.5.7''localhost'

    数据类型: char | string

    Kafka 服务器的端口号,指定为 [0, 65,535] 范围内的整数。

    示例: 9092

    Kafka 主题名称,指定为字符向量或字符串标量。

    示例: "CoolingFan"

    数据类型: char | string

    Kafka 消费者组 ID,指定为字符向量或字符串标量。

    多个 Kafka 消费者可以属于同一个消费者组。在这种情况下,Kafka 在组内的消费者之间共享数据,以便同一组中的任何两个消费者都不会收到相同的消息。默认情况下,每个 kafkaStream 对象都有一个唯一的消费者组 ID,这使得多个消费者可以独立地从同一主题读取。

    数据类型: char | string

    对流中的事件进行排序的策略,指定为以下值之一:

    • "EventTime" - 根据事件发生的时间排序。即使事件到达 Kafka 服务器的顺序混乱,也能确保事件时间的顺序。

    • "IngestTime" - 根据事件在流中出现的时间对其进行排序。

    创建对象后无法设置此属性的值。

    数据类型: string | char

    此 属性 为只读。

    KakfaStream 对象等待消息的时间,指定为 "Length Units" 形式的字符串标量,其中:

    • Length 是宽限期的长度,由对象创建期间的 GracePeriod 参量指定。

    • Units 是宽限期的单位,由对象创建期间的 GraceUnits 参量指定。

    创建对象时,如果没有指定宽限期,则 GracePeriod 属性将设置为 "0 Seconds"(无宽限期)。

    示例: "10 Minutes"

    数据类型: string

    此 属性 为只读。

    事件窗口大小,指定为固定的时间量(使用 timespan 参量)或固定数量的消息(使用 numevents 参量)。

    数据类型: duration | single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64

    等待流响应的策略,指定为以下值之一:

    • "Size" - 客户端优先填充事件窗口。使用这种策略,只要客户端仍在接收预期数量的消息,它可能会等待比 RequestTimeout 时间段更长的时间。默认消息数为 50。如果客户端在 RequestTimeout 时间段内没有收到任何消息,它就不再等待。

    • "Time" - 客户端严格遵守 RequestTimeout 限制,即使它没有收到预期数量的消息。RequestTimeout 指定流对象在接收事件之间等待的时间。如果流正在主动接收数据,则该操作期间不会超时。

    事件时间戳的单位,指定为以下值之一:

    • "Milliseconds"

    • "Seconds"

    • "Minutes"

    • "Hours"

    • "Days"

    将事件时间戳解释为 UNIX® 纪元之前或之后相应单位的数量。

    数据类型: string | char

    连接和请求超时

    客户端等待 Kafka 主机的初始响应的秒数,指定为正整数。

    数据类型: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64

    终止请求之前等待的秒数,指定为正整数。等待时间包括连接 Kafka 主机以及 Kafka 主机和客户端之间的数据传输。

    数据类型: single | double | int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64

    导入和导出选项

    将流事件转换为 MATLAB 数据的规则,指定为 ImportOptions 对象。该对象控制流事件导入 MATLAB。

    将 MATLAB 数据转换为流事件的规则,指定为 ExportOptions 对象。该对象控制将 MATLAB 数据导出到流中。

    标志指示导出模式是否写入输出流,指定为逻辑标量。

    该模式嵌入在每个事件中,这可以显著增加事件的规模。如果下游应用程序不需要该模式,请将此标志设置为 false 以减少流中的字节数。

    数据类型: logical

    事件键和主体编码

    事件流中键变量的名称,指定为字符串标量或字符向量。

    数据类型: string | char

    用于解释事件键中的位的字符编码格式,指定为以下之一:

    • utf8 - UTF-8 编码格式

    • utf16 - UTF-16 编码格式

    • base64 - Base 64 编码格式

    • uint8 - 八位无符号二进制字节

    如果 KeyEncodingutf8utf16,那么 KeyType 属性必须是 text。如果 KeyEncodingbase64uint8,那么 KeyType 必须是数值编码格式之一。

    用于解释事件键中的字节的字符编码方案,指定为以下值之一:

    • uint8 - 单字节无符号整数

    • int8 - 单字节有符号整数

    • uint16 - 双字节无符号整数

    • int16 - 双字节有符号整数

    • uint32 - 四字节无符号整数

    • int32 - 四字节有符号整数

    • uint64 - 八字节无符号整数

    • int64 - 八字节有符号整数

    • single - 单精度 IEEE 754 浮点数

    • double - 双精度 IEEE 754 浮点数

    • text - 字符串

    如果 KeyTypetext,那么 KeyEncoding 属性必须是 utf8utf16。如果 KeyType 是任何其他数值编码格式,则 KeyEncoding 必须是 base64uint8

    在事件键中存储位的顺序,指定为下列之一。

    • LittleEndian - 首先存储最低有效位

    • BigEndian - 首先存储最高有效位

    • MatchHost - 位存储顺序与流处理数据框架运行的主机使用的顺序相同

    • NotApplicable - 不是整数键

    此属性仅适用于整数键,不适用于浮点或文本键。

    用于解释事件主体中的位的字符编码格式,指定为以下之一:

    • utf8 - UTF-8 编码格式

    • utf16 - UTF-16 编码格式

    • base64 - Base 64 编码格式

    • uint8 - 八位无符号二进制字节

    该属性决定事件主体中使用的字节的大小和编码,采用 BodyFormat 指定的格式。

    事件主体中的字节格式,指定为以下之一:

    • JSON - JSON 字符串

    • Array - MATLAB 数组

    • Text - 字符串数据

    • Binary - 二进制数据

    根据 BodyEncoding 指定的编码,字节可以大于八位。

    对象函数

    全部展开

    readtimetable从事件流读取时间表
    writetimetable将时间表写入事件流
    seek在事件流中设置读取位置
    preview预览事件流中的事件子集
    identifyingName事件流名称
    detectImportOptions根据事件流内容创建导入选项
    detectExportOptions根据事件流内容创建导出选项
    readeventsKafka 流中读取原始事件,无需应用架构处理
    flush重置读取窗口边界
    stop停止处理来自 Kafka 主题的事件流
    loggederrorKafka 流操作的错误信息
    createTopicKafka 集群中创建主题
    deleteTopic从 Kafka 集群中删除主题
    categoryListKafka 流提供程序属性列表
    getProviderPropertiesKafka 流配置属性数据
    setProviderProperties设置特定于 Kafka 配置的属性
    isProperty确定是否设置了 Kafka 流提供程序属性

    示例

    全部折叠

    假设您有一个在网络地址 kafka.host.com:9092 上运行的 Kafka 服务器,该服务器有一个主题 CoolingFan

    假设 Kafka 主机配置为使用 SSL。要配置 Kafka 主机和客户端之间的 SSL 通信,请在创建用于读取和写入 Kafka 主题的对象时提供 SSL 配置设置。

    ks = kafkaStream("kafka.host.com",9092,"CoolingFan", ...
                    "security.protocol","SASL_SSL", ...
                    "ssl.truststore.type","PEM", ...
                    "ssl.truststore.location","prodserver.pem")
    ks = 
    
      KafkaStream with properties:
    
                      Topic: "CoolingFan"
                      Group: "da576775-49c9-4de3-9955-2bdd9f963aa0"
                      Order: EventTime
                       Host: "kafka.host.com"
                       Port: 9092
          ConnectionTimeout: 30
             RequestTimeout: 61
              ImportOptions: "Import to MATLAB types"
              ExportOptions: "Source: function eventSchema"
              PublishSchema: "true"
                 WindowSize: 50
                KeyVariable: "key"
                KeyEncoding: "utf16"
                    KeyType: "text"
               KeyByteOrder: "BigEndian"
               BodyEncoding: "utf8"
                 BodyFormat: "JSON"
                  ReadLimit: "Size"
        TimestampResolution: "Milliseconds"

    确认设置了哪些属性。

    props = getProviderProperties(ks);
    unique({props.name}')
    ans =
    
      7×1 cell array
    
        {'auto.offset.reset'      }
        {'retention.ms'           }
        {'sasl.jaas.config'       }
        {'sasl.username'          }
        {'security.protocol'      }
        {'ssl.truststore.location'}
        {'ssl.truststore.type'    }

    假设您有一个在网络地址 kafka.host.com:9092 上运行的 Kafka 服务器,该服务器有一个主题 CoolingFan

    创建一个连接到 CoolingFan 主题的对象,并且只请求 10 条消息,而不是默认值。

    ks = kafkaStream("kafka.host.com",9092,"CoolingFan",Rows=10)
    ks = 
    
      KafkaStream with properties:
    
                      Topic: "CoolingFan"
                      Group: "da576775-49c9-4de3-9955-2bdd9f963aa0"
                      Order: EventTime
                       Host: "kafka.host.com"
                       Port: 9092
          ConnectionTimeout: 30
             RequestTimeout: 61
              ImportOptions: "Import to MATLAB types"
              ExportOptions: "Source: function eventSchema"
              PublishSchema: "true"
                 WindowSize: 10
                KeyVariable: "key"
                KeyEncoding: "utf16"
                    KeyType: "text"
               KeyByteOrder: "BigEndian"
               BodyEncoding: "utf8"
                 BodyFormat: "JSON"
                  ReadLimit: "Size"
        TimestampResolution: "Milliseconds"

    使用该对象将事件流中的 10 条消息读取到时间表中。

    tt = readtimetable(ks)
    tt =
    
      10×11 timetable
    
             timestamp          vMotor    wMotor    Tmass     
        ____________________    ______    ______    ______    
    
        31-Oct-2020 00:00:00    1.0909         0        25            
        31-Oct-2020 00:00:00    1.1506     100.5     25.17            
        31-Oct-2020 00:00:00    1.1739     190.9    25.223             
        31-Oct-2020 00:00:00    1.1454    330.61     25.15             
        31-Oct-2020 00:00:00    1.1346    382.77    25.122           
        31-Oct-2020 00:00:00    1.1287    420.88    25.106             
        31-Oct-2020 00:00:00    1.1253    454.55    25.096             
        31-Oct-2020 00:00:00    1.1232     478.1     25.09            
        31-Oct-2020 00:00:00    1.1217    500.16    25.086    ...        

    版本历史记录

    在 R2022b 中推出