Amazon Redshift との Aurora ゼロ ETL 統合の開始方法 - Amazon Aurora

Amazon Redshift との Aurora ゼロ ETL 統合の開始方法

Amazon Redshift とのゼロ ETL 統合を作成する前に、必要なパラメータとアクセス許可で Aurora DB クラスターと Amazon Redshift データウェアハウスを設定します。セットアップ時には、以下の手順を完了します。

これらのタスクが完了したら、Amazon Redshift との Amazon ゼロ ETL 統合の作成 に進みます。

AWS SDK を使用して、セットアッププロセスを自動化できます。詳細については、「AWS を使用して統合をセットアップする」を参照してください。

ヒント

RDS では、統合の作成中にこれらのセットアップ手順を手動で行わずに、自動的に完了させることができます。統合の作成をすぐに開始するには、「Amazon Redshift との Amazon ゼロ ETL 統合の作成」を参照してください。

ステップ 1: カスタム DB クラスターのパラメータグループを作成する

Amazon Redshift との Aurora ゼロ ETL 統合には、レプリケーションを制御する DB クラスターパラメータに特定の値が必要です。具体的には、Aurora MySQL には拡張バイナリログ (aurora_enhanced_binlog) が必要であり、Aurora PostgreSQL には拡張論理レプリケーション (aurora.enhanced_logical_replication) が必要です。

バイナリロギングまたは論理レプリケーションを設定するには、まずカスタム DB クラスターパラメータグループを作成し、それをソース DB クラスターに関連付ける必要があります。

Aurora MySQL (aurora-mysql8.0 ファミリー):

  • aurora_enhanced_binlog=1

  • binlog_backup=0

  • binlog_format=ROW

  • binlog_replication_globaldb=0

  • binlog_row_image=full

  • binlog_row_metadata=full

さらに、binlog_transaction_compressionパラメータが ON設定されていないこと、および binlog_row_value_options パラメータが PARTIAL_JSON設定されていないことを確認してください。

Aurora MySQL 拡張バイナリログの詳細については、「Aurora MySQL の拡張バイナリログの設定」を参照してください。

Aurora PostgreSQL (aurora-postgresql16 ファミリー):

  • rds.logical_replication=1

  • aurora.enhanced_logical_replication=1

  • aurora.logical_replication_backup=0

  • aurora.logical_replication_globaldb=0

拡張論理レプリケーション (aurora.enhanced_logical_replication) を有効にすると、REPLICA IDENTITY FULL が有効になっていない場合でも、すべての列値が常に書き込み先行ログ (WAL) に書き込まれます。これにより、ソース DB クラスターの IOPS が向上する場合があります。

重要

統合の作成後に aurora.enhanced_logical_replication DB クラスターパラメータを無効にすると、プライマリ DB インスタンスはすべての論理レプリケーションスロットを無効にします。これにより、ソースからターゲットへのレプリケーションが停止されるため、プライマリ DB インスタンスでレプリケーションスロットを再作成する必要があります。中断を防ぐため、レプリケーション中は パラメータを一貫して有効にしておきます。

ステップ 2: ソース DB クラスターを選択または作成する

カスタム DB クラスターパラメータグループを作成したら、Aurora MySQL または Aurora PostgreSQL DB クラスターを選択するか、作成します。この のクラスターは、Amazon Redshift へのデータレプリケーションのソースになります。DB クラスターの作成手順については、「Amazon Aurora DB クラスターの作成」を参照してください。。

データベースは、サポートされている DB エンジンバージョンを実行している必要があります。サポートされているバージョンのリストについては、「Amazon Redshift とのゼロ ETL 統合でサポートされているリージョンと Aurora DB エンジン」を参照してください。

データベースを作成するには、[追加設定] で、デフォルトの DB クラスターパラメータグループを、前のステップで作成したカスタムパラメータグループに変更します。

注記

クラスターの作成にパラメータグループを DB クラスターに関連付ける場合は、ゼロ ETL 統合を作成する前にクラスター内のプライマリ DB インスタンスを再起動して変更を適用する必要があります。手順については、「Amazon Aurora DB クラスターまたは Amazon Aurora DB インスタンスの再起動」を参照してください。

ステップ 3: ターゲット Amazon Redshift データウェアハウスを作成する

ソース DB クラスターを作成した後、Amazon Redshift でターゲットのデータウェアハウスを作成して設定する必要があります。データウェアハウスは、以下の要件を満たしている必要があります。

  • 少なくとも 2 つのノードがある RA3 ノードタイプまたは Redshift Serverless を使用している。

  • 暗号化されている (プロビジョニングされたクラスターを使用している場合) 詳細については、「Amazon Redshift データベースの暗号化」を参照してください。

