As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Neste exercício, será criado um aplicativo Managed Service for Apache Flink com fluxos de dados como origem e coletor.
Esta seção contém as seguintes etapas:
- Criar dois fluxos de dados do Amazon Kinesis Data Streams
- Gravação de registros de amostra no fluxo de entrada
- Baixar e examinar o código Java Apache Flink Streaming
- Compilar o código do aplicativo
- Fazer o upload do código Java Apache Flink Streaming
- Criar e executar o aplicativo do Managed Service for Apache Flink
Criar dois fluxos de dados do Amazon Kinesis Data Streams
Antes de criar um Amazon Managed Service for Apache Flink para este exercício, crie dois fluxos de dados do Kinesis (ExampleInputStream
e ExampleOutputStream
). O aplicativo usa esses fluxos para os fluxos de origem e de destino do aplicativo.
É possível criar esses fluxos usando o console do Amazon Kinesis ou o comando da AWS CLI a seguir. Para instruções do console, consulte Criar e atualizar fluxos de dados.
Como criar os fluxos de dados (AWS CLI)
-
Para criar o primeiro fluxo (
ExampleInputStream
), use o comandocreate-stream
AWS CLI do Amazon Kinesis a seguir.$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
-
Para criar o segundo fluxo que o aplicativo usa para gravar a saída, execute o mesmo comando, alterando o nome da transmissão para
ExampleOutputStream
.$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
Gravação de registros de amostra no fluxo de entrada
Nesta seção, será usado um script Python para gravar registros de amostra no fluxo para o aplicativo processar.
nota
Essa seção requer AWS SDK for Python (Boto)
-
Crie um arquivo denominado
stock.py
com o conteúdo a seguir:import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
-
Mais adiante neste tutorial, será executado o script
stock.py
para enviar dados para o aplicativo.$ python stock.py
Baixar e examinar o código Java Apache Flink Streaming
O código de aplicativo Java destas amostras está disponível no GitHub. Para fazer download do código do aplicativo, faça o seguinte:
-
Duplique o repositório remoto com o seguinte comando:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
-
Navegue até o diretório
GettingStarted
.
O código do aplicativo está localizado nos arquivos CustomSinkStreamingJob.java
e CloudWatchLogSink.java
. Observe o seguinte sobre o código do aplicativo:
-
A aplicação usa uma origem do Kinesis para ler o fluxo de origem. O trecho a seguir cria o coletor do Kinesis:
return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
Compilar o código do aplicativo
Nesta seção, será usado o compilador do Apache Maven para criar o código Java para o aplicativo. Para obter informações sobre como instalar o Apache Maven e o Java Development Kit (JDK), consulte Pré-requisitos para concluir os exercícios.
Seu aplicativo Java requer os seguintes componentes:
-
Um arquivo Project Object Model (pom.xml)
. Esse arquivo contém informações sobre a configuração e as dependências do aplicativo, incluindo as bibliotecas do Amazon Managed Service for Apache Flink. -
Um método
main
que contém a lógica do aplicativo.
nota
Para usar o conector do Kinesis para a aplicação a seguir, é necessário baixar o código-fonte do conector e compilá-lo como descrito na documentação do Apache Flink
Como criar e compilar o código do aplicativo
-
Crie um aplicativo Java/Maven em seu ambiente de desenvolvimento. Para obter informações sobre como criar um aplicativo, consulte a documentação do seu ambiente de desenvolvimento:
-
Use o código a seguir para um arquivo chamado
StreamingJob.java
.package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisProducer<String> createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); outputProperties.setProperty("AggregationEnabled", "false"); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); /* * if you would like to use runtime configuration properties, uncomment the * lines below * input.addSink(createSinkFromApplicationProperties()) */ input.addSink(createSinkFromStaticConfig()); env.execute("Flink Streaming Java API Skeleton"); } }
Observe o seguinte sobre o exemplo de código anterior:
-
Este arquivo contém o método
main
que define a funcionalidade do aplicativo. -
Seu aplicativo cria conectores de origem e de destino para acessar recursos externos usando um objeto
StreamExecutionEnvironment
. -
O aplicativo cria conectores de origem e de destino usando propriedades estáticas. Para usar as propriedades dinâmicas do aplicativo, use os métodos
createSourceFromApplicationProperties
ecreateSinkFromApplicationProperties
para criar os conectores. Esses métodos leem as propriedades do aplicativo para configurar os conectores.
-
-
Para usar o seu código de aplicativo, compile-o e empacote-o em um arquivo JAR. Há duas formas de compilar e empacotar o código:
-
Use a ferramenta de linha de comando do Maven. Crie seu arquivo JAR executando o seguinte comando no diretório que contém o arquivo
pom.xml
:mvn package
-
Use o ambiente de desenvolvimento. Consulte a documentação de seu ambiente de desenvolvimento para obter mais detalhes.
É possível carregar o pacote como um arquivo JAR, ou pode compactar o pacote e carregá-lo como um arquivo ZIP. Se o aplicativo for criado usando a AWS CLI, é necessário especificar o tipo de conteúdo de código (JAR ou ZIP).
-
-
Se houver erros durante a compilação, verifique se sua variável de ambiente
JAVA_HOME
está definida corretamente.
Se o aplicativo for compilado com êxito, o arquivo a seguir é criado:
target/java-getting-started-1.0.jar
Fazer o upload do código Java Apache Flink Streaming
Nesta seção, será criado um bucket do Amazon Simple Storage Service (Amazon S3) e realizado o upload do código do aplicativo.
Para fazer upload do código do aplicativo
Abra o console do Amazon S3 em https://console.aws.amazon.com/s3/
. -
Selecione Criar bucket.
-
Insira
ka-app-code-
no campo Nome do bucket. Adicione um sufixo para o nome do bucket, como o nome do usuário, para torná-lo globalmente exclusivo. Selecione Next (Próximo).<username>
-
Na etapa Configurar opções, mantenha as configurações como estão e selecione Próximo.
-
Na etapa Definir permissões, mantenha as configurações como estão e selecione Próximo.
-
Selecione Criar bucket.
-
No console do Amazon S3, selecione o bucket ka-app-code-
<username>
e selecione Fazer upload. -
Na etapa Selecionar arquivos, selecione Adicionar arquivos. Navegue até o arquivo
java-getting-started-1.0.jar
, criado na etapa anterior. Escolha Próximo. -
Na etapa Definir permissões, mantenha as configurações como estão. Escolha Próximo.
-
Na etapa Definir propriedades, mantenha as configurações como estão. Escolha Carregar.
O código passa a ser armazenado em um bucket do Amazon S3 que pode ser acessado pela aplicação.
Criar e executar o aplicativo do Managed Service for Apache Flink
É possível criar e executar um aplicativo Managed Service for Apache Flink usando o console ou a AWS CLI.
nota
Ao criar o aplicativo usando o console, os recursos do AWS Identity and Access Management (IAM) e do Amazon CloudWatch Logs também são criados. Ao criar o aplicativo usando a AWS CLI, esses recursos devem ser criados separadamente.
Criar e executar o aplicativo (console)
Siga estas etapas para criar, configurar, atualizar e executar o aplicativo usando o console.
Criar o aplicativo
Abra o console do Kinesis em https://console.aws.amazon.com/kinesis
. -
No painel do Amazon Kinesis, escolha Criar aplicativo de análise.
-
Na página Kinesis Analytics – Criar aplicativo, forneça os detalhes do aplicativo da seguinte forma:
-
Em Nome do aplicativo, insira
MyApplication
. -
Em Descrição, insira
My java test app
. -
Em Runtime, escolha Apache Flink 1.6.
-
-
Em Permissões de acesso, escolha Criar/atualizar o perfil do IAM
kinesis-analytics-MyApplication-us-west-2
. -
Selecione Criar aplicativo.
nota
Ao criar um aplicativo Amazon Managed Service for Apache Flink usando o console, existe a opção de ter um perfil do IAM e uma política criada para seu aplicativo. O aplicativo usa essa função e política para acessar os recursos dependentes. Esses recursos do IAM são nomeados usando o nome do aplicativo e a região da seguinte forma:
-
Política:
kinesis-analytics-service-
MyApplication
-us-west-2
-
Função:
kinesis-analytics-
MyApplication
-us-west-2
Editar a política do IAM
Edite a política do IAM para adicionar permissões de acesso aos fluxos de dados do Kinesis.
Abra o console do IAM em https://console.aws.amazon.com/iam/
. -
Selecione Políticas. Selecione a política
kinesis-analytics-service-MyApplication-us-west-2
que o console criou na seção anterior. -
Na página Resumo, selecione Editar política. Selecione a guia JSON.
-
Adicione a seção destacada do exemplo de política a seguir à política. Substitua os exemplos de IDs de conta (
012345678901
) pelo ID da conta.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-
username
/java-getting-started-1.0.jar" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" }
] }
Configurar o aplicativo
-
Na página MyApplication, selecione Configurar.
-
Na página Configurar aplicativo, forneça o Local do código:
-
Em Bucket do Amazon S3, insira
ka-app-code-
.<username>
-
Em Caminho do objeto do Amazon S3, insira
java-getting-started-1.0.jar
.
-
-
Na seção Acesso aos recursos do aplicativo, em Permissões de acesso, selecione Criar/atualizar o perfil do IAM
kinesis-analytics-MyApplication-us-west-2
. -
Em Propriedades, ID do grupo, insira
ProducerConfigProperties
. -
Insira as seguintes propriedades e valores de aplicativo:
Chave Valor flink.inputstream.initpos
LATEST
aws:region
us-west-2
AggregationEnabled
false
-
Em Monitoramento, confirme se Nível de monitoramento de métricas está definido como Aplicativo.
-
Em Registro em log do CloudWatch, marque a caixa de seleção Habilitar.
-
Selecione Atualizar.
nota
Ao optar por habilitar o registro em log do CloudWatch, o Managed Service for Apache Flink cria um grupo de logs e um fluxo de logs. Os nomes desses recursos são os seguintes:
-
Grupo de logs:
/aws/kinesis-analytics/MyApplication
-
Fluxo de logs:
kinesis-analytics-log-stream
Execute o aplicativo
-
Na página MyApplication, selecione Executar. Confirme a ação.
-
Quando o aplicativo estiver em execução, atualize a página. O console mostra o Gráfico do aplicativo.
Interromper o aplicativo
Na página MyApplication, selecione Interromper. Confirme a ação.
Atualizar o aplicativo
Usando o console, é possível atualizar configurações do aplicativo, como as propriedades do aplicativo, as configurações de monitoramento e a localização ou o nome do arquivo JAR do aplicativo. Também é possível recarregar o JAR do aplicativo do bucket do Amazon S3 se for necessário atualizar o código do aplicativo.
Na página MyApplication, selecione Configurar. Atualize as configurações do aplicativo e selecione Atualizar.
Criar e executar o aplicativo (AWS CLI)
Nesta seção, use a AWS CLI para criar e executar o aplicativo Flink do Managed Service for Apache Flink. O Managed Service for Apache Flink usa o comando kinesisanalyticsv2
AWS CLI para criar e interagir com aplicativos Managed Service for Apache Flink.
Criar uma política de permissões
Primeiro, crie uma política de permissões com duas instruções: uma que concede permissões para a ação read
no fluxo de origem, e outra que concede permissões para ações write
no fluxo de destino. Em seguida, anexe a política a um perfil do IAM (que será criado na próxima seção). Assim, ao assumir o perfil, o serviço Managed Service for Apache Flink terá as permissões necessárias para ler o fluxo de origem e gravar no fluxo de coleta.
Use o código a seguir para criar a política de permissões KAReadSourceStreamWriteSinkStream
. Substitua
pelo nome de usuário usado para criar o bucket do Amazon S3 e armazenar o código do aplicativo. Substitua o ID da conta nos Nomes de recurso da Amazon (ARNs) (username
) pelo ID da conta.012345678901
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "S3",
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:GetObjectVersion"
],
"Resource": ["arn:aws:s3:::ka-app-code-username
",
"arn:aws:s3:::ka-app-code-username
/*"
]
},
{
"Sid": "ReadInputStream",
"Effect": "Allow",
"Action": "kinesis:*",
"Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleInputStream"
},
{
"Sid": "WriteOutputStream",
"Effect": "Allow",
"Action": "kinesis:*",
"Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream"
}
]
}
Para obter instruções passo a passo para criar uma política de permissões, consulte Tutorial: crie e anexe a sua primeira política gerenciada pelo cliente no Guia do usuário do IAM.
nota
Para acessar outros serviços da AWS, pode-se usar o AWS SDK for Java. O Managed Service for Apache Flink define automaticamente as credenciais exigidas pelo SDK como as credenciais do perfil do IAM associado a sua aplicação. Não é necessária nenhuma etapa adicional.
Criar uma perfil do IAM
Nesta seção, será criado um perfil do IAM que o Managed Service for Apache Flink pode assumir para ler um fluxo de origem e gravar no fluxo de coleta.
O Managed Service for Apache Flink não pode acessar seu fluxo sem permissões. Essas permissões são concedidas usando um perfil do IAM. Cada perfil do IAM tem duas políticas anexadas. A política de confiança concede ao Managed Service for Apache Flink permissão para assumir o perfil, e a política de permissões determina o que o serviço pode fazer depois de assumir a função.
Anexe a política de permissões que criou na seção anterior a essa função.
Para criar uma perfil do IAM
Abra o console do IAM em https://console.aws.amazon.com/iam/
. -
No painel de navegação, selecione Funções e Criar função.
-
Em Selecionar tipo de identidade de confiança, selecione Serviço da AWS. Em Selecionar o serviço que usará esta função, selecione Kinesis. Em Selecionar seu caso de uso, selecione Kinesis Analytics.
Selecione Próximo: permissões.
-
Na página Attach permissions policies, selecione Next: Review. É possível anexar políticas de permissões depois de criar a função.
-
Na página Criar função, insira
KA-stream-rw-role
para o Nome da função. Selecione Criar função.Foi criado um perfil do IAM chamado
KA-stream-rw-role
. Em seguida, atualize as políticas de confiança e de permissões para a função. -
Anexe a política de permissões à função.
nota
Para este exercício, o Managed Service for Apache Flink assume esse perfil para ler dados de um fluxo de dados do Kinesis (origem) e gravar a saída em outro fluxo de dados do Kinesis. Depois, anexe a política criada na etapa anterior, Criar uma política de permissões.
-
Na página Resumo, selecione a guia Permissões.
-
Selecione Attach Policies.
-
Na caixa de pesquisa, insira
KAReadSourceStreamWriteSinkStream
(a política criada na seção anterior). -
Escolha a política KAReadInputStreamWriteOutputStream e escolha Anexar política.
-
Agora você criou a função de execução de serviço que seu aplicativo usa para acessar os recursos. Anote o ARN da nova função.
Para obter instruções passo a passo sobre como criar um perfil, consulte Criação de um perfil do IAM (console) no Guia do usuário do IAM.
Criar o aplicativo do Managed Service for Apache Flink
-
Salve o seguinte código JSON em um arquivo chamado
create_request.json
. Substitua o ARN da função de amostra pelo ARN da função criada anteriormente. Substitua o sufixo do ARN do bucket (
) pelo sufixo selecionado na seção anterior. Substitua o ID da conta de exemplo (username
) na função de execução do serviço pelo ID da conta.012345678901
{ "ApplicationName": "test", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_6", "ServiceExecutionRole": "arn:aws:iam::
012345678901
:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username
", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } } -
Execute a ação
CreateApplication
com a solicitação anterior para criar o aplicativo:aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
O aplicativo agora é criado. Inicie o aplicativo na próxima etapa.
Iniciar o aplicativo
Nesta seção, a ação StartApplication
será usada para iniciar o aplicativo.
Para iniciar o aplicativo
-
Salve o seguinte código JSON em um arquivo chamado
start_request.json
.{ "ApplicationName": "test", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
-
Execute a ação
StartApplication
com a solicitação anterior para iniciar o aplicativo:aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
O aplicativo agora está em execução. É possível verificar as métricas do Managed Service for Apache Flink no console do Amazon CloudWatch para verificar se o aplicativo está funcionando.
Interromper o aplicativo
Nesta seção, a ação StopApplication
será usada para interromper o aplicativo.
Como interromper o aplicativo
-
Salve o seguinte código JSON em um arquivo chamado
stop_request.json
.{"ApplicationName": "test" }
-
Execute a ação
StopApplication
com a seguinte solicitação para interromper o aplicativo:aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
O aplicativo agora está interrompido.