适用于 Java 的 Amazon SNS 扩展型客户端库 - Amazon Simple Notification Service

适用于 Java 的 Amazon SNS 扩展型客户端库

先决条件

以下是使用适用于 Java 的 Amazon SNS 扩展型客户端库的先决条件:

  • AWS 开发工具包。

    本页面上的示例使用 AWS Java 开发工具包。要安装和设置开发工具包,请参阅 AWS SDK for Java 开发人员指南中的设置 AWS SDK for Java

  • 具有合适凭证的 AWS 账户。

    要创建 AWS 账户,导航到 AWS 主页,然后选择 Create an AWS Account(创建 Amazon 账户)。按照说明进行操作。

    有关凭证的信息,请参阅 AWS SDK for Java 开发人员指南中的设置用于开发的 AWS 凭证和区域

  • Java 8 或更高版本。

  • 适用于 Java 的 Amazon SNS 扩展型客户端库(也可从 Maven 中获得)。

配置消息存储

Amazon SNS 扩展型客户端库使用 AWS 的有效负载卸载 Java 公共库,以进行消息存储和检索。您可以配置以下 Amazon S3 消息存储选项

  • 自定义消息大小阈值 – 具有超过此大小的负载和属性的消息将自动存储在 Amazon S3 中。

  • alwaysThroughS3 标志 – 将此值设置为 true 以强制将所有消息负载存储在 Amazon S3 中。例如:

    SNSExtendedClientConfiguration snsExtendedClientConfiguration = new SNSExtendedClientConfiguration() .withPayloadSupportEnabled(s3Client, BUCKET_NAME).withAlwaysThroughS3(true);
  • 自定义 KMS 密钥 – 用于 Amazon S3 存储桶中的服务器端加密的密钥。

  • 存储桶名称 – 用于存储消息负载的 Amazon S3 存储桶的名称。

示例:使用存储在 Amazon S3 中的负载将消息发布到 Amazon SNS

以下代码示例展示了如何:

  • 创建示例主题和队列。

  • 订阅队列以接收来自主题的消息。

  • 发布测试消息。

消息负载存储在 Amazon S3,以及发布到的引用中。Amazon SQS 扩展客户端用于接收消息。

SDK for Java 1.x
注意

查看 GitHub,了解更多信息。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

要发布大型消息,请使用适用于 Java 的 Amazon SNS 扩展客户端库。您发送的消息将引用包含实际消息内容的 Amazon S3 对象。

import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient; import com.amazon.sqs.javamessaging.ExtendedClientConfiguration; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.sns.AmazonSNS; import com.amazonaws.services.sns.AmazonSNSClientBuilder; import com.amazonaws.services.sns.model.CreateTopicRequest; import com.amazonaws.services.sns.model.PublishRequest; import com.amazonaws.services.sns.model.SetSubscriptionAttributesRequest; import com.amazonaws.services.sns.util.Topics; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; import software.amazon.sns.AmazonSNSExtendedClient; import software.amazon.sns.SNSExtendedClientConfiguration; public class Example { public static void main(String[] args) { final String BUCKET_NAME = "extended-client-bucket"; final String TOPIC_NAME = "extended-client-topic"; final String QUEUE_NAME = "extended-client-queue"; final Regions region = Regions.DEFAULT_REGION; // Message threshold controls the maximum message size that will be allowed to // be published // through SNS using the extended client. Payload of messages exceeding this // value will be stored in // S3. The default value of this parameter is 256 KB which is the maximum // message size in SNS (and SQS). final int EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD = 32; // Initialize SNS, SQS and S3 clients final AmazonSNS snsClient = AmazonSNSClientBuilder.standard().withRegion(region).build(); final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard().withRegion(region).build(); final AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withRegion(region).build(); // Create bucket, topic, queue and subscription s3Client.createBucket(BUCKET_NAME); final String topicArn = snsClient.createTopic( new CreateTopicRequest().withName(TOPIC_NAME)).getTopicArn(); final String queueUrl = sqsClient.createQueue( new CreateQueueRequest().withQueueName(QUEUE_NAME)).getQueueUrl(); final String subscriptionArn = Topics.subscribeQueue( snsClient, sqsClient, topicArn, queueUrl); // To read message content stored in S3 transparently through SQS extended // client, // set the RawMessageDelivery subscription attribute to TRUE final SetSubscriptionAttributesRequest subscriptionAttributesRequest = new SetSubscriptionAttributesRequest(); subscriptionAttributesRequest.setSubscriptionArn(subscriptionArn); subscriptionAttributesRequest.setAttributeName("RawMessageDelivery"); subscriptionAttributesRequest.setAttributeValue("TRUE"); snsClient.setSubscriptionAttributes(subscriptionAttributesRequest); // Initialize SNS extended client // PayloadSizeThreshold triggers message content storage in S3 when the // threshold is exceeded // To store all messages content in S3, use AlwaysThroughS3 flag final SNSExtendedClientConfiguration snsExtendedClientConfiguration = new SNSExtendedClientConfiguration() .withPayloadSupportEnabled(s3Client, BUCKET_NAME) .withPayloadSizeThreshold(EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD); final AmazonSNSExtendedClient snsExtendedClient = new AmazonSNSExtendedClient(snsClient, snsExtendedClientConfiguration); // Publish message via SNS with storage in S3 final String message = "This message is stored in S3 as it exceeds the threshold of 32 bytes set above."; snsExtendedClient.publish(topicArn, message); // Initialize SQS extended client final ExtendedClientConfiguration sqsExtendedClientConfiguration = new ExtendedClientConfiguration() .withPayloadSupportEnabled(s3Client, BUCKET_NAME); final AmazonSQSExtendedClient sqsExtendedClient = new AmazonSQSExtendedClient(sqsClient, sqsExtendedClientConfiguration); // Read the message from the queue final ReceiveMessageResult result = sqsExtendedClient.receiveMessage(queueUrl); System.out.println("Received message is " + result.getMessages().get(0).getBody()); } }

其他终端节点协议

Amazon SNS 和 Amazon SQS 库都使用 AWS 的负载卸载 Java 公共库通过 Amazon S3 存储和检索消息负载。任何启用 Java 的终端节点(例如,在 Java 中实施的 HTTPS 终端节点)都可以使用相同的库来取消引用消息内容。

无法使用 AWS 的负载卸载 Java 公共库的终端节点仍可以通过存储在 Amazon S3 中的负载发布消息。以下是由上面的代码示例发布的 Amazon S3 引用的示例:

[ "software.amazon.payloadoffloading.PayloadS3Pointer", { "s3BucketName": "extended-client-bucket", "s3Key": "xxxx-xxxxx-xxxxx-xxxxxx" } ]