SNSEsempi di utilizzo di Amazon SDK per Go V2 - Esempi di codice dell'AWS SDK

Ci sono altri AWS SDK esempi disponibili nel repository AWS Doc SDK Examples GitHub .

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

SNSEsempi di utilizzo di Amazon SDK per Go V2

I seguenti esempi di codice mostrano come eseguire azioni e implementare scenari comuni utilizzando la AWS SDK for Go versione 2 con AmazonSNS.

Le operazioni sono estratti di codice da programmi più grandi e devono essere eseguite nel contesto. Sebbene le azioni mostrino come richiamare le singole funzioni di servizio, puoi vedere le azioni nel loro contesto negli scenari correlati.

Gli scenari sono esempi di codice che mostrano come eseguire attività specifiche richiamando più funzioni all'interno di un servizio o combinandole con altre Servizi AWS.

Ogni esempio include un collegamento al codice sorgente completo, in cui è possibile trovare istruzioni su come configurare ed eseguire il codice nel contesto.

Nozioni di base

I seguenti esempi di codice mostrano come iniziare a usare AmazonSNS.

SDKper Go V2
Nota

C'è altro da fare. GitHub Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

package main import ( "context" "fmt" "log" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/aws/aws-sdk-go-v2/service/sns/types" ) // main uses the AWS SDK for Go V2 to create an Amazon Simple Notification Service // (Amazon SNS) client and list the topics in your account. // This example uses the default settings specified in your shared credentials // and config files. func main() { ctx := context.Background() sdkConfig, err := config.LoadDefaultConfig(ctx) if err != nil { fmt.Println("Couldn't load default configuration. Have you set up your AWS account?") fmt.Println(err) return } snsClient := sns.NewFromConfig(sdkConfig) fmt.Println("Let's list the topics for your account.") var topics []types.Topic paginator := sns.NewListTopicsPaginator(snsClient, &sns.ListTopicsInput{}) for paginator.HasMorePages() { output, err := paginator.NextPage(ctx) if err != nil { log.Printf("Couldn't get topics. Here's why: %v\n", err) break } else { topics = append(topics, output.Topics...) } } if len(topics) == 0 { fmt.Println("You don't have any topics!") } else { for _, topic := range topics { fmt.Printf("\t%v\n", *topic.TopicArn) } } }

Azioni

Il seguente esempio di codice mostra come utilizzareCreateTopic.

SDKper Go V2
Nota

C'è altro da fare. GitHub Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

import ( "context" "encoding/json" "log" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/aws/aws-sdk-go-v2/service/sns/types" ) // SnsActions encapsulates the Amazon Simple Notification Service (Amazon SNS) actions // used in the examples. type SnsActions struct { SnsClient *sns.Client } // CreateTopic creates an Amazon SNS topic with the specified name. You can optionally // specify that the topic is created as a FIFO topic and whether it uses content-based // deduplication instead of ID-based deduplication. func (actor SnsActions) CreateTopic(ctx context.Context, topicName string, isFifoTopic bool, contentBasedDeduplication bool) (string, error) { var topicArn string topicAttributes := map[string]string{} if isFifoTopic { topicAttributes["FifoTopic"] = "true" } if contentBasedDeduplication { topicAttributes["ContentBasedDeduplication"] = "true" } topic, err := actor.SnsClient.CreateTopic(ctx, &sns.CreateTopicInput{ Name: aws.String(topicName), Attributes: topicAttributes, }) if err != nil { log.Printf("Couldn't create topic %v. Here's why: %v\n", topicName, err) } else { topicArn = *topic.TopicArn } return topicArn, err }

Il seguente esempio di codice mostra come utilizzareDeleteTopic.

SDKper Go V2
Nota

C'è altro da fare. GitHub Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

import ( "context" "encoding/json" "log" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/aws/aws-sdk-go-v2/service/sns/types" ) // SnsActions encapsulates the Amazon Simple Notification Service (Amazon SNS) actions // used in the examples. type SnsActions struct { SnsClient *sns.Client } // DeleteTopic delete an Amazon SNS topic. func (actor SnsActions) DeleteTopic(ctx context.Context, topicArn string) error { _, err := actor.SnsClient.DeleteTopic(ctx, &sns.DeleteTopicInput{ TopicArn: aws.String(topicArn)}) if err != nil { log.Printf("Couldn't delete topic %v. Here's why: %v\n", topicArn, err) } return err }

Il seguente esempio di codice mostra come utilizzareListTopics.

SDKper Go V2
Nota

C'è altro da fare. GitHub Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

package main import ( "context" "fmt" "log" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/aws/aws-sdk-go-v2/service/sns/types" ) // main uses the AWS SDK for Go V2 to create an Amazon Simple Notification Service // (Amazon SNS) client and list the topics in your account. // This example uses the default settings specified in your shared credentials // and config files. func main() { ctx := context.Background() sdkConfig, err := config.LoadDefaultConfig(ctx) if err != nil { fmt.Println("Couldn't load default configuration. Have you set up your AWS account?") fmt.Println(err) return } snsClient := sns.NewFromConfig(sdkConfig) fmt.Println("Let's list the topics for your account.") var topics []types.Topic paginator := sns.NewListTopicsPaginator(snsClient, &sns.ListTopicsInput{}) for paginator.HasMorePages() { output, err := paginator.NextPage(ctx) if err != nil { log.Printf("Couldn't get topics. Here's why: %v\n", err) break } else { topics = append(topics, output.Topics...) } } if len(topics) == 0 { fmt.Println("You don't have any topics!") } else { for _, topic := range topics { fmt.Printf("\t%v\n", *topic.TopicArn) } } }

Il seguente esempio di codice mostra come utilizzarePublish.

SDKper Go V2
Nota

C'è altro da fare. GitHub Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

import ( "context" "encoding/json" "log" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/aws/aws-sdk-go-v2/service/sns/types" ) // SnsActions encapsulates the Amazon Simple Notification Service (Amazon SNS) actions // used in the examples. type SnsActions struct { SnsClient *sns.Client } // Publish publishes a message to an Amazon SNS topic. The message is then sent to all // subscribers. When the topic is a FIFO topic, the message must also contain a group ID // and, when ID-based deduplication is used, a deduplication ID. An optional key-value // filter attribute can be specified so that the message can be filtered according to // a filter policy. func (actor SnsActions) Publish(ctx context.Context, topicArn string, message string, groupId string, dedupId string, filterKey string, filterValue string) error { publishInput := sns.PublishInput{TopicArn: aws.String(topicArn), Message: aws.String(message)} if groupId != "" { publishInput.MessageGroupId = aws.String(groupId) } if dedupId != "" { publishInput.MessageDeduplicationId = aws.String(dedupId) } if filterKey != "" && filterValue != "" { publishInput.MessageAttributes = map[string]types.MessageAttributeValue{ filterKey: {DataType: aws.String("String"), StringValue: aws.String(filterValue)}, } } _, err := actor.SnsClient.Publish(ctx, &publishInput) if err != nil { log.Printf("Couldn't publish message to topic %v. Here's why: %v", topicArn, err) } return err }
  • Per API i dettagli, consulta Publish in AWS SDK for Go APIReference.

Il seguente esempio di codice mostra come utilizzareSubscribe.

SDKper Go V2
Nota

C'è altro da fare. GitHub Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

Iscrivi una coda a un argomento con filtri opzionali.

import ( "context" "encoding/json" "log" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/aws/aws-sdk-go-v2/service/sns/types" ) // SnsActions encapsulates the Amazon Simple Notification Service (Amazon SNS) actions // used in the examples. type SnsActions struct { SnsClient *sns.Client } // SubscribeQueue subscribes an Amazon Simple Queue Service (Amazon SQS) queue to an // Amazon SNS topic. When filterMap is not nil, it is used to specify a filter policy // so that messages are only sent to the queue when the message has the specified attributes. func (actor SnsActions) SubscribeQueue(ctx context.Context, topicArn string, queueArn string, filterMap map[string][]string) (string, error) { var subscriptionArn string var attributes map[string]string if filterMap != nil { filterBytes, err := json.Marshal(filterMap) if err != nil { log.Printf("Couldn't create filter policy, here's why: %v\n", err) return "", err } attributes = map[string]string{"FilterPolicy": string(filterBytes)} } output, err := actor.SnsClient.Subscribe(ctx, &sns.SubscribeInput{ Protocol: aws.String("sqs"), TopicArn: aws.String(topicArn), Attributes: attributes, Endpoint: aws.String(queueArn), ReturnSubscriptionArn: true, }) if err != nil { log.Printf("Couldn't susbscribe queue %v to topic %v. Here's why: %v\n", queueArn, topicArn, err) } else { subscriptionArn = *output.SubscriptionArn } return subscriptionArn, err }
  • Per API i dettagli, consulta Subscribe in AWS SDK for Go APIReference.

Scenari

L'esempio di codice seguente mostra come:

  • Crea un argomento (FIFOo nonFIFO).

  • Sottoscrizione di diverse code all'argomento con la possibilità di applicare un filtro.

  • Pubblicazione di un messaggio nell'argomento.

  • Esame delle code per i messaggi ricevuti.

SDKper Go V2
Nota

C'è altro da fare. GitHub Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

Esegui uno scenario interattivo al prompt dei comandi.

import ( "context" "encoding/json" "fmt" "log" "strings" "topics_and_queues/actions" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/awsdocs/aws-doc-sdk-examples/gov2/demotools" ) const FIFO_SUFFIX = ".fifo" const TONE_KEY = "tone" var ToneChoices = []string{"cheerful", "funny", "serious", "sincere"} // MessageBody is used to deserialize the body of a message from a JSON string. type MessageBody struct { Message string } // ScenarioRunner separates the steps of this scenario into individual functions so that // they are simpler to read and understand. type ScenarioRunner struct { questioner demotools.IQuestioner snsActor *actions.SnsActions sqsActor *actions.SqsActions } func (runner ScenarioRunner) CreateTopic(ctx context.Context) (string, string, bool, bool) { log.Println("SNS topics can be configured as FIFO (First-In-First-Out) or standard.\n" + "FIFO topics deliver messages in order and support deduplication and message filtering.") isFifoTopic := runner.questioner.AskBool("\nWould you like to work with FIFO topics? (y/n) ", "y") contentBasedDeduplication := false if isFifoTopic { log.Println(strings.Repeat("-", 88)) log.Println("Because you have chosen a FIFO topic, deduplication is supported.\n" + "Deduplication IDs are either set in the message or are automatically generated\n" + "from content using a hash function. If a message is successfully published to\n" + "an SNS FIFO topic, any message published and determined to have the same\n" + "deduplication ID, within the five-minute deduplication interval, is accepted\n" + "but not delivered. For more information about deduplication, see:\n" + "\thttps://docs.aws.amazon.com/sns/latest/dg/fifo-message-dedup.html.") contentBasedDeduplication = runner.questioner.AskBool( "\nDo you want to use content-based deduplication instead of entering a deduplication ID? (y/n) ", "y") } log.Println(strings.Repeat("-", 88)) topicName := runner.questioner.Ask("Enter a name for your SNS topic. ") if isFifoTopic { topicName = fmt.Sprintf("%v%v", topicName, FIFO_SUFFIX) log.Printf("Because you have selected a FIFO topic, '%v' must be appended to\n"+ "the topic name.", FIFO_SUFFIX) } topicArn, err := runner.snsActor.CreateTopic(ctx, topicName, isFifoTopic, contentBasedDeduplication) if err != nil { panic(err) } log.Printf("Your new topic with the name '%v' and Amazon Resource Name (ARN) \n"+ "'%v' has been created.", topicName, topicArn) return topicName, topicArn, isFifoTopic, contentBasedDeduplication } func (runner ScenarioRunner) CreateQueue(ctx context.Context, ordinal string, isFifoTopic bool) (string, string) { queueName := runner.questioner.Ask(fmt.Sprintf("Enter a name for the %v SQS queue. ", ordinal)) if isFifoTopic { queueName = fmt.Sprintf("%v%v", queueName, FIFO_SUFFIX) if ordinal == "first" { log.Printf("Because you are creating a FIFO SQS queue, '%v' must "+ "be appended to the queue name.\n", FIFO_SUFFIX) } } queueUrl, err := runner.sqsActor.CreateQueue(ctx, queueName, isFifoTopic) if err != nil { panic(err) } log.Printf("Your new SQS queue with the name '%v' and the queue URL "+ "'%v' has been created.", queueName, queueUrl) return queueName, queueUrl } func (runner ScenarioRunner) SubscribeQueueToTopic( ctx context.Context, queueName string, queueUrl string, topicName string, topicArn string, ordinal string, isFifoTopic bool) (string, bool) { queueArn, err := runner.sqsActor.GetQueueArn(ctx, queueUrl) if err != nil { panic(err) } log.Printf("The ARN of your queue is: %v.\n", queueArn) err = runner.sqsActor.AttachSendMessagePolicy(ctx, queueUrl, queueArn, topicArn) if err != nil { panic(err) } log.Println("Attached an IAM policy to the queue so the SNS topic can send " + "messages to it.") log.Println(strings.Repeat("-", 88)) var filterPolicy map[string][]string if isFifoTopic { if ordinal == "first" { log.Println("Subscriptions to a FIFO topic can have filters.\n" + "If you add a filter to this subscription, then only the filtered messages\n" + "will be received in the queue.\n" + "For information about message filtering, see\n" + "\thttps://docs.aws.amazon.com/sns/latest/dg/sns-message-filtering.html\n" + "For this example, you can filter messages by a \"tone\" attribute.") } wantFiltering := runner.questioner.AskBool( fmt.Sprintf("Do you want to filter messages that are sent to \"%v\"\n"+ "from the %v topic? (y/n) ", queueName, topicName), "y") if wantFiltering { log.Println("You can filter messages by one or more of the following \"tone\" attributes.") var toneSelections []string askAboutTones := true for askAboutTones { toneIndex := runner.questioner.AskChoice( "Enter the number of the tone you want to filter by:\n", ToneChoices) toneSelections = append(toneSelections, ToneChoices[toneIndex]) askAboutTones = runner.questioner.AskBool("Do you want to add another tone to the filter? (y/n) ", "y") } log.Printf("Your subscription will be filtered to only pass the following tones: %v\n", toneSelections) filterPolicy = map[string][]string{TONE_KEY: toneSelections} } } subscriptionArn, err := runner.snsActor.SubscribeQueue(ctx, topicArn, queueArn, filterPolicy) if err != nil { panic(err) } log.Printf("The queue %v is now subscribed to the topic %v with the subscription ARN %v.\n", queueName, topicName, subscriptionArn) return subscriptionArn, filterPolicy != nil } func (runner ScenarioRunner) PublishMessages(ctx context.Context, topicArn string, isFifoTopic bool, contentBasedDeduplication bool, usingFilters bool) { var message string var groupId string var dedupId string var toneSelection string publishMore := true for publishMore { groupId = "" dedupId = "" toneSelection = "" message = runner.questioner.Ask("Enter a message to publish: ") if isFifoTopic { log.Println("Because you are using a FIFO topic, you must set a message group ID.\n" + "All messages within the same group will be received in the order they were published.") groupId = runner.questioner.Ask("Enter a message group ID: ") if !contentBasedDeduplication { log.Println("Because you are not using content-based deduplication,\n" + "you must enter a deduplication ID.") dedupId = runner.questioner.Ask("Enter a deduplication ID: ") } } if usingFilters { if runner.questioner.AskBool("Add a tone attribute so this message can be filtered? (y/n) ", "y") { toneIndex := runner.questioner.AskChoice( "Enter the number of the tone you want to filter by:\n", ToneChoices) toneSelection = ToneChoices[toneIndex] } } err := runner.snsActor.Publish(ctx, topicArn, message, groupId, dedupId, TONE_KEY, toneSelection) if err != nil { panic(err) } log.Println(("Your message was published.")) publishMore = runner.questioner.AskBool("Do you want to publish another messsage? (y/n) ", "y") } } func (runner ScenarioRunner) PollForMessages(ctx context.Context, queueUrls []string) { log.Println("Polling queues for messages...") for _, queueUrl := range queueUrls { var messages []types.Message for { currentMsgs, err := runner.sqsActor.GetMessages(ctx, queueUrl, 10, 1) if err != nil { panic(err) } if len(currentMsgs) == 0 { break } messages = append(messages, currentMsgs...) } if len(messages) == 0 { log.Printf("No messages were received by queue %v.\n", queueUrl) } else if len(messages) == 1 { log.Printf("One message was received by queue %v:\n", queueUrl) } else { log.Printf("%v messages were received by queue %v:\n", len(messages), queueUrl) } for msgIndex, message := range messages { messageBody := MessageBody{} err := json.Unmarshal([]byte(*message.Body), &messageBody) if err != nil { panic(err) } log.Printf("Message %v: %v\n", msgIndex+1, messageBody.Message) } if len(messages) > 0 { log.Printf("Deleting %v messages from queue %v.\n", len(messages), queueUrl) err := runner.sqsActor.DeleteMessages(ctx, queueUrl, messages) if err != nil { panic(err) } } } } // RunTopicsAndQueuesScenario is an interactive example that shows you how to use the // AWS SDK for Go to create and use Amazon SNS topics and Amazon SQS queues. // // 1. Create a topic (FIFO or non-FIFO). // 2. Subscribe several queues to the topic with an option to apply a filter. // 3. Publish messages to the topic. // 4. Poll the queues for messages received. // 5. Delete the topic and the queues. // // This example creates service clients from the specified sdkConfig so that // you can replace it with a mocked or stubbed config for unit testing. // // It uses a questioner from the `demotools` package to get input during the example. // This package can be found in the ..\..\demotools folder of this repo. func RunTopicsAndQueuesScenario( ctx context.Context, sdkConfig aws.Config, questioner demotools.IQuestioner) { resources := Resources{} defer func() { if r := recover(); r != nil { log.Println("Something went wrong with the demo.\n" + "Cleaning up any resources that were created...") resources.Cleanup(ctx) } }() queueCount := 2 log.Println(strings.Repeat("-", 88)) log.Printf("Welcome to messaging with topics and queues.\n\n"+ "In this workflow, you will create an SNS topic and subscribe %v SQS queues to the\n"+ "topic. You can select from several options for configuring the topic and the\n"+ "subscriptions for the queues. You can then post to the topic and see the results\n"+ "in the queues.\n", queueCount) log.Println(strings.Repeat("-", 88)) runner := ScenarioRunner{ questioner: questioner, snsActor: &actions.SnsActions{SnsClient: sns.NewFromConfig(sdkConfig)}, sqsActor: &actions.SqsActions{SqsClient: sqs.NewFromConfig(sdkConfig)}, } resources.snsActor = runner.snsActor resources.sqsActor = runner.sqsActor topicName, topicArn, isFifoTopic, contentBasedDeduplication := runner.CreateTopic(ctx) resources.topicArn = topicArn log.Println(strings.Repeat("-", 88)) log.Printf("Now you will create %v SQS queues and subscribe them to the topic.\n", queueCount) ordinals := []string{"first", "next"} usingFilters := false for _, ordinal := range ordinals { queueName, queueUrl := runner.CreateQueue(ctx, ordinal, isFifoTopic) resources.queueUrls = append(resources.queueUrls, queueUrl) _, filtering := runner.SubscribeQueueToTopic(ctx, queueName, queueUrl, topicName, topicArn, ordinal, isFifoTopic) usingFilters = usingFilters || filtering } log.Println(strings.Repeat("-", 88)) runner.PublishMessages(ctx, topicArn, isFifoTopic, contentBasedDeduplication, usingFilters) log.Println(strings.Repeat("-", 88)) runner.PollForMessages(ctx, resources.queueUrls) log.Println(strings.Repeat("-", 88)) wantCleanup := questioner.AskBool("Do you want to remove all AWS resources created for this scenario? (y/n) ", "y") if wantCleanup { log.Println("Cleaning up resources...") resources.Cleanup(ctx) } log.Println(strings.Repeat("-", 88)) log.Println("Thanks for watching!") log.Println(strings.Repeat("-", 88)) }

Definisci una struttura che racchiuda SNS le azioni di Amazon utilizzate in questo esempio.

import ( "context" "encoding/json" "log" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/aws/aws-sdk-go-v2/service/sns/types" ) // SnsActions encapsulates the Amazon Simple Notification Service (Amazon SNS) actions // used in the examples. type SnsActions struct { SnsClient *sns.Client } // CreateTopic creates an Amazon SNS topic with the specified name. You can optionally // specify that the topic is created as a FIFO topic and whether it uses content-based // deduplication instead of ID-based deduplication. func (actor SnsActions) CreateTopic(ctx context.Context, topicName string, isFifoTopic bool, contentBasedDeduplication bool) (string, error) { var topicArn string topicAttributes := map[string]string{} if isFifoTopic { topicAttributes["FifoTopic"] = "true" } if contentBasedDeduplication { topicAttributes["ContentBasedDeduplication"] = "true" } topic, err := actor.SnsClient.CreateTopic(ctx, &sns.CreateTopicInput{ Name: aws.String(topicName), Attributes: topicAttributes, }) if err != nil { log.Printf("Couldn't create topic %v. Here's why: %v\n", topicName, err) } else { topicArn = *topic.TopicArn } return topicArn, err } // DeleteTopic delete an Amazon SNS topic. func (actor SnsActions) DeleteTopic(ctx context.Context, topicArn string) error { _, err := actor.SnsClient.DeleteTopic(ctx, &sns.DeleteTopicInput{ TopicArn: aws.String(topicArn)}) if err != nil { log.Printf("Couldn't delete topic %v. Here's why: %v\n", topicArn, err) } return err } // SubscribeQueue subscribes an Amazon Simple Queue Service (Amazon SQS) queue to an // Amazon SNS topic. When filterMap is not nil, it is used to specify a filter policy // so that messages are only sent to the queue when the message has the specified attributes. func (actor SnsActions) SubscribeQueue(ctx context.Context, topicArn string, queueArn string, filterMap map[string][]string) (string, error) { var subscriptionArn string var attributes map[string]string if filterMap != nil { filterBytes, err := json.Marshal(filterMap) if err != nil { log.Printf("Couldn't create filter policy, here's why: %v\n", err) return "", err } attributes = map[string]string{"FilterPolicy": string(filterBytes)} } output, err := actor.SnsClient.Subscribe(ctx, &sns.SubscribeInput{ Protocol: aws.String("sqs"), TopicArn: aws.String(topicArn), Attributes: attributes, Endpoint: aws.String(queueArn), ReturnSubscriptionArn: true, }) if err != nil { log.Printf("Couldn't susbscribe queue %v to topic %v. Here's why: %v\n", queueArn, topicArn, err) } else { subscriptionArn = *output.SubscriptionArn } return subscriptionArn, err } // Publish publishes a message to an Amazon SNS topic. The message is then sent to all // subscribers. When the topic is a FIFO topic, the message must also contain a group ID // and, when ID-based deduplication is used, a deduplication ID. An optional key-value // filter attribute can be specified so that the message can be filtered according to // a filter policy. func (actor SnsActions) Publish(ctx context.Context, topicArn string, message string, groupId string, dedupId string, filterKey string, filterValue string) error { publishInput := sns.PublishInput{TopicArn: aws.String(topicArn), Message: aws.String(message)} if groupId != "" { publishInput.MessageGroupId = aws.String(groupId) } if dedupId != "" { publishInput.MessageDeduplicationId = aws.String(dedupId) } if filterKey != "" && filterValue != "" { publishInput.MessageAttributes = map[string]types.MessageAttributeValue{ filterKey: {DataType: aws.String("String"), StringValue: aws.String(filterValue)}, } } _, err := actor.SnsClient.Publish(ctx, &publishInput) if err != nil { log.Printf("Couldn't publish message to topic %v. Here's why: %v", topicArn, err) } return err }

Definisci una struttura che racchiuda SQS le azioni di Amazon utilizzate in questo esempio.

import ( "context" "encoding/json" "fmt" "log" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" ) // SqsActions encapsulates the Amazon Simple Queue Service (Amazon SQS) actions // used in the examples. type SqsActions struct { SqsClient *sqs.Client } // CreateQueue creates an Amazon SQS queue with the specified name. You can specify // whether the queue is created as a FIFO queue. func (actor SqsActions) CreateQueue(ctx context.Context, queueName string, isFifoQueue bool) (string, error) { var queueUrl string queueAttributes := map[string]string{} if isFifoQueue { queueAttributes["FifoQueue"] = "true" } queue, err := actor.SqsClient.CreateQueue(ctx, &sqs.CreateQueueInput{ QueueName: aws.String(queueName), Attributes: queueAttributes, }) if err != nil { log.Printf("Couldn't create queue %v. Here's why: %v\n", queueName, err) } else { queueUrl = *queue.QueueUrl } return queueUrl, err } // GetQueueArn uses the GetQueueAttributes action to get the Amazon Resource Name (ARN) // of an Amazon SQS queue. func (actor SqsActions) GetQueueArn(ctx context.Context, queueUrl string) (string, error) { var queueArn string arnAttributeName := types.QueueAttributeNameQueueArn attribute, err := actor.SqsClient.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{ QueueUrl: aws.String(queueUrl), AttributeNames: []types.QueueAttributeName{arnAttributeName}, }) if err != nil { log.Printf("Couldn't get ARN for queue %v. Here's why: %v\n", queueUrl, err) } else { queueArn = attribute.Attributes[string(arnAttributeName)] } return queueArn, err } // AttachSendMessagePolicy uses the SetQueueAttributes action to attach a policy to an // Amazon SQS queue that allows the specified Amazon SNS topic to send messages to the // queue. func (actor SqsActions) AttachSendMessagePolicy(ctx context.Context, queueUrl string, queueArn string, topicArn string) error { policyDoc := PolicyDocument{ Version: "2012-10-17", Statement: []PolicyStatement{{ Effect: "Allow", Action: "sqs:SendMessage", Principal: map[string]string{"Service": "sns.amazonaws.com"}, Resource: aws.String(queueArn), Condition: PolicyCondition{"ArnEquals": map[string]string{"aws:SourceArn": topicArn}}, }}, } policyBytes, err := json.Marshal(policyDoc) if err != nil { log.Printf("Couldn't create policy document. Here's why: %v\n", err) return err } _, err = actor.SqsClient.SetQueueAttributes(ctx, &sqs.SetQueueAttributesInput{ Attributes: map[string]string{ string(types.QueueAttributeNamePolicy): string(policyBytes), }, QueueUrl: aws.String(queueUrl), }) if err != nil { log.Printf("Couldn't set send message policy on queue %v. Here's why: %v\n", queueUrl, err) } return err } // PolicyDocument defines a policy document as a Go struct that can be serialized // to JSON. type PolicyDocument struct { Version string Statement []PolicyStatement } // PolicyStatement defines a statement in a policy document. type PolicyStatement struct { Effect string Action string Principal map[string]string `json:",omitempty"` Resource *string `json:",omitempty"` Condition PolicyCondition `json:",omitempty"` } // PolicyCondition defines a condition in a policy. type PolicyCondition map[string]map[string]string // GetMessages uses the ReceiveMessage action to get messages from an Amazon SQS queue. func (actor SqsActions) GetMessages(ctx context.Context, queueUrl string, maxMessages int32, waitTime int32) ([]types.Message, error) { var messages []types.Message result, err := actor.SqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{ QueueUrl: aws.String(queueUrl), MaxNumberOfMessages: maxMessages, WaitTimeSeconds: waitTime, }) if err != nil { log.Printf("Couldn't get messages from queue %v. Here's why: %v\n", queueUrl, err) } else { messages = result.Messages } return messages, err } // DeleteMessages uses the DeleteMessageBatch action to delete a batch of messages from // an Amazon SQS queue. func (actor SqsActions) DeleteMessages(ctx context.Context, queueUrl string, messages []types.Message) error { entries := make([]types.DeleteMessageBatchRequestEntry, len(messages)) for msgIndex := range messages { entries[msgIndex].Id = aws.String(fmt.Sprintf("%v", msgIndex)) entries[msgIndex].ReceiptHandle = messages[msgIndex].ReceiptHandle } _, err := actor.SqsClient.DeleteMessageBatch(ctx, &sqs.DeleteMessageBatchInput{ Entries: entries, QueueUrl: aws.String(queueUrl), }) if err != nil { log.Printf("Couldn't delete messages from queue %v. Here's why: %v\n", queueUrl, err) } return err } // DeleteQueue deletes an Amazon SQS queue. func (actor SqsActions) DeleteQueue(ctx context.Context, queueUrl string) error { _, err := actor.SqsClient.DeleteQueue(ctx, &sqs.DeleteQueueInput{ QueueUrl: aws.String(queueUrl)}) if err != nil { log.Printf("Couldn't delete queue %v. Here's why: %v\n", queueUrl, err) } return err }

Pulisci le risorse.

import ( "context" "fmt" "log" "topics_and_queues/actions" ) // Resources keeps track of AWS resources created during an example and handles // cleanup when the example finishes. type Resources struct { topicArn string queueUrls []string snsActor *actions.SnsActions sqsActor *actions.SqsActions } // Cleanup deletes all AWS resources created during an example. func (resources Resources) Cleanup(ctx context.Context) { defer func() { if r := recover(); r != nil { fmt.Println("Something went wrong during cleanup. Use the AWS Management Console\n" + "to remove any remaining resources that were created for this scenario.") } }() var err error if resources.topicArn != "" { log.Printf("Deleting topic %v.\n", resources.topicArn) err = resources.snsActor.DeleteTopic(ctx, resources.topicArn) if err != nil { panic(err) } } for _, queueUrl := range resources.queueUrls { log.Printf("Deleting queue %v.\n", queueUrl) err = resources.sqsActor.DeleteQueue(ctx, queueUrl) if err != nil { panic(err) } } }

Esempi serverless

Il seguente esempio di codice mostra come implementare una funzione Lambda che riceve un evento attivato dalla ricezione di messaggi da un argomento. SNS La funzione recupera i messaggi dal parametro dell'evento e registra il contenuto di ogni messaggio.

SDKper Go V2
Nota

C'è altro da fare. GitHub Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.

Consumare un SNS evento con Lambda utilizzando Go.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, snsEvent events.SNSEvent) { for _, record := range snsEvent.Records { processMessage(record) } fmt.Println("done") } func processMessage(record events.SNSEventRecord) { message := record.SNS.Message fmt.Printf("Processed message: %s\n", message) // TODO: Process your record here } func main() { lambda.Start(handler) }