

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Vérifier les composants de DataStream l'API
<a name="how-datastream"></a>

Votre application Apache Flink utilise l'[ DataStream API Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/) pour transformer les données en flux de données. 

Cette section décrit les différents composants qui déplacent, transforment et suivent les données :
+ [Utilisez des connecteurs pour déplacer des données dans le service géré pour Apache Flink avec l'API DataStream](how-connectors.md) : ces composants déplacent les données entre votre application et les sources de données et destinations externes.
+ [Transformez les données à l'aide d'opérateurs dans Managed Service pour Apache Flink avec l'API DataStream](how-operators.md) : ces composants transforment ou regroupent des éléments de données au sein de votre application.
+ [Suivez les événements dans le service géré pour Apache Flink à l'aide de l'API DataStream](how-time.md): cette rubrique décrit comment le service géré pour Apache Flink suit les événements lors de l'utilisation de l' DataStream API.

# Utilisez des connecteurs pour déplacer des données dans le service géré pour Apache Flink avec l'API DataStream
<a name="how-connectors"></a>

Dans l' DataStream API Amazon Managed Service for Apache Flink, les *connecteurs* sont des composants logiciels qui déplacent les données vers et depuis une application Managed Service for Apache Flink. Les connecteurs sont des intégrations flexibles qui vous permettent de lire des fichiers et des répertoires. Les connecteurs sont constitués de modules complets permettant d’interagir avec les services Amazon et les systèmes tiers.

Les types de connecteurs sont les suivants :
+ [Ajouter des sources de données de streaming](how-sources.md) : fournit des données à votre application à partir d’un flux de données Kinesis, d’un fichier ou d’une autre source de données.
+ [Écrire des données à l'aide de récepteurs](how-sinks.md): envoyez des données depuis votre application vers un flux de données Kinesis, un flux Firehose ou une autre destination de données.
+ [Utiliser des E/S asynchrones](how-async.md) : fournit un accès asynchrone à une source de données (telle qu’une base de données) pour enrichir les événements de flux. 

## Connecteurs disponibles
<a name="how-connectors-list"></a>

L’environnement Apache Flink contient des connecteurs permettant d’accéder aux données provenant de diverses sources. Pour obtenir des informations sur les connecteurs disponibles dans l’environnement Apache Flink, consultez [Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/connectors/) dans la [documentation Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

**Avertissement**  
Si vous avez des applications exécutées sur Flink 1.6, 1.8, 1.11 ou 1.13 et que vous souhaitez les exécuter dans les régions du Moyen-Orient (EAU), de l'Asie-Pacifique (Hyderabad), d'Israël (Tel Aviv), de l'Europe (Zurich), du Moyen-Orient (EAU), de l'Asie-Pacifique (Melbourne) ou de l'Asie-Pacifique (Jakarta), vous devrez peut-être reconstruire l'archive de vos applications avec un connecteur mis à jour ou passer à Flink 1.18.   
Les connecteurs Apache Flink sont stockés dans leurs propres référentiels open source. Si vous effectuez une mise à niveau vers la version 1.18 ou ultérieure, vous devez mettre à jour vos dépendances. Pour accéder au référentiel des AWS connecteurs Apache Flink, consultez [flink-connector-aws](https://github.com/apache/flink-connector-aws).  
L'ancienne source Kinesis `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` n'est plus disponible et pourrait être supprimée dans une future version de Flink. Utilisez [plutôt Kinesis Source](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-source).  
Il n'y a aucune compatibilité entre les états `FlinkKinesisConsumer` et`KinesisStreamsSource`. Pour plus de détails, consultez la section [Migration de tâches existantes vers la nouvelle source Kinesis Streams](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#migrating-existing-jobs-to-new-kinesis-streams-source-from-kinesis-consumer) dans la documentation d'Apache Flink.  
 Voici les directives recommandées :   


**Améliorations de connecteurs**  

| Version Flink | Connecteur utilisé | Résolution | 
| --- | --- | --- | 
| 1,19, 1,20 | Source de Kinesis |  Lors de la mise à niveau vers Managed Service for Apache Flink versions 1.19 et 1.20, assurez-vous que vous utilisez le connecteur source Kinesis Data Streams le plus récent. Il doit s'agir de n'importe quelle version 5.0.0 ou ultérieure. Pour plus d'informations, consultez [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/).  | 
| 1,19, 1,20 | Évier Kinesis |  Lors de la mise à niveau vers Managed Service for Apache Flink versions 1.19 et 1.20, assurez-vous que vous utilisez le connecteur récepteur Kinesis Data Streams le plus récent. Il doit s'agir de n'importe quelle version 5.0.0 ou ultérieure. Pour plus d'informations, consultez [Kinesis Streams Sink](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink).  | 
| 1,19, 1,20 | Source des flux DynamoDB |  Lors de la mise à niveau vers Managed Service for Apache Flink versions 1.19 et 1.20, assurez-vous que vous utilisez le connecteur source DynamoDB Streams le plus récent. Il doit s'agir de n'importe quelle version 5.0.0 ou ultérieure. Pour plus d'informations, consultez [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/).  | 
| 1,19, 1,20 | Récepteur DynamoDB | Lors de la mise à niveau vers le service géré pour Apache Flink versions 1.19 et 1.20, assurez-vous que vous utilisez le connecteur récepteur DynamoDB le plus récent. Il doit s'agir de n'importe quelle version 5.0.0 ou ultérieure. Pour plus d'informations, consultez [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/). | 
| 1,19, 1,20 | Évier Amazon SQS |  Lors de la mise à niveau vers Managed Service for Apache Flink versions 1.19 et 1.20, assurez-vous que vous utilisez le connecteur récepteur Amazon SQS le plus récent. Il doit s'agir de n'importe quelle version 5.0.0 ou ultérieure. Pour plus d'informations, consultez [Amazon SQS Sink.](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/sqs/)  | 
| 1,19, 1,20 | Service géré par Amazon pour Prometheus Sink |  Lors de la mise à niveau vers Managed Service for Apache Flink versions 1.19 et 1.20, assurez-vous d'utiliser le connecteur récepteur Amazon Managed Service for Prometheus le plus récent. Il doit s'agir de n'importe quelle version 1.0.0 ou ultérieure. Pour plus d'informations, consultez [Prometheus Sink](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/prometheus/).  | 

# Ajouter des sources de données de streaming au service géré pour Apache Flink
<a name="how-sources"></a>

Apache Flink fournit des connecteurs pour lire à partir de fichiers, de sockets, de collections et de sources personnalisées. Dans le code de votre application, vous utilisez une [source Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources) pour recevoir les données d’un flux. Cette section décrit les sources disponibles pour les services Amazon.

## Utiliser les flux de données Kinesis
<a name="input-streams"></a>

`KinesisStreamsSource`fournit des données de streaming à votre application à partir d'un flux de données Amazon Kinesis. 

### Créer une `KinesisStreamsSource`
<a name="input-streams-create"></a>

L’exemple de code suivant illustre la création d’un `KinesisStreamsSource` :

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

Pour plus d'informations sur l'utilisation d'un`KinesisStreamsSource`, consultez le connecteur [Amazon Kinesis Data Streams](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) dans la documentation d'Apache Flink [et notre exemple KinesisConnectors public](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) sur Github.

### Créez un `KinesisStreamsSource` qui utilise un consommateur EFO
<a name="input-streams-efo"></a>

`KinesisStreamsSource`Il est désormais compatible avec [Enhanced Fan-Out (EFO)](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/). 

Si un client Kinesis utilise EFO, le service Kinesis Data Streams lui fournit sa propre bande passante dédiée, au lieu que le consommateur partage la bande passante fixe du flux avec les autres consommateurs lisant le flux.

Pour plus d'informations sur l'utilisation d'EFO avec les consommateurs Kinesis, [consultez FLIP-128 : Enhanced Fan Out](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers) for Kinesis Consumers. AWS 

Vous activez le consommateur EFO en définissant les paramètres suivants sur le consommateur Kinesis :
+ **READER\$1TYPE :** définissez ce paramètre **sur EFO** pour que votre application utilise un consommateur EFO pour accéder aux données Kinesis Data Stream. 
+ **EFO\$1CONSUMER\$1NAME :** définissez ce paramètre sur une valeur de chaîne unique parmi les consommateurs de ce flux. La réutilisation d’un nom de consommateur dans le même flux de données Kinesis entraînera la résiliation du client qui utilisait ce nom précédemment. 

Pour configurer un `KinesisStreamsSource` afin d’utiliser EFO, ajoutez les paramètres suivants au consommateur :

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

Pour un exemple de service géré pour une application Apache Flink utilisant un client EFO, consultez [notre exemple public de Kinesis Connectors](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) sur Github.

## Utiliser Amazon MSK
<a name="input-msk"></a>

La source `KafkaSource` fournit des données de streaming à votre application à partir d’une rubrique Amazon MSK. 

### Créer une `KafkaSource`
<a name="input-msk-create"></a>

L’exemple de code suivant illustre la création d’un `KafkaSource` :

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

Pour plus d’informations sur l’utilisation d’un `KafkaSource`, consultez [Réplication MSK](earlier.md#example-msk).

# Écrire des données à l'aide de récepteurs dans le service géré pour Apache Flink
<a name="how-sinks"></a>

Dans le code de votre application, vous pouvez utiliser n'importe quel connecteur [récepteur Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) pour écrire dans des systèmes externes, y compris AWS des services tels que Kinesis Data Streams et DynamoDB.

Apache Flink fournit également des récepteurs pour les fichiers et les sockets, et vous pouvez implémenter des récepteurs personnalisés. Parmi les différents éviers pris en charge, les suivants sont fréquemment utilisés :

## Utiliser les flux de données Kinesis
<a name="sinks-streams"></a>

Apache Flink fournit des informations sur le connecteur [Kinesis Data Streams](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/) dans la documentation d’Apache Flink.

Pour un exemple d’application qui utilise un flux de données Kinesis pour l’entrée et la sortie, consultez [Tutoriel : Commencez à utiliser l' DataStream API dans Managed Service pour Apache Flink](getting-started.md).

## Utiliser Apache Kafka et Amazon Managed Streaming pour Apache Kafka (MSK)
<a name="sinks-MSK"></a>

Le [connecteur Apache Flink Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink) fournit un support complet pour la publication de données sur Apache Kafka et Amazon MSK, y compris des garanties « une seule fois ». Pour savoir comment écrire dans Kafka, consultez les [exemples de connecteurs Kafka](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors) dans la documentation d'Apache Flink.

## Utiliser Amazon S3
<a name="sinks-s3"></a>

Vous pouvez utiliser le `StreamingFileSink` Apache Flink pour écrire des objets dans un compartiment Amazon S3.

Pour un exemple sur la façon d’écrire des objets dans S3, consultez[Exemple : écriture dans un compartiment Amazon S3](earlier.md#examples-s3). 

## Utilisez Firehose
<a name="sinks-firehose"></a>

`FlinkKinesisFirehoseProducer`Il s'agit d'un récepteur Apache Flink fiable et évolutif permettant de stocker les résultats des applications à l'aide du service [Firehose](https://docs.aws.amazon.com/firehose/latest/dev/). Cette section décrit comment configurer un projet Maven pour créer et utiliser un `FlinkKinesisFirehoseProducer`.

**Topics**
+ [Créer une `FlinkKinesisFirehoseProducer`](#sinks-firehose-create)
+ [Exemple de code `FlinkKinesisFirehoseProducer`](#sinks-firehose-sample)

### Créer une `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-create"></a>

L’exemple de code suivant illustre la création d’un `FlinkKinesisFirehoseProducer` :

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### Exemple de code `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-sample"></a>

L'exemple de code suivant montre comment créer et configurer un flux de données Apache Flink `FlinkKinesisFirehoseProducer` et comment envoyer des données au service Firehose.

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

Pour un didacticiel complet sur l'utilisation du lavabo Firehose, voir. [Exemple : écrire dans Firehose](earlier.md#get-started-exercise-fh)

# Utiliser l'asynchrone I/O dans le service géré pour Apache Flink
<a name="how-async"></a>

Un I/O opérateur asynchrone enrichit les données de flux à l'aide d'une source de données externe telle qu'une base de données. Le service géré pour Apache Flink enrichit les événements du flux de manière asynchrone afin que les demandes puissent être groupées pour une plus grande efficacité. 

Pour plus d'informations, consultez la section [E/S asynchrones dans la documentation d'](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/asyncio/)Apache Flink.

# Transformez les données à l'aide d'opérateurs dans Managed Service pour Apache Flink avec l'API DataStream
<a name="how-operators"></a>

Pour transformer les données entrantes dans un service géré pour Apache Flink, vous devez utiliser un *opérateur* Apache Flink. Un opérateur Apache Flink transforme un ou plusieurs flux de données en un nouveau flux de données. Le nouveau flux de données contient des données modifiées par rapport au flux de données d’origine. Apache Flink fournit plus de 25 opérateurs de traitement de flux prédéfinis. Pour plus d'informations, consultez la section [Opérateurs](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/) dans la documentation d'Apache Flink.

**Topics**
+ [Utiliser des opérateurs de transformation](#how-operators-transform)
+ [Utiliser des opérateurs d'agrégation](#how-operators-agg)

## Utiliser des opérateurs de transformation
<a name="how-operators-transform"></a>

Voici un exemple de transformation de texte simple sur l’un des champs d’un flux de données JSON. 

Ce code crée un flux de données transformé. Le nouveau flux de données contient les mêmes données que le flux d’origine, la chaîne « ` Company` » étant ajoutée au contenu du champ `TICKER`.

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## Utiliser des opérateurs d'agrégation
<a name="how-operators-agg"></a>

Voici un exemple d’opérateur d’agrégation. Le code crée un flux de données agrégé. L’opérateur crée une fenêtre variable de 5 secondes et renvoie la somme des valeurs `PRICE` des enregistrements de la fenêtre avec la même valeur `TICKER`.

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

Pour plus d’exemples de code, consultez [Exemples de création et d'utilisation d'un service géré pour les applications Apache Flink](examples-collapsibles.md). 

# Suivez les événements dans le service géré pour Apache Flink à l'aide de l'API DataStream
<a name="how-time"></a>

Le service géré pour Apache Flink suit les événements à l’aide des horodatages suivants :
+ **Heure de traitement :** fait référence à l’heure système de la machine qui exécute l’opération correspondante.
+ **Heure de l’événement :** fait référence à l’heure à laquelle chaque événement individuel s’est produit sur son appareil producteur.
+ **Heure d’ingestion :** fait référence à l’heure à laquelle les événements entrent dans le service géré pour Apache Flink.

Vous réglez le temps utilisé par l'environnement de streaming à l'aide de`setStreamTimeCharacteristic`. 

```
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

Pour plus d'informations sur les horodatages, consultez la section [Génération de filigranes](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/) dans la documentation d'Apache Flink.