

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Usa i connettori per spostare i dati in Managed Service for Apache Flink con l'API DataStream
<a name="how-connectors"></a>

Nell' DataStream API Amazon Managed Service for Apache Flink, i *connettori* sono componenti software che spostano i dati da e verso un'applicazione Managed Service for Apache Flink. I connettori sono integrazioni flessibili che consentono di leggere file e directory. I connettori sono costituiti da moduli completi per l'interazione con i servizi Amazon e i sistemi di terze parti.

I tipi di connettori comprendono:
+ [Aggiungi sorgenti di dati in streaming](how-sources.md): invio di dati all'applicazione da un flusso di dati, un file o un'altra origine dati Kinesis.
+ [Scrivi dati usando i sink](how-sinks.md): invia dati dall'applicazione a un flusso di dati Kinesis, a un flusso Firehose o a un'altra destinazione di dati.
+ [Usa I/O asincrono](how-async.md): fornisce l'accesso asincrono a un'origine dati (ad esempio, un database) per arricchire gli eventi di flusso. 

## Connettori disponibili
<a name="how-connectors-list"></a>

Il framework Apache Flink contiene connettori per l'accesso ai dati da vari tipi di origini. Per informazioni sui connettori disponibili nel framework Apache Flink, consulta [Connettori](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/connectors/) nella [documentazione di Apache Flink.](https://nightlies.apache.org/flink/flink-docs-release-1.15/)

