開始使用與 Amazon Redshift 的 Amazon RDS 零 ETL 整合 - Amazon Relational Database Service

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

開始使用與 Amazon Redshift 的 Amazon RDS 零 ETL 整合

建立與 Amazon Redshift 的零 ETL 整合之前,請使用必要的參數和許可來設定 RDS 資料庫和 Amazon Redshift 資料倉儲。安裝期間,您將完成以下步驟:

完成這些步驟後,請繼續建立與 Amazon Redshift 的 Amazon RDS 零 ETL 整合

提示

您可以在建立整合時讓 RDS 為您完成這些設定步驟,而不是手動執行這些步驟。若要立即開始建立整合,請參閱 建立與 Amazon Redshift 的 Amazon RDS 零 ETL 整合

步驟 1:建立自訂資料庫參數群組。

Amazon RDS 與 Amazon Redshift 的零 ETL 整合需要控制二進位記錄 (binlog) 的資料庫參數的特定值。若要設定二進位記錄,您必須先建立自訂資料庫參數群組,然後將其與來源資料庫建立關聯。設定下列參數值。如需建立參數群組的指示,請參閱 RDSAmazon資料庫執行個體的資料庫參數群。建議您在相同的請求中設定所有參數值,以避免相依性問題。

  • binlog_format=ROW

  • binlog_row_image=full

此外,請確定binlog_row_value_options 參數設定為 PARTIAL_JSON。如果來源資料庫是多可用區域資料庫叢集,請確定 binlog_transaction_compression 參數設定為 ON

步驟 2:選取或建立來源資料庫

建立自訂資料庫參數群組之後,請選擇或建立 RDS for MySQL 資料庫。此資料庫將是 Amazon Redshift 複寫資料的來源。如需建立單一可用區域或多可用區域資料庫執行個體的指示,請參閱 建立 Amazon RDS 資料庫執行個體如需建立多可用區域資料庫叢集的說明,請參閱 為 Amazon 建立多可用區域資料庫叢集 RDS

資料庫必須執行支援的資料庫引擎版本。如需支援的版本的清單,請參閱Amazon 與 Amazon Redshift 的RDS零ETL整合支援的 區域和資料庫引擎

當您建立資料庫時,在其他組態下,將預設資料庫參數群組變更為您在上一個步驟中建立的自訂參數群組。

注意

如果您在建立資料庫之後將參數群組與資料庫建立關聯,您必須先重新啟動叢集中的資料庫主要資料庫執行個體,以套用變更,才能建立零 ETL 整合。如需相關指示,請參閱重新啟動中的資料庫執行個體重新啟動 Amazon 的多可用區域資料庫叢集和讀取器資料庫執行個體 RDS

此外,請確定資料庫已啟用自動備份。如需詳細資訊,請參閱啟用自動備份

步驟 3:建立目標 Amazon Redshift 資料倉儲

建立來源資料庫之後,您必須在 Amazon Redshift 中建立和設定目標資料倉儲。資料倉儲必須符合下列需求:

  • 使用具有至少兩個節點的 RA3 節點類型,或 Redshift Serverless。

  • 已加密 (如果使用已佈建的叢集)。如需詳細資訊,請參閱 Amazon Redshift 資料庫加密

如需建立資料倉儲的指示,請參閱建立叢集 (適用於佈建的叢集),或使用命名空間建立工作群組 (適用於 Redshift Serverless)。

在資料倉儲上啟用區分大小寫

若要成功整合,必須為資料倉儲啟用區分大小寫參數 (enable_case_sensitive_identifier)。依預設,所有佈建的叢集和 Redshift Serverless 工作群組上都會停用區分大小寫。

若要啟用區分大小寫,請根據您的資料倉儲類型執行下列步驟:

  • 佈建的叢集 – 若要在佈建的叢集上啟用區分大小寫,請建立已啟用 enable_case_sensitive_identifier 參數的自訂參數群組。接著,將該參數群組與叢集建立關聯。如需指示,請參閱使用主控台管理參數群組使用 AWS CLI設定參數值

    注意

    在將自訂參數群組與叢集建立關聯之後,請記得重新啟動該叢集。

  • 無伺服器工作群組 - 若要在 Redshift Serverless 工作群組上啟用區分大小寫,您必須使用 AWS CLI。Amazon Redshift 主控台目前不支援修改 Redshift Serverless 參數值。傳送下列更新工作群組請求:

    aws redshift-serverless update-workgroup \ --workgroup-name target-workgroup \ --config-parameters parameterKey=enable_case_sensitive_identifier,parameterValue=true

    在修改工作群組的參數值之後,您不需要重新啟動該工作群組。

設定資料倉儲的授權

建立資料倉儲之後,您必須將來源 RDS 資料庫設定為授權的整合來源。如需指示,請參閱設定 Amazon Redshift 資料倉儲的授權

使用 AWS SDKs設定整合

您可以執行下列 Python 指令碼來自動為您設定所需的資源,而不是手動設定每個資源。程式碼範例使用 來AWS SDK for Python (Boto3)建立來源 RDS for MySQL 資料庫執行個體,並以 Amazon Redshift 資料倉儲為目標,每個都具有必要的參數值。然後,在資料庫之間建立零 ETL 整合之前,它會等待資料庫可用。您可以根據您需要設定的資源來註解不同的函數。

