Amazon DynamoDB Construct Library

The DynamoDB construct library has two table constructs - Table and TableV2. TableV2 is the preferred construct for all use cases, including creating a single table or a table with multiple replicas.

For the full set of available properties and methods, refer to the Table and TableV2 API documentation.

Here is a minimal deployable DynamoDB table using TableV2:

table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING)
)

By default, TableV2 will create a single table in the main deployment region, referred to as the primary table. The properties of the primary table are configurable via TableV2 properties. For example, consider the following DynamoDB table created using the TableV2 construct, defined in a Stack that is deployed to us-west-2:

table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    contributor_insights=True,
    table_class=dynamodb.TableClass.STANDARD_INFREQUENT_ACCESS,
    point_in_time_recovery=True
)

The above TableV2 definition will result in the provisioning of a single table in us-west-2 with properties that match the properties set on the TableV2 instance.

Further reading: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/GlobalTables.html

Replicas

The TableV2 construct can be configured with replica tables, which enables you to work with your table as a global table. To do this, the TableV2 construct must be defined in a Stack with a defined region. Do not list the main deployment region as a replica; the primary table is created there by default by the TableV2 construct. The following is a minimal example of defining TableV2 with replicas. This TableV2 definition will provision three copies of the table - one in us-west-2 (the primary deployment region), one in us-east-1, and one in us-east-2.

import aws_cdk as cdk


app = cdk.App()
stack = cdk.Stack(app, "Stack", env=cdk.Environment(region="us-west-2"))

global_table = dynamodb.TableV2(stack, "GlobalTable",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    replicas=[dynamodb.ReplicaTableProps(region="us-east-1"), dynamodb.ReplicaTableProps(region="us-east-2")
    ]
)

Alternatively, you can add new replicas to an instance of the TableV2 construct using the addReplica method:

import aws_cdk as cdk


app = cdk.App()
stack = cdk.Stack(app, "Stack", env=cdk.Environment(region="us-west-2"))

global_table = dynamodb.TableV2(stack, "GlobalTable",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    replicas=[dynamodb.ReplicaTableProps(region="us-east-1")]
)

global_table.add_replica(region="us-east-2", deletion_protection=True)

The following properties are configurable on a per-replica basis, but will be inherited from the TableV2 properties if not specified:

  • contributorInsights

  • deletionProtection

  • pointInTimeRecovery

  • tableClass

  • readCapacity (only configurable if the TableV2 billing mode is PROVISIONED)

  • globalSecondaryIndexes (only contributorInsights and readCapacity)

The following example shows how to define properties on a per-replica basis:

import aws_cdk as cdk


app = cdk.App()
stack = cdk.Stack(app, "Stack", env=cdk.Environment(region="us-west-2"))

global_table = dynamodb.TableV2(stack, "GlobalTable",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    contributor_insights=True,
    point_in_time_recovery=True,
    replicas=[dynamodb.ReplicaTableProps(
        region="us-east-1",
        table_class=dynamodb.TableClass.STANDARD_INFREQUENT_ACCESS,
        point_in_time_recovery=False
    ), dynamodb.ReplicaTableProps(
        region="us-east-2",
        contributor_insights=False
    )
    ]
)

To obtain an ITableV2 reference to a specific replica table, call the replica method on an instance of the TableV2 construct and pass the replica region as an argument:

import aws_cdk as cdk

# user: iam.User


class FooStack(cdk.Stack):
    def __init__(self, scope, id, **kwargs):
        super().__init__(scope, id, **kwargs)

        self.global_table = dynamodb.TableV2(self, "GlobalTable",
            partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
            replicas=[dynamodb.ReplicaTableProps(region="us-east-1"), dynamodb.ReplicaTableProps(region="us-east-2")
            ]
        )

class BarStack(cdk.Stack):
    def __init__(self, scope, id, *, replica_table, **kwargs):
        super().__init__(scope, id, **kwargs)

        # user is granted write permissions to the replica in us-east-1
        replica_table.grant_write_data(user)

app = cdk.App()

foo_stack = FooStack(app, "FooStack", env=cdk.Environment(region="us-west-2"))
bar_stack = BarStack(app, "BarStack",
    replica_table=foo_stack.global_table.replica("us-east-1"),
    env=cdk.Environment(region="us-east-1")
)

Note: You can create an instance of the TableV2 construct with as many replicas as needed as long as there is only one replica per region. After table creation you can add or remove replicas, but you can only add or remove a single replica in each update.

Billing

The TableV2 construct can be configured with on-demand or provisioned billing:

  • On-demand - The default option. This is a flexible billing option capable of serving requests without capacity planning. The billing mode will be PAY_PER_REQUEST.

    • You can optionally specify the maxReadRequestUnits or maxWriteRequestUnits on individual tables and associated global secondary indexes (GSIs). When you configure maximum throughput for an on-demand table, throughput requests that exceed the maximum amount specified will be throttled.

  • Provisioned - Specify the readCapacity and writeCapacity that you need for your application. The billing mode will be PROVISIONED. Capacity can be configured using one of the following modes:

    • Fixed - provisioned throughput capacity is configured with a fixed number of I/O operations per second.

    • Autoscaled - provisioned throughput capacity is dynamically adjusted on your behalf in response to actual traffic patterns.

Note: writeCapacity can only be configured using autoscaled capacity.

The following example shows how to configure TableV2 with on-demand billing:

table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    billing=dynamodb.Billing.on_demand()
)

The following example shows how to configure TableV2 with on-demand billing with optional maximum throughput configured:

table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    billing=dynamodb.Billing.on_demand(
        max_read_request_units=100,
        max_write_request_units=115
    )
)
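
The optional maximum throughput settings can also be applied to individual global secondary indexes. The following is a minimal sketch assuming that GlobalSecondaryIndexPropsV2 exposes maxReadRequestUnits and maxWriteRequestUnits (check the API reference for your library version):

table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    billing=dynamodb.Billing.on_demand(
        max_read_request_units=100,
        max_write_request_units=115
    ),
    global_secondary_indexes=[dynamodb.GlobalSecondaryIndexPropsV2(
        index_name="gsi",
        partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
        # per-index maximum throughput (assumed properties)
        max_read_request_units=100,
        max_write_request_units=50
    )
    ]
)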

When using provisioned billing, you must also specify readCapacity and writeCapacity. You can choose to configure readCapacity with fixed capacity or autoscaled capacity, but writeCapacity can only be configured with autoscaled capacity. The following example shows how to configure TableV2 with provisioned billing:

table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    billing=dynamodb.Billing.provisioned(
        read_capacity=dynamodb.Capacity.fixed(10),
        write_capacity=dynamodb.Capacity.autoscaled(max_capacity=15)
    )
)

When using provisioned billing, you can configure the readCapacity on a per-replica basis:

import aws_cdk as cdk


app = cdk.App()
stack = cdk.Stack(app, "Stack", env=cdk.Environment(region="us-west-2"))

global_table = dynamodb.TableV2(stack, "GlobalTable",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    billing=dynamodb.Billing.provisioned(
        read_capacity=dynamodb.Capacity.fixed(10),
        write_capacity=dynamodb.Capacity.autoscaled(max_capacity=15)
    ),
    replicas=[dynamodb.ReplicaTableProps(
        region="us-east-1"
    ), dynamodb.ReplicaTableProps(
        region="us-east-2",
        read_capacity=dynamodb.Capacity.autoscaled(max_capacity=20, target_utilization_percent=50)
    )
    ]
)

When changing the billing for a table from provisioned to on-demand or from on-demand to provisioned, seedCapacity must be configured for each autoscaled resource:

global_table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    billing=dynamodb.Billing.provisioned(
        read_capacity=dynamodb.Capacity.fixed(10),
        write_capacity=dynamodb.Capacity.autoscaled(max_capacity=10, seed_capacity=20)
    )
)

Further reading: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html

Warm Throughput

Warm throughput refers to the number of read and write operations your DynamoDB table can instantaneously support.

This optional configuration allows you to pre-warm your table or index to handle anticipated throughput, ensuring optimal performance under expected load.

The Warm Throughput configuration settings are automatically replicated across all Global Table replicas.

table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="id", type=dynamodb.AttributeType.STRING),
    warm_throughput=dynamodb.WarmThroughput(
        read_units_per_second=15000,
        write_units_per_second=20000
    )
)
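
Warm throughput can also be configured for a global secondary index. The following is a minimal sketch assuming that GlobalSecondaryIndexPropsV2 exposes a warmThroughput property (check the API reference for your library version):

table_with_gsi = dynamodb.TableV2(self, "TableWithWarmGsi",
    partition_key=dynamodb.Attribute(name="id", type=dynamodb.AttributeType.STRING),
    global_secondary_indexes=[dynamodb.GlobalSecondaryIndexPropsV2(
        index_name="gsi",
        partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
        # pre-warm only this index (assumed property)
        warm_throughput=dynamodb.WarmThroughput(
            read_units_per_second=15000,
            write_units_per_second=20000
        )
    )
    ]
)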

Further reading: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/warm-throughput.html

Encryption

All user data stored in a DynamoDB table is fully encrypted at rest. When creating an instance of the TableV2 construct, you can select the following table encryption options:

  • AWS owned keys - Default encryption type. The keys are owned by DynamoDB (no additional charge).

  • AWS managed keys - The keys are stored in your account and are managed by AWS KMS (AWS KMS charges apply).

  • Customer managed keys - The keys are stored in your account and are created, owned, and managed by you. You have full control over the KMS keys (AWS KMS charges apply).

The following is an example of how to configure TableV2 with encryption using an AWS owned key:

table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    encryption=dynamodb.TableEncryptionV2.dynamo_owned_key()
)

The following is an example of how to configure TableV2 with encryption using an AWS managed key:

table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    encryption=dynamodb.TableEncryptionV2.aws_managed_key()
)

When configuring TableV2 with encryption using customer managed keys, you must specify the KMS key for the primary table as the tableKey. A map of replicaKeyArns must be provided containing each replica region and the associated KMS key ARN:

import aws_cdk as cdk
import aws_cdk.aws_kms as kms


app = cdk.App()
stack = cdk.Stack(app, "Stack", env=cdk.Environment(region="us-west-2"))

table_key = kms.Key(stack, "Key")
replica_key_arns = {
    "us-east-1": "arn:aws:kms:us-east-1:123456789012:key/g24efbna-az9b-42ro-m3bp-cq249l94fca6",
    "us-east-2": "arn:aws:kms:us-east-2:123456789012:key/h90bkasj-bs1j-92wp-s2ka-bh857d60bkj8"
}

global_table = dynamodb.TableV2(stack, "GlobalTable",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    encryption=dynamodb.TableEncryptionV2.customer_managed_key(table_key, replica_key_arns),
    replicas=[dynamodb.ReplicaTableProps(region="us-east-1"), dynamodb.ReplicaTableProps(region="us-east-2")
    ]
)

Note: When encryption is configured with customer managed keys, you must have a key already created in each replica region.

Further reading: https://docs.aws.amazon.com/kms/latest/developerguide/concepts.html#key-mgmt

Secondary Indexes

Secondary indexes allow efficient access to data with attributes other than the primaryKey. DynamoDB supports two types of secondary indexes:

  • Global secondary index - An index with a partitionKey and a sortKey that can be different from those on the base table. A globalSecondaryIndex is considered “global” because queries on the index can span all of the data in the base table, across all partitions. A globalSecondaryIndex is stored in its own partition space away from the base table and scales separately from the base table.

  • Local secondary index - An index that has the same partitionKey as the base table, but a different sortKey. A localSecondaryIndex is “local” in the sense that every partition of a localSecondaryIndex is scoped to a base table partition that has the same partitionKey value.

Further reading: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/SecondaryIndexes.html

Global Secondary Indexes

TableV2 can be configured with globalSecondaryIndexes by providing them as a TableV2 property:

table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    global_secondary_indexes=[dynamodb.GlobalSecondaryIndexPropsV2(
        index_name="gsi",
        partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING)
    )
    ]
)

Alternatively, you can add a globalSecondaryIndex using the addGlobalSecondaryIndex method:

table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    global_secondary_indexes=[dynamodb.GlobalSecondaryIndexPropsV2(
        index_name="gsi1",
        partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING)
    )
    ]
)

table.add_global_secondary_index(
    index_name="gsi2",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING)
)

You can configure readCapacity and writeCapacity on a globalSecondaryIndex when TableV2 is configured with provisioned billing. If TableV2 is configured with provisioned billing but readCapacity or writeCapacity are not configured on a globalSecondaryIndex, then they will be inherited from the capacity settings specified with the billing configuration:

table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    billing=dynamodb.Billing.provisioned(
        read_capacity=dynamodb.Capacity.fixed(10),
        write_capacity=dynamodb.Capacity.autoscaled(max_capacity=10)
    ),
    global_secondary_indexes=[dynamodb.GlobalSecondaryIndexPropsV2(
        index_name="gsi1",
        partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
        read_capacity=dynamodb.Capacity.fixed(15)
    ), dynamodb.GlobalSecondaryIndexPropsV2(
        index_name="gsi2",
        partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
        write_capacity=dynamodb.Capacity.autoscaled(min_capacity=5, max_capacity=20)
    )
    ]
)

All globalSecondaryIndexes for replica tables are inherited from the primary table. You can configure contributorInsights and readCapacity for each globalSecondaryIndex on a per-replica basis:

import aws_cdk as cdk


app = cdk.App()
stack = cdk.Stack(app, "Stack", env=cdk.Environment(region="us-west-2"))

global_table = dynamodb.TableV2(stack, "GlobalTable",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    contributor_insights=True,
    billing=dynamodb.Billing.provisioned(
        read_capacity=dynamodb.Capacity.fixed(10),
        write_capacity=dynamodb.Capacity.autoscaled(max_capacity=10)
    ),
    # each global secondary index will inherit contributor insights as true
    global_secondary_indexes=[dynamodb.GlobalSecondaryIndexPropsV2(
        index_name="gsi1",
        partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
        read_capacity=dynamodb.Capacity.fixed(15)
    ), dynamodb.GlobalSecondaryIndexPropsV2(
        index_name="gsi2",
        partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
        write_capacity=dynamodb.Capacity.autoscaled(min_capacity=5, max_capacity=20)
    )
    ],
    replicas=[dynamodb.ReplicaTableProps(
        region="us-east-1",
        global_secondary_index_options={
            "gsi1": dynamodb.ReplicaGlobalSecondaryIndexOptions(
                read_capacity=dynamodb.Capacity.autoscaled(min_capacity=1, max_capacity=10)
            )
        }
    ), dynamodb.ReplicaTableProps(
        region="us-east-2",
        global_secondary_index_options={
            "gsi2": dynamodb.ReplicaGlobalSecondaryIndexOptions(
                contributor_insights=False
            )
        }
    )
    ]
)

Local Secondary Indexes

TableV2 can only be configured with localSecondaryIndexes when a sortKey is defined as a TableV2 property.

You can provide localSecondaryIndexes as a TableV2 property:

table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    sort_key=dynamodb.Attribute(name="sk", type=dynamodb.AttributeType.NUMBER),
    local_secondary_indexes=[dynamodb.LocalSecondaryIndexProps(
        index_name="lsi",
        sort_key=dynamodb.Attribute(name="sk", type=dynamodb.AttributeType.NUMBER)
    )
    ]
)

Alternatively, you can add a localSecondaryIndex using the addLocalSecondaryIndex method:

table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    sort_key=dynamodb.Attribute(name="sk", type=dynamodb.AttributeType.NUMBER),
    local_secondary_indexes=[dynamodb.LocalSecondaryIndexProps(
        index_name="lsi1",
        sort_key=dynamodb.Attribute(name="sk", type=dynamodb.AttributeType.NUMBER)
    )
    ]
)

table.add_local_secondary_index(
    index_name="lsi2",
    sort_key=dynamodb.Attribute(name="sk", type=dynamodb.AttributeType.NUMBER)
)

Streams

Each DynamoDB table produces an independent stream based on all its writes, regardless of the origination point for those writes. DynamoDB supports two stream types:

  • DynamoDB streams - Capture item-level changes in your table, and push the changes to a DynamoDB stream. You then can access the change information through the DynamoDB Streams API.

  • Kinesis streams - Amazon Kinesis Data Streams for DynamoDB captures item-level changes in your table, and replicates the changes to a Kinesis data stream. You then can consume and manage the change information from Kinesis.

DynamoDB Streams

A dynamoStream can be configured as a TableV2 property. If the TableV2 instance has replica tables, then all replica tables will inherit the dynamoStream setting from the primary table. If replicas are configured, but dynamoStream is not configured, then the primary table and all replicas will be automatically configured with the NEW_AND_OLD_IMAGES stream view type.

import aws_cdk as cdk


app = cdk.App()
stack = cdk.Stack(app, "Stack", env=cdk.Environment(region="us-west-2"))

global_table = dynamodb.TableV2(stack, "GlobalTable",
    partition_key=dynamodb.Attribute(name="id", type=dynamodb.AttributeType.STRING),
    dynamo_stream=dynamodb.StreamViewType.OLD_IMAGE,
    # tables in us-west-2, us-east-1, and us-east-2 all have a dynamo stream type of OLD_IMAGE
    replicas=[dynamodb.ReplicaTableProps(region="us-east-1"), dynamodb.ReplicaTableProps(region="us-east-2")
    ]
)

Further reading: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html

Kinesis Streams

A kinesisStream can be configured as a TableV2 property. Replica tables will not inherit the kinesisStream configured for the primary table and it should be added on a per-replica basis.

import aws_cdk as cdk
import aws_cdk.aws_kinesis as kinesis


app = cdk.App()
stack = cdk.Stack(app, "Stack", env=cdk.Environment(region="us-west-2"))

stream1 = kinesis.Stream(stack, "Stream1")
stream2 = kinesis.Stream.from_stream_arn(stack, "Stream2", "arn:aws:kinesis:us-east-2:123456789012:stream/my-stream")

global_table = dynamodb.TableV2(stack, "GlobalTable",
    partition_key=dynamodb.Attribute(name="id", type=dynamodb.AttributeType.STRING),
    kinesis_stream=stream1,  # for table in us-west-2
    replicas=[dynamodb.ReplicaTableProps(region="us-east-1"), dynamodb.ReplicaTableProps(
        region="us-east-2",
        kinesis_stream=stream2
    )
    ]
)

Further reading: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/kds.html

Keys

When an instance of the TableV2 construct is defined, you must define its schema using the partitionKey (required) and sortKey (optional) properties.

table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    sort_key=dynamodb.Attribute(name="sk", type=dynamodb.AttributeType.NUMBER)
)

Contributor Insights

Enabling contributorInsights for TableV2 will provide information about the most accessed and throttled items in a table or globalSecondaryIndex. DynamoDB delivers this information to you via CloudWatch Contributor Insights rules, reports, and graphs of report data.

table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    contributor_insights=True
)
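
With TableV2, contributor insights can also be enabled for a specific global secondary index. The following is a minimal sketch assuming that GlobalSecondaryIndexPropsV2 accepts a contributorInsights option (check the API reference for your library version):

table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    global_secondary_indexes=[dynamodb.GlobalSecondaryIndexPropsV2(
        index_name="gsi",
        partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
        # contributor insights for this index only (assumed property)
        contributor_insights=True
    )
    ]
)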

When you use Table, you can enable contributor insights for a table or specific global secondary index by setting contributorInsightsEnabled to true.

table = dynamodb.Table(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    contributor_insights_enabled=True
)

table.add_global_secondary_index(
    contributor_insights_enabled=True,  # for a specific global secondary index
    index_name="gsi",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING)
)

Further reading: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/contributorinsights_HowItWorks.html

Deletion Protection

deletionProtection determines if your DynamoDB table is protected from deletion and is configurable as a TableV2 property. When enabled, the table cannot be deleted by any user or process.

table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    deletion_protection=True
)

You can also specify the removalPolicy as a property of the TableV2 construct. This property allows you to control what happens to tables provisioned using TableV2 during stack deletion. By default, the removalPolicy is RETAIN which will cause all tables provisioned using TableV2 to be retained in the account, but orphaned from the stack they were created in. You can also set the removalPolicy to DESTROY which will delete all tables created using TableV2 during stack deletion:

import aws_cdk as cdk


app = cdk.App()
stack = cdk.Stack(app, "Stack", env=cdk.Environment(region="us-west-2"))

global_table = dynamodb.TableV2(stack, "GlobalTable",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    # applies to the primary table and all replicas, i.e., us-west-2, us-east-1, us-east-2
    removal_policy=cdk.RemovalPolicy.DESTROY,
    replicas=[dynamodb.ReplicaTableProps(region="us-east-1"), dynamodb.ReplicaTableProps(region="us-east-2")
    ]
)

deletionProtection is configurable on a per-replica basis. If the removalPolicy is set to DESTROY, but some replicas have deletionProtection enabled, then only the replicas without deletionProtection will be deleted during stack deletion:

import aws_cdk as cdk


app = cdk.App()
stack = cdk.Stack(app, "Stack", env=cdk.Environment(region="us-west-2"))

global_table = dynamodb.TableV2(stack, "GlobalTable",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    removal_policy=cdk.RemovalPolicy.DESTROY,
    deletion_protection=True,
    # only the replica in us-east-1 will be deleted during stack deletion
    replicas=[dynamodb.ReplicaTableProps(
        region="us-east-1",
        deletion_protection=False
    ), dynamodb.ReplicaTableProps(
        region="us-east-2",
        deletion_protection=True
    )
    ]
)

Point-in-Time Recovery

pointInTimeRecovery provides automatic backups of your DynamoDB table data which helps protect your tables from accidental write or delete operations.

table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    point_in_time_recovery=True
)

Table Class

You can configure a TableV2 instance with one of two table classes:

  • STANDARD - the default table class, recommended for the vast majority of workloads.

  • STANDARD_INFREQUENT_ACCESS - optimized for tables where storage is the dominant cost.

table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    table_class=dynamodb.TableClass.STANDARD_INFREQUENT_ACCESS
)

Further reading: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.TableClasses.html

Tags

You can add tags to a TableV2 in several ways. Adding tags to the construct itself applies them to the primary table.

from aws_cdk import CfnTag


table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    tags=[CfnTag(key="primaryTableTagKey", value="primaryTableTagValue")]
)

You can also add tags to replica tables by specifying them within the replica table properties.

from aws_cdk import CfnTag


table = dynamodb.TableV2(self, "Table",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    replicas=[dynamodb.ReplicaTableProps(
        region="us-west-1",
        tags=[CfnTag(key="replicaTableTagKey", value="replicaTableTagValue")]
    )
    ]
)

Referencing Existing Global Tables

To reference an existing DynamoDB table in your CDK application, use the TableV2.fromTableName, TableV2.fromTableArn, or TableV2.fromTableAttributes factory methods:

# user: iam.User


table = dynamodb.TableV2.from_table_arn(self, "ImportedTable", "arn:aws:dynamodb:us-east-1:123456789012:table/my-table")
# now you can call methods on the referenced table
table.grant_read_write_data(user)

If you intend to use the tableStreamArn (including indirectly, for example by creating an aws-cdk-lib/aws-lambda-event-sources.DynamoEventSource on the referenced table), you must use the TableV2.fromTableAttributes method and the tableStreamArn property must be populated.

To grant permissions to indexes for a referenced table you can either set grantIndexPermissions to true, or you can provide the indexes via the globalIndexes or localIndexes properties. This will enable grant* methods to also grant permissions to all table indexes.
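For example, the following is a minimal sketch of TableV2.fromTableAttributes that populates tableStreamArn and enables index permissions (the ARNs shown are placeholders):

# user: iam.User

table = dynamodb.TableV2.from_table_attributes(self, "ImportedTable",
    table_arn="arn:aws:dynamodb:us-east-1:123456789012:table/my-table",
    table_stream_arn="arn:aws:dynamodb:us-east-1:123456789012:table/my-table/stream/2015-06-27T00:48:05.899",
    grant_index_permissions=True
)

# grants apply to the table and all of its indexes
table.grant_read_write_data(user)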

Resource Policy

Using the resourcePolicy property, you can add a resource policy to a table in the form of a PolicyDocument:

import aws_cdk as cdk
import aws_cdk.aws_iam as iam


# resource policy document
policy = iam.PolicyDocument(
    statements=[
        iam.PolicyStatement(
            actions=["dynamodb:GetItem"],
            principals=[iam.AccountRootPrincipal()],
            resources=["*"]
        )
    ]
)

# table with resource policy
dynamodb.TableV2(self, "TableTestV2-1",
    partition_key=dynamodb.Attribute(name="id", type=dynamodb.AttributeType.STRING),
    removal_policy=cdk.RemovalPolicy.DESTROY,
    resource_policy=policy
)

TableV2 doesn’t support creating a replica and adding a resource-based policy to that replica in the same stack update in Regions other than the Region where you deploy the stack update. To incorporate a resource-based policy into a replica, you’ll need to initially deploy the replica without the policy, followed by a subsequent update to include the desired policy.
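Assuming ReplicaTableProps also exposes a resourcePolicy property, the second deployment could then attach the policy to the already-created replica; a rough sketch:

# policy: iam.PolicyDocument


# second update: the us-east-1 replica already exists, so a policy can now be attached to it
global_table = dynamodb.TableV2(self, "GlobalTable",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    replicas=[dynamodb.ReplicaTableProps(
        region="us-east-1",
        resource_policy=policy
    )
    ]
)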

Grants

Using any of the grant* methods on an instance of the TableV2 construct will only apply to the primary table, its indexes, and any associated encryptionKey. As an example, grantReadData used below will only apply to the table in us-west-2:

import aws_cdk as cdk
import aws_cdk.aws_kms as kms

# user: iam.User


app = cdk.App()
stack = cdk.Stack(app, "Stack", env=cdk.Environment(region="us-west-2"))

table_key = kms.Key(stack, "Key")
replica_key_arns = {
    "us-east-1": "arn:aws:kms:us-east-1:123456789012:key/g24efbna-az9b-42ro-m3bp-cq249l94fca6",
    "us-east-2": "arn:aws:kms:us-east-2:123456789012:key/g24efbna-az9b-42ro-m3bp-cq249l94fca6"
}

global_table = dynamodb.TableV2(stack, "GlobalTable",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    encryption=dynamodb.TableEncryptionV2.customer_managed_key(table_key, replica_key_arns),
    replicas=[dynamodb.ReplicaTableProps(region="us-east-1"), dynamodb.ReplicaTableProps(region="us-east-2")
    ]
)

# grantReadData only applies to the table in us-west-2 and the tableKey
global_table.grant_read_data(user)

The replica method can be used to apply grants to a specific replica table:

import aws_cdk as cdk
import aws_cdk.aws_kms as kms

# user: iam.User


app = cdk.App()
stack = cdk.Stack(app, "Stack", env=cdk.Environment(region="us-west-2"))

table_key = kms.Key(stack, "Key")
replica_key_arns = {
    "us-east-1": "arn:aws:kms:us-east-1:123456789012:key/g24efbna-az9b-42ro-m3bp-cq249l94fca6",
    "us-east-2": "arn:aws:kms:us-east-2:123456789012:key/g24efbna-az9b-42ro-m3bp-cq249l94fca6"
}

global_table = dynamodb.TableV2(stack, "GlobalTable",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    encryption=dynamodb.TableEncryptionV2.customer_managed_key(table_key, replica_key_arns),
    replicas=[dynamodb.ReplicaTableProps(region="us-east-1"), dynamodb.ReplicaTableProps(region="us-east-2")
    ]
)

# grantReadData applies to the table in us-east-2 and the KMS key in us-east-2
global_table.replica("us-east-2").grant_read_data(user)

Metrics

You can use metric* methods to generate metrics for a table that can be used when configuring alarms or graphs. The metric* methods only apply to the primary table provisioned using the TableV2 construct. As an example, metricConsumedReadCapacityUnits used below is only for the table in us-west-2:

import aws_cdk as cdk
import aws_cdk.aws_cloudwatch as cloudwatch


app = cdk.App()
stack = cdk.Stack(app, "Stack", env=cdk.Environment(region="us-west-2"))

global_table = dynamodb.TableV2(stack, "GlobalTable",
    partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
    replicas=[dynamodb.ReplicaTableProps(region="us-east-1"), dynamodb.ReplicaTableProps(region="us-east-2")
    ]
)

# metric is only for the table in us-west-2
metric = global_table.metric_consumed_read_capacity_units()

cloudwatch.Alarm(self, "Alarm",
    metric=metric,
    evaluation_periods=1,
    threshold=1
)

The replica method can be used to generate a metric for a specific replica table:

import aws_cdk as cdk
import aws_cdk.aws_cloudwatch as cloudwatch


class FooStack(cdk.Stack):
    def __init__(self, scope, id, **kwargs):
        super().__init__(scope, id, **kwargs)

        self.global_table = dynamodb.TableV2(self, "GlobalTable",
            partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
            replicas=[dynamodb.ReplicaTableProps(region="us-east-1"), dynamodb.ReplicaTableProps(region="us-east-2")
            ]
        )

class BarStack(cdk.Stack):
    def __init__(self, scope, id, *, replica_table, **kwargs):
        super().__init__(scope, id, **kwargs)

        # metric is only for the replica table in us-east-1
        metric = replica_table.metric_consumed_read_capacity_units()

        cloudwatch.Alarm(self, "Alarm",
            metric=metric,
            evaluation_periods=1,
            threshold=1
        )

app = cdk.App()
foo_stack = FooStack(app, "FooStack", env=cdk.Environment(region="us-west-2"))
bar_stack = BarStack(app, "BarStack",
    replica_table=foo_stack.global_table.replica("us-east-1"),
    env=cdk.Environment(region="us-east-1")
)

Import from S3 Bucket

You can import data stored in S3 when creating a table using the Table construct. To import data into DynamoDB, your data must be in CSV, DynamoDB JSON, or Amazon Ion format within an Amazon S3 bucket. The data may be compressed using ZSTD or GZIP, or it may be imported without compression. The data source can be a single S3 object or multiple S3 objects sharing a common prefix.

Further reading: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/S3DataImport.HowItWorks.html

Use CSV Format

The InputFormat.csv method accepts delimiter and headerList options as arguments. If delimiter is not specified, a comma (,) is used by default. If headerList is specified, the first line of the CSV is treated as data rather than as a header.

import aws_cdk as cdk
import aws_cdk.aws_s3 as s3

# bucket: s3.IBucket


app = cdk.App()
stack = cdk.Stack(app, "Stack")

dynamodb.Table(stack, "Table",
    partition_key=dynamodb.Attribute(
        name="id",
        type=dynamodb.AttributeType.STRING
    ),
    import_source=dynamodb.ImportSourceSpecification(
        compression_type=dynamodb.InputCompressionType.GZIP,
        input_format=dynamodb.InputFormat.csv(
            delimiter=",",
            header_list=["id", "name"]
        ),
        bucket=bucket,
        key_prefix="prefix"
    )
)

Use DynamoDB JSON Format

Use the InputFormat.dynamoDBJson() method to specify the inputFormat property. There are currently no options available.

import aws_cdk as cdk
import aws_cdk.aws_s3 as s3

# bucket: s3.IBucket


app = cdk.App()
stack = cdk.Stack(app, "Stack")

dynamodb.Table(stack, "Table",
    partition_key=dynamodb.Attribute(
        name="id",
        type=dynamodb.AttributeType.STRING
    ),
    import_source=dynamodb.ImportSourceSpecification(
        compression_type=dynamodb.InputCompressionType.GZIP,
        input_format=dynamodb.InputFormat.dynamo_dBJson(),
        bucket=bucket,
        key_prefix="prefix"
    )
)

Use Amazon Ion Format

Use the InputFormat.ion() method to specify the inputFormat property. There are currently no options available.

import aws_cdk as cdk
import aws_cdk.aws_s3 as s3

# bucket: s3.IBucket


app = cdk.App()
stack = cdk.Stack(app, "Stack")

dynamodb.Table(stack, "Table",
    partition_key=dynamodb.Attribute(
        name="id",
        type=dynamodb.AttributeType.STRING
    ),
    import_source=dynamodb.ImportSourceSpecification(
        compression_type=dynamodb.InputCompressionType.GZIP,
        input_format=dynamodb.InputFormat.ion(),
        bucket=bucket,
        key_prefix="prefix"
    )
)