使用示例脚本的 Amazon DataZone 快速入门 - 亚马逊 DataZone

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用示例脚本的 Amazon DataZone 快速入门

您可以 DataZone 通过管理门户网站或亚马逊 DataZone 数据门户网站访问亚马逊,也可以使用亚马逊以编程方式访问亚马逊 DataZone HTTPSAPI,亚马逊允许您直接向服务发出HTTPS请求。本节包含调用 Amazon 的示例脚本 DataZone APIs,您可以使用这些脚本来完成以下常见任务:

创建 Amazon DataZone 域名和数据门户

您可以使用以下示例脚本创建 Amazon DataZone 域名。有关 Amazon DataZone 域名的更多信息,请参阅Amazon DataZone 术语和概念

import sys import boto3 // Initialize datazone client region = 'us-east-1' dzclient = boto3.client(service_name='datazone', region_name='us-east-1') // Create DataZone domain def create_domain(name): return dzclient.create_domain( name = name, description = "this is a description", domainExecutionRole = "arn:aws:iam::<account>:role/AmazonDataZoneDomainExecutionRole", )

创建发布项目

您可以使用以下示例脚本在 Amazon 中创建发布项目 DataZone。

// Create Project def create_project(domainId): return dzclient.create_project( domainIdentifier = domainId, name = "sample-project" )

创建环境配置文件

您可以使用以下示例脚本在 Amazon 中创建环境配置文件 DataZone。

调用时使用以下CreateEnvironmentProfileAPI示例负载:

Sample Payload { "Content":{ "project_name": "Admin_project", "domain_name": "Drug-Research-and-Development", "blueprint_account_region": [ { "blueprint_name": "DefaultDataLake", "account_id": ["066535990535", "413878397724", "676266385322", "747721550195", "755347404384" ], "region": ["us-west-2", "us-east-1"] }, { "blueprint_name": "DefaultDataWarehouse", "account_id": ["066535990535", "413878397724", "676266385322", "747721550195", "755347404384" ], "region":["us-west-2", "us-east-1"] } ] } }

此示例脚本调用:CreateEnvironmentProfileAPI

def create_environment_profile(domain_id, project_id, env_blueprints) try: response = dz.list_environment_blueprints( domainIdentifier=domain_id, managed=True ) env_blueprints = response.get("items") env_blueprints_map = {} for i in env_blueprints: env_blueprints_map[i["name"]] = i['id'] print("Environment Blueprint map", env_blueprints_map) for i in blueprint_account_region: print(i) for j in i["account_id"]: for k in i["region"]: print("The env blueprint name is", i['blueprint_name']) dz.create_environment_profile( description='This is a test environment profile created via lambda function', domainIdentifier=domain_id, awsAccountId=j, awsAccountRegion=k, environmentBlueprintIdentifier=env_blueprints_map.get(i["blueprint_name"]), name=i["blueprint_name"] + j + k + "_profile", projectIdentifier=project_id ) except Exception as e: print("Failed to created Environment Profile") raise e

这是调用后的输出负载示例:CreateEnvironmentProfileAPI

{ "Content":{ "project_name": "Admin_project", "domain_name": "Drug-Research-and-Development", "blueprint_account_region": [ { "blueprint_name": "DefaultDataWarehouse", "account_id": ["111111111111"], "region":["us-west-2"], "user_parameters":[ { "name": "dataAccessSecretsArn", "value": "" } ] } ] } }

创建环境

您可以使用以下示例脚本在 Amazon 中创建环境 DataZone。

def create_environment(domain_id, project_id,blueprint_account_region ): try: #refer to get_domain_id and get_project_id for fetching ids using names. sts_client = boto3.client("sts") # Get the current account ID account_id = sts_client.get_caller_identity()["Account"] print("Fetching environment profile ids") env_profile_map = get_env_profile_map(domain_id, project_id) for i in blueprint_account_region: for j in i["account_id"]: for k in i["region"]: print(" env blueprint name", i['blueprint_name']) profile_name = i["blueprint_name"] + j + k + "_profile" env_name = i["blueprint_name"] + j + k + "_env" description = f'This is environment is created for {profile_name}, Account {account_id} and region {i["region"]}' try: dz.create_environment( description=description, domainIdentifier=domain_id, environmentProfileIdentifier=env_profile_map.get(profile_name), name=env_name, projectIdentifier=project_id ) print(f"Environment created - {env_name}") except: dz.create_environment( description=description, domainIdentifier=domain_id, environmentProfileIdentifier=env_profile_map.get(profile_name), name=env_name, projectIdentifier=project_id, userParameters= i["user_parameters"] ) print(f"Environment created - {env_name}") except Exception as e: print("Failed to created Environment") raise e

从 AWS Glue 收集元数据

您可以使用此示例脚本从 AWS Glue 收集元数据。此脚本按标准计划运行。您可以从示例脚本中检索参数并将其设为全局参数。使用标准函数获取项目、环境和域 ID。 AWS Glue 数据源是在标准时间创建和运行的,可以在脚本的 cron 部分进行更新。

def crcreate_data_source(domain_id, project_id,data_source_name) print("Creating Data Source") data_source_creation = dz.create_data_source( # Define data source : Customize the data source to which you'd like to connect # define the name of the Data source to create, example: name ='TestGlueDataSource' name=data_source_name, # give a description for the datasource (optional), example: description='This is a dorra test for creation on DZ datasources' description=data_source_description, # insert the domain identifier corresponding to the domain to which the datasource will belong, example: domainIdentifier= 'dzd_6f3gst5jjmrrmv' domainIdentifier=domain_id, # give environment identifier , example: environmentIdentifier= '3weyt6hhn8qcvb' environmentIdentifier=environment_id, # give corresponding project identifier, example: projectIdentifier= '6tl4csoyrg16ef', projectIdentifier=project_id, enableSetting="ENABLED", # publishOnImport used to select whether assets are added to the inventory and/or discovery catalog . # publishOnImport = True : Assets will be added to project's inventory as well as published to the discovery catalog # publishOnImport = False : Assets will only be added to project's inventory. # You can later curate the metadata of the assets and choose subscription terms to publish them from the inventory to the discovery catalog. publishOnImport=False, # Automated business name generation : Use AI to automatically generate metadata for assets as they are published or updated by this data source run. # Automatically generated metadata can be be approved, rejected, or edited by data publishers. # Automatically generated metadata is badged with a small icon next to the corresponding metadata field. recommendation={"enableBusinessNameGeneration": True}, type="GLUE", configuration={ "glueRunConfiguration": { "dataAccessRole": "arn:aws:iam::" + account_id + ":role/service-role/AmazonDataZoneGlueAccess-" + current_region + "-" + domain_id + "", "relationalFilterConfigurations": [ { # "databaseName": glue_database_name, "filterExpressions": [ {"expression": "*", "type": "INCLUDE"}, ], # "schemaName": "TestSchemaName", }, ], }, }, # Add metadata forms to the data source (OPTIONAL). # Metadata forms will be automatically applied to any assets that are created by the data source. # assetFormsInput=[ # { # "content": "string", # "formName": "string", # "typeIdentifier": "string", # "typeRevision": "string", # }, # ], schedule={ "schedule": "cron(5 20 * * ? *)", "timezone": "UTC", }, ) # This is a suggested syntax to return values # return_values["data_source_creation"] = data_source_creation["items"] print("Data Source Created") //This is the sample response payload after the CreateDataSource API is invoked: { "Content":{ "project_name": "Admin", "domain_name": "Drug-Research-and-Development", "env_name": "GlueEnvironment", "glue_database_name": "test", "data_source_name" : "test", "data_source_description" : "This is a test data source" } }

整理和发布数据资产

您可以使用以下示例脚本在 Amazon DataZone 中整理和发布数据资产。

您可以使用以下脚本来创建自定义表单类型:

def create_form_type(domainId, projectId): return dzclient.create_form_type( domainIdentifier = domainId, name = "customForm", model = { "smithy": "structure customForm { simple: String }" }, owningProjectIdentifier = projectId, status = "ENABLED" )

您可以使用以下示例脚本来创建自定义资产类型:

def create_custom_asset_type(domainId, projectId): return dzclient.create_asset_type( domainIdentifier = domainId, name = "userCustomAssetType", formsInput = { "Model": { "typeIdentifier": "customForm", "typeRevision": "1", "required": False } }, owningProjectIdentifier = projectId, )

您可以使用以下示例脚本来创建自定义资产:

def create_custom_asset(domainId, projectId): return dzclient.create_asset( domainIdentifier = domainId, name = 'custom asset', description = "custom asset", owningProjectIdentifier = projectId, typeIdentifier = "userCustomAssetType", formsInput = [ { "formName": "UserCustomForm", "typeIdentifier": "customForm", "content": "{\"simple\":\"sample-catalogId\"}" } ] )

您可以使用以下示例脚本来创建词汇表:

def create_glossary(domainId, projectId): return dzclient.create_glossary( domainIdentifier = domainId, name = "test7", description = "this is a test glossary", owningProjectIdentifier = projectId )

您可以使用以下示例脚本来创建词汇表术语:

def create_glossary_term(domainId, glossaryId): return dzclient.create_glossary_term( domainIdentifier = domainId, name = "soccer", shortDescription = "this is a test glossary", glossaryIdentifier = glossaryId, )

您可以使用以下示例脚本使用系统定义的资产类型创建资产:

def create_asset(domainId, projectId): return dzclient.create_asset( domainIdentifier = domainId, name = 'sample asset name', description = "this is a glue table asset", owningProjectIdentifier = projectId, typeIdentifier = "amazon.datazone.GlueTableAssetType", formsInput = [ { "formName": "GlueTableForm", "content": "{\"catalogId\":\"sample-catalogId\",\"columns\":[{\"columnDescription\":\"sample-columnDescription\",\"columnName\":\"sample-columnName\",\"dataType\":\"sample-dataType\",\"lakeFormationTags\":{\"sample-key1\":\"sample-value1\",\"sample-key2\":\"sample-value2\"}}],\"compressionType\":\"sample-compressionType\",\"lakeFormationDetails\":{\"lakeFormationManagedTable\":false,\"lakeFormationTags\":{\"sample-key1\":\"sample-value1\",\"sample-key2\":\"sample-value2\"}},\"primaryKeys\":[\"sample-Key1\",\"sample-Key2\"],\"region\":\"us-east-1\",\"sortKeys\":[\"sample-sortKey1\"],\"sourceClassification\":\"sample-sourceClassification\",\"sourceLocation\":\"sample-sourceLocation\",\"tableArn\":\"sample-tableArn\",\"tableDescription\":\"sample-tableDescription\",\"tableName\":\"sample-tableName\"}" } ] )

您可以使用以下示例脚本来创建资源修订版并附加词汇表术语:

def create_asset_revision(domainId, assetId): return dzclient.create_asset_revision( domainIdentifier = domainId, identifier = assetId, name = 'glue table asset 7', description = "glue table asset description update", formsInput = [ { "formName": "GlueTableForm", "content": "{\"catalogId\":\"sample-catalogId\",\"columns\":[{\"columnDescription\":\"sample-columnDescription\",\"columnName\":\"sample-columnName\",\"dataType\":\"sample-dataType\",\"lakeFormationTags\":{\"sample-key1\":\"sample-value1\",\"sample-key2\":\"sample-value2\"}}],\"compressionType\":\"sample-compressionType\",\"lakeFormationDetails\":{\"lakeFormationManagedTable\":false,\"lakeFormationTags\":{\"sample-key1\":\"sample-value1\",\"sample-key2\":\"sample-value2\"}},\"primaryKeys\":[\"sample-Key1\",\"sample-Key2\"],\"region\":\"us-east-1\",\"sortKeys\":[\"sample-sortKey1\"],\"sourceClassification\":\"sample-sourceClassification\",\"sourceLocation\":\"sample-sourceLocation\",\"tableArn\":\"sample-tableArn\",\"tableDescription\":\"sample-tableDescription\",\"tableName\":\"sample-tableName\"}" } ], glossaryTerms = ["<glossaryTermId:>"] )

您可以使用以下示例脚本来发布资源:

def publish_asset(domainId, assetId): return dzclient.create_listing_change_set( domainIdentifier = domainId, entityIdentifier = assetId, entityType = "ASSET", action = "PUBLISH", )

搜索数据目录并订阅数据

您可以使用以下示例脚本来搜索数据目录和订阅数据:

def search_asset(domainId, projectId, text): return dzclient.search( domainIdentifier = domainId, owningProjectIdentifier = projectId, searchScope = "ASSET", searchText = text, )

您可以使用以下示例脚本来获取资产的清单 ID:

def search_listings(domainId, assetName, assetId): listings = dzclient.search_listings( domainIdentifier=domainId, searchText=assetName, additionalAttributes=["FORMS"] ) assetListing = None for listing in listings['items']: if listing['assetListing']['entityId'] == assetId: assetListing = listing return listing['assetListing']['listingId']

您可以使用以下示例脚本使用清单 ID 创建订阅请求:

create_subscription_response = def create_subscription_request(domainId, projectId, listingId): return dzclient.create_subscription_request( subscribedPrincipals=[{ "project": { "identifier": projectId } }], subscribedListings=[{ "identifier": listingId }], requestReason="Give request reason here." )

使用create_subscription_response上述方法,获取订阅 subscription_request_id,然后使用以下示例脚本接受/批准订阅:

subscription_request_id = create_subscription_response["id"] def accept_subscription_request(domainId, subscriptionRequestId): return dzclient.accept_subscription_request( domainIdentifier=domainId, identifier=subscriptionRequestId )

在数据目录中搜索资产

您可以使用以下利用自由文本搜索的示例脚本在 Amazon DataZone 目录中查找您发布的数据资产(清单)。

  • 以下示例在网域中执行自由文本关键字搜索,并返回与提供的关键字 “credit” 匹配的所有列表:

    aws datazone search-listings \ --domain-identifier dzd_c1s7uxe71prrtz \ --search-text "credit"
  • 您也可以组合多个关键字以进一步缩小搜索范围。例如,如果您要查找所有已发布的数据资产(清单),其中包含与墨西哥的销售相关的数据,则可以使用两个关键字 “墨西哥” 和 “销售” 来表述查询。

    aws datazone search-listings \ --domain-identifier dzd_c1s7uxe71prrtz \ --search-text "mexico sales"

您也可以使用筛选条件搜索房源。中的filters参数 SearchListings API允许您从域中检索经过筛选的结果。API支持多个默认筛选器,您也可以组合两个或多个过滤器并对它们执行 AND /OR 操作。过滤子句采用两个参数:属性值和值。默认支持的筛选器属性为typeNameowningProjectId、和glossaryTerms

  • 以下示例使用assetType过滤器搜索给定网域中的所有房源,其中列表是 Redshift 表的一种。

    aws datazone search-listings \ --domain-identifier dzd_c1s7uxe71prrtz \ --filters '{"or":[{"filter":{"attribute":"typeName","value":"RedshiftTableAssetType"}} ]}'
  • 您也可以使用 AND /OR 操作将多个筛选器组合在一起。在以下示例中,您可以合并typeNameproject筛选。

    aws datazone search-listings \ --domain-identifier dzd_c1s7uxe71prrtz \ --filters '{"or":[{"filter":{"attribute":"typeName","value":"RedshiftTableAssetType"}}, {"filter":{"attribute":"owningProjectId","value":"cwrrjch7f5kppj"}} ]}'
  • 您甚至可以将自由文本搜索与过滤器结合使用来查找确切的结果,并按列表的创建/上次更新时间对其进行进一步排序,如以下示例所示:

    aws datazone search-listings \ --domain-identifier dzd_c1s7uxe71prrtz \ --search-text "finance sales" \ --filters '{"or":[{"filter":{"attribute":"typeName","value":"GlueTableViewType"}} ]}' \ --sort '{"attribute": "UPDATED_AT", "order":"ASCENDING"}'

其他有用的示例脚本

在 Amazon 中处理数据时,您可以使用以下示例脚本来完成各种任务 DataZone。

使用以下示例脚本列出现有的 Amazon DataZone 域名:

def list_domains(): datazone = boto3.client('datazone') response = datazone.list_domains(status='AVAILABLE') [print("%12s | %16s | %12s | %52s" % (item['id'], item['name'], item['managedAccountId'], item['portalUrl'])) for item in response['items']] return

使用以下示例脚本列出现有的 Amazon DataZone 项目:

def list_projects(domain_id): datazone = boto3.client('datazone') response = datazone.list_projects(domainIdentifier=domain_id) [print("%12s | %16s " % (item['id'], item['name'])) for item in response['items']] return

使用以下示例脚本列出现有的 Amazon DataZone 元数据表单:

def list_metadata_forms(domain_id): datazone = boto3.client('datazone') response = datazone.search_types(domainIdentifier=domain_id, managed=False, searchScope='FORM_TYPE') [print("%16s | %16s | %3s | %8s" % (item['formTypeItem']['name'], item['formTypeItem']['owningProjectId'],item['formTypeItem']['revision'], item['formTypeItem']['status'])) for item in response['items']] return