

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# DataStream API-Komponenten überprüfen
<a name="how-datastream"></a>

Ihre Apache Flink-Anwendung verwendet die [Apache DataStream Flink-API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/), um Daten in einen Datenstrom umzuwandeln. 

In diesem Abschnitt werden die verschiedenen Komponenten beschrieben, die Daten verschieben, transformieren und verfolgen:
+ [Verwenden Sie Konnektoren, um Daten in Managed Service für Apache Flink mit der DataStream API zu verschieben](how-connectors.md): Diese Komponenten verschieben Daten zwischen Ihrer Anwendung und externen Datenquellen und Zielen.
+ [Transformieren Sie Daten mithilfe von Operatoren in Managed Service für Apache Flink mit der API DataStream](how-operators.md): Diese Komponenten transformieren oder gruppieren Datenelemente innerhalb Ihrer Anwendung.
+ [Ereignisse in Managed Service für Apache Flink mithilfe der DataStream API verfolgen](how-time.md): In diesem Thema wird beschrieben, wie Managed Service for Apache Flink Ereignisse bei der Verwendung der DataStream API verfolgt.

# Verwenden Sie Konnektoren, um Daten in Managed Service für Apache Flink mit der DataStream API zu verschieben
<a name="how-connectors"></a>

In der Amazon Managed Service for Apache DataStream Flink-API sind *Konnektoren* Softwarekomponenten, die Daten in und aus einer Managed Service for Apache Flink-Anwendung übertragen. Konnektoren sind flexible Integrationen, mit denen Sie aus Dateien und Verzeichnissen lesen können. Konnektoren bestehen aus kompletten Modulen für die Interaktion mit Amazon-Services und Systemen von Drittanbietern.

Zu den Konnektoren gehören die folgenden:
+ [Fügen Sie Streaming-Datenquellen hinzu](how-sources.md): Stellen Ihrer Anwendung Daten aus einem Kinesis Data Stream, einer Datei oder einer anderen Datenquelle zur Verfügung.
+ [Schreiben Sie Daten mithilfe von Senken](how-sinks.md): Senden Sie Daten aus Ihrer Anwendung an einen Kinesis-Datenstream, Firehose-Stream oder ein anderes Datenziel.
+ [Verwenden Sie asynchrone I/O](how-async.md): Ermöglicht asynchronen Zugriff auf eine Datenquelle (z. B. eine Datenbank), um Stream-Ereignisse zu bereichern. 

## Verfügbare Konnektoren
<a name="how-connectors-list"></a>

