Utiliser des API connecteurs de table - Service géré pour Apache Flink

Le service géré Amazon pour Apache Flink était auparavant connu sous le nom d’Amazon Kinesis Data Analytics pour Apache Flink.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utiliser des API connecteurs de table

Dans le modèle de programmation Apache Flink, les connecteurs sont des composants que votre application utilise pour lire ou écrire des données provenant de sources externes, telles que d'autres AWS services.

Avec la table Apache FlinkAPI, vous pouvez utiliser les types de connecteurs suivants :

  • APISources du tableau: vous utilisez les connecteurs de API source de table pour créer des tables au sein de votre ordinateur TableEnvironment à l'aide d'APIappels ou de SQL requêtes.

  • APIéviers de table: vous utilisez SQL des commandes pour écrire des données de table dans des sources externes telles qu'une MSK rubrique Amazon ou un compartiment Amazon S3.

APISources du tableau

Vous créez une source de table à partir d’un flux de données. Le code suivant crée un tableau à partir d'une MSK rubrique Amazon :

//create the table final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties); consumer.setStartFromEarliest(); //Obtain stream DataStream<StockRecord> events = env.addSource(consumer); Table table = streamTableEnvironment.fromDataStream(events);

Pour plus d'informations sur les sources de tables, voir Table et SQL connecteurs dans la documentation d'Apache Flink.

APIéviers de table

Pour écrire des données de table dans un récepteur, vous créez le récepteur dansSQL, puis vous exécutez le récepteur SQL basé sur l'StreamTableEnvironmentobjet.

L’exemple de code suivant illustre comment écrire des données de table sur un récepteur Amazon S3 :

final String s3Sink = "CREATE TABLE sink_table (" + "event_time TIMESTAMP," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ")" + " PARTITIONED BY (ticker,dt,hr)" + " WITH" + "(" + " 'connector' = 'filesystem'," + " 'path' = '" + s3Path + "'," + " 'format' = 'json'" + ") "; //send to s3 streamTableEnvironment.executeSql(s3Sink); filteredTable.executeInsert("sink_table");

Vous pouvez utiliser le paramètre format pour contrôler le format utilisé par le service géré pour Apache Flink pour écrire la sortie sur le récepteur. Pour plus d'informations sur les formats, consultez la section Connecteurs pris en charge dans la documentation d'Apache Flink.

Sources et récepteurs définis par l'utilisateur

Vous pouvez utiliser les connecteurs Apache Kafka existants pour envoyer des données vers et depuis d'autres AWS services, tels qu'Amazon MSK et Amazon S3. Pour interagir avec d’autres sources de données et destinations, vous pouvez définir vos propres sources et récepteurs. Pour plus d'informations, consultez la section Sources et récepteurs définis par l'utilisateur dans la documentation d'Apache Flink.