Crie um aplicativo usando o Apache Beam - Managed Service for Apache Flink

Anteriormente, o Amazon Managed Service for Apache Flink era conhecido como Amazon Kinesis Data Analytics for Apache Flink.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Crie um aplicativo usando o Apache Beam

Neste exercício, você cria um aplicativo Managed Service for Apache Flink que transforma dados usando o Apache Beam. O Apache Beam é um modelo de programação para processar dados de streaming. Para obter informações sobre como usar o Apache Beam com o Managed Service para Apache Flink, consulte. Use o Apache Beam com o Managed Service para aplicativos Apache Flink

nota

Para configurar os pré-requisitos necessários para este exercício, primeiro conclua o exercício Tutorial: Comece a usar o serviço DataStream API gerenciado para Apache Flink.

Crie recursos dependentes

Antes de criar um aplicativo Managed Service for Apache Flink para este exercício, você cria os seguintes recursos dependentes:

  • Dois fluxos de dados do Kinesis (ExampleInputStream e ExampleOutputStream)

  • Um bucket do Amazon S3 para armazenar o código do aplicativo (ka-app-code-<username>)

Você pode criar os fluxos do Kinesis e o bucket do Amazon S3 usando o console. Para obter instruções sobre como criar esses recursos, consulte os tópicos a seguir:

  • Criando e atualizando fluxos de dados no Guia do desenvolvedor do Amazon Kinesis Data Streams. Nomeie seus fluxos de dados ExampleInputStream e ExampleOutputStream.

  • Para obter instruções, consulte Como criar um bucket do S3? no Guia do usuário do Amazon Simple Storage Service. Dê ao bucket do Amazon S3 um nome globalmente exclusivo anexando seu nome de login, como ka-app-code-<username>.

Grave registros de amostra no fluxo de entrada

Nesta seção, você usa um script Python para gravar strings aleatórias no stream para o aplicativo processar.

nota

Essa seção requer AWS SDK for Python (Boto).

  1. Crie um arquivo denominado ping.py com o conteúdo a seguir:

    import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
  2. Execute o script ping.py:

    $ python ping.py

    Mantenha o script em execução enquanto você conclui o restante do tutorial.

Baixe e examine o código do aplicativo

O código do aplicativo Java para este exemplo está disponível em GitHub. Para fazer download do código do aplicativo, faça o seguinte:

  1. Instale o cliente do Git se você ainda não tiver feito isso. Para obter mais informações, consulte Instalando o Git.

  2. Duplique o repositório remoto com o seguinte comando:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. Navegue até o diretório amazon-kinesis-data-analytics-java-examples/Beam.

O código do aplicativo está localizado no arquivo BasicBeamStreamingJob.java. Observe o seguinte sobre o código do aplicativo:

  • O aplicativo usa o Apache Beam ParDopara processar registros recebidos invocando uma função de transformação personalizada chamada. PingPongFn

    O código para invocar a função PingPongFn é o seguinte:

    .apply("Pong transform", ParDo.of(new PingPongFn())
  • O serviço gerenciado para aplicativos Apache Flink que usam o Apache Beam requer os seguintes componentes. Se você não incluir esses componentes e versões no seu pom.xml, seu aplicativo carregará as versões incorretas das dependências do ambiente e, como as versões não coincidem, seu aplicativo falhará no runtime.

    <jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>
  • A função de transformação PingPongFn passa os dados de entrada para o fluxo de saída, a menos que os dados de entrada sejam ping. Nesse caso, ela emite a string pong\npara o fluxo de saída.

    O código da função de transformação é o seguinte:

    private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }

Compilar o código do aplicativo

Para compilar o aplicativo, faça o seguinte:

  1. Instale o Java e o Maven, caso ainda não o tenha feito. Para obter mais informações, consulte Preencha os pré-requisitos necessários no tutorial Tutorial: Comece a usar o serviço DataStream API gerenciado para Apache Flink.

  2. Compile o aplicativo com o seguinte comando:

    mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
    nota

    O código-fonte fornecido depende de bibliotecas do Java 11.

A compilação do aplicativo cria o JAR arquivo do aplicativo (target/basic-beam-app-1.0.jar).

Faça o upload do código Java de streaming do Apache Flink

Nesta seção, você faz o upload do seu aplicativo no bucket do Amazon S3 que você criou na seção Crie recursos dependentes.

  1. No console do Amazon S3, escolha o - ka-app-code<username>bucket e escolha Upload.

  2. Na etapa Selecionar arquivos, selecione Adicionar arquivos. Navegue até o arquivo basic-beam-app-1.0.jar que você criou na etapa anterior.

  3. Você não precisa alterar nenhuma das configurações para o objeto, em seguida, selecione Upload.

O código passa a ser armazenado em um bucket do Amazon S3 que pode ser acessado pelo aplicativo.

Crie e execute o serviço gerenciado para o aplicativo Apache Flink

Siga estas etapas para criar, configurar, atualizar e executar o aplicativo usando o console.

Criar o aplicativo

  1. Abra o console do Managed Service for Apache Flink em /flink https://console.aws.amazon.com

  2. No painel do Managed Service for Apache Flink, selecione Criar aplicativo de análise.

  3. Na página Managed Service for Apache Flink - Criar aplicativo, forneça os detalhes do aplicativo da seguinte forma:

    • Em Nome do aplicativo, insira MyApplication.

    • Em Runtime, selecione Apache Flink.

      nota

      Atualmente, o Apache Beam não é compatível com a versão 1.19 ou posterior do Apache Flink.

    • Selecione Apache Flink versão 1.15 no menu suspenso de versões.

  4. Para permissões de acesso, escolha Criar/atualizar IAM função kinesis-analytics-MyApplication-us-west-2.

  5. Selecione Create application (Criar aplicativo).