**avvertimento**  
Se hai applicazioni in esecuzione su Flink 1.6, 1.8, 1.11 o 1.13 e desideri eseguirle nelle regioni del Medio Oriente (Emirati Arabi Uniti), Asia Pacifico (Hyderabad), Israele (Tel Aviv), Europa (Zurigo), Medio Oriente (Emirati Arabi Uniti), Asia Pacifico (Melbourne) o Asia Pacifico (Giacarta), potresti dover ricostruire l'archivio delle applicazioni con un connettore aggiornato o eseguire l'aggiornamento a Flink 1.18.   
I connettori Apache Flink sono archiviati nei propri archivi open source. Se stai eseguendo l'aggiornamento alla versione 1.18 o successiva, devi aggiornare le tue dipendenze. Per accedere al repository per i connettori Apache Flink, vedi. AWS [flink-connector-aws](https://github.com/apache/flink-connector-aws)  
Il precedente sorgente Kinesis non `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` è più disponibile e potrebbe essere rimosso con le future release di Flink. Utilizzate [invece Kinesis Source](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-source).  
Non esiste compatibilità a livello di stato tra `FlinkKinesisConsumer` e`KinesisStreamsSource`. Per i dettagli, consulta [Migrazione dei job esistenti al nuovo Kinesis Streams](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#migrating-existing-jobs-to-new-kinesis-streams-source-from-kinesis-consumer) Source nella documentazione di Apache Flink.  
 Di seguito sono riportate le linee guida consigliate:   


**Aggiornamenti dei connettori**  

| Versione di Flink | Connettore usato | Risoluzione | 
| --- | --- | --- | 
| 1.19, 1.20 | Fonte Kinesis |  Quando esegui l'aggiornamento a Managed Service for Apache Flink versione 1.19 e 1.20, assicurati di utilizzare il connettore sorgente Kinesis Data Streams più recente. Deve essere una qualsiasi versione 5.0.0 o successiva. Per ulteriori informazioni, consulta [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/).  | 
| 1.19, 1.20 | Lavello Kinesis |  Quando esegui l'aggiornamento a Managed Service for Apache Flink versione 1.19 e 1.20, assicurati di utilizzare il connettore sink Kinesis Data Streams più recente. Deve essere una qualsiasi versione 5.0.0 o successiva. Per ulteriori informazioni, vedere [Kinesis Streams](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink) Sink.  | 
| 1.19, 1.20 | Sorgente DynamoDB Streams |  Quando esegui l'aggiornamento a Managed Service for Apache Flink versione 1.19 e 1.20, assicurati di utilizzare il connettore sorgente DynamoDB Streams più recente. Deve essere una qualsiasi versione 5.0.0 o successiva. Per ulteriori informazioni, consulta [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/).  | 
| 1.19, 1.20 | Lavello DynamoDB | Quando esegui l'aggiornamento a Managed Service for Apache Flink versione 1.19 e 1.20, assicurati di utilizzare il connettore sink DynamoDB più recente. Deve essere una qualsiasi versione 5.0.0 o successiva. Per ulteriori informazioni, consulta [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/). | 
| 1.19, 1.20 | Lavandino Amazon SQS |  Quando esegui l'aggiornamento a Managed Service for Apache Flink versione 1.19 e 1.20, assicurati di utilizzare il connettore sink di Amazon SQS più recente. Deve essere una qualsiasi versione 5.0.0 o successiva. Per ulteriori informazioni, consulta [Amazon SQS Sink.](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/sqs/)  | 
| 1.19, 1,20 | Servizio gestito Amazon per Prometheus Sink |  Quando esegui l'aggiornamento a Managed Service for Apache Flink versione 1.19 e 1.20, assicurati di utilizzare il connettore sink di Amazon Managed Service for Prometheus più recente. Deve essere una qualsiasi versione 1.0.0 o successiva. Per ulteriori informazioni, vedere [Prometheus](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/prometheus/) Sink.  | 

# Aggiungi sorgenti di dati di streaming a Managed Service for Apache Flink
<a name="how-sources"></a>

Apache Flink fornisce connettori per la lettura da file, socket, raccolte e origini personalizzate. Nel codice dell'applicazione, utilizzi un'[origine Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources) per ricevere dati da un flusso. Questa sezione descrive le origini disponibili per i servizi Amazon.

## Usa i flussi di dati Kinesis
<a name="input-streams"></a>

`KinesisStreamsSource`Fornisce dati in streaming all'applicazione da un flusso di dati Amazon Kinesis. 

### Creazione di una `KinesisStreamsSource`
<a name="input-streams-create"></a>

Il seguente esempio di codice illustra la creazione di una `KinesisStreamsSource`:

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

Per ulteriori informazioni sull'utilizzo di a`KinesisStreamsSource`, consulta [Amazon Kinesis Data](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) Streams Connector nella [documentazione di Apache Flink e KinesisConnectors ](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) il nostro esempio pubblico su Github.

### Crea un account che utilizza un consumatore EFO `KinesisStreamsSource`
<a name="input-streams-efo"></a>

`KinesisStreamsSource`Ora supporta [Enhanced Fan-Out (](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/)EFO). 

Se un consumatore Kinesis utilizza EFO, il servizio del flusso di dati Kinesis gli fornisce una larghezza di banda dedicata, anziché chiedere al consumatore di condividere la larghezza di banda fissa del flusso con gli altri consumatori che leggono dal flusso.

Per ulteriori informazioni sull'utilizzo di EFO con il consumatore Kinesis, [consulta FLIP-128: Enhanced Fan](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers) Out for Kinesis Consumers. AWS 

È possibile abilitare il consumatore EFO impostando i seguenti parametri sul consumatore Kinesis:
+ **READER\$1TYPE:** imposta questo parametro su **EFO** per consentire all'applicazione di utilizzare un consumatore EFO per accedere ai dati di Kinesis Data Stream. 
+ **EFO\$1CONSUMER\$1NAME**: imposta questo parametro su un valore di stringa che sia unico tra i consumatori di questo flusso. Il riutilizzo di un nome consumatore nello stesso flusso di dati Kinesis causerà l'interruzione del precedente consumatore che utilizzava quel nome. 

Per configurare un `KinesisStreamsSource` per l'utilizzo di EFO, aggiungi i seguenti parametri al consumatore:

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

Per un esempio di un'applicazione Managed Service for Apache Flink che utilizza un consumatore EFO, vedi il nostro esempio [pubblico di Kinesis](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) Connectors su Github.

## Usa Amazon MSK
<a name="input-msk"></a>

L'origine `KafkaSource` fornisce dati di streaming all'applicazione da un argomento di Amazon MSK. 

### Creazione di una `KafkaSource`
<a name="input-msk-create"></a>

Il seguente esempio di codice illustra la creazione di una `KafkaSource`:

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

Per ulteriori informazioni sull'utilizzo di una `KafkaSource`, consulta [Replica MSK](earlier.md#example-msk).

# Scrivi dati utilizzando i sinks in Managed Service for Apache Flink
<a name="how-sinks"></a>

Nel codice dell'applicazione, puoi utilizzare qualsiasi connettore [sink Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) per scrivere in sistemi esterni, inclusi AWS servizi come Kinesis Data Streams e DynamoDB.

Apache Flink fornisce anche sink per file e socket ed è possibile implementare sink personalizzati. Tra i diversi sink supportati, vengono utilizzati frequentemente i seguenti:

## Usa i flussi di dati Kinesis
<a name="sinks-streams"></a>

Apache Flink fornisce informazioni sul [connettore del flusso di dati Kinesis](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/) nella documentazione di Apache Flink.

Per un esempio di applicazione che utilizza un flusso di dati Kinesis per l'input e l'output, consulta [Tutorial: inizia a usare l' DataStream API in Managed Service for Apache Flink](getting-started.md).

## Usa Apache Kafka e Amazon Managed Streaming per Apache Kafka (MSK)
<a name="sinks-MSK"></a>

Il [connettore Apache Flink Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink) offre un supporto completo per la pubblicazione di dati su Apache Kafka e Amazon MSK, incluse le garanzie Exactly Once. [Per imparare a scrivere su Kafka, consulta gli esempi di connettori Kafka nella documentazione di Apache Flink.](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors)

## Usa Amazon S3
<a name="sinks-s3"></a>

Puoi utilizzare il `StreamingFileSink` di Apache Flink per scrivere oggetti in un bucket Amazon S3.

Per un esempio su come scrivere oggetti su S3, consulta [Esempio: scrittura su un bucket Amazon S3](earlier.md#examples-s3). 

## Usa Firehose
<a name="sinks-firehose"></a>

`FlinkKinesisFirehoseProducer`[È un sink Apache Flink affidabile e scalabile per l'archiviazione dell'output delle applicazioni utilizzando il servizio Firehose.](https://docs.aws.amazon.com/firehose/latest/dev/) In questa sezione viene descritto come impostare un progetto Maven per creare e utilizzare un `FlinkKinesisFirehoseProducer`.

**Topics**
+ [Creazione di una `FlinkKinesisFirehoseProducer`](#sinks-firehose-create)
+ [Esempio di codice `FlinkKinesisFirehoseProducer`](#sinks-firehose-sample)

### Creazione di una `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-create"></a>

Il seguente esempio di codice illustra la creazione di una `FlinkKinesisFirehoseProducer`:

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### Esempio di codice `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-sample"></a>

Il seguente esempio di codice dimostra come creare e configurare un flusso di dati Apache Flink `FlinkKinesisFirehoseProducer` e inviarlo al servizio Firehose.

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

Per un tutorial completo su come utilizzare il lavello Firehose, vedere. [Esempio: scrittura su Firehose](earlier.md#get-started-exercise-fh)

# Usa Asynchronous I/O nel servizio gestito per Apache Flink
<a name="how-async"></a>

Un I/O operatore asincrono arricchisce i dati del flusso utilizzando una fonte di dati esterna come un database. Il servizio gestito per Apache Flink arricchisce gli eventi di flusso in modo asincrono in modo che le richieste possano essere raggruppate in batch per una maggiore efficienza. 

Per ulteriori informazioni, consulta I/O [asincrono](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/asyncio/) nella documentazione di Apache Flink.