Utilice conectores de tabla API - Managed Service para Apache Flink

Amazon Managed Service para Apache Flink Amazon se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Utilice conectores de tabla API

En el modelo de programación de Apache Flink, los conectores son componentes que la aplicación utiliza para leer o escribir datos de fuentes externas, como otros AWS servicios.

Con la tabla Flink de ApacheAPI, puede usar los siguientes tipos de conectores:

  • APIFuentes de tablas: Utiliza los conectores API fuente de tablas para crear tablas en su interior TableEnvironment mediante API llamadas o SQL consultas.

  • Lavabos de mesa API: Utilizas SQL comandos para escribir datos de tablas en fuentes externas, como un MSK tema de Amazon o un bucket de Amazon S3.

APIFuentes de tablas

Se crea una fuente de tabla a partir de un flujo de datos. El siguiente código crea una tabla a partir de un MSK tema de Amazon:

//create the table final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties); consumer.setStartFromEarliest(); //Obtain stream DataStream<StockRecord> events = env.addSource(consumer); Table table = streamTableEnvironment.fromDataStream(events);

Para obtener más información sobre las fuentes de las tablas, consulte Tabla y SQL conectores en la documentación de Apache Flink.

Lavabos de mesa API

Para escribir los datos de una tabla en un sumidero, se crea el sumidero en el objeto ySQL, a continuación, se ejecuta el sumidero SQL basado en el StreamTableEnvironment objeto.

En el siguiente código de ejemplo, se muestra cómo escribir datos de tablas a un receptor de Amazon S3:

final String s3Sink = "CREATE TABLE sink_table (" + "event_time TIMESTAMP," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ")" + " PARTITIONED BY (ticker,dt,hr)" + " WITH" + "(" + " 'connector' = 'filesystem'," + " 'path' = '" + s3Path + "'," + " 'format' = 'json'" + ") "; //send to s3 streamTableEnvironment.executeSql(s3Sink); filteredTable.executeInsert("sink_table");

Puede usar el parámetro format para controlar el formato que Managed Service para Apache Flink utiliza para escribir el resultado en el receptor. Para obtener información sobre los formatos, consulte los conectores compatibles en la documentación de Apache Flink.

Fuentes y sumideros definidos por el usuario

Puede utilizar los conectores Apache Kafka existentes para enviar datos a y desde otros AWS servicios, como Amazon MSK y Amazon S3. Para interactuar con otros orígenes y destinos de datos, puede definir sus propios orígenes y receptores. Para obtener más información, consulte Fuentes y receptores definidos por el usuario en la documentación de Apache Flink.