

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Managed Service für Apache Flink: So funktioniert's
<a name="how-it-works"></a>

Managed Service for Apache Flink ist ein vollständig verwalteter Amazon-Service, mit dem Sie eine Apache Flink-Anwendung zur Verarbeitung von Streaming-Daten verwenden können. Zuerst programmieren Sie Ihre Apache Flink-Anwendung und dann erstellen Sie Ihre Managed Service for Apache Flink-Anwendung.

## Programmieren Sie Ihre Apache Flink-Anwendung
<a name="how-it-works-programming"></a>

Eine Apache-Flink-Anwendung ist eine Java- oder Scala-Anwendung, die mit dem Apache-Flink-Framework erstellt wurde. Sie entwickeln und erstellen Ihre Apache-Flink-Anwendung lokal. 

Anwendungen verwenden hauptsächlich entweder die [DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html) oder die [Tabellen-API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/). Die anderen Apache Flink APIs stehen Ihnen ebenfalls zur Verfügung, werden jedoch weniger häufig beim Erstellen von Streaming-Anwendungen verwendet.

Die Funktionen der beiden APIs sind wie folgt:

### DataStream API
<a name="how-it-works-prog-datastream"></a>

Das Apache Flink DataStream API-Programmiermodell basiert auf zwei Komponenten:
+ **Datenstrom:** Die strukturierte Darstellung eines kontinuierlichen Flusses von Datensätzen.
+ **Transformationsoperator:** Nimmt einen oder mehrere Datenströme als Eingabe und erzeugt einen oder mehrere Datenströme als Ausgabe.

Mit der DataStream API erstellte Anwendungen haben folgende Funktionen:
+ Daten aus einer Datenquelle (z. B. einem Kinesis-Strom oder einem Amazon-MSK-Thema).
+ Transformationen auf die Daten anwenden, z. B. Filterung, Aggregation oder Anreicherung.
+ Transformierte Daten in eine Datensenke schreiben.

Anwendungen, die die DataStream API verwenden, können in Java oder Scala geschrieben werden und können aus einem Kinesis-Datenstream, einem Amazon MSK-Thema oder einer benutzerdefinierten Quelle lesen.

Ihre Anwendung verarbeitet Daten mithilfe eines *Konnektors*. Apache Flink verwendet die folgenden Arten von Konnektoren: 
+ **Quelle**: Ein Konnektor, der zum Lesen externer Daten verwendet wird.
+ **Senke**: Ein Konnektor, der zum Schreiben an externe Standorte verwendet wird. 
+ **Operator**: Ein Konnektor, der zur Verarbeitung von Daten innerhalb der Anwendung verwendet wird.

Eine typische Anwendung besteht aus mindestens einem Datenstrom mit einer Quelle, einem Datenstrom mit einem oder mehreren Operatoren und mindestens einer Datensenke.

Weitere Informationen zur Verwendung der DataStream API finden Sie unter. [DataStream API-Komponenten überprüfen](how-datastream.md)

### Tabellen-API
<a name="how-it-works-prog-table"></a>

Das Programmiermodell der Tabellen-API von Apache Flink basiert auf den folgenden Komponenten:
+ **Tabellenumgebung:** Eine Schnittstelle zu zugrunde liegenden Daten, die Sie verwenden, um eine oder mehrere Tabellen zu erstellen und zu hosten. 
+ **Tabelle:** Ein Objekt, das den Zugriff auf eine SQL-Tabelle oder -Ansicht ermöglicht.
+ **Tabellenquelle:** Wird verwendet, um Daten aus einer externen Quelle zu lesen, z. B. aus einem Amazon-MSK-Thema.
+ **Tabellenfunktion:** Eine SQL-Abfrage oder ein API-Aufruf, der zur Transformation von Daten verwendet wird.
+ **Tabellensenke:** Wird verwendet, um Daten an einen externen Speicherort zu schreiben, z. B. in einen Amazon-S3-Bucket.

Anwendungen, die mit der Tabellen-API erstellt werden, haben folgende Funktionen:
+ Erstellen einer `TableEnvironment` durch Herstellen einer Verbindung zu einer `Table Source`. 
+ Erstellen einer Tabelle in der `TableEnvironment` durch entweder SQL-Abfragen oder Tabellen-API-Funktionen.
+ Ausführen einer Tabellenabfrage über die Tabellen-API oder SQL
+ Anwenden von Transformationen auf die Abfrageergebnisse über Tabellenfunktionen oder SQL-Abfragen.
+ Schreiben der Abfrage- oder Funktionsergebnisse in eine `Table Sink`.

Anwendungen, die die Tabellen-API verwenden, können in Java oder Scala geschrieben werden und Daten entweder mittels API-Aufrufen oder SQL-Abfragen abfragen. 

Weitere Informationen zur Verwendung der Tabellen-API finden Sie unter [API-Komponenten von Review Table](how-table.md).

## Erstellen Sie Ihre Anwendung Managed Service für Apache Flink
<a name="how-it-works-app"></a>

Managed Service für Apache Flink ist ein AWS Dienst, der eine Umgebung für das Hosten Ihrer Apache Flink-Anwendung erstellt und ihr die folgenden Einstellungen zur Verfügung stellt:
+ **[Verwenden Sie Laufzeiteigenschaften](how-properties.md): ** Parameter, die Sie Ihrer Anwendung zur Verfügung stellen können. Sie können diese Parameter ändern, ohne Ihren Anwendungscode neu kompilieren zu müssen.
+ **[Implementieren Sie Fehlertoleranz](how-fault.md)**: Wie sich Ihre Anwendung nach Unterbrechungen und Neustarts wiederherstellt.
+ **[Protokollierung und Überwachung in Amazon Managed Service für Apache Flink](monitoring-overview.md)**: Wie Ihre Anwendung Ereignisse in Logs protokolliert. CloudWatch 
+ **[Implementieren Sie Anwendungsskalierung](how-scaling.md)**: Wie Ihre Anwendung Datenverarbeitungsressourcen bereitstellt.

Sie können die Anwendung mit Managed Service für Apache Flink entweder über die Konsole oder die AWS CLI erstellen und ausführen. Erste Schritte zum Erstellen einer Anwendung mit Managed Service für Apache Flink finden Sie unter [Tutorial: Erste Schritte mit der DataStream API in Managed Service für Apache Flink](getting-started.md).

# Erstellen Sie einen Managed Service für die Apache Flink-Anwendung
<a name="how-creating-apps"></a>

Dieses Thema enthält Informationen zum Erstellen einer Managed Service für Apache Flink-Anwendung.