データウェアハウスを作成する手順については、プロビジョニングされたクラスター用の「クラスターの作成」またはRedshift Serverless 用の「名前空間を使用したワークグループの作成」を参照してください。

データウェアハウスで大文字と小文字の区別を有効にします。

統合を正常に行うには、データウェアハウスで大文字と小文字を区別するパラメータ (enable_case_sensitive_identifier) を有効にする必要があります。デフォルトでは、プロビジョニング済みクラスターと Redshift Serverless ワークグループの大文字と小文字の区別は無効になっています。

大文字と小文字の区別を有効にするには、データウェアハウスのタイプに応じて以下の手順を実行します。

  • プロビジョニングされたクラスター — プロビジョニングされたクラスターで大文字と小文字の区別を有効にするには、enable_case_sensitive_identifier パラメータを有効にしたカスタムパラメータグループを作成します。次に、そのパラメータグループとクラスターを関連付けます。手順については、「コンソールを使用したパラメータグループの管理」または「AWS CLI を使用したパラメータ値の設定」を参照してください。

    注記

    クラスターにパラメータグループを関連付けたら、クラスターを再起動することを忘れないでください。

  • Serverless ワークグループ — Redshift Serverless ワークグループで大文字と小文字の区別を有効にするには、AWS CLI を使用する必要があります。Amazon Redshift コンソールは現在、Redshift Serverless パラメータ値の変更をサポートしていません。次の update-workgroup リクエストを送信します。

    aws redshift-serverless update-workgroup \ --workgroup-name target-workgroup \ --config-parameters parameterKey=enable_case_sensitive_identifier,parameterValue=true

    パラメータ値を変更した後、ワークグループを再起動する必要はありません。

データウェアハウスの認可を設定します。

データウェアハウスを作成したら、ソース Aurora DB クラスターを承認済みの統合ソースとして設定する必要があります。手順については、「Amazon Redshift データウェアハウスの認可を設定する」を参照してください。

AWS を使用して統合をセットアップする

各リソースを手動でセットアップするのではなく、次の Python スクリプトを実行して、必要なリソースを自動的にセットアップできます。このコード例では AWS SDK for Python (Boto3) を使用してソース Amazon Aurora DB クラスターとターゲット Amazon Redshift データウェアハウスを作成し、それぞれに必要なパラメータ値を指定します。次に、データベースが使用可能になるまで待ってから、データベース間にゼロ ETL 統合を作成します。設定する必要があるリソースに応じて、さまざまな関数をコメントアウトできます。

必要な従属関係をインストールには、次のコマンドを実行します。

pip install boto3 pip install time

スクリプト内で、オプションでソース、ターゲット、パラメータグループの名前を変更します。最後の関数は、リソースのセットアップ後に my-integration という名前の統合を作成します。

