

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Esamina i componenti DataStream dell'API
<a name="how-datastream"></a>

La tua applicazione Apache Flink utilizza l'[ DataStream API Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/) per trasformare i dati in un flusso di dati. 

Questa sezione descrive i diversi componenti che spostano, trasformano e tracciano i dati:
+ [Usa i connettori per spostare i dati in Managed Service for Apache Flink con l'API DataStream](how-connectors.md): questi componenti spostano i dati tra l'applicazione e le origini e le destinazioni dati esterne.
+ [Trasforma i dati utilizzando gli operatori in Managed Service for Apache Flink con l'API DataStream](how-operators.md): questi componenti trasformano o raggruppano gli elementi di dati all'interno dell'applicazione.
+ [Tieni traccia degli eventi in Managed Service for Apache Flink utilizzando l'API DataStream](how-time.md): Questo argomento descrive come Managed Service for Apache Flink tiene traccia degli eventi quando si utilizza l' DataStream API.

# Usa i connettori per spostare i dati in Managed Service for Apache Flink con l'API DataStream
<a name="how-connectors"></a>

Nell' DataStream API Amazon Managed Service for Apache Flink, i *connettori* sono componenti software che spostano i dati da e verso un'applicazione Managed Service for Apache Flink. I connettori sono integrazioni flessibili che consentono di leggere file e directory. I connettori sono costituiti da moduli completi per l'interazione con i servizi Amazon e i sistemi di terze parti.

I tipi di connettori comprendono:
+ [Aggiungi sorgenti di dati in streaming](how-sources.md): invio di dati all'applicazione da un flusso di dati, un file o un'altra origine dati Kinesis.
+ [Scrivi dati usando i sink](how-sinks.md): invia dati dall'applicazione a un flusso di dati Kinesis, a un flusso Firehose o a un'altra destinazione di dati.
+ [Usa I/O asincrono](how-async.md): fornisce l'accesso asincrono a un'origine dati (ad esempio, un database) per arricchire gli eventi di flusso. 

## Connettori disponibili
<a name="how-connectors-list"></a>

Il framework Apache Flink contiene connettori per l'accesso ai dati da vari tipi di origini. Per informazioni sui connettori disponibili nel framework Apache Flink, consulta [Connettori](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/connectors/) nella [documentazione di Apache Flink.](https://nightlies.apache.org/flink/flink-docs-release-1.15/)

**avvertimento**  
Se hai applicazioni in esecuzione su Flink 1.6, 1.8, 1.11 o 1.13 e desideri eseguirle nelle regioni del Medio Oriente (Emirati Arabi Uniti), Asia Pacifico (Hyderabad), Israele (Tel Aviv), Europa (Zurigo), Medio Oriente (Emirati Arabi Uniti), Asia Pacifico (Melbourne) o Asia Pacifico (Giacarta), potresti dover ricostruire l'archivio delle applicazioni con un connettore aggiornato o eseguire l'aggiornamento a Flink 1.18.   
I connettori Apache Flink sono archiviati nei propri archivi open source. Se stai eseguendo l'aggiornamento alla versione 1.18 o successiva, devi aggiornare le tue dipendenze. Per accedere al repository per i connettori Apache Flink, vedi. AWS [flink-connector-aws](https://github.com/apache/flink-connector-aws)  
Il precedente sorgente Kinesis non `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` è più disponibile e potrebbe essere rimosso con le future release di Flink. Utilizzate [invece Kinesis Source](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-source).  
Non esiste compatibilità a livello di stato tra `FlinkKinesisConsumer` e`KinesisStreamsSource`. Per i dettagli, consulta [Migrazione dei job esistenti al nuovo Kinesis Streams](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#migrating-existing-jobs-to-new-kinesis-streams-source-from-kinesis-consumer) Source nella documentazione di Apache Flink.  
 Di seguito sono riportate le linee guida consigliate:   


**Aggiornamenti dei connettori**  

| Versione di Flink | Connettore usato | Risoluzione | 
| --- | --- | --- | 
| 1.19, 1.20 | Fonte Kinesis |  Quando esegui l'aggiornamento a Managed Service for Apache Flink versione 1.19 e 1.20, assicurati di utilizzare il connettore sorgente Kinesis Data Streams più recente. Deve essere una qualsiasi versione 5.0.0 o successiva. Per ulteriori informazioni, consulta [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/).  | 
| 1.19, 1.20 | Lavello Kinesis |  Quando esegui l'aggiornamento a Managed Service for Apache Flink versione 1.19 e 1.20, assicurati di utilizzare il connettore sink Kinesis Data Streams più recente. Deve essere una qualsiasi versione 5.0.0 o successiva. Per ulteriori informazioni, vedere [Kinesis Streams](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink) Sink.  | 
| 1.19, 1.20 | Sorgente DynamoDB Streams |  Quando esegui l'aggiornamento a Managed Service for Apache Flink versione 1.19 e 1.20, assicurati di utilizzare il connettore sorgente DynamoDB Streams più recente. Deve essere una qualsiasi versione 5.0.0 o successiva. Per ulteriori informazioni, consulta [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/).  | 
| 1.19, 1.20 | Lavello DynamoDB | Quando esegui l'aggiornamento a Managed Service for Apache Flink versione 1.19 e 1.20, assicurati di utilizzare il connettore sink DynamoDB più recente. Deve essere una qualsiasi versione 5.0.0 o successiva. Per ulteriori informazioni, consulta [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/). | 
| 1.19, 1.20 | Lavandino Amazon SQS |  Quando esegui l'aggiornamento a Managed Service for Apache Flink versione 1.19 e 1.20, assicurati di utilizzare il connettore sink di Amazon SQS più recente. Deve essere una qualsiasi versione 5.0.0 o successiva. Per ulteriori informazioni, consulta [Amazon SQS Sink.](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/sqs/)  | 
| 1.19, 1,20 | Servizio gestito Amazon per Prometheus Sink |  Quando esegui l'aggiornamento a Managed Service for Apache Flink versione 1.19 e 1.20, assicurati di utilizzare il connettore sink di Amazon Managed Service for Prometheus più recente. Deve essere una qualsiasi versione 1.0.0 o successiva. Per ulteriori informazioni, vedere [Prometheus](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/prometheus/) Sink.  | 

# Aggiungi sorgenti di dati di streaming a Managed Service for Apache Flink
<a name="how-sources"></a>

Apache Flink fornisce connettori per la lettura da file, socket, raccolte e origini personalizzate. Nel codice dell'applicazione, utilizzi un'[origine Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources) per ricevere dati da un flusso. Questa sezione descrive le origini disponibili per i servizi Amazon.

## Usa i flussi di dati Kinesis
<a name="input-streams"></a>

`KinesisStreamsSource`Fornisce dati in streaming all'applicazione da un flusso di dati Amazon Kinesis. 

### Creazione di una `KinesisStreamsSource`
<a name="input-streams-create"></a>

Il seguente esempio di codice illustra la creazione di una `KinesisStreamsSource`:

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

Per ulteriori informazioni sull'utilizzo di a`KinesisStreamsSource`, consulta [Amazon Kinesis Data](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) Streams Connector nella [documentazione di Apache Flink e KinesisConnectors ](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) il nostro esempio pubblico su Github.

### Crea un account che utilizza un consumatore EFO `KinesisStreamsSource`
<a name="input-streams-efo"></a>

`KinesisStreamsSource`Ora supporta [Enhanced Fan-Out (](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/)EFO). 

Se un consumatore Kinesis utilizza EFO, il servizio del flusso di dati Kinesis gli fornisce una larghezza di banda dedicata, anziché chiedere al consumatore di condividere la larghezza di banda fissa del flusso con gli altri consumatori che leggono dal flusso.

Per ulteriori informazioni sull'utilizzo di EFO con il consumatore Kinesis, [consulta FLIP-128: Enhanced Fan](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers) Out for Kinesis Consumers. AWS 

È possibile abilitare il consumatore EFO impostando i seguenti parametri sul consumatore Kinesis:
+ **READER\$1TYPE:** imposta questo parametro su **EFO** per consentire all'applicazione di utilizzare un consumatore EFO per accedere ai dati di Kinesis Data Stream. 
+ **EFO\$1CONSUMER\$1NAME**: imposta questo parametro su un valore di stringa che sia unico tra i consumatori di questo flusso. Il riutilizzo di un nome consumatore nello stesso flusso di dati Kinesis causerà l'interruzione del precedente consumatore che utilizzava quel nome. 

Per configurare un `KinesisStreamsSource` per l'utilizzo di EFO, aggiungi i seguenti parametri al consumatore:

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

Per un esempio di un'applicazione Managed Service for Apache Flink che utilizza un consumatore EFO, vedi il nostro esempio [pubblico di Kinesis](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) Connectors su Github.

## Usa Amazon MSK
<a name="input-msk"></a>

L'origine `KafkaSource` fornisce dati di streaming all'applicazione da un argomento di Amazon MSK. 

### Creazione di una `KafkaSource`
<a name="input-msk-create"></a>

Il seguente esempio di codice illustra la creazione di una `KafkaSource`:

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

Per ulteriori informazioni sull'utilizzo di una `KafkaSource`, consulta [Replica MSK](earlier.md#example-msk).

# Scrivi dati utilizzando i sinks in Managed Service for Apache Flink
<a name="how-sinks"></a>

Nel codice dell'applicazione, puoi utilizzare qualsiasi connettore [sink Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) per scrivere in sistemi esterni, inclusi AWS servizi come Kinesis Data Streams e DynamoDB.

Apache Flink fornisce anche sink per file e socket ed è possibile implementare sink personalizzati. Tra i diversi sink supportati, vengono utilizzati frequentemente i seguenti:

## Usa i flussi di dati Kinesis
<a name="sinks-streams"></a>

Apache Flink fornisce informazioni sul [connettore del flusso di dati Kinesis](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/) nella documentazione di Apache Flink.

Per un esempio di applicazione che utilizza un flusso di dati Kinesis per l'input e l'output, consulta [Tutorial: inizia a usare l' DataStream API in Managed Service for Apache Flink](getting-started.md).

## Usa Apache Kafka e Amazon Managed Streaming per Apache Kafka (MSK)
<a name="sinks-MSK"></a>

Il [connettore Apache Flink Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink) offre un supporto completo per la pubblicazione di dati su Apache Kafka e Amazon MSK, incluse le garanzie Exactly Once. [Per imparare a scrivere su Kafka, consulta gli esempi di connettori Kafka nella documentazione di Apache Flink.](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors)

## Usa Amazon S3
<a name="sinks-s3"></a>

Puoi utilizzare il `StreamingFileSink` di Apache Flink per scrivere oggetti in un bucket Amazon S3.

Per un esempio su come scrivere oggetti su S3, consulta [Esempio: scrittura su un bucket Amazon S3](earlier.md#examples-s3). 

## Usa Firehose
<a name="sinks-firehose"></a>

`FlinkKinesisFirehoseProducer`[È un sink Apache Flink affidabile e scalabile per l'archiviazione dell'output delle applicazioni utilizzando il servizio Firehose.](https://docs.aws.amazon.com/firehose/latest/dev/) In questa sezione viene descritto come impostare un progetto Maven per creare e utilizzare un `FlinkKinesisFirehoseProducer`.

**Topics**
+ [Creazione di una `FlinkKinesisFirehoseProducer`](#sinks-firehose-create)
+ [Esempio di codice `FlinkKinesisFirehoseProducer`](#sinks-firehose-sample)

### Creazione di una `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-create"></a>

Il seguente esempio di codice illustra la creazione di una `FlinkKinesisFirehoseProducer`:

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### Esempio di codice `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-sample"></a>

Il seguente esempio di codice dimostra come creare e configurare un flusso di dati Apache Flink `FlinkKinesisFirehoseProducer` e inviarlo al servizio Firehose.

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

Per un tutorial completo su come utilizzare il lavello Firehose, vedere. [Esempio: scrittura su Firehose](earlier.md#get-started-exercise-fh)

# Usa Asynchronous I/O nel servizio gestito per Apache Flink
<a name="how-async"></a>

Un I/O operatore asincrono arricchisce i dati del flusso utilizzando una fonte di dati esterna come un database. Il servizio gestito per Apache Flink arricchisce gli eventi di flusso in modo asincrono in modo che le richieste possano essere raggruppate in batch per una maggiore efficienza. 

Per ulteriori informazioni, consulta I/O [asincrono](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/asyncio/) nella documentazione di Apache Flink.

# Trasforma i dati utilizzando gli operatori in Managed Service for Apache Flink con l'API DataStream
<a name="how-operators"></a>

Per trasformare i dati in entrata in un servizio gestito per Apache Flink viene utilizzato un operatore *Apache Flink*. Un operatore Apache Flink trasforma uno o più flussi di dati in un nuovo flusso di dati. Il nuovo flusso di dati contiene dati modificati dal flusso di dati originale. Apache Flink offre più di 25 operatori di elaborazione di flussi predefiniti. Per ulteriori informazioni, consulta [Operatori](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/) nella documentazione di Apache Flink.

**Topics**
+ [Usa gli operatori di trasformazione](#how-operators-transform)
+ [Usa gli operatori di aggregazione](#how-operators-agg)

## Usa gli operatori di trasformazione
<a name="how-operators-transform"></a>

Di seguito è riportato un esempio di semplice trasformazione del testo su uno dei campi di un flusso di dati JSON. 

Questo codice crea un flusso di dati trasformato. Il nuovo flusso di dati contiene gli stessi dati del flusso originale, con la stringa "` Company`" aggiunta al contenuto del campo `TICKER`.

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## Usa gli operatori di aggregazione
<a name="how-operators-agg"></a>

Di seguito è riportato un esempio di operatore di aggregazione. Il codice crea un flusso di dati aggregato. L'operatore crea una finestra a cascata di 5 secondi e restituisce la somma dei valori `PRICE` per i record nella finestra con lo stesso valore `TICKER`.

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

Per ulteriori esempi di codice, consulta [Esempi di creazione e utilizzo di Managed Service per applicazioni Apache Flink](examples-collapsibles.md). 

# Tieni traccia degli eventi in Managed Service for Apache Flink utilizzando l'API DataStream
<a name="how-time"></a>

Servizio gestito per Apache Flink tiene traccia degli eventi utilizzando i seguenti timestamp:
+ **Tempo di elaborazione:** si riferisce all'ora di sistema della macchina che esegue la rispettiva operazione.
+ **Ora evento:** si riferisce all'ora in cui ogni singolo evento si è verificato sul dispositivo di produzione.
+ **Tempo di acquisizione:** si riferisce all'ora in cui gli eventi entrano nel servizio gestito per Apache Flink.

Si imposta il tempo utilizzato dall'ambiente di streaming utilizzando`setStreamTimeCharacteristic`. 

```
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

Per ulteriori informazioni sui timestamp, consulta [Generazione di filigrane](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/) nella documentazione di Apache Flink.