のターゲットとしての Amazon Kinesis Data Streams の使用 AWS Database Migration Service - AWS Database Migration Service

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

のターゲットとしての Amazon Kinesis Data Streams の使用 AWS Database Migration Service

を使用して AWS DMS 、Amazon Kinesis データストリームにデータを移行できます。Amazon Kinesis Data Streams サービスは、Amazon Kinesis Data Streams サービスの一部です。データレコードの大量のストリームをリアルタイムで収集し、処理するには、Kinesis データストリームを使用します。

Kinesis データストリームはシャードで構成されています。シャードは、ストリーム内のデータレコードの一意に識別されたシーケンスです。Amazon Kinesis Data Streams におけるシャードの詳細については、Amazon Kinesis Data Streams デベロッパーガイドの「シャード」をご参照ください。

AWS Database Migration Service は、JSON を使用してレコードを Kinesis データストリームに発行します。変換時に、 AWS DMS はソースデータベースから各レコードを JSON 形式または JSON_UNFORMATTED メッセージ形式の属性と値のペアにシリアル化します。JSON_UNFORMATTED メッセージ形式は、改行区切り文字を含む単一行の JSON 文字列です。これにより、Amazon Data Firehose は Amazon S3 の宛先に Kinesis データを配信し、Amazon Athena を含むさまざまなクエリ エンジンを使用してクエリを実行できます。

サポートされている任意のデータソースから、ターゲットストリームにデータを移行するために、オブジェクトのマッピングを使用します。オブジェクトマッピングを使用して、ストリームにデータレコードを構築する方法を決定します。データをそのシャードにグループ化するために Kinesis Data Streams で使用する、各テーブルのパーティション キーも定義します。

が Kinesis Data Streams ターゲットエンドポイントにテーブル AWS DMS を作成すると、ソースデータベースエンドポイントと同じ数のテーブルが作成されます。 AWS DMS また、 はいくつかの Kinesis Data Streams パラメータ値も設定します。テーブル作成のコストは、データの量および移行するテーブルの数によって異なります。

注記

AWS DMS コンソールまたは API の SSL モードオプションは、一部のデータストリーミングや、Kinesis や DynamoDB などの NoSQL サービスには適用されません。これらはデフォルトで安全であるため、SSL モードの設定が none (SSL Mode=None) と等しい AWS DMS ことを示しています。SSL を使用するために、エンドポイントに追加の設定は必要ありません。例えば、Kinesis をターゲット エンドポイントとして使用する場合、デフォルトでセキュリティで保護されます。Kinesis へのすべての API コールは SSL を使用するため、 AWS DMS エンドポイントに追加の SSL オプションは必要ありません。 AWS DMS が Kinesis Data Stream への接続時にデフォルトで使用する HTTPS プロトコルを使用して、SSL エンドポイント経由で安全にデータを保存して取得できます。

Kinesis Data Streams エンドポイント設定

Kinesis Data Streams ターゲットエンドポイントを使用すると、 AWS DMS API の KinesisSettingsオプションを使用してトランザクションとコントロールの詳細を取得できます。

接続は、次のいずれかの方法で設定できます。

  • AWS DMS コンソールで、エンドポイント設定を使用します。

  • CLI では、CreateEndpoint コマンド のkinesis-settings オプションを使用します。

CLI で、次の kinesis-settings オプションのリクエストパラメータを使用します。

注記

