Apache Kafka

The Apache Kafka (Kafka) action sends messages directly to your Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters or to Apache Kafka clusters managed by third-party providers such as Confluent Cloud.

Note
This topic assumes familiarity with the Apache Kafka platform and related concepts. For more information about Apache Kafka, see Apache Kafka.
Requirements
This rule action has the following requirements:
- An IAM role that AWS IoT can assume to perform the ec2:CreateNetworkInterface, ec2:DescribeNetworkInterfaces, ec2:CreateNetworkInterfacePermission, ec2:DeleteNetworkInterface, ec2:DescribeSubnets, ec2:DescribeVpcs, ec2:DescribeVpcAttribute, and ec2:DescribeSecurityGroups operations. This role creates and manages elastic network interfaces to your Amazon Virtual Private Cloud to reach your Kafka broker. For more information, see Granting an AWS IoT rule the access it requires.

  In the AWS IoT console, you can choose or create a role to allow AWS IoT Core to perform this rule action.

  For more information about network interfaces, see Elastic network interfaces in the Amazon EC2 User Guide.

  The policy attached to the role that you specify should look like the following example.

  {
      "Version": "2012-10-17",
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "ec2:CreateNetworkInterface",
                  "ec2:DescribeNetworkInterfaces",
                  "ec2:CreateNetworkInterfacePermission",
                  "ec2:DeleteNetworkInterface",
                  "ec2:DescribeSubnets",
                  "ec2:DescribeVpcs",
                  "ec2:DescribeVpcAttribute",
                  "ec2:DescribeSecurityGroups"
              ],
              "Resource": "*"
          }
      ]
  }
- If you use AWS Secrets Manager to store the credentials required to connect to your Kafka broker, you must create an IAM role that AWS IoT Core can assume to perform the secretsmanager:GetSecretValue and secretsmanager:DescribeSecret operations.

  The policy attached to the role that you specify should look like the following example.

  {
      "Version": "2012-10-17",
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "secretsmanager:GetSecretValue",
                  "secretsmanager:DescribeSecret"
              ],
              "Resource": [
                  "arn:aws:secretsmanager:region:123456789012:secret:kafka_client_truststore-*",
                  "arn:aws:secretsmanager:region:123456789012:secret:kafka_keytab-*"
              ]
          }
      ]
  }

- You can run your Apache Kafka clusters inside Amazon Virtual Private Cloud (Amazon VPC). You must create an Amazon VPC destination and use a NAT gateway in your subnets to forward messages from AWS IoT to a public Kafka cluster. The AWS IoT rules engine creates a network interface in each of the subnets listed in the VPC destination to route traffic directly to the VPC. When you create a VPC destination, the AWS IoT rules engine automatically creates a VPC rule action. For more information about VPC rule actions, see Virtual private cloud (VPC) destinations. An example VPC destination configuration appears after this list.
- If you use a customer managed AWS KMS key (KMS key) to encrypt data at rest, the service must have permission to use the KMS key on the caller's behalf. For more information, see Amazon MSK encryption in the Amazon Managed Streaming for Apache Kafka Developer Guide.
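For reference, a VPC destination is created from a destination configuration like the following minimal sketch (for example, with the AWS IoT CreateTopicRuleDestination API). The VPC, subnet, security group, and role identifiers shown here are illustrative placeholders; the role is the one with the ec2 permissions described above.

{
    "destinationConfiguration": {
        "vpcConfiguration": {
            "vpcId": "vpc-0123456789abcdef0",
            "subnetIds": ["subnet-0123456789abcdef0"],
            "securityGroups": ["sg-0123456789abcdef0"],
            "roleArn": "arn:aws:iam::123456789012:role/vpc-destination-role-name"
        }
    }
}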
Parameters
When you create an AWS IoT rule with this action, you must specify the following information:
- destinationArn

  The Amazon Resource Name (ARN) of the VPC destination. For information about creating a VPC destination, see Virtual private cloud (VPC) destinations.
- topic

  The Kafka topic for messages to be sent to the Kafka broker.

  You can substitute this field using a substitution template. For more information, see Substitution templates.
- key (optional)

  The Kafka message key.

  You can substitute this field using a substitution template. For more information, see Substitution templates.
- headers (optional)

  The list of Kafka headers that you specify. Each header is a key-value pair that you can specify when you create a Kafka action. You can use these headers to route data from IoT clients to downstream Kafka clusters without modifying your message payload.

  You can substitute this field using a substitution template. To understand how to pass an inline rule function as a substitution template in a Kafka action header, see Examples. For more information, see Substitution templates.

  Note
  Headers in binary format are not supported.
- partition (optional)

  The Kafka message partition.

  You can substitute this field using a substitution template. For more information, see Substitution templates.
- clientProperties

  An object that defines the properties of the Apache Kafka producer client. An example configuration appears after this parameter list.
  - acks (optional)

    The number of acknowledgments the producer requires the server to have received before considering a request complete.

    If you specify 0 as the value, the producer won't wait for any acknowledgment from the server. If the server doesn't receive the message, the producer won't retry to send it.

    Valid values: -1, 0, 1, all. The default value is 1.

  - bootstrap.servers

    A list of host and port pairs (for example, host1:port1,host2:port2) used to establish the initial connection to your Kafka cluster.

  - compression.type (optional)

    The compression type for all data generated by the producer.

    Valid values: none, gzip, snappy, lz4, zstd. The default value is none.

  - security.protocol

    The security protocol used to connect to your Kafka broker.

    Valid values: SSL, SASL_SSL. The default value is SSL.

  - key.serializer

    Specifies how to turn the key objects that you provide with the ProducerRecord into bytes.

    Valid value: StringSerializer.

  - value.serializer

    Specifies how to turn the value objects that you provide with the ProducerRecord into bytes.

    Valid value: ByteBufferSerializer.

  - ssl.truststore

    The truststore file in base64 format, or the location of the truststore file in AWS Secrets Manager. This value isn't required if your truststore is trusted by Amazon certificate authorities (CA).

    This field supports substitution templates. If you use Secrets Manager to store the credentials required to connect to your Kafka broker, you can use the get_secret SQL function to retrieve the value for this field. For more information about substitution templates, see Substitution templates. For more information about the get_secret SQL function, see get_secret(secretId, secretType, key, roleArn). If the truststore is in the form of a file, use the SecretBinary parameter. If the truststore is in the form of a string, use the SecretString parameter.

    The maximum size of this value is 65 KB.
  - ssl.truststore.password

    The password for the truststore. This value is required only if you've created a password for the truststore.
  - ssl.keystore

    The keystore file. This value is required when you specify SSL as the value for security.protocol.

    This field supports substitution templates. Use Secrets Manager to store the credentials required to connect to your Kafka broker. To retrieve the value for this field, use the get_secret SQL function. For more information about substitution templates, see Substitution templates. For more information about the get_secret SQL function, see get_secret(secretId, secretType, key, roleArn). Use the SecretBinary parameter.

  - ssl.keystore.password

    The store password for the keystore file. This value is required if you specify a value for ssl.keystore.

    The value of this field can be plaintext. This field also supports substitution templates. Use Secrets Manager to store the credentials required to connect to your Kafka broker. To retrieve the value for this field, use the get_secret SQL function. For more information about substitution templates, see Substitution templates. For more information about the get_secret SQL function, see get_secret(secretId, secretType, key, roleArn). Use the SecretString parameter.

  - ssl.key.password

    The password of the private key in your keystore file.

    This field supports substitution templates. Use Secrets Manager to store the credentials required to connect to your Kafka broker. To retrieve the value for this field, use the get_secret SQL function. For more information about substitution templates, see Substitution templates. For more information about the get_secret SQL function, see get_secret(secretId, secretType, key, roleArn). Use the SecretString parameter.

  - sasl.mechanism

    The security mechanism used to connect to your Kafka broker. This value is required when you specify SASL_SSL for security.protocol.

    Valid values: PLAIN, SCRAM-SHA-512, GSSAPI.

    Note
    SCRAM-SHA-512 is the only supported security mechanism in the cn-north-1, cn-northwest-1, us-gov-east-1, and us-gov-west-1 Regions.

  - sasl.plain.username

    The username used to retrieve the secret string from Secrets Manager. This value is required when you specify SASL_SSL for security.protocol and PLAIN for sasl.mechanism.

  - sasl.plain.password

    The password used to retrieve the secret string from Secrets Manager. This value is required when you specify SASL_SSL for security.protocol and PLAIN for sasl.mechanism.

  - sasl.scram.username

    The username used to retrieve the secret string from Secrets Manager. This value is required when you specify SASL_SSL for security.protocol and SCRAM-SHA-512 for sasl.mechanism.

  - sasl.scram.password

    The password used to retrieve the secret string from Secrets Manager. This value is required when you specify SASL_SSL for security.protocol and SCRAM-SHA-512 for sasl.mechanism.

  - sasl.kerberos.keytab

    The keytab file for Kerberos authentication in Secrets Manager. This value is required when you specify SASL_SSL for security.protocol and GSSAPI for sasl.mechanism.

    This field supports substitution templates. Use Secrets Manager to store the credentials required to connect to your Kafka broker. To retrieve the value for this field, use the get_secret SQL function. For more information about substitution templates, see Substitution templates. For more information about the get_secret SQL function, see get_secret(secretId, secretType, key, roleArn). Use the SecretBinary parameter.

  - sasl.kerberos.service.name

    The Kerberos principal name under which Apache Kafka runs. This value is required when you specify SASL_SSL for security.protocol and GSSAPI for sasl.mechanism.

  - sasl.kerberos.krb5.kdc

    The hostname of the key distribution center (KDC) to which your Apache Kafka producer client connects. This value is required when you specify SASL_SSL for security.protocol and GSSAPI for sasl.mechanism.

  - sasl.kerberos.krb5.realm

    The realm to which your Apache Kafka producer client connects. This value is required when you specify SASL_SSL for security.protocol and GSSAPI for sasl.mechanism.

  - sasl.kerberos.principal

    The unique Kerberos identity to which Kerberos can assign tickets to access Kerberos-aware services. This value is required when you specify SASL_SSL for security.protocol and GSSAPI for sasl.mechanism.
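As a reference for the fields above, the following is a minimal sketch of a clientProperties object for a broker that uses mutual TLS (security.protocol set to SSL). The broker hostnames, secret names (kafka_client_keystore, kafka_keystore_password, kafka_key_password), and role ARN are illustrative placeholders, not values from this topic; the three-argument form of get_secret matches the usage shown in Examples.

{
    "bootstrap.servers": "broker1.example.com:9092,broker2.example.com:9092",
    "security.protocol": "SSL",
    "key.serializer": "StringSerializer",
    "value.serializer": "ByteBufferSerializer",
    "ssl.keystore": "${get_secret('kafka_client_keystore', 'SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}",
    "ssl.keystore.password": "${get_secret('kafka_keystore_password', 'SecretString', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}",
    "ssl.key.password": "${get_secret('kafka_key_password', 'SecretString', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}"
}

Whether you also need ssl.truststore depends on your broker's certificate chain; if the chain is signed by an Amazon-trusted CA, you can omit it, as described above.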
Examples
The following JSON example defines an Apache Kafka action in an AWS IoT rule. It passes the sourceIp() inline function as a substitution template in the Kafka action header.
{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }
Important notes about your Kerberos setup
- Your key distribution center (KDC) must be resolvable through private Domain Name System (DNS) within your target VPC. One possible approach is to add the KDC DNS entry to a private hosted zone. For more information about this approach, see Working with private hosted zones.

- Each VPC must have DNS resolution enabled. For more information, see Using DNS with your VPC.

- Network interface security groups and instance-level security groups in the VPC destination must allow traffic from within your VPC on the following ports. An example ingress rule set appears after this list.

  - TCP traffic on the bootstrap broker listener port (often 9092, but it must be within the 9000–9100 range)

  - TCP and UDP traffic on port 88 for the KDC
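As an illustration of the security group requirement, the ingress rules could look like the following sketch in the Amazon EC2 IpPermissions format (for example, as input to the AuthorizeSecurityGroupIngress API). The 10.0.0.0/16 CIDR is a placeholder; substitute your VPC's CIDR block and your broker's actual listener port.

[
    {
        "IpProtocol": "tcp",
        "FromPort": 9092,
        "ToPort": 9092,
        "IpRanges": [{"CidrIp": "10.0.0.0/16", "Description": "Kafka bootstrap broker listener"}]
    },
    {
        "IpProtocol": "tcp",
        "FromPort": 88,
        "ToPort": 88,
        "IpRanges": [{"CidrIp": "10.0.0.0/16", "Description": "Kerberos KDC (TCP)"}]
    },
    {
        "IpProtocol": "udp",
        "FromPort": 88,
        "ToPort": 88,
        "IpRanges": [{"CidrIp": "10.0.0.0/16", "Description": "Kerberos KDC (UDP)"}]
    }
]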