Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
テーブルAPIコネクタ
Apache Flink プログラミングモデルでは、コネクタは、アプリケーションが他の AWS サービスなどの外部ソースからデータを読み取りまたは書き込みするために使用するコンポーネントです。
Apache Flink Table ではAPI、次のタイプのコネクタを使用できます。
テーブルAPIソース: テーブルAPIソースコネクタを使用して、API呼び出しまたはSQLクエリ
TableEnvironment
を使用して 内にテーブルを作成します。テーブルAPIシンク: SQL コマンドを使用して、Amazon MSKトピックや Amazon S3 バケットなどの外部ソースにテーブルデータを書き込みます。
テーブルAPIソース
データストリームからテーブルソースを作成します。次のコードは、Amazon MSKトピックからテーブルを作成します。
//create the table final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties); consumer.setStartFromEarliest(); //Obtain stream DataStream<StockRecord> events = env.addSource(consumer); Table table = streamTableEnvironment.fromDataStream(events);
テーブルソースの詳細については、Apache Flink ドキュメントの「テーブルとSQLコネクタ
テーブルAPIシンク
テーブルデータをシンクに書き込むには、 でシンクを作成しSQL、StreamTableEnvironment
オブジェクトで SQLベースのシンクを実行します。
次のコードの例は、テーブルのデータを Amazon S3 シンクに書き込む方法を示しています。
final String s3Sink = "CREATE TABLE sink_table (" + "event_time TIMESTAMP," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ")" + " PARTITIONED BY (ticker,dt,hr)" + " WITH" + "(" + " 'connector' = 'filesystem'," + " 'path' = '" + s3Path + "'," + " 'format' = 'json'" + ") "; //send to s3 streamTableEnvironment.executeSql(s3Sink); filteredTable.executeInsert("sink_table");
この format
パラメータを使用して、Apache Flink 用 Managed Serviceが出力をシンクに書き込む際に使用するフォーマットを制御できます。形式の詳細については、Apache Flink ドキュメントの「サポートされているコネクタ
ユーザー定義のソースとシンク
既存の Apache Kafka コネクタを使用して、Amazon MSKや Amazon S3 などの他の AWS のサービスとの間でデータを送信できます。他のデータソースや送信先とやり取りする場合は、独自のソースとシンクを定義できます。詳細については、Apache Flink ドキュメントの「ユーザー定義のソースとシンク