AWS DMS バージョン 3.4.1 以降では、IncludeNullAndEmpty エンドポイント設定に対応しています。ただし、Kinesis Data Streams ターゲットの他のエンドポイント設定のサポートは で利用できます AWS DMS。

  • MessageFormat - エンドポイントで作成されたレコードの出力形式。メッセージ形式は JSON (デフォルト) または JSON_UNFORMATTED (タブなし 1 行) です。

  • IncludeControlDetails - Kinesis メッセージ出力に、テーブル定義、列定義、テーブル、列の変更の詳細な制御情報を表示します。デフォルト: false

  • IncludeNullAndEmpty — ターゲットに NULL 列と空の列を含めます。デフォルト: false

  • IncludePartitionValue – パーティションタイプが schema-table-type でない限り、Kinesis メッセージ出力内のパーティション値を表示します。デフォルト: false

  • IncludeTableAlterOperationsrename-tabledrop-tableadd-columndrop-columnrename-column など、制御データのテーブルを変更するデータ定義言語 (DDL) オペレーションが含まれます。デフォルト: false

  • IncludeTransactionDetails - ソースデータベースからの詳細のトランザクション情報を提供します。この情報には、コミットタイムスタンプ、ログの位置、transaction_idprevious_transaction_id、および transaction_record_id (トランザクション内のレコードオフセット) の値が含まれます。デフォルト: false

  • PartitionIncludeSchemaTable パーティションタイプが primary-key-type の場合、スキーマとテーブル名をパーティション値にプレフィックスします。これにより、Kinesis シャード間のデータ分散が増加します。例えば、SysBench スキーマに数千のテーブルがあり、各テーブルのプライマリ キーの範囲が制限されているとします。この場合、同じプライマリキーが数千のテーブルから同じシャードに送信され、スロットリングが発生します。デフォルト: false

  • useLargeIntegerValue – AWS DMS バージョン 3.5.4 から入手可能な、整数を倍精度でキャストするのではなく、最大 18 桁の整数を使用します。デフォルトは False です。

次の例で、 AWS CLIを使用して発行された例 create-endpointコマンドを使用中の kinesis-settings オプションを示します。

aws dms create-endpoint --endpoint-identifier=$target_name --engine-name kinesis --endpoint-type target --region us-east-1 --kinesis-settings ServiceAccessRoleArn=arn:aws:iam::333333333333:role/dms-kinesis-role, StreamArn=arn:aws:kinesis:us-east-1:333333333333:stream/dms-kinesis-target-doc,MessageFormat=json-unformatted, IncludeControlDetails=true,IncludeTransactionDetails=true,IncludePartitionValue=true,PartitionIncludeSchemaTable=true, IncludeTableAlterOperations=true
マルチスレッド全ロードタスク設定

転送速度を向上させるために、 は Kinesis Data Streams ターゲットインスタンスへのマルチスレッド全ロード AWS DMS をサポートします。DMS では、このマルチスレッドでのタスク設定を、以下でサポートします。

  • MaxFullLoadSubTasks - 並列ロードするソーステーブルの最大数を指定するにはこのオプションを使用します。DMS は、専用のサブタスクを使用して、対応する Kinesis ターゲットテーブルに各テーブルをロードします。デフォルトは 8、最大値は 49 です。

  • ParallelLoadThreads – このオプションを使用して、 AWS DMS が各テーブルを Kinesis ターゲットテーブルにロードするために使用するスレッドの数を指定します。Kinesis Data Streams ターゲットの最大値は 32 です。この上限を増やすよう依頼できます。

  • ParallelLoadBufferSize – Kinesis ターゲットにデータをロードするために並列ロードスレッドが使用する、バッファ内に保存するレコードの最大数を指定するには、このオプションを使用します。デフォルト値は 50 です。最大値は 1000 です。この設定は ParallelLoadThreads で使用します。ParallelLoadBufferSize は、複数のスレッドがある場合にのみ有効です。

  • ParallelLoadQueuesPerThread - このオプションを使用して、各同時スレッドがキューからデータレコードを取り出し、ターゲットのバッチロードを生成するためにアクセスするキューの数を指定します。デフォルトは 1 です。ただし、さまざまなペイロードサイズの Kinesis ターゲットの場合、有効な範囲は 1 スレッドあたり 5 ~ 512 キューです。

マルチスレッド CDC ロードタスクの設定

