

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Erstellen Sie eine Managed Service for Apache Flink-Anwendung und führen Sie sie aus
<a name="get-started-exercise"></a>

In diesem Schritt erstellen Sie eine Managed Service für Apache Flink-Anwendung mit Kinesis-Datenströmen als Quelle und Senke.

**Topics**
+ [Erstellen Sie abhängige Ressourcen](#get-started-exercise-0)
+ [Einrichten der lokalen Entwicklungsumgebung](#get-started-exercise-2)
+ [Laden Sie den Apache Flink-Streaming-Java-Code herunter und untersuchen Sie ihn](#get-started-exercise-5)
+ [Schreiben Sie Beispieldatensätze in den Eingabestream](#get-started-exercise-5-4)
+ [Führen Sie Ihre Anwendung lokal aus](#get-started-exercise-5-run)
+ [Beobachten Sie Eingabe- und Ausgabedaten in Kinesis-Streams](#get-started-exercise-input-output)
+ [Stoppen Sie, dass Ihre Anwendung lokal ausgeführt wird](#get-started-exercise-stop)
+ [Kompilieren und verpacken Sie Ihren Anwendungscode](#get-started-exercise-5-5)
+ [Laden Sie die JAR-Datei mit dem Anwendungscode hoch](#get-started-exercise-6)
+ [Erstellen und konfigurieren Sie die Anwendung Managed Service für Apache Flink](#get-started-exercise-7)
+ [Nächster Schritt](#get-started-exercise-next-step-4)

## Erstellen Sie abhängige Ressourcen
<a name="get-started-exercise-0"></a>

Bevor Sie für diese Übung eine Anwendung von Managed Service für Apache Flink erstellen, erstellen Sie die folgenden abhängigen Ressourcen: 
+ Zwei Kinesis-Datenströme für Eingabe und Ausgabe
+ Ein Amazon S3 S3-Bucket zum Speichern des Anwendungscodes
**Anmerkung**  
In diesem Tutorial wird davon ausgegangen, dass Sie Ihre Anwendung in der Region us-east-1 US East (Nord-Virginia) bereitstellen. Wenn Sie eine andere Region verwenden, passen Sie alle Schritte entsprechend an.

### Erstellen Sie zwei Amazon Kinesis Kinesis-Datenstreams
<a name="get-started-exercise-1"></a>

Bevor Sie für diese Übung eine Anwendung von Managed Service für Apache Flink erstellen, erstellen Sie zwei Kinesis Data Streams (`ExampleInputStream` und `ExampleOutputStream`). Ihre Anwendung verwendet diese Streams für die Quell- und Ziel-Streams der Anwendung.

Sie können diese Streams entweder mit der Amazon Kinesis Kinesis-Konsole oder mit dem folgenden AWS CLI Befehl erstellen. Anweisungen für die Konsole finden Sie unter [Erstellen und Aktualisieren von Datenströmen](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html) im *Amazon Kinesis Data Streams Entwicklerhandbuch*. Um die Streams mit dem zu erstellen AWS CLI, verwenden Sie die folgenden Befehle und passen Sie sie an die Region an, die Sie für Ihre Anwendung verwenden.

**So erstellen Sie die Daten-Streams (AWS CLI)**

1. Verwenden Sie den folgenden Amazon Kinesis `create-stream` AWS CLI Kinesis-Befehl, um den ersten Stream (`ExampleInputStream`) zu erstellen:

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleInputStream \
   --shard-count 1 \
   --region us-east-1 \
   ```

1. Um den zweiten Stream zu erstellen, den die Anwendung zum Schreiben der Ausgabe verwendet, führen Sie denselben Befehl aus und ändern Sie den Stream-Namen in`ExampleOutputStream`:

   ```
   $ aws kinesis create-stream \
   --stream-name ExampleOutputStream \
   --shard-count 1 \
   --region us-east-1 \
   ```

### Erstellen Sie einen Amazon S3 S3-Bucket für den Anwendungscode
<a name="get-started-exercise-1-5"></a>

Sie können ein Amazon-S3-Bucket mithilfe der Konsole erstellen. Informationen zum Erstellen eines Amazon S3 S3-Buckets mithilfe der Konsole finden Sie unter [Erstellen eines Buckets](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html) im [Amazon S3 S3-Benutzerhandbuch](https://docs.aws.amazon.com/AmazonS3/latest/userguide/). Benennen Sie den Amazon S3 S3-Bucket mit einem weltweit eindeutigen Namen, indem Sie beispielsweise Ihren Anmeldenamen anhängen.

**Anmerkung**  
 Stellen Sie sicher, dass Sie den Bucket in der Region erstellen, die Sie für dieses Tutorial verwenden (us-east-1).

### Sonstige Ressourcen
<a name="get-started-exercise-1-6"></a>

Wenn Sie Ihre Anwendung erstellen, erstellt Managed Service for Apache Flink automatisch die folgenden CloudWatch Amazon-Ressourcen, sofern sie noch nicht vorhanden sind:
+ Eine Protokollgruppe mit dem Namen `/AWS/KinesisAnalytics-java/<my-application>`
+ Einen Protokollstream mit dem Namen `kinesis-analytics-log-stream`

## Einrichten der lokalen Entwicklungsumgebung
<a name="get-started-exercise-2"></a>

Für die Entwicklung und das Debuggen können Sie die Apache Flink-Anwendung auf Ihrem Computer direkt von der IDE Ihrer Wahl aus ausführen. Alle Apache Flink-Abhängigkeiten werden wie normale Java-Abhängigkeiten mit Apache Maven behandelt. 

**Anmerkung**  
Auf Ihrem Entwicklungscomputer müssen Sie Java JDK 11, Maven und Git installiert haben. Wir empfehlen Ihnen, eine Entwicklungsumgebung wie [Eclipse, Java Neon](https://www.eclipse.org/downloads/packages/release/neon/3) oder [IntelliJ](https://www.jetbrains.com/idea/) IDEA zu verwenden. Um zu überprüfen, ob Sie alle Voraussetzungen erfüllen, finden Sie unter. [Erfüllen Sie die Voraussetzungen für das Abschließen der Übungen](getting-started.md#setting-up-prerequisites) Sie müssen **keinen** Apache Flink-Cluster auf Ihrem Computer installieren. 

### Authentifizieren Sie Ihre Sitzung AWS
<a name="get-started-exercise-2-5"></a>

Die Anwendung verwendet Kinesis-Datenströme, um Daten zu veröffentlichen. Bei der lokalen Ausführung benötigen Sie eine gültige AWS authentifizierte Sitzung mit Schreibberechtigungen in den Kinesis-Datenstrom. Verwenden Sie die folgenden Schritte, um Ihre Sitzung zu authentifizieren:

1. Wenn Sie das Profil AWS CLI und ein benanntes Profil mit gültigen Anmeldeinformationen nicht konfiguriert haben, finden Sie weitere Informationen unter. [Richten Sie das AWS Command Line Interface ()AWS CLI ein](setup-awscli.md)

1. Vergewissern Sie sich, dass Ihre korrekt konfiguriert AWS CLI ist und dass Ihre Benutzer über Schreibberechtigungen in den Kinesis-Datenstrom verfügen, indem Sie den folgenden Testdatensatz veröffentlichen:

   ```
   $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
   ```

1. Wenn Ihre IDE über ein Plug-in zur Integration verfügt AWS, können Sie dieses verwenden, um die Anmeldeinformationen an die Anwendung zu übergeben, die in der IDE ausgeführt wird. Weitere Informationen finden Sie unter [AWS Toolkit für IntelliJ IDEA](https://aws.amazon.com/intellij/) und [AWS Toolkit](https://docs.aws.amazon.com/toolkit-for-eclipse/v1/user-guide/welcome.html) for Eclipse.

## Laden Sie den Apache Flink-Streaming-Java-Code herunter und untersuchen Sie ihn
<a name="get-started-exercise-5"></a>

Der Java-Anwendungscode für dieses Beispiel ist verfügbar unter GitHub. Zum Herunterladen des Anwendungscodes gehen Sie wie folgt vor:

1. Klonen Sie das Remote-Repository, indem Sie den folgenden Befehl verwenden:

   ```
   git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
   ```

1. Navigieren Sie zum `amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted` Verzeichnis .

### Überprüfen Sie die Anwendungskomponenten
<a name="get-started-exercise-5-1"></a>

Die Anwendung ist vollständig in der `com.amazonaws.services.msf.BasicStreamingJob` Klasse implementiert. Die `main()` Methode definiert den Datenfluss, um die Streaming-Daten zu verarbeiten und auszuführen. 

**Anmerkung**  
Für ein optimiertes Entwicklererlebnis ist die Anwendung so konzipiert, dass sie ohne Codeänderungen sowohl auf Amazon Managed Service für Apache Flink als auch lokal für die Entwicklung in Ihrer IDE ausgeführt werden kann.
+ Um die Laufzeitkonfiguration zu lesen, damit sie funktioniert, wenn sie in Amazon Managed Service for Apache Flink und in Ihrer IDE ausgeführt wird, erkennt die Anwendung automatisch, ob sie lokal in der IDE eigenständig ausgeführt wird. In diesem Fall lädt die Anwendung die Laufzeitkonfiguration anders:

  1. Wenn die Anwendung feststellt, dass sie in Ihrer IDE im Standalone-Modus ausgeführt wird, erstellen Sie die `application_properties.json` Datei, die im **Ressourcenordner** des Projekts enthalten ist. Der Inhalt der Datei folgt.

  1. Wenn die Anwendung in Amazon Managed Service für Apache Flink ausgeführt wird, lädt das Standardverhalten die Anwendungskonfiguration aus den Laufzeiteigenschaften, die Sie in der Amazon Managed Service for Apache Flink-Anwendung definieren. Siehe [Erstellen und konfigurieren Sie die Anwendung Managed Service für Apache Flink](#get-started-exercise-7).

     ```
     private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
         if (env instanceof LocalStreamEnvironment) {
             LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
             return KinesisAnalyticsRuntime.getApplicationProperties(
                     BasicStreamingJob.class.getClassLoader()
                             .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
         } else {
             LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink");
             return KinesisAnalyticsRuntime.getApplicationProperties();
         }
     }
     ```
+ Die `main()` Methode definiert den Anwendungsdatenfluss und führt ihn aus. 
  + Initialisiert die Standard-Streaming-Umgebungen. In diesem Beispiel zeigen wir, wie Sie sowohl die API für `StreamExecutionEnvironment` die Verwendung mit der DataSteam API als auch die API für `StreamTableEnvironment` die Verwendung mit SQL und der Tabelle erstellen. Die beiden Umgebungsobjekte sind zwei separate Verweise auf dieselbe Laufzeitumgebung, die unterschiedlich verwendet werden soll APIs. 

    ```
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ```
  + Laden Sie die Konfigurationsparameter der Anwendung. Dadurch werden sie automatisch von der richtigen Stelle geladen, je nachdem, wo die Anwendung ausgeführt wird:

    ```
    Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    ```
  + Die Anwendung definiert mithilfe des [Kinesis Consumer-Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/#kinesis-consumer) eine Quelle, um Daten aus dem Eingabestream zu lesen. Die Konfiguration des Eingabestreams ist im `PropertyGroupId` = `InputStream0` definiert. Der Name und die Region des Streams sind in den jeweiligen Eigenschaften `aws.region` benannt`stream.name`. Der Einfachheit halber liest diese Quelle die Datensätze als Zeichenfolge. 

    ```
    private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) {
        String inputStreamName = inputProperties.getProperty("stream.name");
        return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties);
    }
    ...
    
    public static void main(String[] args) throws Exception { 
       ...
       SourceFunction<String> source = createSource(applicationParameters.get("InputStream0"));
       DataStream<String> input = env.addSource(source, "Kinesis Source");  
       ...
    }
    ```
  + Die Anwendung definiert dann mithilfe des [Kinesis Streams Sink](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kinesis/#kinesis-streams-sink) Connectors eine Senke, um Daten an den Ausgabestrom zu senden. Name und Region des Ausgabestreams sind im `PropertyGroupId` = definiert`OutputStream0`, ähnlich wie beim Eingabestream. Die Senke ist direkt mit der internen Senke verbunden`DataStream`, die Daten von der Quelle bezieht. In einer echten Anwendung gibt es eine gewisse Transformation zwischen Quelle und Senke. 

    ```
    private static KinesisStreamsSink<String> createSink(Properties outputProperties) {
        String outputStreamName = outputProperties.getProperty("stream.name");
        return KinesisStreamsSink.<String>builder()
                .setKinesisClientProperties(outputProperties)
                .setSerializationSchema(new SimpleStringSchema())
                .setStreamName(outputStreamName)
                .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
                .build();
    }
    ...
    public static void main(String[] args) throws Exception { 
       ...
       Sink<String> sink = createSink(applicationParameters.get("OutputStream0"));
       input.sinkTo(sink);
       ...
    }
    ```
  + Schließlich führen Sie den Datenfluss aus, den Sie gerade definiert haben. Dies muss die letzte Anweisung der `main()` Methode sein, nachdem Sie alle Operatoren definiert haben, die für den Datenfluss erforderlich sind:

    ```
    env.execute("Flink streaming Java API skeleton");
    ```

### Verwenden Sie die Datei pom.xml
<a name="get-started-exercise-5-2"></a>

Die Datei pom.xml definiert alle Abhängigkeiten, die von der Anwendung benötigt werden, und richtet das Maven Shade-Plugin ein, um das Fat-Jar zu erstellen, das alle von Flink benötigten Abhängigkeiten enthält. 
+ Einige Abhängigkeiten haben einen Gültigkeitsbereich. `provided` Diese Abhängigkeiten sind automatisch verfügbar, wenn die Anwendung in Amazon Managed Service for Apache Flink ausgeführt wird. Sie sind erforderlich, um die Anwendung zu kompilieren oder die Anwendung lokal in Ihrer IDE auszuführen. Weitere Informationen finden Sie unter [Führen Sie Ihre Anwendung lokal aus](#get-started-exercise-5-run). Stellen Sie sicher, dass Sie dieselbe Flink-Version wie die Runtime verwenden, die Sie in Amazon Managed Service for Apache Flink verwenden werden.

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  ```
+ Sie müssen dem POM zusätzliche Apache Flink-Abhängigkeiten mit dem Standardbereich hinzufügen, z. B. den von dieser Anwendung verwendeten [Kinesis-Connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kinesis/). Weitere Informationen finden Sie unter [Verwenden Sie Apache Flink-Konnektoren](how-flink-connectors.md). Sie können auch alle zusätzlichen Java-Abhängigkeiten hinzufügen, die für Ihre Anwendung erforderlich sind. 

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kinesis</artifactId>
      <version>${aws.connector.version}</version>
  </dependency>
  ```
+ Das Maven Java Compiler-Plugin stellt sicher, dass der Code mit Java 11 kompiliert wird, der JDK-Version, die derzeit von Apache Flink unterstützt wird. 
+ Das Maven Shade-Plugin packt das Fat-Jar, mit Ausnahme einiger Bibliotheken, die von der Runtime bereitgestellt werden. Es spezifiziert auch zwei Transformatoren: und. `ServicesResourceTransformer` `ManifestResourceTransformer` Letzteres konfiguriert die Klasse, die die `main` Methode zum Starten der Anwendung enthält. Wenn Sie die Hauptklasse umbenennen, vergessen Sie nicht, diesen Transformator zu aktualisieren.
+ 

  ```
  <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      ...
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass>
          </transformer>
      ...
  </plugin>
  ```

## Schreiben Sie Beispieldatensätze in den Eingabestream
<a name="get-started-exercise-5-4"></a>

In diesem Abschnitt senden Sie Beispieldatensätze an den Stream, damit die Anwendung sie verarbeiten kann. Sie haben zwei Möglichkeiten, Beispieldaten zu generieren, entweder mit einem Python-Skript oder mit dem [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator).

### Generieren Sie Beispieldaten mit einem Python-Skript
<a name="get-started-exercise-5-4-1"></a>

Sie können ein Python-Skript verwenden, um Beispieldatensätze an den Stream zu senden.

**Anmerkung**  
Um dieses Python-Skript auszuführen, müssen Sie Python 3.x verwenden und die [AWS SDK for Python (Boto)](https://aws.amazon.com/developer/language/python/) -Bibliothek installiert haben.

**Um mit dem Senden von Testdaten an den Kinesis-Eingabestream zu beginnen:**

1. Laden Sie das `stock.py` Python-Skript für den Datengenerator aus dem [ GitHub Datengenerator-Repository](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/data-generator) herunter.

1. Führen Sie das `stock.py`Skript aus:

   ```
   $ python stock.py
   ```

Lassen Sie das Skript laufen, während Sie den Rest des Tutorials abschließen. Sie können jetzt Ihre Apache Flink-Anwendung ausführen.

### Generieren Sie Beispieldaten mit Kinesis Data Generator
<a name="get-started-exercise-5-4-2"></a>

Alternativ zur Verwendung des Python-Skripts können Sie [Kinesis Data Generator](https://github.com/awslabs/amazon-kinesis-data-generator) verwenden, der auch in einer [gehosteten Version](https://awslabs.github.io/amazon-kinesis-data-generator/web/producer.html) verfügbar ist, um Zufallsstichprobendaten an den Stream zu senden. Kinesis Data Generator läuft in Ihrem Browser und Sie müssen nichts auf Ihrem Computer installieren. 

**So richten Sie Kinesis Data Generator ein und führen ihn aus:**

1. Folgen Sie den Anweisungen in der [Kinesis Data Generator-Dokumentation](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html), um den Zugriff auf das Tool einzurichten. Sie werden eine CloudFormation Vorlage ausführen, die einen Benutzer und ein Passwort einrichtet. 

1. Greifen Sie über die von der CloudFormation Vorlage generierte URL auf Kinesis Data Generator zu. Sie finden die URL auf der Registerkarte „**Ausgabe**“, nachdem die CloudFormation Vorlage fertiggestellt wurde. 

1. Konfigurieren Sie den Datengenerator:
   + **Region:** Wählen Sie die Region aus, die Sie für dieses Tutorial verwenden: us-east-1
   + **Stream/Delivery-Stream:** Wählen Sie den Eingabestream aus, den die Anwendung verwenden soll: `ExampleInputStream`
   + **Datensätze pro Sekunde**: 100
   + **Datensatzvorlage:** Kopieren Sie die folgende Vorlage und fügen Sie sie ein:

     ```
     {
       "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}},
       "ticker" : "{{random.arrayElement(
             ["AAPL", "AMZN", "MSFT", "INTC", "TBV"]
         )}}",
       "price" : {{random.number(100)}}          
     }
     ```

1. Testen Sie die Vorlage: Wählen Sie **Testvorlage** und stellen Sie sicher, dass der generierte Datensatz dem folgenden ähnelt:

   ```
   { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
   ```

1. Starten Sie den Datengenerator: Wählen Sie **Select Send Data**.

Kinesis Data Generator sendet jetzt Daten an den`ExampleInputStream`. 

## Führen Sie Ihre Anwendung lokal aus
<a name="get-started-exercise-5-run"></a>

Sie können Ihre Flink-Anwendung lokal in Ihrer IDE ausführen und debuggen.

**Anmerkung**  
Bevor Sie fortfahren, stellen Sie sicher, dass die Eingabe- und Ausgabestreams verfügbar sind. Siehe [Erstellen Sie zwei Amazon Kinesis Kinesis-Datenstreams](#get-started-exercise-1). Stellen Sie außerdem sicher, dass Sie über Lese- und Schreibberechtigungen für beide Streams verfügen. Siehe [Authentifizieren Sie Ihre Sitzung AWS](#get-started-exercise-2-5).   
Für die Einrichtung der lokalen Entwicklungsumgebung sind Java 11 JDK, Apache Maven und eine IDE für die Java-Entwicklung erforderlich. Stellen Sie sicher, dass Sie die erforderlichen Voraussetzungen erfüllen. Siehe [Erfüllen Sie die Voraussetzungen für das Abschließen der Übungen](getting-started.md#setting-up-prerequisites).

### Importieren Sie das Java-Projekt in Ihre IDE
<a name="get-started-exercise-5-run-1"></a>

Um mit der Arbeit an der Anwendung in Ihrer IDE zu beginnen, müssen Sie sie als Java-Projekt importieren. 

Das von Ihnen geklonte Repository enthält mehrere Beispiele. Jedes Beispiel ist ein separates Projekt. Importieren Sie für dieses Tutorial den Inhalt im `./java/GettingStarted` Unterverzeichnis in Ihre IDE. 

Fügen Sie den Code mithilfe von Maven als vorhandenes Java-Projekt ein.

**Anmerkung**  
Der genaue Vorgang zum Importieren eines neuen Java-Projekts hängt von der verwendeten IDE ab.

### Überprüfen Sie die lokale Anwendungskonfiguration
<a name="get-started-exercise-5-run-2"></a>

Bei der lokalen Ausführung verwendet die Anwendung die Konfiguration in der `application_properties.json` Datei im Ressourcenordner des Projekts unter`./src/main/resources`. Sie können diese Datei bearbeiten, um verschiedene Kinesis-Stream-Namen oder Regionen zu verwenden.

```
[
  {
    "PropertyGroupId": "InputStream0",
    "PropertyMap": {
      "stream.name": "ExampleInputStream",
      "flink.stream.initpos": "LATEST",
      "aws.region": "us-east-1"
    }
  },
  {
    "PropertyGroupId": "OutputStream0",
    "PropertyMap": {
      "stream.name": "ExampleOutputStream",
      "aws.region": "us-east-1"
    }
  }
]
```

### Richten Sie Ihre IDE-Run-Konfiguration ein
<a name="get-started-exercise-5-run-3"></a>

Sie können die Flink-Anwendung direkt von Ihrer IDE aus ausführen und debuggen, indem Sie die Hauptklasse ausführen`com.amazonaws.services.msf.BasicStreamingJob`, wie Sie jede Java-Anwendung ausführen würden. Bevor Sie die Anwendung ausführen, müssen Sie die Run-Konfiguration einrichten. Das Setup hängt von der IDE ab, die Sie verwenden. Weitere Informationen finden Sie beispielsweise unter [Konfigurationen ausführen/debuggen](https://www.jetbrains.com/help/idea/run-debug-configuration.html) in der IntelliJ IDEA-Dokumentation. Insbesondere müssen Sie Folgendes einrichten:

1. **Fügen Sie die `provided` Abhängigkeiten zum Klassenpfad** hinzu. Dies ist erforderlich, um sicherzustellen, dass die Abhängigkeiten mit `provided` Gültigkeitsbereich an die Anwendung übergeben werden, wenn sie lokal ausgeführt wird. Ohne diese Einrichtung zeigt die Anwendung sofort einen `class not found` Fehler an. 

1. **Übergeben Sie die AWS Anmeldeinformationen für den Zugriff auf die Kinesis-Streams an die Anwendung**. Am schnellsten ist es, das [AWS Toolkit für IntelliJ](https://aws.amazon.com/intellij/) IDEA zu verwenden. Mit diesem IDE-Plugin in der Run-Konfiguration können Sie ein bestimmtes Profil auswählen. AWS AWS Die Authentifizierung erfolgt mit diesem Profil. Sie müssen die AWS Anmeldeinformationen nicht direkt weitergeben. 

1. Stellen Sie sicher, dass die IDE die Anwendung mit **JDK 11** ausführt.

### Führen Sie die Anwendung in Ihrer IDE aus
<a name="get-started-exercise-5-run-4"></a>

Nachdem Sie die Run-Konfiguration für eingerichtet haben`BasicStreamingJob`, können Sie sie wie eine normale Java-Anwendung ausführen oder debuggen. 

**Anmerkung**  
Sie können das von Maven generierte Fat-Jar nicht direkt über die `java -jar ...` Befehlszeile ausführen. Dieses JAR enthält nicht die Kernabhängigkeiten von Flink, die für die eigenständige Ausführung der Anwendung erforderlich sind.

Wenn die Anwendung erfolgreich gestartet wird, protokolliert sie einige Informationen über den eigenständigen Minicluster und die Initialisierung der Konnektoren. Darauf folgen eine Reihe von INFO- und einige WARN-Logs, die Flink normalerweise beim Start der Anwendung ausgibt.

```
13:43:31,405 INFO  com.amazonaws.services.msf.BasicStreamingJob                 [] - Loading application properties from 'flink-application-properties-dev.json'
13:43:31,549 INFO  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb.
13:43:31,676 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb.
13:43:31,677 INFO  org.apache.flink.runtime.minicluster.MiniCluster             [] - Starting Flink Mini Cluster
....
```

Nach Abschluss der Initialisierung gibt die Anwendung keine weiteren Protokolleinträge aus. **Während der Datenfluss erfolgt, wird kein Protokoll ausgegeben.**

Um zu überprüfen, ob die Anwendung Daten korrekt verarbeitet, können Sie die Eingabe- und Ausgabe-Kinesis-Streams überprüfen, wie im folgenden Abschnitt beschrieben.

**Anmerkung**  
 Es ist das normale Verhalten einer Flink-Anwendung, keine Protokolle über fließende Daten auszugeben. Das Ausgeben von Protokollen für jeden Datensatz mag für das Debuggen praktisch sein, kann aber bei der Ausführung in der Produktion zu erheblichem Mehraufwand führen. 

## Beobachten Sie Eingabe- und Ausgabedaten in Kinesis-Streams
<a name="get-started-exercise-input-output"></a>

Sie können Datensätze beobachten, die vom (generierenden Beispiel-Python) oder dem Kinesis Data Generator (Link) an den Eingabestream gesendet wurden, indem Sie den **Data Viewer** in der Amazon Kinesis Kinesis-Konsole verwenden. 

**Um Aufzeichnungen zu beobachten**

1. Öffnen Sie die Kinesis-Konsole unter [https://console.aws.amazon.com/kinesis.](https://console.aws.amazon.com/kinesis)

1. Stellen Sie sicher, dass die Region mit der Region übereinstimmt, in der Sie dieses Tutorial ausführen, und zwar standardmäßig us-east-1 US East (Nord-Virginia). Ändern Sie die Region, falls sie nicht übereinstimmt. 

1. Wählen Sie **Datenströme**. 

1. Wählen Sie den Stream aus, den Sie beobachten möchten, entweder `ExampleInputStream` oder `ExampleOutputStream.`

1. Wählen Sie die Registerkarte „**Datenanzeige**“. 

1. Wählen Sie einen beliebigen **Shard** aus, behalten Sie „**Letzte**“ als **Startposition** bei und wählen Sie dann „**Datensätze abrufen**“. Möglicherweise wird die Fehlermeldung „Für diese Anfrage wurde kein Datensatz gefunden“ angezeigt. Wenn ja, wählen Sie „**Erneut versuchen, Datensätze abzurufen**“. Die neuesten Datensätze, die im Stream veröffentlicht wurden, werden angezeigt. 

1. Wählen Sie den Wert in der Datenspalte aus, um den Inhalt des Datensatzes im JSON-Format zu überprüfen.

## Stoppen Sie, dass Ihre Anwendung lokal ausgeführt wird
<a name="get-started-exercise-stop"></a>

Stoppen Sie die Anwendung, die in Ihrer IDE ausgeführt wird. Die IDE bietet normalerweise eine „Stopp“ -Option. Der genaue Standort und die Methode hängen von der IDE ab, die Sie verwenden. 

## Kompilieren und verpacken Sie Ihren Anwendungscode
<a name="get-started-exercise-5-5"></a>

In diesem Abschnitt verwenden Sie Apache Maven, um den Java-Code zu kompilieren und in eine JAR-Datei zu packen. Sie können Ihren Code mit dem Maven-Befehlszeilentool oder Ihrer IDE kompilieren und verpacken.

**Um mit der Maven-Befehlszeile zu kompilieren und zu paketieren:**

Gehen Sie in das Verzeichnis, das das GettingStarted Java-Projekt enthält, und führen Sie den folgenden Befehl aus:

```
$ mvn package
```

**Um mit Ihrer IDE zu kompilieren und zu paketieren:**

Führen Sie es `mvn package` von Ihrer IDE-Maven-Integration aus aus.

In beiden Fällen wird die folgende JAR-Datei erstellt:`target/amazon-msf-java-stream-app-1.0.jar`.

**Anmerkung**  
 Wenn Sie ein „Build-Projekt“ von Ihrer IDE aus ausführen, wird die JAR-Datei möglicherweise nicht erstellt.

## Laden Sie die JAR-Datei mit dem Anwendungscode hoch
<a name="get-started-exercise-6"></a>

In diesem Abschnitt laden Sie die JAR-Datei, die Sie im vorherigen Abschnitt erstellt haben, in den Amazon Simple Storage Service (Amazon S3) -Bucket hoch, den Sie zu Beginn dieses Tutorials erstellt haben. Wenn Sie diesen Schritt noch nicht abgeschlossen haben, finden Sie weitere Informationen unter (Link).

**Um die JAR-Datei mit dem Anwendungscode hochzuladen**

1. Öffnen Sie die Amazon S3 S3-Konsole unter [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Wählen Sie den Bucket aus, den Sie zuvor für den Anwendungscode erstellt haben.

1. Klicken Sie auf **Upload**.

1. Klicken Sie auf **Add files**.

1. Navigieren Sie zu der im vorherigen Schritt generierten JAR-Datei:`target/amazon-msf-java-stream-app-1.0.jar`. 

1. Wählen Sie **Hochladen**, ohne andere Einstellungen zu ändern.

**Warnung**  
Stellen Sie sicher, dass Sie die richtige JAR-Datei in auswählen`<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar`.   
Das `target` Verzeichnis enthält auch andere JAR-Dateien, die Sie nicht hochladen müssen.

## Erstellen und konfigurieren Sie die Anwendung Managed Service für Apache Flink
<a name="get-started-exercise-7"></a>

Sie können eine Anwendung von Managed Service für Apache Flink entweder über die Konsole oder AWS CLI erstellen und ausführen. Für dieses Tutorial verwenden Sie die Konsole. 

**Anmerkung**  
Wenn Sie die Anwendung mithilfe der Konsole erstellen, werden Ihre AWS Identity and Access Management (IAM) und Amazon CloudWatch Logs-Ressourcen für Sie erstellt. Wenn Sie die Anwendung mithilfe von erstellen AWS CLI, erstellen Sie diese Ressourcen separat.

**Topics**
+ [Erstellen der Anwendung](#get-started-exercise-7-console-create)
+ [Bearbeiten Sie die IAM-Richtlinie](#get-started-exercise-7-console-iam)
+ [Konfigurieren Sie die Anwendung](#get-started-exercise-7-console-configure)
+ [Führen Sie die Anwendung aus.](#get-started-exercise-7-console-run)
+ [Beobachten Sie die Metriken der laufenden Anwendung](#get-started-exercise-7-console-stop)
+ [Beobachten Sie die Ausgabedaten in Kinesis-Streams](#get-started-exercise-7-console-output)
+ [Beenden Sie die Anwendung](#get-started-exercise-stop)

### Erstellen der Anwendung
<a name="get-started-exercise-7-console-create"></a>

**So erstellen Sie die Anwendung**

1. Melden Sie sich bei der AWS-Managementkonsole an und öffnen Sie die Amazon MSF-Konsole unter https://console.aws.amazon.com /flink.

1. Stellen Sie sicher, dass die richtige Region ausgewählt ist: us-east-1 US East (Nord-Virginia)

1. Öffnen Sie das Menü auf der rechten Seite und wählen Sie **Apache Flink-Anwendungen und dann Streaming-Anwendung** **erstellen**. Wählen Sie alternativ im Container Erste Schritte auf der Startseite die Option **Streaming-Anwendung erstellen** aus. 

1. Gehen Sie auf der Seite **Streaming-Anwendung erstellen** wie folgt vor:
   + **Wählen Sie eine Methode, um die Stream-Verarbeitungsanwendung einzurichten:** Wählen Sie **Von Grund auf neu erstellen**.
   + **Apache Flink-Konfiguration, Flink-Version der Anwendung: Wählen Sie **Apache Flink**** 1.20.

1. Konfigurieren Sie Ihre Anwendung
   + **Name der Anwendung:** Geben Sie ein**MyApplication**.
   + **Beschreibung:** eingeben**My java test app**.
   + **Zugriff auf Anwendungsressourcen:** Wählen Sie „**IAM-Rolle `kinesis-analytics-MyApplication-us-east-1` mit den erforderlichen Richtlinien erstellen/aktualisieren**“.

1. Konfigurieren Sie Ihre **Vorlage für Anwendungseinstellungen**
   + **Vorlagen:** Wählen Sie **Entwicklung**.

1. Wählen Sie unten auf der Seite die Option **Streaming-Anwendung erstellen** aus.

**Anmerkung**  
Beim Erstellen einer Anwendung von Managed Service für Apache Flink mit der Konsole haben Sie die Möglichkeit, eine IAM-Rolle und -Richtlinie für Ihre Anwendung erstellen zu lassen. Ihre Anwendung verwendet diese Rolle und Richtlinie für den Zugriff auf ihre abhängigen Ressourcen. Diese IAM-Ressourcen werden unter Verwendung Ihres Anwendungsnamens und der Region wie folgt benannt:  
Richtlinie: `kinesis-analytics-service-MyApplication-us-east-1`
Rolle: `kinesisanalytics-MyApplication-us-east-1`
Amazon Managed Service für Apache Flink war früher als Kinesis Data Analytics bekannt. Dem Namen der Ressourcen, die automatisch erstellt werden, wird aus Gründen der Abwärtskompatibilität ein Präfix `kinesis-analytics-` vorangestellt.

### Bearbeiten Sie die IAM-Richtlinie
<a name="get-started-exercise-7-console-iam"></a>

Bearbeiten Sie die IAM-Richtlinie zum Hinzufügen von Berechtigungen für den Zugriff auf die Kinesis-Datenströme.

**Um die Richtlinie zu bearbeiten**

1. Öffnen Sie unter [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) die IAM-Konsole.

1. Wählen Sie **Policies (Richtlinien)**. Wählen Sie die **`kinesis-analytics-service-MyApplication-us-east-1`**-Richtlinie aus, die die Konsole im vorherigen Abschnitt für Sie erstellt hat. 

1. Wählen Sie **Bearbeiten** und dann die Registerkarte **JSON**.

1. Fügen Sie den markierten Abschnitt der folgenden Beispielrichtlinie der Richtlinie hinzu. Ersetzen Sie das Beispielkonto IDs (*012345678901*) durch Ihre Konto-ID.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

1.  Wählen Sie unten auf der Seite **Weiter** und dann **Änderungen speichern** aus.

### Konfigurieren Sie die Anwendung
<a name="get-started-exercise-7-console-configure"></a>

Bearbeiten Sie die Anwendungskonfiguration, um das Anwendungscode-Artefakt festzulegen.

**Um die Konfiguration zu bearbeiten**

1. Wählen Sie auf der **MyApplication**Seite **Configure** aus.

1. Gehen Sie im Abschnitt **Speicherort des Anwendungscodes** wie folgt vor:
   + Wählen Sie für **Amazon S3 S3-Bucket** den Bucket aus, den Sie zuvor für den Anwendungscode erstellt haben. Wählen Sie **Durchsuchen** und wählen Sie den richtigen Bucket aus. **Wählen** Sie dann Auswählen aus. Klicken Sie nicht auf den Bucket-Namen.
   + Geben Sie als **Pfad zum Amazon-S3-Objekt** den Wert **amazon-msf-java-stream-app-1.0.jar** ein.

1. Wählen Sie für **Zugriffsberechtigungen** die Option **IAM-Rolle `kinesis-analytics-MyApplication-us-east-1` mit den erforderlichen Richtlinien erstellen/aktualisieren** aus.

1. Fügen Sie im Abschnitt **Runtime-Eigenschaften** die folgenden Eigenschaften hinzu.

1. Wählen **Sie Neues Element** hinzufügen und fügen Sie jeden der folgenden Parameter hinzu:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/de_de/managed-flink/latest/java/get-started-exercise.html)

1. Ändern Sie keinen der anderen Abschnitte.

1. Wählen Sie **Änderungen speichern ** aus.

**Anmerkung**  
Wenn Sie sich dafür entscheiden, die CloudWatch Amazon-Protokollierung zu aktivieren, erstellt Managed Service für Apache Flink eine Protokollgruppe und einen Protokollstream für Sie. Die Namen dieser Ressourcen lauten wie folgt:   
Protokollgruppe: `/aws/kinesis-analytics/MyApplication`
Protokollstream: `kinesis-analytics-log-stream`

### Führen Sie die Anwendung aus.
<a name="get-started-exercise-7-console-run"></a>

Die Anwendung ist jetzt konfiguriert und kann ausgeführt werden.

**Ausführen der Anwendung**

1. Wählen Sie auf der Konsole für Amazon Managed Service für Apache Flink **My Application** und anschließend **Run** aus.

1. Wählen Sie auf der nächsten Seite, der Konfigurationsseite für die Anwendungswiederherstellung, die Option **Mit neuestem Snapshot ausführen** und anschließend **Ausführen** aus. 

   Der **Status** in den **Anwendungsdetails** wechselt von `Ready` zu `Starting` und dann zu dem `Running` Zeitpunkt, an dem die Anwendung gestartet wurde.

Wenn sich die Anwendung im `Running` Status befindet, können Sie jetzt das Flink-Dashboard öffnen. 

**So öffnen Sie das -Dashboard**

1. Wählen Sie **Apache Flink-Dashboard öffnen**. Das Dashboard wird auf einer neuen Seite geöffnet.

1. Wählen Sie in der Liste „**Laufende Jobs**“ den einzelnen Job aus, den Sie sehen können. 
**Anmerkung**  
Wenn Sie die Runtime-Eigenschaften festgelegt oder die IAM-Richtlinien falsch bearbeitet haben, ändert sich der Anwendungsstatus möglicherweise in`Running`, aber das Flink-Dashboard zeigt an, dass der Job kontinuierlich neu gestartet wird. Dies ist ein häufiges Fehlerszenario, wenn die Anwendung falsch konfiguriert ist oder keine Zugriffsberechtigungen für die externen Ressourcen hat.   
In diesem Fall überprüfen Sie im Flink-Dashboard auf der Registerkarte **Ausnahmen** die Ursache des Problems.

### Beobachten Sie die Metriken der laufenden Anwendung
<a name="get-started-exercise-7-console-stop"></a>

Auf der **MyApplication**Seite, im Abschnitt ** CloudWatch Amazon-Metriken**, können Sie einige der grundlegenden Metriken der laufenden Anwendung sehen. 

**Um die Metriken einzusehen**

1. Wählen Sie neben der Schaltfläche „**Aktualisieren**“ in der Dropdownliste die Option **10 Sekunden** aus.

1. Wenn die Anwendung läuft und fehlerfrei ist, können Sie sehen, dass die **Uptime-Metrik** kontinuierlich zunimmt.

1. Die Metrik für **vollständige Neustarts** sollte Null sein. Wenn sie zunimmt, kann es bei der Konfiguration zu Problemen kommen. Um das Problem zu untersuchen, sehen Sie sich den Tab **Ausnahmen** im Flink-Dashboard an.

1. Die Metrik „**Anzahl fehlgeschlagener Checkpoints**“ sollte in einer fehlerfreien Anwendung Null sein. 
**Anmerkung**  
Dieses Dashboard zeigt einen festen Satz von Metriken mit einer Granularität von 5 Minuten. Sie können ein benutzerdefiniertes Anwendungs-Dashboard mit beliebigen Metriken im CloudWatch Dashboard erstellen.

### Beobachten Sie die Ausgabedaten in Kinesis-Streams
<a name="get-started-exercise-7-console-output"></a>

Vergewissern Sie sich, dass Sie weiterhin Daten in der Eingabe veröffentlichen, entweder mit dem Python-Skript oder dem Kinesis Data Generator. 

Sie können jetzt die Ausgabe der Anwendung beobachten, die auf Managed Service for Apache Flink ausgeführt wird, indem Sie den Datenviewer in der verwenden [https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/), ähnlich wie Sie es bereits zuvor getan haben. 

**Um die Ausgabe anzusehen**

1. Öffnen Sie die Kinesis-Konsole unter [https://console.aws.amazon.com/kinesis.](https://console.aws.amazon.com/kinesis)

1. Stellen Sie sicher, dass die Region mit der Region übereinstimmt, die Sie für die Ausführung dieses Tutorials verwenden. Standardmäßig ist es US-East-1US East (Nord-Virginia). Ändern Sie bei Bedarf die Region.

1. Wählen Sie **Data Streams**. 

1. Wählen Sie den Stream aus, den Sie beobachten möchten. Verwenden Sie für dieses Tutorial `ExampleOutputStream`. 

1.  Wählen Sie die Registerkarte **Datenanzeige**. 

1. Wählen Sie einen beliebigen **Shard** aus, behalten Sie „**Letzte**“ als **Startposition** bei und wählen Sie dann „**Datensätze abrufen**“. Möglicherweise wird die Fehlermeldung „Für diese Anfrage wurde kein Datensatz gefunden“ angezeigt. Wenn ja, wählen Sie „**Erneut versuchen, Datensätze abzurufen**“. Die neuesten Datensätze, die im Stream veröffentlicht wurden, werden angezeigt.

1. Wählen Sie den Wert in der Datenspalte aus, um den Inhalt des Datensatzes im JSON-Format zu überprüfen.

### Beenden Sie die Anwendung
<a name="get-started-exercise-stop"></a>

Um die Anwendung zu beenden, rufen Sie die Konsolenseite der Anwendung Managed Service for Apache Flink mit dem Namen auf. `MyApplication`

**So stoppen Sie die Anwendung**

1. **Wählen Sie in der Dropdownliste **Aktion** die Option Stopp aus.**

1. Der **Status** in den **Anwendungsdetails** wechselt von `Running` zu und dann zu dem `Ready` Zeitpunkt`Stopping`, an dem die Anwendung vollständig gestoppt wurde. 
**Anmerkung**  
Vergessen Sie nicht, auch das Senden von Daten aus dem Python-Skript oder dem Kinesis Data Generator an den Eingabestream zu beenden.

## Nächster Schritt
<a name="get-started-exercise-next-step-4"></a>

[Ressourcen bereinigen AWS](getting-started-cleanup.md)