Écrire des données à l'aide de récepteurs dans le service géré pour Apache Flink - Service géré pour Apache Flink

Le service géré Amazon pour Apache Flink était auparavant connu sous le nom d’Amazon Kinesis Data Analytics pour Apache Flink.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Écrire des données à l'aide de récepteurs dans le service géré pour Apache Flink

Dans le code de votre application, vous pouvez utiliser n'importe quel connecteur récepteur Apache Flink pour écrire dans des systèmes externes, y compris AWS des services tels que Kinesis Data Streams et DynamoDB.

Apache Flink fournit également des récepteurs pour les fichiers et les sockets, et vous pouvez implémenter des récepteurs personnalisés. Parmi les différents éviers pris en charge, les suivants sont fréquemment utilisés :

Utiliser les flux de données Kinesis

Apache Flink fournit des informations sur le connecteur Kinesis Data Streams dans la documentation d’Apache Flink.

Pour un exemple d’application qui utilise un flux de données Kinesis pour l’entrée et la sortie, consultez Tutoriel : Commencez à utiliser le DataStream API service géré pour Apache Flink.

Utiliser Apache Kafka et Amazon Managed Streaming pour Apache MSK Kafka ()

Le connecteur Apache Flink Kafka fournit un support complet pour la publication de données sur Apache Kafka et AmazonMSK, y compris des garanties « une seule fois ». Pour savoir comment écrire dans Kafka, consultez les exemples de connecteurs Kafka dans la documentation Apache Flink.

Utiliser Amazon S3

Vous pouvez utiliser le StreamingFileSink Apache Flink pour écrire des objets dans un compartiment Amazon S3.

Pour un exemple sur la façon d’écrire des objets dans S3, consultezExemple : écriture dans un compartiment Amazon S3.

Utilisez Firehose

FlinkKinesisFirehoseProducerIl s'agit d'un récepteur Apache Flink fiable et évolutif permettant de stocker les résultats des applications à l'aide du service Firehose. Cette section décrit comment configurer un projet Maven pour créer et utiliser un FlinkKinesisFirehoseProducer.

Créer une FlinkKinesisFirehoseProducer

L’exemple de code suivant illustre la création d’un FlinkKinesisFirehoseProducer :

Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);

Exemple de code FlinkKinesisFirehoseProducer

L'exemple de code suivant montre comment créer et configurer un flux de données Apache Flink FlinkKinesisFirehoseProducer et comment envoyer des données au service Firehose.

package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants; import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() { /* * com.amazonaws.services.kinesisanalytics.flink.connectors.config. * ProducerConfigConstants * lists of all of the properties that firehose sink can be configured with. */ Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties); ProducerConfigConstants config = new ProducerConfigConstants(); return sink; } private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException { /* * com.amazonaws.services.kinesisanalytics.flink.connectors.config. * ProducerConfigConstants * lists of all of the properties that firehose sink can be configured with. */ Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); // Kinesis Firehose sink input.addSink(createFirehoseSinkFromStaticConfig()); // If you would like to use runtime configuration properties, uncomment the // lines below // input.addSink(createFirehoseSinkFromApplicationProperties()); env.execute("Flink Streaming Java API Skeleton"); } }

Pour un didacticiel complet sur l'utilisation du lavabo Firehose, voir. Exemple : écrire dans Firehose