タスク設定を使用して PutRecords API コールの動作を変更するなど、Kinesis などのリアルタイム データストリーミング ターゲット エンドポイントの変更データキャプチャ (CDC) のパフォーマンスを向上させることができます。これを行うには、ParallelApply* タスク設定を使用して、同時スレッドの数、スレッドあたりのキュー数、バッファに格納するレコード数を指定します。例えば、CDC ロードを実行し、128 本のスレッドを並列に適用するとします。また、スレッドあたり 64 個のキューにアクセスして、バッファあたり 50 個のレコードを保存する必要があります。

CDC のパフォーマンスを向上させるために、 では以下のタスク設定 AWS DMS をサポートしています。

  • ParallelApplyThreads – CDC ロード中に AWS DMS がデータレコードを Kinesis ターゲットエンドポイントにプッシュするために使用する同時スレッドの数を指定します。デフォルト値は 0 で、最大値は 32 です。

  • ParallelApplyBufferSize – CDC ロード中に同時スレッドが Kinesis ターゲット エンドポイントにプッシュする場合に、各バッファキューに保存するレコードの最大数を指定します。デフォルト値は 100 で、最大値は 1,000 です。このオプションは、ParallelApplyThreads で複数のスレッドを指定する場合に使用します。

  • ParallelApplyQueuesPerThread – 各スレッドがキューからデータレコードを取り出し、CDC 中に Kinesis エンドポイントのバッチロードを生成するためにアクセスするキューの数を指定します。デフォルト値は 1、最大値は 512 です。

ParallelApply* タスク設定を使用する場合、partition-key-type のデフォルトは schema-name.table-name ではなくテーブルの primary-key です。

前イメージを使用した Kinesis データストリームの CDC 行の元の値のターゲットとしての表示

Kinesis のようなデータストリーミングターゲットに CDC 更新を書き込む場合、更新によって変更される前に、ソースデータベースの行の元の値を表示できます。これを可能にするために、 はソースデータベースエンジンから提供されたデータに基づいて更新イベントの前イメージ AWS DMS を設定します。

ソースデータベースエンジンによって、前イメージに対してさまざまな量の情報が提供されます。

  • Oracle では、列が変更された場合にのみ列の更新が提供されます。

  • PostgreSQL は、プライマリキーの一部である列のデータ (変更されたかどうか) のみを提供します。すべての列に (変更の有無を問わず) データを提供するには、REPLICA_IDENTITYDEFAULTでなく FULL に設定する必要があります。各テーブルを REPLICA_IDENTITY に設定する際は、慎重に行うべきであることに注意します。REPLICA_IDENTITYFULL に設定すると、すべての列値がログ先行書き込み (WAL) に継続的に書き込まれます。これにより、頻繁に更新されるテーブルではパフォーマンスやリソースの問題が発生する可能性があります。

  • MySQL は通常、BLOB および CLOB データ型を除くすべての列のデータを (変更の有無を問わず) 提供します。

前イメージを有効にして、ソースデータベースから元の値を AWS DMS 出力に追加するには、BeforeImageSettings タスク設定または add-before-image-columns パラメータを使用します。このパラメータは、列変換ルールを適用します。

BeforeImageSettings は、次に示すように、ソースデータベースシステムから収集された値を使用して、すべての更新オペレーションに新しい JSON 属性を追加します。

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
注記

フルロードと CDC BeforeImageSettings AWS DMS タスク (既存のデータを移行して継続的な変更をレプリケートする) などの CDC コンポーネントを含むタスク、または CDC のみのタスク (データ変更のみをレプリケートする) にのみ適用されます。全ロードのタスクには BeforeImageSettings を適用しないでください。