Aurora MySQL
import boto3 import time # Build the client using the default credential configuration. # You can use the CLI and run 'aws configure' to set access key, secret # key, and default Region. rds = boto3.client('rds') redshift = boto3.client('redshift') sts = boto3.client('sts') source_cluster_name = 'my-source-cluster' # A name for the source cluster source_param_group_name = 'my-source-param-group' # A name for the source parameter group target_cluster_name = 'my-target-cluster' # A name for the target cluster target_param_group_name = 'my-target-param-group' # A name for the target parameter group def create_source_cluster(*args): """Creates a source Aurora MySQL DB cluster""" response = rds.create_db_cluster_parameter_group( DBClusterParameterGroupName=source_param_group_name, DBParameterGroupFamily='aurora-mysql8.0', Description='For Aurora MySQL binary logging' ) print('Created source parameter group: ' + response['DBClusterParameterGroup']['DBClusterParameterGroupName']) response = rds.modify_db_cluster_parameter_group( DBClusterParameterGroupName=source_param_group_name, Parameters=[ { 'ParameterName': 'aurora_enhanced_binlog', 'ParameterValue': '1', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_backup', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_format', 'ParameterValue': 'ROW', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_replication_globaldb', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_row_image', 'ParameterValue': 'full', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_row_metadata', 'ParameterValue': 'full', 'ApplyMethod': 'pending-reboot' } ] ) print('Modified source parameter group: ' + response['DBClusterParameterGroupName']) response = rds.create_db_cluster( DBClusterIdentifier=source_cluster_name, DBClusterParameterGroupName=source_param_group_name, Engine='aurora-mysql', EngineVersion='8.0.mysql_aurora.3.05.2', DatabaseName='myauroradb', MasterUsername='username', MasterUserPassword='Password01**' ) print('Creating source cluster: ' + response['DBCluster']['DBClusterIdentifier']) source_arn = (response['DBCluster']['DBClusterArn']) create_target_cluster(target_cluster_name, source_arn, target_param_group_name) response = rds.create_db_instance( DBInstanceClass='db.r6g.2xlarge', DBClusterIdentifier=source_cluster_name, DBInstanceIdentifier=source_cluster_name + '-instance', Engine='aurora-mysql' ) return(response) def create_target_cluster(target_cluster_name, source_arn, target_param_group_name): """Creates a target Redshift cluster""" response = redshift.create_cluster_parameter_group( ParameterGroupName=target_param_group_name, ParameterGroupFamily='redshift-1.0', Description='For Aurora MySQL zero-ETL integrations' ) print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName']) response = redshift.modify_cluster_parameter_group( ParameterGroupName=target_param_group_name, Parameters=[ { 'ParameterName': 'enable_case_sensitive_identifier', 'ParameterValue': 'true' } ] ) print('Modified target parameter group: ' + response['ParameterGroupName']) response = redshift.create_cluster( ClusterIdentifier=target_cluster_name, NodeType='ra3.4xlarge', NumberOfNodes=2, Encrypted=True, MasterUsername='username', MasterUserPassword='Password01**', ClusterParameterGroupName=target_param_group_name ) print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier']) # Retrieve the target cluster ARN response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Retrieve the current user's account ID response = sts.get_caller_identity() account_id = response['Account'] # Create a resource policy specifying cluster ARN and account ID response = redshift.put_resource_policy( ResourceArn=target_arn, Policy=''' { \"Version\":\"2012-10-17\", \"Statement\":[ {\"Effect\":\"Allow\", \"Principal\":{ \"Service\":\"redshift.amazonaws.com\" }, \"Action\":[\"redshift:AuthorizeInboundIntegration\"], \"Condition\":{ \"StringEquals\":{ \"aws:SourceArn\":\"%s\"} } }, {\"Effect\":\"Allow\", \"Principal\":{ \"AWS\":\"arn:aws:iam::%s:root\"}, \"Action\":\"redshift:CreateInboundIntegration\"} ] } ''' % (source_arn, account_id) ) return(response) def wait_for_cluster_availability(*args): """Waits for both clusters to be available""" print('Waiting for clusters to be available...') response = rds.describe_db_clusters( DBClusterIdentifier=source_cluster_name ) source_status = response['DBClusters'][0]['Status'] source_arn = response['DBClusters'][0]['DBClusterArn'] response = rds.describe_db_instances( DBInstanceIdentifier=source_cluster_name + '-instance' ) source_instance_status = response['DBInstances'][0]['DBInstanceStatus'] response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_status = response['Clusters'][0]['ClusterStatus'] target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Every 60 seconds, check whether the clusters are available. if source_status != 'available' or target_status != 'available' or source_instance_status != 'available': time.sleep(60) response = wait_for_cluster_availability( source_cluster_name, target_cluster_name) else: print('Clusters available. Ready to create zero-ETL integration.') create_integration(source_arn, target_arn) return def create_integration(source_arn, target_arn): """Creates a zero-ETL integration using the source and target clusters""" response = rds.create_integration( SourceArn=source_arn, TargetArn=target_arn, IntegrationName='my-integration' ) print('Creating integration: ' + response['IntegrationName']) def main(): """main function""" create_source_cluster(source_cluster_name, source_param_group_name) wait_for_cluster_availability(source_cluster_name, target_cluster_name) if __name__ == "__main__": main()
Aurora PostgreSQL
import boto3 import time # Build the client using the default credential configuration. # You can use the CLI and run 'aws configure' to set access key, secret # key, and default Region. rds = boto3.client('rds') redshift = boto3.client('redshift') sts = boto3.client('sts') source_cluster_name = 'my-source-cluster' # A name for the source cluster source_param_group_name = 'my-source-param-group' # A name for the source parameter group target_cluster_name = 'my-target-cluster' # A name for the target cluster target_param_group_name = 'my-target-param-group' # A name for the target parameter group def create_source_cluster(*args): """Creates a source Aurora PostgreSQL DB cluster""" response = rds.create_db_cluster_parameter_group( DBClusterParameterGroupName=source_param_group_name, DBParameterGroupFamily='aurora-postgresql16', Description='For Aurora PostgreSQL logical replication' ) print('Created source parameter group: ' + response['DBClusterParameterGroup']['DBClusterParameterGroupName']) response = rds.modify_db_cluster_parameter_group( DBClusterParameterGroupName=source_param_group_name, Parameters=[ { 'ParameterName': 'rds.logical_replication', 'ParameterValue': '1', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'aurora.enhanced_logical_replication', 'ParameterValue': '1', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'aurora.logical_replication_backup', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'aurora.logical_replication_globaldb', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot' } ] ) print('Modified source parameter group: ' + response['DBClusterParameterGroupName']) response = rds.create_db_cluster( DBClusterIdentifier=source_cluster_name, DBClusterParameterGroupName=source_param_group_name, Engine='aurora-postgresql', EngineVersion='16.4.aurora-postgresql', DatabaseName='mypostgresdb', MasterUsername='username', MasterUserPassword='Password01**' ) print('Creating source cluster: ' + response['DBCluster']['DBClusterIdentifier']) source_arn = (response['DBCluster']['DBClusterArn']) create_target_cluster(target_cluster_name, source_arn, target_param_group_name) response = rds.create_db_instance( DBInstanceClass='db.r6g.2xlarge', DBClusterIdentifier=source_cluster_name, DBInstanceIdentifier=source_cluster_name + '-instance', Engine='aurora-postgresql' ) return(response) def create_target_cluster(target_cluster_name, source_arn, target_param_group_name): """Creates a target Redshift cluster""" response = redshift.create_cluster_parameter_group( ParameterGroupName=target_param_group_name, ParameterGroupFamily='redshift-1.0', Description='For Aurora PostgreSQL zero-ETL integrations' ) print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName']) response = redshift.modify_cluster_parameter_group( ParameterGroupName=target_param_group_name, Parameters=[ { 'ParameterName': 'enable_case_sensitive_identifier', 'ParameterValue': 'true' } ] ) print('Modified target parameter group: ' + response['ParameterGroupName']) response = redshift.create_cluster( ClusterIdentifier=target_cluster_name, NodeType='ra3.4xlarge', NumberOfNodes=2, Encrypted=True, MasterUsername='username', MasterUserPassword='Password01**', ClusterParameterGroupName=target_param_group_name ) print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier']) # Retrieve the target cluster ARN response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Retrieve the current user's account ID response = sts.get_caller_identity() account_id = response['Account'] # Create a resource policy specifying cluster ARN and account ID response = redshift.put_resource_policy( ResourceArn=target_arn, Policy=''' { \"Version\":\"2012-10-17\", \"Statement\":[ {\"Effect\":\"Allow\", \"Principal\":{ \"Service\":\"redshift.amazonaws.com\" }, \"Action\":[\"redshift:AuthorizeInboundIntegration\"], \"Condition\":{ \"StringEquals\":{ \"aws:SourceArn\":\"%s\"} } }, {\"Effect\":\"Allow\", \"Principal\":{ \"AWS\":\"arn:aws:iam::%s:root\"}, \"Action\":\"redshift:CreateInboundIntegration\"} ] } ''' % (source_arn, account_id) ) return(response) def wait_for_cluster_availability(*args): """Waits for both clusters to be available""" print('Waiting for clusters to be available...') response = rds.describe_db_clusters( DBClusterIdentifier=source_cluster_name ) source_status = response['DBClusters'][0]['Status'] source_arn = response['DBClusters'][0]['DBClusterArn'] response = rds.describe_db_instances( DBInstanceIdentifier=source_cluster_name + '-instance' ) source_instance_status = response['DBInstances'][0]['DBInstanceStatus'] response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_status = response['Clusters'][0]['ClusterStatus'] target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Every 60 seconds, check whether the clusters are available. if source_status != 'available' or target_status != 'available' or source_instance_status != 'available': time.sleep(60) response = wait_for_cluster_availability( source_cluster_name, target_cluster_name) else: print('Clusters available. Ready to create zero-ETL integration.') create_integration(source_arn, target_arn) return def create_integration(source_arn, target_arn): """Creates a zero-ETL integration using the source and target clusters""" response = rds.create_integration( SourceArn=source_arn, TargetArn=target_arn, IntegrationName='my-integration' ) print('Creating integration: ' + response['IntegrationName']) def main(): """main function""" create_source_cluster(source_cluster_name, source_param_group_name) wait_for_cluster_availability(source_cluster_name, target_cluster_name) if __name__ == "__main__": main()

次のステップ

ソース Aurora DB クラスターと Amazon Redshift ターゲットデータウェアハウスを作成したので、ゼロ ETL 統合を作成してデータのレプリケーションを開始できます。手順については、Amazon Redshift との Amazon ゼロ ETL 統合の作成 を参照してください。