

# Review Table API components
<a name="how-table"></a>

Your Apache Flink application uses the [Apache Flink Table API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/tableapi/) to interact with data in a stream using a relational model. You use Table sources to access data in a table, and then use either Table API functions or SQL commands to transform and filter that tabular data. 

This section contains the following topics:
+ [Table API connectors](how-table-connectors.md): These components move data between your application and external data sources and destinations.
+ [Table API time attributes](how-table-timeattributes.md): This topic describes how Managed Service for Apache Flink tracks events when using the Table API.

# Table API connectors
<a name="how-table-connectors"></a>

In the Apache Flink programming model, connectors are components that your application uses to read or write data from external sources, such as other AWS services.

With the Apache Flink Table API, you can use the following types of connectors:
+ [Table API sources](#how-table-connectors-source): You use Table API source connectors to create tables within your `TableEnvironment` using either API calls or SQL queries.
+ [Table API sinks](#how-table-connectors-sink): You use SQL commands to write table data to external sources such as an Amazon MSK topic or an Amazon S3 bucket.

## Table API sources
<a name="how-table-connectors-source"></a>

You create a table source from a data stream. The following code creates a table from an Amazon MSK topic:

```
// Create the Kafka consumer
final FlinkKafkaConsumer<StockRecord> consumer =
    new FlinkKafkaConsumer<>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties);
consumer.setStartFromEarliest();

// Obtain a DataStream from the consumer
DataStream<StockRecord> events = env.addSource(consumer);

// Create a table from the stream
Table table = streamTableEnvironment.fromDataStream(events);
```
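
You can also create a source table directly in SQL, without bridging from a `DataStream`. The following is a minimal sketch using the Flink Kafka SQL connector; the table name, column names, topic, and broker address are placeholder assumptions:

```
CREATE TABLE stock_source (
  event_time TIMESTAMP(3),
  ticker STRING,
  price DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'stock-topic',
  'properties.bootstrap.servers' = 'broker-1:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
)
```

You run a statement like this with `streamTableEnvironment.executeSql(...)`, after which the table is available by name in the `TableEnvironment`.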

For more information about table sources, see [Table & SQL Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/) in the Apache Flink Documentation.

## Table API sinks
<a name="how-table-connectors-sink"></a>

To write table data to a sink, you create the sink in SQL, and then run the SQL-based sink on the `StreamTableEnvironment` object.

The following code example demonstrates how to write table data to an Amazon S3 sink:

```
// Define the sink table in SQL
final String s3Sink = "CREATE TABLE sink_table (" +
    " event_time TIMESTAMP," +
    " ticker STRING," +
    " price DOUBLE," +
    " dt STRING," +
    " hr STRING" +
    ") PARTITIONED BY (ticker, dt, hr)" +
    " WITH (" +
    " 'connector' = 'filesystem'," +
    " 'path' = '" + s3Path + "'," +
    " 'format' = 'json'" +
    ")";

// Register the sink table, then write the table data to Amazon S3
streamTableEnvironment.executeSql(s3Sink);
filteredTable.executeInsert("sink_table");
```

You can use the `format` parameter to control the format that Managed Service for Apache Flink uses to write the output to the sink. For information about formats, see [Supported Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/) in the Apache Flink Documentation.
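
For example, to write CSV output instead of JSON, you would change only the `format` option in the sink definition. A hedged sketch using the same hypothetical schema as the example above; the bucket path here is a placeholder (the earlier example supplies it through the `s3Path` variable):

```
CREATE TABLE sink_table (
  event_time TIMESTAMP,
  ticker STRING,
  price DOUBLE,
  dt STRING,
  hr STRING
) PARTITIONED BY (ticker, dt, hr) WITH (
  'connector' = 'filesystem',
  'path' = 's3a://amzn-s3-demo-bucket/output/',
  'format' = 'csv'
)
```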

## User-defined sources and sinks
<a name="how-table-connectors-userdef"></a>

You can use existing Apache Flink connectors to send data to and from other AWS services, such as Amazon MSK and Amazon S3. For interacting with other data sources and destinations, you can define your own sources and sinks. For more information, see [User-defined Sources and Sinks](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sourcessinks/) in the Apache Flink Documentation.

# Table API time attributes
<a name="how-table-timeattributes"></a>

Each record in a data stream has several timestamps that define when events related to the record occurred:
+ **Event Time**: A user-defined timestamp that defines when the event that created the record occurred.
+ **Ingestion Time**: The time when your application retrieved the record from the data stream.
+ **Processing Time**: The time when your application processed the record.

When the Apache Flink Table API creates windows based on record times, you define which of these timestamps it uses by declaring a time attribute on the table: a `WATERMARK` clause for event time, or a computed column that calls `PROCTIME()` for processing time. (In earlier Apache Flink versions you set this with the `setStreamTimeCharacteristic` method; that method is deprecated as of Apache Flink 1.12, and event time is now the default.) 
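
In SQL DDL, you declare these time attributes on the table definition itself. The following sketch declares both an event-time attribute (with a watermark) and a processing-time attribute; the column names, connector options, and the five-second watermark delay are illustrative assumptions:

```
CREATE TABLE stock_source (
  ticker STRING,
  price DOUBLE,
  event_time TIMESTAMP(3),
  proc_time AS PROCTIME(),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'stock-topic',
  'properties.bootstrap.servers' = 'broker-1:9092',
  'format' = 'json'
)
```

Windows defined over `event_time` then use event time and tolerate up to five seconds of out-of-order records, while windows defined over `proc_time` use processing time.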

For more information about using timestamps with the Table API, see [Time Attributes](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/concepts/time_attributes/) and [Timely Stream Processing](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/time/) in the Apache Flink Documentation.