Selecione suas preferências de cookies

Usamos cookies essenciais e ferramentas semelhantes que são necessárias para fornecer nosso site e serviços. Usamos cookies de desempenho para coletar estatísticas anônimas, para que possamos entender como os clientes usam nosso site e fazer as devidas melhorias. Cookies essenciais não podem ser desativados, mas você pode clicar em “Personalizar” ou “Recusar” para recusar cookies de desempenho.

Se você concordar, a AWS e terceiros aprovados também usarão cookies para fornecer recursos úteis do site, lembrar suas preferências e exibir conteúdo relevante, incluindo publicidade relevante. Para aceitar ou recusar todos os cookies não essenciais, clique em “Aceitar” ou “Recusar”. Para fazer escolhas mais detalhadas, clique em “Personalizar”.

AWS Lambda exemplos de funções para o Amazon Neptune

Modo de foco
AWS Lambda exemplos de funções para o Amazon Neptune - Amazon Neptune

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

As AWS Lambda funções de exemplo a seguir, escritas em Java JavaScript e Python, ilustram a alteração de um único vértice com um ID gerado aleatoriamente usando o idioma. fold().coalesce().unfold()

Grande parte do código em cada função é código clichê responsável por gerenciar e repetir conexões e consultas em caso de erro. A lógica real da aplicação e a consulta do Gremlin são implementadas nos métodos doQuery() e query(), respectivamente. Se você usar esses exemplos como base para as próprias funções do Lambda, poderá se concentrar em modificar doQuery() e query().

As funções são configuradas para repetir consultas com falha cinco vezes, aguardando um segundo entre as tentativas.

As funções exigem que os valores estejam presentes nas seguintes variáveis de ambiente do Lambda:

  • NEPTUNE_ENDPOINT: o endpoint de cluster de banco de dados do Neptune. Para Python, deve ser neptuneEndpoint.

  • NEPTUNE_PORT: a porta do Neptune. Para Python, deve ser neptunePort.

  • USE_IAM — (trueoufalse) Se seu banco de dados tiver a autenticação de banco de dados AWS Identity and Access Management (IAM) ativada, defina a variável de USE_IAM ambiente comotrue. Isso faz com que a função do Lambda assine por Sigv4 solicitações de conexão com o Neptune. Para essas solicitações de autenticação de banco de dados do IAM, garanta que a função de execução da função do Lambda tenha uma política do IAM apropriada anexada que permita que a função se conecte ao cluster de banco de dados do Neptune (consulte Tipos de política do IAM).

Exemplo de função do Lambda em Java para Amazon Neptune

Aqui estão algumas coisas que você deve ter em mente sobre AWS Lambda as funções Java:

  • O driver Java mantém o próprio grupo de conexões, que não é necessário, então configure o objeto Cluster com minConnectionPoolSize(1) e maxConnectionPoolSize(1).

  • O objeto Cluster pode demorar porque cria um ou mais serializadores (Gyro por padrão, além de outro, se você o tiver configurado para formatos de saída adicionais, como binary). Eles podem demorar um pouco para ser instanciados.

  • O grupo de conexões é inicializado com a primeira solicitação. Nesse ponto, o driver configura a pilha Netty, aloca buffers de bytes e cria uma chave de assinatura caso você esteja usando a autenticação de banco de dados do IAM. Tudo isso pode aumentar a latência de inicialização a frio.

  • O grupo de conexões do driver Java monitora a disponibilidade dos hosts do servidor e tenta se reconectar automaticamente em caso de falha da conexão. Ele inicia uma tarefa em segundo plano para tentar restabelecer a conexão. Use reconnectInterval( ) para configurar o intervalo entre as tentativas de reconexão. Enquanto o driver está tentando se reconectar, a função do Lambda pode simplesmente repetir a consulta.

    Se o intervalo entre as tentativas for menor do que o intervalo entre as tentativas de reconexão, ocorrerá novamente uma falha nas novas tentativas em uma conexão com falha porque o host será considerado indisponível. Isso não se aplica às novas tentativas de uma ConcurrentModificationException.

  • Use o Java 8 em vez do Java 11. As otimizações do Netty não estão habilitadas por padrão no Java 11.

  • Este exemplo usa Retry4j para novas tentativas.

  • Para usar o driver de assinatura Sigv4 na função do Lambda em Java, consulte os requisitos de dependência em Conectando-se aos bancos de dados Amazon Neptune usando IAM com Gremlin Java.

