

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Verwenden Sie ein Studio-Notebook mit Managed Service für Apache Flink
<a name="how-notebook"></a>

Studio-Notebooks für Managed Service für Apache Flink ermöglichen Ihnen die interaktive Abfrage von Datenströmen in Echtzeit und die einfache Erstellung und Ausführung von Stream-Verarbeitungsanwendungen mit Standard-SQL, Python und Scala. Mit ein paar Klicks in der AWS Management-Konsole können Sie ein serverloses Notebook starten, um Datenströme abzufragen und innerhalb von Sekunden Ergebnisse zu erhalten. 

Ein Notebook ist eine webbasierte Entwicklungsumgebung. Notebooks bieten ein einfaches interaktives Entwicklungserlebnis in Kombination mit den fortschrittlichen Funktionen von Apache Flink. Studio-Notebooks verwenden Notebooks, die auf [Apache Zeppelin](https://zeppelin.apache.org/) basieren, und [Apache Flink](https://flink.apache.org/) als Engine für die Streamverarbeitung. Studio-Notebooks kombinieren diese Technologien nahtlos, um Entwicklern aller Qualifikationsstufen erweiterte Analysen von Datenströmen zugänglich zu machen. 

Apache Zeppelin bietet für Ihre Studio-Notebooks eine komplette Suite von Analysetools, darunter die folgenden:
+ Datenvisualisierung
+ Exportieren der Daten in Dateien
+ Kontrolle über das Ausgabeformat zur Erleichterung von Analysen

Hinweise zu den ersten Schritten mit Managed Service für Apache Flink und Apache Zeppelin finden Sie unter [Tutorial: Erstellen Sie ein Studio-Notizbuch in Managed Service für Apache Flink](example-notebook.md). Weitere Informationen zu Apache Zeppelin finden Sie in der [Apache-Zeppelin-Dokumentation](http://zeppelin.apache.org).

 Mit einem Notizbuch modellieren Sie Abfragen mithilfe der Apache Flink [Table API und SQL](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/overview/) in SQL, Python oder Scala oder der [DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/) in Scala. Mit wenigen Klicks können Sie das Studio-Notebook dann zu einer kontinuierlich laufenden, nicht interaktiven Stream-Verarbeitungsanwendung, die Managed Service für Apache Flink nutzt, für Ihre Produktions-Workloads heraufstufen.

**Topics**
+ [Verwenden Sie die richtige Runtime-Version des Studio-Notebooks](studio-notebook-versions.md)
+ [Erstellen Sie ein Studio-Notizbuch](how-zeppelin-creating.md)
+ [

# Führen Sie eine interaktive Analyse von Streaming-Daten durch
](how-zeppelin-interactive.md)
+ [

# Stellen Sie es als Anwendung mit dauerhaftem Zustand bereit
](how-notebook-durable.md)
+ [IAM-Berechtigungen](how-zeppelin-iam.md)
+ [Verwenden Sie Konnektoren und Abhängigkeiten](how-zeppelin-connectors.md)
+ [Benutzerdefinierte Funktionen](how-zeppelin-udf.md)
+ [

# Aktivieren von Checkpointing
](how-zeppelin-checkpoint.md)
+ [

# Aktualisieren Sie Studio Runtime
](upgrading-studio-runtime.md)
+ [

# Arbeiten Sie mit AWS Glue
](how-zeppelin-glue.md)
+ [Beispiele und Tutorials für Studio-Notebooks in Managed Service für Apache Flink](how-zeppelin-examples.md)
+ [

# Beheben Sie Probleme mit Studio-Notebooks für Managed Service für Apache Flink
](how-zeppelin-troubleshooting.md)
+ [

# Erstellen Sie benutzerdefinierte IAM-Richtlinien für Managed Service für Apache Flink Studio-Notebooks
](how-zeppelin-appendix-iam.md)

# Verwenden Sie die richtige Runtime-Version des Studio-Notebooks
<a name="studio-notebook-versions"></a>

Mit Amazon Managed Service für Apache Flink Studio können Sie Datenströme in Echtzeit abfragen und Streamverarbeitungsanwendungen mit Standard-SQL, Python und Scala in einem interaktiven Notizbuch erstellen und ausführen. Studio-Notebooks werden von [Apache Zeppelin](https://zeppelin.apache.org/) unterstützt und verwenden [Apache Flink](https://flink.apache.org/) als Stream-Verarbeitungs-Engine. 

**Anmerkung**  
Wir werden Studio Runtime mit **Apache Flink Version 1.11** am 5. November 2024 als veraltet markieren. Ab diesem Datum können Sie mit dieser Version keine neuen Notebooks ausführen oder neue Anwendungen erstellen. Wir empfehlen, dass Sie vor diesem Zeitpunkt auf die neueste Runtime (Apache Flink 1.15 und Apache Zeppelin 0.10) aktualisieren. Hinweise zur Aktualisierung Ihres Notebooks finden Sie unter. [Aktualisieren Sie Studio Runtime](upgrading-studio-runtime.md)


**Studio-Laufzeit**  

| Apache Flink-Version | Apache Zeppelin-Ausführung | Python-Version |  | 
| --- | --- | --- | --- | 
| 1.15 | 0.1 | 3.8 | Empfohlen | 
| 1.13 | 0.9 | 3.8 | Wird bis 16. Oktober 2024 unterstützt | 
| 1.11 | 0.9 | 3.7 | Wird am 24. Februar 2025 nicht mehr unterstützt | 

# Erstellen Sie ein Studio-Notizbuch
<a name="how-zeppelin-creating"></a>

Ein Studio-Notebook enthält in SQL, Python oder Scala geschriebene Abfragen oder Programme, die auf Streaming-Daten ausgeführt werden und Analyseergebnisse zurückgeben. Sie erstellen Ihre Anwendung entweder mit der Konsole oder der CLI und stellen Abfragen zur Analyse der Daten aus Ihrer Datenquelle bereit.

Die Anwendung besteht aus folgenden Komponenten:
+ Einer Datenquelle, z. B. einem Amazon-MSK-Cluster, einem Kinesis-Datenstrom oder einem Amazon-S3-Bucket.
+ Eine AWS Glue Datenbank. Diese Datenbank enthält Tabellen, in denen Ihre Datenquellen- und Zielschemas und Endpunkte gespeichert sind. Weitere Informationen finden Sie unter [Arbeiten Sie mit AWS Glue](how-zeppelin-glue.md).
+ Ihr Anwendungscode. Ihr Code implementiert Ihre Analyseabfrage oder Ihr Analyseprogramm.
+ Ihre Anwendungseinstellungen und Laufzeiteigenschaften. Informationen zu Anwendungseinstellungen und Laufzeiteigenschaften finden Sie unter den folgenden Themen im [Entwicklerhandbuch für Apache-Flink-Anwendungen](https://docs.aws.amazon.com/managed-flink/latest/java/what-is.html):
  + **Anwendungsparallelität und -skalierung:** Sie verwenden die Parallelitätseinstellung Ihrer Anwendung, um die Anzahl der Abfragen zu steuern, die Ihre Anwendung gleichzeitig ausführen kann. Ihre Abfragen können auch von der erhöhten Parallelität profitieren, wenn sie mehrere Ausführungspfade haben, z. B. unter den folgenden Umständen:
    + Bei der Verarbeitung mehrerer Shards eines Kinesis-Datenstroms
    + Bei der Partitionierung von Daten mit dem `KeyBy`-Operator.
    + Bei Verwendung mehrerer Fensteroperatoren

    Weitere Informationen zur Anwendungsskalierung finden Sie unter [Anwendungsskalierung in Managed Service für Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-scaling.html).
  + **Protokollierung und Überwachung: ** Informationen zur Anwendungsprotokollierung und -überwachung finden Sie unter [Protokollierung und Überwachung in Amazon Managed Service für Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/monitoring-overview.html).
  + Ihre Anwendung verwendet Checkpoints und Savepoints aus Gründen der Fehlertoleranz. Checkpoints und Savepoints sind für Studio-Notebooks standardmäßig nicht aktiviert.

Sie können Ihr Studio-Notizbuch entweder mit dem AWS-Managementkonsole oder dem erstellen AWS CLI. 

Beim Erstellen der Anwendung über die Konsole stehen Ihnen die folgenden Optionen zur Verfügung:
+ Wählen Sie in der Amazon-MSK-Konsole Ihren Cluster aus und wählen Sie dann **Daten in Echtzeit verarbeiten**.
+ Wählen Sie in der Konsole von Kinesis Data Streams Ihren Datenstrom und dann auf der Registerkarte **Anwendungen** die Option **Daten in Echtzeit verarbeiten** aus.
+ Wählen Sie in der Konsole von Managed Service für Apache Flink die Registerkarte **Studio** und dann **Studio-Notebook erstellen** aus.

# Führen Sie eine interaktive Analyse von Streaming-Daten durch
<a name="how-zeppelin-interactive"></a>

Sie verwenden ein Serverless Notebook mit Apache Zeppelin, um mit Ihren Streaming-Daten zu interagieren. Ihr Notebook kann mehrere Notizen enthalten, und jede Notiz kann einen oder mehrere Absätze enthalten, in die Sie Ihren Code schreiben können.

Die folgende beispielhafte SQL-Abfrage zeigt, wie Daten aus einer Datenquelle abgerufen werden:

```
%flink.ssql(type=update)
select * from stock;
```

Weitere Beispiele für Flink Streaming SQL-Abfragen finden Sie im [Beispiele und Tutorials für Studio-Notebooks in Managed Service für Apache Flink](how-zeppelin-examples.md) Folgenden und unter [Abfragen](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/overview/) in der Apache Flink-Dokumentation.

Sie können Flink-SQL-Abfragen im Studio-Notebook verwenden, um Streaming-Daten abzufragen. Sie können auch Python (Tabellen-API) und Scala (Table und Datastream APIs) verwenden, um Programme zu schreiben, mit denen Sie Ihre Streaming-Daten interaktiv abfragen können. Sie können die Ergebnisse Ihrer Abfragen oder Programme anzeigen, sie innerhalb von Sekunden aktualisieren und erneut ausführen, um aktualisierte Ergebnisse anzuzeigen.

## Flink-Interpreter
<a name="how-zeppelin-interactive-interpreters"></a>

Sie geben mithilfe eines *Interpreters* an, in welcher Sprache Managed Service für Apache Flink Ihre Anwendung ausführt. Sie können die folgenden Interpreter mit Managed Service für Apache Flink verwenden:


| Name | Klasse | Description | 
| --- |--- |--- |
| %flink | FlinkInterpreter | Erzeugt ExecutionEnvironment/StreamExecutionEnvironment/BatchTableEnvironment/StreamTableEnvironmentund stellt eine Scala-Umgebung bereit | 
| %flink.pyflink | PyFlinkInterpreter | Stellt eine Python-Umgebung bereit | 
| %flink.ipyflink | IPyFlinkInterpreter | Stellt eine IPython-Umgebung bereit | 
| %flink.ssql | FlinkStreamSqlInterpreter | Stellt eine Stream-SQL-Umgebung bereit | 
| %flink.bsql | FlinkBatchSqlInterpreter | Stellt eine Batch-SQL-Umgebung bereit | 

Weitere Informationen zu Flink-Interpretern finden Sie unter [ Flink-Interpreter für Apache Zeppelin](https://zeppelin.apache.org/docs/0.9.0/interpreter/flink.html).

Wenn Sie `%flink.pyflink` oder `%flink.ipyflink` als Interpreter verwenden, müssen Sie den `ZeppelinContext` verwenden, um die Ergebnisse im Notebook zu visualisieren.

 PyFlink Spezifischere Beispiele finden Sie unter [Interaktive Abfrage Ihrer Datenströme mithilfe von Managed Service für Apache Flink Studio und Python](https://aws.amazon.com/blogs/big-data/query-your-data-streams-interactively-using-kinesis-data-analytics-studio-and-python/).

## Tabellenumgebungsvariablen von Apache Zeppelin
<a name="how-zeppelin-interactive-env-vars"></a>

Apache Zeppelin bietet mithilfe von Umgebungsvariablen Zugriff auf Tabellenumgebungsressourcen. 

Sie greifen mit den folgenden Variablen auf Ressourcen der Scala-Tabellenumgebung zu:


| Variable | Ressource | 
| --- |--- |
| senv | StreamExecutionEnvironment | 
| stenv | StreamTableEnvironment for blink planner | 

Sie greifen mit den folgenden Variablen auf Ressourcen der Python-Tabellenumgebung zu:


| Variable | Ressource | 
| --- |--- |
| s\$1env | StreamExecutionEnvironment | 
| st\$1env | StreamTableEnvironment for blink planner | 

Weitere Informationen zur Verwendung von Tabellenumgebungen finden Sie unter [Concepts and Common API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/common/) in der Apache Flink-Dokumentation. 

# Stellen Sie es als Anwendung mit dauerhaftem Zustand bereit
<a name="how-notebook-durable"></a>

Sie können Ihren Code erstellen und zu Amazon S3 exportieren. Sie können den Code, den Sie in Ihrer Notiz geschrieben haben, in eine kontinuierlich laufende Stream-Verarbeitungsanwendung umwandeln. Es gibt zwei Arten, eine Apache-Flink-Anwendung auf Managed Service für Apache Flink auszuführen: Mit einem Studio-Notebook haben Sie die Möglichkeit, Ihren Code interaktiv zu entwickeln, die Ergebnisse Ihres Codes in Echtzeit anzuzeigen und ihn in Ihrer Notiz zu visualisieren. Nachdem Sie eine Notiz für die Ausführung im Streaming-Modus bereitgestellt haben, erstellt Managed Service für Apache Flink eine Anwendung für Sie, die kontinuierlich ausgeführt wird, Daten aus Ihren Quellen liest, in Ihre Ziele schreibt, den Status lang laufender Anwendungen beibehält und automatisch auf der Grundlage des Durchsatzes Ihrer Quellströme skaliert. 

**Anmerkung**  
Der S3-Bucket, in den Sie den Anwendungscode exportieren, muss sich in derselben Region wie Ihr Studio-Notebook befinden.

Sie können eine Notiz aus Ihrem Studio-Notebook nur bereitstellen, wenn sie die folgenden Kriterien erfüllt:
+ Die Absätze müssen der Reihe nach angeordnet werden. Wenn Sie Ihre Anwendung bereitstellen, werden alle Absätze in einer Notiz nacheinander ausgeführt (left-to-right, top-to-bottom), so wie sie in Ihrer Notiz erscheinen. Sie können diese Reihenfolge überprüfen, indem Sie in Ihrer Notiz **Alle Absätze ausführen** wählen.
+ Ihr Code ist eine Kombination aus Python und SQL oder Scala und SQL. Wir unterstützen Python und Scala derzeit nicht zusammen für deploy-as-application.
+ Ihre Notiz darf nur die folgenden Interpreter enthalten: `%flink`, `%flink.ssql`, `%flink.pyflink`, `%flink.ipyflink`, `%md`.
+ Die Verwendung des [Zeppelin-Kontext](https://zeppelin.apache.org/docs/0.9.0/usage/other_features/zeppelin_context.html)-Objekts `z` wird nicht unterstützt. Methoden, die nichts zurückgeben, tun nichts, außer eine Warnung zu protokollieren. Andere Methoden lösen Python-Ausnahmen aus oder können nicht in Scala kompiliert werden.
+ Eine Notiz muss zu einem einzigen Apache-Flink-Auftrag führen. 
+ Notizen mit [dynamischen Formularen](https://zeppelin.apache.org/docs/0.9.0/usage/dynamic_form/intro.html) werden für die Bereitstellung als Anwendung nicht unterstützt.
+ %md-Absätze ([Markdown](https://zeppelin.apache.org/docs/0.9.0/interpreter/markdown.html)) werden bei der Bereitstellung als Anwendung übersprungen, da davon ausgegangen wird, dass sie menschenlesbare Dokumentation enthalten, die für die Ausführung als Teil der resultierenden Anwendung nicht geeignet ist.
+ Absätze, die für die Ausführung in Zeppelin deaktiviert sind, werden bei der Bereitstellung als Anwendung übersprungen. Selbst wenn ein deaktivierter Absatz einen inkompatiblen Interpreter verwendet, z. B. `%flink.ipyflink` in einer Notiz mit `%flink`- `and %flink.ssql`-Interpretern, wird er bei der Bereitstellung der Notiz als Anwendung übersprungen und führt nicht zu einem Fehler.
+ Damit die Anwendungsbereitstellung erfolgreich ist, muss mindestens ein Absatz mit Quellcode (Flink SQL PyFlink oder Flink Scala) vorhanden sein, der für die Ausführung aktiviert ist.
+ Das Einstellen von Parallelität in der Interpreter-Direktive innerhalb eines Absatzes (z. B. `%flink.ssql(parallelism=32)`) wird in Anwendungen, die über eine Notiz bereitgestellt werden, ignoriert. Stattdessen können Sie die bereitgestellte Anwendung über die AWS Command Line Interface oder AWS API aktualisieren AWS-Managementkonsole, um die and/or ParallelismPer Parallelismus-KPU-Einstellungen entsprechend dem Grad der Parallelität zu ändern, den Ihre Anwendung benötigt, oder Sie können Autoscaling für Ihre bereitgestellte Anwendung aktivieren.
+ Wenn Sie als Anwendung mit einem dauerhaften Zustand bereitstellen, muss Ihre VPC über Internetzugang verfügen. Wenn Ihre VPC keinen Internetzugang hat, finden Sie weitere Informationen unter [Als Anwendung mit dauerhaftem Zustand in einer VPC ohne Internetzugang bereitstellen](how-zeppelin-troubleshooting.md#how-zeppelin-troubleshooting-deploying-no-internet). 

## Scala/Python-Kriterien
<a name="how-notebook-durable-scala"></a>
+ Verwenden Sie in Ihrem Scala- oder Python-Code den [Blink-Planer](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/#dependency-structure) (`senv`, `stenv` für Scala; `s_env`, `st_env` für Python) und nicht den älteren „Flink“-Planer (`stenv_2` für Scala, `st_env_2` für Python). Das Apache-Flink-Projekt empfiehlt die Verwendung des Blink-Planers für Produktionsanwendungen. Dies ist der Standardplaner in Zeppelin und Flink.
+ Ihre Python-Absätze dürfen keine [Shell-Aufrufe/Zuweisungen verwenden `!` oder [IPython magische Befehle](https://ipython.readthedocs.io/en/stable/interactive/magics.html) wie `%timeit` oder `%conda` in Notizen verwenden, die als Anwendungen](https://ipython.readthedocs.io/en/stable/interactive/python-ipython-diff.html#shell-assignment) bereitgestellt werden sollen.
+ Sie können Scala-Fallklassen nicht als Parameter von Funktionen verwenden, die an Datenflussoperatoren höherer Ordnung wie `map` und `filter` übergeben werden. Informationen zu Scala-Fallklassen finden Sie unter [FALLKLASSEN](https://docs.scala-lang.org/overviews/scala-book/case-classes.html) in der Scala-Dokumentation.

## SQL-Kriterien
<a name="how-notebook-durable-sql"></a>
+ Einfache SELECT-Anweisungen sind nicht zulässig, da es kein Äquivalent zum Ausgabeabschnitt eines Absatzes gibt, in den die Daten übermittelt werden können.
+ In jedem Absatz müssen DDL-Anweisungen (`USE`, `CREATE`, `ALTER`, `DROP`, `SET`, `RESET`) den DML-(`INSERT`)-Anweisungen vorangehen. Dies liegt daran, dass DML-Anweisungen in einem Absatz zusammen als ein einziger Flink-Auftrag eingereicht werden müssen.
+ Es darf höchstens einen Absatz geben, der DML-Anweisungen enthält. Das liegt daran, dass wir für diese deploy-as-application Funktion nur das Senden eines einzelnen Jobs an Flink unterstützen.

Weitere Informationen und ein Beispiel finden Sie unter [ Übersetzen, Redigieren und Analysieren von Streaming-Daten mithilfe von SQL-Funktionen mit Amazon Managed Service für Apache Flink, Amazon Translate und Amazon Comprehend](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesisanalytics-MyApplicatioamazon-translate-and-amazon-comprehend/).

# Überprüfen Sie die IAM-Berechtigungen für Studio-Notebooks
<a name="how-zeppelin-iam"></a>

Managed Service für Apache Flink erstellt eine IAM-Rolle für Sie, wenn Sie ein Studio-Notebook über die AWS-Managementkonsole erstellen. Außerdem wird dieser Rolle eine Richtlinie zugeordnet, die den folgenden Zugriff ermöglicht:


****  

| Service | Zugriff  | 
| --- | --- | 
| CloudWatch Protokolle | Auflisten | 
| Amazon EC2 | Auflisten | 
| AWS Glue | Lesen, Schreiben | 
| Managed Service für Apache Flink | Lesen | 
| Managed Service für Apache Flink V2 | Lesen | 
| Amazon S3 | Lesen, Schreiben | 

# Verwenden Sie Konnektoren und Abhängigkeiten
<a name="how-zeppelin-connectors"></a>

Konnektoren ermöglichen es Ihnen, Daten über verschiedene Technologien hinweg zu lesen und zu schreiben. Managed Service für Apache Flink bündelt drei Standard-Konnektoren mit Ihrem Studio-Notebook. Sie können auch benutzerdefinierte Konnektoren verwenden. Weitere Informationen zu Konnektoren finden Sie unter [Tabellen- und SQL-Konnektoren](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/) in der Apache-Flink-Dokumentation.

## Standardkonnektoren
<a name="zeppelin-default-connectors"></a>

Wenn Sie das verwenden, AWS-Managementkonsole um Ihr Studio-Notizbuch zu erstellen, enthält Managed Service for Apache Flink standardmäßig die folgenden benutzerdefinierten Konnektoren:`flink-sql-connector-kinesis`, `flink-connector-kafka_2.12` und`aws-msk-iam-auth`. Um über die Konsole ein Studio-Notebook ohne diese benutzerdefinierten Konnektoren zu erstellen, wählen Sie die Option **Mit benutzerdefinierten Einstellungen erstellen**. Wenn Sie dann zur Seite **Konfigurationen** gelangen, deaktivieren Sie die Kontrollkästchen neben den beiden Konnektoren.

Wenn Sie die [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)API verwenden, um Ihr Studio-Notizbuch zu erstellen, sind die `flink-connector-kafka` Konnektoren `flink-sql-connector-flink` und -Konnektoren standardmäßig nicht enthalten. Um sie hinzuzufügen, geben Sie sie als eine `MavenReference` im `CustomArtifactsConfiguration`-Datentyp an, wie in den folgenden Beispielen gezeigt.

Der Konnektor `aws-msk-iam-auth` ist der Konnektor, der mit Amazon MSK verwendet werden soll und das Feature zur automatischen Authentifizierung bei IAM enthält. 

**Anmerkung**  
Die im folgenden Beispiel gezeigten Konnektor-Versionen sind die einzigen Versionen, die wir unterstützen.

```
For the Kinesis connector:

"CustomArtifactsConfiguration": [{
"ArtifactType": "DEPENDENCY_JAR",            
   "MavenReference": {
"GroupId": "org.apache.flink",

      "ArtifactId": "flink-sql-connector-kinesis",
      "Version": "1.15.4"

   }      
}]

For authenticating with AWS MSK through AWS IAM:

"CustomArtifactsConfiguration": [{
"ArtifactType": "DEPENDENCY_JAR",            
   "MavenReference": {
"GroupId": "software.amazon.msk",
      "ArtifactId": "aws-msk-iam-auth",
      "Version": "1.1.6"
   }      
}]
            
For the Apache Kafka connector:  

"CustomArtifactsConfiguration": [{
"ArtifactType": "DEPENDENCY_JAR",            
   "MavenReference": {
"GroupId": "org.apache.flink",

      "ArtifactId": "flink-connector-kafka",
      "Version": "1.15.4"

   }      
}]
```

Um diese Konnektoren zu einem vorhandenen Notizbuch hinzuzufügen, verwenden Sie den [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)API-Vorgang und geben Sie sie als a `MavenReference` im `CustomArtifactsConfigurationUpdate` Datentyp an.

**Anmerkung**  
Sie können `failOnError` für den Konnektor `flink-sql-connector-kinesis` in der Tabellen-API auf true setzen.

## Fügen Sie Abhängigkeiten und benutzerdefinierte Konnektoren hinzu
<a name="zeppelin-custom-connectors"></a>

Gehen Sie folgendermaßen vor AWS-Managementkonsole , um Ihrem Studio-Notizbuch eine Abhängigkeit oder einen benutzerdefinierten Connector hinzuzufügen:

1. Laden Sie die Datei Ihres benutzerdefinierten Konnektors in Amazon S3 hoch.

1. Wählen Sie im die Option **Benutzerdefiniert erstellen AWS-Managementkonsole**, um Ihr Studio-Notizbuch zu erstellen.

1. Folgen Sie dem Workflow zur Erstellung eines Studio-Notebooks, bis Sie zum Schritt **Konfigurationen** gelangen.

1. Wählen Sie im Abschnitt **Benutzerdefinierte Konnektoren** die Option **Benutzerdefinierten Konnektor hinzufügen** aus.

1. Geben Sie den Amazon-S3-Speicherort der Abhängigkeit oder des benutzerdefinierten Konnektors an.

1. Wählen Sie **Änderungen speichern ** aus.

Um eine Abhängigkeits-JAR oder einen benutzerdefinierten Connector hinzuzufügen, wenn Sie ein neues Studio-Notizbuch mithilfe der [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)API erstellen, geben Sie den Amazon S3 S3-Speicherort der Abhängigkeits-JAR oder des benutzerdefinierten Connectors im `CustomArtifactsConfiguration` Datentyp an. Um einem vorhandenen Studio-Notizbuch eine Abhängigkeit oder einen benutzerdefinierten Connector hinzuzufügen, rufen Sie den [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)API-Vorgang auf und geben Sie den Amazon S3 S3-Speicherort der Abhängigkeits-JAR oder des benutzerdefinierten Connectors im `CustomArtifactsConfigurationUpdate` Datentyp an.

**Anmerkung**  
Wenn Sie eine Abhängigkeit oder einen benutzerdefinierten Konnektor einbeziehen, müssen Sie auch alle zugehörigen transitiven Abhängigkeiten einbeziehen, die nicht darin gebündelt sind.

# Implementieren Sie benutzerdefinierte Funktionen
<a name="how-zeppelin-udf"></a>

Benutzerdefinierte Funktionen (UDFs) sind Erweiterungspunkte, mit denen Sie häufig verwendete Logik oder benutzerdefinierte Logik aufrufen können, die in Abfragen nicht anders ausgedrückt werden kann. Sie können Python oder eine JVM-Sprache wie Java oder Scala verwenden, um Ihre In-Paragraphen UDFs in Ihrem Studio-Notizbuch zu implementieren. Sie können Ihrem Studio-Notizbuch auch externe JAR-Dateien hinzufügen, die in einer UDFs JVM-Sprache implementiert sind. 

Verwenden Sie bei der Implementierung JARs dieses Registers abstrakter Klassen dieser Unterklasse `UserDefinedFunction` (oder Ihrer eigenen abstrakten Klassen) den bereitgestellten Bereich in Apache Maven, `compileOnly` Abhängigkeitsdeklarationen in Gradle, den bereitgestellten Bereich in SBT oder eine entsprechende Direktive in Ihrer UDF-Projekt-Build-Konfiguration. Dadurch kann der UDF-Quellcode gegen den Flink kompiliert werden APIs, aber die Flink-API-Klassen sind selbst nicht in den Build-Artefakten enthalten. Beziehen Sie sich auf dieses [POM](https://github.com/aws-samples/kinesis-udfs-textanalytics/blob/ec27108faa48f1a4c5d173ed3a2ef4565b58b5b5/kinesis-udfs-textanalytics-linear/pom.xml#L47) aus dem UDF-JAR-Beispiel, das diese Voraussetzung für ein Maven-Projekt erfüllt. 

**Anmerkung**  
Weitere Informationen und ein Beispiel finden Sie unter [Übersetzen, Redigieren und Analysieren von Streaming-Daten mithilfe von SQL-Funktionen mit Amazon Managed Service für Apache Flink, Amazon Translate und Amazon Comprehend](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesis-data-analytics-amazon-translate-and-amazon-comprehend/) im *AWS Machine Learning Blog*.

Gehen Sie folgendermaßen vor, um die Konsole zum Hinzufügen von UDF-JAR-Dateien zu Ihrem Studio-Notebook zu verwenden:

1. Laden Sie Ihre UDF-JAR-Datei in Amazon S3 hoch.

1. Wählen Sie im die Option **Benutzerdefiniert erstellen AWS-Managementkonsole, um Ihr Studio-Notizbuch** zu erstellen.

1. Folgen Sie dem Workflow zur Erstellung eines Studio-Notebooks, bis Sie zum Schritt **Konfigurationen** gelangen.

1. Wählen Sie im Abschnitt **Benutzerdefinierte Funktionen** die Option **Benutzerdefinierte Funktion hinzufügen** aus.

1. Geben Sie den Amazon-S3-Speicherort der JAR- oder ZIP-Datei an, in der Ihre UDF implementiert ist.

1. Wählen Sie **Änderungen speichern ** aus.

Um beim Erstellen eines neuen Studio-Notebooks mithilfe der [CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)API eine UDF-JAR hinzuzufügen, geben Sie den JAR-Speicherort im `CustomArtifactConfiguration` Datentyp an. Um einem vorhandenen Studio-Notizbuch eine UDF-JAR hinzuzufügen, rufen Sie den [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)API-Vorgang auf und geben Sie den JAR-Speicherort im `CustomArtifactsConfigurationUpdate` Datentyp an. Alternativ können Sie das verwenden, AWS-Managementkonsole um UDF-JAR-Dateien zu Ihrem Studio-Notebook hinzuzufügen.

## Überlegungen zu benutzerdefinierten Funktionen
<a name="how-zeppelin-udf-considerations"></a>
+ Managed Service für Apache Flink Studio verwendet die [Apache-Zeppelin-Terminologie](https://zeppelin.apache.org/docs/0.9.0/quickstart/explore_ui.html), wobei ein Notebook eine Zeppelin-Instance ist, die mehrere Notizen enthalten kann. Jede Notiz kann dann wiederum mehrere Absätze enthalten. Mit Managed Service für Apache Flink Studio wird der Interpreter-Prozess von allen Notizen im Notebook gemeinsam genutzt. Wenn Sie also eine explizite Funktionsregistrierung mithilfe von [createTemporarySystemFunction](https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableEnvironment.html#createTemporarySystemFunction-java.lang.String-java.lang.Class-) in einer Notiz durchführen, kann dieselbe so referenziert werden, wie sie in einer anderen Notiz desselben Notizbuchs ist. 

  Der Vorgang *Als Anwendung bereitstellen* bezieht sich jedoch auf eine *einzelne* Notiz und nicht auf alle Notizen im Notebook. Wenn Sie Als Anwendung bereitstellen ausführen, werden nur die Inhalte der aktiven Notiz zur Generierung der Anwendung verwendet. Jede explizite Funktionsregistrierung, die in anderen Notebooks durchgeführt wird, ist nicht Teil der generierten Anwendungsabhängigkeiten. Darüber hinaus erfolgt bei der Option „Als Anwendung bereitstellen“ eine implizite Funktionsregistrierung, indem der Hauptklassenname von JAR in eine Zeichenfolge in Kleinbuchstaben umgewandelt wird.

   Wenn `TextAnalyticsUDF` beispielsweise die Hauptklasse für UDF-JAR ist, führt eine implizite Registrierung zum Funktionsnamen `textanalyticsudf`. Wenn also eine explizite Funktionsregistrierung in Notiz 1 von Studio wie folgt erfolgt, dann können alle anderen Notizen in diesem Notebook (z. B. Notiz 2) aufgrund des gemeinsamen Interpreters mit dem Namen `myNewFuncNameForClass` auf die Funktion verweisen:

  `stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())`

   Bei der Bereitstellung als Anwendung in Notiz 2 ist diese explizite Registrierung jedoch *nicht in den Abhängigkeiten enthalten*, sodass die bereitgestellte Anwendung nicht wie erwartet funktioniert. Aufgrund der impliziten Registrierung wird standardmäßig erwartet, dass alle Verweise auf diese Funktion mit `textanalyticsudf` und nicht `myNewFuncNameForClass` erfolgen.

   Falls eine Registrierung von benutzerdefinierten Funktionsnamen erforderlich ist, wird davon ausgegangen, dass Notiz 2 selbst einen weiteren Absatz enthält, in dem eine weitere explizite Registrierung wie folgt durchgeführt wird: 

  ```
  %flink(parallelism=l)
  import com.amazonaws.kinesis.udf.textanalytics.TextAnalyticsUDF 
  # re-register the JAR for UDF with custom name
  stenv.createTemporarySystemFunction("myNewFuncNameForClass", new TextAnalyticsUDF())
  ```

  ```
  %flink. ssql(type=update, parallelism=1) 
  INSERT INTO
      table2
  SELECT
      myNewFuncNameForClass(column_name)
  FROM
      table1
  ;
  ```
+ Wenn Ihr UDF-JAR Flink enthält SDKs, konfigurieren Sie Ihr Java-Projekt so, dass der UDF-Quellcode mit dem Flink kompiliert werden kann SDKs, die Flink-SDK-Klassen selbst jedoch nicht im Build-Artefakt enthalten sind, z. B. in der JAR. 

  Sie können den `provided`-Scope in Apache Maven, `compileOnly`-Abhängigkeitsdeklarationen in Gradle, `provided`-Scope in SBT oder eine gleichwertige Direktive in der Build-Konfiguration ihres UDF-Projekts verwenden. Beziehen Sie sich auf dieses [POM](https://github.com/aws-samples/kinesis-udfs-textanalytics/blob/ec27108faa48f1a4c5d173ed3a2ef4565b58b5b5/kinesis-udfs-textanalytics-linear/pom.xml#L47) aus dem UDF-JAR-Beispiel, das diese Voraussetzung für ein Maven-Projekt erfüllt. Ein vollständiges step-by-step Tutorial finden Sie unter [Übersetzen, Redigieren und Analysieren von Streaming-Daten mithilfe von SQL-Funktionen mit Amazon Managed Service für Apache Flink, Amazon Translate und Amazon Comprehend](https://aws.amazon.com/blogs/machine-learning/translate-redact-and-analyze-streaming-data-using-sql-functions-with-amazon-kinesis-data-analytics-amazon-translate-and-amazon-comprehend/).

# Aktivieren von Checkpointing
<a name="how-zeppelin-checkpoint"></a>

Sie aktivieren Checkpointing mithilfe der Umgebungseinstellungen. Informationen zum Checkpointing finden Sie unter [Fehlertoleranz](https://docs.aws.amazon.com/managed-flink/latest/java/how-fault.html) im [Managed Service für Apache Flink Entwicklerhandbuch](https://docs.aws.amazon.com/managed-flink/latest/java/).

## Stellen Sie das Checkpoint-Intervall ein
<a name="how-zeppelin-checkpoint-interval"></a>

Im folgenden Scala-Codebeispiel wird das Checkpointing-Intervall Ihrer Anwendung auf eine Minute festgelegt:

```
// start a checkpoint every 1 minute
stenv.enableCheckpointing(60000)
```

Im folgenden Python-Codebeispiel wird das Checkpointing-Intervall Ihrer Anwendung auf eine Minute festgelegt:

```
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval", "1min"
)
```

## Stellen Sie den Checkpoint-Typ ein
<a name="how-zeppelin-checkpoint-type"></a>

Das folgende Scala-Codebeispiel setzt den Checkpoint-Modus Ihrer Anwendung auf `EXACTLY_ONCE` (Standard):

```
// set mode to exactly-once (this is the default)
stenv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
```

Das folgende Python-Codebeispiel setzt den Checkpoint-Modus Ihrer Anwendung auf `EXACTLY_ONCE` (Standard):

```
st_env.get_config().get_configuration().set_string(
    "execution.checkpointing.mode", "EXACTLY_ONCE"
)
```

# Aktualisieren Sie Studio Runtime
<a name="upgrading-studio-runtime"></a>

Dieser Abschnitt enthält Informationen zum Upgrade Ihrer Studio-Notebook-Runtime. Wir empfehlen, dass Sie immer auf die neueste unterstützte Studio Runtime aktualisieren.

## Führen Sie ein Upgrade Ihres Notebooks auf eine neue Studio Runtime durch
<a name="upgrading-notebook"></a>

Je nachdem, wie Sie Studio verwenden, unterscheiden sich die Schritte zum Upgrade Ihrer Runtime. Wählen Sie die Option, die zu Ihrem Anwendungsfall passt.

### SQL-Abfragen oder Python-Code ohne externe Abhängigkeiten
<a name="notebook-no-dependencies"></a>

Wenn Sie SQL oder Python ohne externe Abhängigkeiten verwenden, verwenden Sie den folgenden Runtime-Upgrade-Prozess. Wir empfehlen Ihnen, auf die neueste Runtime-Version zu aktualisieren. Der Upgrade-Vorgang ist derselbe, unabhängig von der Runtime-Version, von der aus Sie das Upgrade durchführen. 

1. Erstellen Sie ein neues Studio-Notizbuch mit der neuesten Runtime.

1. Kopieren Sie den Code jeder Notiz aus dem alten Notizbuch und fügen Sie ihn in das neue Notizbuch ein.

1. Passen Sie den Code im neuen Notizbuch so an, dass er mit allen Apache Flink-Funktionen kompatibel ist, die sich gegenüber der vorherigen Version geändert haben.
   + Führen Sie das neue Notizbuch aus. Öffnen Sie das Notizbuch und führen Sie es Note für Note nacheinander aus und testen Sie, ob es funktioniert.
   + Nehmen Sie alle erforderlichen Änderungen am Code vor.
   + Stoppen Sie das neue Notizbuch.

1. Wenn Sie das alte Notizbuch als Anwendung bereitgestellt hätten:
   + Stellen Sie das neue Notebook als separate, neue Anwendung bereit.
   + Stoppen Sie die alte Anwendung.
   + Führen Sie die neue Anwendung ohne Snapshot aus.

1. Stoppen Sie das alte Notebook, falls es läuft. Starten Sie das neue Notizbuch nach Bedarf für die interaktive Nutzung.

**Prozessablauf für Upgrades ohne externe Abhängigkeiten**

![\[Das folgende Diagramm stellt den empfohlenen Arbeitsablauf für die Aktualisierung Ihres Notebooks ohne externe Abhängigkeiten dar.\]](http://docs.aws.amazon.com/de_de/managed-flink/latest/java/images/MSF-Studio-upgrade-without-dependencies.png)


### SQL-Abfragen oder Python-Code mit externen Abhängigkeiten
<a name="notebook-dependencies"></a>

Gehen Sie wie folgt vor, wenn Sie SQL oder Python verwenden und externe Abhängigkeiten wie Konnektoren oder benutzerdefinierte Artefakte verwenden, z. B. benutzerdefinierte Funktionen, die in Python oder Java implementiert sind. Wir empfehlen Ihnen, auf die neueste Runtime zu aktualisieren. Der Vorgang ist derselbe, unabhängig von der Runtime-Version, von der aus Sie das Upgrade durchführen.

1. Erstellen Sie ein neues Studio-Notizbuch mit der neuesten Runtime.

1. Kopieren Sie den Code jeder Notiz aus dem alten Notizbuch und fügen Sie ihn in das neue Notizbuch ein.

1. Aktualisieren Sie die externen Abhängigkeiten und benutzerdefinierten Artefakte.
   + Suchen Sie nach neuen Konnektoren, die mit der Apache Flink-Version der neuen Runtime kompatibel sind. Die richtigen [Konnektoren für die Flink-Version finden Sie in der Apache Flink-Dokumentation unter Table & SQL](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/overview/) Connectors.
   + Aktualisieren Sie den Code der benutzerdefinierten Funktionen, sodass er den Änderungen in der Apache Flink-API und allen Python- oder JAR-Abhängigkeiten entspricht, die von den benutzerdefinierten Funktionen verwendet werden. Verpacken Sie Ihr aktualisiertes benutzerdefiniertes Artefakt erneut.
   + Fügen Sie diese neuen Anschlüsse und Artefakte dem neuen Notizbuch hinzu.

1. Passen Sie den Code im neuen Notizbuch so an, dass er mit allen Apache Flink-Funktionen kompatibel ist, die sich gegenüber der vorherigen Version geändert haben.
   + Führen Sie das neue Notizbuch aus. Öffnen Sie das Notizbuch und führen Sie es Note für Note nacheinander aus und testen Sie, ob es funktioniert.
   + Nehmen Sie alle erforderlichen Änderungen am Code vor.
   + Stoppen Sie das neue Notizbuch.

1. Wenn Sie das alte Notizbuch als Anwendung bereitgestellt hätten:
   + Stellen Sie das neue Notebook als separate, neue Anwendung bereit.
   + Stoppen Sie die alte Anwendung.
   + Führen Sie die neue Anwendung ohne Snapshot aus.

1. Stoppen Sie das alte Notebook, falls es läuft. Starten Sie das neue Notizbuch nach Bedarf für die interaktive Nutzung.

**Prozessablauf für das Upgrade mit externen Abhängigkeiten**

![\[Das folgende Diagramm stellt den empfohlenen Arbeitsablauf für die Aktualisierung Ihres Notebooks mit externen Abhängigkeiten dar.\]](http://docs.aws.amazon.com/de_de/managed-flink/latest/java/images/MSF-Studio-upgrade-with-dependencies.png)


# Arbeiten Sie mit AWS Glue
<a name="how-zeppelin-glue"></a>

Ihr Studio-Notizbuch speichert und ruft Informationen über seine Datenquellen und Datenquellen ab AWS Glue. Wenn Sie Ihr Studio-Notizbuch erstellen, geben Sie die AWS Glue Datenbank an, die Ihre Verbindungsinformationen enthält. Wenn Sie auf Ihre Datenquellen und Datensenken zugreifen, geben Sie die in der Datenbank enthaltenen AWS Glue Tabellen an. Ihre AWS Glue Tabellen bieten Zugriff auf die AWS Glue Verbindungen, die die Speicherorte, Schemas und Parameter Ihrer Datenquellen und Ziele definieren.

Studio-Notebooks verwenden Tabelleneigenschaften, um anwendungsspezifische Daten zu speichern. Weitere Informationen finden Sie unter [Tabelleneigenschaften](how-zeppelin-glue-properties.md).

Ein Beispiel für die Einrichtung einer AWS Glue Verbindung, einer Datenbank und einer Tabelle für die Verwendung mit Studio-Notebooks finden Sie [Erstellen Sie eine Datenbank AWS Glue](example-notebook.md#example-notebook-glue) im [Tutorial: Erstellen Sie ein Studio-Notizbuch in Managed Service für Apache Flink](example-notebook.md) Tutorial.

# Tabelleneigenschaften
<a name="how-zeppelin-glue-properties"></a>

Zusätzlich zu den Datenfeldern stellen Ihre AWS Glue Tabellen mithilfe von Tabelleneigenschaften weitere Informationen für Ihr Studio-Notizbuch bereit. Managed Service für Apache Flink verwendet die folgenden AWS Glue Tabelleneigenschaften:
+ [Definieren Sie Apache Flink-Zeitwerte](#how-zeppelin-glue-timestamp): Diese Eigenschaften definieren, wie Managed Service für Apache Flink interne Datenverarbeitungszeitwerte von Apache Flink ausgibt.
+ [Verwenden Sie den Flink-Anschluss und die Formateigenschaften](#how-zeppelin-glue-connector): Diese Eigenschaften liefern Informationen über Ihre Datenströme.

Gehen Sie wie folgt vor, um einer AWS Glue Tabelle eine Eigenschaft hinzuzufügen:

1. Melden Sie sich bei der an AWS-Managementkonsole und öffnen Sie die AWS Glue Konsole unter [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Wählen Sie aus der Tabellenliste die Tabelle aus, die Ihre Anwendung zum Speichern von Datenverbindungsinformationen verwendet. Wählen Sie **Aktion**, **Tabellendetails bearbeiten** aus.

1. Geben Sie unter **Tabelleneigenschaften** den Wert **managed-flink.proctime** für **Schlüssel** und **user\$1action\$1time** für **Wert** ein.

## Definieren Sie Apache Flink-Zeitwerte
<a name="how-zeppelin-glue-timestamp"></a>

Apache Flink stellt Zeitwerte bereit, die beschreiben, wann Ereignisse bei der Stream-Verarbeitung aufgetreten sind, z. B. [Verarbeitungszeit](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#processing-time) und [ Ereigniszeit](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#event-time). Um diese Werte in Ihre Anwendungsausgabe aufzunehmen, definieren Sie Eigenschaften in Ihrer AWS Glue Tabelle, die die Laufzeit von Managed Service for Apache Flink anweisen, diese Werte in die angegebenen Felder auszugeben. 

Die Schlüssel und Werte, die Sie in Ihren Tabelleneigenschaften verwenden, lauten wie folgt:


| Zeitstempeltyp | Key (Schlüssel) | Value (Wert) | 
| --- |--- |--- |
| [Dauer der Verarbeitung](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#processing-time) | managed-flink.proctime | Der Spaltenname, der verwendet AWS Glue wird, um den Wert verfügbar zu machen. Dieser Spaltenname entspricht keiner vorhandenen Tabellenspalte. | 
| [Zeit des Ereignisses](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/streaming/time_attributes.html#event-time) | managed-flink.rowtime | Der Spaltenname, der verwendet AWS Glue wird, um den Wert verfügbar zu machen. Dieser Spaltenname entspricht einer vorhandenen Tabellenspalte. | 
| managed-flink.watermark. *column\$1name*. Millisekunden | Das Wasserzeichenintervall in Millisekunden | 

## Verwenden Sie den Flink-Anschluss und die Formateigenschaften
<a name="how-zeppelin-glue-connector"></a>

Mithilfe von AWS Glue -Tabelleneigenschaften stellen Sie den Flink-Konnektoren Ihrer Anwendung Informationen über Ihre Datenquellen zur Verfügung. Im Folgenden einige Beispiele für die Eigenschaften, die Managed Service für Apache Flink für Konnektoren verwendet:


| Konnektortyp | Key (Schlüssel) | Value (Wert) | 
| --- |--- |--- |
| [Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/kafka.html#connector-options) | format | Das Format, das zur Deserialisierung und Serialisierung von Kafka-Nachrichten verwendet wird, z. B. oder. json csv | 
| scan.startup.mode | Der Startmodus für den Kafka-Verbraucher, z. B. oder. earliest-offset timestamp | 
| [Kinesis](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kinesis.html#connector-options) | format | Das Format, das zum Deserialisieren und Serialisieren von Kinesis-Datenstream-Datensätzen verwendet wird, z. B. oder. json csv | 
| aws.region | Die AWS Region, in der der Stream definiert ist.  | 
| [S3 (Dateisystem)](https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html) | Format | Das Format, das zum Deserialisieren und Serialisieren von Dateien verwendet wird, z. B. oder. json csv | 
| path | Der Amazon S3 S3-Pfad, z. s3://mybucket/ B. | 

Weitere Informationen zu anderen Konnektoren neben Kinesis und Apache Kafka finden Sie in der Dokumentation Ihres Konnektors.

# Beispiele und Tutorials für Studio-Notebooks in Managed Service für Apache Flink
<a name="how-zeppelin-examples"></a>

**Topics**
+ [

# Tutorial: Erstellen Sie ein Studio-Notizbuch in Managed Service für Apache Flink
](example-notebook.md)
+ [

# Tutorial: Stellen Sie ein Studio-Notebook als Managed Service für Apache Flink-Anwendung mit dauerhaftem Zustand bereit
](example-notebook-deploy.md)
+ [

# Sehen Sie sich Beispielabfragen zur Analyse von Daten in einem Studio-Notizbuch an
](how-zeppelin-sql-examples.md)

# Tutorial: Erstellen Sie ein Studio-Notizbuch in Managed Service für Apache Flink
<a name="example-notebook"></a>

Das folgende Tutorial zeigt, wie Sie ein Studio-Notebook erstellen, das Daten aus einem Kinesis-Datenstream oder einem Amazon MSK-Cluster liest.

**Topics**
+ [

## Erfüllen der Voraussetzungen
](#example-notebook-setup)
+ [

## Erstellen Sie eine Datenbank AWS Glue
](#example-notebook-glue)
+ [

## Nächste Schritte: Erstellen Sie ein Studio-Notizbuch mit Kinesis Data Streams oder Amazon MSK
](#examples-notebook-nextsteps)
+ [

# Erstellen Sie ein Studio-Notebook mit Kinesis Data Streams
](example-notebook-streams.md)
+ [

# Erstellen Sie ein Studio-Notebook mit Amazon MSK
](example-notebook-msk.md)
+ [

# Bereinigen Sie Ihre Anwendung und die abhängigen Ressourcen
](example-notebook-cleanup.md)

## Erfüllen der Voraussetzungen
<a name="example-notebook-setup"></a>

Stellen Sie sicher, dass Sie AWS CLI Version 2 oder höher haben. Informationen zur Installation der neuesten AWS CLI Version finden Sie unter [Installation, Aktualisierung und Deinstallation der AWS CLI Version 2.](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html)

## Erstellen Sie eine Datenbank AWS Glue
<a name="example-notebook-glue"></a>

Ihr Studio-Notebook verwendet eine [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html)-Datenbank für Metadaten zu Ihrer Amazon MSK-Datenquelle.

**Erstellen Sie eine AWS Glue Datenbank**

1. Öffnen Sie die AWS Glue Konsole unter [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Wählen Sie **Add database** (Datenbank hinzufügen). Geben Sie im Fenster **Datenbank hinzufügen** **default** als **Namen der Datenbank** ein. Wählen Sie **Erstellen** aus. 

## Nächste Schritte: Erstellen Sie ein Studio-Notizbuch mit Kinesis Data Streams oder Amazon MSK
<a name="examples-notebook-nextsteps"></a>

Mit diesem Tutorial können Sie ein Studio-Notebook erstellen, das entweder Kinesis Data Streams oder Amazon MSK verwendet:
+ [Erstellen Sie ein Studio-Notebook mit Kinesis Data Streams](example-notebook-streams.md): Mit Kinesis Data Streams können Sie schnell eine Anwendung erstellen, die einen Kinesis Data Stream als Quelle verwendet. Sie müssen nur einen Kinesis Data Stream als abhängige Ressource erstellen.
+ [Erstellen Sie ein Studio-Notebook mit Amazon MSK](example-notebook-msk.md): Mit Amazon MSK erstellen Sie eine Anwendung, die einen Amazon MSK-Cluster als Quelle verwendet. Sie müssen eine Amazon VPC, eine Amazon EC2-Client-Instance und einen Amazon MSK-Cluster als abhängige Ressourcen erstellen.

# Erstellen Sie ein Studio-Notebook mit Kinesis Data Streams
<a name="example-notebook-streams"></a>

In diesem Tutorial wird beschrieben, wie Sie ein Studio-Notebook erstellen, das einen Kinesis Data Stream als Quelle verwendet.

**Topics**
+ [

## Erfüllen der Voraussetzungen
](#example-notebook-streams-setup)
+ [

## Erstellen Sie eine Tabelle AWS Glue
](#example-notebook-streams-glue)
+ [

## Erstellen Sie ein Studio-Notebook mit Kinesis Data Streams
](#example-notebook-streams-create)
+ [

## Senden von Daten an den Kinesis Data Stream
](#example-notebook-streams-send)
+ [

## Testen Sie Ihr Studio-Notebook
](#example-notebook-streams-test)

## Erfüllen der Voraussetzungen
<a name="example-notebook-streams-setup"></a>

Bevor Sie ein Studio-Notebook erstellen, erstellen Sie einen Kinesis Data Stream (`ExampleInputStream`). Ihre Anwendung verwendet diesen Stream als Anwendungsquelle.

Sie können diesen Stream mithilfe der Amazon Kinesis-Konsole oder des folgenden AWS CLI -Befehls erstellen. Anweisungen für die Konsole finden Sie unter [Erstellen und Aktualisieren von Datenströmen](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html) im *Amazon Kinesis Data Streams Entwicklerhandbuch*. Benennen Sie den Stream **ExampleInputStream** und legen Sie die **Anzahl der offenen Shards** auf **1** fest.

Verwenden Sie den folgenden Amazon Kinesis `create-stream` AWS CLI Kinesis-Befehl AWS CLI, um den Stream (`ExampleInputStream`) mit dem zu erstellen.

```
$ aws kinesis create-stream \
--stream-name ExampleInputStream \
--shard-count 1 \
--region us-east-1 \
--profile adminuser
```

## Erstellen Sie eine Tabelle AWS Glue
<a name="example-notebook-streams-glue"></a>

Ihr Studio-Notebook verwendet eine [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html)-Datenbank für Metadaten zu Ihrer Kinesis Data Streams-Datenquelle.

**Anmerkung**  
Sie können die Datenbank entweder zuerst manuell erstellen oder sie beim Erstellen des Notebooks von Managed Service für Apache Flink für Sie erstellen lassen. Ebenso können Sie die Tabelle entweder manuell erstellen, wie im folgenden Abschnitt beschrieben, oder Sie können den Konnektorcode zum Erstellen einer Tabelle für Managed Service für Apache Flink in Ihrem Notebook innerhalb von Apache Zeppelin verwenden, um Ihre Tabelle über eine DDL-Anweisung zu erstellen. Sie können dann einchecken AWS Glue , um sicherzustellen, dass die Tabelle korrekt erstellt wurde.

**Erstellen einer Tabelle**

1. Melden Sie sich bei der an AWS-Managementkonsole und öffnen Sie die AWS Glue Konsole unter [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Wenn Sie noch keine AWS Glue Datenbank haben, wählen Sie in der linken Navigationsleiste **Datenbanken** aus. Wählen Sie **Datenbank hinzufügen**. Geben Sie im Fenster **Datenbank hinzufügen** **default** als **Namen der Datenbank** ein. Wählen Sie **Create (Erstellen)** aus.

1. Wählen Sie in der linken Navigationsleiste die Option **Tabellen**. Wählen Sie auf der Seite **Tabellen** die Optionen **Tabellen hinzufügen**, **Tabelle manuell hinzufügen** aus.

1. Geben Sie auf der Seite **Eigenschaften Ihrer Tabelle einrichten** **stock** als **Tabellennamen** ein. Stellen Sie sicher, dass Sie die Datenbank auswählen, die Sie zuvor erstellt haben. Wählen Sie **Weiter**.

1. Wählen Sie auf der Seite **Datenstore hinzufügen** die Option **Kinesis** aus. Geben Sie als **Streamnamen** **ExampleInputStream** ein. Wählen Sie für **Kinesis-Quell-URL** die Eingabetaste **https://kinesis.us-east-1.amazonaws.com**. Wenn Sie die **Kinesis-Quell-URL** kopieren und einfügen, achten Sie darauf, alle führenden oder nachfolgenden Leerzeichen zu löschen. Wählen Sie **Weiter**.

1. Wählen Sie auf der Seite **Klassifikation** die Option **JSON** aus. Wählen Sie **Weiter**.

1. Wählen Sie auf der Seite **Schema definieren** die Option „Spalte hinzufügen“, um eine Spalte hinzuzufügen. Fügen Sie Spalten mit den folgenden Eigenschaften hinzu:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/de_de/managed-flink/latest/java/example-notebook-streams.html)

   Wählen Sie **Weiter**.

1. Überprüfen Sie auf der nächsten Seite Ihre Einstellungen und wählen Sie **Fertigstellen.**

1. Wählen Sie die neu erstellte Tabelle aus der Liste der Tabellen aus.

1. Wählen Sie **Tabelle bearbeiten** und fügen Sie eine Eigenschaft mit dem Schlüssel `managed-flink.proctime` und dem Wert `proctime` hinzu.

1. Wählen Sie **Anwenden** aus.

## Erstellen Sie ein Studio-Notebook mit Kinesis Data Streams
<a name="example-notebook-streams-create"></a>

Nachdem Sie die Ressourcen erstellt haben, die Ihre Anwendung verwendet, erstellen Sie Ihr Studio-Notebook. 

**Topics**
+ [

### Erstellen Sie ein Studio-Notizbuch mit dem AWS-Managementkonsole
](#example-notebook-create-streams-console)
+ [

### Erstellen Sie ein Studio-Notizbuch mit dem AWS CLI
](#example-notebook-msk-create-api)

### Erstellen Sie ein Studio-Notizbuch mit dem AWS-Managementkonsole
<a name="example-notebook-create-streams-console"></a>

1. Die Managed Service for Apache Flink-Konsole zu [ https://console.aws.amazon.com/managed-flink/Hause öffnen? region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard). 

1. Wählen Sie auf der Seite **Managed Service für Apache Flink-Anwendungen** die Registerkarte **Studio** aus. Wählen Sie **Studio-Notebook erstellen**.
**Anmerkung**  
Sie können ein Studio-Notebook auch über die Amazon MSK- oder Kinesis Data Streams-Konsolen erstellen, indem Sie Ihren Amazon MSK-Eingabe-Cluster oder Kinesis Data Stream auswählen und dann **Daten in Echtzeit verarbeiten** auswählen.

1. Geben Sie auf der Seite **Notebook-Instance erstellen** folgende Informationen ein:
   + Geben Sie **MyNotebook** als Namen des Notebooks ein.
   + Wählen Sie **Standard** für die **AWS -Glue-Datenbank**.

   Wählen Sie **Studio-Notebook erstellen**.

1. **Wählen Sie auf der Seite „Ausführen“ aus. **MyNotebook**** Warten Sie, bis der **Status** **Wird ausgeführt** angezeigt wird. Es fallen Gebühren an, wenn das Notebook läuft.

### Erstellen Sie ein Studio-Notizbuch mit dem AWS CLI
<a name="example-notebook-msk-create-api"></a>

Gehen Sie wie folgt vor AWS CLI, um Ihr Studio-Notizbuch mit dem zu erstellen:

1. Überprüfen Sie die Konto-ID. Sie benötigen diesen Wert, um die Anwendung zu erstellen.

1. Erstellen Sie die Rolle `arn:aws:iam::AccountID:role/ZeppelinRole` und fügen Sie der automatisch erstellten Rolle über die Konsole die folgenden Berechtigungen hinzu.

   `"kinesis:GetShardIterator",`

   `"kinesis:GetRecords",`

   `"kinesis:ListShards"`

1. Erstellen Sie eine Datei mit dem Namen `create.json` und den folgenden Inhalten. Ersetzen Sie die Platzhalterwerte durch Ihre Informationen.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. Um Ihre Anwendung zu erstellen, führen Sie den folgenden Befehl aus.

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. Wenn der Befehl abgeschlossen ist, sehen Sie eine Ausgabe, die die Details für Ihr neues Studio-Notebook enthält. Es folgt ein Beispiel für die Ausgabe.

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. Um Ihre Anwendung zu starten, führen Sie den folgenden Befehl aus. Ersetzen Sie die Beispielwerte durch Ihre Konto-ID.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Senden von Daten an den Kinesis Data Stream
<a name="example-notebook-streams-send"></a>

Gehen Sie wie folgt vor, um Testdaten an Ihren Kinesis Data Stream zu senden:

1. Öffnen Sie den [Kinesis Data Generator](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). 

1. Wählen Sie „**Cognito-Benutzer erstellen mit CloudFormation**“.

1. Die CloudFormation Konsole wird mit der Kinesis Data Generator-Vorlage geöffnet. Wählen Sie **Weiter**.

1. Auf der Seite **Festlegen von Komponentendetails** geben Sie den Benutzernamen und das Passwort für Ihren Cognito-Benutzer ein. Wählen Sie **Weiter**.

1. Wählen Sie auf der Seite **Stack-Optionen konfigurieren** **Weiter** aus.

1. Wählen Sie auf der Seite „** Kinesis-Data-Generator-CognitoBenutzer überprüfen**“ die Option **Ich bestätige, dass AWS CloudFormation möglicherweise IAM-Ressourcen erstellt** werden. Kontrollkästchen. Wählen Sie **Stapel erstellen** aus.

1. Warten Sie, bis der CloudFormation Stapel fertig erstellt ist. **Nachdem der Stack abgeschlossen ist, öffnen Sie den **Kinesis-Data-Generator-Cognito-User-Stack** in der Konsole und wählen Sie die Registerkarte Outputs aus. CloudFormation ** **KinesisDataGeneratorUrl**Öffnen Sie die URL, die für den Ausgabewert aufgeführt ist.

1. Melden Sie sich auf der **Amazon Kinesis Data Generator**-Seite mit den Anmeldeinformationen an, die Sie in Schritt 4 erstellt haben.

1. Geben Sie auf der nächsten Seite die folgenden Werte an:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/de_de/managed-flink/latest/java/example-notebook-streams.html)

   Fügen Sie für **Datensatzvorlage** den folgenden Code ein:

   ```
   {
       "ticker": "{{random.arrayElement(
           ["AMZN","MSFT","GOOG"]
       )}}",
       "price": {{random.number(
           {
               "min":10,
               "max":150
           }
       )}}
   }
   ```

1. Wählen Sie **Daten senden** aus.

1. Der Generator sendet Daten an den Kinesis Data Stream. 

   Lassen Sie den Generator laufen, während Sie den nächsten Abschnitt abschließen.

## Testen Sie Ihr Studio-Notebook
<a name="example-notebook-streams-test"></a>

In diesem Abschnitt verwenden Sie Ihr Studio-Notebook, um Daten aus Ihrem Kinesis Data Stream abzufragen.

1. Die Managed Service for Apache Flink-Konsole zu [ https://console.aws.amazon.com/managed-flink/Hause öffnen? region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard).

1. Wählen Sie auf der Seite **Managed Service für Apache Flink-Anwendungen** die Registerkarte **Studio-Notebook** aus. Wählen Sie **MyNotebook**.

1. **Wählen Sie auf der Seite „In Apache Zeppelin öffnen“. **MyNotebook****

   Die Oberfläche von Apache Zeppelin wird in einer neuen Registerkarte geöffnet.

1. Auf der Seite **Willkommen bei Zeppelin\$1** wählen Sie **Zeppelin Notiz** aus.

1. Geben Sie auf der Seite **Zeppelin Notiz** die folgende Abfrage in eine neue Notiz ein:

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   Wählen Sie das Ausführungssymbol.

   Nach kurzer Zeit werden in der Notiz Daten aus dem Kinesis Data Stream angezeigt.

Um das Apache Flink-Dashboard für Ihre Anwendung zu öffnen und betriebliche Aspekte zu sehen, wählen Sie **FLINK JOB**. Weitere Informationen zum Flink-Dashboard finden Sie unter [Apache Flink-Dashboard](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html) im [Managed Service für Apache Flink Entwicklerhandbuch](https://docs.aws.amazon.com/).

Weitere Beispiele für Flink-Streaming-SQL-Abfragen finden Sie unter [Abfragen](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) in der [Apache Flink-Dokumentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

# Erstellen Sie ein Studio-Notebook mit Amazon MSK
<a name="example-notebook-msk"></a>

In diesem Tutorial wird beschrieben, wie Sie ein Studio-Notebook erstellen, das einen Amazon-MSK-Cluster als Quelle verwendet.

**Topics**
+ [

## Einen Amazon MSK-Cluster einrichten
](#example-notebook-msk-setup)
+ [

## Fügen Sie Ihrer VPC ein NAT-Gateway hinzu
](#example-notebook-msk-nat)
+ [

## Erstellen Sie eine AWS Glue Verbindung und eine Tabelle
](#example-notebook-msk-glue)
+ [

## Erstellen Sie ein Studio-Notebook mit Amazon MSK
](#example-notebook-msk-create)
+ [

## Senden Sie Daten an Ihren Amazon MSK-Cluster
](#example-notebook-msk-send)
+ [

## Testen Sie Ihr Studio-Notebook
](#example-notebook-msk-test)

## Einen Amazon MSK-Cluster einrichten
<a name="example-notebook-msk-setup"></a>

Für dieses Tutorial benötigen Sie einen Amazon-MSK-Cluster, der Klartextzugriff ermöglicht. Wenn Sie noch keinen Amazon MSK-Cluster eingerichtet haben, folgen Sie dem Tutorial [Erste Schritte mit Amazon MSK, um eine Amazon](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) VPC, einen Amazon MSK-Cluster, ein Thema und eine Amazon-Client-Instance zu erstellen. EC2 

Gehen Sie beim Befolgen des Tutorials wie folgt vor:
+ Ändern Sie in [Schritt 3: Amazon MSK-Cluster erstellen](https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html) bei Schritt 4 den `ClientBroker`-Wert von `TLS` auf **PLAINTEXT**.

## Fügen Sie Ihrer VPC ein NAT-Gateway hinzu
<a name="example-notebook-msk-nat"></a>

Wenn Sie einen Amazon MSK-Cluster erstellt haben, indem Sie dem Tutorial [Erste Schritte mit Amazon MSK](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html) gefolgt sind, oder wenn Ihre bestehende Amazon VPC noch kein NAT-Gateway für ihre privaten Subnetze hat, müssen Sie Ihrer Amazon VPC ein NAT-Gateway hinzufügen. Das folgende Diagramm zeigt die Architektur. 

![\[AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.\]](http://docs.aws.amazon.com/de_de/managed-flink/latest/java/images/vpc_05.png)


Gehen Sie wie folgt vor, um ein NAT-Gateway für Ihre Amazon VPC zu erstellen:

1. Öffnen Sie die Amazon-VPC-Konsole unter [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. Wählen Sie in der linken Navigationsleiste **NAT-Gateway** aus.

1. Wählen Sie auf der Seite **NAT-Gateways** die Option **NAT-Gateway erstellen** aus.

1. Geben Sie auf der Seite **NAT-Gateway erstellen** die folgenden Werte an:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/de_de/managed-flink/latest/java/example-notebook-msk.html)

   Wählen Sie **NAT-Gateway erstellen** aus.

1. Wählen Sie in der linken Navigationsleiste **Routing-Tabellen** aus.

1. Klicken Sie auf **Create Route Table (Routing-Tabelle erstellen)**.

1. Geben Sie auf der Seite **Routing-Tabelle erstellen** folgende Informationen ein:
   + **Name-Tag:** **ZeppelinRouteTable**
   + **VPC**: Wählen Sie Ihre VPC (z. B. **AWS KafkaTutorialVPC**).

   Wählen Sie **Create (Erstellen)** aus.

1. Wählen Sie in der Liste der Routentabellen. **ZeppelinRouteTable** Klicken Sie auf der Registerkarte **Routen** auf **Routen bearbeiten**.

1. Wählen Sie auf der Seite **Routen bearbeiten** die Option **Route hinzufügen** aus.

1. Geben Sie im ****Für-**Ziel** **0.0.0.0/0** ein. Wählen Sie für **Target** die Option **NAT Gateway**, **ZeppelinGateway**. Wählen Sie **Routen speichern** aus. Klicken Sie auf **Close** (Schließen).

1. Wählen Sie auf der Seite Routing-Tabellen die **ZeppelinRouteTable**Option „**Subnetzzuordnungen**“ aus. Wählen Sie **Subnetzzuordnungen bearbeiten** aus.

1. **Wählen Sie auf der Seite **Subnetzzuordnungen bearbeiten** die Optionen **AWS KafkaTutorialSubnet2 und AWS KafkaTutorialSubnet 3** aus.** Wählen Sie **Save (Speichern)** aus.

## Erstellen Sie eine AWS Glue Verbindung und eine Tabelle
<a name="example-notebook-msk-glue"></a>

Ihr Studio-Notebook verwendet eine [AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html)-Datenbank für Metadaten zu Ihrer Amazon MSK-Datenquelle. In diesem Abschnitt erstellen Sie eine AWS Glue Verbindung, die beschreibt, wie Sie auf Ihren Amazon MSK-Cluster zugreifen können, und eine AWS Glue Tabelle, die beschreibt, wie Sie die Daten in Ihrer Datenquelle für Clients wie Ihr Studio-Notebook präsentieren. 

**Eine Verbindung erstellen**

1. Melden Sie sich bei der an AWS-Managementkonsole und öffnen Sie die AWS Glue Konsole unter [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Wenn Sie noch keine AWS Glue Datenbank haben, wählen Sie in der linken Navigationsleiste **Datenbanken** aus. Wählen Sie **Datenbank hinzufügen**. Geben Sie im Fenster **Datenbank hinzufügen** **default** als **Namen der Datenbank** ein. Wählen Sie **Create (Erstellen)** aus.

1. Wählen Sie in der linken Navigationsleiste **Verbindungen** aus. Wählen Sie **Verbindung hinzufügen** aus.

1. Geben Sie im Fenster **Verbindung hinzufügen** die folgenden Werte ein:
   + Geben Sie für **Verbindungsname** **ZeppelinConnection** ein.
   + Wählen Sie für **Verbindungstyp** den Eintrag **Kafka**.
   + Geben Sie für den **Kafka-Bootstrap-Server URLs** die Bootstrap-Broker-String für Ihren Cluster an. Sie können die Bootstrap-Broker entweder über die MSK-Konsole oder durch Eingabe des folgenden CLI-Befehls abrufen:

     ```
     aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
     ```
   + Deaktivieren Sie das Kontrollkästchen **SSL-Verbindung erforderlich**.

   Wählen Sie **Weiter**.

1. Geben Sie auf der **VPC**-Seite die folgenden Werte an:
   + Wählen Sie für **VPC** den Namen Ihrer VPC (z. B. ** AWS KafkaTutorialVPC**).
   + **Wählen Sie für **Subnetz 2** aus.AWS KafkaTutorialSubnet**
   + Wählen Sie für **Sicherheitsgruppen** alle verfügbaren Gruppen aus.

   Wählen Sie **Weiter**.

1. Wählen Sie auf der Seite **Verbindungseigenschaften** / **Verbindungszugriff** die Option **Fertigstellen** aus.

**Erstellen einer Tabelle**
**Anmerkung**  
Sie können die Tabelle entweder manuell erstellen, wie in den folgenden Schritten beschrieben, oder Sie können den Konnektorcode zum Erstellen einer Tabelle für Managed Service für Apache Flink in Ihrem Notebook innerhalb von Apache Zeppelin verwenden, um Ihre Tabelle über eine DDL-Anweisung zu erstellen. Sie können dann einchecken AWS Glue , um sicherzustellen, dass die Tabelle korrekt erstellt wurde.

1. Wählen Sie in der linken Navigationsleiste die Option **Tabellen**. Wählen Sie auf der Seite **Tabellen** die Optionen **Tabellen hinzufügen**, **Tabelle manuell hinzufügen** aus.

1. Geben Sie auf der Seite **Eigenschaften Ihrer Tabelle einrichten** **stock** als **Tabellennamen** ein. Stellen Sie sicher, dass Sie die Datenbank auswählen, die Sie zuvor erstellt haben. Wählen Sie **Weiter**.

1. Wählen Sie auf der Seite **Datenspeicher hinzufügen** die Option **Kafka** aus. Geben Sie als **Themennamen** Ihren Themennamen ein (z. B. **AWS KafkaTutorialTopic**). Wählen Sie für **Verbindung **ZeppelinConnection****.

1. Wählen Sie auf der Seite **Klassifikation** die Option **JSON** aus. Wählen Sie **Weiter**.

1. Wählen Sie auf der Seite **Schema definieren** die Option „Spalte hinzufügen“, um eine Spalte hinzuzufügen. Fügen Sie Spalten mit den folgenden Eigenschaften hinzu:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/de_de/managed-flink/latest/java/example-notebook-msk.html)

   Wählen Sie **Weiter**.

1. Überprüfen Sie auf der nächsten Seite Ihre Einstellungen und wählen Sie **Fertigstellen.**

1. Wählen Sie die neu erstellte Tabelle aus der Liste der Tabellen aus.

1. Wählen Sie **Tabelle bearbeiten** und fügen Sie die folgenden Eigenschaften hinzu:
   + Schlüssel:`managed-flink.proctime`, Wert: `proctime`
   + Schlüssel:`flink.properties.group.id`, Wert: `test-consumer-group`
   + Schlüssel:`flink.properties.auto.offset.reset`, Wert: `latest`
   + Schlüssel:`classification`, Wert: `json`

   Ohne diese Schlüssel/Wert-Paare tritt im Flink-Notebook ein Fehler auf. 

1. Wählen Sie **Anwenden** aus.

## Erstellen Sie ein Studio-Notebook mit Amazon MSK
<a name="example-notebook-msk-create"></a>

Nachdem Sie die Ressourcen erstellt haben, die Ihre Anwendung verwendet, erstellen Sie Ihr Studio-Notebook. 

**Topics**
+ [

### Erstellen Sie ein Studio-Notizbuch mit dem AWS-Managementkonsole
](#example-notebook-create-msk-console)
+ [

### Erstellen Sie ein Studio-Notizbuch mit dem AWS CLI
](#example-notebook-msk-create-api)

**Anmerkung**  
Sie können ein Studio-Notebook auch von der Amazon MSK-Konsole aus erstellen, indem Sie einen vorhandenen Cluster auswählen und dann **Daten in Echtzeit verarbeiten** wählen.

### Erstellen Sie ein Studio-Notizbuch mit dem AWS-Managementkonsole
<a name="example-notebook-create-msk-console"></a>

1. Die Managed Service for Apache Flink-Konsole zu [ https://console.aws.amazon.com/managed-flink/Hause öffnen? region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard).

1. Wählen Sie auf der Seite **Managed Service für Apache Flink-Anwendungen** die Registerkarte **Studio** aus. Wählen Sie **Studio-Notebook erstellen**.
**Anmerkung**  
Um ein Studio-Notebook über die Amazon MSK- oder Kinesis Data Streams-Konsolen zu erstellen, wählen Sie Ihren Amazon MSK-Eingabe-Cluster oder Kinesis Data Stream aus und wählen Sie dann **Daten in Echtzeit verarbeiten** aus.

1. Geben Sie auf der Seite **Notebook-Instance erstellen** folgende Informationen ein:
   + Geben Sie **MyNotebook** als **Studio-Notebookname**.
   + Wählen Sie **Standard** für die **AWS -Glue-Datenbank**.

   Wählen Sie **Studio-Notebook erstellen**.

1. **Wählen Sie auf der Seite die Registerkarte Konfiguration aus. **MyNotebook**** Wählen Sie im Abschnitt **Netzwerk** die Option **Bearbeiten**.

1. Wählen Sie auf der MyNotebook Seite **Netzwerk bearbeiten für** die **VPC-Konfiguration basierend auf dem Amazon MSK-Cluster** aus. Wählen Sie Ihren Amazon MSK-Cluster für den **Amazon MSK-Cluster** aus. Wählen Sie **Änderungen speichern**.

1. **Wählen Sie auf der **MyNotebook**Seite die Option Ausführen aus.** Warten Sie, bis der **Status** **Wird ausgeführt** angezeigt wird.

### Erstellen Sie ein Studio-Notizbuch mit dem AWS CLI
<a name="example-notebook-msk-create-api"></a>

Gehen Sie wie folgt vor AWS CLI, um Ihr Studio-Notizbuch mit dem zu erstellen:

1. Stellen Sie sicher, dass Sie über die folgenden Informationen verfügen: Sie benötigen diese Werte, um Ihre Anwendung zu erstellen.
   + Ihre Konto-ID.
   + Das Subnetz IDs und die Sicherheitsgruppen-ID für die Amazon VPC, die Ihren Amazon MSK-Cluster enthält.

1. Erstellen Sie eine Datei mit dem Namen `create.json` und den folgenden Inhalten. Ersetzen Sie die Platzhalterwerte durch Ihre Informationen.

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "VpcConfigurations": [
               {
                   "SubnetIds": [
                       "SubnetID 1",
                       "SubnetID 2",
                       "SubnetID 3"
                   ],
                   "SecurityGroupIds": [
                       "VPC Security Group ID"
                   ]
               }
           ],
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. Um Ihre Anwendung zu erstellen, führen Sie den folgenden Befehl aus.

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. Wenn der Befehl abgeschlossen ist, sollte eine Ausgabe wie die folgende angezeigt werden, die die Details für Ihr neues Studio-Notebook enthält:

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. Um Ihre Anwendung zu starten, führen Sie den folgenden Befehl aus. Ersetzen Sie die Beispielwerte durch Ihre Konto-ID.

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Senden Sie Daten an Ihren Amazon MSK-Cluster
<a name="example-notebook-msk-send"></a>

In diesem Abschnitt führen Sie ein Python-Skript in Ihrem EC2 Amazon-Client aus, um Daten an Ihre Amazon MSK-Datenquelle zu senden.

1. Connect zu Ihrem EC2 Amazon-Client her.

1. Führen Sie die folgenden Befehle aus, um Python Version 3, Pip und das Kafka für Python-Paket zu installieren, und bestätigen Sie die Aktionen:

   ```
   sudo yum install python37
   curl -O https://bootstrap.pypa.io/get-pip.py
   python3 get-pip.py --user
   pip install kafka-python
   ```

1. Konfigurieren Sie das AWS CLI auf Ihrem Client-Computer, indem Sie den folgenden Befehl eingeben:

   ```
   aws configure
   ```

   Geben Sie Ihre Kontoanmeldeinformationen ein, und **us-east-1** für die `region`.

1. Erstellen Sie eine Datei mit dem Namen `stock.py` und den folgenden Inhalten. Ersetzen Sie den Beispielwert durch die Bootstrap Brokers-Zeichenfolge Ihres Amazon MSK-Clusters und aktualisieren Sie den Themennamen, falls Ihr Thema nicht: **AWS KafkaTutorialTopic**

   ```
   from kafka import KafkaProducer
   import json
   import random
   from datetime import datetime
   
   BROKERS = "<<Bootstrap Broker List>>"
   producer = KafkaProducer(
       bootstrap_servers=BROKERS,
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       retry_backoff_ms=500,
       request_timeout_ms=20000,
       security_protocol='PLAINTEXT')
   
   
   def getStock():
       data = {}
       now = datetime.now()
       str_now = now.strftime("%Y-%m-%d %H:%M:%S")
       data['event_time'] = str_now
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       price = random.random() * 100
       data['price'] = round(price, 2)
       return data
   
   
   while True:
       data =getStock()
       # print(data)
       try:
           future = producer.send("AWSKafkaTutorialTopic", value=data)
           producer.flush()
           record_metadata = future.get(timeout=10)
           print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset))
       except Exception as e:
           print(e.with_traceback())
   ```

1. Führen Sie das Skript mit dem folgenden Befehl aus:

   ```
   $ python3 stock.py
   ```

1. Lassen Sie das Skript laufen, während Sie den folgenden Abschnitt abschließen.

## Testen Sie Ihr Studio-Notebook
<a name="example-notebook-msk-test"></a>

In diesem Abschnitt verwenden Sie Ihr Studio-Notebook, um Daten aus Ihrem Amazon MSK-Cluster abzufragen.

1. [Die Managed Service for Apache Flink-Konsole zu Hause öffnen? https://console.aws.amazon.com/managed-flink/ region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard).

1. Wählen Sie auf der Seite **Managed Service für Apache Flink-Anwendungen** die Registerkarte **Studio-Notebook** aus. Wählen Sie **MyNotebook**.

1. **Wählen Sie auf der Seite „In Apache Zeppelin öffnen“. **MyNotebook****

   Die Oberfläche von Apache Zeppelin wird in einer neuen Registerkarte geöffnet.

1. Auf der Seite **Willkommen bei Zeppelin\$1** wählen Sie **Zeppelin neue Notiz** aus.

1. Geben Sie auf der Seite **Zeppelin Notiz** die folgende Abfrage in eine neue Notiz ein:

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   Wählen Sie das Ausführungssymbol.

   Die Anwendung zeigt Daten aus dem Amazon MSK-Cluster an.

Um das Apache Flink-Dashboard für Ihre Anwendung zu öffnen und betriebliche Aspekte zu sehen, wählen Sie **FLINK JOB**. Weitere Informationen zum Flink-Dashboard finden Sie unter [Apache Flink-Dashboard](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html) im [Managed Service für Apache Flink Entwicklerhandbuch](https://docs.aws.amazon.com/).

Weitere Beispiele für Flink-Streaming-SQL-Abfragen finden Sie unter [Abfragen](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) in der [Apache Flink-Dokumentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

# Bereinigen Sie Ihre Anwendung und die abhängigen Ressourcen
<a name="example-notebook-cleanup"></a>

## Löschen Sie Ihr Studio-Notebook
<a name="example-notebook-cleanup-app"></a>

1. Öffnen Sie die Managed Service für Apache Flink-Konsole.

1. Wählen Sie **MyNotebook**.

1. Wählen Sie **Aktionen** und dann **Löschen** aus.

## Löschen Sie Ihre AWS Glue Datenbank und Ihre Verbindung
<a name="example-notebook-cleanup-glue"></a>

1. Öffnen Sie die AWS Glue Konsole unter [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/).

1. Wählen Sie in der linken Navigationsleiste **Datenbanken** aus. Markieren Sie das Kontrollkästchen neben **Standard**, um es auszuwählen. Wählen Sie **Aktion**, **Datenbank löschen**. Bestätigen Sie Ihre Auswahl.

1. Wählen Sie in der linken Navigationsleiste **Verbindungen** aus. Markieren Sie das Kontrollkästchen neben, **ZeppelinConnection**um es auszuwählen. Wählen Sie **Aktion**, **Verbindung löschen**. Bestätigen Sie Ihre Auswahl.

## So löschen Sie die IAM-Rolle und -Richtlinie
<a name="example-notebook-msk-cleanup-iam"></a>

1. Öffnen Sie unter [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) die IAM-Konsole.

1. Klicken Sie in der linken Navigationsleiste auf **Rollen**.

1. Verwenden Sie die Suchleiste, um nach der **ZeppelinRole**Rolle zu suchen.

1. Wählen Sie die **ZeppelinRole**Rolle aus. Wählen Sie **Rolle löschen** aus. Bestätigen Sie das Löschen.

## Löschen Sie Ihre CloudWatch Protokollgruppe
<a name="example-notebook-cleanup-cw"></a>

Die Konsole erstellt eine CloudWatch Logs-Gruppe und einen Log-Stream für Sie, wenn Sie Ihre Anwendung mit der Konsole erstellen. Sie haben keine Protokollgruppe und keinen Stream, wenn Sie Ihre Anwendung mit der AWS CLI erstellt haben.

1. Öffnen Sie die CloudWatch Konsole unter [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).

1. Wählen Sie in der linken Navigationsleiste **Protokollgruppen** aus.

1. Wählen Sie die Gruppe**/AWS/KinesisAnalytics/MyNotebook**log aus.

1. Wählen Sie **Actions (Aktionen)**, **Delete log group(s) (Protokollgruppe(n) löschen)** aus. Bestätigen Sie das Löschen.

## Kinesis Data Streams Streams-Ressourcen bereinigen
<a name="example-notebook-cleanup-streams"></a>

Um Ihren Kinesis Stream zu löschen, öffnen Sie die Kinesis Data Streams-Konsole, wählen Sie Ihren Kinesis Stream aus und wählen Sie **Aktionen**, **Löschen**.

## Bereinigen von MSK-Ressourcen
<a name="example-notebook-cleanup-msk"></a>

Führen Sie die Schritte in diesem Abschnitt aus, wenn Sie für dieses Tutorial einen Amazon MSK-Cluster erstellt haben. Dieser Abschnitt enthält Anweisungen zur Bereinigung Ihrer Amazon EC2-Client-Instance, Amazon VPC und Amazon MSK-Cluster.

### Löschen Sie Ihren Amazon MSK-Cluster
<a name="example-notebook-msk-cleanup-msk"></a>

Gehen Sie wie folgt vor, wenn Sie für dieses Tutorial einen Amazon MSK-Cluster erstellt haben.

1. Die Amazon MSK-Konsole zu [https://console.aws.amazon.com/msk/Hause öffnen? region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/).

1. Wählen Sie **AWS KafkaTutorialCluster**. Wählen Sie **Löschen** aus. Geben Sie **delete** in das angezeigte Fenster ein und bestätigen Sie Ihre Auswahl.

### Beenden Ihrer Client-Instance
<a name="example-notebook-msk-cleanup-client"></a>

Gehen Sie wie folgt vor, wenn Sie für dieses Tutorial eine Amazon EC2-Client-Instance erstellt haben.

1. Öffnen Sie die Amazon-EC2-Konsole unter [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. Wählen Sie in der linken Navigationsleiste **Instances** aus.

1. Klicken Sie auf das Kästchen neben, um es auszuwählen. **ZeppelinClient**

1. Wählen Sie **Instance-Status**, **Instance beenden**.

### Löschen Ihrer Amazon VPC
<a name="example-notebook-msk-cleanup-vpc"></a>

Gehen Sie wie folgt vor, wenn Sie für dieses Tutorial eine Amazon VPC erstellt haben.

1. Öffnen Sie die Amazon-EC2-Konsole unter [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/).

1. Wählen Sie in der linken Navigationsleiste **Netzwerkschnittstellen** aus.

1. Geben Sie Ihre VPC-ID in das Suchfeld ein und drücken Sie die Eingabetaste.

1. Aktivieren Sie das Kontrollkästchen in der Kopfzeile der Tabelle, um alle angezeigten Netzwerkschnittstellen auszuwählen.

1. Wählen Sie **Actions (Aktionen)**, **Loslösen (Detach)**. Wählen Sie in dem daraufhin angezeigten Fenster unter **Trennung erzwingen** die Option **Aktivieren** aus. Wählen Sie **Trennen** und warten Sie, bis alle Netzwerkschnittstellen den Status **Verfügbar** erreichen.

1. Aktivieren Sie das Kontrollkästchen in der Kopfzeile der Tabelle, um alle angezeigten Netzwerkschnittstellen erneut auszuwählen.

1. Wählen Sie **Aktionen**, **Löschen** aus. Bestätigen Sie die Aktion.

1. Öffnen Sie die Amazon-VPC-Konsole unter [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/).

1. Wählen Sie **AWS KafkaTutorialVPC** aus. Wählen Sie **Aktionen**, **VPC löschen** aus. Geben Sie **delete** ein und bestätigen Sie den Löschvorgang.

# Tutorial: Stellen Sie ein Studio-Notebook als Managed Service für Apache Flink-Anwendung mit dauerhaftem Zustand bereit
<a name="example-notebook-deploy"></a>

Das folgende Tutorial zeigt, wie Sie ein Studio-Notebook als Managed-Service für Apache Flink-Anwendung mit einem dauerhaften Status bereitstellen.

**Topics**
+ [

## Erfüllen der Voraussetzungen
](#example-notebook-durable-setup)
+ [

## Stellen Sie eine Anwendung mit einem dauerhaften Status bereit, indem Sie den AWS-Managementkonsole
](#example-notebook-deploy-console)
+ [

## Stellen Sie eine Anwendung mit einem dauerhaften Zustand bereit, indem Sie AWS CLI
](#example-notebook-deploy-cli)

## Erfüllen der Voraussetzungen
<a name="example-notebook-durable-setup"></a>

Erstellen Sie ein neues Studio-Notizbuch, indem Sie den Anweisungen folgen[Tutorial: Erstellen Sie ein Studio-Notizbuch in Managed Service für Apache Flink](example-notebook.md) und entweder Kinesis Datenstrom oder Amazon MSK verwenden. Name des Studio-Notizbuchs `ExampleTestDeploy`.

## Stellen Sie eine Anwendung mit einem dauerhaften Status bereit, indem Sie den AWS-Managementkonsole
<a name="example-notebook-deploy-console"></a>

1. Fügen Sie einen S3-Bucket-Speicherort hinzu, an dem der gepackte Code unter **Speicherort des Anwendungscodes gespeichert werden soll - *optional*** in der Konsole. Dies ermöglicht die Schritte zum Bereitstellen und Ausführen Ihrer Anwendung direkt vom Notebook aus.

1. Fügen Sie der Anwendungsrolle die erforderlichen Berechtigungen hinzu, um die von Ihnen verwendete Rolle zum Lesen und Schreiben in einen Amazon S3-Bucket zu aktivieren und um eine Managed Service for Apache Flink-Anwendung zu starten:
   + Amazon S3 FullAccess
   + Amazon hat es geschafft- flinkFullAccess
   + Zugriff auf Ihre Quellen, Ziele und VPCs gegebenenfalls Weitere Informationen finden Sie unter [Überprüfen Sie die IAM-Berechtigungen für Studio-Notebooks](how-zeppelin-iam.md).

1. Verwenden Sie den folgenden Beispielcode:

   ```
   %flink.ssql(type=update) 
   CREATE TABLE exampleoutput (
     'ticket' VARCHAR,
     'price' DOUBLE
   )
   WITH (
     'connector' = 'kinesis',
     'stream' = 'ExampleOutputStream',
     'aws.region' = 'us-east-1',
     'scan.stream.initpos' = 'LATEST',
     'format' = 'json'
   );
   
   INSERT INTO exampleoutput SELECT ticker, price FROM exampleinputstream
   ```

1. Mit der Einführung dieses Feature sehen Sie in der rechten oberen Ecke jeder Notiz in Ihrem Notizbuch ein neues Dropdown-Menü mit dem Namen des Notizbuchs. Sie haben die folgenden Möglichkeiten:
   + Sehen Sie sich die Studio-Notizbucheinstellungen in der AWS-Managementkonsole an.
   + Erstellen Sie Ihren Zeppelin Note und exportieren Sie ihn zu Amazon S3. Geben Sie an dieser Stelle einen Namen für Ihre Anwendung ein und wählen Sie **Erstellen und Exportieren**. Sie erhalten eine Benachrichtigung, wenn der Export abgeschlossen ist.
   + Bei Bedarf können Sie alle zusätzlichen Tests der ausführbaren Datei in Amazon S3 anzeigen und ausführen.
   + Sobald der Build abgeschlossen ist, können Sie Ihren Code als Kinesis-Streaming-Anwendung mit dauerhaftem Zustand und automatischer Skalierung bereitstellen.
   + Verwenden Sie das Drop-down-Menü und wählen Sie **Zeppelin Note als Kinesis-Streaming-Anwendung bereitstellen**. Überprüfen Sie den Namen der Anwendung und wählen Sie **Über AWS Konsole bereitstellen** aus.
   + Dadurch gelangen Sie zu der AWS-Managementkonsole Seite zum Erstellen einer Managed Service for Apache Flink-Anwendung. Beachten Sie, dass Anwendungsname, Parallelität, Codespeicherort, Standard-Glue-DB, VPC (falls zutreffend) und IAM-Rollen vorab ausgefüllt wurden. Stellen Sie sicher, dass die IAM-Rollen über die erforderlichen Berechtigungen für Ihre Quellen und Ziele verfügen. Snapshots sind standardmäßig aktiviert, um eine dauerhafte Verwaltung des Anwendungsstatus zu gewährleisten.
   + Wählen Sie **Erstellen der Anwendung**.
   + Sie können alle Einstellungen **konfigurieren** und ändern und anschließend **Ausführen** wählen, um Ihre Streaming-Anwendung zu starten.

## Stellen Sie eine Anwendung mit einem dauerhaften Zustand bereit, indem Sie AWS CLI
<a name="example-notebook-deploy-cli"></a>

Um eine Anwendung mithilfe von bereitzustellen AWS CLI, müssen Sie Ihr System so aktualisieren, AWS CLI dass es das mit Ihren Beta 2-Informationen bereitgestellte Servicemodell verwendet. Weitere Informationen zur Verwendung des aktualisierten Servicemodells finden Sie unter [Erfüllen der VoraussetzungenErfüllen der Voraussetzungen](example-notebook.md#example-notebook-setup).

Der folgende Beispielcode erstellt ein neues Studio-Notizbuch:

```
aws kinesisanalyticsv2 create-application \
     --application-name <app-name> \
     --runtime-environment ZEPPELIN-FLINK-3_0 \
     --application-mode INTERACTIVE \
     --service-execution-role <iam-role>
     --application-configuration '{ 
       "ZeppelinApplicationConfiguration": { 
         "CatalogConfiguration": { 
           "GlueDataCatalogConfiguration": { 
             "DatabaseARN": "arn:aws:glue:us-east-1:<account>:database/<glue-database-name>" 
           } 
         } 
       },
       "FlinkApplicationConfiguration": {
         "ParallelismConfiguration": {
           "ConfigurationType": "CUSTOM",
           "Parallelism": 4,
           "ParallelismPerKPU": 4
         }
       },
       "DeployAsApplicationConfiguration": {
            "S3ContentLocation": { 
               "BucketARN": "arn:aws:s3:::<s3bucket>",
               "BasePath": "/something/"
            }
        },
       "VpcConfigurations": [
         {
           "SecurityGroupIds": [
             "<security-group>"
           ],
           "SubnetIds": [
             "<subnet-1>",
             "<subnet-2>"
           ]
         }
       ]
     }' \
     --region us-east-1
```

Das folgende Codebeispiel startet ein Studio-Notebook:

```
aws kinesisanalyticsv2 start-application \
    --application-name <app-name> \
    --region us-east-1 \
    --no-verify-ssl
```

Der folgende Code gibt die URL für die Apache Zeppelin-Notizbuch-Seite einer Anwendung zurück:

```
aws kinesisanalyticsv2 create-application-presigned-url \
    --application-name <app-name> \
    --url-type ZEPPELIN_UI_URL \

    --region us-east-1 \
    --no-verify-ssl
```

# Sehen Sie sich Beispielabfragen zur Analyse von Daten in einem Studio-Notizbuch an
<a name="how-zeppelin-sql-examples"></a>

**Topics**
+ [

## Erstellen Sie Tabellen mit Amazon MSK/Apache Kafka
](#how-zeppelin-examples-creating-tables)
+ [

## Erstellen Sie Tabellen mit Kinesis
](#how-zeppelin-examples-creating-tables-with-kinesis)
+ [

## Fragen Sie ein taumelndes Fenster ab
](#how-zeppelin-examples-tumbling)
+ [

## Fragen Sie ein Schiebefenster ab
](#how-zeppelin-examples-sliding)
+ [

## Verwenden Sie interaktives SQL
](#how-zeppelin-examples-interactive-sql)
+ [

## Verwenden Sie den BlackHole SQL-Konnektor
](#how-zeppelin-examples-blackhole-connector-sql)
+ [

## Verwenden Sie Scala, um Beispieldaten zu generieren
](#notebook-example-data-generator)
+ [

## Verwenden Sie interaktives Scala
](#notebook-example-interactive-scala)
+ [

## Interaktives Python verwenden
](#notebook-example-interactive-python)
+ [

## Verwenden Sie eine Kombination aus interaktivem Python, SQL und Scala
](#notebook-example-interactive-pythonsqlscala)
+ [

## Verwenden Sie einen kontoübergreifenden Kinesis-Datenstream
](#notebook-example-crossaccount-kds)

Informationen zu den SQL-Abfrageeinstellungen von Apache Flink finden Sie unter [ Flink auf Zeppelin-Notebooks für interaktive Datenanalyse](https://flink.apache.org/ecosystem/2020/06/23/flink-on-zeppelin-part2.html).

Um Ihre Anwendung im Apache-Flink-Dashboard anzuzeigen, wählen Sie **FLINK-AUFTRAG** auf der Seite **Zeppelin Notiz** Ihrer Anwendung.

Weitere Informationen zu Fensterabfragen finden Sie unter [Windows](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/stream/operators/windows.html) in der [Apache-Flink-Dokumentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

Weitere Beispiele für Streaming-SQL-Abfragen in Apache Flink finden Sie unter [Abfragen](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) in der [Apache-Flink-Dokumentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

## Erstellen Sie Tabellen mit Amazon MSK/Apache Kafka
<a name="how-zeppelin-examples-creating-tables"></a>

Sie können den Amazon-MSK-Flink-Konnektor mit Managed Service für Apache Flink Studio verwenden, um Ihre Verbindung mit Klartext-, SSL- oder IAM-Authentifizierung zu authentifizieren. Erstellen Sie Ihre Tabellen mit den spezifischen Eigenschaften gemäß Ihren Anforderungen.

```
-- Plaintext connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- SSL connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
   'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SSL',
  'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts',
  'properties.ssl.truststore.password' = 'changeit',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- IAM connection (or for MSK Serverless)

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

Sie können diese mit anderen Eigenschaften im [Apache-Kafka-SQL-Konnektor](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/) kombinieren.

## Erstellen Sie Tabellen mit Kinesis
<a name="how-zeppelin-examples-creating-tables-with-kinesis"></a>

Im folgenden Beispiel erstellen Sie eine Tabelle mit Kinesis:

```
CREATE TABLE KinesisTable (
  `column1` BIGINT,
  `column2` BIGINT,
  `column3` BIGINT,
  `column4` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (column1, column2)
WITH (
  'connector' = 'kinesis',
  'stream' = 'test_stream',
  'aws.region' = '<region>',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

Weitere Informationen zu anderen Eigenschaften, die Sie verwenden können, finden Sie unter [Amazon Kinesis Data Streams SQL-Konnektor](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kinesis/).

## Fragen Sie ein taumelndes Fenster ab
<a name="how-zeppelin-examples-tumbling"></a>

Die folgende Flink-Streaming-SQL-Abfrage wählt den höchsten Preis in jedem fünfsekündigen rollierenden Fenster aus der `ZeppelinTopic`-Tabelle aus:

```
%flink.ssql(type=update)
SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker
FROM ZeppelinTopic
GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
```

## Fragen Sie ein Schiebefenster ab
<a name="how-zeppelin-examples-sliding"></a>

Die folgende Flink-Streaming-SQL-Abfrage wählt den höchsten Preis in jedem fünfsekündigen rollierenden Fenster aus der `ZeppelinTopic`-Tabelle aus:

```
%flink.ssql(type=update)
SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max
FROM ZeppelinTopic//or your table name in AWS Glue
GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
```

## Verwenden Sie interaktives SQL
<a name="how-zeppelin-examples-interactive-sql"></a>

In diesem Beispiel wird der Höchstwert der Ereignis- und Verarbeitungszeit sowie die Summe der Werte aus der Schlüssel-Wert-Tabelle ausgegeben. Stellen Sie sicher, dass Sie das Beispielskript zur Datengenerierung aus dem laufenden [Verwenden Sie Scala, um Beispieldaten zu generieren](#notebook-example-data-generator) haben. Informationen zum Ausprobieren anderer SQL-Abfragen wie Filtern und Joins in Ihrem Studio-Notebook finden Sie in der Apache-Flink-Dokumentation: [Abfragen](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html) in der Apache-Flink-Dokumentation.

```
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time.
SELECT
  MAX(`et`) as `et`,
  MAX(`pt`) as `pt`,
  SUM(`value`) as `sum`
FROM
  `key-values`
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive tumbling window query that displays the number of records observed per (event time) second.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT
  TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`,
  `key`,
  SUM(`value`) as `sum`
FROM
  `key-values`
GROUP BY
  TUMBLE(`et`, INTERVAL '1' SECONDS),
  `key`;
```

## Verwenden Sie den BlackHole SQL-Konnektor
<a name="how-zeppelin-examples-blackhole-connector-sql"></a>

Der BlackHole SQL-Connector erfordert nicht, dass Sie einen Kinesis-Datenstream oder einen Amazon MSK-Cluster erstellen, um Ihre Abfragen zu testen. Informationen zum SQL-Konnektor finden Sie unter BlackHole [BlackHole SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/blackhole.html) in der Apache Flink-Dokumentation. In diesem Beispiel ist der Standardkatalog ein speicherinterner Katalog.

```
%flink.ssql

CREATE TABLE default_catalog.default_database.blackhole_table (
 `key` BIGINT,
 `value` BIGINT,
 `et` TIMESTAMP(3)
) WITH (
 'connector' = 'blackhole'
)
```

```
%flink.ssql(parallelism=1)

INSERT INTO `test-target`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-source`
WHERE
  `key` > 3
```

```
%flink.ssql(parallelism=2)

INSERT INTO `default_catalog`.`default_database`.`blackhole_table`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-target`
WHERE
  `key` > 7
```

## Verwenden Sie Scala, um Beispieldaten zu generieren
<a name="notebook-example-data-generator"></a>

In diesem Beispiel wird Scala verwendet, um Beispieldaten zu generieren. Sie können diese Beispieldaten verwenden, um verschiedene Abfragen zu testen. Verwenden Sie die Anweisung create table, um die Schlüssel-Wert-Tabelle zu erstellen.

```
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator
import org.apache.flink.streaming.api.scala.DataStream

import java.sql.Timestamp

// ad-hoc convenience methods to be defined on Table 
implicit class TableOps[T](table: DataStream[T]) {
    def asView(name: String): DataStream[T] = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView("`" + name + "`")
      }
      stenv.createTemporaryView("`" + name + "`", table)
      return table;
    }
}
```

```
%flink(parallelism=4)
val stream = senv
 .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000))
 .map(key => (key, 1, new Timestamp(System.currentTimeMillis)))
 .asView("key-values-data-generator")
```

```
%flink.ssql(parallelism=4)
-- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)")
-- in this case the INSERT query will inherit the parallelism of the of the above paragraph
INSERT INTO `key-values`
SELECT
 `_1` as `key`,
 `_2` as `value`,
 `_3` as `et`
FROM
 `key-values-data-generator`
```

## Verwenden Sie interaktives Scala
<a name="notebook-example-interactive-scala"></a>

Dies ist die Scala-Übersetzung von [Verwenden Sie interaktives SQL](#how-zeppelin-examples-interactive-sql). Weitere Scala-Beispiele finden Sie unter [Tabellen-API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) in der Apache-Flink-Dokumentation.

```
%flink
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
    def asView(name: String): Table = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView(name)
      }
      stenv.createTemporaryView(name, table)
      return table;
    }
}
```

```
%flink(parallelism=4)

// A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time.
val query01 = stenv
  .from("`key-values`")
  .select(
    $"et".max().as("et"),
    $"pt".max().as("pt"),
    $"value".sum().as("sum")
  ).asView("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink(parallelism=4)

// An tumbling window view that displays the number of records observed per (event time) second.
val query02 = stenv
  .from("`key-values`")
  .window(Tumble over 1.seconds on $"et" as $"w")
  .groupBy($"w", $"key")
  .select(
    $"w".start.as("window"),
    $"key",
    $"value".sum().as("sum")
  ).asView("query02")
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Interaktives Python verwenden
<a name="notebook-example-interactive-python"></a>

Dies ist die Python-Übersetzung von [Verwenden Sie interaktives SQL](#how-zeppelin-examples-interactive-sql). Weitere Python-Beispiele finden Sie unter [Tabellen-API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html) in der Apache-Flink-Dokumentation. 

```
%flink.pyflink
from pyflink.table.table import Table

def as_view(table, name):
  if (name in st_env.list_temporary_views()):
    st_env.drop_temporary_view(name)
  st_env.create_temporary_view(name, table)
  return table

Table.as_view = as_view
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`keyvalues`") \
  .select(", ".join([
    "max(et) as et",
    "max(pt) as pt",
    "sum(value) as sum"
  ])) \
  .as_view("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`key-values`") \
  .window(Tumble.over("1.seconds").on("et").alias("w")) \
  .group_by("w, key") \
  .select(", ".join([
    "w.start as window",
    "key",
    "sum(value) as sum"
  ])) \
  .as_view("query02")
```

```
%flink.ssql(type=update, parallelism=16, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## Verwenden Sie eine Kombination aus interaktivem Python, SQL und Scala
<a name="notebook-example-interactive-pythonsqlscala"></a>

Sie können eine beliebige Kombination aus SQL, Python und Scala in Ihrem Notebook für interaktive Analysen verwenden. In einem Studio-Notebook, das Sie als dauerhafte Anwendung bereitstellen möchten, können Sie eine Kombination aus SQL und Scala verwenden. Dieses Beispiel zeigt Ihnen die Abschnitte, die ignoriert werden, und diejenigen, die in der Anwendung mit dem dauerhaften Zustand bereitgestellt werden.

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-source` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-source-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-target` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-target-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink()

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
  def asView(name: String): Table = {
    if (stenv.listTemporaryViews.contains(name)) {
      stenv.dropTemporaryView(name)
    }
    stenv.createTemporaryView(name, table)
    return table;
  }
}
```

```
%flink(parallelism=1)
val table = stenv
  .from("`default_catalog`.`default_database`.`my-test-source`")
  .select($"key", $"value", $"et")
  .filter($"key" > 10)
  .asView("query01")
```

```
%flink.ssql(parallelism=1)

-- forward data
INSERT INTO `default_catalog`.`default_database`.`my-test-target`
SELECT * FROM `query01`
```

```
%flink.ssql(type=update, parallelism=1, refreshInterval=1000)

-- forward data to local stream (ignored when deployed as application)
SELECT * FROM `query01`
```

```
%flink

// tell me the meaning of life (ignored when deployed as application!)
print("42!")
```

## Verwenden Sie einen kontoübergreifenden Kinesis-Datenstream
<a name="notebook-example-crossaccount-kds"></a>

Um einen Kinesis-Datenstrom zu verwenden, der sich in einem anderen Konto als dem Konto befindet, das Ihr Studio-Notebook enthält, erstellen Sie eine Serviceausführungsrolle in dem Konto, in dem Ihr Studio-Notebook ausgeführt wird, und eine Rollenvertrauensrichtlinie für das Konto, das den Datenstrom enthält. Verwenden Sie `aws.credentials.provider`, `aws.credentials.role.arn` und `aws.credentials.role.sessionName` im Kinesis-Konnektor in Ihrer DDL-Anweisung create table, um eine Tabelle anhand des Datenstroms zu erstellen.

Verwenden Sie die folgende Serviceausführungsrolle für das Studio-Notebook-Konto.

```
{
 "Sid": "AllowNotebookToAssumeRole",
 "Effect": "Allow",
 "Action": "sts:AssumeRole"
 "Resource": "*"
}
```

Verwenden Sie die `AmazonKinesisFullAccess`-Richtlinie und die folgende Rollenvertrauensrichtlinie für das Datenstrom-Konto.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}
```

------

Verwenden Sie den folgenden Absatz für die create-table-Anweisung.

```
%flink.ssql
CREATE TABLE test1 (
name VARCHAR,
age BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'stream-assume-role-test',
'aws.region' = 'us-east-1',
'aws.credentials.provider' = 'ASSUME_ROLE',
'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role',
'aws.credentials.role.sessionName' = 'stream-assume-role-test-session',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json'
)
```

# Beheben Sie Probleme mit Studio-Notebooks für Managed Service für Apache Flink
<a name="how-zeppelin-troubleshooting"></a>

Dieser Abschnitt enthält Informationen zur Fehlerbehebung für Studio-Notebooks.

## Stoppen Sie eine blockierte Anwendung
<a name="how-zeppelin-troubleshooting-stopping"></a>

Um eine Anwendung zu beenden, die in einem vorübergehenden Zustand feststeckt, rufen Sie die [StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)Aktion auf, wobei der `Force` Parameter auf `true` gesetzt ist. Weitere Informationen finden Sie unter [Ausführen von Anwendungen](https://docs.aws.amazon.com/managed-flink/latest/java/how-running-apps.html) im [Managed Service für Apache Flink Entwicklerhandbuch](https://docs.aws.amazon.com/managed-flink/latest/java/).

## Als Anwendung mit dauerhaftem Zustand in einer VPC ohne Internetzugang bereitstellen
<a name="how-zeppelin-troubleshooting-deploying-no-internet"></a>

Die deploy-as-application Funktion Managed Service for Apache Flink Studio unterstützt keine VPC-Anwendungen ohne Internetzugang. Wir empfehlen, dass Sie Ihre Anwendung in Studio erstellen und dann Managed Service für Apache Flink verwenden, um manuell eine Flink-Anwendung zu erstellen und die ZIP-Datei auszuwählen, die Sie in Ihrem Notebook erstellt haben.

Die folgenden Schritte beschreiben, wie Sie dies tun: 

1. Erstellen und exportieren Sie Ihre Studio-Anwendung in Amazon S3. Dies sollte eine ZIP-Datei sein. 

1. Erstellen Sie manuell eine Anwendung, die Managed Service für Apache Flink nutzt, mit einem Codepfad, der auf den Speicherort der ZIP-Datei in Amazon S3 verweist. Darüber hinaus müssen Sie die Anwendung mit den folgenden `env`-Variablen (2 `groupID`, 3 `var` insgesamt) konfigurieren: 

1. kinesis.analytics.flink.run.options

   1. python: source/note.py

   1. jarfile: lib/ .jar PythonApplicationDependencies

1. managed.deploy\$1as\$1app.options

   1. DatabaseARN: *<glue database ARN (Amazon Resource Name)>*

1. Möglicherweise müssen Sie Managed Service für Apache Flink Studio und Managed Service für Apache Flink IAM-Rollen für die Services, die Ihre Anwendung verwendet, Berechtigungen erteilen. Sie können dieselbe IAM-Rolle für beide Apps verwenden.

## Deploy-as-app Reduzierung von Größe und Bauzeit
<a name="how-zeppelin-troubleshooting-deploying-as-app-reduce-build-time"></a>

Studio deploy-as-app für Python-Anwendungen packt alles, was in der Python-Umgebung verfügbar ist, da wir nicht ermitteln können, welche Bibliotheken Sie benötigen. Dies kann zu einer Größe führen, die größer als nötig ist deploy-as-app. Das folgende Verfahren zeigt, wie Sie die Größe der deploy-as-app Python-Anwendung reduzieren können, indem Sie Abhängigkeiten deinstallieren.

Wenn Sie eine Python-Anwendung mit deploy-as-app Funktionen aus Studio erstellen, sollten Sie erwägen, vorinstallierte Python-Pakete aus dem System zu entfernen, wenn Ihre Anwendungen nicht davon abhängig sind. Dies trägt nicht nur dazu bei, die endgültige Artefaktgröße zu reduzieren, um zu verhindern, dass das Servicelimit für die Anwendungsgröße überschritten wird, sondern verbessert auch die Erstellungszeit von Anwendungen, die diese Funktion verwenden. deploy-as-app

Sie können den folgenden Befehl ausführen, um alle installierten Python-Pakete mit ihrer jeweiligen installierten Größe aufzulisten und Pakete mit signifikanter Größe selektiv zu entfernen.

```
%flink.pyflink

!pip list --format freeze | awk -F = {'print $1'} | xargs pip show | grep -E 'Location:|Name:' | cut -d ' ' -f 2 | paste -d ' ' - - | awk '{gsub("-","_",$1); print $2 "/" tolower($1)}' | xargs du -sh 2> /dev/null | sort -hr
```

**Anmerkung**  
`apache-beam` wird von Flink Python zum Betrieb benötigt. Sie sollten dieses Paket und seine Abhängigkeiten niemals entfernen.

Im Folgenden finden Sie eine Liste der vorinstallierten Python-Pakete in Studio V2, deren Entfernung in Betracht gezogen werden kann:

```
scipy
statsmodels
plotnine
seaborn
llvmlite
bokeh
pandas
matplotlib
botocore
boto3
numba
```

**So entfernen Sie ein Python-Paket aus dem Zeppelin-Notebook:**

1. Prüfen Sie, ob Ihre Anwendung von dem Paket oder einem seiner konsumierenden Pakete abhängt, bevor Sie es entfernen. Mit [pipdeptree](https://pypi.org/project/pipdeptree/) können Sie die Abhängigkeiten eines Pakets identifizieren.

1. Führen Sie den folgenden Befehl aus, um ein Paket zu entfernen:

   ```
   %flink.pyflink
   !pip uninstall -y <package-to-remove>
   ```

1. Wenn Sie ein Paket abrufen müssen, das Sie versehentlich entfernt haben, führen Sie den folgenden Befehl aus:

   ```
   %flink.pyflink
   !pip install <package-to-install>
   ```

**Example Beispiel: Entfernen Sie `scipy` das Paket, bevor Sie Ihre Python-Anwendung mit deploy-as-app Funktion bereitstellen.**  

1. Verwenden Sie `pipdeptree`, um alle `scipy`-Verbraucher zu ermitteln und zu überprüfen, ob Sie `scipy` sicher entfernen können.
   + Installieren Sie das Tool über das Notebook:

     ```
     %flink.pyflink             
     !pip install pipdeptree
     ```
   + Rufen Sie den umgekehrten Abhängigkeitsbaum von `scipy` ab, indem Sie Folgendes ausführen:

     ```
     %flink.pyflink
     !pip -r -p scipy
     ```

     Sie sollten eine ähnliche Ausgabe wie die folgende sehen (aus Platzgründen gekürzt):

     ```
     ...
     ------------------------------------------------------------------------ 
     scipy==1.8.0 
     ├── plotnine==0.5.1 [requires: scipy>=1.0.0] 
     ├── seaborn==0.9.0 [requires: scipy>=0.14.0] 
     └── statsmodels==0.12.2 [requires: scipy>=1.1] 
         └── plotnine==0.5.1 [requires: statsmodels>=0.8.0]
     ```

1. Prüfen Sie sorgfältig die Verwendung von `seaborn`, `statsmodels` und `plotnine` in Ihren Anwendungen. Wenn Ihre Anwendungen nicht von `scipy`, `seaborn`, `statemodels` oder `plotnine` abhängig sind, können Sie alle diese Pakete oder nur diejenigen entfernen, die Ihre Anwendungen nicht benötigen.

1. Entfernen Sie das Paket, indem Sie Folgendes ausführen:

   ```
   !pip uninstall -y scipy plotnine seaborn statemodels
   ```

## Job abbrechen
<a name="how-notbook-canceling-jobs"></a>

In diesem Abschnitt erfahren Sie, wie Sie Apache-Flink-Aufträge abbrechen, auf die Sie von Apache Zeppelin aus nicht zugreifen können. Wenn Sie einen solchen Auftrag abbrechen möchten, rufen Sie das Apache-Flink-Dashboard auf, kopieren Sie die Auftrags-ID und verwenden Sie sie dann in einem der folgenden Beispiele.

Um einen einzelnen Auftrag abzubrechen:

```
%flink.pyflink
import requests

requests.patch("https://zeppelin-flink:8082/jobs/[job_id]", verify=False)
```

Um alle laufenden Aufträge abzubrechen:

```
%flink.pyflink
import requests

r = requests.get("https://zeppelin-flink:8082/jobs", verify=False)
jobs = r.json()['jobs']

for job in jobs:
    if (job["status"] == "RUNNING"):
        print(requests.patch("https://zeppelin-flink:8082/jobs/{}".format(job["id"]), verify=False))
```

Um alle Aufträge abzubrechen:

```
%flink.pyflink
import requests

r = requests.get("https://zeppelin-flink:8082/jobs", verify=False)
jobs = r.json()['jobs']

for job in jobs:
    requests.patch("https://zeppelin-flink:8082/jobs/{}".format(job["id"]), verify=False)
```

## Starten Sie den Apache Flink-Interpreter neu
<a name="how-notbook-restarting-interpreter"></a>

Um den Apache-Flink-Interpreter in Ihrem Studio-Notebook neu zu starten

1. Wählen Sie **Konfiguration** in der oberen rechten Ecke des Bildschirms.

1. Wählen Sie **Interpreter**.

1. Wählen Sie **Neustart** und dann **OK**.

# Erstellen Sie benutzerdefinierte IAM-Richtlinien für Managed Service für Apache Flink Studio-Notebooks
<a name="how-zeppelin-appendix-iam"></a>

Normalerweise verwenden Sie verwaltete IAM-Richtlinien, um Ihrer Anwendung den Zugriff auf abhängige Ressourcen zu ermöglichen. Wenn Sie eine genauere Kontrolle über die Berechtigungen Ihrer Anwendung benötigen, können Sie eine benutzerdefinierte IAM-Richtlinie verwenden. Dieser Abschnitt enthält Beispiele für benutzerdefinierte IAM-Richtlinien.

**Anmerkung**  
Ersetzen Sie in den folgenden Richtlinienbeispielen den Platzhaltertext durch die Werte Ihrer Anwendung.

**Topics**
+ [

## AWS Glue
](#how-zeppelin-iam-glue)
+ [

## CloudWatch Logs
](#how-zeppelin-iam-cw)
+ [

## Kinesis-Streams
](#how-zeppelin-iam-streams)
+ [

## Amazon-MSK-Cluster
](#how-zeppelin-iam-msk)

## AWS Glue
<a name="how-zeppelin-iam-glue"></a>

Die folgende Beispielrichtlinie gewährt Berechtigungen für den Zugriff auf eine AWS Glue Datenbank.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "GlueTable",
            "Effect": "Allow",
            "Action": [
                "glue:GetConnection",
                "glue:GetTable",
                "glue:GetTables",
                "glue:GetDatabase",
                "glue:CreateTable",
                "glue:UpdateTable"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:123456789012:connection/*",
                "arn:aws:glue:us-east-1:123456789012:table/<database-name>/*",
                "arn:aws:glue:us-east-1:123456789012:database/<database-name>",
                "arn:aws:glue:us-east-1:123456789012:database/hive",
                "arn:aws:glue:us-east-1:123456789012:catalog"
            ]
        },
        {
            "Sid": "GlueDatabase",
            "Effect": "Allow",
            "Action": "glue:GetDatabases",
            "Resource": "*"
        }
    ]
}
```

------

## CloudWatch Logs
<a name="how-zeppelin-iam-cw"></a>

Die folgende Richtlinie gewährt Berechtigungen für den Zugriff auf CloudWatch Protokolle:

```
{
      "Sid": "ListCloudwatchLogGroups",
      "Effect": "Allow",
      "Action": [
        "logs:DescribeLogGroups"
      ],
      "Resource": [
        "arn:aws:logs:<region>:<accountId>:log-group:*"
      ]
    },
    {
      "Sid": "ListCloudwatchLogStreams",
      "Effect": "Allow",
      "Action": [
        "logs:DescribeLogStreams"
      ],
      "Resource": [
        "<logGroupArn>:log-stream:*"
      ]
    },
    {
      "Sid": "PutCloudwatchLogs",
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents"
      ],
      "Resource": [
        "<logStreamArn>"
      ]
    }
```

**Anmerkung**  
Wenn Sie Ihre Anwendung mithilfe der Konsole erstellen, fügt die Konsole Ihrer Anwendungsrolle die für den Zugriff auf CloudWatch Protokolle erforderlichen Richtlinien hinzu.

## Kinesis-Streams
<a name="how-zeppelin-iam-streams"></a>

Ihre Anwendung kann einen Kinesis-Stream für eine Quelle oder ein Ziel verwenden. Ihre Anwendung benötigt Leseberechtigungen, um aus einem Quell-Stream zu lesen, und Schreibberechtigungen, um in einen Ziel-Stream zu schreiben.

Die folgende Richtlinie gewährt Berechtigungen zum Lesen aus einem Kinesis-Stream, der als Quelle verwendet wird:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KinesisShardDiscovery",
            "Effect": "Allow",
            "Action": "kinesis:ListShards",
            "Resource": "*"
        },
        {
            "Sid": "KinesisShardConsumption",
            "Effect": "Allow",
            "Action": [
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer",
                "kinesis:DeregisterStreamConsumer"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>"
        },
        {
            "Sid": "KinesisEfoConsumer",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamConsumer",
                "kinesis:SubscribeToShard"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>/consumer/*"
        }
    ]
}
```

------

Die folgende Richtlinie gewährt Berechtigungen zum Schreiben in einen Kinesis-Stream, der als Ziel verwendet wird:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KinesisStreamSink",
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:DescribeStreamSummary",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/<stream-name>"
        }
    ]
}
```

------

Wenn Ihre Anwendung auf einen verschlüsselten Kinesis-Stream zugreift, müssen Sie zusätzliche Berechtigungen für den Zugriff auf den Stream und den Verschlüsselungsschlüssel des Streams gewähren. 

Die folgende Richtlinie gewährt Berechtigungen für den Zugriff auf einen verschlüsselten Quell-Stream und den Verschlüsselungsschlüssel des Streams:

```
{
      "Sid": "ReadEncryptedKinesisStreamSource",
      "Effect": "Allow",
      "Action": [
        "kms:Decrypt"
      ],
      "Resource": [
        "<inputStreamKeyArn>"
      ]
    }
    ,
```

Die folgende Richtlinie gewährt Berechtigungen für den Zugriff auf einen verschlüsselten Ziel-Stream und den Verschlüsselungsschlüssel des Streams:

```
{
      "Sid": "WriteEncryptedKinesisStreamSink",
      "Effect": "Allow",
      "Action": [
        "kms:GenerateDataKey"
      ],
      "Resource": [
        "<outputStreamKeyArn>"
      ]
    }
```

## Amazon-MSK-Cluster
<a name="how-zeppelin-iam-msk"></a>

Um Zugriff auf einen Amazon-MSK-Cluster zu gewähren, gewähren Sie Zugriff auf die VPC des Clusters. Richtlinienbeispiele für den Zugriff auf eine Amazon VPC finden Sie unter [ VPC-Anwendungsberechtigungen](https://docs.aws.amazon.com/managed-flink/latest/java/vpc-permissions.html).