テーブルAPIコネクタ - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

テーブルAPIコネクタ

Apache Flink プログラミングモデルでは、コネクタは、アプリケーションが他の AWS サービスなどの外部ソースからデータを読み取りまたは書き込みするために使用するコンポーネントです。

Apache Flink Table ではAPI、次のタイプのコネクタを使用できます。

  • テーブルAPIソース: テーブルAPIソースコネクタを使用して、API呼び出しまたはSQLクエリTableEnvironmentを使用して 内にテーブルを作成します。

  • テーブルAPIシンク: SQL コマンドを使用して、Amazon MSKトピックや Amazon S3 バケットなどの外部ソースにテーブルデータを書き込みます。

テーブルAPIソース

データストリームからテーブルソースを作成します。次のコードは、Amazon MSKトピックからテーブルを作成します。

//create the table final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties); consumer.setStartFromEarliest(); //Obtain stream DataStream<StockRecord> events = env.addSource(consumer); Table table = streamTableEnvironment.fromDataStream(events);

テーブルソースの詳細については、Apache Flink ドキュメントの「テーブルとSQLコネクタ」を参照してください。

テーブルAPIシンク

テーブルデータをシンクに書き込むには、 でシンクを作成しSQL、StreamTableEnvironmentオブジェクトで SQLベースのシンクを実行します。

次のコードの例は、テーブルのデータを Amazon S3 シンクに書き込む方法を示しています。

final String s3Sink = "CREATE TABLE sink_table (" + "event_time TIMESTAMP," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ")" + " PARTITIONED BY (ticker,dt,hr)" + " WITH" + "(" + " 'connector' = 'filesystem'," + " 'path' = '" + s3Path + "'," + " 'format' = 'json'" + ") "; //send to s3 streamTableEnvironment.executeSql(s3Sink); filteredTable.executeInsert("sink_table");

この format パラメータを使用して、Apache Flink 用 Managed Serviceが出力をシンクに書き込む際に使用するフォーマットを制御できます。形式の詳細については、Apache Flink ドキュメントの「サポートされているコネクタ」を参照してください。

ユーザー定義のソースとシンク

既存の Apache Kafka コネクタを使用して、Amazon MSKや Amazon S3 などの他の AWS のサービスとの間でデータを送信できます。他のデータソースや送信先とやり取りする場合は、独自のソースとシンクを定義できます。詳細については、Apache Flink ドキュメントの「ユーザー定義のソースとシンク」を参照してください。