

Amazon Timestream for LiveAnalytics に類似した機能をご希望の場合は Amazon Timestream for InfluxDB をご検討ください。リアルタイム分析に適した、シンプルなデータインジェストと 1 桁ミリ秒のクエリ応答時間を特徴としています。詳細については、[こちら](https://docs.aws.amazon.com//timestream/latest/developerguide/timestream-for-influxdb.html)を参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon Kinesis
<a name="Kinesis"></a>

## の使用 Amazon Managed Service for Apache Flink
<a name="kinesis-via-flink"></a>

Managed Service for Apache Flink のサンプルデータコネクタを使用して、Kinesis Data Streams から Timestream for LiveAnalytics に Timestream データを送信できます。詳細については、「[Amazon Managed Service for Apache Flink](ApacheFlink.md) for Apache Flink」を参照してください。

## EventBridge Pipes を使用して Kinesis データを に送信する Timestream
<a name="Kinesis-via-pipes"></a>

 EventBridge Pipes を使用して、Kinesis ストリームから Amazon Timestream for LiveAnalytics テーブルにデータを送信できます。

パイプは、サポートされているソースとターゲット間のポイントツーポイント統合を目的としており、高度な変換とエンリッチメントをサポートしています。パイプを使用すると、イベント駆動型アーキテクチャを開発する際に専門知識や統合コードが不要になります。パイプをセットアップするには、ソースを選択し、オプションのフィルタリングを追加し、オプションのエンリッチメントを定義し、イベントデータのターゲットを選択します。

![\[ソースは、一致するイベントをフィルタリングしてターゲットにルーティングするイベントを EventBridge パイプに送信します。\]](http://docs.aws.amazon.com/ja_jp/timestream/latest/developerguide/images/pipes-overview_shared_architecture.png)


この統合により、データの取り込みパイプラインを簡素化しながら、 Timestream時系列データ分析機能の能力を活用できます。

 EventBridge Pipes を で使用する Timestream と、次の利点があります。
+ リアルタイムのデータインジェスト: Kinesis から Timestream for LiveAnalytics にデータを直接ストリーミングすることで、リアルタイムの分析とモニタリングが可能になります。
+ シームレスな統合: EventBridge Pipes を使用して、複雑なカスタム統合を必要とせずにデータフローを管理します。
+ 拡張フィルタリングと変換: 特定のデータ処理要件を満たす Timestream ために、 に保存される前に Kinesis レコードをフィルタリングまたは変換します。
+ スケーラビリティ: 高スループットのデータストリームを処理し、組み込みの並列処理とバッチ処理機能を使用して効率的なデータ処理を実現します。

### 設定
<a name="Kinesis-via-pipes-config"></a>

Kinesis から にデータをストリーミングするように EventBridge Pipe を設定するには Timestream、次の手順に従います。

1. Kinesis Stream を作成する

   データを取り込むアクティブな Kinesis データストリームがあることを確認します。

1.  Timestream データベースとテーブルを作成する

   データを保存する Timestream データベースとテーブルを設定します。

1.  EventBridge パイプを設定します。
   + ソース: ソースとして Kinesis ストリームを選択します。
   + ターゲット: ターゲット Timestream として を選択します。
   + バッチ処理設定: バッチ処理ウィンドウとバッチサイズを定義してデータ処理を最適化し、レイテンシーを短縮します。

**重要**  
パイプを設定するときは、いくつかのレコードを取り込んで、すべての設定の正確性をテストすることをお勧めします。パイプの作成が成功しても、パイプラインの正確性とエラーのないデータフローが保証されるわけではありません。マッピングの適用後に、誤ったテーブル、誤った動的パスパラメータ、無効な Timestream レコードなどのランタイムエラーが発生し、実際のデータがパイプを通過するときに検出される可能性があります。

以下の設定により、データが取り込まれる速度が決まります。
+ BatchSize: Timestream for LiveAnalytics に送信されるバッチの最大サイズ。範囲は 0～100 です。スループットを最大化するには、この値を 100 にすることをお勧めします。
+ MaximumBatchingWindowInSeconds: バッチが Timestream for LiveAnalytics ターゲットに送信される前に batchSize を満たすまでの最大待機時間。受信イベントのレートに応じて、この設定によって取り込みの遅延が決まります。この値は < 10 秒のままにして、データを Timestream ほぼリアルタイムで に送信し続けることをお勧めします。
+ ParallelizationFactor: 各シャードから同時に処理するバッチの数です。スループットを最大化し、ほぼリアルタイムで取り込むには、最大値の 10 を使用することをお勧めします。

  ストリームが複数のターゲットによって読み取られる場合は、拡張ファンアウトを使用してパイプに専用のコンシューマーを提供すると、高いスループットを実現できます。詳細については、「 *Kinesis Data Streams ユーザーガイド*[」の Kinesis Data Streams 「 API を使用した拡張ファンアウトコンシューマーの開発](https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-api.html)」を参照してください。

**注記**  
達成できる最大スループットは、アカウントあたりの[同時パイプ実行数](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-quota.html#eb-pipes-limits)によって制限されます。

次の設定によってデータ損失を防止できます。
+ DeadLetterConfig: ユーザーエラーが原因でイベントを Timestream for LiveAnalytics に取り込むことができなかった場合にデータが失われないよう、DeadLetterConfig を常に設定することをお勧めします。

次の設定によってパイプのパフォーマンスを最適化することで、レコードが原因の速度低下やブロックを防ぐことができます。
+ MaximumRecordAgeInSeconds: この値より古いレコードは処理されず、DLQ に直接移動されます。この値は、ターゲット Timestream テーブルの設定済みメモリストアの保持期間を超えないように設定することをお勧めします。
+ MaximumRetryAttempts: レコードが DeadLetterQueue に送信されるまでの再試行回数。この値は 10 に設定することをお勧めします。これによって一時的な問題に対応できます。永続的な問題の場合、レコードは DeadLetterQueue に移動され、残りのストリームのブロックが解除されます。
+ OnPartialBatchItemFailure: 部分的なバッチ処理をサポートするソースの場合は、これを有効にして、DLQ にドロップ/送信する前に失敗レコードをさらに再試行するよう AUTOMATIC\$1BISECT に設定することをお勧めします。

#### 設定例
<a name="Kinesis-via-pipes-config-example"></a>

Kinesis ストリームから Timestream テーブルにデータをストリーミングするように EventBridge Pipe を設定する方法の例を次に示します。

**Example IAM の ポリシーの更新 Timestream**    
****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "timestream:WriteRecords"
            ],
            "Resource": [
                "arn:aws:timestream:us-east-1:123456789012:database/my-database/table/my-table"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "timestream:DescribeEndpoints"
            ],
            "Resource": "*"
        }
    ]
}
```

**Example Kinesis ストリーム設定**  <a name="kinesis-stream-config.example"></a>

```
{
  "Source": "arn:aws:kinesis:us-east-1:123456789012:stream/my-kinesis-stream",
  "SourceParameters": {
    "KinesisStreamParameters": {
        "BatchSize": 100,
        "DeadLetterConfig": {
            "Arn": "arn:aws:sqs:us-east-1:123456789012:my-sqs-queue"
        },
       "MaximumBatchingWindowInSeconds": 5,
        "MaximumRecordAgeInSeconds": 1800,
        "MaximumRetryAttempts": 10,
        "StartingPosition": "LATEST",
       "OnPartialBatchItemFailure": "AUTOMATIC_BISECT"
    }
  }
}
```

**Example Timestream ターゲット設定**  <a name="kinesis-stream-config.example"></a>

```
{
    "Target": "arn:aws:timestream:us-east-1:123456789012:database/my-database/table/my-table",
    "TargetParameters": {
        "TimestreamParameters": {
            "DimensionMappings": [
                {
                    "DimensionName": "sensor_id",
                    "DimensionValue": "$.data.device_id",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "DimensionName": "sensor_type",
                    "DimensionValue": "$.data.sensor_type",
                    "DimensionValueType": "VARCHAR"
                },
                {
                    "DimensionName": "sensor_location",
                    "DimensionValue": "$.data.sensor_loc",
                    "DimensionValueType": "VARCHAR"
                }
            ],
            "MultiMeasureMappings": [
                {
                    "MultiMeasureName": "readings",
                    "MultiMeasureAttributeMappings": [
                        {
                            "MultiMeasureAttributeName": "temperature",
                            "MeasureValue": "$.data.temperature",
                            "MeasureValueType": "DOUBLE"
                        },
                        {
                            "MultiMeasureAttributeName": "humidity",
                            "MeasureValue": "$.data.humidity",
                            "MeasureValueType": "DOUBLE"
                        },
                        {
                            "MultiMeasureAttributeName": "pressure",
                            "MeasureValue": "$.data.pressure",
                            "MeasureValueType": "DOUBLE"
                        }
                    ]
                }
            ],
            "SingleMeasureMappings": [],
            "TimeFieldType": "TIMESTAMP_FORMAT",
            "TimestampFormat": "yyyy-MM-dd HH:mm:ss.SSS",
            "TimeValue": "$.data.time",
            "VersionValue": "$.approximateArrivalTimestamp"
        }
    }
}
```



### イベントの変換
<a name="Kinesis-via-pipes-trans"></a>

EventBridge パイプを使用すると、到達する前にデータを変換できます Timestream。変換ルールを定義して、フィールド名の変更など、受信 Kinesis レコードを変更できます。

 Kinesis ストリームに温度と湿度のデータが含まれているとします。 EventBridge 変換を使用して、これらのフィールドを挿入する前に名前を変更できます Timestream。

### ベストプラクティス
<a name="Kinesis-via-pipes-best"></a>

**バッチ処理とバッファリング**
+ 書き込みレイテンシーと処理効率のバランスを取るように、バッチ処理ウィンドウとバッチサイズを設定します。
+ バッチ処理ウィンドウを使用して処理前に十分なデータを蓄積し、高頻度の小さなバッチによるオーバーヘッドを削減します。

**並列処理**

**ParallelizationFactor** 設定を使用して、特にスループットの高いストリームの同時実行を増やします。これにより、各シャードの複数のバッチを同時に処理できます。

**データ変換**

 EventBridge Pipes の変換機能を活用して、レコードを保存する前にレコードをフィルタリングおよび強化します Timestream。これにより、データを分析要件に合わせることができます。

**セキュリティ**
+  EventBridge Pipes に使用される IAM ロールに、読み Kinesis 書きに必要なアクセス許可があることを確認します Timestream。
+ 暗号化とアクセス制御手段を使用して、転送中のデータおよび保管中のデータを保護します。

### デバッグの失敗
<a name="Kinesis-via-pipes-debug"></a>
+ **パイプの自動無効化**

  ターゲットが存在しないか、アクセス許可の問題がある場合、パイプは約 2 時間後に自動的に無効になります。
+ **Throttles**

  パイプには、スロットルが減少するまで自動的にバックオフして再試行する機能があります。
+ **ログの有効化**

  ログを ERROR レベルで有効にし、実行データを含めて、失敗に関する追加情報を取得することをお勧めします。障害が発生した場合、これらのログには、送受信されたリクエスト/レスポンスが含まれます Timestream。これにより、関連するエラーを理解し、必要に応じて修正後にレコードを再処理できます。

### モニタリング
<a name="Kinesis-via-pipes-monitor"></a>

データフローの問題を検出するために、以下に関するアラームを設定することをお勧めします。
+ ソースのレコードの最大保存期間
  + `GetRecords.IteratorAgeMilliseconds`
+ パイプの失敗メトリクス
  + `ExecutionFailed`
  + `TargetStageFailed`
+ Timestream API エラーの書き込み
  + `UserErrors`

その他のモニタリングメトリクスについては、「*EventBridge ユーザーガイド*」の「[EventBridgeのモニタリング](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-monitoring.html#eb-metrics)」を参照してください。