

终止支持通知： AWS 将于 2025 年 12 月 15 日终止对的支持 AWS IoT Analytics。2025 年 12 月 15 日之后，您将无法再访问 AWS IoT Analytics 控制台或 AWS IoT Analytics 资源。有关更多信息，请参阅[AWS IoT Analytics 终止支持](https://docs.aws.amazon.com/iotanalytics/latest/userguide/iotanalytics-end-of-support.html)。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# AWS Lambda 活动
<a name="pipeline-activities-lambda"></a>

**`lambda` 活动**可用于对消息执行复杂的处理。例如，您可使用来自外部 API 操作输出的数据来丰富消息，或者根据来自 Amazon DynamoDB 的逻辑筛选消息。但是，在进入数据存储之前，您无法使用此管道活动来添加其他消息或删除现有消息。

**`lambda`活动**中使用的 AWS Lambda 函数必须接收并返回一个 JSON 对象数组。有关示例，请参阅[Lambda 函数示例 1](#pipeline-activities-lambda-ex1)。

 要授予调用 Lambda 函数的 AWS IoT Analytics 权限，您必须添加策略。例如，运行以下 CLI 命令并{{exampleFunctionName}}替换为您的 Lambda 函数的名称，替换为您的 AWS 账户 ID，然后使用调{{123456789012}}用给定 Lambda 函数的管道的亚马逊资源名称 (ARN)。

```
aws lambda add-permission --function-name {{exampleFunctionName}} --action lambda:InvokeFunction --statement-id iotanalytics --principal iotanalytics.amazonaws.com --source-account {{123456789012}} --source-arn arn:aws:iotanalytics:{{us-east-1}}:{{123456789012}}:pipeline/{{examplePipeline}}
```

该命令将返回以下输出：

```
{
    "Statement": "{\"Sid\":\"iotanalyticsa\",\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"iotanalytics.amazonaws.com\"},\"Action\":\"lambda:InvokeFunction\",\"Resource\":\"arn:aws:lambda:aws-region:aws-account:function:{{exampleFunctionName}}\",\"Condition\":{\"StringEquals\":{\"AWS:SourceAccount\":\"{{123456789012}}\"},\"ArnLike\":{\"AWS:SourceArn\":\"arn:aws:iotanalytics:{{us-east-1}}:{{123456789012}}:pipeline/{{examplePipeline}}\"}}}"
}
```

有关更多信息，请参阅*AWS Lambda 开发人员指南*中的[为 AWS Lambda使用基于资源的策略](https://docs.aws.amazon.com/lambda/latest/dg/access-control-resource-based.html)。

## Lambda 函数示例 1
<a name="pipeline-activities-lambda-ex1"></a>

在该示例中，Lambda 函数根据原始消息中的数据添加信息。设备发布一条包含类似于以下负载的消息。

```
{
  "thingid": "00001234abcd",
  "temperature": 26,
  "humidity": 29,
  "location": {
    "lat": 52.4332935,
    "lon": 13.231694
  },
  "ip": "192.168.178.54",
  "datetime": "2018-02-15T07:06:01"
}
```

并且该设备具有以下管道定义。

```
{
    "pipeline": {
        "activities": [
            {
                "channel": {
                    "channelName": "foobar_channel",
                    "name": "foobar_channel_activity",
                    "next": "lambda_foobar_activity"
                }
            },
            {
                "lambda": {
                    "lambdaName": "MyAnalyticsLambdaFunction",
                    "batchSize": 5,
                    "name": "lambda_foobar_activity",
                    "next": "foobar_store_activity"
                }
            },
            {
                "datastore": {
                    "datastoreName": "foobar_datastore",
                    "name": "foobar_store_activity"
                }
            }
        ],
        "name": "foobar_pipeline",
        "arn": "arn:aws:iotanalytics:eu-west-1:123456789012:pipeline/foobar_pipeline"
    }
}
```

以下 Lambda Python 函数 (`MyAnalyticsLambdaFunction`) 将 GMaps 网址和温度（以华氏度为单位）添加到消息中。

```
import logging
import sys

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

def c_to_f(c):
    return 9.0/5.0 * c + 32

def lambda_handler(event, context):
    logger.info("event before processing: {}".format(event))
    maps_url = 'N/A'

    for e in event:
        #e['foo'] = 'addedByLambda'
        if 'location' in e:
            lat = e['location']['lat']
            lon = e['location']['lon']
            maps_url = "http://maps.google.com/maps?q={},{}".format(lat,lon)

        if 'temperature' in e:
            e['temperature_f'] = c_to_f(e['temperature'])

        logger.info("maps_url: {}".format(maps_url))
        e['maps_url'] = maps_url

    logger.info("event after processing: {}".format(event))

    return event
```

## Lambda 函数示例 2
<a name="pipeline-activities-lambda-ex2"></a>

一种有用的方法是压缩并序列化消息负载，以降低传输和存储成本。在该第二个示例中，Lambda 函数假定消息负载表示已压缩并以字符串形式进行 Base64 编码（序列化）的 JSON 原始数据。它返回原始 JSON。

```
import base64
import gzip
import json
import logging
import sys

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler(stream=sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

def decode_to_bytes(e):
    return base64.b64decode(e)

def decompress_to_string(binary_data):
    return gzip.decompress(binary_data).decode('utf-8')

def lambda_handler(event, context):
    logger.info("event before processing: {}".format(event))

    decompressed_data = []

    for e in event:
        binary_data = decode_to_bytes(e)
        decompressed_string = decompress_to_string(binary_data)

        decompressed_data.append(json.loads(decompressed_string))

    logger.info("event after processing: {}".format(decompressed_data))

    return decompressed_data
```