Atenção

O CallExecutor do Retry4j pode não ser livre de threads. Pense em fazer com que cada thread use a própria instância CallExecutor.

nota

O exemplo a seguir foi atualizado para incluir o uso de requestInterceptor (). Isso foi adicionado na TinkerPop versão 3.6.6. Antes da TinkerPop versão 3.6.6, o exemplo de código usava handshakeInterceptor (), que foi descontinuado com essa versão.

package com.amazonaws.examples.social; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import com.evanlennick.retry4j.CallExecutor; import com.evanlennick.retry4j.CallExecutorBuilder; import com.evanlennick.retry4j.Status; import com.evanlennick.retry4j.config.RetryConfig; import com.evanlennick.retry4j.config.RetryConfigBuilder; import org.apache.tinkerpop.gremlin.driver.Cluster; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.neptune.auth.NeptuneNettyHttpSigV4Signer; import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; import org.apache.tinkerpop.gremlin.driver.ser.Serializers; import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.structure.T; import java.io.*; import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.function.Function; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold; public class MyHandler implements RequestStreamHandler { private final GraphTraversalSource g; private final CallExecutor<Object> executor; private final Random idGenerator = new Random(); public MyHandler() { this.g = AnonymousTraversalSource .traversal() .withRemote(DriverRemoteConnection.using(createCluster())); this.executor = new CallExecutorBuilder<Object>() .config(createRetryConfig()) .build(); } @Override public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException { doQuery(input, output); } private void doQuery(InputStream input, OutputStream output) throws IOException { try { Map<String, Object> args = new HashMap<>(); args.put("id", idGenerator.nextInt()); String result = query(args); try (Writer writer = new BufferedWriter(new OutputStreamWriter(output, UTF_8))) { writer.write(result); } } finally { input.close(); output.close(); } } private String query(Map<String, Object> args) { int id = (int) args.get("id"); @SuppressWarnings("unchecked") Callable<Object> query = () -> g.V(id) .fold() .coalesce( unfold(), addV("Person").property(T.id, id)) .id().next(); Status<Object> status = executor.execute(query); return status.getResult().toString(); } private Cluster createCluster() { Cluster.Builder builder = Cluster.build() .addContactPoint(System.getenv("NEPTUNE_ENDPOINT")) .port(Integer.parseInt(System.getenv("NEPTUNE_PORT"))) .enableSsl(true) .minConnectionPoolSize(1) .maxConnectionPoolSize(1) .serializer(Serializers.GRAPHBINARY_V1D0) .reconnectInterval(2000); if (Boolean.parseBoolean(getOptionalEnv("USE_IAM", "true"))) { // For versions of TinkerPop 3.4.11 or higher: builder.requestInterceptor( r -> { NeptuneNettyHttpSigV4Signer sigV4Signer = new NeptuneNettyHttpSigV4Signer(region, new DefaultAWSCredentialsProviderChain()); sigV4Signer.signRequest(r); return r; } ) // Versions of TinkerPop prior to 3.4.11 should use the following approach. // Be sure to adjust the imports to include: // import org.apache.tinkerpop.gremlin.driver.SigV4WebSocketChannelizer; // builder = builder.channelizer(SigV4WebSocketChannelizer.class); return builder.create(); } private RetryConfig createRetryConfig() { return new RetryConfigBuilder().retryOnCustomExceptionLogic(retryLogic()) .withDelayBetweenTries(1000, ChronoUnit.MILLIS) .withMaxNumberOfTries(5) .withFixedBackoff() .build(); } private Function<Exception, Boolean> retryLogic() { return e -> { StringWriter stringWriter = new StringWriter(); e.printStackTrace(new PrintWriter(stringWriter)); String message = stringWriter.toString(); // Check for connection issues if ( message.contains("Timed out while waiting for an available host") || message.contains("Timed-out waiting for connection on Host") || message.contains("Connection to server is no longer active") || message.contains("Connection reset by peer") || message.contains("SSLEngine closed already") || message.contains("Pool is shutdown") || message.contains("ExtendedClosedChannelException") || message.contains("Broken pipe")) { return true; } // Concurrent writes can sometimes trigger a ConcurrentModificationException. // In these circumstances you may want to backoff and retry. if (message.contains("ConcurrentModificationException")) { return true; } // If the primary fails over to a new instance, existing connections to the old primary will // throw a ReadOnlyViolationException. You may want to back and retry. if (message.contains("ReadOnlyViolationException")) { return true; } return false; }; } private String getOptionalEnv(String name, String defaultValue) { String value = System.getenv(name); if (value != null && value.length() > 0) { return value; } else { return defaultValue; } } }

Se você quiser incluir a lógica de reconexão na função, consulte Exemplo de reconexão Java.

JavaScript Exemplo de função Lambda para Amazon Neptune

Observações sobre este exemplo
  • O JavaScript driver não mantém um pool de conexões. Ele sempre abre uma única conexão.

  • A função de exemplo usa os utilitários de assinatura Sigv4 de gremlin-aws-sigv4 para assinar solicitações em um banco de dados habilitado para autenticação do IAM.

  • Ele usa a função retry () do módulo utilitário assíncrono de código aberto para lidar com tentativas. backoff-and-retry

  • As etapas do terminal Gremlin retornam a JavaScript promise (consulte a TinkerPop documentação). Para next(), isso é uma tupla {value, done}.

  • Os erros de conexão são gerados dentro do manipulador e tratados usando alguma backoff-and-retry lógica de acordo com as recomendações descritas aqui, com uma exceção. Há um tipo de problema de conexão que o driver não trata como uma exceção e que, portanto, não pode ser resolvido por essa backoff-and-retry lógica.

    O problema é que, se uma conexão for fechada depois que um driver enviar uma solicitação, mas antes que o driver receba uma resposta, parecerá que a consulta está concluída, mas gera um valor nulo. No que diz respeito ao cliente da função do Lambda, a função parece ter sido concluída com êxito, mas com uma resposta vazia.

    O impacto desse problema depende de como a aplicação trata uma resposta vazia. Algumas aplicações podem tratar uma resposta vazia de uma solicitação de leitura como um erro, mas outras podem tratá-la erroneamente como um resultado vazio.

    Solicitações de gravação que encontrem esse problema de conexão também exibirão uma resposta vazia. Uma invocação bem-sucedida com uma resposta vazia indica êxito ou falha? Se o cliente que estiver invocando uma função de gravação simplesmente tratar a invocação bem-sucedida da função como se a gravação no banco de dados tivesse sido confirmada em vez de verificar o corpo da resposta, poderá parecer que o sistema perdeu dados.

    Esse problema é causado pela forma como o driver trata os eventos emitidos pelo soquete subjacente. Quando o soquete de rede subjacente é fechado com um ECONNRESET erro, o WebSocket usado pelo driver é fechado e emite um 'ws close' evento. No entanto, não há nada no driver que lide com esse evento de uma forma que possa ser usada para gerar uma exceção. Como resultado, a consulta simplesmente desaparece.

    Para contornar esse problema, o exemplo da função do Lambda aqui adiciona um manipulador de eventos 'ws close' que lança uma exceção ao driver ao criar uma conexão remota. No entanto, essa exceção não é gerada ao longo do caminho de solicitação-resposta da consulta Gremlin e, portanto, não pode ser usada para acionar qualquer backoff-and-retry lógica dentro da própria função lambda. Em vez disso, a exceção lançada pelo manipulador de eventos 'ws close' gera uma exceção não tratada que faz com que a invocação do Lambda falhe. Isso permite ao cliente que invoca a função manipular o erro e repetir a invocação do Lambda, se apropriado.

    Recomendamos que você implemente a backoff-and-retry lógica na própria função lambda para proteger seus clientes contra problemas de conexão intermitentes. No entanto, a solução alternativa para o problema acima exige que o cliente implemente a lógica de repetição também para lidar com falhas resultantes desse problema de conexão específico.

Código Javascript

const gremlin = require('gremlin'); const async = require('async'); const {getUrlAndHeaders} = require('gremlin-aws-sigv4/lib/utils'); const traversal = gremlin.process.AnonymousTraversalSource.traversal; const DriverRemoteConnection = gremlin.driver.DriverRemoteConnection; const t = gremlin.process.t; const __ = gremlin.process.statics; let conn = null; let g = null; async function query(context) { const id = context.id; return g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(t.id, id) ) .id().next(); } async function doQuery() { const id = Math.floor(Math.random() * 10000).toString(); let result = await query({id: id}); return result['value']; } exports.handler = async (event, context) => { const getConnectionDetails = () => { if (process.env['USE_IAM'] == 'true'){ return getUrlAndHeaders( process.env['NEPTUNE_ENDPOINT'], process.env['NEPTUNE_PORT'], {}, '/gremlin', 'wss'); } else { const database_url = 'wss://' + process.env['NEPTUNE_ENDPOINT'] + ':' + process.env['NEPTUNE_PORT'] + '/gremlin'; return { url: database_url, headers: {}}; } }; const createRemoteConnection = () => { const { url, headers } = getConnectionDetails(); const c = new DriverRemoteConnection( url, { mimeType: 'application/vnd.gremlin-v2.0+json', headers: headers }); c._client._connection.on('close', (code, message) => { console.info(`close - ${code} ${message}`); if (code == 1006){ console.error('Connection closed prematurely'); throw new Error('Connection closed prematurely'); } }); return c; }; const createGraphTraversalSource = (conn) => { return traversal().withRemote(conn); }; if (conn == null){ console.info("Initializing connection") conn = createRemoteConnection(); g = createGraphTraversalSource(conn); } return async.retry( { times: 5, interval: 1000, errorFilter: function (err) { // Add filters here to determine whether error can be retried console.warn('Determining whether retriable error: ' + err.message); // Check for connection issues if (err.message.startsWith('WebSocket is not open')){ console.warn('Reopening connection'); conn.close(); conn = createRemoteConnection(); g = createGraphTraversalSource(conn); return true; } // Check for ConcurrentModificationException if (err.message.includes('ConcurrentModificationException')){ console.warn('Retrying query because of ConcurrentModificationException'); return true; } // Check for ReadOnlyViolationException if (err.message.includes('ReadOnlyViolationException')){ console.warn('Retrying query because of ReadOnlyViolationException'); return true; } return false; } }, doQuery); };

Exemplo de função do Lambda em Python para Amazon Neptune

Veja algumas considerações sobre o seguinte exemplo de função AWS Lambda em Python:

  • Ele usa o módulo de recuo.

  • Ele define pool_size=1 para evitar a criação de um grupo de conexões desnecessário.

import os, sys, backoff, math from random import randint from gremlin_python import statics from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection from gremlin_python.driver.protocol import GremlinServerError from gremlin_python.driver import serializer from gremlin_python.process.anonymous_traversal import traversal from gremlin_python.process.graph_traversal import __ from gremlin_python.process.strategies import * from gremlin_python.process.traversal import T from aiohttp.client_exceptions import ClientConnectorError from botocore.auth import SigV4Auth from botocore.awsrequest import AWSRequest from botocore.credentials import ReadOnlyCredentials from types import SimpleNamespace import logging logger = logging.getLogger() logger.setLevel(logging.INFO) reconnectable_err_msgs = [ 'ReadOnlyViolationException', 'Server disconnected', 'Connection refused', 'Connection was already closed', 'Connection was closed by server', 'Failed to connect to server: HTTP Error code 403 - Forbidden' ] retriable_err_msgs = ['ConcurrentModificationException'] + reconnectable_err_msgs network_errors = [OSError, ClientConnectorError] retriable_errors = [GremlinServerError, RuntimeError, Exception] + network_errors def prepare_iamdb_request(database_url): service = 'neptune-db' method = 'GET' access_key = os.environ['AWS_ACCESS_KEY_ID'] secret_key = os.environ['AWS_SECRET_ACCESS_KEY'] region = os.environ['AWS_REGION'] session_token = os.environ['AWS_SESSION_TOKEN'] creds = SimpleNamespace( access_key=access_key, secret_key=secret_key, token=session_token, region=region, ) request = AWSRequest(method=method, url=database_url, data=None) SigV4Auth(creds, service, region).add_auth(request) return database_url, request.headers.items() def is_retriable_error(e): is_retriable = False err_msg = str(e) if isinstance(e, tuple(network_errors)): is_retriable = True else: is_retriable = any(retriable_err_msg in err_msg for retriable_err_msg in retriable_err_msgs) logger.error('error: [{}] {}'.format(type(e), err_msg)) logger.info('is_retriable: {}'.format(is_retriable)) return is_retriable def is_non_retriable_error(e): return not is_retriable_error(e) def reset_connection_if_connection_issue(params): is_reconnectable = False e = sys.exc_info()[1] err_msg = str(e) if isinstance(e, tuple(network_errors)): is_reconnectable = True else: is_reconnectable = any(reconnectable_err_msg in err_msg for reconnectable_err_msg in reconnectable_err_msgs) logger.info('is_reconnectable: {}'.format(is_reconnectable)) if is_reconnectable: global conn global g conn.close() conn = create_remote_connection() g = create_graph_traversal_source(conn) @backoff.on_exception(backoff.constant, tuple(retriable_errors), max_tries=5, jitter=None, giveup=is_non_retriable_error, on_backoff=reset_connection_if_connection_issue, interval=1) def query(**kwargs): id = kwargs['id'] return (g.V().hasLabel('column').has('column_name', 'amhstr_ag_type').in_('hascolumn').dedup().valueMap().limit(10).toList()) def doQuery(event): return query(id=str(randint(0, 10000))) def lambda_handler(event, context): result = doQuery(event) logger.info('result – {}'.format(result)) return result def create_graph_traversal_source(conn): return traversal().withRemote(conn) def create_remote_connection(): logger.info('Creating remote connection') (database_url, headers) = connection_info() # Convert headers to a dictionary if it's not already headers_dict = dict(headers) if isinstance(headers, list) else headers print(headers) return DriverRemoteConnection( database_url, 'g', pool_size=1, headers=headers_dict) def connection_info(): database_url = 'wss://{}:{}/gremlin'.format(os.environ['neptuneEndpoint'], os.environ['neptunePort']) if 'USE_IAM' in os.environ and os.environ['USE_IAM'] == 'true': return prepare_iamdb_request(database_url) else: return (database_url, {}) conn = create_remote_connection() g = create_graph_traversal_source(conn)

Veja exemplos de resultados, mostrando períodos alternados de carga pesada e leve:

Diagrama mostrando exemplos de resultados do exemplo de função do Lambda em Python.
PrivacidadeTermos do sitePreferências de cookies
© 2025, Amazon Web Services, Inc. ou suas afiliadas. Todos os direitos reservados.