BeforeImageSettings オプションには、次の項目が適用されます。

  • EnableBeforeImage オプションを true に設定して、前イメージを有効にします。デフォルト: false

  • FieldName オプションを使用して、新しい JSON 属性に名前を割り当てます。EnableBeforeImagetrue の場合、FieldName は必須であり、空にすることはできません。

  • ColumnFilter オプションは、前イメージを使用して追加する列を指定します。テーブルのプライマリキーの一部である列だけを追加するには、デフォルト値 pk-only を使用します。前イメージ値を持つ列を追加するには、all を使用します。変更前イメージには、CLOB や BLOB などの LOB データ型の列が含まれていないことに注意します。

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }
注記

Amazon S3 ターゲットは BeforeImageSettings をサポートしていません。S3 ターゲットの場合、CDC 中に前イメージを実行するには add-before-image-columns 変換ルールのみを使用します。

前イメージ変換ルールの使用

タスク設定の代わりに、列変換ルールを適用する add-before-image-columns パラメータを使用できます。このパラメータを使用すると、Kinesis のようなデータストリーミングターゲットで CDC 中に前イメージを有効にできます。

変換ルールで add-before-image-columns を使用すると、前イメージの結果のよりきめ細かい制御を適用することができます。変換ルールを使用すると、オブジェクトロケーターを使用し、ルールに選択したテーブルを制御できます。また、変換ルールを連結することもできます。これにより、テーブルごとに異なるルールを適用できます。その後、他のルールを使用して生成された列を操作できます。

注記

同じタスク内で、add-before-image-columns パラメータと同時に BeforeImageSettings タスク設定を使用しないでください。代わりに、1 つのタスクにこのパラメータとこの設定のいずれかを使用し、両方を使用しないでください。

列の add-before-image-columns パラメータを持つ transformation ルールタイプは、before-image-def セクションを提供する必要があります。例を以下に示します。

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

column-prefix の値は列名の前に付加され、column-prefix のデフォルト値は BI_ です。column-suffix の値は列名に追加され、デフォルトは空です。column-prefixcolumn-suffix の両方を空の文字列に設定しないでください。

column-filter の値を 1 つ選択します。テーブルのプライマリキーの一部である列だけを追加するには、pk-only を選択します。LOB タイプではない列のみを追加するように non-lob を選択します。または、前イメージの値を持つ任意の列を追加するように all を選択します。

前イメージ変換前ルールの例

次の例の変換ルールは、ターゲットに BI_emp_no という新しい列を追加します。したがって、UPDATE employees SET emp_no = 3 WHERE emp_no = 1; のようなステートメントは、BI_emp_no フィールドに 1 を設定します。CDC 更新を Amazon S3 ターゲットに書き込むと、更新された元の行は BI_emp_no 列からわかります。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

add-before-image-columns ルールアクションの使用方法については、「 変換ルールおよび変換アクション」をご参照ください。

のターゲットとして Kinesis データストリームを使用するための前提条件 AWS Database Migration Service

のターゲットとして Kinesis データストリームを使用するための IAM ロール AWS Database Migration Service

