连接到安全 Kafka 集群
要管理事件流处理任务,Streaming Data Framework for MATLAB® Production Server™ 需要配置信息。例如,要连接到安全的 Kafka® 集群,框架必须知道要使用的安全协议和 SSL 证书。您可以通过在创建流连接器对象时设置提供程序属性来提供此信息。创建对象后,配置属性是只读的。这些属性在桌面开发期间使用,然后收集起来部署到生产中。
您可以使用流和流处理对象的两种类型的属性来提供配置信息:
- 命名对象属性- 创建与流交互的 MATLAB 对象所需的属性,例如 - KafkaStream对象的- ConnectionTimeout属性。
- 第三方提供程序属性 - 不是流处理数据框架中的 MATLAB 对象的属性,例如 - retention.msKafka 属性或连接到安全 Kafka 集群所需的属性(例如- security.protocol和- ssl.truststore.type)。
Kafka 提供程序属性
当您创建 KafkaStream 对象以连接到 Kafka 主机时,使用一个或多个 propname,propval 输入参量对指定 Kafka 提供程序属性及其相应的值。使用单引号或双引号将 propname 括起来。您可以按任意顺序指定多个属性及其值,如 propname1,propval1、...、propnameN,propvalN。例如,kafkaStream(host,port,topic,"sasl.mechanism","SCRAM-SHA-512") 将 Kafka 属性 sasl.mechanism 设置为 SCRAM-SHA-512。有关 Kafka 属性的完整列表,请参阅 Kafka 文档中的 Kafka 配置。流处理框架为这些属性提供了一种传递机制,它们直接传递给 Kafka 配置机制而无需任何验证。
连接到安全 Kafka 集群
创建对象以连接到安全的 Kafka 集群时,您指定的 Kafka 属性会根据以下因素而有所不同:
- Kafka 集群是否使用 TLS 或 SASL 进行保护 
- 无论使用对象从流中读取还是写入流 
- 是否在使用对象读取时,将 - KafkaStream对象的- Order属性设置为- "EventTime"或- "IngestTime"。
从 SSL 保护的 Kafka 集群读取事件
创建从 Kafka 流读取的对象时,指定以下 Kafka 属性。
- security.protocol- 将安全协议设置为- SSL。
- ssl.truststore.type- 将信任库文件的文件格式设置为- SSL或- JKS。
- ssl.truststore.location- 如果您的服务器证书不存在于系统信任库中,请设置信任库文件的位置。
例如,以下语法创建一个对象,用于从位于 SSL 安全集群中网络地址 recamanSum_data 的 Kafka 主机上的 kafka.host.com:9093 主题读取事件。 
ks_read = kafkaStream("kafka.host.com",9093,"recamanSum_data", ... "security.protocol","SSL","ssl.truststore.type","PEM", ... "ssl.truststore.location","mps-kafka.pem")
将事件写入 SSL 保护的 Kafka 集群
创建对象以写入流或从流中读取时,请指定以下 Kafka 属性(Order="IngestTime")。
- security.protocol- 将安全协议设置为- SSL。
- ssl.ca.location- 设置证书颁发机构 (CA) 根证书的位置。
例如,以下语法创建一个对象,将事件写入位于 SSL 安全集群中网络地址 recamanSum_results 的 Kafka 主机上的 kafka.host.com:9093 主题。
outKS = kafkaStream("kafka.host.com",9093,"recamanSum_results", ... "security.protocol","SSL", ... "ssl.ca.location","my-ssl-cert.pem");
从 SASL 安全的 Kafka 集群读取事件
要创建一个从 SASL 保护的 Kafka 集群读取的对象,需要设置 sasl.jaas.config Kafka 属性。sasl.jaas.config 属性的值很长、结构化且难以输入。为了更容易提供 sasl.jaas.config 值,框架提供了两个属性 sasl.user 和 sasl.password,您可以设置它们。框架使用 sasl.jaas.config、sasl.user、sasl.password 和 security.protocol 的值来合成 sasl.mechanism 属性的值。
创建从流中读取的对象时指定以下 Kafka 属性。
- security.protocol- 将安全协议设置为- SASL。
- ssl.truststore.type- 将信任库文件的文件格式设置为- SSL或- JKS。
- ssl.truststore.location- 如果您的服务器证书不存在于系统信任库中,请设置信任库文件的位置。
- sasl.mechanism- 设置用于客户端连接的 SASL 机制。
- sasl.user- 设置 SASL 授权的用户名。
- sasl.password- 为- sasl.user设置 SASL 密码。
例如,以下语法创建一个对象,用于从位于 SASL 安全集群中网络地址 recamanSum_data 的 Kafka 主机上的 kafka.host.com:9093 主题读取事件。
inKS_sasl = kafkaStream("kafka.host.com",9093,"recamanSum_data", ... "security.protocol","SASL_SSL", "ssl.truststore.type","PEM",... "ssl.truststore.location","my-ssl-cert.pem", ... "sasl.mechanism","SCRAM-SHA-512", ... "sasl.user","sasl-consumer", ... "sasl.password","apachekafka")
将事件写入 SASL 保护的 Kafka 集群
创建对象以写入流或从流中读取时,请指定以下 Kafka 属性(Order="IngestTime")。
- security.protocol- 将安全协议设置为- SASL。
- ssl.ca.location- 设置 CA 根证书的位置。
- sasl.mechanism- 设置用于客户端连接的 SASL 机制。
- sasl.user- 设置 SASL 授权的用户名。
- sasl.password- 为- sasl.user设置 SASL 密码。
例如,以下语法创建一个对象,将事件写入位于 SASL 安全集群中网络地址 recamanSum_results 的 Kafka 主机上的 kafka.host.com:9093 主题。
outKS_sasl = kafkaStream("kafka.host.com",9093,"recamanSum_results", ... "security.protocol","SASL_SSL", ... "ssl.ca.location","my-ssl-cert.pem", ... "sasl.mechanism","SCRAM-SHA-512", ... "sasl.user","sasl-producer", ... "sasl.password","apachekafka")
客户端身份验证
要启用客户端身份验证,您必须将 ssl.keystore.location 属性设置为客户端证书的位置,即客户端必须发送到服务器的证书。如果您的服务器或客户端证书受密码保护,您可能还需要设置 ssl.truststore.password 属性和 ssl.keystore.password 属性。
另请参阅
getProviderProperties | categoryList | isProperty | kafkaStream