

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 檢閱資料表 API 元件
<a name="how-table"></a>

Apache Flink 應用程式使用 [Apache Flink 資料表 API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/tableapi/)，透過關聯式模型與串流中的資料互動。您可以使用資料表 API 來存取使用資料表來源的資料，然後使用資料表函數來轉換和篩選資料表資料。您可以使用 API 函數或 SQL 命令來轉換和篩選表格式資料。

本節包含下列主題：
+ [資料表 API 連接器](how-table-connectors.md)：這些元件可以在應用程式與外部資料來源和目的地之間移動資料。
+ [資料表 API 時間屬性](how-table-timeattributes.md)：本主題說明 Managed Service for Apache Flink 如何在使用資料表 API 時追蹤事件。

# 資料表 API 連接器
<a name="how-table-connectors"></a>

在 Apache Flink 程式設計模型中，連接器是您的應用程式用來從外部來源讀取或寫入資料的元件，例如其他服務 AWS 。

透過 Apache Flink 資料表 API，您可以使用下列類型的連接器：
+ [資料表 API 來源](#how-table-connectors-source)：您可以使用資料表 API 來源連接器以及 API 呼叫或 SQL 查詢，在 `TableEnvironment` 中建立資料表。
+ [資料表 API 接收器](#how-table-connectors-sink)：您可以使用 SQL 命令將資料表資料寫入外部來源，例如 Amazon MSK 主題或 Amazon S3 儲存貯體。

## 資料表 API 來源
<a name="how-table-connectors-source"></a>

您可以從資料串流建立資料表來源。下列程式碼會從 Amazon MSK 主題建立資料表：

```
//create the table
    final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties);
    consumer.setStartFromEarliest();
    //Obtain stream
    DataStream<StockRecord> events = env.addSource(consumer);

    Table table = streamTableEnvironment.fromDataStream(events);
```

如需資料表來源的詳細資訊，請參閱 Apache Flink 文件中的[資料表和 SQL 連接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/)。

## 資料表 API 接收器
<a name="how-table-connectors-sink"></a>

若要將資料表資料寫入接收器，請在 SQL 中建立接收器，然後在 `StreamTableEnvironment` 物件上執行 SQL 型接收器。

下列程式碼範例示範如何將資料表資料寫入 Amazon S3 接收器：

```
final String s3Sink = "CREATE TABLE sink_table (" +
    "event_time TIMESTAMP," +
    "ticker STRING," +
    "price DOUBLE," +
    "dt STRING," +
    "hr STRING" +
    ")" +
    " PARTITIONED BY (ticker,dt,hr)" +
    " WITH" +
    "(" +
    " 'connector' = 'filesystem'," +
    " 'path' = '" + s3Path + "'," +
    " 'format' = 'json'" +
    ") ";

    //send to s3
    streamTableEnvironment.executeSql(s3Sink);
    filteredTable.executeInsert("sink_table");
```

 您可以使用 `format` 參數來控制 Managed Service for Apache Flink 用來將輸出寫入接收器的格式。如需格式的相關資訊，請參閱 Apache Flink 文件中的[支援的連接器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/)。

## 使用者定義的來源和接收器
<a name="how-table-connectors-userdef"></a>

您可以使用現有的 Apache Kafka 連接器與其他 AWS 服務 (例如 Amazon MSK 和 Amazon S3) 之間相互傳送資料。若要與其他資料來源和目的地互動，您可以定義自己的來源和接收器。如需詳細資訊，請參閱 Apache Flink 文件中的[使用者定義來源和接收器](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sourcessinks/)。

# 資料表 API 時間屬性
<a name="how-table-timeattributes"></a>

資料串流中的每個記錄都有數個時間戳記，用來定義與記錄相關的事件發生的時間：
+ **事件時間**：使用者定義的時間戳記，定義建立記錄的事件發生的時間。
+ **擷取時間**：應用程式從資料串流擷取記錄的時間。
+ **處理時間**：您的應用程式處理記錄的時間。

當 Apache Flink Table API 根據記錄時間建立視窗時，您可以使用 `setStreamTimeCharacteristic`方法定義其使用的時間戳記。

如需搭配資料表 API 使用時間戳記的詳細資訊，請參閱 Apache Flink 文件中的[時間屬性](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/concepts/time_attributes/)和[及時串流處理](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/time/)。