nota

Ao criar um serviço gerenciado para o aplicativo Apache Flink usando o console, você tem a opção de criar uma IAM função e uma política para seu aplicativo. O aplicativo usa essa função e política para acessar os recursos dependentes. Esses IAM recursos são nomeados usando o nome do aplicativo e a região da seguinte forma:

  • Política: kinesis-analytics-service-MyApplication-us-west-2

  • Função: kinesis-analytics-MyApplication-us-west-2

Edite a IAM política

Edite a IAM política para adicionar permissões para acessar os fluxos de dados do Kinesis.

  1. Abra o IAM console em https://console.aws.amazon.com/iam/.

  2. Selecione Policies (Políticas). Selecione a política kinesis-analytics-service-MyApplication-us-west-2 que o console criou para você na seção anterior.

  3. Na página Summary (Resumo), selecione Edit policy (Editar política). Escolha a JSONguia.

  4. Adicione a seção destacada do exemplo de política a seguir à política. Substitua a conta de amostra IDs (012345678901) com o ID da sua conta.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*", "arn:aws:s3:::ka-app-code-<username>/basic-beam-app-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

Configurar o aplicativo

  1. Na MyApplicationpágina, escolha Configurar.

  2. Na página Configurar aplicativo, forneça o Local do código:

    • Em Bucket do Amazon S3, insira ka-app-code-<username>.

    • Em Caminho do objeto do Amazon S3, insira basic-beam-app-1.0.jar.

  3. Em Acesso aos recursos do aplicativo, para permissões de acesso, escolha Criar/atualizar IAM função kinesis-analytics-MyApplication-us-west-2.

  4. Insira o seguinte:

    ID do grupo Chave Valor
    BeamApplicationProperties InputStreamName ExampleInputStream
    BeamApplicationProperties OutputStreamName ExampleOutputStream
    BeamApplicationProperties AwsRegion us-west-2
  5. Em Monitoramento, confirme se Nível de monitoramento de métricas está definido como Aplicativo.

  6. Para CloudWatch registrar, marque a caixa de seleção Ativar.

  7. Selecione Atualizar.

nota

Quando você opta por ativar o CloudWatch registro, o Managed Service for Apache Flink cria um grupo de logs e um stream de logs para você. Os nomes desses recursos são os seguintes:

  • Grupo de logs: /aws/kinesis-analytics/MyApplication

  • Fluxo de logs: kinesis-analytics-log-stream

Esse fluxo de logs é usado para monitorar o aplicativo. Esse não é o mesmo fluxo de logs que o aplicativo usa para enviar resultados.

Execute o aplicativo

O gráfico de tarefas do Flink pode ser visualizado executando o aplicativo, abrindo o painel do Apache Flink e selecionando a tarefa desejada do Flink.

Você pode verificar as métricas do Managed Service for Apache Flink no CloudWatch console para verificar se o aplicativo está funcionando.

Limpe AWS os recursos

Esta seção inclui procedimentos para limpar AWS recursos criados no tutorial Tumbling Window.

Exclua seu aplicativo Managed Service for Apache Flink

  1. Abra o console do Managed Service for Apache Flink em /flink https://console.aws.amazon.com

  2. no painel Managed Service for Apache Flink, escolha. MyApplication

  3. Na página do aplicativo, selecione Excluir e, em seguida, confirme a exclusão.

Exclua seus streams de dados do Kinesis

  1. Abra o console do Kinesis em https://console.aws.amazon.com /kinesis.

  2. No painel Kinesis Data Streams, escolha. ExampleInputStream

  3. Na ExampleInputStreampágina, escolha Excluir Kinesis Stream e confirme a exclusão.

  4. Na página Kinesis Streams, escolha o, escolha Ações ExampleOutputStream, escolha Excluir e confirme a exclusão.

Exclua seu objeto e bucket do Amazon S3

  1. Abra o console do Amazon S3 em. https://console.aws.amazon.com/s3/

  2. Escolha o ka-app-code -<username> balde.

  3. Selecione Excluir e, em seguida, insira o nome do bucket para confirmar a exclusão.

Exclua seus IAM recursos

  1. Abra o IAM console em https://console.aws.amazon.com/iam/.

  2. Na barra de navegação, selecione Políticas.

  3. No controle do filtro, insira kinesis.

  4. Escolha a política kinesis-analytics-service- MyApplication -us-west-2.

  5. Selecione Ações da política e, em seguida, Excluir.

  6. Na barra de navegação, selecione Roles (Funções).

  7. Escolha a função kinesis-analytics- MyApplication -us-west-2.

  8. Selecione Excluir função e, em seguida, confirme a exclusão.

Exclua seus CloudWatch recursos

  1. Abra o CloudWatch console em https://console.aws.amazon.com/cloudwatch/.

  2. No painel de navegação, selecione Logs.

  3. Escolha o grupo de registros MyApplication/aws/kinesis-analytics/.

  4. Selecione Excluir grupo de logs e, em seguida, confirme a exclusão.

Próximas etapas

Agora que você criou e executou um aplicativo básico do Managed Service for Apache Flink que transforma dados usando o Apache Beam, consulte o aplicativo a seguir para obter um exemplo de uma solução mais avançada do Managed Service for Apache Flink.