若要安裝所需的相依性,請執行下列命令:

pip install boto3 pip install time

在指令碼中,選擇性地修改來源、目標和參數群組的名稱。最終函數會在設定 資源my-integration後建立名為 的整合。

import boto3 import time # Build the client using the default credential configuration. # You can use the CLI and run 'aws configure' to set access key, secret # key, and default Region. rds = boto3.client('rds') redshift = boto3.client('redshift') sts = boto3.client('sts') source_db_name = 'my-source-db' # A name for the source database source_param_group_name = 'my-source-param-group' # A name for the source parameter group target_cluster_name = 'my-target-cluster' # A name for the target cluster target_param_group_name = 'my-target-param-group' # A name for the target parameter group def create_source_db(*args): """Creates a source RDS for MySQL DB instance""" response = rds.create_db_parameter_group( DBParameterGroupName=source_param_group_name, DBParameterGroupFamily='mysql8.0', Description='RDS for MySQL zero-ETL integrations' ) print('Created source parameter group: ' + response['DBParameterGroup']['DBParameterGroupName']) response = rds.modify_db_parameter_group( DBParameterGroupName=source_param_group_name, Parameters=[ { 'ParameterName': 'binlog_format', 'ParameterValue': 'ROW', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_row_image', 'ParameterValue': 'full', 'ApplyMethod': 'pending-reboot' } ] ) print('Modified source parameter group: ' + response['DBParameterGroupName']) response = rds.create_db_instance( DBInstanceIdentifier=source_db_name, DBParameterGroupName=source_param_group_name, Engine='mysql', EngineVersion='8.0.32', DBName='mydb', DBInstanceClass='db.m5.large', AllocatedStorage=15, MasterUsername='username', MasterUserPassword='Password01**' ) print('Creating source database: ' + response['DBInstance']['DBInstanceIdentifier']) source_arn = (response['DBInstance']['DBInstanceArn']) create_target_cluster(target_cluster_name, source_arn, target_param_group_name) return(response) def create_target_cluster(target_cluster_name, source_arn, target_param_group_name): """Creates a target Redshift cluster""" response = redshift.create_cluster_parameter_group( ParameterGroupName=target_param_group_name, ParameterGroupFamily='redshift-1.0', Description='RDS for MySQL zero-ETL integrations' ) print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName']) response = redshift.modify_cluster_parameter_group( ParameterGroupName=target_param_group_name, Parameters=[ { 'ParameterName': 'enable_case_sensitive_identifier', 'ParameterValue': 'true' } ] ) print('Modified target parameter group: ' + response['ParameterGroupName']) response = redshift.create_cluster( ClusterIdentifier=target_cluster_name, NodeType='ra3.4xlarge', NumberOfNodes=2, Encrypted=True, MasterUsername='username', MasterUserPassword='Password01**', ClusterParameterGroupName=target_param_group_name ) print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier']) # Retrieve the target cluster ARN response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Retrieve the current user's account ID response = sts.get_caller_identity() account_id = response['Account'] # Create a resource policy granting access to source database and account ID response = redshift.put_resource_policy( ResourceArn=target_arn, Policy=''' { \"Version\":\"2012-10-17\", \"Statement\":[ {\"Effect\":\"Allow\", \"Principal\":{ \"Service\":\"redshift.amazonaws.com\" }, \"Action\":[\"redshift:AuthorizeInboundIntegration\"], \"Condition\":{ \"StringEquals\":{ \"aws:SourceArn\":\"%s\"} } }, {\"Effect\":\"Allow\", \"Principal\":{ \"AWS\":\"arn:aws:iam::%s:root\"}, \"Action\":\"redshift:CreateInboundIntegration\"} ] } ''' % (source_arn, account_id) ) return(response) def wait_for_db_availability(*args): """Waits for both databases to be available""" print('Waiting for source and target to be available...') response = rds.describe_db_instances( DBInstanceIdentifier=source_db_name ) source_status = response['DBInstances'][0]['DBInstanceStatus'] source_arn = response['DBInstances'][0]['DBInstanceArn'] response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_status = response['Clusters'][0]['ClusterStatus'] target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Every 60 seconds, check whether the databases are available if source_status != 'available' or target_status != 'available': time.sleep(60) response = wait_for_db_availability( source_db_name, target_cluster_name) else: print('Databases available. Ready to create zero-ETL integration.') create_integration(source_arn, target_arn) return def create_integration(source_arn, target_arn): """Creates a zero-ETL integration using the source and target databases""" response = rds.create_integration( SourceArn=source_arn, TargetArn=target_arn, IntegrationName='my-integration' ) print('Creating integration: ' + response['IntegrationName']) def main(): """main function""" create_source_db(source_db_name, source_param_group_name) wait_for_db_availability(source_db_name, target_cluster_name) if __name__ == "__main__": main()

後續步驟

透過來源 RDS 資料庫和 Amazon Redshift 目標資料倉儲,您現在可以建立零 ETL 整合並複寫資料。如需說明,請參閱「建立與 Amazon Redshift 的 Amazon RDS 零 ETL 整合」。