

終止支援通知：在 2025 年 12 月 15 日， AWS 將終止對 的支援 AWS IoT Analytics。2025 年 12 月 15 日之後，您將無法再存取 AWS IoT Analytics 主控台或 AWS IoT Analytics 資源。如需詳細資訊，請參閱[AWS IoT Analytics 終止支援](https://docs.aws.amazon.com/iotanalytics/latest/userguide/iotanalytics-end-of-support.html)。

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# AWS Lambda 活動
<a name="pipeline-activities-lambda"></a>

您可以使用**`lambda`活動**對訊息執行複雜的處理。例如，您可以使用來自外部 API 操作輸出的資料來充實訊息，或根據來自 Amazon DynamoDB 的邏輯篩選訊息。不過，在進入資料存放區之前，您無法使用此管道活動來新增其他訊息，或移除現有的訊息。

**`lambda` 活動**中使用的 AWS Lambda 函數必須接收並傳回 JSON 物件陣列。如需範例，請參閱「[Lambda 函數範例 1](#pipeline-activities-lambda-ex1)」。

 若要授予叫用 Lambda 函數的 AWS IoT Analytics 許可，您必須新增政策。例如，執行下列 CLI 命令，並將 {{exampleFunctionName}} 取代為 Lambda 函數的名稱，將 {{123456789012}} 取代為 AWS 您的帳戶 ID，並使用呼叫指定 Lambda 函數之管道的 Amazon Resource Name (ARN)。

```
aws lambda add-permission --function-name {{exampleFunctionName}} --action lambda:InvokeFunction --statement-id iotanalytics --principal iotanalytics.amazonaws.com --source-account {{123456789012}} --source-arn arn:aws:iotanalytics:{{us-east-1}}:{{123456789012}}:pipeline/{{examplePipeline}}
```

命令會傳回下列項目：

```
{
    "Statement": "{\"Sid\":\"iotanalyticsa\",\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"iotanalytics.amazonaws.com\"},\"Action\":\"lambda:InvokeFunction\",\"Resource\":\"arn:aws:lambda:aws-region:aws-account:function:{{exampleFunctionName}}\",\"Condition\":{\"StringEquals\":{\"AWS:SourceAccount\":\"{{123456789012}}\"},\"ArnLike\":{\"AWS:SourceArn\":\"arn:aws:iotanalytics:{{us-east-1}}:{{123456789012}}:pipeline/{{examplePipeline}}\"}}}"
}
```

詳情請參閱 *AWS Lambda 開發人員指南*中的[為 AWS Lambda使用資源型政策](https://docs.aws.amazon.com/lambda/latest/dg/access-control-resource-based.html)。

## Lambda 函數範例 1
<a name="pipeline-activities-lambda-ex1"></a>

在此範例中，Lambda 函數會根據原始訊息中的資料新增資訊。裝置會發佈承載類似下列範例的訊息。

```
{
  "thingid": "00001234abcd",
  "temperature": 26,
  "humidity": 29,
  "location": {
    "lat": 52.4332935,
    "lon": 13.231694
  },
  "ip": "192.168.178.54",
  "datetime": "2018-02-15T07:06:01"
}
```

裝置具有下列管道定義。

```
{
    "pipeline": {
        "activities": [
            {
                "channel": {
                    "channelName": "foobar_channel",
                    "name": "foobar_channel_activity",
                    "next": "lambda_foobar_activity"
                }
            },
            {
                "lambda": {
                    "lambdaName": "MyAnalyticsLambdaFunction",
                    "batchSize": 5,
                    "name": "lambda_foobar_activity",
                    "next": "foobar_store_activity"
                }
            },
            {
                "datastore": {
                    "datastoreName": "foobar_datastore",
                    "name": "foobar_store_activity"
                }
            }
        ],
        "name": "foobar_pipeline",
        "arn": "arn:aws:iotanalytics:eu-west-1:123456789012:pipeline/foobar_pipeline"
    }
}
```

下列 Lambda Python 函數 (`MyAnalyticsLambdaFunction`) 會將 GMaps URL 和以華氏為單位的溫度新增至訊息。

```
import logging
import sys

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

def c_to_f(c):
    return 9.0/5.0 * c + 32

def lambda_handler(event, context):
    logger.info("event before processing: {}".format(event))
    maps_url = 'N/A'

    for e in event:
        #e['foo'] = 'addedByLambda'
        if 'location' in e:
            lat = e['location']['lat']
            lon = e['location']['lon']
            maps_url = "http://maps.google.com/maps?q={},{}".format(lat,lon)

        if 'temperature' in e:
            e['temperature_f'] = c_to_f(e['temperature'])

        logger.info("maps_url: {}".format(maps_url))
        e['maps_url'] = maps_url

    logger.info("event after processing: {}".format(event))

    return event
```

## Lambda 函數範例 2
<a name="pipeline-activities-lambda-ex2"></a>

有用的技巧就是壓縮和序列化訊息承載，以降低傳輸和存放成本。在此第二個範例中，Lambda 函數假設訊息承載代表已壓縮的 JSON 原始檔，然後以 base64 編碼 （序列化） 做為字串。它會傳回原始 JSON。

```
import base64
import gzip
import json
import logging
import sys

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

def decode_to_bytes(e):
    return base64.b64decode(e)

def decompress_to_string(binary_data):
    return gzip.decompress(binary_data).decode('utf-8')

def lambda_handler(event, context):
    logger.info("event before processing: {}".format(event))

    decompressed_data = []

    for e in event:
        binary_data = decode_to_bytes(e)
        decompressed_string = decompress_to_string(binary_data)

        decompressed_data.append(json.loads(decompressed_string))

    logger.info("event after processing: {}".format(decompressed_data))

    return decompressed_data
```