Das Apache-Flink-Framework enthält Konnektoren für den Zugriff auf Daten aus verschiedenen Quellen. Informationen zu den im Apache Flink-Framework verfügbaren Konnektoren finden Sie unter [Konnektoren](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/connectors/) in der [Apache Flink-Dokumentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

**Warnung**  
Wenn Sie Anwendungen haben, die auf Flink 1.6, 1.8, 1.11 oder 1.13 laufen und in den Regionen Naher Osten (VAE), Asien-Pazifik (Hyderabad), Israel (Tel Aviv), Europa (Zürich), Naher Osten (VAE), Asien-Pazifik (Melbourne) oder Asien-Pazifik (Jakarta) laufen möchten, müssen Sie möglicherweise Ihr Anwendungsarchiv mit einem aktualisierten Konnektor neu erstellen oder auf Flink 1.18 aktualisieren.   
Apache Flink-Konnektoren werden in ihren eigenen Open-Source-Repositorys gespeichert. Wenn Sie auf Version 1.18 oder höher aktualisieren, müssen Sie Ihre Abhängigkeiten aktualisieren. Informationen zum Zugriff auf das Repository für Apache AWS Flink-Konnektoren finden Sie unter. [flink-connector-aws](https://github.com/apache/flink-connector-aws)  
Die frühere `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` Kinesis-Quelle wurde eingestellt und könnte mit einer future Version von Flink entfernt werden. Verwenden Sie stattdessen [Kinesis Source](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-source).  
Es besteht keine staatliche Kompatibilität zwischen `FlinkKinesisConsumer` und`KinesisStreamsSource`. Einzelheiten finden Sie unter [Migration vorhandener Jobs auf eine neue Kinesis Streams-Quelle](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#migrating-existing-jobs-to-new-kinesis-streams-source-from-kinesis-consumer) in der Apache Flink-Dokumentation.  
 Im Folgenden finden Sie die empfohlenen Richtlinien:   


**Konnektor-Upgrades**  

| Flink-Version | Verwendeter Konnektor | Auflösung | 
| --- | --- | --- | 
| 1.19, 1.20 | Kinesis-Quelle |  Stellen Sie beim Upgrade auf Managed Service for Apache Flink Version 1.19 und 1.20 sicher, dass Sie den neuesten Kinesis Data Streams Streams-Quellconnector verwenden. Das muss eine beliebige Version 5.0.0 oder höher sein. Weitere Informationen finden Sie unter [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/).  | 
| 1.19, 1.20 | Kinesis-Spüle |  Stellen Sie beim Upgrade auf Managed Service for Apache Flink Version 1.19 und 1.20 sicher, dass Sie den neuesten Kinesis Data Streams Streams-Sink-Connector verwenden. Das muss eine beliebige Version 5.0.0 oder höher sein. Weitere Informationen finden Sie unter [Kinesis Streams Sink](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink).  | 
| 1.19, 1.20 | DynamoDB-Streams-Quelle |  Stellen Sie beim Upgrade auf Managed Service for Apache Flink Version 1.19 und 1.20 sicher, dass Sie den neuesten DynamoDB Streams Streams-Quellconnector verwenden. Das muss eine beliebige Version 5.0.0 oder höher sein. Weitere Informationen finden Sie unter [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/).  | 
| 1.19, 1.20 | DynamoDB-Senke | Stellen Sie beim Upgrade auf Managed Service for Apache Flink Version 1.19 und 1.20 sicher, dass Sie den neuesten DynamoDB-Sink-Connector verwenden. Das muss eine beliebige Version 5.0.0 oder höher sein. Weitere Informationen finden Sie unter [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/). | 
| 1.19, 1.20 | Amazon SQS SQS-Spüle |  Stellen Sie beim Upgrade auf Managed Service für Apache Flink Version 1.19 und 1.20 sicher, dass Sie den neuesten Amazon SQS SQS-Sink-Connector verwenden. Das muss eine beliebige Version 5.0.0 oder höher sein. Weitere Informationen finden Sie unter [Amazon SQS Sink](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/sqs/).  | 
| 1,19, 1,20 | Amazon Managed Service für Prometheus Sink |  Stellen Sie beim Upgrade auf Managed Service für Apache Flink Version 1.19 und 1.20 sicher, dass Sie den neuesten Amazon Managed Service for Prometheus Sink Connector verwenden. Das muss eine beliebige Version 1.0.0 oder höher sein. Weitere Informationen finden Sie unter [Prometheus](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/prometheus/) Sink.  | 

# Fügen Sie Streaming-Datenquellen zu Managed Service für Apache Flink hinzu
<a name="how-sources"></a>

Apache Flink bietet Konnektoren zum Lesen aus Dateien, Sockets, Sammlungen und benutzerdefinierten Quellen. In Ihrem Anwendungscode verwenden Sie eine [Apache Flink-Quelle](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources), um Daten aus einem Stream zu empfangen. In diesem Abschnitt werden die Quellen beschrieben, die für Amazon-Services verfügbar sind.

## Verwenden Sie Kinesis-Datenstreams
<a name="input-streams"></a>

Das `KinesisStreamsSource` stellt Streaming-Daten aus einem Amazon Kinesis Kinesis-Datenstream für Ihre Anwendung bereit. 

### Erstellen eines `KinesisStreamsSource`
<a name="input-streams-create"></a>

Das folgende Code-Beispiel zeigt das Erstellen eines `KinesisStreamsSource`:

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

Weitere Informationen zur Verwendung von finden Sie unter [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) in der Apache Flink-Dokumentation und in [unserem öffentlichen KinesisConnectors Beispiel auf](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) Github. `KinesisStreamsSource`

### Erstellen Sie einen`KinesisStreamsSource`, der einen EFO-Consumer verwendet
<a name="input-streams-efo"></a>

Der unterstützt `KinesisStreamsSource` jetzt [Enhanced Fan-Out (EFO)](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/). 

Wenn ein Kinesis-Verbraucher EFO verwendet, stellt ihm der Kinesis Data Streams-Service seine eigene dedizierte Bandbreite zur Verfügung, anstatt dass der Verbraucher die feste Bandbreite des Streams mit den anderen Verbrauchern teilt, die aus dem Stream lesen.

Weitere Informationen zur Verwendung von EFO mit Kinesis Consumer finden Sie unter [FLIP-128: Verbesserter Lüfterausgang](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers) für Kinesis-Verbraucher. AWS 

Sie aktivieren den EFO-Consumer, indem Sie die folgenden Parameter für den Kinesis-Consumer festlegen:
+ **READER\$1TYPE:** Setzen Sie diesen Parameter auf **EFO**, damit Ihre Anwendung einen EFO-Consumer für den Zugriff auf die Kinesis Data Stream-Daten verwendet. 
+ **EFO\$1CONSUMER\$1NAME:** Setzen Sie diesen Parameter auf einen Zeichenfolgenwert, der unter den Verbrauchern dieses Streams eindeutig ist. Die Wiederverwendung eines Verbrauchernamens in demselben Kinesis Data Stream führt dazu, dass der vorherige Verbraucher, der diesen Namen verwendet hat, beendet wird. 

Um einen `KinesisStreamsSource` für die Verwendung von EFO zu konfigurieren, fügen Sie dem Verbraucher die folgenden Parameter hinzu:

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

Ein Beispiel für eine Managed Service for Apache Flink-Anwendung, die einen EFO-Consumer verwendet, finden Sie in [unserem öffentlichen Kinesis Connectors-Beispiel](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) auf Github.

## Verwenden Sie Amazon MSK
<a name="input-msk"></a>

Die `KafkaSource`-Quelle stellt Streaming-Daten aus einem Amazon MSK-Thema für Ihre Anwendung bereit. 

### Erstellen eines `KafkaSource`
<a name="input-msk-create"></a>

Das folgende Code-Beispiel zeigt das Erstellen eines `KafkaSource`:

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

Weitere Informationen zur Verwendung von `KafkaSource` finden Sie unter [MSK-Replikation](earlier.md#example-msk).

# Schreiben Sie Daten mithilfe von Senken in Managed Service für Apache Flink
<a name="how-sinks"></a>

In Ihrem Anwendungscode können Sie jeden [Apache Flink-Sink-Connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) verwenden, um in externe Systeme zu schreiben, einschließlich AWS Dienste wie Kinesis Data Streams und DynamoDB.

Apache Flink bietet auch Senken für Dateien und Sockets, und Sie können benutzerdefinierte Senken implementieren. Unter den verschiedenen unterstützten Senken werden häufig die folgenden verwendet:

## Verwenden Sie Kinesis-Datenstreams
<a name="sinks-streams"></a>

Apache Flink bietet Informationen zum [Kinesis Data Streams-Konnector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/) in der Apache Flink-Dokumentation.

Ein Beispiel für eine Anwendung, die einen Kinesis Data Stream für Eingabe und Ausgabe verwendet, finden Sie unter [Tutorial: Erste Schritte mit der DataStream API in Managed Service für Apache Flink](getting-started.md).

## Verwenden Sie Apache Kafka und Amazon Managed Streaming for Apache Kafka (MSK)
<a name="sinks-MSK"></a>

Der [Apache Flink Kafka Connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink) bietet umfassende Unterstützung für die Veröffentlichung von Daten in Apache Kafka und Amazon MSK, einschließlich exakt einmaliger Garantien. Informationen zum Schreiben in Kafka finden Sie in den [Beispielen für Kafka Connectors](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors) in der Apache Flink-Dokumentation.

## Verwenden Sie Amazon S3
<a name="sinks-s3"></a>

Sie können Apache Flink `StreamingFileSink` verwenden, um Objekte in einen Amazon S3-Bucket zu schreiben.

Ein Beispiel dafür, wie man Objekte in S3 schreibt, finden Sie unter [Beispiel: In einen Amazon S3 S3-Bucket schreiben](earlier.md#examples-s3). 

## Benutze Firehose
<a name="sinks-firehose"></a>

Das `FlinkKinesisFirehoseProducer` ist eine zuverlässige, skalierbare Apache Flink-Senke zum Speichern von Anwendungsausgaben mithilfe des [Firehose-Dienstes.](https://docs.aws.amazon.com/firehose/latest/dev/) In diesem Abschnitt wird die Einrichtung eines Maven-Projekts beschrieben, um einen `FlinkKinesisFirehoseProducer` zu erstellen und zu verwenden.

**Topics**
+ [Erstellen eines `FlinkKinesisFirehoseProducer`](#sinks-firehose-create)
+ [`FlinkKinesisFirehoseProducer`-Codebeispiel](#sinks-firehose-sample)

### Erstellen eines `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-create"></a>

Das folgende Code-Beispiel zeigt das Erstellen eines `FlinkKinesisFirehoseProducer`:

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### `FlinkKinesisFirehoseProducer`-Codebeispiel
<a name="sinks-firehose-sample"></a>

Das folgende Codebeispiel zeigt, wie Sie einen Apache Flink-Datenstream erstellen `FlinkKinesisFirehoseProducer` und konfigurieren und Daten von diesem an den Firehose senden.

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

Ein vollständiges Tutorial zur Verwendung der Firehose-Spüle finden Sie unter[Beispiel: An Firehose schreiben](earlier.md#get-started-exercise-fh).

# Verwenden Sie Asynchronous I/O in Managed Service für Apache Flink
<a name="how-async"></a>

Ein I/O asynchroner Operator reichert Stream-Daten mithilfe einer externen Datenquelle wie einer Datenbank an. Managed Service für Apache Flink reichert die Stream-Ereignisse asynchron an, sodass Anfragen zur Steigerung der Effizienz gebündelt werden können. 

Weitere Informationen finden Sie unter [Asynchronous I/O](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/asyncio/) in der Apache Flink-Dokumentation.

# Transformieren Sie Daten mithilfe von Operatoren in Managed Service für Apache Flink mit der API DataStream
<a name="how-operators"></a>

Um eingehende Daten in einem Managed Service für Apache Flink umzuwandeln, verwenden Sie einen Apache-Flink-*Operator*. Ein Apache-Flink-Operator wandelt einen oder mehrere Datenströme in einen neuen Datenstrom um. Der neue Datenstrom enthält modifizierte Daten aus dem ursprünglichen Datenstrom. Apache Flink bietet mehr als 25 vorgefertigte Operatoren zur Stream-Verarbeitung. Weitere Informationen finden Sie unter [Operatoren](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/) in der Apache Flink-Dokumentation.

**Topics**
+ [Verwenden Sie Transformationsoperatoren](#how-operators-transform)
+ [Verwenden Sie Aggregationsoperatoren](#how-operators-agg)

## Verwenden Sie Transformationsoperatoren
<a name="how-operators-transform"></a>

Im Folgenden finden Sie ein Beispiel für eine einfache Texttransformation in einem der Felder eines JSON-Datenstroms. 

Dieser Code erstellt einen transformierten Datenstrom. Der neue Datenstrom enthält dieselben Daten wie der ursprüngliche Stream, wobei die Zeichenfolge „` Company`“ an den Inhalt des `TICKER`-Felds angehängt wird.

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## Verwenden Sie Aggregationsoperatoren
<a name="how-operators-agg"></a>

Es folgt ein Beispiel für einen Aggregationsoperator. Der Code erstellt einen aggregierten Datenstrom. Der Operator erstellt ein 5-sekündiges rollierendes Fenster und gibt die Summe der `PRICE`-Werte für die Datensätze im Fenster mit demselben `TICKER`-Wert zurück.

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

Weitere Codebeispiele finden Sie unter [Beispiele für die Erstellung von und die Arbeit mit Managed Service für Apache Flink-Anwendungen](examples-collapsibles.md). 

# Ereignisse in Managed Service für Apache Flink mithilfe der DataStream API verfolgen
<a name="how-time"></a>

Managed Service für Apache Flink verfolgt Ereignisse mit den folgenden Zeitstempeln:
+ **Verarbeitungszeit:** Bezieht sich auf die Systemzeit der Maschine, die den jeweiligen Vorgang ausführt.
+ **Ereigniszeit:** Bezieht sich auf die Zeit, zu der jedes einzelne Ereignis auf seinem produzierenden Gerät eingetreten ist.
+ **Erfassungszeit:** Bezieht sich auf den Zeitpunkt, zu dem Ereignisse in den Service von Managed Service für Apache Flink eingehen.

Sie legen die von der Streaming-Umgebung verwendete Zeit mithilfe von fest`setStreamTimeCharacteristic`. 

```
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

Weitere Informationen zu Zeitstempeln finden Sie unter [Generieren von Wasserzeichen](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/) in der Apache Flink-Dokumentation.