**Topics**
+ [Erstellen Sie Ihren Anwendungscode für Managed Service für Apache Flink](#how-creating-apps-building)
+ [Erstellen Sie Ihre Managed Service für Apache Flink-Anwendung](#how-creating-apps-creating)
+ [Verwenden Sie vom Kunden verwaltete Schlüssel](#how-creating-apps-use-cmk)
+ [Starten Sie Ihre Managed Service for Apache Flink-Anwendung](#how-creating-apps-starting)
+ [Überprüfen Sie Ihre Anwendung Managed Service für Apache Flink](#how-creating-apps-verifying)
+ [Aktivieren Sie System-Rollbacks für Ihre Managed Service for Apache Flink-Anwendung](how-system-rollbacks.md)

## Erstellen Sie Ihren Anwendungscode für Managed Service für Apache Flink
<a name="how-creating-apps-building"></a>

In diesem Abschnitt werden die Komponenten beschrieben, mit denen Sie den Anwendungscode für Ihre Managed Service for Apache Flink-Anwendung erstellen. 

Es wird empfohlen, die neueste unterstützte Version von Apache Flink für Ihren Anwendungscode zu verwenden. Informationen zur Aktualisierung von Managed Service für Apache Flink-Anwendungen finden Sie unter [Verwenden Sie direkte Versionsupgrades für Apache Flink](how-in-place-version-upgrades.md). 

Sie erstellen Ihren Anwendungscode mit [Apache Maven](https://maven.apache.org/). Ein Apache Maven-Projekt verwendet eine `pom.xml`-Datei, um die Versionen der verwendeten Komponenten anzugeben. 

**Anmerkung**  
Managed Service für Apache Flink unterstützt JAR-Dateien mit einer Größe von bis zu 512 MB. Wenn Sie eine größere JAR-Datei verwenden, kann Ihre Anwendung nicht gestartet werden.

Anwendungen können jetzt die Java-API von jeder Scala-Version aus verwenden. Sie müssen die Scala-Standardbibliothek Ihrer Wahl in Ihre Scala-Anwendungen integrieren.

Informationen zum Erstellen einer Managed-Service-für-Apache-Flink-Anwendung, die **Apache Beam** verwendet, finden Sie unter [Verwenden Sie Apache Beam mit Managed Service für Apache Flink-Anwendungen](how-creating-apps-beam.md).

### Geben Sie die Apache Flink-Version Ihrer Anwendung an
<a name="how-creating-apps-building-flink"></a>

Wenn Sie Managed Service für Apache Flink Laufzeit Version 1.1.0 und höher verwenden, geben Sie die Version von Apache Flink an, die Ihre Anwendung verwendet, wenn Sie Ihre Anwendung kompilieren. Sie geben die Version von Apache Flink mit dem Parameter an`-Dflink.version`. Wenn Sie beispielsweise Apache Flink 2.2.0 verwenden, geben Sie Folgendes an:

```
mvn package -Dflink.version=2.2.0
```

Informationen zum Erstellen von Anwendungen mit früheren Versionen von Apache Flink finden Sie unter. [Frühere Versionen](earlier.md)

## Erstellen Sie Ihre Managed Service für Apache Flink-Anwendung
<a name="how-creating-apps-creating"></a>

Nachdem Sie Ihren Anwendungscode erstellt haben, gehen Sie wie folgt vor, um Ihre Managed Service for Apache Flink (Amazon MSF) -Anwendung zu erstellen:
+ **Laden Sie Ihren Anwendungscode hoch**: Laden Sie Ihren Anwendungscode in einen Amazon-S3-Bucket hoch. Geben Sie beim Erstellen Ihrer Anwendung den S3-Bucket-Namen und den Objektnamen Ihres Anwendungscodes an. Ein Tutorial, das zeigt, wie Sie Ihren Anwendungscode hochladen, finden Sie im [Tutorial: Erste Schritte mit der DataStream API in Managed Service für Apache Flink](getting-started.md) Tutorial.
+ **Erstellen Sie Ihre Managed Service for Apache Flink-Anwendung**: Verwenden Sie eine der folgenden Methoden, um Ihre Amazon MSF-Anwendung zu erstellen:
**Anmerkung**  
Amazon MSF verschlüsselt Ihre Anwendung standardmäßig mit. AWS-eigene Schlüssel Sie können Ihre neue Anwendung auch mithilfe von vom AWS KMS Kunden verwalteten Schlüsseln (CMKs) erstellen, um Ihre Schlüssel selbst zu erstellen, zu besitzen und zu verwalten. Informationen zu finden CMKs Sie unter[Schlüsselverwaltung in Amazon Managed Service für Apache Flink](key-management-flink.md).
  + **Erstellen Sie Ihre Amazon MSF-Anwendung mit der AWS Konsole:** Sie können Ihre Anwendung mithilfe der AWS Konsole erstellen und konfigurieren. 

    Wenn Sie Ihre Anwendung mithilfe der Konsole erstellen, werden die von Ihrer Anwendung abhängigen Ressourcen (wie CloudWatch Log-Streams, IAM-Rollen und IAM-Richtlinien) für Sie erstellt. 

    Wenn Sie die Anwendung mithilfe der Konsole erstellen, geben Sie an, welche Version von Apache Flink Ihre Anwendung verwendet, indem Sie sie aus dem Pulldown-Menü auf der Seite **Managed– Service für Apache Flink Anwendung erstellen** auswählen. 

    Ein Tutorial zur Verwendung der Konsole zum Erstellen einer Anwendung finden Sie im [Tutorial: Erste Schritte mit der DataStream API in Managed Service für Apache Flink](getting-started.md) Tutorial.
  + **Erstellen Sie Ihre Amazon MSF-Anwendung mit der AWS CLI:** Sie können Ihre Anwendung mithilfe der AWS CLI erstellen und konfigurieren. 

    Wenn Sie Ihre Anwendung mithilfe der CLI erstellen, müssen Sie auch die abhängigen Ressourcen Ihrer Anwendung (wie CloudWatch Log-Streams, IAM-Rollen und IAM-Richtlinien) manuell erstellen.

    Wenn Sie Ihre Anwendung mit der CLI erstellen, geben Sie mithilfe des `RuntimeEnvironment`-Aktionsparameters der `CreateApplication`-Aktion an, welche Version von Apache Flink Ihre Anwendung verwendet.
**Anmerkung**  
Sie können die `RuntimeEnvironment` einer vorhandenen Anwendung ändern. Um zu erfahren wie dies geht, vgl. [Verwenden Sie direkte Versionsupgrades für Apache Flink](how-in-place-version-upgrades.md).

## Verwenden Sie vom Kunden verwaltete Schlüssel
<a name="how-creating-apps-use-cmk"></a>

In Amazon MSF sind vom Kunden verwaltete Schlüssel (CMKs) eine Funktion, mit der Sie die Daten Ihrer Anwendung mit einem Schlüssel verschlüsseln können, den Sie auf AWS Key Management Service (AWS KMS) erstellen, besitzen und verwalten. Für eine Amazon MSF-Anwendung bedeutet dies, dass alle Daten, die einem [Flink-Checkpoint](how-fault.md) oder [-Snapshot](how-snapshots.md) unterliegen, mit einem CMK verschlüsselt werden, den Sie für diese Anwendung definieren.

Um CMK mit Ihrer Anwendung zu verwenden, müssen Sie zuerst [Ihre neue Anwendung erstellen](#how-creating-apps-creating) und dann eine CMK anwenden. Weitere Informationen zur Verwendung CMKs finden Sie unter. [Schlüsselverwaltung in Amazon Managed Service für Apache Flink](key-management-flink.md)

## Starten Sie Ihre Managed Service for Apache Flink-Anwendung
<a name="how-creating-apps-starting"></a>

Nachdem Sie Ihren Anwendungscode erstellt, in S3 hochgeladen und Ihre Managed Service für Apache Flink-Anwendung erstellt haben, starten Sie die Anwendung. Das Starten einer Anwendung Managed Service für Apache Flink dauert in der Regel mehrere Minuten.

Verwenden Sie eine der folgenden Methoden, um die Anwendung zu starten:
+ **Starten Sie Ihre Managed Service for Apache Flink-Anwendung über die AWS Konsole:** Sie können Ihre Anwendung ausführen, indem Sie auf der Seite Ihrer Anwendung in der AWS Konsole auf **Ausführen** klicken.
+ **Starten Sie Ihre Managed Service for Apache Flink-Anwendung mithilfe der AWS API:** Sie können Ihre Anwendung mithilfe der [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html)Aktion ausführen. 

## Überprüfen Sie Ihre Anwendung Managed Service für Apache Flink
<a name="how-creating-apps-verifying"></a>

Sie können auf folgende Arten überprüfen, ob die Anwendung funktioniert:
+ **Verwendung von CloudWatch Protokollen:** Sie können CloudWatch Logs and CloudWatch Logs Insights verwenden, um zu überprüfen, ob Ihre Anwendung ordnungsgemäß ausgeführt wird. Informationen zur Verwendung von CloudWatch Logs mit Ihrer Managed Service for Apache Flink-Anwendung finden Sie unter[Protokollierung und Überwachung in Amazon Managed Service für Apache Flink](monitoring-overview.md).
+ **Verwenden von CloudWatch Metriken:** Sie können CloudWatch Metriken verwenden, um die Aktivität Ihrer Anwendung oder die Aktivitäten in den Ressourcen zu überwachen, die Ihre Anwendung für die Eingabe oder Ausgabe verwendet (wie Kinesis-Streams, Firehose-Streams oder Amazon S3 S3-Buckets). Weitere Informationen zu CloudWatch Metriken finden Sie unter [Arbeiten mit Metriken](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html) im CloudWatch Amazon-Benutzerhandbuch.
+ **Überwachung der Ausgabespeicherorte:** Wenn Ihre Anwendung die Ausgabe an einen Speicherort schreibt (z. B. einen Amazon S3-Bucket oder eine Datenbank), können Sie diesen Speicherort auf geschriebene Daten überwachen.

# Aktivieren Sie System-Rollbacks für Ihre Managed Service for Apache Flink-Anwendung
<a name="how-system-rollbacks"></a>

Mit der System-Rollback-Funktion können Sie eine höhere Verfügbarkeit Ihrer laufenden Apache Flink-Anwendung auf Amazon Managed Service for Apache Flink erreichen. Wenn Sie sich für diese Konfiguration entscheiden, kann der Service die Anwendung automatisch auf die zuvor ausgeführte Version zurücksetzen, wenn eine Aktion wie Code `UpdateApplication` - oder `autoscaling` Konfigurationsfehler auftritt.

**Anmerkung**  
Um die System-Rollback-Funktion nutzen zu können, müssen Sie sich anmelden, indem Sie Ihre Anwendung aktualisieren. Bestehende Anwendungen verwenden standardmäßig nicht automatisch das System-Rollback.

## Funktionsweise
<a name="how-rollback-works"></a>

Wenn Sie einen Anwendungsvorgang starten, z. B. eine Aktualisierungs- oder Skalierungsaktion, versucht der Amazon Managed Service für Apache Flink zunächst, diesen Vorgang auszuführen. Wenn Probleme erkannt werden, die den Erfolg des Vorgangs verhindern, wie z. B. Codefehler oder unzureichende Berechtigungen, leitet der Service automatisch einen Vorgang ein. `RollbackApplication`

Beim Rollback wird versucht, die Anwendung auf die vorherige Version, die erfolgreich ausgeführt wurde, zusammen mit dem zugehörigen Anwendungsstatus wiederherzustellen. Wenn das Rollback erfolgreich ist, setzt Ihre Anwendung die Datenverarbeitung mit minimaler Ausfallzeit mit der vorherigen Version fort. Wenn das automatische Rollback ebenfalls fehlschlägt, versetzt Amazon Managed Service for Apache Flink die Anwendung in den `READY` Status, sodass Sie weitere Maßnahmen ergreifen können, einschließlich der Behebung des Fehlers und der Wiederholung des Vorgangs. 

Sie müssen sich für automatische System-Rollbacks anmelden. Ab diesem Zeitpunkt können Sie es über die Konsole oder API für alle Operationen in Ihrer Anwendung aktivieren. 

Die folgende Beispielanforderung für die `UpdateApplication` Aktion ermöglicht System-Rollbacks für eine Anwendung:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 1,
   "ApplicationConfigurationUpdate": { 
      "ApplicationSystemRollbackConfigurationUpdate": { 
         "RollbackEnabledUpdate": "true"
       }
    }
}
```

## Sehen Sie sich gängige Szenarien für automatisches System-Rollback an
<a name="common-scenarios"></a>

Die folgenden Szenarien veranschaulichen, in welchen Bereichen automatische System-Rollbacks von Vorteil sind:
+ **Anwendungsupdates:** Wenn Sie Ihre Anwendung mit neuem Code aktualisieren, der Fehler aufweist, wenn Sie den Flink-Job über die Hauptmethode initialisieren, ermöglicht das automatische Rollback die Wiederherstellung der vorherigen Arbeitsversion. Zu den weiteren Aktualisierungsszenarien, in denen System-Rollbacks hilfreich sind, gehören:
  + [Wenn Ihre Anwendung so aktualisiert wurde, dass sie mit einer höheren Parallelität als MaxParallelism ausgeführt wird.](https://docs.aws.amazon.com/managed-flink/latest/java/how-scaling.html#how-scaling-auto)
  + Wenn Ihre Anwendung so aktualisiert wird, dass sie mit falschen Subnetzen für eine VPC-Anwendung läuft, führt dies zu einem Fehler beim Start des Flink-Jobs. 
+ **Upgrades der Flink-Version:** Wenn Sie auf eine neue Apache Flink-Version aktualisieren und bei der aktualisierten Anwendung ein Snapshot-Kompatibilitätsproblem auftritt, können Sie mit dem System-Rollback automatisch zur vorherigen Flink-Version zurückkehren. 
+ **AutoScaling:** Wenn die Anwendung hochskaliert, aber Probleme bei der Wiederherstellung von einem Savepoint auftreten, weil die Operatoren zwischen dem Snapshot und dem Flink-Job-Diagramm nicht übereinstimmen.

## Verwenden Sie den Vorgang für System-Rollbacks APIs
<a name="operation-apis"></a>

Um für mehr Transparenz zu sorgen, bietet Amazon Managed Service für Apache Flink zwei Funktionen, die sich auf Anwendungsvorgänge APIs beziehen und Ihnen helfen können, Fehler und damit verbundene System-Rollbacks nachzuverfolgen.

`ListApplicationOperations`

Diese API listet alle in der Anwendung ausgeführten Operationen, einschließlich, und anderer `UpdateApplication` `Maintenance``RollbackApplication`, in umgekehrter chronologischer Reihenfolge auf. In der folgenden Beispielanforderung für die `ListApplicationOperations` Aktion werden die ersten 10 Anwendungsvorgänge für die Anwendung aufgeführt:

```
{
   "ApplicationName": "MyApplication",
   "Limit": 10
}
```

Die folgende Beispielanforderung für `ListApplicationOperations` hilft dabei, die Liste nach früheren Updates für die Anwendung zu filtern:

```
{
   "ApplicationName": "MyApplication",
   "operation": "UpdateApplication"
}
```

`DescribeApplicationOperation`

Diese API bietet detaillierte Informationen zu einem bestimmten Vorgang, der unter aufgeführt ist`ListApplicationOperations`, einschließlich der Fehlerursache, falls zutreffend. Die folgende Beispielanforderung für die `DescribeApplicationOperation` Aktion listet Details für einen bestimmten Anwendungsvorgang auf:

```
{
   "ApplicationName": "MyApplication",
   "OperationId": "xyzoperation"
}
```

Informationen zur Problembehebung finden Sie unter [Bewährte Methoden für das Rollback von Systemen](troubleshooting-system-rollback.md).

# Führen Sie eine Managed Service für Apache Flink-Anwendung aus
<a name="how-running-apps"></a>

Dieser Abschnitt enthält Informationen zum Ausführen eines Managed Service für Apache Flink.

Wenn Sie Ihre Anwendung, die Managed Service für Apache Flink nutzt, ausführen, erstellt der Service einen Apache-Flink-Auftrag. Ein Apache-Flink-Auftrag ist der Ausführungszyklus Ihrer Anwendung, die Managed Service für Apache Flink nutzt. Die Ausführung des Auftrags und die verwendeten Ressourcen werden vom Auftragsmanager verwaltet. Der Auftragsmanager unterteilt die Ausführung der Anwendung in Aufgaben. Jede Aufgabe wird von einem Aufgabenmanager verwaltet. Wenn Sie die Leistung Ihrer Anwendung überwachen, können Sie die Leistung jedes Aufgabenmanagers oder des Auftragsmanagers als Ganzes untersuchen. 

Informationen zu Apache Flink-Jobs finden Sie unter [Jobs and Scheduling](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/internals/job_scheduling/) in der Apache Flink-Dokumentation.

## Identifizieren Sie den Bewerbungs- und Jobstatus
<a name="how-running-job-status"></a>

Sowohl Ihre Anwendung als auch der Auftrag der Anwendung haben einen aktuellen Ausführungsstatus:
+ **Anwendungsstatus:** Ihre Anwendung hat einen aktuellen Status, der die Ausführungsphase beschreibt. Die folgenden Anwendungsstatus sind möglich:
  + **Stabile Anwendungsstatus:** Ihre Anwendung bleibt in der Regel so lange in diesem Status, bis Sie einen Statuswechsel vornehmen:
    + **BEREIT:** Eine neue oder angehaltene Anwendung befindet sich im BEREIT-Status, bis Sie sie ausführen.
    + **LÄUFT:** Eine Anwendung, die erfolgreich gestartet wurde, befindet sich im LÄUFT-Status.
  + **Vorübergehende Anwendungsstatus:** Eine Anwendung mit einem solchen Status ist in der Regel dabei, in einen anderen Status überzugehen. Wenn eine Anwendung für einen längeren Zeitraum in einem vorübergehenden Status verbleibt, können Sie die Anwendung beenden, indem Sie die [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)Aktion verwenden, bei der der `Force` Parameter auf `true` gesetzt ist. Diese umfassen u. a. folgende:
    + `STARTING:`Tritt nach der [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html)Aktion auf. Die Anwendung wechselt vom Status `READY` in den Status `RUNNING`.
    + `STOPPING:`Tritt nach der [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)Aktion auf. Die Anwendung wechselt vom Status `RUNNING` in den Status `READY`.
    + `DELETING:`Tritt nach der [DeleteApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplication.html)Aktion auf. Die Anwendung wird gerade gelöscht.
    + `UPDATING:`Tritt nach der [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)Aktion auf. Die Anwendung wird gerade aktualisiert und kehrt in den Status `RUNNING` oder `READY` zurück.
    + `AUTOSCALING:`Die Anwendung hat die `AutoScalingEnabled` Eigenschaft auf [ ParallelismConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ParallelismConfiguration.html)gesetzt`true`, und der Dienst erhöht die Parallelität der Anwendung. Wenn sich die Anwendung in diesem Status befindet, ist die einzige gültige API-Aktion, die Sie verwenden können, die [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)Aktion, deren `Force` Parameter auf gesetzt ist. `true` Weitere Informationen zum Auto Scaling finden Sie unter [Verwenden Sie die automatische Skalierung in Managed Service für Apache Flink](how-scaling-auto.md).
    + `FORCE_STOPPING:`Tritt auf, nachdem die [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)Aktion aufgerufen wurde und der `Force` Parameter auf gesetzt ist`true`. Die Anwendung wird gerade zwangsweise angehalten. Die Anwendung wechselt vom Status `STARTING`, `UPDATING`, `STOPPING` oder `AUTOSCALING` in den Status `READY`.
    + `ROLLING_BACK:`Tritt auf, nachdem die [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html)Aktion aufgerufen wurde. Die Anwendung wird gerade auf eine frühere Version zurückgesetzt. Die Anwendung wechselt vom Status `UPDATING` oder `AUTOSCALING` in den Status `RUNNING`.
    + `MAINTENANCE:` Tritt auf, während Managed Service für Apache Flink Patches auf Ihre Anwendung einspielt. Weitere Informationen finden Sie unter [Wartungsaufgaben für Managed Service für Apache Flink verwalten](maintenance.md).

  Sie können den Status Ihrer Anwendung mithilfe der Konsole oder mithilfe der [DescribeApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplication.html)Aktion überprüfen.
+ **Auftragsstatus:** Wenn sich Ihre Anwendung im `RUNNING`-Status befindet, hat Ihr Auftrag einen Status, der die aktuelle Ausführungsphase beschreibt. Ein Auftrag beginnt im `CREATED`-Status und geht dann in den `RUNNING`-Satus über, wenn er gestartet wurde. Wenn Fehlerzustände auftreten, wechselt Ihre Anwendung in den folgenden Status: 
  + Bei Anwendungen, die Apache Flink 1.11 und höher verwenden, wechselt Ihre Anwendung in den `RESTARTING`-Status.
  + Bei Anwendungen, die Apache Flink 1.8 und niedriger verwenden, wechselt Ihre Anwendung in den `FAILING`-Status.

  Die Anwendung wechselt dann entweder zum Status `RESTARTING` oder `FAILED`, je nachdem, ob der Auftrag neu gestartet werden kann. 

  Sie können den Status des Jobs überprüfen, indem Sie das CloudWatch Protokoll Ihrer Bewerbung auf Statusänderungen überprüfen.

## Batch-Workloads ausführen
<a name="batch-workloads"></a>

Managed Service für Apache Flink unterstützt die Ausführung von Apache-Flink-Batch-Workloads. Wenn in einem Batch-Auftrag ein Apache-Flink-Auftrag den Status **ABGESCHLOSSEN** erreicht, wird der Anwendungsstatus von Managed Service für Apache Flink auf **BEREIT** gesetzt. Weitere Informationen zum Status von Flink-Aufträgen finden Sie unter [Aufträge und Planung](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/internals/job_scheduling/).

# Überprüfen Sie die Anwendungsressourcen von Managed Service für Apache Flink
<a name="how-resources"></a>

In diesem Abschnitt werden die Systemressourcen beschrieben, die Ihre Anwendung verwendet. Wenn Sie verstehen, wie Managed Service für Apache Flink Ressourcen bereitstellt und verwendet, können Sie eine leistungsstarke und stabile Anwendung mit Managed Service für die Apache Flink entwerfen, erstellen und verwalten.

## Ressourcen für Managed Service für Apache Flink-Anwendungen
<a name="how-resources-kda"></a>

Managed Service für Apache Flink ist ein AWS Dienst, der eine Umgebung für das Hosten Ihrer Apache Flink-Anwendung schafft. Der Dienst Managed Service for Apache Flink stellt Ressourcen mithilfe von Einheiten bereit, die als **Kinesis Processing Units () KPUs** bezeichnet werden.

Eine KPU steht für die folgenden Systemressourcen:
+ Ein CPU-Kern
+ 4 GB Arbeitsspeicher, davon 1 GB systemeigener Speicher und 3 GB Heap-Speicher
+ 50 GB freier Festplattenspeicher

KPUs **führt Anwendungen in unterschiedlichen Ausführungseinheiten aus, die als **Aufgaben und Unteraufgaben bezeichnet werden**.** Sie können sich eine Unteraufgabe als das Äquivalent eines Threads vorstellen.

Die Anzahl der für eine Anwendung KPUs verfügbaren Dateien entspricht der `Parallelism` Anwendungseinstellung geteilt durch die `ParallelismPerKPU` Anwendungseinstellung. 

Informationen zur Anwendungsparallelität finden Sie unter [Implementieren Sie Anwendungsskalierung](how-scaling.md).

## Ressourcen für die Apache Flink-Anwendung
<a name="how-resources-flink"></a>

Die Apache-Flink-Umgebung weist Ressourcen für Ihre Anwendung mithilfe von Einheiten zu, die als **Aufgabenslots** bezeichnet werden. Wenn Managed Service für Apache Flink Ressourcen für Ihre Anwendung zuweist, weist es einer einzelnen KPU einen oder mehrere Apache-Flink-Aufgabenslots zu. Die Anzahl der Slots, die einer einzelnen KPU zugewiesen sind, entspricht der Einstellung `ParallelismPerKPU` Ihrer Anwendung. Weitere Informationen zu Task-Slots finden Sie unter [Job Scheduling](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/internals/job_scheduling/) in der Apache Flink-Dokumentation.

### Parallelität der Operatoren
<a name="how-resources-flink-operatorparallelism"></a>

Sie können die maximale Anzahl von Unteraufgaben festlegen, die ein Operator verwenden kann. Dieser Wert wird als **Operatorenparallelität** bezeichnet. Standardmäßig entspricht die Parallelität der einzelnen Operatoren in Ihrer Anwendung der Parallelität der Anwendung. Das bedeutet, dass standardmäßig jeder Operator in Ihrer Anwendung bei Bedarf alle verfügbaren Unteraufgaben in der Anwendung verwenden kann.

Sie können die Parallelität der Operatoren in Ihrer Anwendung mithilfe der `setParallelism`-Methode festlegen. Mit dieser Methode können Sie die Anzahl der Unteraufgaben steuern, die jeder Operator gleichzeitig verwenden kann.

Weitere Informationen zu Operatoren finden Sie unter [Operatoren](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/) in der Apache Flink-Dokumentation.

### Verkettung von Operatoren
<a name="how-resources-flink-operatorchaining"></a>

Normalerweise verwendet jeder Operator eine separate Unteraufgabe für die Ausführung, aber wenn mehrere Operatoren immer nacheinander ausgeführt werden, kann die Laufzeit sie alle derselben Aufgabe zuweisen. Dieser Vorgang wird **Operatorverkettung** genannt.

Mehrere sequenzielle Operatoren können zu einer einzigen Aufgabe verkettet werden, wenn sie alle mit denselben Daten arbeiten. Dies ist eine Auswahl der Kriterien, die erforderlich sind, damit dies zutrifft:
+ Die Operatoren führen eine einfache 1:1-Weiterleitung durch.
+ Die Operatoren haben alle dieselbe Operatorenparallelität.

Wenn Ihre Anwendung Operatoren zu einer einzigen Unteraufgabe zusammenfasst, werden Systemressourcen geschont, da der Service keine Netzwerkoperationen durchführen und jedem Operator Unteraufgaben zuweisen muss. Um festzustellen, ob Ihre Anwendung Operatorverkettung verwendet, sehen Sie sich das Auftragsdiagramm in der Konsole von Managed Service für Apache Flink an. Jeder Scheitelpunkt in der Anwendung steht für einen oder mehrere Operatoren. Das Diagramm zeigt Operatoren, die zu einem einzigen Scheitelpunkt verkettet wurden.

# Abrechnung pro Sekunde in Managed Service für Apache Flink
<a name="how-pricing"></a>

Managed Service für Apache Flink wird jetzt in Sekundenschritten abgerechnet. Pro Anwendung fällt eine Mindestgebühr von zehn Minuten an. Die Abrechnung pro Sekunde gilt für Anwendungen, die neu gestartet wurden oder bereits ausgeführt werden. In diesem Abschnitt wird beschrieben, wie Managed Service für Apache Flink Ihre Nutzung misst und Ihnen in Rechnung stellt. Weitere Informationen zu den Preisen von Managed Service for Apache Flink finden Sie unter [Amazon Managed Service for Apache Flink](https://aws.amazon.com/managed-service-apache-flink/pricing/) — Preise. 

## Funktionsweise
<a name="how-resources-kda"></a>

Managed Service for Apache Flink berechnet Ihnen die Dauer und Anzahl der **Kinesis Processing Units (KPUs)**, die in Sekundenschritten in den unterstützten Versionen abgerechnet werden. AWS-Regionen Eine einzelne KPU umfasst 1 vCPU-Rechenleistung und 4 GB Arbeitsspeicher. Ihnen wird ein Stundensatz berechnet, der auf der Anzahl der Anwendungen basiert, die für die Ausführung Ihrer KPUs Anwendungen verwendet werden. 

Beispiel: Für eine Anwendung, die 20 Minuten und 10 Sekunden läuft, werden 20 Minuten und 10 Sekunden berechnet, multipliziert mit den verwendeten Ressourcen. Für eine Anwendung, die 5 Minuten lang ausgeführt wird, wird der Mindestbetrag von zehn Minuten berechnet, multipliziert mit den verwendeten Ressourcen.

Managed Service für Apache Flink gibt die Nutzung in Stunden an. Zum Beispiel entsprechen 15 Minuten 0,25 Stunden. 

Für Apache Flink-Anwendungen wird Ihnen eine einzelne zusätzliche KPU pro Anwendung berechnet, die für die Orchestrierung verwendet wird. Anwendungen werden auch für den Betrieb von Speicherplatz und dauerhafte Backups in Rechnung gestellt. Das Ausführen von Anwendungsspeicher wird für die statusbehafteten Verarbeitungsfunktionen in Managed Service for Apache Flink verwendet und wird pro berechnet. GB/month. Durable backups are optional and provide point-in-time recovery for applications, charged per GB/month 

Im Streaming-Modus skaliert Managed Service for Apache Flink automatisch die Anzahl der von Ihrer Stream-Verarbeitungsanwendung KPUs benötigten Dateien, wenn die Anforderungen an Speicher und Rechenleistung schwanken. Sie können wählen, ob Sie Ihrer Anwendung die erforderliche Anzahl von bereitstellen möchten. KPUs 

## AWS-Region Verfügbarkeit
<a name="how-pricing-regions"></a>

**Anmerkung**  
Derzeit ist die Abrechnung pro Sekunde in den folgenden Regionen nicht verfügbar: AWS GovCloud (USA-Ost), AWS GovCloud (US-West), China (Peking) und China (Ningxia).

Die Abrechnung pro Sekunde ist in den folgenden Ländern verfügbar: AWS-Regionen
+ USA Ost (Nord-Virginia) – us-east-1
+ USA Ost (Ohio) – us-east-2
+ USA West (Nordkalifornien) – us-west-1
+ USA West (Oregon) – us-west-2
+ Afrika (Kapstadt) – af-south-1
+ Asien-Pazifik (Hongkong) – ap-east-1
+ Asien-Pazifik (Hyderabad) - ap-south-1
+ Asien-Pazifik (Jakarta) – ap-southeast-3
+ Asien-Pazifik (Melbourne) - ap-southeast-4
+ Asien-Pazifik (Mumbai) – ap-south-1
+ Asien-Pazifik (Osaka) – ap-northeast-3
+ Asien-Pazifik (Seoul) – ap-northeast-2
+ Asien-Pazifik (Singapur) – ap-southeast-1
+ Asien-Pazifik (Sydney) – ap-southeast-2
+ Asien-Pazifik (Tokio) – ap-northeast-1
+ Kanada (Zentral) – ca-central-1
+ Kanada West (Calgary) – ca-west-1
+ Europa (Frankfurt) – eu-central-1
+ Europa (Irland) – eu-west-1
+ Europa (London) – eu-west-2
+ Europa (Mailand) – eu-south-1
+ Europa (Paris) – eu-west-3
+ Europa (Spanien) – eu-south-2
+ Europa (Stockholm) – eu-north-1
+ Europa (Zürich) – eu-central-2
+ Israel (Tel Aviv) - il-central-1
+ Naher Osten (Bahrain) – me-south-1
+ Naher Osten (VAE) – me-central-1
+ Südamerika (São Paulo) – sa-east-1

## Beispiele für Preisgestaltung
<a name="how-pricing-examples"></a>

Preisbeispiele finden Sie auf der Preisseite für Managed Service for Apache Flink. Weitere Informationen finden Sie unter [Amazon Managed Service for Apache Flink — Preise](https://aws.amazon.com/managed-service-apache-flink/pricing/). Im Folgenden finden Sie weitere Beispiele mit Abbildungen des jeweiligen Kostennutzungsberichts.

### Eine lang andauernde, schwere Arbeitslast
<a name="pricing-example-1"></a>

Sie sind ein großer Video-Streaming-Dienst und möchten eine Videoempfehlung in Echtzeit erstellen, die auf den Interaktionen Ihrer Nutzer basiert. Sie verwenden eine Apache Flink-Anwendung in Managed Service für Apache Flink, um kontinuierlich Benutzerinteraktionsereignisse aus mehreren Kinesis-Datenströmen aufzunehmen und Ereignisse in Echtzeit zu verarbeiten, bevor sie an ein nachgeschaltetes System ausgegeben werden. Benutzerinteraktionsereignisse werden mithilfe mehrerer Operatoren transformiert. Dazu gehören die Partitionierung von Daten nach Ereignistyp, die Anreicherung von Daten mit zusätzlichen Metadaten, das Sortieren von Daten nach Zeitstempel und das Zwischenspeichern von Daten für 5 Minuten vor der Auslieferung. Die Anwendung umfasst viele Transformationsschritte, die rechenintensiv und parallelisierbar sind. Ihre Flink-Anwendung ist so konfiguriert, dass sie mit 20 KPUs ausgeführt wird, um der Arbeitslast gerecht zu werden. Ihre Anwendung verwendet täglich 1 GB an dauerhaftem Anwendungs-Backup. Die monatlichen Gebühren für Managed Service für Apache Flink werden wie folgt berechnet:

**Monatliche Gebühren**

Der Preis in der Region USA Ost (Nord-Virginia) beträgt 0,11\$1 pro KPU-Stunde. Managed Service für Apache Flink weist 50 GB Speicher für laufende Anwendungen pro KPU zu und berechnet 0,10 USD pro GB/Monat.
+ Monatliche KPU-Gebühren: 24 Stunden x 30 Tage \$1 (20 KPUs \$1 1 zusätzliche KPU für Streaming-Anwendungen) \$1 0,11 USD/Stunde = 1.584,00 USD
+ Monatliche Speichergebühren für laufende Anwendungen: 30 Tage \$1 20 \$1 50 \$1 0,10 USD/GB-Monat = 100,00 USD KPUs GB/KPUs 
+ Monatliche Speichergebühren für dauerhafte Anwendungen: 30 Tage \$1 1 GB \$1 0,023 GB-Monat = 0,03 USD
+ **Gesamtkosten: 1.584,00 USD \$1 100 USD \$1 0,03 USD = 1.684,03 USD**

**Kostennutzungsbericht für Managed Service for Apache Flink auf der Billing and Cost Management-Konsole für den Monat**

Kinesis-Analytik
+ 1.684,03 USD — USA Ost (Nord-Virginia)
+ Amazon Kinesis Analytics CreateSnapshot
  + 0,023\$1 pro GB/Monat an dauerhaften Anwendungs-Backups
    + 1 GB-Monat — 0,03 USD
+ Amazon Kinesis Analytics StartApplication
  + 0,10\$1 pro GB pro Monat laufendem Anwendungsspeicher
    + 1.000 GB pro Monat — 100 USD
  + 0,11\$1 pro Kinesis Processing Unit-Stunde für Apache Flink-Anwendungen
    + 15.120 KPU-Stunde — 1.584 USD

### Ein Batch-Workload, der täglich \$115 Minuten lang ausgeführt wird
<a name="pricing-example-2"></a>

Sie verwenden eine Apache Flink-Anwendung in Managed Service für Apache Flink, um Protokolldaten in Amazon Simple Storage Service (Amazon S3) im Batch-Modus zu transformieren. Die Protokolldaten werden mithilfe mehrerer Operatoren transformiert. Dazu gehören die Anwendung eines Schemas auf die verschiedenen Protokollereignisse, die Partitionierung der Daten nach Ereignistyp und die Sortierung der Daten nach Zeitstempel. Die Anwendung hat viele Transformationsschritte, aber keiner ist rechenintensiv. Diese Anwendung nimmt in einem Monat mit 30 Tagen täglich 2.000 Daten records/second für 15 Minuten auf. Sie erstellen keine dauerhaften Anwendungs-Backups. Die monatlichen Gebühren für Managed Service für Apache Flink werden wie folgt berechnet:

**Monatliche Gebühren**

Der Preis in der Region USA Ost (Nord-Virginia) beträgt 0,11\$1 pro KPU-Stunde. Managed Service für Apache Flink weist 50 GB Speicher für laufende Anwendungen pro KPU zu und berechnet 0,10 USD pro GB/Monat.
+ Batch-Workload: Während der 15 Minuten pro Tag verarbeitet die Anwendung Managed Service for Apache Flink 2.000 records/second, which takes 2KPUs. 30 days/month \$1 15 minutes/day = 450 minutes/month
+ Monatliche KPU-Gebühren: 450 minutes/month \$1 (2 KPUs \$11 zusätzliche KPU für Streaming-Anwendungen) \$1 0,11 USD/Stunde = 2,48 USD
+ Monatliche Speichergebühren für laufende Anwendungen: 450 minutes/month \$1 2 KPUs \$1 50 \$1 0,10 USD/GB-Monat = 0,11 USD GB/KPUs 
+ **Gesamtkosten: 2,48 USD \$1 0,11 = 2,59 USD**

**Kostennutzungsbericht für Managed Service for Apache Flink auf der Billing and Cost Management-Konsole für den Monat**

Kinesis-Analytik
+ 2,59 USD — USA Ost (Nord-Virginia)
+ Amazon Kinesis Analytics StartApplication
  + 0,10\$1 pro GB pro Monat laufender Anwendungs-Backups
    + 1,042 GB-Monat — 0,11 USD
  + 0,11\$1 pro Kinesis Processing Unit-Stunde für Apache Flink-Anwendungen
    + 22,5 KPU-Stunde — 2,48 USD

### Eine Testanwendung, die in derselben Stunde kontinuierlich beendet und gestartet wird und für die mehrere Mindestgebühren anfallen
<a name="pricing-example-3"></a>

Sie sind eine große E-Commerce-Plattform, die täglich Millionen von Transaktionen verarbeitet. Sie möchten die Betrugserkennung in Echtzeit entwickeln. Sie verwenden eine Apache Flink-Anwendung in Managed Service für Apache Flink, um Transaktionsereignisse aus Kinesis Data Streams aufzunehmen und Ereignisse in Echtzeit mit verschiedenen Transformationsschritten zu verarbeiten. Dazu gehören die Verwendung eines Schiebefensters zur Zusammenfassung von Ereignissen, die Partitionierung von Ereignissen nach Ereignistypen und die Anwendung spezifischer Erkennungsregeln für verschiedene Ereignistypen. Während der Entwicklung starten und beenden Sie Ihre Anwendung mehrmals, um das Verhalten zu testen und zu debuggen. Es gibt Situationen, in denen Ihre Anwendung nur für ein paar Minuten läuft. Es gibt eine Stunde, in der Sie Ihre Anwendung mit 4 testen KPUs und Ihre Anwendung keine dauerhaften Anwendungs-Backups verwendet:
+ Um 10:05 Uhr starten Sie Ihre Anwendung, die 30 Minuten lang ausgeführt wird, bevor sie um 10:35 Uhr gestoppt wird.
+ Um 10:40 Uhr starten Sie Ihre Anwendung erneut, die 5 Minuten lang ausgeführt wird, bevor sie um 10:45 Uhr gestoppt wird.
+ Um 10:50 Uhr starten Sie die Anwendung erneut, die 2 Minuten lang ausgeführt wird, bevor sie um 10:52 Uhr gestoppt wird.

Managed Service für Apache Flink berechnet bei jedem Start einer Anwendung eine Nutzungsdauer von mindestens 10 Minuten. Die monatliche Nutzung von Managed Service for Apache Flink für Ihre Anwendung wird wie folgt berechnet:
+ Ihre Anwendung wird zum ersten Mal gestartet und gestoppt: 30 Minuten Nutzungsdauer
+ Beim zweiten Start und Stopp Ihrer Anwendung: 10 Minuten Nutzung (Ihre Anwendung läuft 5 Minuten, aufgerundet auf die Mindestgebühr von 10 Minuten)
+ Beim dritten Start und Stopp Ihrer Anwendung: 10 Minuten Nutzung (Ihre Anwendung läuft 2 Minuten, aufgerundet auf die Mindestgebühr von 10 Minuten)

Insgesamt würden Ihrer Anwendung 50 Minuten Nutzungsdauer in Rechnung gestellt. Wenn es in dem Monat keine anderen Zeiten gibt, zu denen Ihre Anwendung ausgeführt wird, werden die monatlichen Gebühren für Managed Service für Apache Flink wie folgt berechnet:

**Monatliche Gebühren**

Der Preis in der Region USA Ost (Nord-Virginia) beträgt 0,11\$1 pro KPU-Stunde. Managed Service für Apache Flink weist 50 GB Speicher für laufende Anwendungen pro KPU zu und berechnet 0,10 USD pro GB/Monat.
+ Monatliche KPU-Gebühren: 50 Minuten \$1 (4 KPUs \$1 1 zusätzliche KPU für Streaming-Anwendungen) \$1 0,11 USD/Stunde = 0,46 USD (auf den nächsten Cent gerundet)
+ Monatliche Speichergebühren für laufende Anwendungen: 50 Minuten \$1 4 \$1 50 \$1 0,10 KPUs USD/GB pro Monat = GB/KPUs 0,03 USD (auf den nächsten Cent gerundet)
+ **Gesamtkosten: 0,46 USD \$1 0,03 = 0,49 USD**

**Kostennutzungsbericht für Managed Service for Apache Flink auf der Billing and Cost Management-Konsole für den Monat**

Kinesis-Analytik
+ 0,49 USD — USA Ost (Nord-Virginia)
+ Amazon Kinesis Analytics StartApplication
  + 0,10\$1 pro GB pro Monat laufendem Anwendungsspeicher
    + 0,232 GB-Monat — 0,03 USD
  + 0,11\$1 pro Kinesis Processing Unit-Stunde für Apache Flink-Anwendungen
    + 4,167 KPU-Stunde — 0,46 USD

# DataStream API-Komponenten überprüfen
<a name="how-datastream"></a>

Ihre Apache Flink-Anwendung verwendet die [Apache DataStream Flink-API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/), um Daten in einen Datenstrom umzuwandeln. 

In diesem Abschnitt werden die verschiedenen Komponenten beschrieben, die Daten verschieben, transformieren und verfolgen:
+ [Verwenden Sie Konnektoren, um Daten in Managed Service für Apache Flink mit der DataStream API zu verschieben](how-connectors.md): Diese Komponenten verschieben Daten zwischen Ihrer Anwendung und externen Datenquellen und Zielen.
+ [Transformieren Sie Daten mithilfe von Operatoren in Managed Service für Apache Flink mit der API DataStream](how-operators.md): Diese Komponenten transformieren oder gruppieren Datenelemente innerhalb Ihrer Anwendung.
+ [Ereignisse in Managed Service für Apache Flink mithilfe der DataStream API verfolgen](how-time.md): In diesem Thema wird beschrieben, wie Managed Service for Apache Flink Ereignisse bei der Verwendung der DataStream API verfolgt.

# Verwenden Sie Konnektoren, um Daten in Managed Service für Apache Flink mit der DataStream API zu verschieben
<a name="how-connectors"></a>

In der Amazon Managed Service for Apache DataStream Flink-API sind *Konnektoren* Softwarekomponenten, die Daten in und aus einer Managed Service for Apache Flink-Anwendung übertragen. Konnektoren sind flexible Integrationen, mit denen Sie aus Dateien und Verzeichnissen lesen können. Konnektoren bestehen aus kompletten Modulen für die Interaktion mit Amazon-Services und Systemen von Drittanbietern.

Zu den Konnektoren gehören die folgenden:
+ [Fügen Sie Streaming-Datenquellen hinzu](how-sources.md): Stellen Ihrer Anwendung Daten aus einem Kinesis Data Stream, einer Datei oder einer anderen Datenquelle zur Verfügung.
+ [Schreiben Sie Daten mithilfe von Senken](how-sinks.md): Senden Sie Daten aus Ihrer Anwendung an einen Kinesis-Datenstream, Firehose-Stream oder ein anderes Datenziel.
+ [Verwenden Sie asynchrone I/O](how-async.md): Ermöglicht asynchronen Zugriff auf eine Datenquelle (z. B. eine Datenbank), um Stream-Ereignisse zu bereichern. 

## Verfügbare Konnektoren
<a name="how-connectors-list"></a>

Das Apache-Flink-Framework enthält Konnektoren für den Zugriff auf Daten aus verschiedenen Quellen. Informationen zu den im Apache Flink-Framework verfügbaren Konnektoren finden Sie unter [Konnektoren](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/connectors/) in der [Apache Flink-Dokumentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

**Warnung**  
Wenn Sie Anwendungen haben, die auf Flink 1.6, 1.8, 1.11 oder 1.13 laufen und in den Regionen Naher Osten (VAE), Asien-Pazifik (Hyderabad), Israel (Tel Aviv), Europa (Zürich), Naher Osten (VAE), Asien-Pazifik (Melbourne) oder Asien-Pazifik (Jakarta) laufen möchten, müssen Sie möglicherweise Ihr Anwendungsarchiv mit einem aktualisierten Konnektor neu erstellen oder auf Flink 1.18 aktualisieren.   
Apache Flink-Konnektoren werden in ihren eigenen Open-Source-Repositorys gespeichert. Wenn Sie auf Version 1.18 oder höher aktualisieren, müssen Sie Ihre Abhängigkeiten aktualisieren. Informationen zum Zugriff auf das Repository für Apache AWS Flink-Konnektoren finden Sie unter. [flink-connector-aws](https://github.com/apache/flink-connector-aws)  
Die frühere `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` Kinesis-Quelle wurde eingestellt und könnte mit einer future Version von Flink entfernt werden. Verwenden Sie stattdessen [Kinesis Source](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-source).  
Es besteht keine staatliche Kompatibilität zwischen `FlinkKinesisConsumer` und`KinesisStreamsSource`. Einzelheiten finden Sie unter [Migration vorhandener Jobs auf eine neue Kinesis Streams-Quelle](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#migrating-existing-jobs-to-new-kinesis-streams-source-from-kinesis-consumer) in der Apache Flink-Dokumentation.  
 Im Folgenden finden Sie die empfohlenen Richtlinien:   


**Konnektor-Upgrades**  

| Flink-Version | Verwendeter Konnektor | Auflösung | 
| --- | --- | --- | 
| 1.19, 1.20 | Kinesis-Quelle |  Stellen Sie beim Upgrade auf Managed Service for Apache Flink Version 1.19 und 1.20 sicher, dass Sie den neuesten Kinesis Data Streams Streams-Quellconnector verwenden. Das muss eine beliebige Version 5.0.0 oder höher sein. Weitere Informationen finden Sie unter [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/).  | 
| 1.19, 1.20 | Kinesis-Spüle |  Stellen Sie beim Upgrade auf Managed Service for Apache Flink Version 1.19 und 1.20 sicher, dass Sie den neuesten Kinesis Data Streams Streams-Sink-Connector verwenden. Das muss eine beliebige Version 5.0.0 oder höher sein. Weitere Informationen finden Sie unter [Kinesis Streams Sink](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink).  | 
| 1.19, 1.20 | DynamoDB-Streams-Quelle |  Stellen Sie beim Upgrade auf Managed Service for Apache Flink Version 1.19 und 1.20 sicher, dass Sie den neuesten DynamoDB Streams Streams-Quellconnector verwenden. Das muss eine beliebige Version 5.0.0 oder höher sein. Weitere Informationen finden Sie unter [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/).  | 
| 1.19, 1.20 | DynamoDB-Senke | Stellen Sie beim Upgrade auf Managed Service for Apache Flink Version 1.19 und 1.20 sicher, dass Sie den neuesten DynamoDB-Sink-Connector verwenden. Das muss eine beliebige Version 5.0.0 oder höher sein. Weitere Informationen finden Sie unter [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/). | 
| 1.19, 1.20 | Amazon SQS SQS-Spüle |  Stellen Sie beim Upgrade auf Managed Service für Apache Flink Version 1.19 und 1.20 sicher, dass Sie den neuesten Amazon SQS SQS-Sink-Connector verwenden. Das muss eine beliebige Version 5.0.0 oder höher sein. Weitere Informationen finden Sie unter [Amazon SQS Sink](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/sqs/).  | 
| 1,19, 1,20 | Amazon Managed Service für Prometheus Sink |  Stellen Sie beim Upgrade auf Managed Service für Apache Flink Version 1.19 und 1.20 sicher, dass Sie den neuesten Amazon Managed Service for Prometheus Sink Connector verwenden. Das muss eine beliebige Version 1.0.0 oder höher sein. Weitere Informationen finden Sie unter [Prometheus](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/prometheus/) Sink.  | 

# Fügen Sie Streaming-Datenquellen zu Managed Service für Apache Flink hinzu
<a name="how-sources"></a>

Apache Flink bietet Konnektoren zum Lesen aus Dateien, Sockets, Sammlungen und benutzerdefinierten Quellen. In Ihrem Anwendungscode verwenden Sie eine [Apache Flink-Quelle](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources), um Daten aus einem Stream zu empfangen. In diesem Abschnitt werden die Quellen beschrieben, die für Amazon-Services verfügbar sind.

## Verwenden Sie Kinesis-Datenstreams
<a name="input-streams"></a>

Das `KinesisStreamsSource` stellt Streaming-Daten aus einem Amazon Kinesis Kinesis-Datenstream für Ihre Anwendung bereit. 

### Erstellen eines `KinesisStreamsSource`
<a name="input-streams-create"></a>

Das folgende Code-Beispiel zeigt das Erstellen eines `KinesisStreamsSource`:

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

Weitere Informationen zur Verwendung von finden Sie unter [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) in der Apache Flink-Dokumentation und in [unserem öffentlichen KinesisConnectors Beispiel auf](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) Github. `KinesisStreamsSource`

### Erstellen Sie einen`KinesisStreamsSource`, der einen EFO-Consumer verwendet
<a name="input-streams-efo"></a>

Der unterstützt `KinesisStreamsSource` jetzt [Enhanced Fan-Out (EFO)](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/). 

Wenn ein Kinesis-Verbraucher EFO verwendet, stellt ihm der Kinesis Data Streams-Service seine eigene dedizierte Bandbreite zur Verfügung, anstatt dass der Verbraucher die feste Bandbreite des Streams mit den anderen Verbrauchern teilt, die aus dem Stream lesen.

Weitere Informationen zur Verwendung von EFO mit Kinesis Consumer finden Sie unter [FLIP-128: Verbesserter Lüfterausgang](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers) für Kinesis-Verbraucher. AWS 

Sie aktivieren den EFO-Consumer, indem Sie die folgenden Parameter für den Kinesis-Consumer festlegen:
+ **READER\$1TYPE:** Setzen Sie diesen Parameter auf **EFO**, damit Ihre Anwendung einen EFO-Consumer für den Zugriff auf die Kinesis Data Stream-Daten verwendet. 
+ **EFO\$1CONSUMER\$1NAME:** Setzen Sie diesen Parameter auf einen Zeichenfolgenwert, der unter den Verbrauchern dieses Streams eindeutig ist. Die Wiederverwendung eines Verbrauchernamens in demselben Kinesis Data Stream führt dazu, dass der vorherige Verbraucher, der diesen Namen verwendet hat, beendet wird. 

Um einen `KinesisStreamsSource` für die Verwendung von EFO zu konfigurieren, fügen Sie dem Verbraucher die folgenden Parameter hinzu:

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

Ein Beispiel für eine Managed Service for Apache Flink-Anwendung, die einen EFO-Consumer verwendet, finden Sie in [unserem öffentlichen Kinesis Connectors-Beispiel](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) auf Github.

## Verwenden Sie Amazon MSK
<a name="input-msk"></a>

Die `KafkaSource`-Quelle stellt Streaming-Daten aus einem Amazon MSK-Thema für Ihre Anwendung bereit. 

### Erstellen eines `KafkaSource`
<a name="input-msk-create"></a>

Das folgende Code-Beispiel zeigt das Erstellen eines `KafkaSource`:

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

Weitere Informationen zur Verwendung von `KafkaSource` finden Sie unter [MSK-Replikation](earlier.md#example-msk).

# Schreiben Sie Daten mithilfe von Senken in Managed Service für Apache Flink
<a name="how-sinks"></a>

In Ihrem Anwendungscode können Sie jeden [Apache Flink-Sink-Connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) verwenden, um in externe Systeme zu schreiben, einschließlich AWS Dienste wie Kinesis Data Streams und DynamoDB.

Apache Flink bietet auch Senken für Dateien und Sockets, und Sie können benutzerdefinierte Senken implementieren. Unter den verschiedenen unterstützten Senken werden häufig die folgenden verwendet:

## Verwenden Sie Kinesis-Datenstreams
<a name="sinks-streams"></a>

Apache Flink bietet Informationen zum [Kinesis Data Streams-Konnector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/) in der Apache Flink-Dokumentation.

Ein Beispiel für eine Anwendung, die einen Kinesis Data Stream für Eingabe und Ausgabe verwendet, finden Sie unter [Tutorial: Erste Schritte mit der DataStream API in Managed Service für Apache Flink](getting-started.md).

## Verwenden Sie Apache Kafka und Amazon Managed Streaming for Apache Kafka (MSK)
<a name="sinks-MSK"></a>

Der [Apache Flink Kafka Connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink) bietet umfassende Unterstützung für die Veröffentlichung von Daten in Apache Kafka und Amazon MSK, einschließlich exakt einmaliger Garantien. Informationen zum Schreiben in Kafka finden Sie in den [Beispielen für Kafka Connectors](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors) in der Apache Flink-Dokumentation.

## Verwenden Sie Amazon S3
<a name="sinks-s3"></a>

Sie können Apache Flink `StreamingFileSink` verwenden, um Objekte in einen Amazon S3-Bucket zu schreiben.

Ein Beispiel dafür, wie man Objekte in S3 schreibt, finden Sie unter [Beispiel: In einen Amazon S3 S3-Bucket schreiben](earlier.md#examples-s3). 

## Benutze Firehose
<a name="sinks-firehose"></a>

Das `FlinkKinesisFirehoseProducer` ist eine zuverlässige, skalierbare Apache Flink-Senke zum Speichern von Anwendungsausgaben mithilfe des [Firehose-Dienstes.](https://docs.aws.amazon.com/firehose/latest/dev/) In diesem Abschnitt wird die Einrichtung eines Maven-Projekts beschrieben, um einen `FlinkKinesisFirehoseProducer` zu erstellen und zu verwenden.

**Topics**
+ [Erstellen eines `FlinkKinesisFirehoseProducer`](#sinks-firehose-create)
+ [`FlinkKinesisFirehoseProducer`-Codebeispiel](#sinks-firehose-sample)

### Erstellen eines `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-create"></a>

Das folgende Code-Beispiel zeigt das Erstellen eines `FlinkKinesisFirehoseProducer`:

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### `FlinkKinesisFirehoseProducer`-Codebeispiel
<a name="sinks-firehose-sample"></a>

Das folgende Codebeispiel zeigt, wie Sie einen Apache Flink-Datenstream erstellen `FlinkKinesisFirehoseProducer` und konfigurieren und Daten von diesem an den Firehose senden.

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

Ein vollständiges Tutorial zur Verwendung der Firehose-Spüle finden Sie unter[Beispiel: An Firehose schreiben](earlier.md#get-started-exercise-fh).

# Verwenden Sie Asynchronous I/O in Managed Service für Apache Flink
<a name="how-async"></a>

Ein I/O asynchroner Operator reichert Stream-Daten mithilfe einer externen Datenquelle wie einer Datenbank an. Managed Service für Apache Flink reichert die Stream-Ereignisse asynchron an, sodass Anfragen zur Steigerung der Effizienz gebündelt werden können. 

Weitere Informationen finden Sie unter [Asynchronous I/O](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/asyncio/) in der Apache Flink-Dokumentation.

# Transformieren Sie Daten mithilfe von Operatoren in Managed Service für Apache Flink mit der API DataStream
<a name="how-operators"></a>

Um eingehende Daten in einem Managed Service für Apache Flink umzuwandeln, verwenden Sie einen Apache-Flink-*Operator*. Ein Apache-Flink-Operator wandelt einen oder mehrere Datenströme in einen neuen Datenstrom um. Der neue Datenstrom enthält modifizierte Daten aus dem ursprünglichen Datenstrom. Apache Flink bietet mehr als 25 vorgefertigte Operatoren zur Stream-Verarbeitung. Weitere Informationen finden Sie unter [Operatoren](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/) in der Apache Flink-Dokumentation.

**Topics**
+ [Verwenden Sie Transformationsoperatoren](#how-operators-transform)
+ [Verwenden Sie Aggregationsoperatoren](#how-operators-agg)

## Verwenden Sie Transformationsoperatoren
<a name="how-operators-transform"></a>

Im Folgenden finden Sie ein Beispiel für eine einfache Texttransformation in einem der Felder eines JSON-Datenstroms. 

Dieser Code erstellt einen transformierten Datenstrom. Der neue Datenstrom enthält dieselben Daten wie der ursprüngliche Stream, wobei die Zeichenfolge „` Company`“ an den Inhalt des `TICKER`-Felds angehängt wird.

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## Verwenden Sie Aggregationsoperatoren
<a name="how-operators-agg"></a>

Es folgt ein Beispiel für einen Aggregationsoperator. Der Code erstellt einen aggregierten Datenstrom. Der Operator erstellt ein 5-sekündiges rollierendes Fenster und gibt die Summe der `PRICE`-Werte für die Datensätze im Fenster mit demselben `TICKER`-Wert zurück.

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

Weitere Codebeispiele finden Sie unter [Beispiele für die Erstellung von und die Arbeit mit Managed Service für Apache Flink-Anwendungen](examples-collapsibles.md). 

# Ereignisse in Managed Service für Apache Flink mithilfe der DataStream API verfolgen
<a name="how-time"></a>

Managed Service für Apache Flink verfolgt Ereignisse mit den folgenden Zeitstempeln:
+ **Verarbeitungszeit:** Bezieht sich auf die Systemzeit der Maschine, die den jeweiligen Vorgang ausführt.
+ **Ereigniszeit:** Bezieht sich auf die Zeit, zu der jedes einzelne Ereignis auf seinem produzierenden Gerät eingetreten ist.
+ **Erfassungszeit:** Bezieht sich auf den Zeitpunkt, zu dem Ereignisse in den Service von Managed Service für Apache Flink eingehen.

Sie legen die von der Streaming-Umgebung verwendete Zeit mithilfe von fest`setStreamTimeCharacteristic`. 

```
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

Weitere Informationen zu Zeitstempeln finden Sie unter [Generieren von Wasserzeichen](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/) in der Apache Flink-Dokumentation.

# API-Komponenten von Review Table
<a name="how-table"></a>

Ihre Apache-Flink-Anwendung verwendet die [Apache Flink Table API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/tableapi/), um mithilfe eines relationalen Modells mit Daten in einem Stream zu interagieren. Sie verwenden die Tabellen-API, um mithilfe von Tabellenquellen auf Daten zuzugreifen, und verwenden dann Tabellenfunktionen, um Tabellendaten zu transformieren und zu filtern. Sie können Tabellendaten entweder mithilfe von API-Funktionen oder SQL-Befehlen transformieren und filtern. 

In diesem Abschnitt werden folgende Themen behandelt:
+ [Tabellen-API-Konnektoren](how-table-connectors.md): Diese Komponenten verschieben Daten zwischen Ihrer Anwendung und externen Datenquellen und -zielen.
+ [Zeitattribute der Tabellen-API](how-table-timeattributes.md): In diesem Thema wird beschrieben, wie Managed Service for Apache Flink Ereignisse verfolgt, wenn die Tabellen-API verwendet wird.

# Tabellen-API-Konnektoren
<a name="how-table-connectors"></a>

Im Apache Flink-Programmiermodell sind Konnektoren Komponenten, die Ihre Anwendung verwendet, um Daten aus externen Quellen, z. B. anderen AWS Diensten, zu lesen oder zu schreiben.

Mit der Apache Flink Table API können Sie die folgenden Arten von Konnektoren verwenden:
+ [Tabellen-API-Quellen](#how-table-connectors-source): Sie verwenden Tabellen-API-Quellkonnektoren, um Tabellen innerhalb Ihrer `TableEnvironment` zu erstellen, indem Sie entweder API-Aufrufe oder SQL-Abfragen verwenden.
+ [Die Tabellen-API sinkt](#how-table-connectors-sink): Sie verwenden SQL-Befehle, um Tabellendaten in externe Quellen wie ein Amazon-MSK-Thema oder einen Amazon-S3-Bucket zu schreiben.

## Tabellen-API-Quellen
<a name="how-table-connectors-source"></a>

Sie erstellen eine Tabellenquelle aus einem Datenstrom. Der folgende Code erstellt eine Tabelle aus einem Amazon-MSK-Thema:

```
//create the table
    final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties);
    consumer.setStartFromEarliest();
    //Obtain stream
    DataStream<StockRecord> events = env.addSource(consumer);

    Table table = streamTableEnvironment.fromDataStream(events);
```

Weitere Informationen zu Tabellenquellen finden Sie unter [Table & SQL Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/) in der Apache Flink-Dokumentation.

## Die Tabellen-API sinkt
<a name="how-table-connectors-sink"></a>

Um Tabellendaten in eine Senke zu schreiben, erstellen Sie die Senke in SQL und führen dann die SQL-basierte Senke für das `StreamTableEnvironment`-Objekt aus.

Das folgende Codebeispiel zeigt, wie Tabellendaten in eine Amazon-S3-Senke geschrieben werden:

```
final String s3Sink = "CREATE TABLE sink_table (" +
    "event_time TIMESTAMP," +
    "ticker STRING," +
    "price DOUBLE," +
    "dt STRING," +
    "hr STRING" +
    ")" +
    " PARTITIONED BY (ticker,dt,hr)" +
    " WITH" +
    "(" +
    " 'connector' = 'filesystem'," +
    " 'path' = '" + s3Path + "'," +
    " 'format' = 'json'" +
    ") ";

    //send to s3
    streamTableEnvironment.executeSql(s3Sink);
    filteredTable.executeInsert("sink_table");
```

 Sie können den `format`-Parameter verwenden, um zu steuern, welches Format Managed Service für Apache Flink verwendet, um die Ausgabe in die Senke zu schreiben. Informationen zu Formaten finden Sie unter [Unterstützte Konnektoren](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/) in der Apache Flink-Dokumentation.

## Benutzerdefinierte Quellen und Senken
<a name="how-table-connectors-userdef"></a>

Sie können vorhandene Apache-Kafka-Konnektoren verwenden, um Daten zu und von anderen AWS -Services wie Amazon MSK und Amazon S3 zu senden. Für die Interaktion mit anderen Datenquellen und -zielen können Sie Ihre eigenen Quellen und Senken definieren. Weitere Informationen finden Sie unter [Benutzerdefinierte Quellen und Senken in der Apache Flink-Dokumentation](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sourcessinks/).

# Zeitattribute der Tabellen-API
<a name="how-table-timeattributes"></a>

Jeder Datensatz in einem Datenstrom hat mehrere Zeitstempel, die definieren, wann Ereignisse im Zusammenhang mit dem Datensatz aufgetreten sind:
+ **Ereigniszeit**: Ein benutzerdefinierter Zeitstempel, der definiert, wann das Ereignis eingetreten ist, durch das der Datensatz erstellt wurde.
+ **Erfassungszeit**: Der Zeitpunkt, zu dem Ihre Anwendung den Datensatz aus dem Datenstrom abgerufen hat.
+ **Verarbeitungszeit**: Der Zeitpunkt, zu dem Ihre Anwendung den Datensatz verarbeitet hat.

Wenn die Apache Flink Table-API Fenster auf der Grundlage von Rekordzeiten erstellt, definieren Sie mithilfe der Methode, welche dieser Zeitstempel sie verwendet. `setStreamTimeCharacteristic` 

Weitere Informationen zur Verwendung von Zeitstempeln mit der Tabellen-API finden Sie unter [Time Attributes](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/concepts/time_attributes/) and [Timely Stream Processing](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/time/) in der Apache Flink-Dokumentation.

# Verwenden Sie Python mit Managed Service für Apache Flink
<a name="how-python"></a>

**Anmerkung**  
Wenn Sie die Python-Flink-Anwendung auf einem neuen Mac mit Apple Silicon-Chip entwickeln, können einige [bekannte Probleme](https://issues.apache.org/jira/browse/FLINK-26981) mit den Python-Abhängigkeiten von PyFlink 1.15 auftreten. In diesem Fall empfehlen wir, den Python-Interpreter in Docker auszuführen. step-by-stepAnweisungen finden Sie unter [PyFlink 1.15-Entwicklung auf Apple](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/LocalDevelopmentOnAppleSilicon) Silicon Mac.

Apache Flink Version 2.2 unterstützt das Erstellen von Anwendungen mit Python Version 3.12; die Unterstützung für Python Version 3.8 wurde entfernt. Weitere Informationen finden Sie unter [Flink Python Docs](https://nightlies.apache.org/flink/flink-docs-release-2.2/api/python/). Gehen Sie wie folgt vor, um mithilfe von Python eine Anwendung zu erstellen, die Managed Service für Apache Flink nutzt:
+ Erstellen Sie Ihren Python-Anwendungscode als Textdatei mit einer `main`-Methode.
+ Bündeln Sie Ihre Anwendungscodedatei und alle Python- oder Java-Abhängigkeiten in einer ZIP-Datei und laden Sie sie in einen Amazon-S3-Bucket hoch.
+ Erstellen Sie Ihre Anwendung, die Managed Service für Apache Flink nutzt, und geben Sie dabei Ihren Amazon-S3-Codespeicherort sowie die Anwendungseigenschaften und die Anwendungseinstellungen an.

Auf einer hohen Ebene ist die Python Table API ein Wrapper rund um die Java Table API. Informationen zur Python-Tabellen-API finden Sie im [Tabellen-API-Tutorial](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/table_api_tutorial/) in der Apache Flink-Dokumentation.

# Programmieren Sie Ihre Python-Anwendung Managed Service für Apache Flink
<a name="how-python-programming"></a>

Sie programmieren Ihre Python-Anwendung, die Managed Service für Apache Flink nutzt, mithilfe der Apache Flink Python Table API. Die Apache-Flink-Engine übersetzt Python-Table-API-Anweisungen (die in der Python-VM ausgeführt werden) in Java-Table-API-Anweisungen (die in der Java-VM ausgeführt werden). 

Sie verwenden die Python Table API folgendermaßen:
+ Erstellen Sie eine Referenz auf die `StreamTableEnvironment`.
+ Erstellen Sie `table`-Objekte aus Ihren Quell-Streaming-Daten, indem Sie Abfragen für die `StreamTableEnvironment`-Referenz ausführen.
+ Führen Sie Abfragen an Ihren `table`-Objekten aus, um Ausgabetabellen zu erstellen.
+ Schreiben Sie Ihre Ausgabetabellen mit einem `StatementSet` an Ihre Ziele.

Informationen zu den ersten Schritten mit der Python Table API in Managed Service für Apache Flink finden Sie unter [Erste Schritte mit Amazon Managed Service für Apache Flink für Python](gs-python.md).

## Streaming-Daten lesen und schreiben
<a name="how-python-programming-readwrite"></a>

Um Streaming-Daten zu lesen und zu schreiben, führen Sie SQL-Abfragen in der Tabellenumgebung aus.

### Erstellen einer Tabelle
<a name="how-python-programming-readwrite-createtable"></a>

Das folgende Codebeispiel demonstriert eine benutzerdefinierte Funktion, die eine SQL-Abfrage erstellt. Die SQL-Abfrage erstellt eine Tabelle, die mit einem Kinesis-Stream interagiert:

```
def create_table(table_name, stream_name, region, stream_initpos):
   return """ CREATE TABLE {0} (
                `record_id` VARCHAR(64) NOT NULL,
                `event_time` BIGINT NOT NULL,
                `record_number` BIGINT NOT NULL,
                `num_retries` BIGINT NOT NULL,
                `verified` BOOLEAN NOT NULL
              )
              PARTITIONED BY (record_id)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.producer.collection-max-count' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(table_name, stream_name, region, stream_initpos)
```

### Streaming-Daten lesen
<a name="how-python-programming-readwrite-read"></a>

Das folgende Codebeispiel zeigt, wie die vorherige `CreateTable`-SQL-Abfrage für eine Tabellenumgebungsreferenz zum Lesen von Daten verwendet wird:

```
   table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))
```

### Schreiben Sie Streaming-Daten
<a name="how-python-programming-readwrite-write"></a>

Das folgende Codebeispiel zeigt, wie Sie die SQL-Abfrage aus dem `CreateTable`-Beispiel verwenden, um eine Ausgabetabellenreferenz zu erstellen, und wie Sie ein `StatementSet` verwenden, um mit den Tabellen zu interagieren und Daten in einen Kinesis-Ziel-Stream zu schreiben:

```
   table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                       .format(output_table_name, input_table_name))
```

## Lesen Sie die Laufzeiteigenschaften
<a name="how-python-programming-properties"></a>

Sie können Laufzeiteigenschaften verwenden, um Ihre Anwendung zu konfigurieren, ohne Ihren Anwendungscode zu ändern.

Sie geben Anwendungseigenschaften für Ihre Anwendung auf die gleiche Weise an wie bei einer Java-Anwendung, die Managed Service für Apache Flink nutzt. Sie können Laufzeiteigenschaften auf folgende Weise angeben:
+ Die [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)Aktion verwenden.
+ Die [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)Aktion verwenden.
+ Konfigurieren Ihrer Anwendung mit Hilfe der Konsole.

Sie rufen Anwendungseigenschaften im Code ab, indem Sie eine JSON-Datei mit dem Namen `application_properties.json` auslesen, die von der Laufzeit von Managed Service for Apache Flink erstellt wird.

Das folgende Codebeispiel zeigt das Lesen von Anwendungseigenschaften aus der Datei `application_properties.json`:

```
file_path = '/etc/flink/application_properties.json'
   if os.path.isfile(file_path):
       with open(file_path, 'r') as file:
           contents = file.read()
           properties = json.loads(contents)
```

Das folgende Beispiel für einen benutzerdefinierten Funktionscode demonstriert das Lesen einer Eigenschaftsgruppe aus dem Anwendungseigenschaftenobjekt: ruft ab:

```
def property_map(properties, property_group_id):
   for prop in props:
       if prop["PropertyGroupId"] == property_group_id:
           return prop["PropertyMap"]
```

Das folgende Codebeispiel zeigt das Lesen einer Eigenschaft namens INPUT\$1STREAM\$1KEY aus einer Eigenschaftsgruppe, die im vorherigen Beispiel zurückgegeben wurde:

```
input_stream = input_property_map[INPUT_STREAM_KEY]
```

## Erstellen Sie das Codepaket Ihrer Anwendung
<a name="how-python-programming-package"></a>

Sobald Sie Ihre Python-Anwendung erstellt haben, bündeln Sie Ihre Codedatei und Abhängigkeiten in einer ZIP-Datei.

Ihre ZIP-Datei muss ein Python-Skript mit einer `main`-Methode enthalten und kann optional Folgendes enthalten:
+ Zusätzliche Python-Codedateien
+ Benutzerdefinierter Java-Code in JAR-Dateien
+ Java-Bibliotheken in JAR-Dateien

**Anmerkung**  
Ihre Anwendungs-ZIP-Datei muss alle Abhängigkeiten für Ihre Anwendung enthalten. Sie können für Ihre Anwendung nicht auf Bibliotheken aus anderen Quellen verweisen.

# Erstellen Sie Ihre Python-Anwendung Managed Service für Apache Flink
<a name="how-python-creating"></a>

## Geben Sie Ihre Codedateien an
<a name="how-python-creating-code"></a>

Sobald Sie das Codepaket für Ihre Anwendung erstellt haben, laden Sie es in einen Amazon-S3-Bucket hoch. Anschließend erstellen Sie Ihre Anwendung entweder mit der Konsole oder der [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)Aktion.

Wenn Sie Ihre Anwendung mithilfe der [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)Aktion erstellen, geben Sie die Codedateien und Archive in Ihrer ZIP-Datei mithilfe einer speziellen Anwendungseigenschaftengruppe namens an`kinesis.analytics.flink.run.options`. Sie können die folgenden Dateitypen definieren:
+ **python**: Eine Textdatei, die eine Python-main-Methode enthält.
+ **jarfile**: Eine Java-JAR-Datei, die benutzerdefinierte Java-Funktionen enthält.
+ **pyFiles**: Eine Python-Ressourcendatei, die Ressourcen enthält, die von der Anwendung verwendet werden sollen.
+ **pyArchives**: Eine ZIP-Datei mit Ressourcendateien für die Anwendung.

Weitere Informationen zu Apache Flink-Python-Codedateitypen finden Sie unter [Command-Line Interface in der Apache Flink-Dokumentation](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/cli/).

**Anmerkung**  
Managed Service für Apache Flink unterstützt nicht die Dateitypen `pyModule`, `pyExecutable` oder `pyRequirements`. Der gesamte Code, die Anforderungen und Abhängigkeiten müssen sich in Ihrer ZIP-Datei befinden. Sie können keine Abhängigkeiten angeben, die mit pip installiert werden sollen. 

Der folgende JSON-Beispielausschnitt zeigt, wie Sie Dateispeicherorte in der ZIP-Datei Ihrer Anwendung angeben:

```
"ApplicationConfiguration": {
    "EnvironmentProperties": {
      "PropertyGroups": [
        {
          "PropertyGroupId": "kinesis.analytics.flink.run.options",
          "PropertyMap": {
            "python": "MyApplication/main.py",
            "jarfile": "MyApplication/lib/myJarFile.jar",
            "pyFiles": "MyApplication/lib/myDependentFile.py",
            "pyArchives": "MyApplication/lib/myArchive.zip"
          }
        },
```

# Überwachen Sie Ihre Python-Anwendung Managed Service für Apache Flink
<a name="how-python-monitoring"></a>

Sie verwenden das CloudWatch Protokoll Ihrer Anwendung, um Ihre Python-Anwendung Managed Service for Apache Flink zu überwachen.

Managed Service für Apache Flink protokolliert die folgenden Meldungen für Python-Anwendungen:
+ Nachrichten, die mithilfe von `print()` in der `main`-Methode der Anwendung in die Konsole geschrieben wurden.
+ Nachrichten, die mithilfe des `logging`-Pakets in benutzerdefinierten Funktionen gesendet werden. Das folgende Codebeispiel zeigt, wie eine benutzerdefinierte Funktion in das Anwendungsprotokoll schreibt:

  ```
  import logging
  
  @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
  def doNothingUdf(i):
      logging.info("Got {} in the doNothingUdf".format(str(i)))
      return i
  ```
+ Von der Anwendung ausgelöste Fehlermeldungen.

  Wenn die Anwendung in der `main`-Funktion eine Ausnahme auslöst, wird diese in den Protokollen Ihrer Anwendung angezeigt.

  Das folgende Beispiel zeigt einen Protokolleintrag für eine Ausnahme, die aus dem Python-Code ausgelöst wurde:

  ```
  2021-03-15 16:21:20.000   --------------------------- Python Process Started --------------------------
  2021-03-15 16:21:21.000   Traceback (most recent call last):
  2021-03-15 16:21:21.000   "  File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 101, in <module>"
  2021-03-15 16:21:21.000       main()
  2021-03-15 16:21:21.000   "  File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 54, in main"
  2021-03-15 16:21:21.000   "    table_env.register_function(""doNothingUdf"", doNothingUdf)"
  2021-03-15 16:21:21.000   NameError: name 'doNothingUdf' is not defined
  2021-03-15 16:21:21.000   --------------------------- Python Process Exited ---------------------------
  2021-03-15 16:21:21.000   Run python process failed
  2021-03-15 16:21:21.000   Error occurred when trying to start the job
  ```

**Anmerkung**  
Aufgrund von Leistungsproblemen empfehlen wir, während der Anwendungsentwicklung nur benutzerdefinierte Protokollnachrichten zu verwenden. 

## Logs mit CloudWatch Insights abfragen
<a name="how-python-monitoring-insights"></a>

Die folgende CloudWatch Insights-Abfrage sucht nach Protokollen, die vom Python-Einstiegspunkt erstellt wurden, während die Hauptfunktion Ihrer Anwendung ausgeführt wird:

```
fields @timestamp, message
| sort @timestamp asc
| filter logger like /PythonDriver/
| limit 1000
```

# Verwenden Sie Laufzeiteigenschaften in Managed Service für Apache Flink
<a name="how-properties"></a>

Sie können *Laufzeiteigenschaften* verwenden, um Ihre Anwendung zu konfigurieren, ohne Ihren Anwendungscode neu zu kompilieren. 

**Topics**
+ [Verwalten Sie Runtime-Eigenschaften mithilfe der Konsole](#how-properties-console)
+ [Laufzeiteigenschaften mit der CLI verwalten](#how-properties-cli)
+ [Greifen Sie auf Laufzeiteigenschaften in einer Managed Service for Apache Flink-Anwendung zu](#how-properties-access)

## Verwalten Sie Runtime-Eigenschaften mithilfe der Konsole
<a name="how-properties-console"></a>

Sie können Runtime-Eigenschaften zu Ihrer Managed Service for Apache Flink-Anwendung hinzufügen, aktualisieren oder entfernen, indem Sie AWS-Managementkonsole

**Anmerkung**  
Wenn Sie eine frühere unterstützte Version von Apache Flink verwenden und Ihre vorhandenen Anwendungen auf Apache Flink 1.19.1 aktualisieren möchten, können Sie dazu direkte Apache Flink-Versionsupgrades verwenden. Mit direkten Versionsupgrades behalten Sie die Rückverfolgbarkeit von Anwendungen anhand eines einzigen ARN für alle Apache Flink-Versionen, einschließlich Snapshots, Logs, Metriken, Tags, Flink-Konfigurationen und mehr. Sie können diese Funktion in jedem beliebigen Bundesstaat verwenden. `RUNNING` `READY` Weitere Informationen finden Sie unter [Verwenden Sie direkte Versionsupgrades für Apache Flink](how-in-place-version-upgrades.md).

**Aktualisieren von Laufzeiteigenschaften für eine Anwendung, die Managed Service für Apache Flink nutzt**

1. Melden Sie sich bei der AWS-Managementkonsole an und öffnen Sie die Amazon MSF-Konsole unter https://console.aws.amazon.com /flink.

1. Wählen Sie Ihre Anwendung, die Managed Service für Apache Flink nutzt. Wählen Sie **Anwendungsdetails** aus.

1. Wählen Sie auf der Seite für Ihre Anwendung **Konfigurieren** aus.

1. Erweitern Sie den Bereich **Eigenschaften**.

1. Verwenden Sie die Steuerelemente im Abschnitt **Eigenschaften**, um eine Eigenschaftsgruppe mit Schlüssel-Wert-Paaren zu definieren. Verwenden Sie diese Steuerelemente, um Eigenschaftsgruppen und Laufzeiteigenschaften hinzuzufügen, zu aktualisieren oder zu entfernen.

1. Wählen Sie **Aktualisieren** aus.

## Laufzeiteigenschaften mit der CLI verwalten
<a name="how-properties-cli"></a>

Sie können Laufzeiteigenschaften mit der [AWS CLI](https://docs.aws.amazon.com/cli) hinzufügen, aktualisieren oder entfernen. 

Dieser Abschnitt enthält Beispielanfragen für API-Aktionen zur Konfiguration von Laufzeiteigenschaften für eine Anwendung. Weitere Informationen zur Verwendung einer JSON-Datei für die Eingabe einer API-Aktion finden Sie unter [Beispielcode für Managed Service für Apache Flink API](api-examples.md).

**Anmerkung**  
Ersetzen Sie die beispielhafte Konto-ID (*`012345678901`*) im folgenden Beispiel durch Ihre tatsächliche Konto-ID.

### Fügen Sie Runtime-Eigenschaften hinzu, wenn Sie eine Anwendung erstellen
<a name="how-properties-create"></a>

Die folgende Beispielanfrage für die [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html)-Aktion fügt zwei Laufzeiteigenschaftsgruppen (`ProducerConfigProperties` und `ConsumerConfigProperties`) hinzu, wenn Sie eine Anwendung erstellen:

```
{
    "ApplicationName": "MyApplication",
    "ApplicationDescription": "my java test app",
    "RuntimeEnvironment": "FLINK-1_19",
    "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role",
    "ApplicationConfiguration": {
        "ApplicationCodeConfiguration": {
            "CodeContent": {
                "S3ContentLocation": {
                    "BucketARN": "arn:aws:s3:::ka-app-code-username",
                    "FileKey": "java-getting-started-1.0.jar"
                }
            },
            "CodeContentType": "ZIPFILE"
        },
        "EnvironmentProperties":  { 
         "PropertyGroups": [ 
            { 
               "PropertyGroupId": "ProducerConfigProperties",
               "PropertyMap" : {
                    "flink.stream.initpos" : "LATEST",
                    "aws.region" : "us-west-2",
                    "AggregationEnabled" : "false"
               }
            },
            { 
               "PropertyGroupId": "ConsumerConfigProperties",
               "PropertyMap" : {
                    "aws.region" : "us-west-2"
               }
            }
         ]
      }
    }
}
```

### Fügen Sie Runtime-Eigenschaften in einer vorhandenen Anwendung hinzu und aktualisieren Sie sie
<a name="how-properties-update"></a>

Mit der folgenden Beispielanfrage für die [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html)-Aktion werden Laufzeiteigenschaften für eine bestehende Anwendung hinzugefügt oder aktualisiert:

```
{
  "ApplicationName": "MyApplication",
  "CurrentApplicationVersionId": 2,
  "ApplicationConfigurationUpdate": {
    "EnvironmentPropertyUpdates": {
      "PropertyGroups": [ 
        { 
          "PropertyGroupId": "ProducerConfigProperties",
          "PropertyMap" : {
            "flink.stream.initpos" : "LATEST",
            "aws.region" : "us-west-2",
            "AggregationEnabled" : "false"
          }
        },
        { 
          "PropertyGroupId": "ConsumerConfigProperties",
          "PropertyMap" : {
            "aws.region" : "us-west-2"
          }
        }
      ]
    }
  }
}
```

**Anmerkung**  
Wenn Sie in einer Eigenschaftsgruppe einen Schlüssel verwenden, für den keine entsprechende Laufzeiteigenschaft vorhanden ist, fügt Managed Service für Apache Flink das Schlüssel-Wert-Paar als neue Eigenschaft hinzu. Wenn Sie einen Schlüssel für eine vorhandene Laufzeiteigenschaft in einer Eigenschaftsgruppe verwenden, aktualisiert Managed Service für Apache Flink den Eigenschaftswert. 

### Laufzeiteigenschaften entfernen
<a name="how-properties-remove"></a>

Mit der folgenden Beispielanfrage für die [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html)-Aktion werden alle Laufzeiteigenschaften und Eigenschaftsgruppen aus einer bestehenden Anwendung entfernt:

```
{
  "ApplicationName": "MyApplication",
  "CurrentApplicationVersionId": 3,
  "ApplicationConfigurationUpdate": {
    "EnvironmentPropertyUpdates": {
      "PropertyGroups": []
    }
  }
}
```

**Wichtig**  
Wenn Sie eine vorhandene Eigenschaftsgruppe oder einen vorhandenen Eigenschaftsschlüssel in einer Eigenschaftsgruppe weglassen, wird diese Eigenschaftsgruppe oder Eigenschaft entfernt.

## Greifen Sie auf Laufzeiteigenschaften in einer Managed Service for Apache Flink-Anwendung zu
<a name="how-properties-access"></a>

Sie rufen Laufzeiteigenschaften in Ihrem Java-Anwendungscode mithilfe der statischen `KinesisAnalyticsRuntime.getApplicationProperties()`-Methode ab, die ein `Map<String, Properties>`-Objekt zurückgibt.

Im folgenden Java-Codebeispiel werden Laufzeiteigenschaften für Ihre Anwendung abgerufen:

```
 Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
```

Sie rufen eine Eigenschaftsgruppe (als `Java.Util.Properties`-Objekt) wie folgt ab:

```
Properties consumerProperties = applicationProperties.get("ConsumerConfigProperties");
```

Normalerweise konfigurieren Sie eine Apache-Flink-Quelle oder -Senke, indem Sie das `Properties`-Objekt übergeben, ohne die einzelnen Eigenschaften abrufen zu müssen. Das folgende Codebeispiel zeigt, wie Sie eine Flink-Quelle erstellen, indem Sie ein `Properties`-Objekt übergeben, das aus Laufzeiteigenschaften abgerufen wurde:

```
private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
  Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
  FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<String>(new SimpleStringSchema(),
    applicationProperties.get("ProducerConfigProperties"));

  sink.setDefaultStream(outputStreamName);
  sink.setDefaultPartition("0");
  return sink;
}
```

Codebeispiele finden Sie unter [Beispiele für die Erstellung und Arbeit mit Managed Service für Apache Flink-Anwendungen](examples-collapsibles.md).

# Verwenden Sie Apache Flink-Konnektoren mit Managed Service für Apache Flink
<a name="how-flink-connectors"></a>

Apache Flink-Konnektoren sind Softwarekomponenten, die Daten in und aus einer Amazon Managed Service für Apache Flink-Anwendung übertragen. Konnektoren sind flexible Integrationen, mit denen Sie aus Dateien und Verzeichnissen lesen können. Konnektoren bestehen aus kompletten Modulen für die Interaktion mit Amazon-Services und Systemen von Drittanbietern.

Zu den Konnektoren gehören die folgenden:
+ **Quellen:** Stellen Sie Ihrer Anwendung Daten aus einem Kinesis-Datenstream, einer Datei, einem Apache Kafka-Thema, einer Datei oder anderen Datenquellen bereit.
+ **Senken:** Senden Sie Daten aus Ihrer Anwendung an einen Kinesis-Datenstream, Firehose-Stream, Apache Kafka-Thema oder andere Datenziele.
+ **Asynchrone I/O: Ermöglicht** asynchronen Zugriff auf eine Datenquelle, z. B. eine Datenbank, um Streams anzureichern. 

Apache Flink-Konnektoren werden in ihren eigenen Quell-Repositorys gespeichert. Die Version und das Artefakt für Apache Flink-Konnektoren ändern sich je nach der von Ihnen verwendeten Apache Flink-Version und davon, ob Sie die Table- oder die DataStream SQL-API verwenden. 

Amazon Managed Service für Apache Flink unterstützt über 40 vorgefertigte Apache Flink-Quell- und Senken-Konnektoren. Die folgende Tabelle enthält eine Zusammenfassung der beliebtesten Konnektoren und der zugehörigen Versionen. Sie können auch benutzerdefinierte Senken mithilfe des Async-Sink-Frameworks erstellen. Weitere Informationen finden Sie unter [The Generic Asynchronous Base Sink](https://flink.apache.org/2022/03/16/the-generic-asynchronous-base-sink/) in der Apache Flink-Dokumentation.

 Informationen zum Zugriff auf das Repository für Apache AWS Flink-Konnektoren finden Sie unter. [flink-connector-aws](https://github.com/apache/flink-connector-aws)

## Anschlüsse für Flink 2.2
<a name="connectors-flink-2-2"></a>

Wenn Sie auf Flink 2.2 aktualisieren, müssen Sie Ihre Connector-Abhängigkeiten auf Versionen aktualisieren, die mit der Flink 2.x-Laufzeit kompatibel sind. Flink-Konnektoren werden unabhängig von der Flink-Runtime veröffentlicht, und es gibt noch nicht für alle Konnektoren eine Flink 2.x-kompatible Version. Die folgende Tabelle fasst die Verfügbarkeit häufig verwendeter Konnektoren in Amazon Managed Service für Apache Flink zum jetzigen Zeitpunkt zusammen:


**Anschlüsse für Flink 2.2**  

| Konnektor | Flink 2.0\$1 Version | Hinweise | 
| --- | --- | --- | 
| Apache Kafka | flink-connector-kafka 4.0.0-2.0 | Empfohlen für Flink 2.2 | 
| Kinesis Data Streams (Quelle) | flink-connector-aws-kinesis-Streams 6.0.0-2.0 | Empfohlen für Flink 2.2 | 
| Kinesis Data Streams (Senke) | flink-connector-aws-kinesis-Streams 6.0.0-2.0 | Empfohlen für Flink 2.2 | 
| FileSystem (S3, HDFS) | Im Paket mit Flink | In die Flink-Distribution integriert — immer verfügbar | 
| JDBC | Noch nicht für 2.x veröffentlicht | Keine Flink 2.x-kompatible Version verfügbar | 
| OpenSearch | Noch nicht für 2.x veröffentlicht | Keine Flink 2.x-kompatible Version verfügbar | 
| Elasticsearch | Noch nicht für 2.x veröffentlicht | Erwägen Sie eine Migration zum Connector OpenSearch  | 
| Amazon Managed Service für Prometheus | Noch nicht für 2.x veröffentlicht | Zum Zeitpunkt der Erstellung dieses Artikels gab es keine mit Flink 2.x kompatible Version | 

Wenn Ihre Anwendung von einem Connector abhängt, für den es noch keine Flink 2.2-Version gibt, haben Sie zwei Möglichkeiten: Warten Sie, bis der Connector eine kompatible Version veröffentlicht, oder prüfen Sie, ob Sie ihn durch eine Alternative ersetzen können (z. B. mithilfe des JDBC-Katalogs oder einer benutzerdefinierten Senke).

**Bekannte Probleme**
+ Anwendungen, die den in Connector v5.0.0 und v6.0.0 eingeführten Pfad `KinesisStreamsSource` with EFO (Enhanced Fan-Out/ SubscribeToShard) verwenden, können fehlschlagen, wenn Kinesis-Streams reshardiert werden. Dies ist ein bekanntes Problem in der Community. Weitere Informationen finden Sie unter [FLINK-37648](https://issues.apache.org/jira/browse/FLINK-37648).
+ Bei Anwendungen, die den in den Konnektoren v5.0.0 und v6.0.0 eingeführten Pfad `KinesisStreamsSource` with EFO (Enhanced Fan-Out/ SubscribeToShard) zusammen verwenden, `KinesisStreamsSink` kann es zu Deadlocks kommen, wenn die Flink-Anwendung unter Gegendruck steht, was zu einem vollständigen Stopp der Datenverarbeitung in einer oder mehreren führt. TaskManagers Ein erzwungener Stop-Vorgang und ein Startvorgang für die App sind erforderlich, um die App wiederherzustellen. Dies ist ein Unterfall des in der Community bekannten Problems: [FLINK-34071](https://issues.apache.org/jira/browse/FLINK-34071).

## Anschlüsse für ältere Flink-Versionen
<a name="connectors-older-versions"></a>


**Anschlüsse für ältere Flink-Versionen**  

| Konnektor | Flink Version 1.15 | Flink versie 1.18 | Flink-Versionen 1.19 | Flink-Versionen 1.20 | 
| --- | --- | --- | --- | --- | 
| Kinesis Data Stream — Quell DataStream - und Tabellen-API | flink-connector-kinesis, 1.15.4 | flink-connector-kinesis, 4,3,0-1,18 | flink-connector-kinesis, 5,0,0-1,19 | flink-connector-kinesis, 5,0,0-1,20 | 
| Kinesis Data Stream — Sink DataStream - und Tabellen-API | flink-connector-aws-kinesis-Streams, 1.15.4 | flink-connector-aws-kinesis-Streams, 4.3.0-1.18 | flink-connector-aws-kinesis-Streams, 5.0.0-1.19 | flink-connector-aws-kinesis-Streams, 5.0.0-1.20 | 
| Kinesis Data Streams - Source/Sink - SQL | flink-sql-connector-kinesis, 1.15.4 | flink-sql-connector-kinesis, 4,3,0-1,18 | flink-sql-connector-kinesis, 5,0,0-1,19 | flink-sql-connector-kinesis-Streams, 5.0.0-1.20 | 
| Kafka — und Tabellen-API DataStream  | flink-connector-kafka, 1.15.4 | flink-connector-kafka, 3,2,0-1,18 | flink-connector-kafka, 3,3,0-1,19 | flink-connector-kafka, 3,3,0-1,20 | 
| Kafka - SQL | flink-sql-connector-kafka, 1.15.4 | flink-sql-connector-kafka, 3,2,0-1,18 | flink-sql-connector-kafka, 3,3,0-1,19 | flink-sql-connector-kafka, 3,3,0-1,20 | 
| Firehose DataStream - und Tabellen-API | flink-connector-aws-kinesis-Firehose, 1.15.4 | flink-connector-aws-firehose, 4.3.0-1.18 | flink-connector-aws-firehose, 5,0,0-1,19 | flink-connector-aws-firehose, 5,0,0-1,20 | 
| Firehose - SQL | flink-sql-connector-aws-Kinesis-Firehose, 1.15.4 | flink-sql-connector-aws-Feuerwehrschlauch, 4.3.0-1.18 | flink-sql-connector-aws-Feuerwehrschlauch, 5.0.0-1.19 | flink-sql-connector-aws-Feuerwehrschlauch, 5.0.0-1.20 | 
| DynamoDB - DataStream und Tabellen-API | flink-connector-dynamodb, 3.0.0-1.15 | flink-connector-dynamodb, 4,3,0-1,18 | flink-connector-dynamodb, 5,0,0-1,19 | flink-connector-dynamodb, 5,0,0-1,20 | 
| DynamoDB - SQL | flink-sql-connector-dynamodb, 3.0.0-1.15 | flink-sql-connector-dynamodb, 4,3,0-1,18 | flink-sql-connector-dynamodb, 5,0,0-1,19 | flink-sql-connector-dynamodb, 5,0,0-1,20 | 
| OpenSearch - und Tabellen-API DataStream  | - | flink-connector-opensearch, 1.2.0-1.18 | flink-connector-opensearch, 1,2.0-1,19 | flink-connector-opensearch, 1,2.0-1,19 | 
| OpenSearch - SQL | - | flink-sql-connector-opensearch, 1.2.0-1.18 | flink-sql-connector-opensearch, 1,2.0-1,19 | flink-sql-connector-opensearch, 1,2.0-1,19 | 
| Amazon Managed Service für Prometheus DataStream | - | flink-sql-connector-opensearch, 1.2.0-1.18 | flink-connector-prometheus, 1.0.0-1,19 | flink-connector-prometheus, 1.0.0-1,20 | 
| Amazon SQS DataStream und Tabellen-API | - | flink-sql-connector-opensearch, 1.2.0-1.18 | flink-connector-sqs, 5,0,0-1,19 | flink-connector-sqs, 5,0,0-1,20 | 

Weitere Informationen zu Konnektoren in Amazon Managed Service für Apache Flink finden Sie unter:
+ [DataStream API-Konnektoren](https://docs.aws.amazon.com/managed-flink/latest/java/how-connectors.html)
+ [Tabellen-API-Konnektoren](https://docs.aws.amazon.com/managed-flink/latest/java/how-table-connectors.html)

### Bekannte Probleme
<a name="connectors-known-issues"></a>

Es gibt ein bekanntes Open-Source-Apache Flink-Problem mit dem Apache Kafka-Konnektor in Apache Flink 1.15. Dieses Problem wurde in späteren Versionen von Apache Flink behoben. 

Weitere Informationen finden Sie unter [Bekannte Probleme](flink-1-15-2.md#flink-1-15-known-issues). 

# Implementieren Sie Fehlertoleranz in Managed Service für Apache Flink
<a name="how-fault"></a>

Prüfpunktprüfung ist die Methode, die zur Implementierung von Fehlertoleranz in Amazon Managed Service für Apache Flink verwendet wird. Ein *Checkpoint* ist ein up-to-date Backup einer laufenden Anwendung, das zur sofortigen Wiederherstellung nach einer unerwarteten Anwendungsunterbrechung oder einem Failover verwendet wird. 

Einzelheiten zum Checkpointing in Apache Flink-Anwendungen finden Sie unter [Checkpoints](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/checkpoints/) in der Apache Flink-Dokumentation.

Ein *Snapshot* ist ein manuell erstelltes und verwaltetes Backup des Anwendungsstatus. Mit Snapshots können Sie Ihre Anwendung durch einen Aufruf von [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html)in einen früheren Zustand zurückversetzen. Weitere Informationen finden Sie unter [Anwendungs-Backups mithilfe von Snapshots verwalten](how-snapshots.md).

Wenn Prüfpunktprüfung für Ihre Anwendung aktiviert ist, bietet der Dienst Fehlertoleranz, indem er bei unerwarteten Anwendungsneustarts Backups der Anwendungsdaten erstellt und lädt. Diese unerwarteten Anwendungsneustarts können durch unerwartete Jobneustarts, Instancefehler usw. verursacht werden. Dadurch erhält die Anwendung dieselbe Semantik wie eine fehlerfreie Ausführung bei diesen Neustarts. 

Wenn Snapshots für die Anwendung aktiviert und mithilfe der Snapshots der Anwendung konfiguriert sind, bietet der Dienst bei [ApplicationRestoreConfiguration](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html)Anwendungsupdates oder während der dienstbezogenen Skalierung oder Wartung die Semantik für die Verarbeitung genau einmal.

## Konfigurieren Sie Checkpointing in Managed Service für Apache Flink
<a name="how-fault-configure"></a>

Sie können das Prüfpunktprüfungs-Verhalten Ihrer Anwendung konfigurieren. Sie können festlegen, ob sie den Prüfpunktprüfungs-Status beibehält, wie oft sie ihren Status an Prüfpunkten speichert und das Mindestintervall zwischen dem Ende einer Prüfpunkt-Operation und dem Beginn einer anderen.

Sie konfigurieren die folgenden Einstellungen mithilfe der [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html)- oder [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html)-API-Operationen:
+ `CheckpointingEnabled` – Gibt an, ob Prüfpunktprüfung in der Anwendung aktiviert ist.
+ `CheckpointInterval` – Enthält die Zeit in Millisekunden zwischen Prüfpunkt-Vorgängen (Persistenzoperationen).
+ `ConfigurationType` – Setzen Sie diesen Wert auf `DEFAULT`, um das standardmäßige Prüfpunktprüf-Verhalten zu verwenden. Setzen Sie diesen Wert auf `CUSTOM`, um andere Werte zu konfigurieren.
**Anmerkung**  
Das Standardverhalten von Prüfpunkten ist wie folgt:  
**CheckpointingEnabled: wahr**
**CheckpointInterval:** 60000
**MinPauseBetweenCheckpoints: 500**
Wenn auf gesetzt **ConfigurationType**ist`DEFAULT`, werden die vorherigen Werte verwendet, auch wenn sie entweder mithilfe von oder durch Setzen der AWS Command Line Interface Werte im Anwendungscode auf andere Werte gesetzt wurden.
**Anmerkung**  
Ab Flink 1.15 verwendet Managed Service für Apache Flink `stop-with-savepoint` während der automatischen Snapshot-Erstellung, d. h. beim Aktualisieren, Skalieren oder Stoppen von Anwendungen. 
+ `MinPauseBetweenCheckpoints` – Die Mindestzeit in Millisekunden zwischen dem Ende einer Prüfpunkt-Operation und dem Beginn einer anderen. Wenn dieser Wert festgelegt ist, verhindert dies, dass die Anwendung fortlaufende Prüfpunktprüfung durchführt, wenn eine Prüfpunkt-Operation länger dauert als das `CheckpointInterval`.

## Sehen Sie sich die Beispiele für Checkpoint-APIs an
<a name="how-fault-examples"></a>

Dieser Abschnitt enthält Beispielanfragen für API-Aktionen zur Konfiguration von Prüfpunktprüfung für eine Anwendung. Weitere Informationen zur Verwendung einer JSON-Datei als Eingabe für API-Aktionen finden Sie unter [Beispielcode für Managed Service für Apache Flink API](api-examples.md).

### Konfigurieren Sie Checkpointing für eine neue Anwendung
<a name="how-fault-examples-create-config"></a>

In der folgenden Beispielanforderung für die [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html)-Aktion wird Prüfpunktprüfung konfiguriert, wenn Sie eine Anwendung erstellen:

```
{
   "ApplicationName": "MyApplication",
   "RuntimeEnvironment":"FLINK-1_19",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
        "S3ContentLocation":{
          "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
          "FileKey":"myflink.jar",
          "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
        }
      },
      "FlinkApplicationConfiguration": { 
         "CheckpointConfiguration": { 
            "CheckpointingEnabled": "true",
            "CheckpointInterval": 20000,
            "ConfigurationType": "CUSTOM",
            "MinPauseBetweenCheckpoints": 10000
         }
      }
}
```

### Deaktivieren Sie Checkpointing für eine neue Anwendung
<a name="how-fault-examples-create-disable"></a>

Die folgende Beispielanforderung für die [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html)-Aktion deaktiviert Prüfpunktprüfung, wenn Sie eine Anwendung erstellen:

```
{
   "ApplicationName": "MyApplication",
   "RuntimeEnvironment":"FLINK-1_19",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
        "S3ContentLocation":{
          "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
          "FileKey":"myflink.jar",
          "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
        }
      },
      "FlinkApplicationConfiguration": { 
         "CheckpointConfiguration": { 
            "CheckpointingEnabled": "false"
         }
      }
}
```

### Konfigurieren Sie Checkpointing für eine bestehende Anwendung
<a name="how-fault-examples-update-config"></a>

Die folgende Beispielanforderung für die [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html)-Aktion konfiguriert Prüfpunktprüfung für eine bestehende Anwendung:

```
{
   "ApplicationName": "MyApplication",
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "CheckpointConfigurationUpdate": { 
            "CheckpointingEnabledUpdate": true,
            "CheckpointIntervalUpdate": 20000,
            "ConfigurationTypeUpdate": "CUSTOM",
            "MinPauseBetweenCheckpointsUpdate": 10000
         }
      }
   }
}
```

### Deaktivieren Sie Checkpointing für eine bestehende Anwendung
<a name="how-fault-examples-update-update-disable"></a>

Die folgende Beispielanforderung für die [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html)-Aktion deaktiviert Prüfpunktprüfung für eine bestehende Anwendung:

```
{
   "ApplicationName": "MyApplication",
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "CheckpointConfigurationUpdate": { 
            "CheckpointingEnabledUpdate": false,
            "CheckpointIntervalUpdate": 20000,
            "ConfigurationTypeUpdate": "CUSTOM",
            "MinPauseBetweenCheckpointsUpdate": 10000
         }
      }
   }
}
```

# Anwendungs-Backups mithilfe von Snapshots verwalten
<a name="how-snapshots"></a>

Ein *Snapshot* ist die Managed Service für Apache Flink-Implementierung eines Apache Flink *Savepoint*. Ein Snapshot ist ein vom Benutzer oder Service ausgelöstes, erstelltes und verwaltetes Backup des Anwendungsstatus. Informationen zu Apache Flink Savepoints finden Sie unter Savepoints in der Apache [Flink-Dokumentation](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/savepoints/). Mithilfe von Snapshots können Sie eine Anwendung von einem bestimmten Snapshot des Anwendungsstatus aus neu starten.

**Anmerkung**  
Wir empfehlen, dass Ihre Anwendung mehrmals täglich einen Snapshot erstellt, um einen ordnungsgemäßen Neustart mit den korrekten Statusdaten zu gewährleisten. Die richtige Häufigkeit für Ihre Snapshots hängt von der Geschäftslogik Ihrer Anwendung ab. Durch häufiges Erstellen von Snapshots können Sie neuere Daten wiederherstellen, was jedoch die Kosten erhöht und mehr Systemressourcen beansprucht.

In Managed Service für Apache Flink verwalten Sie Snapshots mit den folgenden API-Aktionen:
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html)

Informationen zur Beschränkung der Anzahl von Snapshots pro Anwendung finden Sie unter [Managed Service für Apache Flink und Studio Notebook-Kontingent](limits.md). Wenn Ihre Anwendung das Limit für Snapshots erreicht, schlägt das manuelle Erstellen eines Snapshots mit einer `LimitExceededException` fehl. 

Managed Service für Apache Flink löscht Snapshots niemals. Sie müssen die Snapshots mit der Aktion [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html) manuell löschen.

Um beim Starten einer Anwendung einen gespeicherten Snapshot des Anwendungsstatus zu laden, verwenden Sie den [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html)-Parameter der [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html)- oder [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html)-Aktion.

**Topics**
+ [Verwalten Sie die automatische Erstellung von Snapshots](#how-fault-snapshot-update)
+ [Wiederherstellung aus einem Snapshot, der inkompatible Statusdaten enthält](#how-fault-snapshot-restore)
+ [Sehen Sie sich die Snapshot-API-Beispiele](#how-fault-snapshot-examples)

## Verwalten Sie die automatische Erstellung von Snapshots
<a name="how-fault-snapshot-update"></a>

Wenn `true` in der [ ApplicationSnapshotConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationSnapshotConfiguration.html)für die Anwendung auf gesetzt `SnapshotsEnabled` ist, erstellt und verwendet Managed Service for Apache Flink automatisch Snapshots, wenn die Anwendung aktualisiert, skaliert oder gestoppt wird, um eine Semantik für die Verarbeitung exakt einmal bereitzustellen.

**Anmerkung**  
Die Einstellung von `ApplicationSnapshotConfiguration::SnapshotsEnabled` auf `false` führt bei Anwendungsupdates zu Datenverlust.

**Anmerkung**  
Managed Service für Apache Flink löst Zwischen-Savepoints während der Snapshoterstellung aus. Bei Flink Version 1.15 oder höher haben Zwischen-Savepoints keine Nebenwirkungen mehr. [Siehe Savepoints auslösen.](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints)

Automatisch erstellte Snapshots haben die folgenden Eigenschaften:
+ Der Snapshot wird vom Service verwaltet, aber Sie können den Snapshot mithilfe der Aktion sehen. [ ListApplicationSnapshots](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html) Automatisch erstellte Snapshots werden auf Ihr Snapshot-Limit angerechnet.
+ Wenn Ihre Anwendung das Snapshot-Limit überschreitet, schlagen manuell erstellte Snapshots fehl, aber der Managed Service für Apache Flink-Service erstellt weiterhin erfolgreich Snapshots, wenn die Anwendung aktualisiert, skaliert oder gestoppt wird. Sie müssen Snapshots mithilfe der [ DeleteApplicationSnapshot](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html)Aktion manuell löschen, bevor Sie weitere Snapshots manuell erstellen können.

## Wiederherstellung aus einem Snapshot, der inkompatible Statusdaten enthält
<a name="how-fault-snapshot-restore"></a>

Da Snapshots Informationen über Operatoren enthalten, kann das Wiederherstellen von Zustandsdaten aus einem Snapshot für einen Operator, der sich seit der vorherigen Anwendungsversion geändert hat, zu unerwarteten Ergebnissen führen. Eine Anwendung schlägt fehl, wenn sie versucht, Zustandsdaten aus einem Snapshot wiederherzustellen, der nicht dem aktuellen Operator entspricht. Die fehlerhafte Anwendung bleibt entweder im `UPDATING`- oder `STOPPING`-Status hängen. 

Um einer Anwendung die Wiederherstellung aus einem Snapshot zu ermöglichen, der inkompatible Statusdaten enthält, setzen Sie `true` mithilfe der [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)Aktion den `AllowNonRestoredState` Parameter [FlinkRunConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_FlinkRunConfiguration.html)auf.

Wenn eine Anwendung aus einem veralteten Snapshot wiederhergestellt wird, tritt das folgende Verhalten auf:
+ **Operator hinzugefügt:** Wenn ein neuer Operator hinzugefügt wird, hat der Savepoint keine Statusdaten für den neuen Operator. Es tritt kein Fehler auf und es ist nicht nötig, `AllowNonRestoredState` festzulegen.
+ **Operator gelöscht:** Wenn ein vorhandener Operator gelöscht wird, enthält der Savepoint Statusdaten für den fehlenden Operator. Es tritt ein Fehler auf, sofern `AllowNonRestoredState` nicht auf `true` festgelegt ist.
+ **Operator geändert:** Wenn kompatible Änderungen vorgenommen werden, z. B. wenn der Typ eines Parameters in einen kompatiblen Typ geändert wird, kann die Anwendung die Daten aus dem veralteten Snapshot wiederherstellen. Weitere Informationen zur Wiederherstellung aus Snapshots finden Sie unter [Savepoints](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/savepoints/) in der Apache Flink-Dokumentation. Eine Anwendung, die Apache Flink Version 1.8 oder höher verwendet, kann möglicherweise aus einem Snapshot mit einem anderen Schema wiederhergestellt werden. Eine Anwendung, die Apache Flink Version 1.6 verwendet, kann nicht wiederhergestellt werden. Für two-phase-commit Datenspeicher empfehlen wir die Verwendung eines System-Snapshots (SWs) anstelle eines vom Benutzer erstellten Snapshots (). CreateApplicationSnapshot

  Für Flink löst Managed Service für Apache Flink während der Snapshot-Erstellung Zwischen-Savepoints aus. Ab Flink 1.15 haben Zwischen-Savepoints keine Nebenwirkungen mehr. Siehe [Savepoints auslösen](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints).

Wenn Sie eine Anwendung fortsetzen müssen, die mit vorhandenen Savepoint-Daten nicht kompatibel ist, empfehlen wir, die Wiederherstellung aus dem Snapshot zu überspringen, indem Sie den `ApplicationRestoreType` Aktionsparameter auf setzen. [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html)`SKIP_RESTORE_FROM_SNAPSHOT`

Weitere Informationen darüber, wie Apache Flink mit inkompatiblen Statusdaten umgeht, finden Sie unter [State Schema Evolution](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/) in der *Apache Flink-Dokumentation*.

## Sehen Sie sich die Snapshot-API-Beispiele
<a name="how-fault-snapshot-examples"></a>

Dieser Abschnitt enthält Beispielanfragen für API-Aktionen zur Verwendung von Snapshots mit einer Anwendung. Weitere Informationen zur Verwendung einer JSON-Datei als Eingabe für API-Aktionen finden Sie unter [Beispielcode für Managed Service für Apache Flink API](api-examples.md).

### Aktivieren Sie Snapshots für eine Anwendung
<a name="how-fault-savepoint-examples-enable"></a>

In der folgenden Beispiel-Anfrage für die Aktion [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) werden Tags für eine Anwendung aktiviert:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 1,
   "ApplicationConfigurationUpdate": { 
      "ApplicationSnapshotConfigurationUpdate": { 
         "SnapshotsEnabledUpdate": "true"
       }
    }
}
```

### Snapshot erstellen
<a name="how-fault-savepoint-examples-create"></a>

Die folgende Beispielanforderung für die [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplicationSnapshot.html)-Aktion erstellt einen Snapshot des aktuellen Anwendungsstatus:

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot"
}
```

### Listet Snapshots für eine Anwendung auf
<a name="how-fault-snapshot-examples-list"></a>

In der folgenden Beispielanforderung für die [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html)-Aktion werden die ersten 50 Snapshots für den aktuellen Anwendungsstatus aufgeführt:

```
{
   "ApplicationName": "MyApplication",
   "Limit": 50
}
```

### Listet Details für einen Anwendungs-Snapshot auf
<a name="how-fault-snapshot-examples-describe"></a>

In der folgenden Beispiel-Anfrage für die Aktion [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplicationSnapshot.html) werden Details für eines bestimmten Anwendungssnapshot aufgelistet:

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot"
}
```

### Löschen eines Snapshots
<a name="how-fault-snapshot-examples-delete"></a>

Die folgende Beispielanforderung für die [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html)-Aktion löscht einen zuvor gespeicherten Snapshot. Sie können den `SnapshotCreationTimestamp`-Wert entweder mit [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html) oder [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html) abrufen:

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot",
   "SnapshotCreationTimestamp": 12345678901.0,
}
```

### Starten Sie eine Anwendung mithilfe eines benannten Snapshots neu
<a name="how-fault-snapshot-examples-load-custom"></a>

Mit der folgenden Beispielanforderung für die [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html)-Aktion wird die Anwendung mit dem gespeicherten Status eines bestimmten Snapshots gestartet:

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "RESTORE_FROM_CUSTOM_SNAPSHOT",
         "SnapshotName": "MyCustomSnapshot"
      }
   }
}
```

### Starten Sie eine Anwendung mit dem neuesten Snapshot neu
<a name="how-fault-snapshot-examples-load-recent"></a>

Mit der folgenden Beispielanforderung für die [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html)-Aktion wird die Anwendung mit dem neuesten Snapshot gestartet:

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
      }
   }
}
```

### Starten Sie eine Anwendung neu, die keinen Snapshot verwendet
<a name="how-fault-snapshot-examples-load-none"></a>

Mit der folgenden Beispielanforderung für die [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html)-Aktion wird die Anwendung gestartet, ohne den Anwendungsstatus zu laden, auch wenn ein Snapshot vorhanden ist:

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
      }
   }
}
```

# Verwenden Sie direkte Versionsupgrades für Apache Flink
<a name="how-in-place-version-upgrades"></a>

Mit direkten Versionsupgrades für Apache Flink behalten Sie die Rückverfolgbarkeit von Anwendungen anhand eines einzigen ARN für alle Apache Flink-Versionen aufrecht. Dazu gehören Snapshots, Protokolle, Metriken, Tags, Flink-Konfigurationen, Erhöhungen von Ressourcenlimits und mehr. VPCs 

Sie können direkte Versions-Upgrades für Apache Flink durchführen, um bestehende Anwendungen auf eine neue Flink-Version in Amazon Managed Service for Apache Flink zu aktualisieren. Um diese Aufgabe auszuführen, können Sie das AWS CLI AWS CloudFormation, AWS SDK oder das verwenden. AWS-Managementkonsole

**Anmerkung**  
Sie können keine direkten Versionsupgrades für Apache Flink mit Amazon Managed Service für Apache Flink Studio verwenden.

**Topics**
+ [Aktualisieren Sie Anwendungen mithilfe von direkten Versionsupgrades für Apache Flink](upgrading-applications.md)
+ [Aktualisieren Sie Ihre Anwendung auf eine neue Apache Flink-Version](upgrading-application-new-version.md)
+ [Machen Sie Anwendungs-Upgrades rückgängig](rollback.md)
+ [Allgemeine bewährte Methoden und Empfehlungen für Anwendungs-Upgrades](best-practices-recommendations.md)
+ [Vorsichtsmaßnahmen und bekannte Probleme bei Anwendungsupgrades](precautions.md)
+ [Upgrade auf Flink 2.2: Vollständige Anleitung](flink-2-2-upgrade-guide.md)
+ [Leitfaden zur staatlichen Kompatibilität für Flink 2.2-Upgrades](state-compatibility.md)

# Aktualisieren Sie Anwendungen mithilfe von direkten Versionsupgrades für Apache Flink
<a name="upgrading-applications"></a>

Bevor Sie beginnen, empfehlen wir Ihnen, sich dieses Video anzusehen: [Direkte Versionsupgrades](https://www.youtube.com/watch?v=f1qGGdaP2XI).

Um direkte Versionsupgrades für Apache Flink durchzuführen, können Sie das AWS CLI, AWS CloudFormation, AWS SDK oder das verwenden. AWS-Managementkonsole Sie können diese Funktion mit allen vorhandenen Anwendungen verwenden, die Sie mit Managed Service for Apache Flink im Status oder verwenden. `READY` `RUNNING` Es verwendet die UpdateApplication API, um die Möglichkeit hinzuzufügen, die Flink-Laufzeit zu ändern.

## Vor dem Upgrade: Aktualisieren Sie Ihre Apache Flink-Anwendung
<a name="before-upgrading"></a>

Wenn Sie Ihre Flink-Anwendungen schreiben, bündeln Sie sie mit ihren Abhängigkeiten in einer Anwendungs-JAR und laden die JAR in Ihren Amazon S3 S3-Bucket hoch. Von dort aus führt Amazon Managed Service für Apache Flink den Job in der neuen Flink-Laufzeit aus, die Sie ausgewählt haben. Möglicherweise müssen Sie Ihre Anwendungen aktualisieren, um die Kompatibilität mit der Flink-Laufzeit zu erreichen, auf die Sie ein Upgrade durchführen möchten. Es kann Inkonsistenzen zwischen den Flink-Versionen geben, die dazu führen, dass das Versionsupgrade fehlschlägt. Am häufigsten wird dies mit Konnektoren für Quellen (Ingress) oder Destinationen (Sinks, Egress) und Scala-Abhängigkeiten geschehen. Flink 1.15 und spätere Versionen in Managed Service for Apache Flink sind Scala-unabhängig, und Ihr JAR muss die Version von Scala enthalten, die Sie verwenden möchten.

**Um Ihre Anwendung zu aktualisieren**

1. Lesen Sie die Ratschläge der Flink-Community zur Aktualisierung von Anwendungen mit State. Weitere Informationen finden Sie unter [Aktualisieren von Anwendungen und Flink-Versionen](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/).

1. Lesen Sie die Liste der bekannten Probleme und Einschränkungen. Siehe [Vorsichtsmaßnahmen und bekannte Probleme bei Anwendungsupgrades](precautions.md).

1. Aktualisieren Sie Ihre Abhängigkeiten und testen Sie Ihre Anwendungen lokal. Diese Abhängigkeiten sind in der Regel:

   1. Die Flink-Laufzeit und die API.

   1. Für die neue Flink-Laufzeit werden Konnektoren empfohlen. Sie finden diese unter [Release-Versionen](https://docs.aws.amazon.com/managed-flink/latest/java/release-version-list.html) für die spezifische Laufzeit, auf die Sie aktualisieren möchten.

   1. Scala — Apache Flink ist ab und einschließlich Flink 1.15 Scala-agnostisch. Sie müssen die Scala-Abhängigkeiten, die Sie verwenden möchten, in Ihre Anwendungs-JAR aufnehmen.

1. Erstellen Sie eine neue Anwendungs-JAR auf einer Zip-Datei und laden Sie sie auf Amazon S3 hoch. Wir empfehlen, dass Sie einen anderen Namen als die vorherige JAR-/Zip-Datei verwenden. Wenn Sie ein Rollback durchführen müssen, verwenden Sie diese Informationen.

1. Wenn Sie statusbehaftete Anwendungen ausführen, empfehlen wir Ihnen dringend, einen Snapshot Ihrer aktuellen Anwendung zu erstellen. Auf diese Weise können Sie statusabhängig ein Rollback durchführen, falls während oder nach dem Upgrade Probleme auftreten. 

# Aktualisieren Sie Ihre Anwendung auf eine neue Apache Flink-Version
<a name="upgrading-application-new-version"></a>

Sie können Ihre Flink-Anwendung aktualisieren, indem Sie die [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)Aktion verwenden.

Sie können die `UpdateApplication` API auf verschiedene Arten aufrufen:
+ Verwenden Sie den vorhandenen **Konfigurationsworkflow** auf dem AWS-Managementkonsole.
  + Gehen Sie zu Ihrer App-Seite auf der AWS-Managementkonsole.
  + Wählen Sie **Konfigurieren** aus.
  + Wählen Sie die neue Laufzeit und den Snapshot aus, von dem aus Sie beginnen möchten. Dies wird auch als Wiederherstellungskonfiguration bezeichnet. Verwenden Sie die neueste Einstellung als Wiederherstellungskonfiguration, um die App vom neuesten Snapshot aus zu starten. Zeigen Sie auf die neue aktualisierte Anwendung JAR/zip auf Amazon S3.
+ Verwenden Sie die Aktion „ AWS CLI [Anwendung aktualisieren](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html)“.
+ Verwenden Sie CloudFormation (CFN).
  + Aktualisieren Sie das [RuntimeEnvironment](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesisanalyticsv2-application.html#cfn-kinesisanalyticsv2-application-runtimeenvironment)Feld. Zuvor wurde die Anwendung CloudFormation gelöscht und eine neue erstellt, wodurch Ihre Schnappschüsse und andere App-Historien verloren gingen. CloudFormation Aktualisiert jetzt Ihr RuntimeEnvironment vorhandenes Dokument und löscht Ihre Anwendung nicht. 
+ Verwenden Sie das AWS SDK.
  + Die Programmiersprache Ihrer Wahl finden Sie in der SDK-Dokumentation. Siehe [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html). 

Sie können das Upgrade durchführen, während sich die Anwendung im `RUNNING` Status befindet oder während die Anwendung im `READY` Status gestoppt ist. Amazon Managed Service für Apache Flink validiert, um die Kompatibilität zwischen der ursprünglichen Runtime-Version und der Ziel-Runtime-Version zu überprüfen. Diese Kompatibilitätsprüfung wird ausgeführt, wenn Sie diese durchführen, [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)während Sie sich im `RUNNING` Status befinden, oder beim nächsten Mal, [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html)wenn Sie ein Upgrade durchführen, während Sie sich im `READY` Status befinden. 

## Führen Sie ein Upgrade für eine Anwendung im aktuellen `RUNNING` Status durch
<a name="upgrading-running"></a>

Das folgende Beispiel zeigt das Upgrade einer App im `RUNNING` Bundesstaat Flink 1.18 in US East (Nord-Virginia) mithilfe von AWS CLI und das Starten der aktualisierten App aus dem neuesten Snapshot. `UpgradeTest` 

```
            
aws --region us-east-1 kinesisanalyticsv2 update-application \
--application-name UpgradeTest --runtime-environment-update "FLINK-1_18" \
--application-configuration-update '{"ApplicationCodeConfigurationUpdate": '\
'{"CodeContentUpdate": {"S3ContentLocationUpdate": '\
'{"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \
 --run-configuration-update '{"ApplicationRestoreConfiguration": '\
 '{"ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"}}' \
 --current-application-version-id ${current_application_version}
```
+ Wenn Sie Service-Snapshots aktiviert haben und die Anwendung vom letzten Snapshot aus fortsetzen möchten, überprüft Amazon Managed Service für Apache Flink, ob die Laufzeit der aktuellen `RUNNING` Anwendung mit der ausgewählten Ziellaufzeit kompatibel ist.
+ Wenn Sie einen Snapshot angegeben haben, von dem aus die Ziellaufzeit fortgesetzt werden soll, überprüft Amazon Managed Service für Apache Flink, ob die Ziellaufzeit mit dem angegebenen Snapshot kompatibel ist. Schlägt die Kompatibilitätsprüfung fehl, wird Ihre Aktualisierungsanfrage abgelehnt und Ihre Anwendung bleibt unverändert. `RUNNING`
+ Wenn Sie Ihre Anwendung ohne Snapshot starten möchten, führt Amazon Managed Service für Apache Flink keine Kompatibilitätsprüfungen durch.
+ Wenn Ihre aktualisierte Anwendung fehlschlägt oder in einem transitiven `UPDATING` Zustand hängen bleibt, folgen Sie den Anweisungen im [Machen Sie Anwendungs-Upgrades rückgängig](rollback.md) Abschnitt, um zum fehlerfreien Zustand zurückzukehren. 

**Prozessablauf für die Ausführung von Statusanwendungen**

![\[Das folgende Diagramm stellt den empfohlenen Arbeitsablauf für das Upgrade der Anwendung während der Ausführung dar. Wir gehen davon aus, dass die Anwendung statusbehaftet ist und dass Sie Snapshots aktiviert haben. Für diesen Workflow stellen Sie bei der Aktualisierung die Anwendung aus dem neuesten Snapshot wieder her, der vor der Aktualisierung automatisch von Amazon Managed Service für Apache Flink erstellt wurde.\]](http://docs.aws.amazon.com/de_de/managed-flink/latest/java/images/in-place-update-while-running.png)


## Führen Sie ein Upgrade einer Anwendung im Status **READY** durch
<a name="upgrading-ready"></a>

Das folgende Beispiel zeigt das Upgrade einer App im `READY` Bundesstaat `UpgradeTest` Flink 1.18 in USA Ost (Nord-Virginia) mithilfe von. AWS CLI Es gibt keinen angegebenen Snapshot zum Starten der App, da die Anwendung nicht ausgeführt wird. Sie können einen Snapshot angeben, wenn Sie die Anfrage zum Starten der Anwendung stellen.

```
            
aws --region us-east-1 kinesisanalyticsv2 update-application \
--application-name UpgradeTest --runtime-environment-update "FLINK-1_18" \
--application-configuration-update '{"ApplicationCodeConfigurationUpdate": '\
'{"CodeContentUpdate": {"S3ContentLocationUpdate": '\
'{"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \
 --current-application-version-id ${current_application_version}
```
+ Sie können die Laufzeit Ihrer Anwendungen im `READY` Status auf eine beliebige Flink-Version aktualisieren. Amazon Managed Service für Apache Flink führt keine Prüfungen durch, bis Sie Ihre Anwendung starten.
+  Amazon Managed Service für Apache Flink führt nur Kompatibilitätsprüfungen für den Snapshot durch, den Sie zum Starten der App ausgewählt haben. Dies sind grundlegende Kompatibilitätsprüfungen, die der [Flink-Kompatibilitätstabelle](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#compatibility-table) folgen. Sie überprüfen nur die Flink-Version, mit der der Snapshot erstellt wurde, und die Flink-Version, auf die Sie abzielen. Wenn die Flink-Laufzeit des ausgewählten Snapshots nicht mit der neuen Laufzeit der App kompatibel ist, wird die Startanfrage möglicherweise abgelehnt.

**Prozessablauf für Ready-State-Anwendungen**

![\[Das folgende Diagramm zeigt den empfohlenen Arbeitsablauf für das Upgrade der Anwendung im Bereitschaftszustand. Wir gehen davon aus, dass die Anwendung statusbehaftet ist und dass Sie Snapshots aktiviert haben. Für diesen Workflow stellen Sie bei der Aktualisierung die Anwendung aus dem letzten Snapshot wieder her, der automatisch von Amazon Managed Service für Apache Flink erstellt wurde, als die Anwendung gestoppt wurde.\]](http://docs.aws.amazon.com/de_de/managed-flink/latest/java/images/in-place-update-while-ready.png)


# Machen Sie Anwendungs-Upgrades rückgängig
<a name="rollback"></a>

Wenn Sie Probleme mit Ihrer Anwendung haben oder Inkonsistenzen in Ihrem Anwendungscode zwischen den Flink-Versionen feststellen, können Sie ein Rollback mit dem AWS CLI, AWS CloudFormation, AWS SDK oder dem durchführen. AWS-Managementkonsole Die folgenden Beispiele zeigen, wie ein Rollback in verschiedenen Fehlerszenarien aussieht.

## Das Runtime-Upgrade war erfolgreich, die Anwendung befindet sich im `RUNNING` Status, aber der Job schlägt fehl und wird ständig neu gestartet
<a name="succeeded-restarting"></a>

Angenommen, Sie versuchen, eine statusbehaftete Anwendung mit `TestApplication` dem Namen Flink 1.15 auf Flink 1.18 in USA Ost (Nord-Virginia) zu aktualisieren. Die aktualisierte Flink 1.18-Anwendung kann jedoch nicht gestartet werden oder wird ständig neu gestartet, obwohl sich die Anwendung im Status befindet. `RUNNING` Dies ist ein häufiges Fehlerszenario. Um weitere Ausfallzeiten zu vermeiden, empfehlen wir, dass Sie Ihre Anwendung sofort auf die vorherige laufende Version (Flink 1.15) zurücksetzen und das Problem später diagnostizieren.

Verwenden Sie den AWS CLI Befehl rollback-application oder die API-Aktion, um die [Anwendung](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/rollback-application.html) auf die zuvor ausgeführte Version zurückzusetzen. [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html) Diese API-Aktion macht die Änderungen rückgängig, die Sie vorgenommen haben und die zur neuesten Version geführt haben. Anschließend wird Ihre Anwendung mit dem letzten erfolgreichen Snapshot neu gestartet. 

Wir empfehlen dringend, dass Sie einen Snapshot mit Ihrer vorhandenen App erstellen, bevor Sie versuchen, ein Upgrade durchzuführen. Dies trägt dazu bei, Datenverlust oder die Notwendigkeit einer erneuten Verarbeitung von Daten zu vermeiden. 

In diesem Fehlerszenario CloudFormation wird die Anwendung nicht für Sie zurückgesetzt. Sie müssen die CloudFormation Vorlage so aktualisieren, dass sie auf die vorherige Laufzeit und auf den vorherigen Code verweist, um die Aktualisierung der Anwendung CloudFormation zu erzwingen. Andernfalls CloudFormation wird davon ausgegangen, dass Ihre Anwendung aktualisiert wurde, wenn sie in den `RUNNING` Status wechselt.

## Eine Anwendung, die feststeckt, wird rückgängig gemacht `UPDATING`
<a name="stuck-updating"></a>

Wenn Ihre Anwendung nach einem Upgrade-Versuch im `AUTOSCALING` Status `UPDATING` oder hängen bleibt, bietet Amazon Managed Service für Apache Flink den AWS CLI Befehl [rollback-applications](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/rollback-application.html) oder die [RollbackApplications](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html)API-Aktion, mit der die Anwendung auf die Version vor dem Status „blockiert“ oder „blockiert“ zurückgesetzt werden kann. `UPDATING` `AUTOSCALING` Diese API macht die Änderungen rückgängig, die Sie vorgenommen haben und die dazu geführt haben, dass die Anwendung in `UPDATING` einem transitiven Zustand hängengeblieben ist. `AUTOSCALING`

# Allgemeine bewährte Methoden und Empfehlungen für Anwendungs-Upgrades
<a name="best-practices-recommendations"></a>
+ Testen Sie die neue Version job/runtime ohne Status in einer Umgebung außerhalb der Produktionsumgebung, bevor Sie ein Produktionsupgrade durchführen.
+ Erwägen Sie, das statusbehaftete Upgrade zunächst mit einer Anwendung zu testen, die nicht zur Produktion gehört.
+ Stellen Sie sicher, dass Ihr neuer Job Graph einen kompatiblen Status mit dem Snapshot aufweist, den Sie zum Starten Ihrer aktualisierten Anwendung verwenden werden.
  + Stellen Sie sicher, dass die in den Operatorstatus gespeicherten Typen gleich bleiben. Wenn sich der Typ geändert hat, kann Apache Flink den Operatorstatus nicht wiederherstellen.
  + Stellen Sie sicher, dass der Operator, den IDs Sie mit der `uid` Methode festgelegt haben, derselbe bleibt. Apache Flink empfiehlt ausdrücklich, Operatoren eindeutig IDs zuzuweisen. Weitere Informationen finden Sie unter [Zuweisen von Operatoren IDs](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#assigning-operator-ids) in der Apache Flink-Dokumentation.

    Wenn Sie Ihren Operatoren nichts IDs zuweisen, generiert Flink sie automatisch. In diesem Fall hängen sie möglicherweise von der Programmstruktur ab und können, wenn sie geändert werden, zu Kompatibilitätsproblemen führen. Flink verwendet Operator IDs, um den Status im Snapshot dem Operator zuzuordnen. Eine Änderung des Operators IDs führt dazu, dass die Anwendung nicht gestartet wird oder der im Snapshot gespeicherte Status gelöscht wird und der neue Operator ohne Status startet.
  + Ändern Sie nicht den Schlüssel, der zum Speichern des eingegebenen Status verwendet wurde.
  + Ändern Sie nicht den Eingabetyp von statusbehafteten Operatoren wie Window oder Join. Dadurch wird implizit der Typ des internen Zustands des Operators geändert, was zu einer Zustandsinkompatibilität führt.

# Vorsichtsmaßnahmen und bekannte Probleme bei Anwendungsupgrades
<a name="precautions"></a>

## Kafka Commit beim Checkpointing schlägt nach einem Neustart des Brokers wiederholt fehl
<a name="apache-kafka-connector"></a>

Es gibt ein bekanntes Open-Source-Apache Flink-Problem mit dem Apache Kafka-Konnektor in Flink Version 1.15, das durch einen kritischen Open-Source-Fehler im Kafka-Client 2.8.1 verursacht wurde. Weitere Informationen finden Sie unter [Kafka Commit on Checkpointing schlägt nach einem Neustart des Brokers wiederholt fehl und kann die Verbindung zum Gruppenkoordinator nach einer](https://issues.apache.org/jira/browse/FLINK-28060) Ausnahme [nicht KafkaConsumer wiederherstellen](https://issues.apache.org/jira/browse/KAFKA-13840). commitOffsetAsync 

Um dieses Problem zu vermeiden, empfehlen wir, Apache Flink 1.18 oder höher in Amazon Managed Service für Apache Flink zu verwenden.

## Bekannte Einschränkungen der staatlichen Kompatibilität
<a name="state-precautions"></a>
+ Wenn Sie die Tabellen-API verwenden, garantiert Apache Flink keine Statuskompatibilität zwischen den Flink-Versionen. Weitere Informationen finden Sie unter [Stateful Upgrades and Evolution](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution) in der Apache Flink-Dokumentation.
+ Die Status von Flink 1.6 sind nicht mit Flink 1.18 kompatibel. Die API lehnt Ihre Anfrage ab, wenn Sie versuchen, ein Upgrade von 1.6 auf 1.18 und höher mit State durchzuführen. Sie können ein Upgrade auf 1.8, 1.11, 1.13 und 1.15 durchführen und einen Snapshot erstellen und dann auf 1.18 und höher aktualisieren. Weitere Informationen finden Sie unter [Upgraden von Anwendungen und Flink-Versionen in der Apache Flink-Dokumentation](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/).

## Bekannte Probleme mit dem Flink Kinesis Connector
<a name="kinesis-connector-precautions"></a>
+ Wenn Sie Flink 1.11 oder eine frühere Version verwenden und den `amazon-kinesis-connector-flink` Connector für die Enhanced-fan-out (EFO) -Unterstützung verwenden, müssen Sie zusätzliche Schritte für ein Stateful-Upgrade auf Flink 1.13 oder höher durchführen. Dies liegt an der Änderung des Paketnamens des Connectors. Weitere Informationen finden Sie unter [amazon-kinesis-connector-flink](https://github.com/awslabs/amazon-kinesis-connector-flink).

  Der `amazon-kinesis-connector-flink` Anschluss für Flink 1.11 und frühere Versionen verwendet die Verpackung`software.amazon.kinesis`, wohingegen der Kinesis-Anschluss für Flink 1.13 und höher verwendet. `org.apache.flink.streaming.connectors.kinesis` [Verwenden Sie dieses Tool, um Ihre Migration zu unterstützen: -state-migrator. amazon-kinesis-connector-flink](https://github.com/awslabs/amazon-kinesis-connector-flink-state-migrator)
+ Wenn Sie Flink 1.13 oder früher mit Flink 1.15 oder höher verwenden `FlinkKinesisProducer` und ein Upgrade auf Flink 1.15 oder höher durchführen, müssen Sie für ein Stateful-Upgrade weiterhin Flink 1.15 oder höher verwenden, `FlinkKinesisProducer` anstatt das neuere. `KinesisStreamsSink` Wenn Sie jedoch bereits ein benutzerdefiniertes `uid` Set auf Ihrer Spüle haben, sollten Sie in der Lage sein, zu diesem zu wechseln, weil der Status nicht beibehalten wird. `KinesisStreamsSink` `FlinkKinesisProducer` Flink behandelt ihn als denselben Operator, da ein benutzerdefinierter Operator festgelegt `uid` ist.

## In Scala geschriebene Flink-Anwendungen
<a name="scala-precautions"></a>
+ Ab Flink 1.15 beinhaltet Apache Flink Scala nicht mehr in der Runtime. Sie müssen die Version von Scala, die Sie verwenden möchten, und andere Scala-Abhängigkeiten in Ihren Code aufnehmen, JAR/zip wenn Sie auf Flink 1.15 oder höher aktualisieren. Weitere Informationen finden Sie unter [Amazon Managed Service für Apache Flink für die Version Apache Flink 1.15.2](https://docs.aws.amazon.com/managed-flink/latest/java/flink-1-15-2.html).
+ Wenn Ihre Anwendung Scala verwendet und Sie sie von Flink 1.11 oder früher (Scala 2.11) auf Flink 1.13 (Scala 2.12) aktualisieren, stellen Sie sicher, dass Ihr Code Scala 2.12 verwendet. Andernfalls kann Ihre Flink 1.13-Anwendung möglicherweise keine Scala 2.11-Klassen in der Flink 1.13-Laufzeit finden.

## Dinge, die Sie beim Downgrade der Flink-Anwendung beachten sollten
<a name="downgrading-precautions"></a>
+ Ein Downgrade von Flink-Anwendungen ist möglich, aber auf Fälle beschränkt, in denen die Anwendung zuvor mit der älteren Flink-Version ausgeführt wurde. Für ein Stateful-Upgrade benötigt Managed Service für Apache Flink die Verwendung eines Snapshots, der mit einer entsprechenden oder früheren Version für das Downgrade erstellt wurde
+ Wenn Sie Ihre Runtime von Flink 1.13 oder höher auf Flink 1.11 oder früher aktualisieren und Ihre App das HashMap State-Backend verwendet, schlägt Ihre Anwendung kontinuierlich fehl.

# Upgrade auf Flink 2.2: Vollständige Anleitung
<a name="flink-2-2-upgrade-guide"></a>

Dieses Handbuch enthält step-by-step Anweisungen für das Upgrade Ihrer Amazon Managed Service for Apache Flink-Anwendung von Flink 1.x auf Flink 2.2. Dies ist ein Hauptversions-Upgrade mit grundlegenden Änderungen, die sorgfältige Planung und Tests erfordern.

**Das Upgrade der Hauptversion erfolgt unidirektional**  
Der Upgrade-Vorgang kann Ihre Anwendung unter Beibehaltung des Zustands von Flink 1.x auf 2.2 verschieben, Sie können jedoch nicht von 2.2 auf 1.x mit dem Status 2.2 zurückkehren. Wenn Ihre Anwendung nach dem Upgrade fehlerhaft wird, verwenden Sie die Rollback-API, um zur 1.x-Version mit Ihrem ursprünglichen 1.x-Status aus dem letzten Snapshot zurückzukehren.

## Voraussetzungen
<a name="upgrade-guide-prerequisites"></a>

Bevor Sie mit dem Upgrade beginnen:
+ Bewertung [Wichtige Änderungen und veraltete Versionen](flink-2-2.md#flink-2-2-breaking-changes)
+ Rückblick [Leitfaden zur staatlichen Kompatibilität für Flink 2.2-Upgrades](state-compatibility.md)
+ Stellen Sie sicher, dass Sie über eine Testumgebung außerhalb der Produktionsumgebung verfügen
+ Dokumentieren Sie Ihre aktuelle Anwendungskonfiguration und Abhängigkeiten

## Verstehen Sie Ihre Migrationspfade
<a name="upgrade-guide-migration-paths"></a>

Ihr Upgrade-Erlebnis hängt von der Kompatibilität Ihrer Anwendung mit Flink 2.2 ab. Wenn Sie diese Pfade kennen, können Sie sich angemessen vorbereiten und realistische Erwartungen setzen.

**Pfad 1: Kompatibler Binär- und Anwendungsstatus**

**Was ist zu erwarten:**
+ Rufen Sie den Upgrade-Vorgang auf
+ Schließen Sie die Migration auf 2.2 ab, wobei sich der Status der Anwendung ändert: → → `RUNNING` `UPDATING` `RUNNING`
+ Behalten Sie den gesamten Anwendungsstatus ohne Datenverlust oder erneute Verarbeitung bei
+ Gleiche Erfahrung wie bei Migrationen kleinerer Versionen

Am besten geeignet für: Zustandslose Anwendungen oder Anwendungen mit kompatibler Serialisierung (Avro, kompatible Protobuf-Schemas, ohne Sammlungen) POJOs 

**Pfad 2: Binäre Inkompatibilitäten**

**Was ist zu erwarten:**
+ Rufen Sie den Upgrade-Vorgang auf
+ Der Vorgang schlägt fehl und die binäre Inkompatibilität wird über die Operations API und die Protokolle aufgedeckt
+ Mit aktiviertem Auto-Rollback: Anwendungen werden innerhalb von Minuten automatisch ohne Ihr Eingreifen zurückgesetzt
+ Bei deaktiviertem Auto-Rollback: Anwendungen bleiben ohne Datenverarbeitung im Ausführungsstatus; Sie führen ein manuelles Rollback zu einer älteren Version durch
+ Sobald die Binärdatei repariert ist, verwenden Sie die [UpdateApplication API](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) für ein ähnliches Erlebnis wie bei Pfad 1

Ideal für: Anwendungen APIs , die entfernte Anwendungen verwenden und beim Start des Flink-Jobs erkannt wurden

**Pfad 3: Status der inkompatiblen Anwendung**

**Was ist zu erwarten:**
+ Rufen Sie den Upgrade-Vorgang auf
+ Die Migration scheint zunächst erfolgreich zu sein
+ Anwendungen geraten innerhalb von Sekunden in Neustartschleifen, wenn die Statuswiederherstellung fehlschlägt
+ Erkennen Sie Fehler anhand von CloudWatch Metriken, die kontinuierliche Neustarts anzeigen
+ Rufen Sie den Rollback-Vorgang manuell auf
+ Kehren Sie innerhalb von Minuten nach Initiierung des Rollbacks zur Produktion zurück
+ Überprüfen Sie Ihre [Zustandsmigration](state-compatibility.md#state-compat-migration) Bewerbung

Am besten geeignet für: Anwendungen mit Inkompatibilitäten bei der staatlichen Serialisierung (POJOs mit Sammlungen, bestimmter Status mit Kryo-Serialisierung)

**Anmerkung**  
Es wird dringend empfohlen, ein Replikat Ihrer Produktionsanwendung zu erstellen und jede der folgenden Phasen des Upgrades auf dem Replikat zu testen, bevor Sie dieselben Schritte für Ihre Produktionsanwendung ausführen.

## Phase 1: Vorbereitung
<a name="upgrade-guide-phase-1"></a>

**Aktualisieren Sie den Anwendungscode**

Aktualisieren Sie Ihren Anwendungscode, damit er mit Flink 2.2 kompatibel ist:
+ **Aktualisieren Sie die Flink-Abhängigkeiten** auf Version 2.2.0 in Ihrem oder `pom.xml` `build.gradle`
+ **Aktualisieren Sie die Connector-Abhängigkeiten** auf Flink 2.2-kompatible Versionen (siehe) [Verfügbarkeit von Anschlüssen](flink-2-2.md#flink-2-2-connectors)
+ **Entfernen Sie die veraltete API-Nutzung**:
  + Ersetzen Sie die DataSet API durch eine API oder eine DataStream Tabellen-API/SQL
  + Ersetzen Sie die alte Version von`SourceFunction`/`SinkFunction`durch FLIP-27 Source und FLIP-143 Sink APIs
  + Ersetzen Sie die Verwendung der Scala-API durch die Java-API
+ **Update auf Java 17**

**Laden Sie den aktualisierten Anwendungscode hoch**
+ Erstellen Sie Ihr Anwendungs-JAR mit Flink 2.2-Abhängigkeiten
+ Laden Sie mit einem **anderen Dateinamen** als Ihrem aktuellen JAR auf Amazon S3 hoch (z. B.`my-app-flink-2.2.jar`)
+ Notieren Sie sich den S3-Bucket und den Schlüssel zur Verwendung im Upgrade-Schritt

## Phase 2: Automatisches Rollback aktivieren
<a name="upgrade-guide-phase-2"></a>

Mit Auto-Rollback kann Amazon Managed Service für Apache Flink automatisch zur vorherigen Version zurückkehren, falls das Upgrade fehlschlägt.

**Überprüfen Sie den Status des automatischen Rollbacks**

*AWS-Managementkonsole:*

1. Navigieren Sie zu Ihrer Bewerbung

1. Wählen Sie **Konfiguration**

1. Stellen Sie unter **Anwendungseinstellungen** sicher, dass der **System-Rollback aktiviert** ist

*AWS CLI:*

```
aws kinesisanalyticsv2 describe-application \
    --application-name MyApplication \
    --query 'ApplicationDetail.ApplicationConfigurationDescription.ApplicationSystemRollbackConfigurationDescription.RollbackEnabled'
```

**Automatisches Rollback aktivieren (falls nicht aktiviert)**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --application-configuration-update '{
        "ApplicationSystemRollbackConfigurationUpdate": {
            "RollbackEnabledUpdate": true
        }
    }'
```

## Phase 3: Schnappschuss erstellen (optional)
<a name="upgrade-guide-phase-3"></a>

Wenn automatische Snapshots für Ihre Anwendung aktiviert sind, können Sie diesen Schritt überspringen. Andernfalls erstellen Sie vor dem Upgrade einen Snapshot Ihrer Anwendung, um den Status Ihrer Anwendung zu speichern.

**Machen Sie einen Snapshot von der laufenden Anwendung**

*AWS-Managementkonsole:*

1. Navigieren Sie zu Ihrer Anwendung

1. Wählen Sie **Schnappschüsse**

1. Wählen Sie **Snapshot erstellen**

1. Geben Sie einen Snapshot-Namen ein (z. B.`pre-flink-2.2-upgrade`)

1. Wählen Sie **Create (Erstellen)** aus.

*AWS CLI:*

```
aws kinesisanalyticsv2 create-application-snapshot \
    --application-name MyApplication \
    --snapshot-name pre-flink-2.2-upgrade
```

**Überprüfen Sie die Erstellung des Snapshots**

```
aws kinesisanalyticsv2 describe-application-snapshot \
    --application-name MyApplication \
    --snapshot-name pre-flink-2.2-upgrade
```

Warten Sie, bis dies `SnapshotStatus` der Fall ist, `READY` bevor Sie fortfahren.

## Phase 4: Anwendung aktualisieren
<a name="upgrade-guide-phase-4"></a>

Sie können Ihre Flink-Anwendung aktualisieren, indem Sie die [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)Aktion verwenden.

Sie können die `UpdateApplication` API auf verschiedene Arten aufrufen:
+ **Benutze die AWS-Managementkonsole.**
  + Gehen Sie zu Ihrer App-Seite auf der AWS-Managementkonsole.
  + Wählen Sie **Konfigurieren** aus.
  + Wählen Sie die neue Laufzeit und den Snapshot aus, von dem aus Sie beginnen möchten. Dies wird auch als Wiederherstellungskonfiguration bezeichnet. Verwenden Sie die neueste Einstellung als Wiederherstellungskonfiguration, um die App vom neuesten Snapshot aus zu starten. Zeigen Sie auf die neue aktualisierte Anwendung JAR/zip auf Amazon S3.
+ **Verwenden Sie die AWS CLI[https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html)**Aktion.
+ **Benutze CloudFormation.**
  + Aktualisiere das `RuntimeEnvironment` Feld. Zuvor wurde die Anwendung CloudFormation gelöscht und eine neue erstellt, wodurch Ihre Schnappschüsse und andere App-Historien verloren gingen. CloudFormation Aktualisiert jetzt Ihr `RuntimeEnvironment` vorhandenes Dokument und löscht Ihre Anwendung nicht.
+ **Verwenden Sie das AWS SDK.**
  + Die Programmiersprache Ihrer Wahl finden Sie in der SDK-Dokumentation. Siehe [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html).

Sie können das Upgrade durchführen, während sich die Anwendung im `RUNNING` Status befindet oder während die Anwendung im `READY` Status gestoppt ist. Amazon Managed Service für Apache Flink überprüft die Kompatibilität zwischen der ursprünglichen Runtime-Version und der Ziel-Runtime-Version. Diese Kompatibilitätsprüfung wird ausgeführt, wenn Sie diese durchführen, `UpdateApplication` während Sie sich im `RUNNING` Status befinden, oder beim nächsten Mal, `StartApplication` wenn Sie ein Upgrade durchführen, während Sie sich im `READY` Status befinden.

**Führen Sie das Upgrade vom Status RUNNING aus**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --runtime-environment-update FLINK-2_2 \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "my-app-flink-2.2.jar"
                }
            }
        }
    }'
```

**Führen Sie ein Upgrade aus dem Status BEREIT durch**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --runtime-environment-update FLINK-2_2 \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "my-app-flink-2.2.jar"
                }
            }
        }
    }'
```

## Phase 5: Überwachen Sie das Upgrade
<a name="upgrade-guide-phase-5"></a>

**Kompatibilitätsprüfung**
+ Verwenden Sie die Operations API, um den Status des Upgrades zu überprüfen. Wenn binäre Inkompatibilitäten oder Probleme beim Jobstart auftreten, schlägt der Upgrade-Vorgang mit Protokollen fehl.
+ Wenn der Upgrade-Vorgang erfolgreich war, die Anwendung jedoch in Neustartschleifen stecken bleibt, bedeutet dies, dass der Status nicht mit der neuen Flink-Version kompatibel ist oder dass ein Problem mit dem aktualisierten Code vorliegt. Erfahren Sie[Leitfaden zur staatlichen Kompatibilität für Flink 2.2-Upgrades](state-compatibility.md), wie Sie Probleme mit staatlichen Inkompatibilitäten identifizieren können.

**Überwachen Sie den Zustand der Anwendung**

*Status der Anwendung:*
+ Der Status der Bewerbung sollte sich ändern: `RUNNING` → `UPDATING` → `RUNNING`
+ Überprüfen Sie die Laufzeit der Anwendung. Wenn es 2.2 ist, war der Upgrade-Vorgang erfolgreich.
+ Wenn Ihre Anwendung aktiviert ist, `RUNNING` aber noch in der älteren Runtime läuft, wurde das automatische Rollback aktiviert. Die Operations API zeigt den Vorgang als an. `FAILED` Suchen Sie in den Protokollen nach der Ausnahme für einen Fehler.

Überwachen Sie außerdem diese Metriken in CloudWatch:

*Metrik neu starten:*
+ `numRestarts`: Achten Sie auf unerwartete Neustarts — das Upgrade ist erfolgreich, wenn der Wert Null `numRestarts` ist und/oder `uptime` der Wert zunimmt. `runningTime`

*Checkpoint-Metriken:*
+ `lastCheckpointDuration`: Sollte den Werten vor dem Upgrade ähneln
+ `numberOfFailedCheckpoints`: Sollte bei 0 bleiben

## Phase 6: Überprüfen Sie das Anwendungsverhalten
<a name="upgrade-guide-phase-6"></a>

Nachdem die Anwendung auf Flink 2.2 ausgeführt wurde:

**Funktionale Validierung**
+ Stellen Sie sicher, dass Daten aus Quellen gelesen werden
+ Stellen Sie sicher, dass Daten auf Speichermedien geschrieben werden
+ Stellen Sie sicher, dass die Geschäftslogik die erwarteten Ergebnisse liefert
+ Vergleichen Sie das Ergebnis mit dem Ausgangswert vor dem Upgrade

**Überprüfung der Leistung**
+ Überwachen Sie die Latenzkennzahlen (end-to-end Verarbeitungszeit)
+ Überwachen Sie die Durchsatzmetriken (Datensätze pro Sekunde)
+ Überwachen Sie die Dauer und Größe der Checkpoints
+ Überwachen Sie die Speicher- und CPU-Auslastung

**Läuft für mehr als 24 Stunden**

Lassen Sie die Anwendung mindestens 24 Stunden in der Produktion laufen, um Folgendes sicherzustellen:
+ Keine Speicherlecks
+ Stabiles Checkpoint-Verhalten
+ Keine unerwarteten Neustarts
+ Konsistenter Durchsatz

## Phase 7: Rollback-Verfahren
<a name="upgrade-guide-phase-7"></a>

Wenn das Upgrade fehlschlägt oder die Anwendung läuft, aber fehlerhaft ist, führen Sie ein Rollback zur vorherigen Version durch.

**Automatisches Rollback**

Wenn Auto-Rollback aktiviert ist und das Upgrade beim Start fehlschlägt, kehrt Amazon Managed Service für Apache Flink automatisch zur vorherigen Version zurück.

**Manuelles Rollback**

Wenn die Anwendung läuft, aber fehlerhaft ist, verwenden Sie die `RollbackApplication` API:

*AWS-Managementkonsole:*

1. Navigieren Sie zu Ihrer Anwendung

1. Wählen Sie **Aktionen** → **Rollback**

1. Bestätigen Sie das Rollback

*AWS CLI:*

```
aws kinesisanalyticsv2 rollback-application \
    --application-name MyApplication \
    --current-application-version-id <version-id>
```

**Was passiert beim Rollback:**
+ Die Anwendung wird gestoppt
+ Runtime kehrt zur vorherigen Flink-Version zurück
+ Der Anwendungscode wird auf die vorherige JAR zurückgesetzt
+ Die Anwendung wird mit dem letzten erfolgreichen Snapshot neu gestartet, der **vor dem** Upgrade erstellt wurde

**Wichtig**  
Sie können einen Flink 2.2-Snapshot auf Flink 1.x nicht wiederherstellen
Rollback verwendet den Snapshot, der vor dem Upgrade erstellt wurde
Erstellen Sie vor dem Upgrade immer einen Snapshot (Phase 3)

## Nächste Schritte
<a name="upgrade-guide-next-steps"></a>

Bei Fragen oder Problemen während des Upgrades wenden Sie sich an den Support [Fehlerbehebung bei Managed Service für Apache Flink](troubleshooting.md) oder wenden Sie sich an den AWS Support.

# Leitfaden zur staatlichen Kompatibilität für Flink 2.2-Upgrades
<a name="state-compatibility"></a>

Beim Upgrade von Flink 1.x auf Flink 2.2 können Kompatibilitätsprobleme verhindern, dass Ihre Anwendung aus Snapshots wiederhergestellt werden kann. Dieser Leitfaden hilft Ihnen bei der Identifizierung potenzieller Kompatibilitätsprobleme und bietet Migrationsstrategien.

## Grundlegendes zu Änderungen der Statuskompatibilität
<a name="state-compat-understanding"></a>

Amazon Managed Service für Apache Flink 2.2 führt mehrere Serialisierungsänderungen ein, die sich auf die Statuskompatibilität auswirken. Im Folgenden sind die wichtigsten aufgeführt:
+ **Kryo-Versionsupgrade**: Apache Flink 2.2 aktualisiert den mitgelieferten Kryo-Serializer von Version 2 auf Version 5. Da Kryo v5 ein anderes binäres Kodierungsformat als Kryo v2 verwendet, kann jeder Operatorstatus, der über Kryo in einem Flink 1.x-Savepoint serialisiert wurde, in Flink 2.2 nicht wiederhergestellt werden.
+ **Serialisierung von Java-Sammlungen**: In Flink 1.x wurden die darin enthaltenen Java-Sammlungen (wie, und) mit Kryo serialisiert. `HashMap` `ArrayList` `HashSet` POJOs Flink 2.2 führt sammlungsspezifisch optimierte Serialisierer ein, die nicht mit dem Kryo-serialisierten Status von 1.x kompatibel sind. Anwendungen, die Java-Sammlungen mit POJO- oder Kryo-Serialisierern in 1.x verwenden, können diesen Zustand in Flink 2.2 nicht wiederherstellen. Weitere Informationen zu Datentypen und Serialisierung finden Sie in der [Flink-Dokumentation](https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/datastream/fault-tolerance/serialization/types_serialization/).
+ **Kinesis Connector-Kompatibilität**: Die Kinesis Data Streams (KDS) Connector-Version unter 5.0 behält den Status bei, dass er nicht mit dem Flink 2.2 Kinesis Connector Version 6.0 kompatibel ist. Sie müssen vor dem Upgrade auf Connector-Version 5.0 oder höher migrieren.

## Kompatibilitätsreferenz zur Serialisierung
<a name="state-compat-reference"></a>

Überprüfen Sie alle Statusdeklarationen in Ihrer Anwendung und ordnen Sie die Serialisierungstypen der folgenden Tabelle zu. Wenn ein Zustandstyp nicht kompatibel ist, lesen Sie den [Zustandsmigration](#state-compat-migration) Abschnitt, bevor Sie mit dem Upgrade fortfahren.


**Kompatibilitätsreferenz zur Serialisierung**  

| Typ der Serialisierung | Kompatibel? | Details | 
| --- | --- | --- | 
| Avro (SpecificRecord,GenericRecord) | Ja | Verwendet unabhängig von Kryo ein eigenes Binärformat. Stellen Sie sicher, dass Sie die systemeigenen Avro-Typinformationen von Flink verwenden und nicht Avro, das als Kryo-Serializer registriert ist. | 
| Protobuf | Ja | Verwendet unabhängig von Kryo eine eigene Binärcodierung. Stellen Sie sicher, dass Schemaänderungen abwärtskompatiblen Evolutionsregeln entsprechen. | 
| POJOs ohne Sammlungen | Ja | Wird vom POJO-Serializer von Flink verarbeitet — aber nur, wenn die Klasse alle POJO-Kriterien erfüllt: öffentliche Klasse, öffentlicher No-Arg-Konstruktor, alle Felder, die entweder öffentlich oder über Getter/Setter zugänglich sind, und alle Feldtypen selbst, die von Flink serialisierbar sind. Ein POJO, das gegen eines dieser Kriterien verstößt, fällt stillschweigend auf Kryo zurück und wird inkompatibel. | 
| Benutzerdefiniert TypeSerializers | Ja | Nur kompatibel, wenn Ihr Serializer nicht intern an Kryo delegiert. | 
| Status der SQL- und Tabellen-API | Ja (mit Vorbehalt) | Verwendet die internen Serialisierer von Flink. Apache Flink garantiert jedoch nicht die staatliche Kompatibilität zwischen Hauptversionen für Table-API-Anwendungen. Testen Sie zuerst in einer Umgebung außerhalb der Produktionsumgebung. | 
| POJOs mit Java-Sammlungen (HashMap,ArrayList,HashSet) | Nein | In Flink 1.x POJOs wurden die darin enthaltenen Sammlungen über Kryo v2 serialisiert. Flink 2.2 führt spezielle Serialisierer für Sammlungen ein, deren Binärformat nicht mit dem Kryo v2-Format kompatibel ist. | 
| Scala-Fallklassen | Nein | Serialisiert über Kryo in Flink 1.x. Das Upgrade von Kryo v2 auf v5 ändert das Binärformat. | 
| Java-Aufzeichnungen | Nein | In der Regel wird in Flink 1.x auf die Kryo-Serialisierung zurückgegriffen. Überprüfen Sie dies, indem Sie mit testen. disableGenericTypes() | 
| Bibliothekstypen von Drittanbietern | Nein | Typen ohne einen registrierten benutzerdefinierten Serializer fallen auf Kryo zurück. Die Änderung des Binärformats von Kryo v2 auf v5 beeinträchtigt die Kompatibilität. | 
| Jeder Typ, der Kryo-Fallback verwendet | Nein | Wenn Flink einen Typ mit einem eingebauten oder registrierten Serializer nicht verarbeiten kann, fällt er auf Kryo zurück. Der gesamte Kryo-serialisierte Status von 1.x ist mit 2.2 nicht kompatibel. | 

## Diagnostische Methoden
<a name="state-compat-diagnostics"></a>

Sie können Probleme mit der Statuskompatibilität entweder proaktiv identifizieren, indem Sie sich die Anwendungsprotokolle ansehen oder die Protokolle nach dem [UpdateApplication API-Vorgang](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) überprüfen.

**Identifizieren Sie Kryo-Fallback in Ihrer Anwendung**

Sie können das folgende Regex-Muster in Ihren Protokollen verwenden, um Kryo-Fallback in Ihrer Anwendung zu identifizieren:

```
Class class (?<className>[^\s]+) cannot be used as a POJO type
```

Beispielprotokoll:

```
Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber
cannot be used as a POJO type because not all fields are valid POJO fields,
and must be processed as GenericType. Please read the Flink documentation on
"Data Types & Serialization" for details of the effect on performance and
schema evolution.
```

Wenn das Upgrade mithilfe der UpdateApplication API fehlschlägt, deuten die folgenden Ausnahmen möglicherweise darauf hin, dass Sie auf eine Inkompatibilität des Serializer-basierten Status stoßen:

**IndexOutOfBoundsException**

```
Caused by: java.lang.IndexOutOfBoundsException: Index 116 out of bounds for length 1
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
    at java.base/java.util.Objects.checkIndex(Unknown Source)
    at java.base/java.util.ArrayList.get(Unknown Source)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:77)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:923)
    ... 23 more
```

**StateMigrationException (POJOSerializer)**

```
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@8bf85b5d) must not be
incompatible with the old state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@3282ee3).
```

## Checkliste vor dem Upgrade
<a name="state-compat-checklist"></a>
+ Überprüfen Sie alle staatlichen Erklärungen in Ihrem Antrag
+ Suchen Sie nach POJOs Sammlungen (`HashMap`,`ArrayList`,`HashSet`)
+ Überprüfen Sie die Serialisierungsmethoden für jeden Zustandstyp
+ Erstellen Sie eine Prod-Replica-Anwendung und testen Sie die State-Kompatibilität mithilfe der UpdateApplication API auf diesem Replikat
+ Wenn der Status inkompatibel ist, wählen Sie eine Strategie aus [Zustandsmigration](#state-compat-migration)
+ Aktivieren Sie das automatische Rollback in der Konfiguration Ihrer Flink-Produktionsanwendung

## Zustandsmigration
<a name="state-compat-migration"></a>

**Vollständigen Status neu erstellen**

Am besten für Anwendungen geeignet, bei denen der Status anhand von Quelldaten wiederhergestellt werden kann.

Wenn Ihre Anwendung den Status anhand von Quelldaten neu erstellen kann:

1. Stoppen Sie die Flink 1.x-Anwendung

1. Führen Sie ein Upgrade auf Flink 2.x mit aktualisiertem Code durch

1. Beginne mit `SKIP_RESTORE_FROM_SNAPSHOT`

1. Erlauben Sie der Anwendung, den Status wiederherzustellen

```
aws kinesisanalyticsv2 start-application \
    --application-name MyApplication \
    --run-configuration '{
        "ApplicationRestoreConfiguration": {
            "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
        }
    }'
```

## Best Practices
<a name="state-compat-best-practices"></a>

1. **Verwenden Sie immer Avro oder Protobuf für komplexe Zustände** — Diese ermöglichen die Schemaentwicklung und sind Kryo-unabhängig

1. **Vermeiden Sie Sammlungen in POJOs** — Verwenden Sie stattdessen das native und von Flink `ListState` `MapState`

1. **Testen Sie die Statuswiederherstellung lokal** — Testen Sie vor dem Produktionsupgrade mit tatsächlichen Snapshots

1. **Machen Sie häufig Schnappschüsse** — vor allem vor größeren Versionsupgrades

1. **Automatisches Rollback aktivieren** — Konfigurieren Sie Ihre MSF-Anwendung so, dass sie bei einem Fehler automatisch zurückkehrt

1. **Dokumentieren Sie Ihre Zustandstypen** — Pflegen Sie die Dokumentation aller Zustandstypen und ihrer Serialisierungsmethoden

1. **Überwachen Sie die Checkpoint-Größen — Zunehmende** Checkpoint-Größen können auf Serialisierungsprobleme hinweisen

## Nächste Schritte
<a name="state-compat-next-steps"></a>

**Planen Sie Ihr Upgrade**: Siehe. [Upgrade auf Flink 2.2: Vollständige Anleitung](flink-2-2-upgrade-guide.md)

Bei Fragen oder Problemen während der Migration wenden Sie sich an den Support [Fehlerbehebung bei Managed Service für Apache Flink](troubleshooting.md) oder wenden Sie sich an den AWS Support.

# Implementieren Sie die Anwendungsskalierung in Managed Service für Apache Flink
<a name="how-scaling"></a>

Sie können die parallele Ausführung von Aufgaben und die Zuweisung von Ressourcen für Amazon Managed Service für Apache Flink konfigurieren, um die Skalierung zu implementieren. Informationen darüber, wie Apache Flink parallel Instanzen von Aufgaben plant, finden Sie unter [Parallele Ausführung](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/) in der Apache Flink-Dokumentation.

**Topics**
+ [Konfigurieren Sie Anwendungsparallelität und KPU ParallelismPer](#how-parallelism)
+ [Kinesis-Verarbeitungseinheiten zuweisen](#how-scaling-kpus)
+ [Aktualisieren Sie die Parallelität Ihrer Anwendung](#how-scaling-howto)
+ [Verwenden Sie die automatische Skalierung in Managed Service für Apache Flink](how-scaling-auto.md)
+ [Überlegungen zu maxParallelism](#how-scaling-auto-max-parallelism)

## Konfigurieren Sie Anwendungsparallelität und KPU ParallelismPer
<a name="how-parallelism"></a>

Sie konfigurieren die parallele Ausführung der Aufgaben Ihrer mit Managed Service für Apache Flink erstellten Anwendung (wie das Lesen aus einer Quelle oder das Ausführen eines Operators) mithilfe der folgenden [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationConfiguration.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationConfiguration.html)-Eigenschaften: 
+ `Parallelism` – Verwenden Sie diese Eigenschaft, um die Standardparallelität der Apache-Flink-Anwendung festzulegen. Alle Operatoren, Quellen und Senken werden mit dieser Parallelität ausgeführt, sofern sie nicht im Anwendungscode überschrieben werden. Der Standardwert beträgt `1`; der voreingestellte Maximalwert beträgt `256`.
+ `ParallelismPerKPU` – Verwenden Sie diese Eigenschaft, um die Anzahl der parallelen Aufgaben festzulegen, die pro Kinesis Processing Unit (KPU) Ihrer Anwendung geplant werden können. Der Standardwert ist `1` und der Maximalwert ist `8`. Bei Anwendungen mit blockierenden Vorgängen (z. B. E/A) führt ein höherer `ParallelismPerKPU`-Wert zu einer Vollauslastung der KPU-Ressourcen.

**Anmerkung**  
Der Grenzwert für `Parallelism` entspricht dem `ParallelismPerKPU` Mehrfachen des Grenzwerts für KPUs (der standardmäßig 64 ist). Das KPUs Limit kann erhöht werden, indem eine Erhöhung des Limits beantragt wird. Anweisungen zum Anfordern einer Erhöhung dieses Grenzwerts finden Sie unter „So fordern Sie eine Erhöhung des Grenzwerts an“ unter [Service Quotas](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html).

Informationen zum Einstellen der Aufgabenparallelität für einen bestimmten Operator finden Sie unter [Setting the Parallelism: Operator](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/#operator-level) in der Apache Flink-Dokumentation.

## Kinesis-Verarbeitungseinheiten zuweisen
<a name="how-scaling-kpus"></a>

Managed Service für Apache Flink stellt Kapazität bereit als. KPUs Eine einzelne KPU bietet Ihnen 1 vCPU und 4 GB Arbeitsspeicher. Für jede zugewiesene KPU werden außerdem 50 GB Speicher für laufende Anwendungen bereitgestellt. 

Managed Service for Apache Flink berechnet KPUs die für die Ausführung Ihrer Anwendung erforderlichen Werte anhand der `ParallelismPerKPU` Eigenschaften `Parallelism` und wie folgt:

```
Allocated KPUs for the application = Parallelism/ParallelismPerKPU
```

Managed Service für Apache Flink stellt Ihren Anwendungen schnell Ressourcen zur Verfügung, um auf Spitzen im Durchsatz oder bei der Verarbeitungsaktivität zu reagieren. Es entfernt Ressourcen schrittweise aus Ihrer Anwendung, nachdem die Aktivitätsspitze vorüber ist. Um die automatische Zuweisung von Ressourcen zu deaktivieren, setzen Sie den Wert von `AutoScalingEnabled` auf `false`, wie weiter unten unter [Aktualisieren Sie die Parallelität Ihrer Anwendung](#how-scaling-howto) beschrieben. 

Das Standardlimit KPUs für Ihre Anwendung ist 64. Anweisungen zum Anfordern einer Erhöhung dieses Grenzwerts finden Sie unter „So fordern Sie eine Erhöhung des Grenzwerts an“ unter [Service Quotas](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html).

**Anmerkung**  
Für Orchestrierungszwecke wird eine zusätzliche KPU berechnet. Weitere Informationen finden Sie unter [Managed Service für Apache Flink – Preise](https://aws.amazon.com/kinesis/data-analytics/pricing/).

## Aktualisieren Sie die Parallelität Ihrer Anwendung
<a name="how-scaling-howto"></a>

Dieser Abschnitt enthält Beispielanfragen für API-Aktionen, die die Parallelität einer Anwendung festlegen. Weitere Beispiele und Anweisungen zur Verwendung von Anforderungsblöcken mit API-Aktionen finden Sie unter [Beispielcode für Managed Service für Apache Flink API](api-examples.md).

Die folgende Beispielanforderung für die [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html)-Aktion legt die Parallelität fest, wenn Sie eine Anwendung erstellen:

```
{
   "ApplicationName": "string",
   "RuntimeEnvironment":"FLINK-1_18",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
         "S3ContentLocation":{
            "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
            "FileKey":"myflink.jar",
            "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
            }
         },
      "CodeContentType":"ZIPFILE"
   },   
      "FlinkApplicationConfiguration": { 
         "ParallelismConfiguration": { 
            "AutoScalingEnabled": "true",
            "ConfigurationType": "CUSTOM",
            "Parallelism": 4,
            "ParallelismPerKPU": 4
         }
      }
   }
}
```

Die folgende Beispielanforderung für die [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html)-Aktion legt die Parallelität für eine bestehende Anwendung fest:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 4,
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "ParallelismConfigurationUpdate": { 
            "AutoScalingEnabledUpdate": "true",
            "ConfigurationTypeUpdate": "CUSTOM",
            "ParallelismPerKPUUpdate": 4,
            "ParallelismUpdate": 4
         }
      }
   }
}
```

Die folgende Beispielanforderung für die [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html)-Aktion deaktiviert die Parallelität für eine bestehende Anwendung:

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 4,
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "ParallelismConfigurationUpdate": { 
            "AutoScalingEnabledUpdate": "false"
         }
      }
   }
}
```

# Verwenden Sie die automatische Skalierung in Managed Service für Apache Flink
<a name="how-scaling-auto"></a>

Managed Service für Apache Flink skaliert die Parallelität Ihrer Anwendung elastisch, um dem Datendurchsatz Ihrer Quelle und der Komplexität Ihres Operators in den meisten Szenarien Rechnung zu tragen. Die automatische Skalierung ist standardmäßig aktiviert. Managed Service für Apache Flink überwacht die Ressourcenauslastung (CPU-Auslastung) Ihrer Anwendung und skaliert die Parallelität Ihrer Anwendung entsprechend elastisch nach oben oder unten:
+ Ihre Anwendung `containerCPUUtilization` wird skaliert (erhöht die Parallelität), wenn das CloudWatch metrische Maximum 15 Minuten lang mehr als 75 Prozent oder mehr beträgt. Das bedeutet, dass die `ScaleUp` Aktion ausgelöst wird, wenn es 15 aufeinanderfolgende Datenpunkte mit einem Zeitraum von 1 Minute gibt, der 75 Prozent oder mehr entspricht. Eine `ScaleUp` Aktion verdoppelt den Wert Ihrer Anwendung`CurrentParallelism`. `ParallelismPerKPU`wird nicht geändert. Infolgedessen verdoppelt sich KPUs auch die Anzahl der zugewiesenen Personen. 
+ Ihre Anwendung wird herunterskaliert (Parallelität wird verringert), wenn die CPU-Auslastung sechs Stunden lang unter 10 Prozent bleibt. Das bedeutet, dass die `ScaleDown` Aktion ausgelöst wird, wenn 360 aufeinanderfolgende Datenpunkte mit einem Zeitraum von 1 Minute weniger als 10 Prozent vorhanden sind. Eine `ScaleDown` Aktion halbiert (aufgerundet) die Parallelität der Anwendung. `ParallelismPerKPU`wird nicht verändert, und die Anzahl der zugewiesenen Personen halbiert sich KPUs ebenfalls (aufgerundet). 

**Anmerkung**  
Es kann auf ein Maximum von `containerCPUUtilization` mehr als 1 Minute verwiesen werden, um die Korrelation mit einem Datenpunkt zu ermitteln, der für die Skalierungsaktion verwendet wird. Es ist jedoch nicht erforderlich, den genauen Zeitpunkt wiederzugeben, zu dem die Aktion initialisiert wird.

Managed Service für Apache Flink reduziert den `CurrentParallelism`-Wert Ihrer Anwendung nicht auf weniger als die `Parallelism`-Einstellung Ihrer Anwendung.

Wenn der Service von Managed Service für Apache Flink Ihre Anwendung skaliert, befindet er sich im Status `AUTOSCALING`. Sie können Ihren aktuellen Anwendungsstatus mithilfe der [ DescribeApplication](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_DescribeApplication.html)Aktionen oder überprüfen. [ ListApplications](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_ListApplications.html) Während der Service Ihre Anwendung skaliert, können Sie die einzige gültige API-Aktion verwenden, wenn der `Force` Parameter auf gesetzt ist [ StopApplication](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_ListApplications.html)`true`.

Sie können die Eigenschaft `AutoScalingEnabled` (Teil von [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_FlinkApplicationConfiguration.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_FlinkApplicationConfiguration.html) ) verwenden, um das Auto-Scaling-Verhalten zu aktivieren oder zu deaktivieren. Ihr AWS Konto wird für KPUs die Bereitstellung von Managed Service for Apache Flink belastet, was von Ihren Anwendungen `parallelism` und `parallelismPerKPU` Einstellungen abhängt. Eine Aktivitätsspitze erhöht Ihre Kosten für Managed Service für Apache Flink.

Weitere Informationen finden Sie unter [Amazon Managed Service für Apache Flink – Preise](https://aws.amazon.com/kinesis/data-analytics/pricing/). 

Beachten Sie Folgendes im Zusammenhang mit der Anwendungsskalierung:
+ Die automatische Skalierung ist standardmäßig aktiviert.
+ Skalierung gilt nicht für Studio-Notebooks. Wenn Sie ein Studio-Notebook jedoch als Anwendung mit dauerhaftem Zustand bereitstellen, gilt die Skalierung für die bereitgestellte Anwendung.
+ Ihre Anwendung hat ein Standardlimit von KPUs 64. Weitere Informationen finden Sie unter [Managed Service für Apache Flink und Studio Notebook-Kontingent](limits.md).
+ Wenn Auto Scaling die Anwendungsparallelität aktualisiert, kommt es bei der Anwendung zu Ausfallzeiten. Gehen Sie wie folgt vor, um diese Ausfallzeit zu vermeiden:
  + Deaktivieren der automatischen Skalierung
  + Konfigurieren Sie das `parallelism` und `parallelismPerKPU` mit der [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)Aktion Ihrer Anwendung. Weitere Informationen zum Festlegen der Parallelitätseinstellungen Ihrer Anwendung finden Sie unter. [Aktualisieren Sie die Parallelität Ihrer Anwendung](how-scaling.md#how-scaling-howto)
  + Überwachen Sie regelmäßig den Ressourcenverbrauch Ihrer Anwendung, um sicherzustellen, dass Ihre Anwendung über die richtigen Parallelitätseinstellungen für ihren Workload verfügt. Weitere Informationen über die Überwachung des Ressourcenverbrauchs finden Sie unter [Metriken und Dimensionen in Managed Service für Apache Flink](metrics-dimensions.md).

## Implementieren Sie benutzerdefiniertes Autoscaling
<a name="how-scaling-custom-autoscaling"></a>

Wenn Sie eine genauere Kontrolle über die automatische Skalierung wünschen oder andere Trigger-Metriken als verwenden möchten`containerCPUUtilization`, können Sie dieses Beispiel verwenden: 
+ [AutoScaling](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/infrastructure/AutoScaling)

  Dieses Beispiel zeigt, wie Sie Ihre Managed Service for Apache Flink-Anwendung mithilfe einer anderen CloudWatch Metrik als der Apache Flink-Anwendung skalieren können, einschließlich Metriken aus Amazon MSK und Amazon Kinesis Data Streams, die als Quellen oder Senken verwendet werden.

Weitere Informationen finden Sie unter [Verbesserte Überwachung und automatische Skalierung](https://aws.amazon.com/blogs/big-data/enhanced-monitoring-and-automatic-scaling-for-apache-flink/) für Apache Flink.

## Implementieren Sie die geplante automatische Skalierung
<a name="how-scaling-scheduled-autoscaling"></a>

Wenn Ihre Arbeitslast im Laufe der Zeit einem vorhersehbaren Profil folgt, ziehen Sie es möglicherweise vor, Ihre Apache Flink-Anwendung präventiv zu skalieren. Dadurch wird Ihre Anwendung zu einem geplanten Zeitpunkt skaliert, anstatt reaktiv auf der Grundlage einer Metrik zu skalieren. Um das Hoch- und Herunterskalieren zu festen Tageszeiten einzurichten, können Sie dieses Beispiel verwenden:
+ [ScheduledScaling](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/infrastructure/ScheduledScaling)

## Überlegungen zu maxParallelism
<a name="how-scaling-auto-max-parallelism"></a>

Die maximale Parallelität, die ein Flink-Job skalieren kann, ist durch das *Minimum* `maxParallelism` für alle Operatoren des Jobs begrenzt. Wenn Sie beispielsweise einen einfachen Job mit nur einer Quelle und einer Senke haben und die Quelle einen Wert `maxParallelism` von 16 und die Senke einen Wert von 8 hat, kann die Anwendung nicht über die Parallelität 8 hinaus skalieren.

Informationen darüber, wie die Standardeinstellung `maxParallelism` eines Operators berechnet wird und wie Sie die Standardeinstellung überschreiben können, finden Sie unter [Setting the Maximum Parallelism in der Apache Flink-Dokumentation](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/#setting-the-maximum-parallelism).

Als Grundregel gilt: Wenn Sie `maxParallelism` für keinen Operator definieren und Ihre Anwendung mit einer Parallelität von weniger als oder gleich 128 starten, haben alle Operatoren einen Wert von 128. `maxParallelism`

**Anmerkung**  
Die maximale Parallelität des Jobs ist die Obergrenze der Parallelität für die Skalierung Ihrer Anwendung unter Beibehaltung des Status.   
Wenn Sie eine bestehende Anwendung ändern, kann die Anwendung nicht `maxParallelism` von einem früheren Snapshot aus neu gestartet werden, der mit der alten erstellt wurde. `maxParallelism` Sie können die Anwendung nur ohne Snapshot neu starten.   
Wenn Sie planen, Ihre Anwendung auf eine Parallelität von mehr als 128 zu skalieren, müssen Sie dies `maxParallelism` in Ihrer Anwendung explizit festlegen.
+ Die Autoscaling-Logik verhindert, dass ein Flink-Job auf eine Parallelität skaliert wird, die die maximale Parallelität des Jobs überschreitet.
+ Wenn Sie eine benutzerdefinierte automatische Skalierung oder eine geplante Skalierung verwenden, konfigurieren Sie sie so, dass sie die maximale Parallelität des Jobs nicht überschreiten.
+ Wenn Sie Ihre Anwendung manuell über die maximale Parallelität hinaus skalieren, kann die Anwendung nicht gestartet werden.

# Hinzufügen von Tags zu Managed Service für Apache Flink-Anwendungen
<a name="how-tagging"></a>



In diesem Abschnitt wird beschrieben, wie Sie Schlüssel-Wert-Metadaten-Tags zu Anwendungen, die Managed Service für Apache Flink nutzen, hinzufügen. Diese Tags können für die folgenden Zwecke verwendet werden:
+ Festlegen der Abrechnung für einzelne Anwendungen, die Managed Service für Apache Flink nutzen. Weitere Informationen finden Sie unter [Verwendung von Kostenzuordnungs-Tags](https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/cost-alloc-tags.html) im *Benutzerhandbuch Fakturierungs- und Kostenverwaltung*.
+ Steuern des Zugriffs auf Anwendungsressourcen basierend auf Tags. Weitere Informationen finden Sie unter [Zugriffssteuerung mit Tags](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_tags.html) im *AWS Identity and Access Management Benutzerhandbuch*.
+ Benutzerdefinierte Zwecke. Sie können die Anwendungsfunktionalität basierend auf dem Vorhandensein von Benutzer-Tags definieren.

Bitte beachten Sie die folgenden Informationen über Tagging:
+ Die maximale Anzahl an Anwendungs-Tags enthält System-Tags. Die maximale Anzahl an benutzerdefinierten Anwendungs-Tags ist 50.
+ Wenn eine Aktion eine Tag-Liste beinhaltet, die doppelte `Key`-Werte enthält, löst der Service eine `InvalidArgumentException` aus.

**Topics**
+ [Fügen Sie Tags hinzu, wenn eine Anwendung erstellt wird](how-tagging-create.md)
+ [Fügen Sie Tags für eine bestehende Anwendung hinzu oder aktualisieren Sie sie](how-tagging-add.md)
+ [Tags für eine Anwendung auflisten](how-tagging-list.md)
+ [Entfernen Sie Tags aus einer Anwendung](how-tagging-remove.md)

# Fügen Sie Tags hinzu, wenn eine Anwendung erstellt wird
<a name="how-tagging-create"></a>

Sie fügen Tags hinzu, wenn Sie eine Anwendung mithilfe des `tags` [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)Aktionsparameters erstellen.

Das folgende Beispiel zeigt den `Tags`-Knoten für eine `CreateApplication`-Anforderung:

```
"Tags": [ 
    { 
        "Key": "Key1",
        "Value": "Value1"
    },
    { 
        "Key": "Key2",
        "Value": "Value2"
    }
]
```

# Fügen Sie Tags für eine bestehende Anwendung hinzu oder aktualisieren Sie sie
<a name="how-tagging-add"></a>

Mithilfe der [TagResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_TagResource.html)Aktion fügen Sie einer Anwendung Tags hinzu. Mithilfe der [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)Aktion können Sie einer Anwendung keine Tags hinzufügen.

Fügen Sie zum Aktualisieren eines vorhandenen Tags ein Tag mit demselben Schlüssel wie das vorhandene Tag hinzu.

In der folgenden Beispiel-Anfrage für die Aktion`TagResource` werden neue Tags hinzugefügt oder vorhandene Tags aktualisiert:

```
{
   "ResourceARN": "string",
   "Tags": [ 
      { 
         "Key": "NewTagKey",
         "Value": "NewTagValue"
      },
      { 
         "Key": "ExistingKeyOfTagToUpdate",
         "Value": "NewValueForExistingTag"
      }
   ]
}
```

# Tags für eine Anwendung auflisten
<a name="how-tagging-list"></a>

Um vorhandene Tags aufzulisten, verwenden Sie die [ListTagsForResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListTagsForResource.html)Aktion.

In der folgenden Beispiel-Anfrage für die Aktion `ListTagsForResource` werden Tags für eine Anwendung aufgelistet:

```
{
   "ResourceARN": "arn:aws:kinesisanalyticsus-west-2:012345678901:application/MyApplication"
}
```

# Entfernen Sie Tags aus einer Anwendung
<a name="how-tagging-remove"></a>

Um Tags aus einer Anwendung zu entfernen, verwenden Sie die [UntagResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UntagResource.html)Aktion.

In der folgenden Beispiel-Anfrage für die Aktion `UntagResource` werden Tags aus einer Anwendung entfernt:

```
{
   "ResourceARN": "arn:aws:kinesisanalyticsus-west-2:012345678901:application/MyApplication",
   "TagKeys": [ "KeyOfFirstTagToRemove", "KeyOfSecondTagToRemove" ]
}
```

# Verwendung CloudFormation mit Managed Service für Apache Flink
<a name="lambda-cfn-flink"></a>

Die folgende Übung zeigt, wie Sie eine Flink-Anwendung starten, die CloudFormation mit einer Lambda-Funktion im selben Stack erstellt wurde. 

## Bevor Sie beginnen
<a name="before-you-begin"></a>

Bevor Sie mit dieser Übung beginnen, folgen Sie den Schritten zum Erstellen einer Flink-Anwendung mithilfe von at. CloudFormation [AWS::KinesisAnalytics::Application](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesis-analyticsapplication.html)

## Schreiben Sie eine Lambda-Funktion
<a name="write-lambda-function"></a>

Um eine Flink-Anwendung nach der Erstellung oder Aktualisierung zu starten, verwenden wir die kinesisanalyticsv2 [start-application](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/start-application.html) API. Der Aufruf wird durch ein CloudFormation Ereignis nach der Erstellung der Flink-Anwendung ausgelöst. Wir werden später in dieser Übung besprechen, wie der Stack so eingerichtet wird, dass er die Lambda-Funktion auslöst, aber zuerst konzentrieren wir uns auf die Lambda-Funktionsdeklaration und ihren Code. In diesem Beispiel verwenden wir die `Python3.8`-Laufzeit. 

```
StartApplicationLambda:
    Type: AWS::Lambda::Function
    DependsOn: StartApplicationLambdaRole
    Properties:
      Description: Starts an application when invoked.
      Runtime: python3.8
      Role: !GetAtt StartApplicationLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 30
      Code:
        ZipFile: |
          import logging
          import cfnresponse
          import boto3
          
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          
          def lambda_handler(event, context):
            logger.info('Incoming CFN event {}'.format(event))
            
            try:
              application_name = event['ResourceProperties']['ApplicationName']
              
              # filter out events other than Create or Update,
              # you can also omit Update in order to start an application on Create only.
              if event['RequestType'] not in ["Create", "Update"]:
                logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # use kinesisanalyticsv2 API to start an application.
              client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
              
              # get application status.
              describe_response = client_kda.describe_application(ApplicationName=application_name)
              application_status = describe_response['ApplicationDetail']['ApplicationStatus']
              
              # an application can be started from 'READY' status only.
              if application_status != 'READY':
                logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # create RunConfiguration. 
              run_configuration = { 
                'ApplicationRestoreConfiguration': {
                  'ApplicationRestoreType': 'RESTORE_FROM_LATEST_SNAPSHOT',
                }
              }
                            
              logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) 
              
              # this call doesn't wait for an application to transfer to 'RUNNING' state.
              client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
              
              logger.info('Started Application: {}'.format(application_name)) 
              cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as err:
              logger.error(err)
              cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})
```

Im vorherigen Code verarbeitet Lambda eingehende CloudFormation Ereignisse, filtert alles andere heraus `Create` und ruft den Anwendungsstatus ab und startet ihn`Update`, falls der Status vorhanden ist`READY`. Um den Anwendungsstatus abzurufen, müssen Sie die Lambda-Rolle erstellen, wie im Folgenden gezeigt.

## Eine Lambda-Rolle erstellen
<a name="create-lambda-role"></a>

Sie erstellen eine Rolle für Lambda, um erfolgreich mit der Anwendung zu kommunizieren und Protokolle zu schreiben. Diese Rolle verwendet standardmäßig verwaltete Richtlinien, aber Sie sollten sie möglicherweise auf die Verwendung benutzerdefinierter Richtlinien einschränken.

```
StartApplicationLambdaRole:
    Type: AWS::IAM::Role
    DependsOn: TestFlinkApplication
    Properties:
      Description: A role for lambda to use while interacting with an application.
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Path: /
```

Beachten Sie, dass die Lambda-Ressourcen nach der Erstellung der Flink-Anwendung im selben Stack erstellt werden, da sie davon abhängen.

## Aufrufen der Lambda-Funktion
<a name="invoking-lambda-function"></a>

Jetzt müssen Sie nur noch die Lambda-Funktion aufrufen. Sie tun dies, indem Sie eine [benutzerdefinierte Ressource](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cfn-customresource.html) verwenden.

```
StartApplicationLambdaInvoke:
    Description: Invokes StartApplicationLambda to start an application.
    Type: AWS::CloudFormation::CustomResource
    DependsOn: StartApplicationLambda
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt StartApplicationLambda.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref TestFlinkApplication
```

Das ist alles, was Sie benötigen, um Ihre Flink-Anwendung mit Lambda zu starten. Sie sind jetzt bereit, Ihren eigenen Stack zu erstellen oder anhand des vollständigen Beispiels unten zu sehen, wie all diese Schritte in der Praxis funktionieren.

## Sehen Sie sich ein erweitertes Beispiel an
<a name="lambda-cfn-flink-full-example"></a>

Das folgende Beispiel ist eine leicht erweiterte Version der vorherigen Schritte mit einer zusätzlichen `RunConfiguration` Anpassung über [Vorlagenparameter](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/parameters-section-structure.html). Dies ist ein funktionierender Stack, den Sie ausprobieren können. Lesen Sie unbedingt die beigefügten Hinweise: 

stack.yaml

```
Description: 'kinesisanalyticsv2 CloudFormation Test Application'
Parameters:
  ApplicationRestoreType:
    Description: ApplicationRestoreConfiguration option, can be SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT or RESTORE_FROM_CUSTOM_SNAPSHOT.
    Type: String
    Default: SKIP_RESTORE_FROM_SNAPSHOT
    AllowedValues: [ SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT, RESTORE_FROM_CUSTOM_SNAPSHOT ]
  SnapshotName:
    Description: ApplicationRestoreConfiguration option, name of a snapshot to restore to, used with RESTORE_FROM_CUSTOM_SNAPSHOT ApplicationRestoreType.
    Type: String
    Default: ''
  AllowNonRestoredState:
    Description: FlinkRunConfiguration option, can be true or false.
    Default: true
    Type: String
    AllowedValues: [ true, false ]
  CodeContentBucketArn:
    Description: ARN of a bucket with application code.
    Type: String
  CodeContentFileKey:
    Description: A jar filename with an application code inside a bucket.
    Type: String
Conditions:
  IsSnapshotNameEmpty: !Equals [ !Ref SnapshotName, '' ]
Resources:
  TestServiceExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service: 
                - kinesisanlaytics.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonKinesisFullAccess
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
      Path: /
  InputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  OutputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  TestFlinkApplication:
    Type: 'AWS::kinesisanalyticsv2::Application'
    Properties:
      ApplicationName: 'CFNTestFlinkApplication'
      ApplicationDescription: 'Test Flink Application'
      RuntimeEnvironment: 'FLINK-1_18'
      ServiceExecutionRole: !GetAtt TestServiceExecutionRole.Arn
      ApplicationConfiguration:
        EnvironmentProperties:
          PropertyGroups:
            - PropertyGroupId: 'KinesisStreams'
              PropertyMap:
                INPUT_STREAM_NAME: !Ref InputKinesisStream
                OUTPUT_STREAM_NAME: !Ref OutputKinesisStream
                AWS_REGION: !Ref AWS::Region
        FlinkApplicationConfiguration:
          CheckpointConfiguration:
            ConfigurationType: 'CUSTOM'
            CheckpointingEnabled: True
            CheckpointInterval: 1500
            MinPauseBetweenCheckpoints: 500
          MonitoringConfiguration:
            ConfigurationType: 'CUSTOM'
            MetricsLevel: 'APPLICATION'
            LogLevel: 'INFO'
          ParallelismConfiguration:
            ConfigurationType: 'CUSTOM'
            Parallelism: 1
            ParallelismPerKPU: 1
            AutoScalingEnabled: True
        ApplicationSnapshotConfiguration:
          SnapshotsEnabled: True
        ApplicationCodeConfiguration:
          CodeContent:
            S3ContentLocation:
              BucketARN: !Ref CodeContentBucketArn
              FileKey: !Ref CodeContentFileKey
          CodeContentType: 'ZIPFILE'     
  StartApplicationLambdaRole:
    Type: AWS::IAM::Role
    DependsOn: TestFlinkApplication
    Properties:
      Description: A role for lambda to use while interacting with an application.
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Path: /
  StartApplicationLambda:
    Type: AWS::Lambda::Function
    DependsOn: StartApplicationLambdaRole
    Properties:
      Description: Starts an application when invoked.
      Runtime: python3.8
      Role: !GetAtt StartApplicationLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 30
      Code:
        ZipFile: |
          import logging
          import cfnresponse
          import boto3
          
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          
          def lambda_handler(event, context):
            logger.info('Incoming CFN event {}'.format(event))
            
            try:
              application_name = event['ResourceProperties']['ApplicationName']
              
              # filter out events other than Create or Update,
              # you can also omit Update in order to start an application on Create only.
              if event['RequestType'] not in ["Create", "Update"]:
                logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # use kinesisanalyticsv2 API to start an application.
              client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
              
              # get application status.
              describe_response = client_kda.describe_application(ApplicationName=application_name)
              application_status = describe_response['ApplicationDetail']['ApplicationStatus']
              
              # an application can be started from 'READY' status only.
              if application_status != 'READY':
                logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # create RunConfiguration from passed parameters. 
              run_configuration = { 
                'FlinkRunConfiguration': {
                  'AllowNonRestoredState': event['ResourceProperties']['AllowNonRestoredState'] == 'true'
                },
                'ApplicationRestoreConfiguration': {
                  'ApplicationRestoreType': event['ResourceProperties']['ApplicationRestoreType'],
                }
              }
              
              # add SnapshotName to RunConfiguration if specified.
              if event['ResourceProperties']['SnapshotName'] != '':
                run_configuration['ApplicationRestoreConfiguration']['SnapshotName'] = event['ResourceProperties']['SnapshotName']
              
              logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) 
              
              # this call doesn't wait for an application to transfer to 'RUNNING' state.
              client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
              
              logger.info('Started Application: {}'.format(application_name)) 
              cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as err:
              logger.error(err)
              cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})
  StartApplicationLambdaInvoke:
    Description: Invokes StartApplicationLambda to start an application.
    Type: AWS::CloudFormation::CustomResource
    DependsOn: StartApplicationLambda
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt StartApplicationLambda.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref TestFlinkApplication
      ApplicationRestoreType: !Ref ApplicationRestoreType
      SnapshotName: !Ref SnapshotName
      AllowNonRestoredState: !Ref AllowNonRestoredState
```

Auch hier sollten Sie ggf. die Rollen für Lambda sowie eine Anwendung selbst anpassen.

Vergessen Sie nicht, Ihre Parameter anzugeben, bevor Sie den obigen Stack erstellen.

parameters.json

```
[
  {
    "ParameterKey": "CodeContentBucketArn",
    "ParameterValue": "YOUR_BUCKET_ARN"
  },
  {
    "ParameterKey": "CodeContentFileKey",
    "ParameterValue": "YOUR_JAR"
  },
  {
    "ParameterKey": "ApplicationRestoreType",
    "ParameterValue": "SKIP_RESTORE_FROM_SNAPSHOT"
  },
  {
    "ParameterKey": "AllowNonRestoredState",
    "ParameterValue": "true"
  }
]
```

Ersetzen Sie `YOUR_BUCKET_ARN` und `YOUR_JAR` durch Ihre spezifischen Anforderungen. Sie können dieser [Anleitung](https://docs.aws.amazon.com/managed-flink/latest/java/get-started-exercise.html) folgen, um einen Amazon-S3-Bucket und ein Anwendungs-Jar zu erstellen.

Erstellen Sie nun den Stack (ersetzen Sie YOUR\$1REGION durch eine Region Ihrer Wahl, z. B. us-east-1):

```
aws cloudformation create-stack --region YOUR_REGION --template-body "file://stack.yaml" --parameters "file://parameters.json" --stack-name "TestManaged Service for Apache FlinkStack" --capabilities CAPABILITY_NAMED_IAM
```

Sie können jetzt zu [https://console.aws.amazon.com/cloudformation](https://console.aws.amazon.com/cloudformation) navigieren und sich den Fortschritt ansehen. Nach der Erstellung sollte Ihre Flink-Anwendung im `Starting`-Zustand angezeigt werden. Es kann einige Minuten dauern, bis es `Running` startet. 

Weitere Informationen finden Sie hier:
+ [Vier Möglichkeiten zum Abrufen beliebiger AWS Service-Eigenschaften mithilfe von AWS CloudFormation (Teil 1 von 3](https://aws.amazon.com/blogs/mt/four-ways-to-retrieve-any-aws-service-property-using-aws-cloudformation-part-1/)).
+ [Exemplarische Vorgehensweise: Amazon Machine Image IDs nachschlagen](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/walkthrough-custom-resources-lambda-lookup-amiids.html).

# Verwenden Sie das Apache Flink Dashboard mit Managed Service für Apache Flink
<a name="how-dashboard"></a>

Sie können das Apache Flink-Dashboard Ihrer Anwendung verwenden, um den Zustand Ihres Managed Service für Apache Flink-Anwendung zu überwachen. Das Dashboard Ihrer Anwendung zeigt die folgenden Informationen an:
+ Verwendete Ressourcen, einschließlich Task-Managern und Task-Slots. 
+ Informationen zu Jobs, einschließlich laufender, abgeschlossener, abgebrochener und fehlgeschlagener Jobs. 

Informationen zu Apache Flink Task Managern, Task Slots und Jobs finden Sie unter [Apache Flink Architektur](https://flink.apache.org/what-is-flink/flink-architecture/) auf der Apache Flink-Website. 

Beachten Sie Folgendes zur Verwendung des Apache Flink-Dashboards mit Managed Service für Apache Flink-Anwendungen:
+ Das Apache Flink Dashboard für Managed Service für Apache Flink-Anwendungen ist schreibgeschützt. Sie können mit dem Apache Flink Dashboard keine Änderungen an Ihrem Managed Service für Apache Flink-Anwendung vornehmen.
+ Das Apache Flink Dashboard ist nicht mit Microsoft Internet Explorer kompatibel.

## Greifen Sie auf das Apache Flink Dashboard Ihrer Anwendung zu
<a name="how-dashboard-accessing"></a>

Sie können auf das Apache Flink-Dashboard Ihrer Anwendung entweder über den Managed Service für Apache Flink-Konsole zugreifen oder indem Sie über die CLI einen sicheren URL-Endpunkt anfordern.

### Greifen Sie über die Managed Service for Apache Flink-Konsole auf das Apache Flink-Dashboard Ihrer Anwendung zu
<a name="how-dashboard-accessing-console"></a>

Um von der Konsole aus auf das Apache Flink Dashboard Ihrer Anwendung zuzugreifen, wählen Sie **Apache Flink Dashboard** auf der Seite Ihrer Anwendung.

**Anmerkung**  
Wenn Sie das Dashboard von dem Managed Service für Apache Flink-Konsole aus öffnen, ist die von der Konsole generierte URL 12 Stunden lang gültig.

### Greifen Sie mit dem Managed Service für Apache Flink CLI auf das Apache Flink-Dashboard Ihrer Anwendung zu
<a name="how-dashboard-accessing-cli"></a>

Sie können den Managed Services für Apache Flink CLI verwenden, um eine URL für den Zugriff auf Ihr Anwendungs-Dashboard zu generieren. Die URL, die Sie generieren, ist für eine bestimmte Zeit gültig.

**Anmerkung**  
Wenn Sie nicht innerhalb von drei Minuten auf die generierte URL zugreifen, ist sie nicht mehr gültig.

Sie generieren Ihre Dashboard-URL mithilfe der [ CreateApplicationPresignedUrl](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationPresignedUrl.html)Aktion. Sie können die folgenden Werte für die Aktion angeben: 
+ Den Anwendungsnamen
+ Die Zeit in Sekunden, über die hinweg wird die URL gültig sein
+ Sie geben `FLINK_DASHBOARD_URL` als URL-Typ an.