设置 Amazon S3 接收器连接器 - Amazon Managed Streaming for Apache Kafka

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

设置 Amazon S3 接收器连接器

此示例说明如何使用 Confluent Amazon S3 接收器连接器和在 MSK Conn AWS CLI ect 中创建 Amazon S3 接收器连接器。

  1. 复制以下 JSON 并将其粘贴到新文件中。将占位符字符串替换为与 Amazon MSK 集群的引导服务器连接字符串以及集群的子网和安全组相对应的值。 IDs有关如何设置服务执行角色的信息,请参阅 IAMC MSK onnect 的角色和策略

    { "connectorConfiguration": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "s3.region": "us-east-1", "format.class": "io.confluent.connect.s3.format.json.JsonFormat", "flush.size": "1", "schema.compatibility": "NONE", "topics": "my-test-topic", "tasks.max": "2", "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "s3.bucket.name": "amzn-s3-demo-bucket" }, "connectorName": "example-S3-sink-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<cluster-security-group-id>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 4 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-a-role-that-msk-connect-can-assume>", "plugins": [ { "customPlugin": { "customPluginArn": "<arn-of-custom-plugin-that-contains-connector-code>", "revision": 1 } } ], "kafkaClusterEncryptionInTransit": {"encryptionType": "PLAINTEXT"}, "kafkaClusterClientAuthentication": {"authenticationType": "NONE"} }
  2. 在上一步中保存 JSON 文件的文件夹中运行以下 AWS CLI 命令。

    aws kafkaconnect create-connector --cli-input-json file://connector-info.json

    以下是您在成功运行命令后获得的输出示例。

    { "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-S3-sink-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", "ConnectorState": "CREATING", "ConnectorName": "example-S3-sink-connector" }