使用 Amazon S3 SNS 向亚马逊发布一条大消息 AWS SDK - AWS SDK代码示例

AWS 文档 AWS SDK示例 GitHub 存储库中还有更多SDK示例

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Amazon S3 SNS 向亚马逊发布一条大消息 AWS SDK

以下代码示例演示如何SNS使用 Amazon S3 向亚马逊发布大消息来存储消息负载。

Java
SDK适用于 Java 1.x
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

要发布大型消息,请使用适用于 Java 的 Amazon SNS 扩展客户端库。您发送的消息将引用包含实际消息内容的 Amazon S3 对象。

import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient; import com.amazon.sqs.javamessaging.ExtendedClientConfiguration; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.sns.AmazonSNS; import com.amazonaws.services.sns.AmazonSNSClientBuilder; import com.amazonaws.services.sns.model.CreateTopicRequest; import com.amazonaws.services.sns.model.PublishRequest; import com.amazonaws.services.sns.model.SetSubscriptionAttributesRequest; import com.amazonaws.services.sns.util.Topics; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; import software.amazon.sns.AmazonSNSExtendedClient; import software.amazon.sns.SNSExtendedClientConfiguration; public class Example { public static void main(String[] args) { final String BUCKET_NAME = "extended-client-bucket"; final String TOPIC_NAME = "extended-client-topic"; final String QUEUE_NAME = "extended-client-queue"; final Regions region = Regions.DEFAULT_REGION; // Message threshold controls the maximum message size that will be allowed to // be published // through SNS using the extended client. Payload of messages exceeding this // value will be stored in // S3. The default value of this parameter is 256 KB which is the maximum // message size in SNS (and SQS). final int EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD = 32; // Initialize SNS, SQS and S3 clients final AmazonSNS snsClient = AmazonSNSClientBuilder.standard().withRegion(region).build(); final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard().withRegion(region).build(); final AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withRegion(region).build(); // Create bucket, topic, queue and subscription s3Client.createBucket(BUCKET_NAME); final String topicArn = snsClient.createTopic( new CreateTopicRequest().withName(TOPIC_NAME)).getTopicArn(); final String queueUrl = sqsClient.createQueue( new CreateQueueRequest().withQueueName(QUEUE_NAME)).getQueueUrl(); final String subscriptionArn = Topics.subscribeQueue( snsClient, sqsClient, topicArn, queueUrl); // To read message content stored in S3 transparently through SQS extended // client, // set the RawMessageDelivery subscription attribute to TRUE final SetSubscriptionAttributesRequest subscriptionAttributesRequest = new SetSubscriptionAttributesRequest(); subscriptionAttributesRequest.setSubscriptionArn(subscriptionArn); subscriptionAttributesRequest.setAttributeName("RawMessageDelivery"); subscriptionAttributesRequest.setAttributeValue("TRUE"); snsClient.setSubscriptionAttributes(subscriptionAttributesRequest); // Initialize SNS extended client // PayloadSizeThreshold triggers message content storage in S3 when the // threshold is exceeded // To store all messages content in S3, use AlwaysThroughS3 flag final SNSExtendedClientConfiguration snsExtendedClientConfiguration = new SNSExtendedClientConfiguration() .withPayloadSupportEnabled(s3Client, BUCKET_NAME) .withPayloadSizeThreshold(EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD); final AmazonSNSExtendedClient snsExtendedClient = new AmazonSNSExtendedClient(snsClient, snsExtendedClientConfiguration); // Publish message via SNS with storage in S3 final String message = "This message is stored in S3 as it exceeds the threshold of 32 bytes set above."; snsExtendedClient.publish(topicArn, message); // Initialize SQS extended client final ExtendedClientConfiguration sqsExtendedClientConfiguration = new ExtendedClientConfiguration() .withPayloadSupportEnabled(s3Client, BUCKET_NAME); final AmazonSQSExtendedClient sqsExtendedClient = new AmazonSQSExtendedClient(sqsClient, sqsExtendedClientConfiguration); // Read the message from the queue final ReceiveMessageResult result = sqsExtendedClient.receiveMessage(queueUrl); System.out.println("Received message is " + result.getMessages().get(0).getBody()); } }