Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
テーブル API コネクタ
Apache Flink プログラミングモデルでは、コネクタは、アプリケーションが他の AWS サービスなどの外部ソースからデータを読み書きするために使用するコンポーネントです。
Apache Flink テーブル API では、以下のタイプのコネクタを使用できます。
テーブル API ソース: テーブルAPIソースコネクタを使用して、APIコールまたはSQLクエリを使用して
TableEnvironment
内にテーブルを作成します。テーブル API シンク: SQL コマンドを使用して、Amazon MSK トピックや Amazon S3 バケットなどの外部ソースにテーブルデータを書き込みます。
テーブル API ソース
データストリームからテーブルソースを作成します。次のコードは Amazon MSK トピックからテーブルを作成します。
//create the table final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties); consumer.setStartFromEarliest(); //Obtain stream DataStream<StockRecord> events = env.addSource(consumer); Table table = streamTableEnvironment.fromDataStream(events);
テーブルソースの詳細については、Apache Flink ドキュメントの「テーブルと SQL コネクタ
テーブル API シンク
テーブルデータをシンクに書き込むには、SQL でシンクを作成し、その StreamTableEnvironment
オブジェクトで SQL ベースのシンクを実行します。
次のコードの例は、テーブルのデータを Amazon S3 シンクに書き込む方法を示しています。
final String s3Sink = "CREATE TABLE sink_table (" + "event_time TIMESTAMP," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ")" + " PARTITIONED BY (ticker,dt,hr)" + " WITH" + "(" + " 'connector' = 'filesystem'," + " 'path' = '" + s3Path + "'," + " 'format' = 'json'" + ") "; //send to s3 streamTableEnvironment.executeSql(s3Sink); filteredTable.executeInsert("sink_table");
この format
パラメータを使用して、Apache Flink 用 Managed Serviceが出力をシンクに書き込む際に使用するフォーマットを制御できます。形式の詳細については、「Apache Flink ドキュメント」の「サポートされているコネクタ
ユーザー定義のソースとシンク
既存の Apache Kafka コネクタを使用して、Amazon MSK や Amazon S3 などの他の AWS サービスとの間でデータを送受信できます。他のデータソースや送信先とやり取りする場合は、独自のソースとシンクを定義できます。詳細については、「Apache Flink ドキュメント」の「ユーザー定義のソースとシンク