Amazon DataZone quickstart with sample scripts - Amazon DataZone

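Amazon DataZone quickstart with sample scripts

You can access Amazon DataZone through the management console or the Amazon DataZone data portal, or programmatically by using the Amazon DataZone HTTPS API, which lets you issue HTTPS requests directly to the service. This section contains sample scripts that invoke the Amazon DataZone APIs and that you can use to complete the following common tasks: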

Create an Amazon DataZone domain and data portal

You can use the following sample script to create an Amazon DataZone domain. For more information about Amazon DataZone domains, see Amazon DataZone terminology and concepts.

import boto3

# Initialize the DataZone client
region = 'us-east-1'
dzclient = boto3.client(service_name='datazone', region_name=region)

# Create a DataZone domain
def create_domain(name):
    return dzclient.create_domain(
        name=name,
        description="this is a description",
        # Replace <account> with your AWS account ID
        domainExecutionRole="arn:aws:iam::<account>:role/AmazonDataZoneDomainExecutionRole",
    )
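
A domain may still be provisioning when create_domain returns. The following is a minimal sketch of polling get_domain until the domain becomes AVAILABLE. The function name wait_for_domain, its timeout parameters, and the choice to treat every status other than CREATING and AVAILABLE as a failure are assumptions made for illustration; they are not part of the original script.

```python
import time


def wait_for_domain(client, domain_id, timeout_s=600, poll_s=10, sleep=time.sleep):
    """Poll get_domain until the domain is AVAILABLE, fails, or times out.

    client is a boto3 DataZone client (for example, dzclient from the
    script above). wait_for_domain itself is a hypothetical helper.
    """
    waited = 0
    while True:
        status = client.get_domain(identifier=domain_id)["status"]
        if status == "AVAILABLE":
            return status
        if status != "CREATING":
            # Assumption: anything other than CREATING/AVAILABLE is a failure here.
            raise RuntimeError(f"Domain {domain_id} entered status {status}")
        if waited >= timeout_s:
            raise TimeoutError(f"Domain {domain_id} still CREATING after {timeout_s}s")
        sleep(poll_s)
        waited += poll_s
```

A typical call would pass the ID returned by create_domain, for example wait_for_domain(dzclient, create_domain("my-domain")["id"]).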

Create a publishing project

You can use the following sample script to create a publishing project in Amazon DataZone.

# Create a project
def create_project(domainId):
    return dzclient.create_project(
        domainIdentifier=domainId,
        name="sample-project"
    )

Create an environment profile

You can use the following sample script to create an environment profile in Amazon DataZone.

The following sample payload is used when the CreateEnvironmentProfile API is invoked:

{
    "Content": {
        "project_name": "Admin_project",
        "domain_name": "Drug-Research-and-Development",
        "blueprint_account_region": [
            {
                "blueprint_name": "DefaultDataLake",
                "account_id": ["066535990535", "413878397724", "676266385322", "747721550195", "755347404384"],
                "region": ["us-west-2", "us-east-1"]
            },
            {
                "blueprint_name": "DefaultDataWarehouse",
                "account_id": ["066535990535", "413878397724", "676266385322", "747721550195", "755347404384"],
                "region": ["us-west-2", "us-east-1"]
            }
        ]
    }
}

This sample script invokes the CreateEnvironmentProfile API:

# dzclient is the DataZone client created in the domain script.
# blueprint_account_region is the list of the same name from the sample payload.
def create_environment_profile(domain_id, project_id, blueprint_account_region):
    try:
        # Map the names of the managed environment blueprints to their IDs
        response = dzclient.list_environment_blueprints(
            domainIdentifier=domain_id,
            managed=True
        )
        env_blueprints = response.get("items")
        env_blueprints_map = {}
        for i in env_blueprints:
            env_blueprints_map[i["name"]] = i['id']
        print("Environment Blueprint map", env_blueprints_map)

        # Create one profile per blueprint, account, and Region combination
        for i in blueprint_account_region:
            print(i)
            for j in i["account_id"]:
                for k in i["region"]:
                    print("The env blueprint name is", i['blueprint_name'])
                    dzclient.create_environment_profile(
                        description='This is a test environment profile created via lambda function',
                        domainIdentifier=domain_id,
                        awsAccountId=j,
                        awsAccountRegion=k,
                        environmentBlueprintIdentifier=env_blueprints_map.get(i["blueprint_name"]),
                        name=i["blueprint_name"] + j + k + "_profile",
                        projectIdentifier=project_id
                    )
    except Exception as e:
        print("Failed to create Environment Profile")
        raise e
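
Before calling the API, it can help to see which profiles the nested loops will create. The sketch below expands a blueprint_account_region list into one entry per blueprint, account, and Region combination, using the same naming scheme as the script (blueprint name + account ID + Region + "_profile"). The function name expand_profile_specs and the placeholder account IDs are assumptions for illustration only.

```python
def expand_profile_specs(blueprint_account_region):
    """Return one spec per (blueprint, account, Region) combination."""
    specs = []
    for entry in blueprint_account_region:
        for account_id in entry["account_id"]:
            for region in entry["region"]:
                specs.append({
                    "blueprint_name": entry["blueprint_name"],
                    "account_id": account_id,
                    "region": region,
                    # Same naming scheme as the sample script
                    "profile_name": entry["blueprint_name"] + account_id + region + "_profile",
                })
    return specs


# Placeholder account IDs, not real accounts
payload = [
    {
        "blueprint_name": "DefaultDataLake",
        "account_id": ["111111111111", "222222222222"],
        "region": ["us-west-2", "us-east-1"],
    },
]

for spec in expand_profile_specs(payload):
    print(spec["profile_name"])
```

With two accounts and two Regions for a single blueprint, the loop produces four profile names, which is the number of CreateEnvironmentProfile calls the script would make for that payload entry.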

This is the sample output payload when the CreateEnvironmentProfile API is invoked:

{
    "Content": {
        "project_name": "Admin_project",
        "domain_name": "Drug-Research-and-Development",
        "blueprint_account_region": [
            {
                "blueprint_name": "DefaultDataWarehouse",
                "account_id": ["111111111111"],
                "region": ["us-west-2"],
                "user_parameters": [
                    {
                        "name": "dataAccessSecretsArn",
                        "value": ""
                    }
                ]
            }
        ]
    }
}

Create an environment

You can use the following sample script to create an environment in Amazon DataZone.

def create_environment(domain_id, project_id, blueprint_account_region):
    try:
        # Refer to get_domain_id and get_project_id for fetching IDs using names.
        sts_client = boto3.client("sts")
        # Get the current account ID
        account_id = sts_client.get_caller_identity()["Account"]
        print("Fetching environment profile ids")
        # get_env_profile_map is a helper that is not shown in this section; it is
        # expected to map environment profile names to their IDs.
        env_profile_map = get_env_profile_map(domain_id, project_id)

        for i in blueprint_account_region:
            for j in i["account_id"]:
                for k in i["region"]:
                    print(" env blueprint name", i['blueprint_name'])
                    profile_name = i["blueprint_name"] + j + k + "_profile"
                    env_name = i["blueprint_name"] + j + k + "_env"
                    description = f'This environment is created for {profile_name}, Account {account_id} and region {k}'
                    try:
                        dzclient.create_environment(
                            description=description,
                            domainIdentifier=domain_id,
                            environmentProfileIdentifier=env_profile_map.get(profile_name),
                            name=env_name,
                            projectIdentifier=project_id
                        )
                        print(f"Environment created - {env_name}")
                    except Exception:
                        # Retry with the user parameters from the payload
                        dzclient.create_environment(
                            description=description,
                            domainIdentifier=domain_id,
                            environmentProfileIdentifier=env_profile_map.get(profile_name),
                            name=env_name,
                            projectIdentifier=project_id,
                            userParameters=i["user_parameters"]
                        )
                        print(f"Environment created - {env_name}")
    except Exception as e:
        print("Failed to create Environment")
        raise e
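
The environment script calls get_env_profile_map, which this section does not define. One possible shape for that helper, assuming it only needs to map profile names to IDs, is sketched below using the list_environment_profiles operation and its nextToken pagination. This is a guess at the helper's behavior rather than the original implementation, and unlike the call in the script it takes the DataZone client as an explicit first argument, so the call site would need to be adapted.

```python
def get_env_profile_map(client, domain_id, project_id):
    """Map environment profile names to IDs for one project.

    Hypothetical reconstruction of the helper referenced by the
    environment script; client is a boto3 DataZone client.
    """
    profile_map = {}
    kwargs = {"domainIdentifier": domain_id, "projectIdentifier": project_id}
    while True:
        page = client.list_environment_profiles(**kwargs)
        for item in page.get("items", []):
            profile_map[item["name"]] = item["id"]
        token = page.get("nextToken")
        if not token:
            return profile_map
        # Request the next page of results
        kwargs["nextToken"] = token
```

If two profiles in the project share a name, the later one wins in this sketch; the naming scheme used by the profile script makes such collisions unlikely.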

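Collect metadata from AWS Glue

You can use this sample script to collect metadata from AWS Glue. The script runs on a standard schedule. You can retrieve the parameters from the sample script and make them global. Use standard functions to fetch the project, environment, and domain IDs. The AWS Glue data source is created and run at a standard time, which you can update in the cron section of the script.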

def create_data_source(domain_id, project_id, data_source_name):
    # data_source_description, environment_id, glue_database_name, account_id, and
    # current_region are expected to be defined as globals, as described above.
    print("Creating Data Source")
    data_source_creation = dzclient.create_data_source(
        # Define the data source: customize the data source to which you'd like to connect.
        # Name of the data source to create, example: name='TestGlueDataSource'
        name=data_source_name,
        # Description for the data source (optional),
        # example: description='This is a test for creating DataZone data sources'
        description=data_source_description,
        # Identifier of the domain to which the data source will belong,
        # example: domainIdentifier='dzd_6f3gst5jjmrrmv'
        domainIdentifier=domain_id,
        # Environment identifier, example: environmentIdentifier='3weyt6hhn8qcvb'
        environmentIdentifier=environment_id,
        # Corresponding project identifier, example: projectIdentifier='6tl4csoyrg16ef'
        projectIdentifier=project_id,
        enableSetting="ENABLED",
        # publishOnImport selects whether assets are added to the inventory and/or the discovery catalog.
        # publishOnImport = True : Assets are added to the project's inventory and published to the discovery catalog.
        # publishOnImport = False : Assets are only added to the project's inventory.
        # You can later curate the metadata of the assets and choose subscription terms
        # to publish them from the inventory to the discovery catalog.
        publishOnImport=False,
        # Automated business name generation: use AI to automatically generate metadata
        # for assets as they are published or updated by this data source run.
        # Automatically generated metadata can be approved, rejected, or edited by data publishers.
        # Automatically generated metadata is badged with a small icon next to the corresponding metadata field.
        recommendation={"enableBusinessNameGeneration": True},
        type="GLUE",
        configuration={
            "glueRunConfiguration": {
                "dataAccessRole": f"arn:aws:iam::{account_id}:role/service-role/AmazonDataZoneGlueAccess-{current_region}-{domain_id}",
                "relationalFilterConfigurations": [
                    {
                        # The API requires databaseName (commented out in the original sample)
                        "databaseName": glue_database_name,
                        "filterExpressions": [
                            {"expression": "*", "type": "INCLUDE"},
                        ],
                        # "schemaName": "TestSchemaName",
                    },
                ],
            },
        },
        # Add metadata forms to the data source (OPTIONAL).
        # Metadata forms are automatically applied to any assets that are created by the data source.
        # assetFormsInput=[
        #     {
        #         "content": "string",
        #         "formName": "string",
        #         "typeIdentifier": "string",
        #         "typeRevision": "string",
        #     },
        # ],
        schedule={
            "schedule": "cron(5 20 * * ? *)",
            "timezone": "UTC",
        },
    )
    # This is a suggested syntax to return values
    # return_values["data_source_creation"] = data_source_creation["items"]
    print("Data Source Created")

This is the sample response payload after the CreateDataSource API is invoked:

{
    "Content": {
        "project_name": "Admin",
        "domain_name": "Drug-Research-and-Development",
        "env_name": "GlueEnvironment",
        "glue_database_name": "test",
        "data_source_name": "test",
        "data_source_description": "This is a test data source"
    }
}
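
The text above mentions standard functions for fetching the domain, project, and environment IDs, but the section does not show them. The sketch below is one way such lookups might be written with list_domains, list_projects, and list_environments, resolving the names that appear in the sample payload (domain_name, project_name, env_name). The function names and the exact-match behavior are assumptions, and the project and environment lookups only inspect the first page of results.

```python
def _first_id_by_name(items, name):
    """Return the ID of the first item whose name matches exactly, else None."""
    for item in items:
        if item["name"] == name:
            return item["id"]
    return None


def get_domain_id(client, domain_name):
    """Scan all pages of list_domains for a domain with the given name."""
    kwargs = {}
    while True:
        page = client.list_domains(**kwargs)
        found = _first_id_by_name(page.get("items", []), domain_name)
        if found is not None or not page.get("nextToken"):
            return found
        kwargs["nextToken"] = page["nextToken"]


def get_project_id(client, domain_id, project_name):
    """Look up a project ID by name (first page of results only)."""
    page = client.list_projects(domainIdentifier=domain_id, name=project_name)
    return _first_id_by_name(page.get("items", []), project_name)


def get_environment_id(client, domain_id, project_id, env_name):
    """Look up an environment ID by name (first page of results only)."""
    page = client.list_environments(
        domainIdentifier=domain_id,
        projectIdentifier=project_id,
        name=env_name,
    )
    return _first_id_by_name(page.get("items", []), env_name)
```

Each function returns None when nothing matches, so callers can decide whether a missing domain, project, or environment is an error. The names are re-checked on the client side because this sketch does not assume that the service-side name filter is an exact match.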

Curate and publish data assets

You can use the following sample scripts to curate and publish data assets in Amazon DataZone.

You can use the following script to create a custom form type:

def create_form_type(domainId, projectId):
    return dzclient.create_form_type(
        domainIdentifier=domainId,
        name="customForm",
        model={
            "smithy": "structure customForm { simple: String }"
        },
        owningProjectIdentifier=projectId,
        status="ENABLED"
    )

You can use the following sample script to create a custom asset type:

def create_custom_asset_type(domainId, projectId):
    return dzclient.create_asset_type(
        domainIdentifier=domainId,
        name="userCustomAssetType",
        formsInput={
            "Model": {
                "typeIdentifier": "customForm",
                "typeRevision": "1",
                "required": False
            }
        },
        owningProjectIdentifier=projectId,
    )

You can use the following sample script to create a custom asset:

def create_custom_asset(domainId, projectId):
    return dzclient.create_asset(
        domainIdentifier=domainId,
        name='custom asset',
        description="custom asset",
        owningProjectIdentifier=projectId,
        typeIdentifier="userCustomAssetType",
        formsInput=[
            {
                "formName": "UserCustomForm",
                "typeIdentifier": "customForm",
                "content": "{\"simple\":\"sample-catalogId\"}"
            }
        ]
    )

You can use the following sample script to create a glossary:

def create_glossary(domainId, projectId):
    return dzclient.create_glossary(
        domainIdentifier=domainId,
        name="test7",
        description="this is a test glossary",
        owningProjectIdentifier=projectId
    )

You can use the following sample script to create a glossary term:

def create_glossary_term(domainId, glossaryId):
    return dzclient.create_glossary_term(
        domainIdentifier=domainId,
        name="soccer",
        shortDescription="this is a test glossary",
        glossaryIdentifier=glossaryId,
    )

You can use the following sample script to create an asset using a system-defined asset type:

def create_asset(domainId, projectId):
    return dzclient.create_asset(
        domainIdentifier=domainId,
        name='sample asset name',
        description="this is a glue table asset",
        owningProjectIdentifier=projectId,
        typeIdentifier="amazon.datazone.GlueTableAssetType",
        formsInput=[
            {
                "formName": "GlueTableForm",
                "content": "{\"catalogId\":\"sample-catalogId\",\"columns\":[{\"columnDescription\":\"sample-columnDescription\",\"columnName\":\"sample-columnName\",\"dataType\":\"sample-dataType\",\"lakeFormationTags\":{\"sample-key1\":\"sample-value1\",\"sample-key2\":\"sample-value2\"}}],\"compressionType\":\"sample-compressionType\",\"lakeFormationDetails\":{\"lakeFormationManagedTable\":false,\"lakeFormationTags\":{\"sample-key1\":\"sample-value1\",\"sample-key2\":\"sample-value2\"}},\"primaryKeys\":[\"sample-Key1\",\"sample-Key2\"],\"region\":\"us-east-1\",\"sortKeys\":[\"sample-sortKey1\"],\"sourceClassification\":\"sample-sourceClassification\",\"sourceLocation\":\"sample-sourceLocation\",\"tableArn\":\"sample-tableArn\",\"tableDescription\":\"sample-tableDescription\",\"tableName\":\"sample-tableName\"}"
            }
        ]
    )
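
The content field of a form is a JSON document passed as a string, which is why the sample escapes every quotation mark by hand. As a sketch of an alternative, the form fields can be kept in a Python dictionary and serialized with json.dumps. The helper name build_form_input is an assumption for illustration, and the field values are a shortened subset of the placeholders from the sample.

```python
import json


def build_form_input(form_name, fields, type_identifier=None):
    """Build one formsInput entry, serializing the form fields to a JSON string."""
    entry = {"formName": form_name, "content": json.dumps(fields)}
    if type_identifier is not None:
        entry["typeIdentifier"] = type_identifier
    return entry


# Shortened subset of the placeholder fields from the sample
glue_table_form = build_form_input(
    "GlueTableForm",
    {
        "catalogId": "sample-catalogId",
        "tableName": "sample-tableName",
        "region": "us-east-1",
        "columns": [
            {"columnName": "sample-columnName", "dataType": "sample-dataType"},
        ],
    },
)

print(glue_table_form["content"])
```

The resulting dictionary can be placed in the formsInput list of create_asset or create_asset_revision in place of the hand-escaped string.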

You can use the following sample script to create an asset revision and attach a glossary term:

def create_asset_revision(domainId, assetId):
    return dzclient.create_asset_revision(
        domainIdentifier=domainId,
        identifier=assetId,
        name='glue table asset 7',
        description="glue table asset description update",
        formsInput=[
            {
                "formName": "GlueTableForm",
                "content": "{\"catalogId\":\"sample-catalogId\",\"columns\":[{\"columnDescription\":\"sample-columnDescription\",\"columnName\":\"sample-columnName\",\"dataType\":\"sample-dataType\",\"lakeFormationTags\":{\"sample-key1\":\"sample-value1\",\"sample-key2\":\"sample-value2\"}}],\"compressionType\":\"sample-compressionType\",\"lakeFormationDetails\":{\"lakeFormationManagedTable\":false,\"lakeFormationTags\":{\"sample-key1\":\"sample-value1\",\"sample-key2\":\"sample-value2\"}},\"primaryKeys\":[\"sample-Key1\",\"sample-Key2\"],\"region\":\"us-east-1\",\"sortKeys\":[\"sample-sortKey1\"],\"sourceClassification\":\"sample-sourceClassification\",\"sourceLocation\":\"sample-sourceLocation\",\"tableArn\":\"sample-tableArn\",\"tableDescription\":\"sample-tableDescription\",\"tableName\":\"sample-tableName\"}"
            }
        ],
        # Replace the placeholder with the ID of the glossary term to attach
        glossaryTerms=["<glossaryTermId>"]
    )

You can use the following sample script to publish an asset:

def publish_asset(domainId, assetId):
    return dzclient.create_listing_change_set(
        domainIdentifier=domainId,
        entityIdentifier=assetId,
        entityType="ASSET",
        action="PUBLISH",
    )

Search the data catalog and subscribe to data

You can use the following sample scripts to search the data catalog and subscribe to data:

def search_asset(domainId, projectId, text):
    return dzclient.search(
        domainIdentifier=domainId,
        owningProjectIdentifier=projectId,
        searchScope="ASSET",
        searchText=text,
    )

You can use the following sample script to get the listing ID for an asset:

def search_listings(domainId, assetName, assetId):
    listings = dzclient.search_listings(
        domainIdentifier=domainId,
        searchText=assetName,
        additionalAttributes=["FORMS"]
    )
    # Return the listing ID of the listing that belongs to the given asset
    for listing in listings['items']:
        if listing['assetListing']['entityId'] == assetId:
            return listing['assetListing']['listingId']
    # No listing matched the asset ID
    return None

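You can use the following sample script to create a subscription request using the listing ID:

def create_subscription_request(domainId, projectId, listingId):
    return dzclient.create_subscription_request(
        domainIdentifier=domainId,
        subscribedPrincipals=[{
            "project": {
                "identifier": projectId
            }
        }],
        subscribedListings=[{
            "identifier": listingId
        }],
        requestReason="Give request reason here."
    )

# Call the function with your own IDs and keep the response
create_subscription_response = create_subscription_request(domainId, projectId, listingId)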

Use the create_subscription_response above to get the subscription_request_id, and then accept or approve the subscription with the following sample script:

subscription_request_id = create_subscription_response["id"]

def accept_subscription_request(domainId, subscriptionRequestId):
    return dzclient.accept_subscription_request(
        domainIdentifier=domainId,
        identifier=subscriptionRequestId
    )
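
The two subscription steps above can be chained. The following sketch creates a subscription request and accepts it only if the returned status is still PENDING. The function name request_and_accept is an assumption, and the sketch presumes that the caller's credentials are allowed both to request and to approve the subscription, which will not hold in every setup.

```python
def request_and_accept(client, domain_id, project_id, listing_id, reason):
    """Create a subscription request and accept it if it is still pending.

    client is a boto3 DataZone client; request_and_accept is a
    hypothetical helper, not part of the original scripts.
    """
    request = client.create_subscription_request(
        domainIdentifier=domain_id,
        subscribedPrincipals=[{"project": {"identifier": project_id}}],
        subscribedListings=[{"identifier": listing_id}],
        requestReason=reason,
    )
    if request["status"] != "PENDING":
        # Already accepted or rejected; nothing left to approve
        return request
    return client.accept_subscription_request(
        domainIdentifier=domain_id,
        identifier=request["id"],
    )
```

Returning the original response for non-pending requests lets the caller inspect the status without triggering a second API call.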

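Search for assets in the data catalog

You can use the following sample scripts to query published data assets (listings) in the Amazon DataZone catalog with free-text search.

  • The following example runs a free-text keyword search in a domain and returns all listings that match the provided keyword 'credit':

    aws datazone search-listings \
        --domain-identifier dzd_c1s7uxe71prrtz \
        --search-text "credit"
  • You can also combine multiple keywords to narrow the search further. For example, if you are looking for all published data assets (listings) that have data related to sales in Mexico, you can formulate the query with the two keywords 'Mexico' and 'sales'.

    aws datazone search-listings \
        --domain-identifier dzd_c1s7uxe71prrtz \
        --search-text "mexico sales"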

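You can also search for listings using filters. The filters parameter in the SearchListings API lets you retrieve filtered results from the domain. The API supports multiple default filters, and you can also combine two or more filters and perform AND/OR operations on them. A filter clause takes two parameters: attribute and value. The default supported filter attributes are typeName, owningProjectId, and glossaryTerms.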

  • The following example searches all listings in the specified domain, filtering on the asset type (the typeName attribute) for listings of the Redshift table type.

    aws datazone search-listings \
        --domain-identifier dzd_c1s7uxe71prrtz \
        --filters '{"or":[{"filter":{"attribute":"typeName","value":"RedshiftTableAssetType"}} ]}'
  • You can also combine multiple filters using AND/OR operations. The following example combines the typeName and owningProjectId filters.

    aws datazone search-listings \
        --domain-identifier dzd_c1s7uxe71prrtz \
        --filters '{"or":[{"filter":{"attribute":"typeName","value":"RedshiftTableAssetType"}}, {"filter":{"attribute":"owningProjectId","value":"cwrrjch7f5kppj"}} ]}'
  • You can even combine free-text search with filters to find exact results, and further sort them by the creation or last-updated time of the listings, as shown in the following example:

    aws datazone search-listings \
        --domain-identifier dzd_c1s7uxe71prrtz \
        --search-text "finance sales" \
        --filters '{"or":[{"filter":{"attribute":"typeName","value":"GlueTableViewType"}} ]}' \
        --sort '{"attribute": "UPDATED_AT", "order":"ASCENDING"}'
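
The filter clauses used in the CLI examples above are plain nested structures, so they can also be assembled in Python and passed to the filters parameter of search_listings in boto3. The helper names attribute_filter, any_of, and all_of below are assumptions introduced for illustration; only the resulting dictionary shape ("and", "or", and "filter" with attribute and value) comes from the examples and the API.

```python
import json


def attribute_filter(attribute, value):
    """One filter clause: match listings whose attribute equals value."""
    return {"filter": {"attribute": attribute, "value": value}}


def any_of(*clauses):
    """OR: a listing matches if at least one clause matches."""
    return {"or": list(clauses)}


def all_of(*clauses):
    """AND: a listing matches only if every clause matches."""
    return {"and": list(clauses)}


# Redshift table listings that are owned by one specific project
filters = all_of(
    attribute_filter("typeName", "RedshiftTableAssetType"),
    attribute_filter("owningProjectId", "cwrrjch7f5kppj"),
)

# The same structure, serialized as it would be passed to --filters in the CLI
print(json.dumps(filters))

# With a boto3 DataZone client (not created here), the dictionary is passed directly:
# dzclient.search_listings(domainIdentifier="dzd_c1s7uxe71prrtz", filters=filters)
```

Using all_of instead of any_of here narrows the result to listings that satisfy both conditions, whereas the "or" clause in the CLI example returns listings that satisfy either one.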

Other useful sample scripts

You can use the following sample scripts to complete various tasks as you work with your data in Amazon DataZone.

Use the following sample script to list existing Amazon DataZone domains:

def list_domains():
    datazone = boto3.client('datazone')
    response = datazone.list_domains(status='AVAILABLE')
    for item in response['items']:
        print("%12s | %16s | %12s | %52s" % (item['id'], item['name'], item['managedAccountId'], item['portalUrl']))

Use the following sample script to list existing Amazon DataZone projects:

def list_projects(domain_id):
    datazone = boto3.client('datazone')
    response = datazone.list_projects(domainIdentifier=domain_id)
    for item in response['items']:
        print("%12s | %16s " % (item['id'], item['name']))

Use the following sample script to list existing Amazon DataZone metadata forms:

def list_metadata_forms(domain_id):
    datazone = boto3.client('datazone')
    response = datazone.search_types(domainIdentifier=domain_id, managed=False, searchScope='FORM_TYPE')
    for item in response['items']:
        form = item['formTypeItem']
        print("%16s | %16s | %3s | %8s" % (form['name'], form['owningProjectId'], form['revision'], form['status']))
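
The listing scripts above print only the first page of results. List and search operations in Amazon DataZone return a nextToken when more results are available, so a small generator can follow it. The sketch below assumes the response carries its results under "items" and its continuation token under "nextToken", as in the operations used in this section; the name iter_items is hypothetical.

```python
def iter_items(list_call, **kwargs):
    """Yield every item from a paginated list or search call.

    list_call is a bound client method such as datazone.list_projects;
    kwargs are the request parameters for the first page.
    """
    while True:
        page = list_call(**kwargs)
        yield from page.get("items", [])
        token = page.get("nextToken")
        if not token:
            return
        # Ask for the next page on the following iteration
        kwargs["nextToken"] = token


# Example with a boto3 DataZone client (not created here):
# for project in iter_items(datazone.list_projects, domainIdentifier=domain_id):
#     print(project["id"], project["name"])
```

Because the function is a generator, pages are fetched lazily, so a caller that stops early does not request the remaining pages.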