

サポート終了通知: 2025 年 12 月 15 日に、 AWS はサポートを終了します AWS IoT Analytics。2025 年 12 月 15 日以降、 AWS IoT Analytics コンソールまたは AWS IoT Analytics リソースにアクセスできなくなります。詳細については、[AWS IoT Analytics 「サポート終了](https://docs.aws.amazon.com/iotanalytics/latest/userguide/iotanalytics-end-of-support.html)」を参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# AWS Lambda アクティビティ
<a name="pipeline-activities-lambda"></a>

**`lambda`アクティビティ**を使用すれば、メッセージでより複雑な処理を実行できます。たとえば、外部 API の出力からのデータによるメッセージの強化や、Amazon DynamoDB のロジックに基づくメッセージのフィルターがあります。ただし、データストアに入る前に、このパイプラインアクティビティを使用してメッセージを追加したり、既存のメッセージを削除したりすることはできません。

**`lambda` アクティビティ**で使用される AWS Lambda 関数は、JSON オブジェクトの配列を受信して返す必要があります。例については、[Lambda 関数の例 1](#pipeline-activities-lambda-ex1)を参照してください。

 Lambda 関数を呼び出す AWS IoT Analytics アクセス許可を付与するには、ポリシーを追加する必要があります。たとえば、次の CLI コマンドを実行し、{{exampleFunctionName}} を Lambda 関数の名前に置き換え、{{123456789012}} を AWS アカウント ID に置き換え、指定された Lambda 関数を呼び出すパイプラインの Amazon リソースネーム (ARN) を使用します。

```
aws lambda add-permission --function-name {{exampleFunctionName}} --action lambda:InvokeFunction --statement-id iotanalytics --principal iotanalytics.amazonaws.com --source-account {{123456789012}} --source-arn arn:aws:iotanalytics:{{us-east-1}}:{{123456789012}}:pipeline/{{examplePipeline}}
```

このコマンドは以下を返します。

```
{
    "Statement": "{\"Sid\":\"iotanalyticsa\",\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"iotanalytics.amazonaws.com\"},\"Action\":\"lambda:InvokeFunction\",\"Resource\":\"arn:aws:lambda:aws-region:aws-account:function:{{exampleFunctionName}}\",\"Condition\":{\"StringEquals\":{\"AWS:SourceAccount\":\"{{123456789012}}\"},\"ArnLike\":{\"AWS:SourceArn\":\"arn:aws:iotanalytics:{{us-east-1}}:{{123456789012}}:pipeline/{{examplePipeline}}\"}}}"
}
```

詳細については、*AWS Lambda デベロッパーガイド*の「[AWS Lambdaのリソースベースのポリシーを使用する](https://docs.aws.amazon.com/lambda/latest/dg/access-control-resource-based.html)」を参照してください。

## Lambda 関数の例 1
<a name="pipeline-activities-lambda-ex1"></a>

この例では、Lambda 関数は、元のメッセージのデータに基づいて、追加情報を追加します。デ次の例のようなペイロードを含むメッセージがデバイスから発行されます。

```
{
  "thingid": "00001234abcd",
  "temperature": 26,
  "humidity": 29,
  "location": {
    "lat": 52.4332935,
    "lon": 13.231694
  },
  "ip": "192.168.178.54",
  "datetime": "2018-02-15T07:06:01"
}
```

そのデバイスには次のパイプライン定義があります。

```
{
    "pipeline": {
        "activities": [
            {
                "channel": {
                    "channelName": "foobar_channel",
                    "name": "foobar_channel_activity",
                    "next": "lambda_foobar_activity"
                }
            },
            {
                "lambda": {
                    "lambdaName": "MyAnalyticsLambdaFunction",
                    "batchSize": 5,
                    "name": "lambda_foobar_activity",
                    "next": "foobar_store_activity"
                }
            },
            {
                "datastore": {
                    "datastoreName": "foobar_datastore",
                    "name": "foobar_store_activity"
                }
            }
        ],
        "name": "foobar_pipeline",
        "arn": "arn:aws:iotanalytics:eu-west-1:123456789012:pipeline/foobar_pipeline"
    }
}
```

次の Lambda Python 関数 `MyAnalyticsLambdaFunction`により GMaps URL と華氏の温度がメッセージに追加されます。

```
import logging
import sys

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

def c_to_f(c):
    return 9.0/5.0 * c + 32

def lambda_handler(event, context):
    logger.info("event before processing: {}".format(event))
    maps_url = 'N/A'

    for e in event:
        #e['foo'] = 'addedByLambda'
        if 'location' in e:
            lat = e['location']['lat']
            lon = e['location']['lon']
            maps_url = "http://maps.google.com/maps?q={},{}".format(lat,lon)

        if 'temperature' in e:
            e['temperature_f'] = c_to_f(e['temperature'])

        logger.info("maps_url: {}".format(maps_url))
        e['maps_url'] = maps_url

    logger.info("event after processing: {}".format(event))

    return event
```

## Lambda 関数の例 2
<a name="pipeline-activities-lambda-ex2"></a>

メッセージペイロードを圧縮およびシリアル化して、トランスポートおよびストレージコストを削減するのが便利です。この 2 番目の例では、Lambda 関数はメッセージペイロードが圧縮されて、文字列として base64 エンコード シリアル化された元の JSON を表していると想定しています。元の JSON が返されます。

```
import base64
import gzip
import json
import logging
import sys

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

def decode_to_bytes(e):
    return base64.b64decode(e)

def decompress_to_string(binary_data):
    return gzip.decompress(binary_data).decode('utf-8')

def lambda_handler(event, context):
    logger.info("event before processing: {}".format(event))

    decompressed_data = []

    for e in event:
        binary_data = decode_to_bytes(e)
        decompressed_string = decompress_to_string(binary_data)

        decompressed_data.append(json.loads(decompressed_string))

    logger.info("event after processing: {}".format(decompressed_data))

    return decompressed_data
```