테이블 커넥터 사용 API - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink는 이전에 Amazon Kinesis Data Analytics for Apache Flink로 알려졌습니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

테이블 커넥터 사용 API

Apache Flink 프로그래밍 모델에서 커넥터는 애플리케이션이 다른 AWS 서비스와 같은 외부 소스에서 데이터를 읽거나 쓰는 데 사용하는 구성 요소입니다.

Apache Flink 테이블을 사용하면 다음과 같은 API 유형의 커넥터를 사용할 수 있습니다.

  • 테이블 API 소스: 테이블 API 소스 커넥터를 사용하면 API 호출 또는 SQL 쿼리를 TableEnvironment 사용하여 내에서 테이블을 만들 수 있습니다.

  • 테이블 싱크 API: SQL 명령을 사용하여 Amazon MSK 주제 또는 Amazon S3 버킷과 같은 외부 소스에 테이블 데이터를 씁니다.

테이블 API 소스

데이터 스트림에서 표 소스를 생성합니다. 다음 코드는 Amazon MSK 주제에서 테이블을 생성합니다.

//create the table final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties); consumer.setStartFromEarliest(); //Obtain stream DataStream<StockRecord> events = env.addSource(consumer); Table table = streamTableEnvironment.fromDataStream(events);

테이블 소스에 대한 자세한 내용은 Apache Flink 설명서의 테이블 및 SQL 커넥터를 참조하십시오.

테이블 싱크 API

테이블 데이터를 싱크에 쓰려면 싱크인을 만든 다음 StreamTableEnvironment 객체에서 SQL 기반 싱크를 실행합니다. SQL

다음 코드 예는 Amazon S3 싱크에 표 데이터를 쓰는 방법을 보여줍니다.

final String s3Sink = "CREATE TABLE sink_table (" + "event_time TIMESTAMP," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ")" + " PARTITIONED BY (ticker,dt,hr)" + " WITH" + "(" + " 'connector' = 'filesystem'," + " 'path' = '" + s3Path + "'," + " 'format' = 'json'" + ") "; //send to s3 streamTableEnvironment.executeSql(s3Sink); filteredTable.executeInsert("sink_table");

format 파라미터를 사용하여 Apache Flink용 관리형 서비스가 싱크에 출력을 기록하는 데 사용하는 형식을 제어할 수 있습니다. 형식에 대한 자세한 내용은 Apache Flink 설명서의 지원되는 커넥터를 참조하십시오.

사용자 정의 소스 및 싱크

기존 Apache Kafka 커넥터를 사용하여 Amazon 및 MSK Amazon S3와 같은 다른 AWS 서비스와 데이터를 주고 받을 수 있습니다. 다른 데이터 소스 및 목적지과 상호 작용하기 위해 자체 소스 및 싱크를 정의할 수 있습니다. 자세한 내용은 Apache Flink 설명서의 사용자 정의 소스 및 싱크를 참조하십시오.