

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Apache Kafka
<a name="apache-kafka-rule-action"></a>

[Tindakan Apache Kafka (Kafka) mengirim pesan langsung ke Amazon [Managed Streaming for Apache Kafka (Amazon MSK), cluster Apache Kafka yang dikelola](https://docs.aws.amazon.com//msk/latest/developerguide/what-is-msk.html) oleh penyedia pihak ketiga seperti Confluent Cloud, atau cluster Apache Kafka yang dikelola sendiri.](https://www.confluent.io/) Dengan tindakan aturan Kafka, Anda dapat merutekan data IoT Anda ke cluster Kafka. Ini memungkinkan Anda membangun jaringan data berkinerja tinggi untuk berbagai tujuan, seperti analitik streaming, integrasi data, visualisasi, dan aplikasi bisnis yang sangat penting.

**catatan**  
Topik ini mengasumsikan keakraban dengan platform Apache Kafka dan konsep terkait. Untuk informasi lebih lanjut tentang Apache Kafka, lihat [Apache](https://kafka.apache.org/) Kafka. [MSK Tanpa Server tidak didukung](https://docs.aws.amazon.com//msk/latest/developerguide/serverless.html). MSK Kluster tanpa server hanya dapat dilakukan melalui otentikasi IAM, yang saat ini tidak didukung oleh tindakan aturan Apache Kafka. Untuk informasi selengkapnya tentang cara mengonfigurasi AWS IoT Core dengan Confluent, lihat [Memanfaatkan Konfluen dan AWS Memecahkan Tantangan Manajemen Perangkat dan Data IoT](https://aws.amazon.com/blogs/apn/leveraging-confluent-and-aws-to-solve-iot-device-and-data-management-challenges/).

## Persyaratan
<a name="apache-kafka-rule-action-requirements"></a>

Tindakan aturan ini memiliki persyaratan sebagai berikut:
+ Peran IAM yang AWS IoT dapat diasumsikan untuk melakukan`ec2:CreateNetworkInterface`,`ec2:DescribeNetworkInterfaces`,`ec2:CreateNetworkInterfacePermission`,`ec2:DeleteNetworkInterface`,`ec2:DescribeSubnets`, `ec2:DescribeVpcs``ec2:DescribeVpcAttribute`, dan `ec2:DescribeSecurityGroups` operasi. Peran ini menciptakan dan mengelola antarmuka jaringan elastis ke Amazon Virtual Private Cloud Anda untuk menjangkau broker Kafka Anda. Untuk informasi selengkapnya, lihat [Memberikan AWS IoT aturan akses yang dibutuhkannya](iot-create-role.md).

  Di AWS IoT konsol, Anda dapat memilih atau membuat peran untuk memungkinkan AWS IoT Core untuk melakukan tindakan aturan ini. 

  Untuk informasi selengkapnya tentang antarmuka jaringan, lihat [Antarmuka jaringan elastis](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-eni.html) di Panduan Pengguna *Amazon EC2*.

  Kebijakan yang dilampirkan pada peran yang Anda tentukan akan terlihat seperti contoh berikut.  
****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
      {
          "Effect": "Allow",
          "Action": [
              "ec2:CreateNetworkInterface",
              "ec2:DescribeNetworkInterfaces",
              "ec2:CreateNetworkInterfacePermission",
              "ec2:DeleteNetworkInterface",
              "ec2:DescribeSubnets",
              "ec2:DescribeVpcs",
              "ec2:DescribeVpcAttribute",
              "ec2:DescribeSecurityGroups"
              ],
              "Resource": "*"
          }
      ]
  }
  ```
+ Jika Anda menggunakan AWS Secrets Manager untuk menyimpan kredensyal yang diperlukan untuk terhubung ke broker Kafka Anda, Anda harus membuat peran IAM yang AWS IoT Core dapat diasumsikan untuk melakukan dan operasi. `secretsmanager:GetSecretValue` `secretsmanager:DescribeSecret`

  Kebijakan yang dilampirkan pada peran yang Anda tentukan akan terlihat seperti contoh berikut.  
****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "secretsmanager:GetSecretValue",
                  "secretsmanager:DescribeSecret"
              ],
              "Resource": [
                  "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka_client_truststore-*",
                  "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka_keytab-*"
              ]
          }
      ]
  }
  ```
+ Anda dapat menjalankan cluster Apache Kafka Anda di dalam Amazon Virtual Private Cloud (Amazon VPC). Anda harus membuat tujuan Apache Kafka Virtual Private Cloud (VPC) Virtual Cloud (VPC) dan menggunakan gateway NAT di subnet Anda untuk meneruskan pesan dari ke kluster Kafka publik. AWS IoT Mesin AWS IoT aturan membuat antarmuka jaringan di setiap subnet yang tercantum di tujuan untuk mengarahkan lalu lintas langsung ke VPC. Saat tujuan Anda, mesin AWS IoT aturan secara otomatis membuat tindakan aturan VPC. Untuk informasi selengkapnya tentang tindakan aturan VPC, lihat. [Tujuan Apache Kafka Virtual Private Cloud (VPC)](kafka-vpc-destination.md)
+ Jika Anda menggunakan pelanggan yang dikelola AWS KMS key (kunci KMS) untuk mengenkripsi data saat istirahat, layanan harus memiliki izin untuk menggunakan kunci KMS atas nama pemanggil. Untuk informasi selengkapnya, lihat [enkripsi MSK Amazon di Panduan Pengembang](https://docs.aws.amazon.com/msk/latest/developerguide/msk-encryption.html) *Amazon Managed Streaming for Apache* Kafka Kafka.

## Parameter
<a name="apache-kafka-rule-action-parameters"></a>

Saat Anda membuat AWS IoT aturan dengan tindakan ini, Anda harus menentukan informasi berikut:

Destinasi ARN  
Nama Sumber Daya Amazon (ARN) dari tujuan Apache Kafka Virtual Private Cloud (VPC). Untuk informasi tentang membuat tujuan, lihat[Tujuan Apache Kafka Virtual Private Cloud (VPC)](kafka-vpc-destination.md).

topik  
Topik Kafka untuk pesan yang akan dikirim ke broker Kafka.  
Anda dapat mengganti bidang ini menggunakan templat substitusi. Untuk informasi selengkapnya, lihat [Templat substitusi](iot-substitution-templates.md). 

kunci (opsional)  
Kunci pesan Kafka.  
Anda dapat mengganti bidang ini menggunakan templat substitusi. Untuk informasi selengkapnya, lihat [Templat substitusi](iot-substitution-templates.md). 

header (opsional)  
Daftar header Kafka yang Anda tentukan. Setiap header adalah pasangan kunci-nilai yang dapat Anda tentukan saat Anda membuat tindakan Kafka. Anda dapat menggunakan header ini untuk merutekan data dari klien IoT ke hilir kluster Kafka tanpa mengubah payload pesan Anda.  
Anda dapat mengganti bidang ini menggunakan templat substitusi. [Untuk memahami cara meneruskan fungsi Aturan sebaris sebagai templat substitusi di header Kafka Action, lihat Contoh.](#apache-kafka-rule-action-examples) Untuk informasi selengkapnya, lihat [Templat substitusi](iot-substitution-templates.md).  
Header dalam format biner tidak didukung.

partisi (opsional)  
Partisi pesan Kafka.  
Anda dapat mengganti bidang ini menggunakan templat substitusi. Untuk informasi selengkapnya, lihat [Templat substitusi](iot-substitution-templates.md).

ClientProperties  
Objek yang mendefinisikan properti klien produsen Apache Kafka.    
acks (opsional)  
Jumlah ucapan terima kasih yang harus diterima oleh produsen sebelum mempertimbangkan permintaan lengkap.  
Jika Anda menentukan 0 sebagai nilai, produsen tidak akan menunggu pengakuan apa pun dari server. Jika server tidak menerima pesan, produser tidak akan mencoba lagi untuk mengirim pesan.  
Nilai yang valid:`-1`,`0`,`1`,`all`. Nilai default-nya adalah `1`.  
bootstrap.servers  
Daftar pasangan host dan port (misalnya,`host1:port1`,`host2:port2`) yang digunakan untuk membuat koneksi awal ke cluster Kafka Anda.  
kompresi.type (opsional)  
Jenis kompresi untuk semua data yang dihasilkan oleh produsen.  
Nilai yang valid:`none`,`gzip`,`snappy`,`lz4`,`zstd`. Nilai default-nya adalah `none`.  
security.protocol  
Protokol keamanan yang digunakan untuk melampirkan ke broker Kafka Anda.  
Nilai-nilai yang valid: `SSL`, `SASL_SSL`. Nilai default-nya adalah `SSL`.  
key.serializer  
Menentukan cara mengubah objek kunci yang Anda berikan dengan `ProducerRecord` menjadi byte.  
Nilai valid: `StringSerializer`.  
value.serializer  
Menentukan cara mengubah objek nilai yang Anda berikan dengan `ProducerRecord` menjadi byte.  
Nilai valid: `ByteBufferSerializer`.  
ssl.truststore  
File truststore dalam format base64 atau lokasi file truststore di. [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/) Nilai ini tidak diperlukan jika truststore Anda dipercaya oleh otoritas sertifikat Amazon (CA).  
Bidang ini mendukung template substitusi. Jika Anda menggunakan Secrets Manager untuk menyimpan kredensyal yang diperlukan untuk terhubung ke broker Kafka Anda, Anda dapat menggunakan fungsi `get_secret` SQL untuk mengambil nilai untuk bidang ini. Untuk informasi selengkapnya tentang templat substitusi, lihat[Templat substitusi](iot-substitution-templates.md). Untuk informasi selengkapnya tentang fungsi `get_secret` SQL, lihat[get\$1secret (secretID, secretType, kunci, roLearn)](iot-sql-functions.md#iot-sql-function-get-secret). Jika truststore dalam bentuk file, gunakan parameternya`SecretBinary`. Jika truststore dalam bentuk string, gunakan parameternya`SecretString`.  
Ukuran maksimum nilai ini adalah 65 KB.  
ssl.truststore.password  
Kata sandi untuk truststore. Nilai ini diperlukan hanya jika Anda telah membuat kata sandi untuk truststore.  
ssl.keystore  
File keystore. Nilai ini diperlukan saat Anda menentukan `SSL` sebagai nilai untuk`security.protocol`.  
Bidang ini mendukung template substitusi. Gunakan Secrets Manager untuk menyimpan kredensil yang diperlukan untuk terhubung ke broker Kafka Anda. Untuk mengambil nilai untuk bidang ini, gunakan fungsi `get_secret` SQL. Untuk informasi selengkapnya tentang templat substitusi, lihat[Templat substitusi](iot-substitution-templates.md). Untuk informasi selengkapnya tentang fungsi `get_secret` SQL, lihat[get\$1secret (secretID, secretType, kunci, roLearn)](iot-sql-functions.md#iot-sql-function-get-secret). Gunakan parameter `SecretBinary`.  
ssl.keystore.password  
Kata sandi toko untuk file keystore. Nilai ini diperlukan jika Anda menentukan nilai untuk`ssl.keystore`.  
Nilai bidang ini bisa berupa plaintext. Bidang ini juga mendukung template substitusi. Gunakan Secrets Manager untuk menyimpan kredensil yang diperlukan untuk terhubung ke broker Kafka Anda. Untuk mengambil nilai untuk bidang ini, gunakan fungsi `get_secret` SQL. Untuk informasi selengkapnya tentang templat substitusi, lihat[Templat substitusi](iot-substitution-templates.md). Untuk informasi selengkapnya tentang fungsi `get_secret` SQL, lihat[get\$1secret (secretID, secretType, kunci, roLearn)](iot-sql-functions.md#iot-sql-function-get-secret). Gunakan parameter `SecretString`.  
ssl.key.password  
Kata sandi kunci pribadi di file keystore Anda.  
Bidang ini mendukung template substitusi. Gunakan Secrets Manager untuk menyimpan kredensil yang diperlukan untuk terhubung ke broker Kafka Anda. Untuk mengambil nilai untuk bidang ini, gunakan fungsi `get_secret` SQL. Untuk informasi selengkapnya tentang templat substitusi, lihat[Templat substitusi](iot-substitution-templates.md). Untuk informasi selengkapnya tentang fungsi `get_secret` SQL, lihat[get\$1secret (secretID, secretType, kunci, roLearn)](iot-sql-functions.md#iot-sql-function-get-secret). Gunakan parameter `SecretString`.  
sasl.mekanisme  
Mekanisme keamanan yang digunakan untuk terhubung ke broker Kafka Anda. Nilai ini diperlukan saat Anda menentukan `SASL_SSL` untuk`security.protocol`.  
Nilai-nilai yang valid: `PLAIN`, `SCRAM-SHA-512`, `GSSAPI`.  
`SCRAM-SHA-512`adalah satu-satunya mekanisme keamanan yang didukung di Wilayah cn-north-1, cn-northwest-1, -1, dan -1. us-gov-east us-gov-west  
sasl.plain.username  
Nama pengguna yang digunakan untuk mengambil string rahasia dari Secrets Manager. Nilai ini diperlukan saat Anda menentukan `SASL_SSL` untuk `security.protocol` dan `PLAIN` untuk`sasl.mechanism`.  
sasl.plain.password  
Kata sandi yang digunakan untuk mengambil string rahasia dari Secrets Manager. Nilai ini diperlukan saat Anda menentukan `SASL_SSL` untuk `security.protocol` dan `PLAIN` untuk`sasl.mechanism`.  
sasl.scram.username  
Nama pengguna yang digunakan untuk mengambil string rahasia dari Secrets Manager. Nilai ini diperlukan saat Anda menentukan `SASL_SSL` untuk `security.protocol` dan `SCRAM-SHA-512` untuk`sasl.mechanism`.  
sasl.scram.password  
Kata sandi yang digunakan untuk mengambil string rahasia dari Secrets Manager. Nilai ini diperlukan saat Anda menentukan `SASL_SSL` untuk `security.protocol` dan `SCRAM-SHA-512` untuk`sasl.mechanism`.  
sasl.kerberos.keytab  
File keytab untuk otentikasi Kerberos di Secrets Manager. Nilai ini diperlukan saat Anda menentukan `SASL_SSL` untuk `security.protocol` dan `GSSAPI` untuk`sasl.mechanism`.  
Bidang ini mendukung template substitusi. Gunakan Secrets Manager untuk menyimpan kredensil yang diperlukan untuk terhubung ke broker Kafka Anda. Untuk mengambil nilai untuk bidang ini, gunakan fungsi `get_secret` SQL. Untuk informasi selengkapnya tentang templat substitusi, lihat[Templat substitusi](iot-substitution-templates.md). Untuk informasi selengkapnya tentang fungsi `get_secret` SQL, lihat[get\$1secret (secretID, secretType, kunci, roLearn)](iot-sql-functions.md#iot-sql-function-get-secret). Gunakan parameter `SecretBinary`.  
sasl.kerberos.service.name  
Nama utama Kerberos di mana Apache Kafka berjalan. Nilai ini diperlukan saat Anda menentukan `SASL_SSL` untuk `security.protocol` dan `GSSAPI` untuk`sasl.mechanism`.  
sasl.kerberos.krb5.kdc  
Nama host dari pusat distribusi kunci (KDC) yang terhubung dengan klien produsen Apache Kafka Anda. Nilai ini diperlukan saat Anda menentukan `SASL_SSL` untuk `security.protocol` dan `GSSAPI` untuk`sasl.mechanism`.  
sasl.kerberos.krb5.realm  
Ranah yang terhubung dengan klien produser Apache Kafka Anda. Nilai ini diperlukan saat Anda menentukan `SASL_SSL` untuk `security.protocol` dan `GSSAPI` untuk`sasl.mechanism`.  
sasl.kerberos.principal  
Identitas unik Kerberos tempat Kerberos dapat menetapkan tiket untuk mengakses layanan yang sadar Kerberos. Nilai ini diperlukan saat Anda menentukan `SASL_SSL` untuk `security.protocol` dan `GSSAPI` untuk`sasl.mechanism`.

## Contoh
<a name="apache-kafka-rule-action-examples"></a>

Contoh JSON berikut mendefinisikan tindakan Apache Kafka dalam sebuah aturan. AWS IoT Contoh berikut meneruskan fungsi inline [sourceIP (](iot-sql-functions.md#iot-function-sourceip)) sebagai template [substitusi](https://docs.aws.amazon.com//iot/latest/developerguide/iot-substitution-templates.html) di header Kafka Action.

```
{
	"topicRulePayload": {
		"sql": "SELECT * FROM 'some/topic'",
		"ruleDisabled": false,
		"awsIotSqlVersion": "2016-03-23",
		"actions": [
			{
				"kafka": {
					"destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN",
					"topic": "TopicName",
					"clientProperties": {
						"bootstrap.servers": "kafka.com:9092",
						"security.protocol": "SASL_SSL",
						"ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}",
						"ssl.truststore.password": "kafka password",
						"sasl.mechanism": "GSSAPI",
						"sasl.kerberos.service.name": "kafka",
						"sasl.kerberos.krb5.kdc": "kerberosdns.com",
						"sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}",
						"sasl.kerberos.krb5.realm": "KERBEROSREALM",
						"sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com"
					},
					"headers": [
						{
							"key": "static_header_key",
							"value": "static_header_value"
						},
						{
							"key": "substitutable_header_key",
							"value": "${value_from_payload}"
						},
						{
							"key": "source_ip",
							"value": "${sourceIp()}"
						}
					]
				}
			}
		]
	}
}
```

**Catatan penting tentang pengaturan Kerberos Anda**
+ Pusat distribusi kunci (KDC) Anda harus dapat diselesaikan melalui Private Domain Name System (DNS) dalam VPC target Anda. Salah satu pendekatan yang mungkin adalah menambahkan entri DNS KDC ke zona host pribadi. Untuk informasi selengkapnya tentang pendekatan ini, lihat [Bekerja dengan zona yang dihosting pribadi](https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/hosted-zones-private.html).
+ Setiap VPC harus memiliki resolusi DNS yang diaktifkan. Untuk informasi selengkapnya, lihat [Menggunakan DNS dengan VPC Anda](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-dns.html).
+ Grup keamanan antarmuka jaringan dan grup keamanan tingkat instans di tujuan VPC harus mengizinkan lalu lintas dari dalam VPC Anda pada port berikut.
  + Lalu lintas TCP pada port pendengar broker bootstrap (seringkali 9092, tetapi harus berada dalam kisaran 9000—9100)
  + Lalu lintas TCP dan UDP di port 88 untuk KDC
+ `SCRAM-SHA-512`adalah satu-satunya mekanisme keamanan yang didukung di Wilayah cn-north-1, cn-northwest-1, -1, dan -1. us-gov-east us-gov-west