Kinesis データストリームを のターゲットとして設定する前に AWS DMS、IAM ロールを作成してください。このロールは AWS DMS 、移行先の Kinesis データストリームへのアクセスを が引き受け、許可できるようにする必要があります。アクセス許可の最小設定は、次の IAM ポリシーで示します。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "1", "Effect": "Allow", "Principal": { "Service": "dms.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

Kinesis データストリームに移行する際に使用するロールには、次のアクセス許可が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords" ], "Resource": "arn:aws:kinesis:region:accountID:stream/streamName" } ] }

のターゲットとしての Kinesis データストリームへのアクセス AWS Database Migration Service

AWS DMS バージョン 3.4.7 以降では、Kinesis エンドポイントに接続するには、次のいずれかを実行する必要があります。

のターゲットとして Kinesis Data Streams を使用する場合の制限 AWS Database Migration Service

ターゲットとして Kinesis Data Streams を使用する場合、以下の制限が適用されます。

  • AWS DMS は、トランザクションに関係なく、各更新を特定の Kinesis データストリーム内の 1 つのデータレコードとしてソースデータベース内の 1 つのレコードに発行します。ただし、KinesisSettings API の関連パラメータを使用して、各データレコードのトランザクションの詳細を含めることができます。

  • 完全 LOB モードはサポートされていません。

  • サポートされる最大 LOB サイズは 1 MB です。

  • Kinesis Data Streamsは、重複排除をサポートしていません。ストリームからデータを消費するアプリケーションは、重複したレコードを処理する必要があります。詳細については、Amazon Kinesis Data Streams デベロッパーガイドの「重複レコードの処理」をご参照ください。

  • AWS DMS では、パーティションキーに次の 2 つのフォームがサポートされています。

    • SchemaName.TableName: スキーマとテーブル名の組み合わせ。

    • ${AttributeName}: JSON のいずれかのフィールドの値、またはソースデータベースのテーブルのプライマリキー。

  • Kinesis Data Streams 内の保管中のデータの暗号化の詳細については、AWS Key Management Service デベロッパーガイドの「Kinesis Data Streams でのデータ保護」をご参照ください。

  • BatchApplyは Kinesis エンドポイントではサポートされていません。Kinesis ターゲットにバッチ 適用を使用 (例えば、BatchApplyEnabled ターゲットメタデータ タスク設定) すると、データが失われる可能性があります。

  • Kinesis ターゲットは、レプリケーションインスタンスと同じ AWS AWS リージョン アカウントおよび の Kinesis データストリームでのみサポートされます。

  • MySQL ソースから移行する場合、BeforeImage データには CLOB データ型と BLOB データ型は含まれません。詳細については、「前イメージを使用した Kinesis データストリームの CDC 行の元の値のターゲットとしての表示」を参照してください。

  • AWS DMS は、16 桁を超えるBigIntデータ型の値の移行をサポートしていません。この制限を回避するには、次の変換ルールを使用して BigInt 列を文字列に変換できます。変換ルールの詳細については、「 変換ルールおよび変換アクション」を参照してください。

    { "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }
  • 1 つのトランザクション内の複数の DML オペレーションがソースデータベースの Large Object (LOB) 列を変更すると、ターゲットデータベースはそのトランザクションの最後のオペレーションの最終的な LOB 値のみを保持します。同じトランザクションの以前のオペレーションによって設定された中間 LOB 値は上書きされ、データ損失や不整合が発生する可能性があります。この動作は、レプリケーション中に LOB データが処理される方法が原因で発生します。

Kinesis データストリームにデータを移行するためのオブジェクトマッピングの使用

AWS DMS はテーブルマッピングルールを使用して、ソースからターゲット Kinesis データストリームにデータをマッピングします。ターゲットストリームにデータをマッピングするために、オブジェクトマッピングと呼ばれるテーブルマッピングルールのタイプを使用します。オブジェクトマッピングを使用して、ソースのデータレコードがどのように Kinesis データストリームに発行されたデータレコードにマッピングされるかを定義します。

Kinesis データストリームには、パーティション キー以外にプリセット構造はありません。オブジェクトマッピングルールでは、データレコードの partition-key-type に指定できる値は、schema-tabletransaction-idprimary-keyconstantattribute-name です。

オブジェクトマッピングルールを作成するには、object-mapping として rule-type を指定します。このルールが、使用したいオブジェクトマッピングのタイプを指定します。

ルールの構造は次のとおりです。

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS は現在、 rule-actionパラメータの唯一の有効な値map-record-to-documentとして map-record-to-recordおよび をサポートしています。これらの設定は、exclude-columns 属性リストの一部として除外されない値に影響します。map-record-to-recordmap-record-to-documentの値は、デフォルトで がこれらのレコード AWS DMS を処理する方法を指定します。これらの値は、どのような方法でも属性マッピングに影響しません。

リレーショナルデータベースから Kinesis データストリームに移行する際に map-record-to-record を使用します。このルールタイプでは、Kinesis データストリームのパーティション キーとしてリレーショナルデータベースから taskResourceId.schemaName.tableName 値を使用し、ソースデータベース内の各列の属性を作成します。

map-record-to-record を使用する場合は、次の点に注意します。

  • この設定は、exclude-columns リストで除外されている列にのみ影響します。

  • このような列ごとに、 はターゲットトピックに対応する属性 AWS DMS を作成します。

  • AWS DMS は、ソース列が属性マッピングで使用されているかどうかにかかわらず、この対応する属性を作成します。

map-record-to-document を使用して、属性名「_doc」を使用しソース列を適切なターゲットストリーム内の単一のフラットドキュメントに配置します。 AWS DMS 说「_doc」という名前のソースで 1 つのフラットマップにデータを配置します。この配置は、exclude-columns 属性リストに含まれていないソーステーブル内のすべての列に適用されます。

map-record-to-record を理解するための 1 つの方法は、実際の動作を確認することです。この例では、次の構造とデータを含むリレーショナルデータベースのテーブルの行から始めると想定してください。

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

02/29/1988

この情報を Test という名前のスキーマから Kinesis データストリームに移行するには、データをターゲットストリームにマッピングするルールを作成します。以下のルールはマッピングを示しています。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKinesis", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

次は、Kinesis データストリームの結果のレコード形式を示しています:

  • StreamName: XXX

  • PartitionKey: Test.Customers //schmaName.tableName

  • データ: //次の JSON メッセージ

    { "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

ただし、同じルールを使用しますが、rule-action パラメータを map-record-to-document に変更して特定の列を除外します。以下のルールはマッピングを示しています。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKinesis", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "exclude-columns": [ "homeaddress", "homephone", "workaddress", "workphone" ] } } ] }

この場合、exclude-columns パラメータ、FirstNameLastNameStoreIdDateOfBirth_doc にマッピングされます。次は、この結果のレコード形式を示しています。

{ "data":{ "_doc":{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "DateOfBirth": "02/29/1988" } } }

属性マッピングを使用したデータの再構築

属性マップを使用してデータを Kinesis データストリームに移行している間にデータを再構築できます。例えば、ソース内の複数のフィールドを結合してターゲット内に 1 つのフィールドを構成することもできます。以下の属性マップはデータを再構築する方法を示しています。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKinesis", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

partition-key の定数値を設定するには、partition-key 値を指定します。たとえば、すべてのデータを 1 つのシャードに強制的に格納するためにこれを行うことができます。以下のマッピングはこの方法を示しています。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKinesis", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
注記

特定のテーブル用のコントロールレコードの partition-key 値は、TaskId.SchemaName.TableName です。特定のタスク用のコントロールレコードの partition-key 値は、そのレコードの TaskId です。オブジェクトマッピングの partition-key 値を指定することは、コントロールレコードの partition-key には影響しません。

Kinesis Data Streams のメッセージ形式

JSON 出力は、単にキーと値のペアのリストです。JSON_UNFORMATTED メッセージ形式は、改行区切り文字を含む単一行の JSON 文字列です。

AWS DMS には、Kinesis Data Streams からのデータを簡単に使用できるように、次の予約済みフィールドが用意されています。

RecordType

レコードタイプはデータまたはコントロールのいずれかです。データレコードは、ソースの実際の行を表します。コントロールレコードは、タスクの再起動など、ストリーム内の重要なイベント用です。

Operation

データレコードの場合、オペレーションは loadinsertupdate、または delete です。

コントロールレコードの場合、オペレーションは create-tablerename-tabledrop-tablechange-columnsadd-columndrop-columnrename-columncolumn-type-change です。

SchemaName

レコードのソーススキーマ。コントロールレコードの場合、このフィールドは空です。

TableName

レコードのソーステーブル。コントロールレコードの場合、このフィールドは空です。

Timestamp

JSON メッセージが構築された時刻のタイムスタンプ。このフィールドは ISO 8601 形式でフォーマットされます。