适用于 Java 的 Amazon SNS 扩展型客户端库
先决条件
以下是使用适用于 Java 的 Amazon SNS 扩展型客户端库
-
AWS 开发工具包。
本页面上的示例使用 AWS Java 开发工具包。要安装和设置开发工具包,请参阅 AWS SDK for Java 开发人员指南中的设置 AWS SDK for Java。
-
具有合适凭证的 AWS 账户。
要创建 AWS 账户,导航到 AWS 主页
,然后选择 Create an AWS Account(创建 Amazon 账户)。按照说明进行操作。 有关凭证的信息,请参阅 AWS SDK for Java 开发人员指南中的设置用于开发的 AWS 凭证和区域。
-
Java 8 或更高版本。
-
适用于 Java 的 Amazon SNS 扩展型客户端库(也可从 Maven
中获得)。
配置消息存储
Amazon SNS 扩展型客户端库使用 AWS 的有效负载卸载 Java 公共库,以进行消息存储和检索。您可以配置以下 Amazon S3 消息存储选项
-
自定义消息大小阈值 – 具有超过此大小的负载和属性的消息将自动存储在 Amazon S3 中。
-
alwaysThroughS3
标志 – 将此值设置为true
以强制将所有消息负载存储在 Amazon S3 中。例如:SNSExtendedClientConfiguration snsExtendedClientConfiguration = new SNSExtendedClientConfiguration() .withPayloadSupportEnabled(s3Client, BUCKET_NAME).withAlwaysThroughS3(true);
-
自定义 KMS 密钥 – 用于 Amazon S3 存储桶中的服务器端加密的密钥。
-
存储桶名称 – 用于存储消息负载的 Amazon S3 存储桶的名称。
示例:使用存储在 Amazon S3 中的负载将消息发布到 Amazon SNS
以下代码示例展示了如何:
-
创建示例主题和队列。
-
订阅队列以接收来自主题的消息。
-
发布测试消息。
消息负载存储在 Amazon S3,以及发布到的引用中。Amazon SQS 扩展客户端用于接收消息。
- SDK for Java 1.x
-
注意
查看 GitHub,了解更多信息。查找完整示例,学习如何在 AWS 代码示例存储库
中进行设置和运行。 要发布大型消息,请使用适用于 Java 的 Amazon SNS 扩展客户端库。您发送的消息将引用包含实际消息内容的 Amazon S3 对象。
import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient; import com.amazon.sqs.javamessaging.ExtendedClientConfiguration; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.sns.AmazonSNS; import com.amazonaws.services.sns.AmazonSNSClientBuilder; import com.amazonaws.services.sns.model.CreateTopicRequest; import com.amazonaws.services.sns.model.PublishRequest; import com.amazonaws.services.sns.model.SetSubscriptionAttributesRequest; import com.amazonaws.services.sns.util.Topics; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; import software.amazon.sns.AmazonSNSExtendedClient; import software.amazon.sns.SNSExtendedClientConfiguration; public class Example { public static void main(String[] args) { final String BUCKET_NAME = "extended-client-bucket"; final String TOPIC_NAME = "extended-client-topic"; final String QUEUE_NAME = "extended-client-queue"; final Regions region = Regions.DEFAULT_REGION; // Message threshold controls the maximum message size that will be allowed to // be published // through SNS using the extended client. Payload of messages exceeding this // value will be stored in // S3. The default value of this parameter is 256 KB which is the maximum // message size in SNS (and SQS). final int EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD = 32; // Initialize SNS, SQS and S3 clients final AmazonSNS snsClient = AmazonSNSClientBuilder.standard().withRegion(region).build(); final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard().withRegion(region).build(); final AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withRegion(region).build(); // Create bucket, topic, queue and subscription s3Client.createBucket(BUCKET_NAME); final String topicArn = snsClient.createTopic( new CreateTopicRequest().withName(TOPIC_NAME)).getTopicArn(); final String queueUrl = sqsClient.createQueue( new CreateQueueRequest().withQueueName(QUEUE_NAME)).getQueueUrl(); final String subscriptionArn = Topics.subscribeQueue( snsClient, sqsClient, topicArn, queueUrl); // To read message content stored in S3 transparently through SQS extended // client, // set the RawMessageDelivery subscription attribute to TRUE final SetSubscriptionAttributesRequest subscriptionAttributesRequest = new SetSubscriptionAttributesRequest(); subscriptionAttributesRequest.setSubscriptionArn(subscriptionArn); subscriptionAttributesRequest.setAttributeName("RawMessageDelivery"); subscriptionAttributesRequest.setAttributeValue("TRUE"); snsClient.setSubscriptionAttributes(subscriptionAttributesRequest); // Initialize SNS extended client // PayloadSizeThreshold triggers message content storage in S3 when the // threshold is exceeded // To store all messages content in S3, use AlwaysThroughS3 flag final SNSExtendedClientConfiguration snsExtendedClientConfiguration = new SNSExtendedClientConfiguration() .withPayloadSupportEnabled(s3Client, BUCKET_NAME) .withPayloadSizeThreshold(EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD); final AmazonSNSExtendedClient snsExtendedClient = new AmazonSNSExtendedClient(snsClient, snsExtendedClientConfiguration); // Publish message via SNS with storage in S3 final String message = "This message is stored in S3 as it exceeds the threshold of 32 bytes set above."; snsExtendedClient.publish(topicArn, message); // Initialize SQS extended client final ExtendedClientConfiguration sqsExtendedClientConfiguration = new ExtendedClientConfiguration() .withPayloadSupportEnabled(s3Client, BUCKET_NAME); final AmazonSQSExtendedClient sqsExtendedClient = new AmazonSQSExtendedClient(sqsClient, sqsExtendedClientConfiguration); // Read the message from the queue final ReceiveMessageResult result = sqsExtendedClient.receiveMessage(queueUrl); System.out.println("Received message is " + result.getMessages().get(0).getBody()); } }
其他终端节点协议
Amazon SNS 和 Amazon SQS 库都使用 AWS 的负载卸载 Java 公共库
无法使用 AWS 的负载卸载 Java 公共库的终端节点仍可以通过存储在 Amazon S3 中的负载发布消息。以下是由上面的代码示例发布的 Amazon S3 引用的示例:
[ "software.amazon.payloadoffloading.PayloadS3Pointer", { "s3BucketName": "extended-client-bucket", "s3Key": "xxxx-xxxxx-xxxxx-xxxxxx" } ]