Amazon DynamoDB에서 스트리밍 데이터 로드 - Amazon OpenSearch 서비스

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon DynamoDB에서 스트리밍 데이터 로드

AWS Lambda을 사용하여 Amazon DynamoDB에서 OpenSearch Service 도메인으로 데이터를 전송할 수 있습니다. 데이터베이스 테이블에 도착한 새 데이터는 Lambda로 이벤트 알림을 트리거한 다음 사용자 지정 코드를 실행해 인덱싱합니다.

사전 조건

계속하려면 먼저 다음 리소스를 확보해야 합니다.

전제 조건 설명
DynamoDB 테이블

이 테이블에는 소스 데이터가 있습니다. 자세한 내용은 Amazon DynamoDB 개발자 안내서DynamoDB 테이블에 대한 기본 작업을 참조하세요.

테이블은 OpenSearch Service 도메인과 같은 리전에 위치하고 새 이미지로 설정된 스트림이 있어야 합니다. 자세한 내용은 스트림 활성화를 참조하세요.

OpenSearch Service 도메인 Lambda 함수로 처리한 후의 데이터 대상 주소입니다. 자세한 내용은 OpenSearch 서비스 도메인 생성 섹션을 참조하세요.
IAM 역할

이 역할에는 다음과 같은 기본 OpenSearch Service, DynamoDB 및 Lambda 실행 권한이 있어야 합니다.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

역할은 다음과 같은 신뢰 관계를 맺고 있어야 합니다.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

자세한 내용은 IAM 사용 설명서IAM 역할 생성을 참조하세요.

Lambda 함수 생성

Lambda 배포 패키지 생성의 지침을 따르되, ddb-to-opensearch라는 디렉터리를 만들고 sample.py에는 다음과 같은 코드를 사용합니다.

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the OpenSearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

regionhost의 변수를 편집합니다.

아직 설치하지 않았다면 pip를 설치한 다음, 다음 명령을 사용하여 종속 항목을 설치합니다.

cd ddb-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

이제 Lambda 함수 생성 지침을 따르되, 사전 조건에서 IAM 역할을 지정하고 트리거에는 다음 설정을 지정합니다.

  • 테이블: 사용자의 DynamoDB 테이블

  • 배치 크기: 100

  • 시작 위치: 수평 트리밍

자세한 내용은 Amazon DynamoDB 개발자 안내서DynamoDB Streams 및 Lambda를 사용하여 새 항목 처리를 참조하세요.

이제 사용자는 완벽한 리소스 모음, 즉 소스 데이터에 대한 DynamoDB 테이블, 테이블 변경 사항의 DynamoDB 스트림, 소스 데이터가 변경되면 실행되어 이러한 변경 사항을 인덱싱하는 함수, 검색과 시각화를 위한 OpenSearch Service 도메인을 모두 확보하게 됩니다.

Lambda 함수 테스트

함수를 만들었으면 이제 AWS CLI를 사용해 DynamoDB 테이블에 새 항목을 추가해 함수를 테스트할 수 있습니다.

aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

그런 다음 OpenSearch Service 콘솔 또는 OpenSearch Dashboards를 사용하여 lambda-index에 한 개의 문서가 있음을 확인합니다. 다음 요청을 사용할 수도 있습니다.

GET https://domain-name/lambda-index/_doc/00001 { "_index": "lambda-index", "_type": "_doc", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }