Amazon ECS 至少发送一次事件。这意味着您可能会收到给定事件的多个副本。此外,无法按事件的发生顺序将事件传送到事件侦听器。
为了正确排序事件,每个事件的 detail
部分均包含 version
属性。每次当资源更改状态时,此 version
会递增。重复事件在 detail
对象中具有相同 version
。如果使用 EventBridge 复制 Amazon ECS 容器实例和任务状态,则可以将 Amazon ECS API 报告的 version
资源版本与资源在 EventBridge 中报告的版本进行比较,以验证事件流中的版本是否为当前版本。版本属性数值更大的事件应视为在版本号更小的事件之后发生。
示例:在 AWS Lambda 函数中处理事件
以下示例显示了一个用 Python 3.9 编写的 Lambda 函数,此函数可同时捕获任务和容器实例状态更改事件,并将这些事件保存到两个 Amazon DynamoDB 表之一:
-
ECSCtrInstanceState – 存储容器实例的最新状态。表 ID 是容器实例的
containerInstanceArn
值。 -
ECSTaskState – 存储任务的最新状态。表 ID 是任务的
taskArn
值。
import json
import boto3
def lambda_handler(event, context):
id_name = ""
new_record = {}
# For debugging so you can see raw event format.
print('Here is the event:')
print((json.dumps(event)))
if event["source"] != "aws.ecs":
raise ValueError("Function only supports input from events with a source type of: aws.ecs")
# Switch on task/container events.
table_name = ""
if event["detail-type"] == "ECS Task State Change":
table_name = "ECSTaskState"
id_name = "taskArn"
event_id = event["detail"]["taskArn"]
elif event["detail-type"] == "ECS Container Instance State Change":
table_name = "ECSCtrInstanceState"
id_name = "containerInstanceArn"
event_id = event["detail"]["containerInstanceArn"]
else:
raise ValueError("detail-type for event is not a supported type. Exiting without saving event.")
new_record["cw_version"] = event["version"]
new_record.update(event["detail"])
# "status" is a reserved word in DDB, but it appears in containerPort
# state change messages.
if "status" in event:
new_record["current_status"] = event["status"]
new_record.pop("status")
# Look first to see if you have received a newer version of an event ID.
# If the version is OLDER than what you have on file, do not process it.
# Otherwise, update the associated record with this latest information.
print("Looking for recent event with same ID...")
dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
table = dynamodb.Table(table_name)
saved_event = table.get_item(
Key={
id_name : event_id
}
)
if "Item" in saved_event:
# Compare events and reconcile.
print(("EXISTING EVENT DETECTED: Id " + event_id + " - reconciling"))
if saved_event["Item"]["version"] < event["detail"]["version"]:
print("Received event is a more recent version than the stored event - updating")
table.put_item(
Item=new_record
)
else:
print("Received event is an older version than the stored event - ignoring")
else:
print(("Saving new event - ID " + event_id))
table.put_item(
Item=new_record
)
以下 Fargate 示例显示了一个用 Python 3.9 编写的 Lambda 函数,此函数可捕获任务状态更改事件,并将这些事件保存到以下 Amazon DynamoDB 表:
import json
import boto3
def lambda_handler(event, context):
id_name = ""
new_record = {}
# For debugging so you can see raw event format.
print('Here is the event:')
print((json.dumps(event)))
if event["source"] != "aws.ecs":
raise ValueError("Function only supports input from events with a source type of: aws.ecs")
# Switch on task/container events.
table_name = ""
if event["detail-type"] == "ECS Task State Change":
table_name = "ECSTaskState"
id_name = "taskArn"
event_id = event["detail"]["taskArn"]
else:
raise ValueError("detail-type for event is not a supported type. Exiting without saving event.")
new_record["cw_version"] = event["version"]
new_record.update(event["detail"])
# "status" is a reserved word in DDB, but it appears in containerPort
# state change messages.
if "status" in event:
new_record["current_status"] = event["status"]
new_record.pop("status")
# Look first to see if you have received a newer version of an event ID.
# If the version is OLDER than what you have on file, do not process it.
# Otherwise, update the associated record with this latest information.
print("Looking for recent event with same ID...")
dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
table = dynamodb.Table(table_name)
saved_event = table.get_item(
Key={
id_name : event_id
}
)
if "Item" in saved_event:
# Compare events and reconcile.
print(("EXISTING EVENT DETECTED: Id " + event_id + " - reconciling"))
if saved_event["Item"]["version"] < event["detail"]["version"]:
print("Received event is a more recent version than the stored event - updating")
table.put_item(
Item=new_record
)
else:
print("Received event is an older version than the stored event - ignoring")
else:
print(("Saving new event - ID " + event_id))
table.put_item(
Item=new_record
)