翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
のターゲットとしての Amazon Kinesis Data Streams の使用 AWS Database Migration Service
を使用して AWS DMS 、Amazon Kinesis データストリームにデータを移行できます。Amazon Kinesis Data Streams サービスは、Amazon Kinesis Data Streams サービスの一部です。データレコードの大量のストリームをリアルタイムで収集し、処理するには、Kinesis データストリームを使用します。
Kinesis データストリームはシャードで構成されています。シャードは、ストリーム内のデータレコードの一意に識別されたシーケンスです。Amazon Kinesis Data Streams におけるシャードの詳細については、Amazon Kinesis Data Streams デベロッパーガイドの「シャード」をご参照ください。
AWS Database Migration Service は、JSON を使用してレコードを Kinesis データストリームに発行します。変換時に、 AWS DMS はソースデータベースから各レコードを JSON 形式または JSON_UNFORMATTED メッセージ形式の属性と値のペアにシリアル化します。JSON_UNFORMATTED メッセージ形式は、改行区切り文字を含む単一行の JSON 文字列です。これにより、Amazon Data Firehose は Amazon S3 の宛先に Kinesis データを配信し、Amazon Athena を含むさまざまなクエリ エンジンを使用してクエリを実行できます。
サポートされている任意のデータソースから、ターゲットストリームにデータを移行するために、オブジェクトのマッピングを使用します。オブジェクトマッピングを使用して、ストリームにデータレコードを構築する方法を決定します。データをそのシャードにグループ化するために Kinesis Data Streams で使用する、各テーブルのパーティション キーも定義します。
が Kinesis Data Streams ターゲットエンドポイントにテーブル AWS DMS を作成すると、ソースデータベースエンドポイントと同じ数のテーブルが作成されます。 AWS DMS また、 はいくつかの Kinesis Data Streams パラメータ値も設定します。テーブル作成のコストは、データの量および移行するテーブルの数によって異なります。
注記
AWS DMS コンソールまたは API の SSL モードオプションは、一部のデータストリーミングや、Kinesis や DynamoDB などの NoSQL サービスには適用されません。これらはデフォルトで安全であるため、SSL モードの設定が none (SSL Mode=None) と等しい AWS DMS ことを示しています。SSL を使用するために、エンドポイントに追加の設定は必要ありません。例えば、Kinesis をターゲット エンドポイントとして使用する場合、デフォルトでセキュリティで保護されます。Kinesis へのすべての API コールは SSL を使用するため、 AWS DMS エンドポイントに追加の SSL オプションは必要ありません。 AWS DMS が Kinesis Data Stream への接続時にデフォルトで使用する HTTPS プロトコルを使用して、SSL エンドポイント経由で安全にデータを保存して取得できます。
Kinesis Data Streams エンドポイント設定
Kinesis Data Streams ターゲットエンドポイントを使用すると、 AWS DMS API の KinesisSettings
オプションを使用してトランザクションとコントロールの詳細を取得できます。
接続は、次のいずれかの方法で設定できます。
AWS DMS コンソールで、エンドポイント設定を使用します。
CLI では、CreateEndpoint コマンド の
kinesis-settings
オプションを使用します。
CLI で、次の kinesis-settings
オプションのリクエストパラメータを使用します。
注記
AWS DMS バージョン 3.4.1 以降では、IncludeNullAndEmpty
エンドポイント設定に対応しています。ただし、Kinesis Data Streams ターゲットの他のエンドポイント設定のサポートは で利用できます AWS DMS。
MessageFormat
- エンドポイントで作成されたレコードの出力形式。メッセージ形式はJSON
(デフォルト) またはJSON_UNFORMATTED
(タブなし 1 行) です。-
IncludeControlDetails
- Kinesis メッセージ出力に、テーブル定義、列定義、テーブル、列の変更の詳細な制御情報を表示します。デフォルト:false
。 -
IncludeNullAndEmpty
— ターゲットに NULL 列と空の列を含めます。デフォルト:false
。 -
IncludePartitionValue
– パーティションタイプがschema-table-type
でない限り、Kinesis メッセージ出力内のパーティション値を表示します。デフォルト:false
。 -
IncludeTableAlterOperations
–rename-table
、drop-table
、add-column
、drop-column
、rename-column
など、制御データのテーブルを変更するデータ定義言語 (DDL) オペレーションが含まれます。デフォルト:false
。 -
IncludeTransactionDetails
- ソースデータベースからの詳細のトランザクション情報を提供します。この情報には、コミットタイムスタンプ、ログの位置、transaction_id
、previous_transaction_id
、およびtransaction_record_id
(トランザクション内のレコードオフセット) の値が含まれます。デフォルト:false
。 -
PartitionIncludeSchemaTable
パーティションタイプがprimary-key-type
の場合、スキーマとテーブル名をパーティション値にプレフィックスします。これにより、Kinesis シャード間のデータ分散が増加します。例えば、SysBench
スキーマに数千のテーブルがあり、各テーブルのプライマリ キーの範囲が制限されているとします。この場合、同じプライマリキーが数千のテーブルから同じシャードに送信され、スロットリングが発生します。デフォルト:false
。 -
useLargeIntegerValue
– AWS DMS バージョン 3.5.4 から入手可能な、整数を倍精度でキャストするのではなく、最大 18 桁の整数を使用します。デフォルトは False です。
次の例で、 AWS CLIを使用して発行された例 create-endpoint
コマンドを使用中の kinesis-settings
オプションを示します。
aws dms create-endpoint --endpoint-identifier=$target_name --engine-name kinesis --endpoint-type target --region us-east-1 --kinesis-settings ServiceAccessRoleArn=arn:aws:iam::333333333333:role/dms-kinesis-role, StreamArn=arn:aws:kinesis:us-east-1:333333333333:stream/dms-kinesis-target-doc,MessageFormat=json-unformatted, IncludeControlDetails=true,IncludeTransactionDetails=true,IncludePartitionValue=true,PartitionIncludeSchemaTable=true, IncludeTableAlterOperations=true
マルチスレッド全ロードタスク設定
転送速度を向上させるために、 は Kinesis Data Streams ターゲットインスタンスへのマルチスレッド全ロード AWS DMS をサポートします。DMS では、このマルチスレッドでのタスク設定を、以下でサポートします。
-
MaxFullLoadSubTasks
- 並列ロードするソーステーブルの最大数を指定するにはこのオプションを使用します。DMS は、専用のサブタスクを使用して、対応する Kinesis ターゲットテーブルに各テーブルをロードします。デフォルトは 8、最大値は 49 です。 -
ParallelLoadThreads
– このオプションを使用して、 AWS DMS が各テーブルを Kinesis ターゲットテーブルにロードするために使用するスレッドの数を指定します。Kinesis Data Streams ターゲットの最大値は 32 です。この上限を増やすよう依頼できます。 -
ParallelLoadBufferSize
– Kinesis ターゲットにデータをロードするために並列ロードスレッドが使用する、バッファ内に保存するレコードの最大数を指定するには、このオプションを使用します。デフォルト値は 50 です。最大値は 1000 です。この設定はParallelLoadThreads
で使用します。ParallelLoadBufferSize
は、複数のスレッドがある場合にのみ有効です。 -
ParallelLoadQueuesPerThread
- このオプションを使用して、各同時スレッドがキューからデータレコードを取り出し、ターゲットのバッチロードを生成するためにアクセスするキューの数を指定します。デフォルトは 1 です。ただし、さまざまなペイロードサイズの Kinesis ターゲットの場合、有効な範囲は 1 スレッドあたり 5 ~ 512 キューです。
マルチスレッド CDC ロードタスクの設定
タスク設定を使用して PutRecords
API コールの動作を変更するなど、Kinesis などのリアルタイム データストリーミング ターゲット エンドポイントの変更データキャプチャ (CDC) のパフォーマンスを向上させることができます。これを行うには、ParallelApply*
タスク設定を使用して、同時スレッドの数、スレッドあたりのキュー数、バッファに格納するレコード数を指定します。例えば、CDC ロードを実行し、128 本のスレッドを並列に適用するとします。また、スレッドあたり 64 個のキューにアクセスして、バッファあたり 50 個のレコードを保存する必要があります。
CDC のパフォーマンスを向上させるために、 では以下のタスク設定 AWS DMS をサポートしています。
-
ParallelApplyThreads
– CDC ロード中に AWS DMS がデータレコードを Kinesis ターゲットエンドポイントにプッシュするために使用する同時スレッドの数を指定します。デフォルト値は 0 で、最大値は 32 です。 -
ParallelApplyBufferSize
– CDC ロード中に同時スレッドが Kinesis ターゲット エンドポイントにプッシュする場合に、各バッファキューに保存するレコードの最大数を指定します。デフォルト値は 100 で、最大値は 1,000 です。このオプションは、ParallelApplyThreads
で複数のスレッドを指定する場合に使用します。 -
ParallelApplyQueuesPerThread
– 各スレッドがキューからデータレコードを取り出し、CDC 中に Kinesis エンドポイントのバッチロードを生成するためにアクセスするキューの数を指定します。デフォルト値は 1、最大値は 512 です。
ParallelApply*
タスク設定を使用する場合、partition-key-type
のデフォルトは schema-name.table-name
ではなくテーブルの primary-key
です。
前イメージを使用した Kinesis データストリームの CDC 行の元の値のターゲットとしての表示
Kinesis のようなデータストリーミングターゲットに CDC 更新を書き込む場合、更新によって変更される前に、ソースデータベースの行の元の値を表示できます。これを可能にするために、 はソースデータベースエンジンから提供されたデータに基づいて更新イベントの前イメージ AWS DMS を設定します。
ソースデータベースエンジンによって、前イメージに対してさまざまな量の情報が提供されます。
-
Oracle では、列が変更された場合にのみ列の更新が提供されます。
-
PostgreSQL は、プライマリキーの一部である列のデータ (変更されたかどうか) のみを提供します。すべての列に (変更の有無を問わず) データを提供するには、
REPLICA_IDENTITY
をDEFAULT
でなくFULL
に設定する必要があります。各テーブルをREPLICA_IDENTITY
に設定する際は、慎重に行うべきであることに注意します。REPLICA_IDENTITY
をFULL
に設定すると、すべての列値がログ先行書き込み (WAL) に継続的に書き込まれます。これにより、頻繁に更新されるテーブルではパフォーマンスやリソースの問題が発生する可能性があります。 -
MySQL は通常、BLOB および CLOB データ型を除くすべての列のデータを (変更の有無を問わず) 提供します。
前イメージを有効にして、ソースデータベースから元の値を AWS DMS 出力に追加するには、BeforeImageSettings
タスク設定または add-before-image-columns
パラメータを使用します。このパラメータは、列変換ルールを適用します。
BeforeImageSettings
は、次に示すように、ソースデータベースシステムから収集された値を使用して、すべての更新オペレーションに新しい JSON 属性を追加します。
"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
注記
フルロードと CDC BeforeImageSettings
AWS DMS タスク (既存のデータを移行して継続的な変更をレプリケートする) などの CDC コンポーネントを含むタスク、または CDC のみのタスク (データ変更のみをレプリケートする) にのみ適用されます。全ロードのタスクには BeforeImageSettings
を適用しないでください。
BeforeImageSettings
オプションには、次の項目が適用されます。
-
EnableBeforeImage
オプションをtrue
に設定して、前イメージを有効にします。デフォルト:false
。 -
FieldName
オプションを使用して、新しい JSON 属性に名前を割り当てます。EnableBeforeImage
がtrue
の場合、FieldName
は必須であり、空にすることはできません。 -
ColumnFilter
オプションは、前イメージを使用して追加する列を指定します。テーブルのプライマリキーの一部である列だけを追加するには、デフォルト値pk-only
を使用します。前イメージ値を持つ列を追加するには、all
を使用します。変更前イメージには、CLOB や BLOB などの LOB データ型の列が含まれていないことに注意します。"BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }
注記
Amazon S3 ターゲットは BeforeImageSettings
をサポートしていません。S3 ターゲットの場合、CDC 中に前イメージを実行するには add-before-image-columns
変換ルールのみを使用します。
前イメージ変換ルールの使用
タスク設定の代わりに、列変換ルールを適用する add-before-image-columns
パラメータを使用できます。このパラメータを使用すると、Kinesis のようなデータストリーミングターゲットで CDC 中に前イメージを有効にできます。
変換ルールで add-before-image-columns
を使用すると、前イメージの結果のよりきめ細かい制御を適用することができます。変換ルールを使用すると、オブジェクトロケーターを使用し、ルールに選択したテーブルを制御できます。また、変換ルールを連結することもできます。これにより、テーブルごとに異なるルールを適用できます。その後、他のルールを使用して生成された列を操作できます。
注記
同じタスク内で、add-before-image-columns
パラメータと同時に BeforeImageSettings
タスク設定を使用しないでください。代わりに、1 つのタスクにこのパラメータとこの設定のいずれかを使用し、両方を使用しないでください。
列の add-before-image-columns
パラメータを持つ transformation
ルールタイプは、before-image-def
セクションを提供する必要があります。例を以下に示します。
{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }
column-prefix
の値は列名の前に付加され、column-prefix
のデフォルト値は BI_
です。column-suffix
の値は列名に追加され、デフォルトは空です。column-prefix
と column-suffix
の両方を空の文字列に設定しないでください。
column-filter
の値を 1 つ選択します。テーブルのプライマリキーの一部である列だけを追加するには、pk-only
を選択します。LOB タイプではない列のみを追加するように non-lob
を選択します。または、前イメージの値を持つ任意の列を追加するように all
を選択します。
前イメージ変換前ルールの例
次の例の変換ルールは、ターゲットに BI_emp_no
という新しい列を追加します。したがって、UPDATE
employees SET emp_no = 3 WHERE emp_no = 1;
のようなステートメントは、BI_emp_no
フィールドに 1 を設定します。CDC 更新を Amazon S3 ターゲットに書き込むと、更新された元の行は BI_emp_no
列からわかります。
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }
add-before-image-columns
ルールアクションの使用方法については、「 変換ルールおよび変換アクション」をご参照ください。
のターゲットとして Kinesis データストリームを使用するための前提条件 AWS Database Migration Service
のターゲットとして Kinesis データストリームを使用するための IAM ロール AWS Database Migration Service
Kinesis データストリームを のターゲットとして設定する前に AWS DMS、IAM ロールを作成してください。このロールは AWS DMS 、移行先の Kinesis データストリームへのアクセスを が引き受け、許可できるようにする必要があります。アクセス許可の最小設定は、次の IAM ポリシーで示します。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "1", "Effect": "Allow", "Principal": { "Service": "dms.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
Kinesis データストリームに移行する際に使用するロールには、次のアクセス許可が必要です。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:PutRecord", "kinesis:PutRecords" ], "Resource": "arn:aws:kinesis:
region
:accountID
:stream/streamName
" } ] }
のターゲットとしての Kinesis データストリームへのアクセス AWS Database Migration Service
AWS DMS バージョン 3.4.7 以降では、Kinesis エンドポイントに接続するには、次のいずれかを実行する必要があります。
VPC エンドポイントを使用するように DMS を設定します。VPC エンドポイントを使用するように DMS を設定する方法については、「AWS DMS ソースエンドポイントとターゲットエンドポイントとしての VPC エンドポイントの設定」を参照してください。
パブリックルートを使用するように DMS を設定します。つまり、レプリケーションインスタンスをパブリックにします。パブリックレプリケーションインスタンスの詳細については、「パブリックおよびプライベートレプリケーション インスタンス」を参照してください。
のターゲットとして Kinesis Data Streams を使用する場合の制限 AWS Database Migration Service
ターゲットとして Kinesis Data Streams を使用する場合、以下の制限が適用されます。
-
AWS DMS は、トランザクションに関係なく、各更新を特定の Kinesis データストリーム内の 1 つのデータレコードとしてソースデータベース内の 1 つのレコードに発行します。ただし、
KinesisSettings
API の関連パラメータを使用して、各データレコードのトランザクションの詳細を含めることができます。 -
完全 LOB モードはサポートされていません。
-
サポートされる最大 LOB サイズは 1 MB です。
-
Kinesis Data Streamsは、重複排除をサポートしていません。ストリームからデータを消費するアプリケーションは、重複したレコードを処理する必要があります。詳細については、Amazon Kinesis Data Streams デベロッパーガイドの「重複レコードの処理」をご参照ください。
-
AWS DMS では、パーティションキーに次の 2 つのフォームがサポートされています。
-
SchemaName.TableName
: スキーマとテーブル名の組み合わせ。 -
${AttributeName}
: JSON のいずれかのフィールドの値、またはソースデータベースのテーブルのプライマリキー。
-
-
Kinesis Data Streams 内の保管中のデータの暗号化の詳細については、AWS Key Management Service デベロッパーガイドの「Kinesis Data Streams でのデータ保護」をご参照ください。
-
BatchApply
は Kinesis エンドポイントではサポートされていません。Kinesis ターゲットにバッチ 適用を使用 (例えば、BatchApplyEnabled
ターゲットメタデータ タスク設定) すると、データが失われる可能性があります。 -
Kinesis ターゲットは、レプリケーションインスタンスと同じ AWS AWS リージョン アカウントおよび の Kinesis データストリームでのみサポートされます。
-
MySQL ソースから移行する場合、BeforeImage データには CLOB データ型と BLOB データ型は含まれません。詳細については、「前イメージを使用した Kinesis データストリームの CDC 行の元の値のターゲットとしての表示」を参照してください。
-
AWS DMS は、16 桁を超える
BigInt
データ型の値の移行をサポートしていません。この制限を回避するには、次の変換ルールを使用してBigInt
列を文字列に変換できます。変換ルールの詳細については、「 変換ルールおよび変換アクション」を参照してください。{ "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }
-
1 つのトランザクション内の複数の DML オペレーションがソースデータベースの Large Object (LOB) 列を変更すると、ターゲットデータベースはそのトランザクションの最後のオペレーションの最終的な LOB 値のみを保持します。同じトランザクションの以前のオペレーションによって設定された中間 LOB 値は上書きされ、データ損失や不整合が発生する可能性があります。この動作は、レプリケーション中に LOB データが処理される方法が原因で発生します。
Kinesis データストリームにデータを移行するためのオブジェクトマッピングの使用
AWS DMS はテーブルマッピングルールを使用して、ソースからターゲット Kinesis データストリームにデータをマッピングします。ターゲットストリームにデータをマッピングするために、オブジェクトマッピングと呼ばれるテーブルマッピングルールのタイプを使用します。オブジェクトマッピングを使用して、ソースのデータレコードがどのように Kinesis データストリームに発行されたデータレコードにマッピングされるかを定義します。
Kinesis データストリームには、パーティション キー以外にプリセット構造はありません。オブジェクトマッピングルールでは、データレコードの partition-key-type
に指定できる値は、schema-table
、transaction-id
、primary-key
、constant
、attribute-name
です。
オブジェクトマッピングルールを作成するには、object-mapping
として rule-type
を指定します。このルールが、使用したいオブジェクトマッピングのタイプを指定します。
ルールの構造は次のとおりです。
{ "rules": [ { "rule-type": "object-mapping", "rule-id": "
id
", "rule-name": "name
", "rule-action": "valid object-mapping rule action
", "object-locator": { "schema-name": "case-sensitive schema name
", "table-name": "" } } ] }
AWS DMS は現在、 rule-action
パラメータの唯一の有効な値map-record-to-document
として map-record-to-record
および をサポートしています。これらの設定は、exclude-columns
属性リストの一部として除外されない値に影響します。map-record-to-record
と map-record-to-document
の値は、デフォルトで がこれらのレコード AWS DMS を処理する方法を指定します。これらの値は、どのような方法でも属性マッピングに影響しません。
リレーショナルデータベースから Kinesis データストリームに移行する際に map-record-to-record
を使用します。このルールタイプでは、Kinesis データストリームのパーティション キーとしてリレーショナルデータベースから taskResourceId.schemaName.tableName
値を使用し、ソースデータベース内の各列の属性を作成します。
map-record-to-record
を使用する場合は、次の点に注意します。
この設定は、
exclude-columns
リストで除外されている列にのみ影響します。このような列ごとに、 はターゲットトピックに対応する属性 AWS DMS を作成します。
AWS DMS は、ソース列が属性マッピングで使用されているかどうかにかかわらず、この対応する属性を作成します。
map-record-to-document
を使用して、属性名「_doc」を使用しソース列を適切なターゲットストリーム内の単一のフラットドキュメントに配置します。 AWS DMS 说「_doc
」という名前のソースで 1 つのフラットマップにデータを配置します。この配置は、exclude-columns
属性リストに含まれていないソーステーブル内のすべての列に適用されます。
map-record-to-record
を理解するための 1 つの方法は、実際の動作を確認することです。この例では、次の構造とデータを含むリレーショナルデータベースのテーブルの行から始めると想定してください。
FirstName | LastName | StoreId | HomeAddress | HomePhone | WorkAddress | WorkPhone | DateofBirth |
---|---|---|---|---|---|---|---|
Randy |
Marsh | 5 |
221B Baker Street |
1234567890 |
31 Spooner Street, Quahog |
9876543210 |
02/29/1988 |
この情報を Test
という名前のスキーマから Kinesis データストリームに移行するには、データをターゲットストリームにマッピングするルールを作成します。以下のルールはマッピングを示しています。
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKinesis", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }
次は、Kinesis データストリームの結果のレコード形式を示しています:
-
StreamName: XXX
-
PartitionKey: Test.Customers //schmaName.tableName
-
データ: //次の JSON メッセージ
{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }
ただし、同じルールを使用しますが、rule-action
パラメータを map-record-to-document
に変更して特定の列を除外します。以下のルールはマッピングを示しています。
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKinesis", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "exclude-columns": [ "homeaddress", "homephone", "workaddress", "workphone" ] } } ] }
この場合、exclude-columns
パラメータ、FirstName
、LastName
、StoreId
、DateOfBirth
は _doc
にマッピングされます。次は、この結果のレコード形式を示しています。
{ "data":{ "_doc":{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "DateOfBirth": "02/29/1988" } } }
属性マッピングを使用したデータの再構築
属性マップを使用してデータを Kinesis データストリームに移行している間にデータを再構築できます。例えば、ソース内の複数のフィールドを結合してターゲット内に 1 つのフィールドを構成することもできます。以下の属性マップはデータを再構築する方法を示しています。
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKinesis", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }
partition-key
の定数値を設定するには、partition-key
値を指定します。たとえば、すべてのデータを 1 つのシャードに強制的に格納するためにこれを行うことができます。以下のマッピングはこの方法を示しています。
{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKinesis", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
注記
特定のテーブル用のコントロールレコードの partition-key
値は、TaskId.SchemaName.TableName
です。特定のタスク用のコントロールレコードの partition-key
値は、そのレコードの TaskId
です。オブジェクトマッピングの partition-key
値を指定することは、コントロールレコードの partition-key
には影響しません。
Kinesis Data Streams のメッセージ形式
JSON 出力は、単にキーと値のペアのリストです。JSON_UNFORMATTED メッセージ形式は、改行区切り文字を含む単一行の JSON 文字列です。
AWS DMS には、Kinesis Data Streams からのデータを簡単に使用できるように、次の予約済みフィールドが用意されています。
- RecordType
-
レコードタイプはデータまたはコントロールのいずれかです。データレコードは、ソースの実際の行を表します。コントロールレコードは、タスクの再起動など、ストリーム内の重要なイベント用です。
- Operation
-
データレコードの場合、オペレーションは
load
、insert
、update
、またはdelete
です。コントロールレコードの場合、オペレーションは
create-table
、rename-table
、drop-table
、change-columns
、add-column
、drop-column
、rename-column
、column-type-change
です。 - SchemaName
-
レコードのソーススキーマ。コントロールレコードの場合、このフィールドは空です。
- TableName
-
レコードのソーステーブル。コントロールレコードの場合、このフィールドは空です。
- Timestamp
-
JSON メッセージが構築された時刻のタイムスタンプ。このフィールドは ISO 8601 形式でフォーマットされます。