主要内容

本页采用了机器翻译。点击此处可查看最新英文版本。

连接到安全 Kafka 集群

要管理事件流处理任务,Streaming Data Framework for MATLAB® Production Server™ 需要配置信息。例如,要连接到安全的 Kafka® 集群,框架必须知道要使用的安全协议和 SSL 证书。您可以通过在创建流连接器对象时设置提供程序属性来提供此信息。创建对象后,配置属性是只读的。这些属性在桌面开发期间使用,然后收集起来部署到生产中。

您可以使用流和流处理对象的两种类型的属性来提供配置信息:

  • 命名对象属性- 创建与流交互的 MATLAB 对象所需的属性,例如 KafkaStream 对象的 ConnectionTimeout 属性。

  • 第三方提供程序属性 - 不是流处理数据框架中的 MATLAB 对象的属性,例如 retention.ms Kafka 属性或连接到安全 Kafka 集群所需的属性(例如 security.protocolssl.truststore.type)。

Kafka 提供程序属性

当您创建 KafkaStream 对象以连接到 Kafka 主机时,使用一个或多个 propname,propval 输入参量对指定 Kafka 提供程序属性及其相应的值。使用单引号或双引号将 propname 括起来。您可以按任意顺序指定多个属性及其值,如 propname1,propval1、...、propnameN,propvalN。例如,kafkaStream(host,port,topic,"sasl.mechanism","SCRAM-SHA-512") 将 Kafka 属性 sasl.mechanism 设置为 SCRAM-SHA-512。有关 Kafka 属性的完整列表,请参阅 Kafka 文档中的 Kafka 配置。流处理框架为这些属性提供了一种传递机制,它们直接传递给 Kafka 配置机制而无需任何验证。

连接到安全 Kafka 集群

创建对象以连接到安全的 Kafka 集群时,您指定的 Kafka 属性会根据以下因素而有所不同:

  • Kafka 集群是否使用 TLS 或 SASL 进行保护

  • 无论使用对象从流中读取还是写入流

  • 是否在使用对象读取时,将 KafkaStream 对象的 Order 属性设置为 "EventTime""IngestTime"

从 SSL 保护的 Kafka 集群读取事件

创建从 Kafka 流读取的对象时,指定以下 Kafka 属性。

  • security.protocol - 将安全协议设置为 SSL

  • ssl.truststore.type - 将信任库文件的文件格式设置为 SSLJKS

  • ssl.truststore.location - 如果您的服务器证书不存在于系统信任库中,请设置信任库文件的位置。

例如,以下语法创建一个对象,用于从位于 SSL 安全集群中网络地址 recamanSum_data 的 Kafka 主机上的 kafka.host.com:9093 主题读取事件。

ks_read = kafkaStream("kafka.host.com",9093,"recamanSum_data", ... 
      "security.protocol","SSL","ssl.truststore.type","PEM", ...
      "ssl.truststore.location","mps-kafka.pem")

将事件写入 SSL 保护的 Kafka 集群

创建对象以写入流或从流中读取时,请指定以下 Kafka 属性(Order="IngestTime")。

  • security.protocol - 将安全协议设置为 SSL

  • ssl.ca.location - 设置证书颁发机构 (CA) 根证书的位置。

例如,以下语法创建一个对象,将事件写入位于 SSL 安全集群中网络地址 recamanSum_results 的 Kafka 主机上的 kafka.host.com:9093 主题。

outKS = kafkaStream("kafka.host.com",9093,"recamanSum_results", ... 
      "security.protocol","SSL", ...
      "ssl.ca.location","my-ssl-cert.pem");

从 SASL 安全的 Kafka 集群读取事件

要创建一个从 SASL 保护的 Kafka 集群读取的对象,需要设置 sasl.jaas.config Kafka 属性。sasl.jaas.config 属性的值很长、结构化且难以输入。为了更容易提供 sasl.jaas.config 值,框架提供了两个属性 sasl.usersasl.password,您可以设置它们。框架使用 sasl.jaas.configsasl.usersasl.passwordsecurity.protocol 的值来合成 sasl.mechanism 属性的值。

创建从流中读取的对象时指定以下 Kafka 属性。

  • security.protocol - 将安全协议设置为 SASL

  • ssl.truststore.type - 将信任库文件的文件格式设置为 SSLJKS

  • ssl.truststore.location - 如果您的服务器证书不存在于系统信任库中,请设置信任库文件的位置。

  • sasl.mechanism - 设置用于客户端连接的 SASL 机制。

  • sasl.user - 设置 SASL 授权的用户名。

  • sasl.password - 为 sasl.user 设置 SASL 密码。

例如,以下语法创建一个对象,用于从位于 SASL 安全集群中网络地址 recamanSum_data 的 Kafka 主机上的 kafka.host.com:9093 主题读取事件。

inKS_sasl = kafkaStream("kafka.host.com",9093,"recamanSum_data", ...
      "security.protocol","SASL_SSL", 
      "ssl.truststore.type","PEM",...
      "ssl.truststore.location","my-ssl-cert.pem", ...
      "sasl.mechanism","SCRAM-SHA-512", ...
      "sasl.user","sasl-consumer", ...
      "sasl.password","apachekafka")

将事件写入 SASL 保护的 Kafka 集群

创建对象以写入流或从流中读取时,请指定以下 Kafka 属性(Order="IngestTime")。

  • security.protocol - 将安全协议设置为 SASL

  • ssl.ca.location - 设置 CA 根证书的位置。

  • sasl.mechanism - 设置用于客户端连接的 SASL 机制。

  • sasl.user - 设置 SASL 授权的用户名。

  • sasl.password - 为 sasl.user 设置 SASL 密码。

例如,以下语法创建一个对象,将事件写入位于 SASL 安全集群中网络地址 recamanSum_results 的 Kafka 主机上的 kafka.host.com:9093 主题。

outKS_sasl = kafkaStream("kafka.host.com",9093,"recamanSum_results", ...
        "security.protocol","SASL_SSL", ...
        "ssl.ca.location","my-ssl-cert.pem", ...
        "sasl.mechanism","SCRAM-SHA-512", ...
        "sasl.user","sasl-producer", ...
        "sasl.password","apachekafka")

客户端身份验证

要启用客户端身份验证,您必须将 ssl.keystore.location 属性设置为客户端证书的位置,即客户端必须发送到服务器的证书。如果您的服务器或客户端证书受密码保护,您可能还需要设置 ssl.truststore.password 属性和 ssl.keystore.password 属性。

另请参阅

| | |

主题

外部网站