Anteriormente, o Amazon Managed Service for Apache Flink era conhecido como Amazon Kinesis Data Analytics for Apache Flink.
As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Crie um aplicativo usando o Apache Beam
Neste exercício, você cria um aplicativo Managed Service for Apache Flink que transforma dados usando o Apache Beam.
nota
Para configurar os pré-requisitos necessários para este exercício, primeiro conclua o exercício Tutorial: Comece a usar o serviço DataStream API gerenciado para Apache Flink.
Este tópico contém as seguintes seções:
Crie recursos dependentes
Antes de criar um aplicativo Managed Service for Apache Flink para este exercício, você cria os seguintes recursos dependentes:
Dois fluxos de dados do Kinesis (
ExampleInputStream
eExampleOutputStream
)Um bucket do Amazon S3 para armazenar o código do aplicativo (
ka-app-code-
)<username>
Você pode criar os fluxos do Kinesis e o bucket do Amazon S3 usando o console. Para obter instruções sobre como criar esses recursos, consulte os tópicos a seguir:
Criando e atualizando fluxos de dados no Guia do desenvolvedor do Amazon Kinesis Data Streams. Nomeie seus fluxos de dados
ExampleInputStream
eExampleOutputStream
.Para obter instruções, consulte Como criar um bucket do S3? no Guia do usuário do Amazon Simple Storage Service. Dê ao bucket do Amazon S3 um nome globalmente exclusivo anexando seu nome de login, como
ka-app-code-
.<username>
Grave registros de amostra no fluxo de entrada
Nesta seção, você usa um script Python para gravar strings aleatórias no stream para o aplicativo processar.
nota
Essa seção requer AWS SDK for Python (Boto)
-
Crie um arquivo denominado
ping.py
com o conteúdo a seguir:import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
-
Execute o script
ping.py
:$ python ping.py
Mantenha o script em execução enquanto você conclui o restante do tutorial.
Baixe e examine o código do aplicativo
O código do aplicativo Java para este exemplo está disponível em GitHub. Para fazer download do código do aplicativo, faça o seguinte:
Instale o cliente do Git se você ainda não tiver feito isso. Para obter mais informações, consulte Instalando o Git
. Duplique o repositório remoto com o seguinte comando:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
Navegue até o diretório
amazon-kinesis-data-analytics-java-examples/Beam
.
O código do aplicativo está localizado no arquivo BasicBeamStreamingJob.java
. Observe o seguinte sobre o código do aplicativo:
O aplicativo usa o Apache Beam ParDo
para processar registros recebidos invocando uma função de transformação personalizada chamada. PingPongFn
O código para invocar a função
PingPongFn
é o seguinte:.apply("Pong transform", ParDo.of(new PingPongFn())
O serviço gerenciado para aplicativos Apache Flink que usam o Apache Beam requer os seguintes componentes. Se você não incluir esses componentes e versões no seu
pom.xml
, seu aplicativo carregará as versões incorretas das dependências do ambiente e, como as versões não coincidem, seu aplicativo falhará no runtime.<jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>
A função de transformação
PingPongFn
passa os dados de entrada para o fluxo de saída, a menos que os dados de entrada sejam ping. Nesse caso, ela emite a string pong\npara o fluxo de saída.O código da função de transformação é o seguinte:
private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }
Compilar o código do aplicativo
Para compilar o aplicativo, faça o seguinte:
Instale o Java e o Maven, caso ainda não o tenha feito. Para obter mais informações, consulte Preencha os pré-requisitos necessários no tutorial Tutorial: Comece a usar o serviço DataStream API gerenciado para Apache Flink.
Compile o aplicativo com o seguinte comando:
mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
nota
O código-fonte fornecido depende de bibliotecas do Java 11.
A compilação do aplicativo cria o JAR arquivo do aplicativo (target/basic-beam-app-1.0.jar
).
Faça o upload do código Java de streaming do Apache Flink
Nesta seção, você faz o upload do seu aplicativo no bucket do Amazon S3 que você criou na seção Crie recursos dependentes.
-
No console do Amazon S3, escolha o - ka-app-code
<username>
bucket e escolha Upload. -
Na etapa Selecionar arquivos, selecione Adicionar arquivos. Navegue até o arquivo
basic-beam-app-1.0.jar
que você criou na etapa anterior. Você não precisa alterar nenhuma das configurações para o objeto, em seguida, selecione Upload.
O código passa a ser armazenado em um bucket do Amazon S3 que pode ser acessado pelo aplicativo.
Crie e execute o serviço gerenciado para o aplicativo Apache Flink
Siga estas etapas para criar, configurar, atualizar e executar o aplicativo usando o console.
Criar o aplicativo
Abra o console do Managed Service for Apache Flink em /flink https://console.aws.amazon.com
-
No painel do Managed Service for Apache Flink, selecione Criar aplicativo de análise.
-
Na página Managed Service for Apache Flink - Criar aplicativo, forneça os detalhes do aplicativo da seguinte forma:
-
Em Nome do aplicativo, insira
MyApplication
. -
Em Runtime, selecione Apache Flink.
nota
Atualmente, o Apache Beam não é compatível com a versão 1.19 ou posterior do Apache Flink.
Selecione Apache Flink versão 1.15 no menu suspenso de versões.
-
-
Para permissões de acesso, escolha Criar/atualizar IAM função
kinesis-analytics-MyApplication-us-west-2
. -
Selecione Create application (Criar aplicativo).
nota
Ao criar um serviço gerenciado para o aplicativo Apache Flink usando o console, você tem a opção de criar uma IAM função e uma política para seu aplicativo. O aplicativo usa essa função e política para acessar os recursos dependentes. Esses IAM recursos são nomeados usando o nome do aplicativo e a região da seguinte forma:
-
Política:
kinesis-analytics-service-
MyApplication
-us-west-2
-
Função:
kinesis-analytics-MyApplication-
us-west-2
Edite a IAM política
Edite a IAM política para adicionar permissões para acessar os fluxos de dados do Kinesis.
Abra o IAM console em https://console.aws.amazon.com/iam/
. -
Selecione Policies (Políticas). Selecione a política
kinesis-analytics-service-MyApplication-us-west-2
que o console criou para você na seção anterior. -
Na página Summary (Resumo), selecione Edit policy (Editar política). Escolha a JSONguia.
-
Adicione a seção destacada do exemplo de política a seguir à política. Substitua a conta de amostra IDs (
012345678901
) com o ID da sua conta.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:
012345678901
:log-group:*", "arn:aws:s3:::ka-app-code-<username>
/basic-beam-app-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901
:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream" }
Configurar o aplicativo
-
Na MyApplicationpágina, escolha Configurar.
-
Na página Configurar aplicativo, forneça o Local do código:
-
Em Bucket do Amazon S3, insira
ka-app-code-
.<username>
-
Em Caminho do objeto do Amazon S3, insira
basic-beam-app-1.0.jar
.
-
-
Em Acesso aos recursos do aplicativo, para permissões de acesso, escolha Criar/atualizar IAM função
kinesis-analytics-MyApplication-us-west-2
. -
Insira o seguinte:
ID do grupo Chave Valor BeamApplicationProperties
InputStreamName
ExampleInputStream
BeamApplicationProperties
OutputStreamName
ExampleOutputStream
BeamApplicationProperties
AwsRegion
us-west-2
-
Em Monitoramento, confirme se Nível de monitoramento de métricas está definido como Aplicativo.
-
Para CloudWatch registrar, marque a caixa de seleção Ativar.
-
Selecione Atualizar.
nota
Quando você opta por ativar o CloudWatch registro, o Managed Service for Apache Flink cria um grupo de logs e um stream de logs para você. Os nomes desses recursos são os seguintes:
-
Grupo de logs:
/aws/kinesis-analytics/MyApplication
-
Fluxo de logs:
kinesis-analytics-log-stream
Esse fluxo de logs é usado para monitorar o aplicativo. Esse não é o mesmo fluxo de logs que o aplicativo usa para enviar resultados.
Execute o aplicativo
O gráfico de tarefas do Flink pode ser visualizado executando o aplicativo, abrindo o painel do Apache Flink e selecionando a tarefa desejada do Flink.
Você pode verificar as métricas do Managed Service for Apache Flink no CloudWatch console para verificar se o aplicativo está funcionando.
Limpe AWS os recursos
Esta seção inclui procedimentos para limpar AWS recursos criados no tutorial Tumbling Window.
Este tópico contém as seguintes seções:
Exclua seu aplicativo Managed Service for Apache Flink
Abra o console do Managed Service for Apache Flink em /flink https://console.aws.amazon.com
no painel Managed Service for Apache Flink, escolha. MyApplication
Na página do aplicativo, selecione Excluir e, em seguida, confirme a exclusão.
Exclua seus streams de dados do Kinesis
Abra o console do Kinesis em https://console.aws.amazon.com /kinesis.
No painel Kinesis Data Streams, escolha. ExampleInputStream
Na ExampleInputStreampágina, escolha Excluir Kinesis Stream e confirme a exclusão.
Na página Kinesis Streams, escolha o, escolha Ações ExampleOutputStream, escolha Excluir e confirme a exclusão.
Exclua seu objeto e bucket do Amazon S3
Abra o console do Amazon S3 em. https://console.aws.amazon.com/s3/
Escolha o ka-app-code -
<username>
balde.Selecione Excluir e, em seguida, insira o nome do bucket para confirmar a exclusão.
Exclua seus IAM recursos
Abra o IAM console em https://console.aws.amazon.com/iam/
. Na barra de navegação, selecione Políticas.
No controle do filtro, insira kinesis.
Escolha a política kinesis-analytics-service- MyApplication -us-west-2.
Selecione Ações da política e, em seguida, Excluir.
Na barra de navegação, selecione Roles (Funções).
Escolha a função kinesis-analytics- MyApplication -us-west-2.
Selecione Excluir função e, em seguida, confirme a exclusão.
Exclua seus CloudWatch recursos
Abra o CloudWatch console em https://console.aws.amazon.com/cloudwatch/
. No painel de navegação, selecione Logs.
Escolha o grupo de registros MyApplication/aws/kinesis-analytics/.
Selecione Excluir grupo de logs e, em seguida, confirme a exclusão.
Próximas etapas
Agora que você criou e executou um aplicativo básico do Managed Service for Apache Flink que transforma dados usando o Apache Beam, consulte o aplicativo a seguir para obter um exemplo de uma solução mais avançada do Managed Service for Apache Flink.
Workshop sobre o Beam on Managed Service for Apache Flink Streaming
: Neste workshop, exploramos um exemplo completo que combina aspectos de lote e streaming em um pipeline uniforme do Apache Beam.