Amazon Simple Notification Service Construct Library
Add an SNS Topic to your stack:
topic = sns.Topic(self, "Topic",
display_name="Customer subscription topic"
)
Add a FIFO SNS topic with content-based de-duplication to your stack:
topic = sns.Topic(self, "Topic",
content_based_deduplication=True,
display_name="Customer subscription topic",
fifo=True
)
Add an SNS Topic to your stack with a specified signature version. The signature version determines the hashing algorithm used to sign the notifications, subscription confirmations, and unsubscribe confirmation messages sent by Amazon SNS.
The default signature version is 1 (SHA1). SNS also supports signature version 2 (SHA256).
topic = sns.Topic(self, "Topic",
signature_version="2"
)
Note that FIFO topics require a topic name to be provided. The required .fifo suffix will be automatically generated and added to the topic name if it is not explicitly provided.
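The suffix rule above can be illustrated with a few lines of plain Python. This is only a sketch of the described behaviour: the helper name `with_fifo_suffix` is hypothetical and is not part of the CDK.

```python
def with_fifo_suffix(topic_name: str) -> str:
    """Return topic_name ending in exactly one '.fifo' suffix."""
    suffix = ".fifo"
    # Names that already carry the suffix are left untouched.
    if topic_name.endswith(suffix):
        return topic_name
    return topic_name + suffix


print(with_fifo_suffix("customer-topic"))
print(with_fifo_suffix("customer-topic.fifo"))
```

Both calls yield the same name, which is why the suffix can be supplied or omitted.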
Subscriptions
Various subscriptions can be added to the topic by calling the .addSubscription(...) method on the topic. It accepts a subscription object, default implementations of which can be found in the aws-cdk-lib/aws-sns-subscriptions package:
Add an HTTPS Subscription to your topic:
my_topic = sns.Topic(self, "MyTopic")
my_topic.add_subscription(subscriptions.UrlSubscription("https://foobar.com/"))
Subscribe a queue to the topic:
# queue: sqs.Queue
my_topic = sns.Topic(self, "MyTopic")
my_topic.add_subscription(subscriptions.SqsSubscription(queue))
Note that subscriptions of queues in different accounts need to be manually confirmed by reading the initial message from the queue and visiting the link found in it.
The grantSubscribe method adds a policy statement to the topic’s resource policy, allowing the specified principal to perform the sns:Subscribe action.
It’s useful when you want to allow entities, such as another AWS account or resources created later, to subscribe to the topic at their own pace, separating permission granting from the actual subscription process.
# account_principal: iam.AccountPrincipal
my_topic = sns.Topic(self, "MyTopic")
my_topic.grant_subscribe(account_principal)
Filter policy
A filter policy can be specified when subscribing an endpoint to a topic.
Example with a Lambda subscription:
import aws_cdk.aws_lambda as lambda_
# fn: lambda_.Function
my_topic = sns.Topic(self, "MyTopic")
# Lambda should receive only messages matching the following conditions on attributes:
# color: 'red' or 'orange', or begins with 'bl', or ends with 'ue'
# size: anything but 'small' or 'medium'
# price: between 100 and 200 or greater than 300
# store: attribute must be present
my_topic.add_subscription(subscriptions.LambdaSubscription(fn,
filter_policy={
"color": sns.SubscriptionFilter.string_filter(
allowlist=["red", "orange"],
match_prefixes=["bl"],
match_suffixes=["ue"]
),
"size": sns.SubscriptionFilter.string_filter(
denylist=["small", "medium"]
),
"price": sns.SubscriptionFilter.numeric_filter(
between=sns.BetweenCondition(start=100, stop=200),
greater_than=300
),
"store": sns.SubscriptionFilter.exists_filter()
}
))
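To make the matching rules concrete, here is a minimal plain-Python sketch that evaluates message attributes against the JSON form such a filter policy is assumed to take. It is not the SNS implementation: the names `FILTER_POLICY`, `_condition_matches`, and `matches` are hypothetical, and only the operators used in the example are handled. It shows the two rules that matter most: every key in the policy must match, and within one key any single condition is enough.

```python
from typing import Any, Dict, List

# Assumed JSON form of the filter policy built in the example above.
FILTER_POLICY: Dict[str, List[Any]] = {
    "color": ["red", "orange", {"prefix": "bl"}, {"suffix": "ue"}],
    "size": [{"anything-but": ["small", "medium"]}],
    "price": [{"numeric": [">=", 100, "<=", 200]}, {"numeric": [">", 300]}],
    "store": [{"exists": True}],
}

_OPS = {
    ">": lambda a, b: a > b,
    ">=": lambda a, b: a >= b,
    "<": lambda a, b: a < b,
    "<=": lambda a, b: a <= b,
    "=": lambda a, b: a == b,
}


def _condition_matches(condition: Any, value: Any) -> bool:
    """Check a single condition against a single attribute value."""
    if not isinstance(condition, dict):
        return value == condition  # plain entries are exact matches
    if "prefix" in condition:
        return isinstance(value, str) and value.startswith(condition["prefix"])
    if "suffix" in condition:
        return isinstance(value, str) and value.endswith(condition["suffix"])
    if "anything-but" in condition:
        return value not in condition["anything-but"]
    if "numeric" in condition:
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return False
        spec = condition["numeric"]
        # spec is a flat list of operator/operand pairs; all pairs must hold.
        pairs = zip(spec[0::2], spec[1::2])
        return all(_OPS[op](value, operand) for op, operand in pairs)
    return False


def matches(policy: Dict[str, List[Any]], attributes: Dict[str, Any]) -> bool:
    """Return True if the attributes satisfy every key of the policy."""
    for key, conditions in policy.items():
        present = key in attributes
        key_ok = False
        for condition in conditions:
            if isinstance(condition, dict) and "exists" in condition:
                key_ok = key_ok or condition["exists"] == present
            elif present and _condition_matches(condition, attributes[key]):
                key_ok = True
        if not key_ok:
            return False  # keys are combined with AND
    return True


message = {"color": "blue", "size": "large", "price": 150, "store": "north"}
print(matches(FILTER_POLICY, message))
```

In this sketch a message priced at 250 is rejected because it falls between the two numeric ranges, and a message without a store attribute is rejected regardless of its other attributes.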
Payload-based filtering
To filter messages based on the payload or body of the message, use the filterPolicyWithMessageBody property. This type of filter policy supports creating filters on nested objects.
Example with a Lambda subscription:
import aws_cdk.aws_lambda as lambda_
# fn: lambda_.Function
my_topic = sns.Topic(self, "MyTopic")
# Lambda should receive only messages matching the following condition on the message body:
# background.color: 'red' or 'orange'
my_topic.add_subscription(subscriptions.LambdaSubscription(fn,
filter_policy_with_message_body={
"background": sns.FilterOrPolicy.policy({
"color": sns.FilterOrPolicy.filter(sns.SubscriptionFilter.string_filter(
allowlist=["red", "orange"]
))
})
}
))
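The nesting can be sketched in plain Python as a recursive walk over the message body. This is an illustration under assumptions, not the SNS matcher: `BODY_POLICY` is the assumed JSON form of the policy above, `body_matches` is a hypothetical helper, and only exact-match allowlists are supported.

```python
import json
from typing import Any, Dict

# Assumed JSON form of the payload-based policy from the example above.
BODY_POLICY: Dict[str, Any] = {"background": {"color": ["red", "orange"]}}


def body_matches(policy: Dict[str, Any], body: Dict[str, Any]) -> bool:
    """Recursively check a parsed JSON body against a nested policy."""
    for key, rule in policy.items():
        if key not in body:
            return False
        if isinstance(rule, dict):
            # A nested policy applies to the nested object under the same key.
            if not isinstance(body[key], dict) or not body_matches(rule, body[key]):
                return False
        elif body[key] not in rule:
            # A leaf rule is an allowlist of accepted values.
            return False
    return True


message = json.loads('{"background": {"color": "red"}, "text": "hello"}')
print(body_matches(BODY_POLICY, message))
```

Keys that the policy does not mention, such as `text` here, do not affect the result.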
Example of Firehose Subscription
from aws_cdk.aws_kinesisfirehose_alpha import DeliveryStream
# stream: DeliveryStream
topic = sns.Topic(self, "Topic")
sns.Subscription(self, "Subscription",
topic=topic,
endpoint=stream.delivery_stream_arn,
protocol=sns.SubscriptionProtocol.FIREHOSE,
subscription_role_arn="SAMPLE_ARN"
)
DLQ setup for SNS Subscription
CDK can attach a provided Queue as the DLQ for your SNS subscription. See the SNS DLQ configuration docs for more information about this feature.
Example of usage with a user-provided DLQ:
topic = sns.Topic(self, "Topic")
dl_queue = sqs.Queue(self, "DeadLetterQueue",
queue_name="MySubscription_DLQ",
retention_period=Duration.days(14)
)
sns.Subscription(self, "Subscription",
endpoint="endpoint",
protocol=sns.SubscriptionProtocol.LAMBDA,
topic=topic,
dead_letter_queue=dl_queue
)
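On the SNS side, a dead-letter queue is attached to a subscription through a redrive policy that points at the queue ARN. The plain-Python sketch below shows the assumed shape of that policy; the helper name `redrive_policy` and the sample ARN are hypothetical, and the exact template the CDK synthesizes may differ.

```python
import json


def redrive_policy(dead_letter_queue_arn: str) -> str:
    """Serialize a redrive policy that targets the given SQS queue ARN."""
    # deadLetterTargetArn is the key SNS uses for the DLQ target.
    return json.dumps({"deadLetterTargetArn": dead_letter_queue_arn})


# Placeholder account ID and queue name, for illustration only.
print(redrive_policy("arn:aws:sqs:us-east-1:111122223333:MySubscription_DLQ"))
```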
CloudWatch Event Rule Target
SNS topics can be used as targets for CloudWatch event rules.
Use the aws-cdk-lib/aws-events-targets.SnsTopic target:
import aws_cdk.aws_codecommit as codecommit
import aws_cdk.aws_events_targets as targets
# repo: codecommit.Repository
my_topic = sns.Topic(self, "Topic")
repo.on_commit("OnCommit",
target=targets.SnsTopic(my_topic)
)
This will result in adding a target to the event rule and will also modify the topic resource policy to allow CloudWatch events to publish to the topic.
Topic Policy
A topic policy is automatically created when addToResourcePolicy is called, if one doesn’t already exist. Using addToResourcePolicy is the simplest way to add policies, but a TopicPolicy can also be created manually.
topic = sns.Topic(self, "Topic")
topic_policy = sns.TopicPolicy(self, "TopicPolicy",
topics=[topic]
)
topic_policy.document.add_statements(iam.PolicyStatement(
actions=["sns:Subscribe"],
principals=[iam.AnyPrincipal()],
resources=[topic.topic_arn]
))
A policy document can also be passed on TopicPolicy construction:
topic = sns.Topic(self, "Topic")
policy_document = iam.PolicyDocument(
assign_sids=True,
statements=[
iam.PolicyStatement(
actions=["sns:Subscribe"],
principals=[iam.AnyPrincipal()],
resources=[topic.topic_arn]
)
]
)
topic_policy = sns.TopicPolicy(self, "Policy",
topics=[topic],
policy_document=policy_document
)
Enforce encryption of data in transit when publishing to a topic
You can enforce SSL when creating a topic policy by setting the enforceSSL flag:
topic = sns.Topic(self, "Topic")
policy_document = iam.PolicyDocument(
assign_sids=True,
statements=[
iam.PolicyStatement(
actions=["sns:Publish"],
principals=[iam.ServicePrincipal("s3.amazonaws.com")],
resources=[topic.topic_arn]
)
]
)
topic_policy = sns.TopicPolicy(self, "Policy",
topics=[topic],
policy_document=policy_document,
enforce_ssl=True
)
Similarly, you can enforce SSL by setting the enforceSSL flag on the topic:
topic = sns.Topic(self, "TopicAddPolicy",
enforce_ssl=True
)
topic.add_to_resource_policy(iam.PolicyStatement(
principals=[iam.ServicePrincipal("s3.amazonaws.com")],
actions=["sns:Publish"],
resources=[topic.topic_arn]
))
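Enforcing encryption in transit usually comes down to a deny statement keyed on the aws:SecureTransport condition. The plain-Python sketch below builds a statement of that general shape. It is written by hand for illustration: the helper name `ssl_only_statement` and the sample ARN are hypothetical, and the statement the CDK actually generates may differ in detail.

```python
import json


def ssl_only_statement(topic_arn: str) -> dict:
    """Build a policy statement that denies publishing over plain HTTP."""
    return {
        "Effect": "Deny",
        "Principal": "*",
        "Action": "sns:Publish",
        "Resource": topic_arn,
        # Requests that do not use TLS carry aws:SecureTransport = false.
        "Condition": {"Bool": {"aws:SecureTransport": "false"}},
    }


# Placeholder account ID and topic name, for illustration only.
statement = ssl_only_statement("arn:aws:sns:us-east-1:111122223333:MyTopic")
print(json.dumps(statement, indent=2))
```

Because an explicit deny overrides any allow, a statement like this blocks unencrypted publishes even from principals that are otherwise permitted.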
Delivery status logging
Amazon SNS provides support to log the delivery status of notification messages sent to topics with the following Amazon SNS endpoints:
HTTP
Amazon Kinesis Data Firehose
AWS Lambda
Platform application endpoint
Amazon Simple Queue Service
Example with a delivery status logging configuration for SQS:
# role: iam.Role
topic = sns.Topic(self, "MyTopic",
logging_configs=[sns.LoggingConfig(
protocol=sns.LoggingProtocol.SQS,
failure_feedback_role=role,
success_feedback_role=role,
success_feedback_sample_rate=50
)
]
)
A delivery status logging configuration can also be added to your topic with the addLoggingConfig method:
# role: iam.Role
topic = sns.Topic(self, "MyTopic")
topic.add_logging_config(
protocol=sns.LoggingProtocol.SQS,
failure_feedback_role=role,
success_feedback_role=role,
success_feedback_sample_rate=50
)
Note that valid values for successFeedbackSampleRate are integers between 0 and 100.
Archive Policy
Message archiving provides the ability to archive a single copy of all messages published to your topic. You can store published messages within your topic by enabling the message archive policy on the topic, which enables message archiving for all subscriptions linked to that topic. Messages can be archived for a minimum of one day to a maximum of 365 days.
Example with an archive policy:
topic = sns.Topic(self, "MyTopic",
fifo=True,
message_retention_period_in_days=7
)
Note: The messageRetentionPeriodInDays property is only available for FIFO topics.
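The retention limits can be checked before synthesis with a small helper. This is a sketch only: `archive_policy` is a hypothetical function, and the `MessageRetentionPeriod` key reflects the assumed shape of the SNS archive policy document rather than output taken from the CDK.

```python
def archive_policy(retention_days: int) -> dict:
    """Build an archive policy document after validating the retention period."""
    # Messages can be archived for 1 to 365 days, as stated above.
    if not 1 <= retention_days <= 365:
        raise ValueError("retention period must be between 1 and 365 days")
    return {"MessageRetentionPeriod": retention_days}


print(archive_policy(7))
```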
TracingConfig
Tracing mode of an Amazon SNS topic.
If set to PassThrough, the topic passes trace headers received from the Amazon SNS publisher to its subscriptions. If set to Active, Amazon SNS will vend X-Ray segment data to the topic owner account if the sampled flag in the tracing header is true.
The default TracingConfig is TracingConfig.PASS_THROUGH.
Example with a tracingConfig set to Active:
topic = sns.Topic(self, "MyTopic",
tracing_config=sns.TracingConfig.ACTIVE
)