

지원 종료 알림: 2025년 12월 15일에 AWS 에 대한 지원이 종료됩니다 AWS IoT 분석. 2025년 12월 15일 이후에는 AWS IoT 분석 콘솔 또는 AWS IoT 분석 리소스에 더 이상 액세스할 수 없습니다. 자세한 내용은 [AWS IoT 분석 지원 종료를 참조하세요](https://docs.aws.amazon.com/iotanalytics/latest/userguide/iotanalytics-end-of-support.html).

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# AWS Lambda 활동
<a name="pipeline-activities-lambda"></a>

**`lambda` 활동**은 더욱 복잡한 메시지 처리를 수행하기 위해 사용될 수 있습니다. 예를 들어, 외부 API 작업의 출력 데이터로 메시지를 보강하거나 Amazon DynamoDB의 로직을 기반으로 메시지를 필터링할 수 있습니다. 하지만 데이터 스토어에 들어가기 전에 이 파이프라인 활동을 사용하여 메시지를 추가하거나 기존 메시지를 제거할 수는 없습니다.

**`lambda` 활동에** 사용되는 AWS Lambda 함수는 JSON 객체 배열을 수신하고 반환해야 합니다. 예제는 [Lambda 함수 예시 1](#pipeline-activities-lambda-ex1) 섹션을 참조하세요.

 Lambda 함수를 호출할 수 있는 AWS IoT 분석 권한을 부여하려면 정책을 추가해야 합니다. 예를 들어 다음 CLI 명령을 실행하고 {{exampleFunctionName}}을 Lambda 함수의 이름으로 바꾸고, {{123456789012}}을 AWS 계정 ID로 바꾸고, 지정된 Lambda 함수를 호출하는 파이프라인의 Amazon 리소스 이름(ARN)을 사용합니다.

```
aws lambda add-permission --function-name {{exampleFunctionName}} --action lambda:InvokeFunction --statement-id iotanalytics --principal iotanalytics.amazonaws.com --source-account {{123456789012}} --source-arn arn:aws:iotanalytics:{{us-east-1}}:{{123456789012}}:pipeline/{{examplePipeline}}
```

명령은 다음을 반환합니다.

```
{
    "Statement": "{\"Sid\":\"iotanalyticsa\",\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"iotanalytics.amazonaws.com\"},\"Action\":\"lambda:InvokeFunction\",\"Resource\":\"arn:aws:lambda:aws-region:aws-account:function:{{exampleFunctionName}}\",\"Condition\":{\"StringEquals\":{\"AWS:SourceAccount\":\"{{123456789012}}\"},\"ArnLike\":{\"AWS:SourceArn\":\"arn:aws:iotanalytics:{{us-east-1}}:{{123456789012}}:pipeline/{{examplePipeline}}\"}}}"
}
```

자세한 내용은 *AWS Lambda 개발자 안내서*에서 [AWS Lambda에 대해 리소스 기반 정책 사용](https://docs.aws.amazon.com/lambda/latest/dg/access-control-resource-based.html)을 참조하십시오.

## Lambda 함수 예시 1
<a name="pipeline-activities-lambda-ex1"></a>

이 예시에서 Lambda 함수는 원본 메시지의 데이터를 기반으로 정보를 추가합니다. 디바이스는 다음 예시와 유사한 페이로드와 함께 메시지를 게시합니다.

```
{
  "thingid": "00001234abcd",
  "temperature": 26,
  "humidity": 29,
  "location": {
    "lat": 52.4332935,
    "lon": 13.231694
  },
  "ip": "192.168.178.54",
  "datetime": "2018-02-15T07:06:01"
}
```

그리고 디바이스에는 다음과 같은 파이프라인 정의가 포합됩니다.

```
{
    "pipeline": {
        "activities": [
            {
                "channel": {
                    "channelName": "foobar_channel",
                    "name": "foobar_channel_activity",
                    "next": "lambda_foobar_activity"
                }
            },
            {
                "lambda": {
                    "lambdaName": "MyAnalyticsLambdaFunction",
                    "batchSize": 5,
                    "name": "lambda_foobar_activity",
                    "next": "foobar_store_activity"
                }
            },
            {
                "datastore": {
                    "datastoreName": "foobar_datastore",
                    "name": "foobar_store_activity"
                }
            }
        ],
        "name": "foobar_pipeline",
        "arn": "arn:aws:iotanalytics:eu-west-1:123456789012:pipeline/foobar_pipeline"
    }
}
```

다음 Lambda Python 함수(`MyAnalyticsLambdaFunction`)는 메시지에 GMaps URL과 화씨 온도를 추가합니다.

```
import logging
import sys

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

def c_to_f(c):
    return 9.0/5.0 * c + 32

def lambda_handler(event, context):
    logger.info("event before processing: {}".format(event))
    maps_url = 'N/A'

    for e in event:
        #e['foo'] = 'addedByLambda'
        if 'location' in e:
            lat = e['location']['lat']
            lon = e['location']['lon']
            maps_url = "http://maps.google.com/maps?q={},{}".format(lat,lon)

        if 'temperature' in e:
            e['temperature_f'] = c_to_f(e['temperature'])

        logger.info("maps_url: {}".format(maps_url))
        e['maps_url'] = maps_url

    logger.info("event after processing: {}".format(event))

    return event
```

## Lambda 함수 예시 2
<a name="pipeline-activities-lambda-ex2"></a>

유용한 기술은 메시지 페이로드를 압축 및 직렬화하여 전송 및 저장 비용을 줄이는 것입니다. 이 두 번째 예시에서 Lambda 함수는 메시지 페이로드가 압축된 후 문자열로 base64 인코딩(직렬화)된 JSON 원본을 나타내는 것으로 가정합니다. 원본 JSON을 반환합니다.

```
import base64
import gzip
import json
import logging
import sys

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

def decode_to_bytes(e):
    return base64.b64decode(e)

def decompress_to_string(binary_data):
    return gzip.decompress(binary_data).decode('utf-8')

def lambda_handler(event, context):
    logger.info("event before processing: {}".format(event))

    decompressed_data = []

    for e in event:
        binary_data = decode_to_bytes(e)
        decompressed_string = decompress_to_string(binary_data)

        decompressed_data.append(json.loads(decompressed_string))

    logger.info("event after processing: {}".format(decompressed_data))

    return decompressed_data
```