テーブル API コネクタ - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

テーブル API コネクタ

Apache Flink プログラミングモデルでは、コネクタは、アプリケーションが他の AWS サービスなどの外部ソースからデータを読み書きするために使用するコンポーネントです。

Apache Flink テーブル API では、以下のタイプのコネクタを使用できます。

  • テーブル API ソース: テーブルAPIソースコネクタを使用して、APIコールまたはSQLクエリを使用して TableEnvironment 内にテーブルを作成します。

  • テーブル API シンク: SQL コマンドを使用して、Amazon MSK トピックや Amazon S3 バケットなどの外部ソースにテーブルデータを書き込みます。

テーブル API ソース

データストリームからテーブルソースを作成します。次のコードは Amazon MSK トピックからテーブルを作成します。

//create the table final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties); consumer.setStartFromEarliest(); //Obtain stream DataStream<StockRecord> events = env.addSource(consumer); Table table = streamTableEnvironment.fromDataStream(events);

テーブルソースの詳細については、Apache Flink ドキュメントの「テーブルと SQL コネクタ」を参照してください。

テーブル API シンク

テーブルデータをシンクに書き込むには、SQL でシンクを作成し、その StreamTableEnvironment オブジェクトで SQL ベースのシンクを実行します。

次のコードの例は、テーブルのデータを Amazon S3 シンクに書き込む方法を示しています。

final String s3Sink = "CREATE TABLE sink_table (" + "event_time TIMESTAMP," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ")" + " PARTITIONED BY (ticker,dt,hr)" + " WITH" + "(" + " 'connector' = 'filesystem'," + " 'path' = '" + s3Path + "'," + " 'format' = 'json'" + ") "; //send to s3 streamTableEnvironment.executeSql(s3Sink); filteredTable.executeInsert("sink_table");

この format パラメータを使用して、Apache Flink 用 Managed Serviceが出力をシンクに書き込む際に使用するフォーマットを制御できます。形式の詳細については、「Apache Flink ドキュメント」の「サポートされているコネクタ」を参照してください。

ユーザー定義のソースとシンク

既存の Apache Kafka コネクタを使用して、Amazon MSK や Amazon S3 などの他の AWS サービスとの間でデータを送受信できます。他のデータソースや送信先とやり取りする場合は、独自のソースとシンクを定義できます。詳細については、「Apache Flink ドキュメント」の「ユーザー定義のソースとシンク」を参照してください。