

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Table API コンポーネントを確認する
<a name="how-table"></a>

Apache Flink アプリケーションは、「[Apache Flink テーブル API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/tableapi/)」を使用して、リレーショナルモデルを使用してストリーム内のデータを操作します。Table API を使用してテーブルソースを使用してデータにアクセスし、次にテーブル関数を使用してテーブルデータを変換およびフィルタリングします。API 関数または SQL コマンドを使用して、表形式のデータを変換およびフィルタリングできます。

このセクションは、以下のトピックで構成されます。
+ [Table API connectors](how-table-connectors.md): これらのコンポーネントは、アプリケーションと外部データソースおよび宛先との間でデータを移動します。
+ [Table API 時間属性](how-table-timeattributes.md): このトピックでは、Managed Service for Apache Flinkが Table API を使用する際にイベントをトラッキングする方法について説明します。

# Table API connectors
<a name="how-table-connectors"></a>

Apache Flink プログラミングモデルでは、コネクタは、アプリケーションが他の AWS サービスなどの外部ソースからデータを読み書きするために使用するコンポーネントです。

Apache Flink テーブル API では、以下のタイプのコネクタを使用できます。
+ [Table API ソース](#how-table-connectors-source): テーブルAPIソースコネクタを使用して、APIコールまたはSQLクエリを使用して `TableEnvironment` 内にテーブルを作成します。
+ [Table API シンク](#how-table-connectors-sink): SQL コマンドを使用して、Amazon MSK トピックや Amazon S3 バケットなどの外部ソースにテーブルデータを書き込みます。

## Table API ソース
<a name="how-table-connectors-source"></a>

データストリームからテーブルソースを作成します。次のコードは Amazon MSK トピックからテーブルを作成します。

```
//create the table
    final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties);
    consumer.setStartFromEarliest();
    //Obtain stream
    DataStream<StockRecord> events = env.addSource(consumer);

    Table table = streamTableEnvironment.fromDataStream(events);
```

テーブルソースの詳細については、「Apache Flink ドキュメント」の「[Table & SQL Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/)」を参照してください。

## Table API シンク
<a name="how-table-connectors-sink"></a>

テーブルデータをシンクに書き込むには、SQL でシンクを作成し、その `StreamTableEnvironment` オブジェクトで SQL ベースのシンクを実行します。

次のコードの例は、テーブルのデータを Amazon S3 シンクに書き込む方法を示しています。

```
final String s3Sink = "CREATE TABLE sink_table (" +
    "event_time TIMESTAMP," +
    "ticker STRING," +
    "price DOUBLE," +
    "dt STRING," +
    "hr STRING" +
    ")" +
    " PARTITIONED BY (ticker,dt,hr)" +
    " WITH" +
    "(" +
    " 'connector' = 'filesystem'," +
    " 'path' = '" + s3Path + "'," +
    " 'format' = 'json'" +
    ") ";

    //send to s3
    streamTableEnvironment.executeSql(s3Sink);
    filteredTable.executeInsert("sink_table");
```

 この `format` パラメータを使用して、Apache Flink 用 Managed Serviceが出力をシンクに書き込む際に使用するフォーマットを制御できます。フォーマットについて詳しくは、「Apache Flink ドキュメント」の「[Supported Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/)」を参照してください。

## ユーザー定義のソースとシンク
<a name="how-table-connectors-userdef"></a>

既存の Apache Kafka コネクタを使用して、Amazon MSK や Amazon S3 などの他の AWS サービスとの間でデータを送受信できます。他のデータソースや送信先とやり取りする場合は、独自のソースとシンクを定義できます。詳細については、「Apache Flink ドキュメント」の「[User-defined Sources and Sinks](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sourcessinks/)」を参照してください。

# Table API 時間属性
<a name="how-table-timeattributes"></a>

データストリーム内の各レコードには、そのレコードに関連するイベントがいつ発生したかを定義する複数のタイムスタンプがあります。
+ 「**イベント時間**」:レコードを作成したイベントがいつ発生したかを定義するユーザー定義のタイムスタンプ。
+ 「**取り込み時間**」:アプリケーションがデータストリームからレコードを取得した時刻。
+ 「**処理時間**」:アプリケーションがレコードを処理した時間。

Apache Flink Table API がレコード時間に基づいてウィンドウを作成する場合、`setStreamTimeCharacteristic` メソッドを使用して、どのタイムスタンプを使用するかを定義します。

Table API でのタイムスタンプの使用について詳しくは、「Apache Flink ドキュメント」の「[Time Attributes](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/concepts/time_attributes/)」と「[Timely Stream Processing](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/time/)」を参照してください。