

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 查看 Table API 组件
<a name="how-table"></a>

你的 Apache Flink 应用程序使用 [Apache Flink Table API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/tableapi/) 使用关系模型与流中的数据进行交互。您可以使用表 API 通过表源访问数据，然后使用表函数转换和筛选表格数据。您可以使用 API 函数或 SQL 命令转换和筛选表格数据。

本节包含以下主题：
+ [Table API 连接器](how-table-connectors.md)：这些组件在您的应用程序与外部数据源和目标之间移动数据。
+ [Table API 时间属性](how-table-timeattributes.md)：本主题介绍 Managed Service for Apache Flink 在使用表 API 时如何跟踪事件。

# Table API 连接器
<a name="how-table-connectors"></a>

在 Apache Flink 编程模型中，连接器是应用程序用来从外部源（例如其他 AWS 服务）读取或写入数据的组件。

使用 Apache Flink Table API，您可以使用以下类型的连接器：
+ [Table API 来源](#how-table-connectors-source)：您可以使用表 API 源连接器通过 API 调`TableEnvironment`用或 SQL 查询在中创建表。
+ [Table API 接收器](#how-table-connectors-sink)：您可以使用 SQL 命令将表数据写入外部来源，例如 Amazon MSK 主题或 Amazon S3 存储桶。

## Table API 来源
<a name="how-table-connectors-source"></a>

您可以从数据流创建表源。以下代码根据 Amazon MSK 主题创建表：

```
//create the table
    final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties);
    consumer.setStartFromEarliest();
    //Obtain stream
    DataStream<StockRecord> events = env.addSource(consumer);

    Table table = streamTableEnvironment.fromDataStream(events);
```

有关表源的更多信息，请参阅 Apache Flink 文档中的[表和 SQL 连接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/)。

## Table API 接收器
<a name="how-table-connectors-sink"></a>

要将表数据写入接收器，可以在 SQL 中创建接收器，然后在对象上运行基于 SQL 的`StreamTableEnvironment`接收器。

以下代码示例演示了如何将表数据写入 Amazon S3 接收器：

```
final String s3Sink = "CREATE TABLE sink_table (" +
    "event_time TIMESTAMP," +
    "ticker STRING," +
    "price DOUBLE," +
    "dt STRING," +
    "hr STRING" +
    ")" +
    " PARTITIONED BY (ticker,dt,hr)" +
    " WITH" +
    "(" +
    " 'connector' = 'filesystem'," +
    " 'path' = '" + s3Path + "'," +
    " 'format' = 'json'" +
    ") ";

    //send to s3
    streamTableEnvironment.executeSql(s3Sink);
    filteredTable.executeInsert("sink_table");
```

 您可以使用`format`参数来控制 Managed Service for Apache Flink 使用何种格式将输出写入接收器。有关格式的信息，请参阅 Apache Flink 文档中的[支持的连接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/)。

## 用户定义的源和接收器
<a name="how-table-connectors-userdef"></a>

您可以使用现有的 Apache Kafka 连接器向其他 AWS 服务（例如Amazon MSK 和Amazon S3）发送数据。为了与其他数据源和目标进行交互，您可以定义自己的源和接收器。有关更多信息，请参阅 Apache Flink 文档中的[用户定义源和接收器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sourcessinks/)。

# Table API 时间属性
<a name="how-table-timeattributes"></a>

数据流中的每条记录都有多个时间戳，用于定义与该记录相关的事件何时发生：
+ **事件时间**：用户定义的时间戳，用于定义创建记录的事件发生的时间。
+ **摄取时间**：您的应用程序从数据流中检索记录的时间。
+ **处理时间**：您的申请处理记录的时间。

当 Apache Flink Table API 根据记录时间创建窗口时，你可以使用 `setStreamTimeCharacteristic` 方法定义它使用哪些时间戳。

有关在 Table API 中使用时间戳的更多信息，请参阅 Apache Flink 文档中的[时间属性](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/concepts/time_attributes/)和[及时的流处理](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/time/)。