开始使用 Aurora 与 Amazon Redshift 的零 ETL 集成
在创建与 Amazon Redshift 的零 ETL 集成之前,请使用所需的参数和权限配置您的 Aurora 数据库集群和 Amazon Redshift 数据仓库。在安装过程中,您将完成以下步骤:
完成这些步骤后,请继续执行创建 Aurora 与 Amazon Redshift 的零 ETL 集成。
您可以使用 AWS SDK 为您自动完成设置过程。有关更多信息,请参阅 使用 AWS SDK 设置集成(仅限 Aurora MySQL)。
提示
您可以在创建集成时让 RDS 为您完成这些设置步骤,而不必手动执行这些步骤。要立即开始创建集成,请参阅创建 Aurora 与 Amazon Redshift 的零 ETL 集成。
步骤 1:创建自定义数据库集群参数组
Aurora 与 Amazon Redshift 的零 ETL 集成需要为控制复制的数据库集群参数提供特定值。具体而言,Aurora MySQL 需要增强型二进制日志(aurora_enhanced_binlog
),而 Aurora PostgreSQL 需要增强型逻辑复制(aurora.enhanced_logical_replication
)。
要配置二进制日志记录或逻辑复制,必须先创建自定义数据库集群参数组,然后将其与源数据库集群关联。
根据您的源数据库引擎,使用以下设置创建自定义数据库集群参数组。有关创建参数组的说明,请参阅Amazon Aurora 数据库集群的数据库集群参数组。
Aurora MySQL(aurora-mysql8.0 系列):
-
aurora_enhanced_binlog=1
-
binlog_backup=0
-
binlog_format=ROW
-
binlog_replication_globaldb=0
-
binlog_row_image=full
-
binlog_row_metadata=full
此外,请确保 binlog_transaction_compression
参数未设置为 ON
,也未将 binlog_row_value_options
参数设置为 PARTIAL_JSON
。
有关 Aurora MySQL 增强型二进制日志的更多信息,请参阅为 Aurora MySQL 设置增强型二进制日志。
Aurora PostgreSQL(aurora-postgresql15 系列):
注意
对于 Aurora PostgreSQL 数据库集群,您必须在美国东部(俄亥俄州)(us-east-2)AWS 区域的 Amazon RDS 数据库预览环境
-
rds.logical_replication=1
-
aurora.enhanced_logical_replication=1
-
aurora.logical_replication_backup=0
-
aurora.logical_replication_globaldb=0
启用增强型逻辑复制(aurora.enhanced_logical_replication
)会自动将 REPLICA IDENTITY
参数设置为 FULL
,这意味着所有列值都写入提前写入日志(WAL)。这将增加源数据库集群的 IOPS。
步骤 2:选择或创建源数据库集群
创建自定义数据库集群参数组后,选择或创建一个 Aurora MySQL 或 Aurora PostgreSQL 数据库集群。该集群将成为向 Amazon Redshift 复制数据的源。有关创建数据库集群的说明,请参阅。
数据库必须运行受支持的数据库引擎版本。有关受支持的版本的列表,请参阅 支持与 Amazon Redshift 进行零 ETL 集成的区域和 Amazon 数据库引擎。
有关创建数据库集群的说明,请参阅。
注意
您必须在美国东部(俄亥俄州)(us-east-2)AWS 区域的 Amazon RDS 数据库预览环境
创建数据库时,在其他配置下,将默认的数据库集群参数组更改为您在上一步中创建的自定义参数组。
注意
对于 Aurora MySQL,您在已创建集群之后 将参数组与数据库集群关联,则必须重启集群中的主数据库实例以应用更改,然后才能创建零 ETL 集成。有关说明,请参阅重启 Amazon Aurora 数据库集群或 Amazon Aurora 数据库实例。
在 Aurora PostgreSQL 与 Amazon Redshift 的零 ETL 集成预览版期间,您必须在创建集群时将集群与自定义数据库集群参数组相关联。在源数据库集群已经创建后,您不能执行此操作,否则您需要删除并重新创建集群。
步骤 3:创建目标 Amazon Redshift 数据仓库
创建源数据库集群后,必须在 Amazon Redshift 中创建和配置目标数据仓库。数据仓库必须满足以下要求:
-
在预览版中创建(仅适用于 Aurora PostgreSQL 源)。对于 Aurora MySQL 源,您必须创建生产集群和工作组。
-
使用 RA3 节点类型或 Redshift Serverless。
-
已加密(如果使用预置集群)。有关更多信息,请参阅 Amazon Redshift 数据库加密。
有关创建数据仓库的说明,请参阅预调配集群的创建集群或 Redshift Serverless 的创建带命名空间的工作组。
在数据仓库上启用区分大小写
要使集成成功,必须为数据仓库启用区分大小写参数(enable_case_sensitive_identifier
)。默认情况下,所有预调配集群和 Redshift Serverless 工作组均禁用区分大小写。
要启用区分大小写,请根据您的数据仓库类型执行以下步骤:
-
预调配集群 – 要在预调配集群上启用区分大小写,请创建一个启用
enable_case_sensitive_identifier
参数的自定义参数组。然后,将该参数组与集群关联。有关说明,请参阅使用控制台管理参数组或使用 AWS CLI 配置参数值。注意
将自定义参数组与集群关联后,请记得重启集群。
-
无服务器工作组 - 要在 Redshift Serverless 工作组上启用区分大小写,必须使用 AWS CLI。Amazon Redshift 控制台目前不支持修改 Redshift Serverless 参数值。发送以下 update-workgroup 请求:
aws redshift-serverless update-workgroup \ --workgroup-name
target-workgroup
\ --config-parameters parameterKey=enable_case_sensitive_identifier,parameterValue=true修改参数值后,无需重启工作组。
为数据仓库配置授权
创建数据仓库后,必须将源 Aurora 数据库集群配置为授权的集成源。有关说明,请参阅为您的 Amazon Redshift 数据仓库配置授权。
使用 AWS SDK 设置集成(仅限 Aurora MySQL)
您可以运行以下 Python 脚本来自动设置所需的资源,而不必手动设置每个资源。此代码示例使用AWS SDK for Python (Boto3)
要安装所需依赖项,请运行以下命令:
pip install boto3 pip install time
在脚本中,可以选择修改源组、目标组和参数组的名称。最后一个函数在设置资源后创建一个名为 my-integration
的集成。
import boto3 import time # Build the client using the default credential configuration. # You can use the CLI and run 'aws configure' to set access key, secret # key, and default Region. rds = boto3.client('rds') redshift = boto3.client('redshift') sts = boto3.client('sts') source_cluster_name = 'my-source-cluster' # A name for the source cluster source_param_group_name = 'my-source-param-group' # A name for the source parameter group target_cluster_name = 'my-target-cluster' # A name for the target cluster target_param_group_name = 'my-target-param-group' # A name for the target parameter group def create_source_cluster(*args): """Creates a source Aurora MySQL DB cluster""" response = rds.create_db_cluster_parameter_group( DBClusterParameterGroupName=source_param_group_name, DBParameterGroupFamily='aurora-mysql8.0', Description='For Aurora MySQL zero-ETL integrations' ) print('Created source parameter group: ' + response['DBClusterParameterGroup']['DBClusterParameterGroupName']) response = rds.modify_db_cluster_parameter_group( DBClusterParameterGroupName=source_param_group_name, Parameters=[ { 'ParameterName': 'aurora_enhanced_binlog', 'ParameterValue': '1', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_backup', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_format', 'ParameterValue': 'ROW', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_replication_globaldb', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_row_image', 'ParameterValue': 'full', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_row_metadata', 'ParameterValue': 'full', 'ApplyMethod': 'pending-reboot' } ] ) print('Modified source parameter group: ' + response['DBClusterParameterGroupName']) response = rds.create_db_cluster( DBClusterIdentifier=source_cluster_name, DBClusterParameterGroupName=source_param_group_name, Engine='aurora-mysql', EngineVersion='8.0.mysql_aurora.3.05.2', DatabaseName='myauroradb', MasterUsername='
username
', MasterUserPassword='Password01**
' ) print('Creating source cluster: ' + response['DBCluster']['DBClusterIdentifier']) source_arn = (response['DBCluster']['DBClusterArn']) create_target_cluster(target_cluster_name, source_arn, target_param_group_name) response = rds.create_db_instance( DBInstanceClass='db.r6g.2xlarge', DBClusterIdentifier=source_cluster_name, DBInstanceIdentifier=source_cluster_name + '-instance', Engine='aurora-mysql' ) return(response) def create_target_cluster(target_cluster_name, source_arn, target_param_group_name): """Creates a target Redshift cluster""" response = redshift.create_cluster_parameter_group( ParameterGroupName=target_param_group_name, ParameterGroupFamily='redshift-1.0', Description='For Aurora MySQL zero-ETL integrations' ) print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName']) response = redshift.modify_cluster_parameter_group( ParameterGroupName=target_param_group_name, Parameters=[ { 'ParameterName': 'enable_case_sensitive_identifier', 'ParameterValue': 'true' } ] ) print('Modified target parameter group: ' + response['ParameterGroupName']) response = redshift.create_cluster( ClusterIdentifier=target_cluster_name, NodeType='ra3.4xlarge', NumberOfNodes=2, Encrypted=True, MasterUsername='username
', MasterUserPassword='Password01**
', ClusterParameterGroupName=target_param_group_name ) print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier']) # Retrieve the target cluster ARN response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Retrieve the current user's account ID response = sts.get_caller_identity() account_id = response['Account'] # Create a resource policy specifying cluster ARN and account ID response = redshift.put_resource_policy( ResourceArn=target_arn, Policy=''' { \"Version\":\"2012-10-17\", \"Statement\":[ {\"Effect\":\"Allow\", \"Principal\":{ \"Service\":\"redshift.amazonaws.com\" }, \"Action\":[\"redshift:AuthorizeInboundIntegration\"], \"Condition\":{ \"StringEquals\":{ \"aws:SourceArn\":\"%s\"} } }, {\"Effect\":\"Allow\", \"Principal\":{ \"AWS\":\"arn:aws:iam::%s:root\"}, \"Action\":\"redshift:CreateInboundIntegration\"} ] } ''' % (source_arn, account_id) ) return(response) def wait_for_cluster_availability(*args): """Waits for both clusters to be available""" print('Waiting for clusters to be available...') response = rds.describe_db_clusters( DBClusterIdentifier=source_cluster_name ) source_status = response['DBClusters'][0]['Status'] source_arn = response['DBClusters'][0]['DBClusterArn'] response = rds.describe_db_instances( DBInstanceIdentifier=source_cluster_name + '-instance' ) source_instance_status = response['DBInstances'][0]['DBInstanceStatus'] response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_status = response['Clusters'][0]['ClusterStatus'] target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Every 60 seconds, check whether the clusters are available. if source_status != 'available' or target_status != 'available' or source_instance_status != 'available': time.sleep(60) response = wait_for_cluster_availability( source_cluster_name, target_cluster_name) else: print('Clusters available. Ready to create zero-ETL integration.') create_integration(source_arn, target_arn) return def create_integration(source_arn, target_arn): """Creates a zero-ETL integration using the source and target clusters""" response = rds.create_integration( SourceArn=source_arn, TargetArn=target_arn, IntegrationName='my-integration
' ) print('Creating integration: ' + response['IntegrationName']) def main(): """main function""" create_source_cluster(source_cluster_name, source_param_group_name) wait_for_cluster_availability(source_cluster_name, target_cluster_name) if __name__ == "__main__": main()
后续步骤
借助源 Aurora 数据库集群和 Amazon Redshift 目标数据仓库,您可以创建零 ETL 集成并复制数据。有关说明,请参阅 创建 Aurora 与 Amazon Redshift 的零 ETL 集成。