Anteriormente, o Amazon Managed Service for Apache Flink era conhecido como Amazon Kinesis Data Analytics for Apache Flink.
As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Crie e execute um serviço gerenciado para o aplicativo Apache Flink para Python
Nesta seção, você cria um serviço gerenciado para o aplicativo Apache Flink para Python com um stream do Kinesis como origem e coletor.
Esta seção contém as seguintes etapas.
- Crie recursos dependentes
- Configurar seu ambiente de desenvolvimento local
- Baixe e examine o código Python de streaming do Apache Flink
- Gerenciar JAR dependências
- Grave registros de amostra no fluxo de entrada
- Execute seu aplicativo localmente
- Observe os dados de entrada e saída nos streams do Kinesis
- Pare de executar seu aplicativo localmente
- Package o código do seu aplicativo
- Faça o upload do pacote do aplicativo em um bucket do Amazon S3
- Crie e configure o serviço gerenciado para o aplicativo Apache Flink
- Próxima etapa
Crie recursos dependentes
Antes de criar um Managed Service for Apache Flink para este exercício, você cria os seguintes recursos dependentes:
-
Duas transmissões do Kinesis para entrada e saída.
-
Um bucket do Amazon S3 para armazenar o código do aplicativo.
nota
Este tutorial pressupõe que você esteja implantando seu aplicativo na região us-east-1. Se você usa outra região, deve adaptar todas as etapas adequadamente.
Crie dois streams do Kinesis
Antes de criar um aplicativo Managed Service for Apache Flink para este exercício, crie dois streams de dados do Kinesis (ExampleInputStream
eExampleOutputStream
) na mesma região que você usará para implantar seu aplicativo (us-east-1 neste exemplo). O aplicativo usa esses fluxos para os fluxos de origem e de destino do aplicativo.
Você pode criar esses fluxos usando o console do Amazon Kinesis ou o comando da AWS CLI a seguir. Para obter instruções sobre o console, consulte Criar e atualizar fluxos de dados no Guia do desenvolvedor do Amazon Kinesis Data Streams.
Como criar os fluxos de dados (AWS CLI)
-
Para criar o primeiro stream (
ExampleInputStream
), use o seguinte comando do Amazon Kinesiscreate-stream
AWS CLI .$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1
-
Para criar o segundo fluxo que o aplicativo usa para gravar a saída, execute o mesmo comando, alterando o nome da transmissão para
ExampleOutputStream
.$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1
Criar um bucket do Amazon S3
Você pode criar um bucket do Amazon S3 usando o console. Para obter instruções sobre como criar esse recurso, consulte os tópicos a seguir:
-
Para obter instruções, consulte Como criar um bucket do S3? no Guia do usuário do Amazon Simple Storage Service. Dê ao bucket do Amazon S3 um nome globalmente exclusivo, por exemplo, anexando seu nome de login.
nota
Certifique-se de criar o bucket do S3 na região que você usa para este tutorial (us-east-1).
Outros recursos
Quando você cria seu aplicativo, o Managed Service for Apache Flink cria os seguintes CloudWatch recursos da Amazon, caso eles ainda não existam:
-
Um grupo de logs chamado
/AWS/KinesisAnalytics-java/<my-application>
. -
Um fluxo de logs chamado
kinesis-analytics-log-stream
.
Configurar seu ambiente de desenvolvimento local
Para desenvolvimento e depuração, você pode executar o aplicativo Python Flink em sua máquina. Você pode iniciar o aplicativo a partir da linha de comando com python
main.py
ou em um Python IDE de sua escolha.
nota
Em sua máquina de desenvolvimento, você deve ter o Python 3.10 ou 3.11, o Java 11, o Apache Maven e o Git instalados. Recomendamos que você use um IDE como PyCharm
Instale a PyFlink biblioteca
Para desenvolver seu aplicativo e executá-lo localmente, você deve instalar a biblioteca Flink Python.
-
Crie um ambiente Python autônomo VirtualEnv usando o Conda ou qualquer ferramenta Python similar.
-
Instale a PyFlink biblioteca nesse ambiente. Use a mesma versão de tempo de execução do Apache Flink que você usará no Amazon Managed Service para Apache Flink. Atualmente, o tempo de execução recomendado é 1.19.1.
$ pip install apache-flink==1.19.1
-
Certifique-se de que o ambiente esteja ativo ao executar seu aplicativo. Se você executar o aplicativo noIDE, verifique se o IDE está usando o ambiente como tempo de execução. O processo depende do IDE que você está usando.
nota
Você só precisa instalar a PyFlink biblioteca. Você não precisa instalar um cluster Apache Flink em sua máquina.
Autentique sua sessão AWS
O aplicativo usa fluxos de dados do Kinesis para publicar dados. Ao executar localmente, você deve ter uma sessão AWS autenticada válida com permissões para gravar no stream de dados do Kinesis. Use as etapas a seguir para autenticar sua sessão:
-
Se você não tiver o AWS CLI e um perfil nomeado com credencial válida configurado, consulteConfigure o AWS Command Line Interface (AWS CLI).
-
Verifique se o seu AWS CLI está configurado corretamente e se seus usuários têm permissões para gravar no stream de dados do Kinesis publicando o seguinte registro de teste:
$ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
-
Se você IDE tiver um plug-in com o qual se integrar AWS, poderá usá-lo para passar as credenciais para o aplicativo em execução noIDE. Para obter mais informações, consulte AWS Toolkit for PyCharm
, AWS Toolkit for Visual Studio Code AWS e Toolkit for IntelliJ. IDEA
Baixe e examine o código Python de streaming do Apache Flink
O código do aplicativo Python para este exemplo está disponível em. GitHub Para fazer download do código do aplicativo, faça o seguinte:
-
Duplique o repositório remoto usando o seguinte comando:
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
-
Navegue até o diretório
./python/GettingStarted
.
Revise os componentes do aplicativo
O código do aplicativo está localizado emmain.py
. Usamos SQL incorporado em Python para definir o fluxo do aplicativo.
nota
Para uma experiência otimizada do desenvolvedor, o aplicativo foi projetado para ser executado sem nenhuma alteração de código no Amazon Managed Service para Apache Flink e localmente, para desenvolvimento em sua máquina. O aplicativo usa a variável de ambiente IS_LOCAL =
true
para detectar quando está sendo executado localmente. Você deve definir a variável IS_LOCAL = true
de ambiente no seu shell ou na configuração de execução do seuIDE.
-
O aplicativo configura o ambiente de execução e lê a configuração do tempo de execução. Para funcionar tanto no Amazon Managed Service para Apache Flink quanto localmente, o aplicativo verifica a
IS_LOCAL
variável.-
O seguinte é o comportamento padrão quando o aplicativo é executado no Amazon Managed Service para Apache Flink:
-
Carregue as dependências empacotadas com o aplicativo. Para obter mais informações, consulte (link)
-
Carregue a configuração das propriedades do Runtime que você define no aplicativo Amazon Managed Service for Apache Flink. Para obter mais informações, consulte (link)
-
-
Quando o aplicativo detecta
IS_LOCAL = true
quando você executa seu aplicativo localmente:-
Carrega dependências externas do projeto.
-
Carrega a configuração do
application_properties.json
arquivo incluído no projeto.... APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" ... is_local = ( True if os.environ.get("IS_LOCAL") else False ) ... if is_local: APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json" CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) table_env.get_config().get_configuration().set_string( "pipeline.jars", "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar", )
-
-
-
O aplicativo define uma tabela de origem com uma
CREATE TABLE
declaração, usando o KinesisConnector. Essa tabela lê dados do stream de entrada do Kinesis. O aplicativo usa o nome do fluxo, a região e a posição inicial da configuração do tempo de execução. table_env.execute_sql(f""" CREATE TABLE prices ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{input_stream_name}', 'aws.region' = '{input_stream_region}', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """)
-
O aplicativo também define uma tabela de coletor usando o Kinesis Connector
neste exemplo. Essa tabela envia dados para o stream de saída do Kinesis. table_env.execute_sql(f""" CREATE TABLE output ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3) ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{output_stream_name}', 'aws.region' = '{output_stream_region}', 'sink.partitioner-field-delimiter' = ';', 'sink.batch.max-size' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )""")
-
Por fim, o aplicativo executa uma SQL tabela coletora
INSERT INTO...
a partir da tabela de origem. Em um aplicativo mais complexo, é provável que você tenha etapas adicionais para transformar os dados antes de gravar no coletor.table_result = table_env.execute_sql("""INSERT INTO output SELECT ticker, price, event_time FROM prices""")
-
Você deve adicionar outra etapa no final da
main()
função para executar o aplicativo localmente:if is_local: table_result.wait()
Sem essa instrução, o aplicativo é encerrado imediatamente quando você o executa localmente. Você não deve executar essa declaração ao executar seu aplicativo no Amazon Managed Service para Apache Flink.
Gerenciar JAR dependências
Um PyFlink aplicativo geralmente requer um ou mais conectores. O aplicativo neste tutorial usa o Kinesis
Neste exemplo, mostramos como usar o Apache Maven para buscar as dependências e empacotar o aplicativo para ser executado no Managed Service para Apache Flink.
nota
Existem formas alternativas de buscar e empacotar dependências. Este exemplo demonstra um método que funciona corretamente com um ou mais conectores. Ele também permite que você execute o aplicativo localmente, para desenvolvimento e no Managed Service for Apache Flink sem alterações no código.
Use o arquivo pom.xml
O Apache Maven usa o pom.xml
arquivo para controlar dependências e pacotes de aplicativos.
Todas JAR as dependências são especificadas no pom.xml
arquivo do <dependencies>...</dependencies>
bloco.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> ... <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>4.3.0-1.19</version> </dependency> </dependencies> ...
Para encontrar o artefato e a versão corretos do conector a serem usados, consulteUse conectores Apache Flink com o Managed Service para Apache Flink. Certifique-se de consultar a versão do Apache Flink que você está usando. Neste exemplo, usamos o conector Kinesis. Para o Apache Flink 1.19, a versão do conector é. 4.3.0-1.19
nota
Se você estiver usando o Apache Flink 1.19, não há uma versão de conector lançada especificamente para essa versão. Use os conectores lançados para 1.18.
Dependências de download e empacotamento
Use o Maven para baixar as dependências definidas no pom.xml
arquivo e empacotá-las para o aplicativo Python Flink.
-
Navegue até o diretório que contém o projeto Python Getting Started chamado.
python/GettingStarted
-
Execute o seguinte comando:
$ mvn package
O Maven cria um novo arquivo chamado./target/pyflink-dependencies.jar
. Quando você está desenvolvendo localmente em sua máquina, o aplicativo Python procura esse arquivo.
nota
Se você esquecer de executar este comando, ao tentar executar seu aplicativo, ele falhará com o erro: Não foi possível encontrar nenhuma fábrica para o identificador “kinesis”.
Grave registros de amostra no fluxo de entrada
Nesta seção, você enviará registros de amostra ao stream para o aplicativo processar. Você tem duas opções para gerar dados de amostra, usando um script Python ou o Kinesis
Gere dados de amostra usando um script Python
Você pode usar um script Python para enviar registros de amostra para o stream.
nota
Para executar esse script Python, você deve usar o Python 3.x e ter a biblioteca for AWS SDKPython
Para começar a enviar dados de teste para o stream de entrada do Kinesis:
-
Baixe o script
stock.py
Python do gerador de dados no repositório do gerador GitHub de dados. -
Execute o script
stock.py
:$ python stock.py
Mantenha o script em execução enquanto você conclui o resto do tutorial. Agora você pode executar seu aplicativo Apache Flink.
Gere dados de amostra usando o Kinesis Data Generator
Como alternativa ao script Python, você pode usar o Kinesis Data Generator
Para configurar e executar o Kinesis Data Generator:
-
Siga as instruções na documentação do Kinesis Data Generator
para configurar o acesso à ferramenta. Você executará um AWS CloudFormation modelo que configura um usuário e uma senha. -
Acesse o Kinesis Data Generator por meio do URL gerado pelo CloudFormation modelo. Você pode encontrar o URL na guia Saída após a conclusão do CloudFormation modelo.
-
Configure o gerador de dados:
-
Região: selecione a região que você está usando para este tutorial: us-east-1
-
Stream/stream de entrega: selecione o stream de entrada que o aplicativo usará:
ExampleInputStream
-
Registros por segundo: 100
-
Modelo de registro: copie e cole o seguinte modelo:
{ "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
-
-
Teste o modelo: escolha Modelo de teste e verifique se o registro gerado é semelhante ao seguinte:
{ "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
-
Inicie o gerador de dados: escolha Selecionar Enviar dados.
O Kinesis Data Generator agora está enviando dados para o. ExampleInputStream
Execute seu aplicativo localmente
Você pode testar o aplicativo localmente, executando a partir da linha de comando com python main.py
ou a partir do seuIDE.
Para executar seu aplicativo localmente, você deve ter a versão correta da PyFlink biblioteca instalada conforme descrito na seção anterior. Para obter mais informações, consulte (link)
nota
Antes de continuar, verifique se os fluxos de entrada e saída estão disponíveis. Consulte Crie dois streams de dados do Amazon Kinesis. Além disso, verifique se você tem permissão para ler e gravar nos dois fluxos. Consulte Autentique sua sessão AWS.
Importe o projeto Python para o seu IDE
Para começar a trabalhar no aplicativo em seuIDE, você deve importá-lo como um projeto Python.
O repositório que você clonou contém vários exemplos. Cada exemplo é um projeto separado. Para este tutorial, importe o conteúdo do ./python/GettingStarted
subdiretório para o seuIDE.
Importe o código como um projeto Python existente.
nota
O processo exato para importar um novo projeto em Python varia de acordo com o IDE que você está usando.
Verifique a configuração do aplicativo local
Ao ser executado localmente, o aplicativo usa a configuração no application_properties.json
arquivo na pasta de recursos do projeto abaixo./src/main/resources
. Você pode editar esse arquivo para usar diferentes nomes ou regiões de stream do Kinesis.
[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]
Execute seu aplicativo Python localmente
Você pode executar seu aplicativo localmente, seja na linha de comando, como um script Python normal, ou no. IDE
Para executar seu aplicativo a partir da linha de comando
-
Certifique-se de que o ambiente autônomo do Python, como o Conda VirtualEnv ou onde você instalou a biblioteca Python Flink, esteja ativo no momento.
-
Certifique-se de correr pelo
mvn package
menos uma vez. -
Defina a
IS_LOCAL = true
variável de ambiente:$ export IS_LOCAL=true
-
Execute o aplicativo como um script Python normal.
$python main.py
Para executar o aplicativo de dentro do IDE
-
Configure seu IDE para executar o
main.py
script com a seguinte configuração:-
Use o ambiente Python independente, como o Conda VirtualEnv ou onde você instalou a biblioteca. PyFlink
-
Use as AWS credenciais para acessar os streams de dados de entrada e saída do Kinesis.
-
Defina
IS_LOCAL = true
.
-
-
O processo exato para definir a configuração de execução depende de você IDE e varia.
-
Depois de configurar o seuIDE, execute o script Python e use as ferramentas fornecidas por você IDE enquanto o aplicativo estiver em execução.
Inspecione os registros do aplicativo localmente
Quando executado localmente, o aplicativo não mostra nenhum registro no console, além de algumas linhas impressas e exibidas quando o aplicativo é iniciado. PyFlink grava registros em um arquivo no diretório em que a biblioteca Python Flink está instalada. O aplicativo imprime a localização dos registros quando é iniciado. Você também pode executar o comando a seguir para encontrar os registros:
$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
-
Liste os arquivos no diretório de registro. Normalmente, você encontra um único
.log
arquivo. -
Acompanhe o arquivo enquanto o aplicativo estiver em execução:
tail -f <log-path>/<log-file>.log
.
Observe os dados de entrada e saída nos streams do Kinesis
Você pode observar os registros enviados ao stream de entrada pelo (gerando o Python de amostra) ou pelo Kinesis Data Generator (link) usando o Visualizador de dados no console do Amazon Kinesis.
Para observar registros:
Pare de executar seu aplicativo localmente
Pare a execução do aplicativo em seuIDE. IDEGeralmente fornece uma opção de “parar”. A localização e o método exatos dependem doIDE.
Package o código do seu aplicativo
Nesta seção, você usa o Apache Maven para empacotar o código do aplicativo e todas as dependências necessárias em um arquivo.zip.
Execute o comando do pacote Maven novamente:
$ mvn package
Esse comando gera o arquivotarget/managed-flink-pyflink-getting-started-1.0.0.zip
.
Faça o upload do pacote do aplicativo em um bucket do Amazon S3
Nesta seção, você carrega o arquivo.zip criado na seção anterior para o bucket do Amazon Simple Storage Service (Amazon S3) que você criou no início deste tutorial. Se você não concluiu essa etapa, consulte (link).
Para carregar o JAR arquivo de código do aplicativo
Abra o console do Amazon S3 em. https://console.aws.amazon.com/s3/
-
Escolha o bucket que você criou anteriormente para o código do aplicativo.
-
Escolha Carregar.
-
Escolha Adicionar arquivos.
-
Navegue até o arquivo.zip gerado na etapa anterior:
target/managed-flink-pyflink-getting-started-1.0.0.zip
. -
Escolha Carregar sem alterar nenhuma outra configuração.
Crie e configure o serviço gerenciado para o aplicativo Apache Flink
Você pode criar e configurar um serviço gerenciado para o aplicativo Apache Flink usando o console ou o. AWS CLI Para este tutorial, usaremos o console.
Criar o aplicativo
Abra o console do Managed Service for Apache Flink em /flink https://console.aws.amazon.com
-
Verifique se a região correta está selecionada: Leste dos EUA (Norte da Virgínia) us-east-1.
-
Abra o menu do lado direito e escolha aplicativos Apache Flink e, em seguida, Criar aplicativo de streaming. Como alternativa, escolha Criar aplicativo de streaming na seção Começar da página inicial.
-
Na página Criar aplicativos de streaming:
-
Em Escolha um método para configurar o aplicativo de processamento de fluxo, escolha Criar do zero.
-
Para configuração do Apache Flink, versão do aplicativo Flink, escolha Apache Flink 1.19.
-
Para configuração do aplicativo:
-
Em Nome do aplicativo, insira
MyApplication
. -
Em Descrição, insira
My Python test app
. -
Em Acesso aos recursos do aplicativo, escolha Create/update IAM role kinesis-analytics- MyApplication -us-east-1 com as políticas necessárias.
-
-
Para o modelo para configurações de aplicativos:
-
Em Modelos, escolha Desenvolvimento.
-
-
Escolha Criar aplicativo de streaming.
-
nota
Ao criar um serviço gerenciado para o aplicativo Apache Flink usando o console, você tem a opção de criar uma IAM função e uma política para seu aplicativo. O aplicativo usa essa função e política para acessar os recursos dependentes. Esses IAM recursos são nomeados usando o nome do aplicativo e a região da seguinte forma:
-
Política:
kinesis-analytics-service-
MyApplication
-us-west-2
-
Função:
kinesisanalytics-
MyApplication
-us-west-2
O Amazon Managed Service para Apache Flink era conhecido anteriormente como Kinesis Data Analytics. O nome dos recursos que são gerados automaticamente é prefixado com kinesis-analytics
para fins de compatibilidade com versões anteriores.
Edite a IAM política
Edite a IAM política para adicionar permissões para acessar o bucket do Amazon S3.
Para editar a IAM política para adicionar permissões de bucket do S3
Abra o IAM console em https://console.aws.amazon.com/iam/
. -
Selecione Policies (Políticas). Selecione a política
kinesis-analytics-service-MyApplication-us-east-1
que o console criou para você na seção anterior. -
Escolha Editar e, em seguida, escolha a JSONguia.
-
Adicione a seção destacada do exemplo de política a seguir à política. Substitua a conta de amostra IDs (
012345678901
) com o ID da sua conta.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:
012345678901
:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901
:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:
] }012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901
:stream/ExampleOutputStream" } -
Escolha Próximo e, em seguida, escolha Salvar alterações.
Configurar o aplicativo
Edite a configuração do aplicativo para definir o artefato do código do aplicativo.
Configurar o aplicativo
-
Na MyApplicationpágina, escolha Configurar.
-
Na seção Localização do código do aplicativo:
-
Para o bucket do Amazon S3, selecione o bucket que você criou anteriormente para o código do aplicativo. Escolha Procurar e selecione o bucket correto e, em seguida, escolha Escolher. Não selecione o nome do bucket.
-
Em Caminho do objeto do Amazon S3, insira
managed-flink-pyflink-getting-started-1.0.0.zip
.
-
-
Para permissões de acesso, escolha Criar/atualizar IAM função
kinesis-analytics-MyApplication-us-east-1
com as políticas necessárias. -
Vá para as propriedades de tempo de execução e mantenha os valores padrão para todas as outras configurações.
-
Escolha Adicionar novo item e adicione cada um dos seguintes parâmetros:
ID do grupo Chave Valor InputStream0
stream.name
ExampleInputStream
InputStream0
flink.stream.initpos
LATEST
InputStream0
aws.region
us-east-1
OutputStream0
stream.name
ExampleOutputStream
OutputStream0
aws.region
us-east-1
kinesis.analytics.flink.run.options
python
main.py
kinesis.analytics.flink.run.options
jarfile
lib/pyflink-dependencies.jar
-
Não modifique nenhuma das outras seções e escolha Salvar alterações.
nota
Quando você opta por habilitar o CloudWatch registro na Amazon, o Managed Service for Apache Flink cria um grupo de logs e um stream de logs para você. Os nomes desses recursos são os seguintes:
-
Grupo de logs:
/aws/kinesis-analytics/MyApplication
-
Fluxo de logs:
kinesis-analytics-log-stream
Execute o aplicativo
O aplicativo agora está configurado e pronto para ser executado.
Executar o aplicativo
-
No console do Amazon Managed Service para Apache Flink, escolha Meu aplicativo e escolha Executar.
-
Na próxima página, na página de configuração de restauração do aplicativo, escolha Executar com o snapshot mais recente e, em seguida, escolha Executar.
O status nos detalhes do aplicativo muda de
Ready
paraStarting
e depois paraRunning
quando o aplicativo é iniciado.
Quando o aplicativo está no Running
status, agora você pode abrir o painel do Flink.
Para abrir o painel do
-
Escolha Abrir painel do Apache Flink. O painel é aberto em uma nova página.
-
Na lista de trabalhos em execução, escolha o único trabalho que você pode ver.
nota
Se você definiu as propriedades do Runtime ou editou as IAM políticas incorretamente, o status do aplicativo pode se transformar em
Running
, mas o painel do Flink mostra que o trabalho está sendo reiniciado continuamente. Esse é um cenário de falha comum se o aplicativo estiver configurado incorretamente ou não tiver permissões para acessar os recursos externos.Quando isso acontecer, verifique a guia Exceções no painel do Flink para ver a causa do problema.
Observe as métricas do aplicativo em execução
Na MyApplicationpágina, na seção de CloudWatch métricas da Amazon, você pode ver algumas das métricas fundamentais do aplicativo em execução.
Para ver as métricas
-
Ao lado do botão Atualizar, selecione 10 segundos na lista suspensa.
-
Quando o aplicativo está em execução e em bom estado, você pode ver a métrica de tempo de atividade aumentando continuamente.
-
A métrica de reinicializações completas deve ser zero. Se estiver aumentando, a configuração pode ter problemas. Para investigar o problema, revise a guia Exceções no painel do Flink.
-
A métrica Número de pontos de verificação com falha deve ser zero em um aplicativo saudável.
nota
Esse painel exibe um conjunto fixo de métricas com uma granularidade de 5 minutos. Você pode criar um painel de aplicativos personalizado com qualquer métrica no CloudWatch painel.
Observe os dados de saída nos streams do Kinesis
Verifique se você ainda está publicando dados na entrada, usando o script Python ou o Kinesis Data Generator.
Agora você pode observar a saída do aplicativo em execução no Managed Service for Apache Flink usando o Visualizador de Dados no https://console.aws.amazon.com/kinesis/
Para ver a saída
Abra o console do Kinesis em https://console.aws.amazon.com /kinesis.
-
Verifique se a região é a mesma que você está usando para executar este tutorial. Por padrão, é US-East-1US East (Norte da Virgínia). Altere a região, se necessário.
-
Escolha fluxos de dados.
-
Selecione o stream que você deseja observar. Para este tutorial, use
ExampleOutputStream
. -
Escolha a guia Visualizador de dados.
-
Selecione qualquer fragmento, mantenha Último como posição inicial e escolha Obter registros. Talvez você veja o erro “nenhum registro encontrado para esta solicitação”. Em caso afirmativo, escolha Tentar obter registros novamente. Os registros mais recentes publicados no stream são exibidos.
-
Selecione o valor na coluna Dados para inspecionar o conteúdo do registro em JSON formato.
Pare o aplicativo
Para interromper o aplicativo, acesse a página do console do aplicativo Managed Service for Apache Flink chamada. MyApplication
Como interromper o aplicativo
-
Na lista suspensa Ação, escolha Parar.
-
O status nos detalhes do aplicativo muda de
Running
paraStopping
e depois paraReady
quando o aplicativo é completamente interrompido.nota
Não se esqueça também de parar de enviar dados para o stream de entrada a partir do script Python ou do Kinesis Data Generator.