Amazon Redshift との Amazon RDS ゼロ ETL 統合の開始方法
Amazon Redshift とのゼロ ETL 統合を作成する前に、必要なパラメータとアクセス許可で RDS データベースと Amazon Redshift データウェアハウスを設定します。セットアップ時には、以下の手順を完了します。
これらのタスクが完了したら、Amazon Redshift との Amazon RDS ゼロ ETL 統合の作成 に進みます。
ヒント
RDS では、統合の作成中にこれらのセットアップ手順を手動で行わずに、自動的に完了させることができます。統合の作成をすぐに開始するには、「Amazon Redshift との Amazon RDS ゼロ ETL 統合の作成」を参照してください。
ステップ 1: カスタム DB のパラメータグループを作成する
Amazon Redshift との Amazon RDS ゼロ ETL 統合には、バイナリロギング (binlog) を制御する Aurora DB パラメータに特定の値が必要です。バイナリログを設定するには、まずカスタム DB パラメータグループを作成して、これをソースデータベースに関連付ける必要があります。
以下の設定でカスタム DB パラメータグループを作成します。カスタムパラメータグループを作成するには、「Amazon RDS DB インスタンスの DB パラメータグループマルチ AZ DB クラスターの DB クラスターパラメータグループを使用する」を参照してください。
-
binlog_format=ROW
-
binlog_row_image=full
また、binlog_row_value_options
パラメータが PARTIAL_JSON
に設定されていないことも確認してください。
ステップ 2: ソースデータベースを選択または作成する
カスタム DB パラメータグループを作成したら、RDS for MySQL データベースを選択するか、作成します。この データベースのは、Amazon Redshift へのデータレプリケーションのソースになります。シングル AZ インスタンスまたはマルチ AZ DB インスタンスの作成手順については、「Amazon RDS DB インスタンスの作成」を参照してください。
データベースは、サポートされている DB エンジンバージョンを実行している必要があります。サポートされているバージョンのリストについては、「Amazon RDS と Amazon Redshift のゼロ ETL 統合でサポートされているリージョンと DB エンジン」を参照してください。
シングル AZ インスタンスまたはマルチ AZ DB インスタンスの作成手順については、「Amazon RDS DB インスタンスの作成」を参照してください。マルチ AZ DB クラスターの作成手順については、「Amazon RDS 用のマルチ AZ DB クラスターの作成」を参照してください。
データベースを作成するには、[追加設定] で、デフォルトの DB パラメータグループを、前のステップで作成したカスタムパラメータグループに変更します。
注記
もしデータベースの作成後にパラメータグループをデータベースに関連付ける場合は、ゼロ ETL 統合を作成する前にデータベースを再起動して変更を適用する必要があります。手順については、「 DB インスタンスの再起動」または「Amazon RDS のマルチ AZ DB クラスターとリーダー DB インスタンスの再起動」を参照してください。
さらに、データベースで自動バックアップが有効になっていることを確認してください。詳細については、「自動バックアップの有効化」を参照してください。
ステップ 3: ターゲット Amazon Redshift データウェアハウスを作成する
ソースデータベースを作成した後、Amazon Redshift でターゲットのデータウェアハウスを作成して設定する必要があります。データウェアハウスは、以下の要件を満たしている必要があります。
-
少なくとも 2 つのノードがある RA3 ノードタイプ、または Redshift Serverless を使用している。
-
暗号化されている (プロビジョニングされたクラスターを使用している場合) 詳細については、「Amazon Redshift データベースの暗号化」を参照してください。
データウェアハウスを作成する手順については、プロビジョニングされたクラスター用の「クラスターの作成」またはRedshift Serverless 用の「名前空間を使用したワークグループの作成」を参照してください。
データウェアハウスで大文字と小文字の区別を有効にします。
統合を正常に行うには、データウェアハウスで大文字と小文字を区別するパラメータ (enable_case_sensitive_identifier
) を有効にする必要があります。デフォルトでは、プロビジョニング済みクラスターと Redshift Serverless ワークグループの大文字と小文字の区別は無効になっています。
大文字と小文字の区別を有効にするには、データウェアハウスのタイプに応じて以下の手順を実行します。
-
プロビジョニングされたクラスター — プロビジョニングされたクラスターで大文字と小文字の区別を有効にするには、
enable_case_sensitive_identifier
パラメータを有効にしたカスタムパラメータグループを作成します。次に、そのパラメータグループとクラスターを関連付けます。手順については、「コンソールを使用したパラメータグループの管理」または「AWS CLI を使用したパラメータ値の設定」を参照してください。注記
クラスターにパラメータグループを関連付けたら、クラスターを再起動することを忘れないでください。
-
Serverless ワークグループ — Redshift Serverless ワークグループで大文字と小文字の区別を有効にするには、AWS CLI を使用する必要があります。Amazon Redshift コンソールは現在、Redshift Serverless パラメータ値の変更をサポートしていません。次の update-workgroup リクエストを送信します。
aws redshift-serverless update-workgroup \ --workgroup-name
target-workgroup
\ --config-parameters parameterKey=enable_case_sensitive_identifier,parameterValue=trueパラメータ値を変更した後、ワークグループを再起動する必要はありません。
データウェアハウスの認証を設定します。
データウェアハウスを作成したら、ソース RDS データベースを承認済みの統合ソースとして設定する必要があります。手順については、「Amazon Redshift データウェアハウスの認証を設定する」を参照してください。
AWS SDK を使用して統合をセットアップする
各リソースを手動でセットアップするのではなく、次の Python スクリプトを実行して、必要なリソースを自動的にセットアップできます。このコード例では AWS SDK for Python (Boto3)
必要な従属関係をインストールには、次のコマンドを実行します。
pip install boto3 pip install time
スクリプト内で、オプションでソース、ターゲット、パラメータグループの名前を変更します。最後の関数は、リソースのセットアップ後に my-integration
という名前の統合を作成します。
import boto3 import time # Build the client using the default credential configuration. # You can use the CLI and run 'aws configure' to set access key, secret # key, and default Region. rds = boto3.client('rds') redshift = boto3.client('redshift') sts = boto3.client('sts') source_db_name = 'my-source-db' # A name for the source database source_param_group_name = 'my-source-param-group' # A name for the source parameter group target_cluster_name = 'my-target-cluster' # A name for the target cluster target_param_group_name = 'my-target-param-group' # A name for the target parameter group def create_source_db(*args): """Creates a source RDS for MySQL DB instance""" response = rds.create_db_parameter_group( DBParameterGroupName=source_param_group_name, DBParameterGroupFamily='mysql8.0', Description='RDS for MySQL zero-ETL integrations' ) print('Created source parameter group: ' + response['DBParameterGroup']['DBParameterGroupName']) response = rds.modify_db_parameter_group( DBParameterGroupName=source_param_group_name, Parameters=[ { 'ParameterName': 'binlog_format', 'ParameterValue': 'ROW', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_row_image', 'ParameterValue': 'full', 'ApplyMethod': 'pending-reboot' } ] ) print('Modified source parameter group: ' + response['DBParameterGroupName']) response = rds.create_db_instance( DBInstanceIdentifier=source_db_name, DBParameterGroupName=source_param_group_name, Engine='mysql', EngineVersion='8.0.32', DBName='mydb', DBInstanceClass='db.m5.large', AllocatedStorage=15, MasterUsername=
'username'
, MasterUserPassword='Password01**
' ) print('Creating source database: ' + response['DBInstance']['DBInstanceIdentifier']) source_arn = (response['DBInstance']['DBInstanceArn']) create_target_cluster(target_cluster_name, source_arn, target_param_group_name) return(response) def create_target_cluster(target_cluster_name, source_arn, target_param_group_name): """Creates a target Redshift cluster""" response = redshift.create_cluster_parameter_group( ParameterGroupName=target_param_group_name, ParameterGroupFamily='redshift-1.0', Description='RDS for MySQL zero-ETL integrations' ) print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName']) response = redshift.modify_cluster_parameter_group( ParameterGroupName=target_param_group_name, Parameters=[ { 'ParameterName': 'enable_case_sensitive_identifier', 'ParameterValue': 'true' } ] ) print('Modified target parameter group: ' + response['ParameterGroupName']) response = redshift.create_cluster( ClusterIdentifier=target_cluster_name, NodeType='ra3.4xlarge', NumberOfNodes=2, Encrypted=True, MasterUsername='username', MasterUserPassword='Password01**', ClusterParameterGroupName=target_param_group_name ) print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier']) # Retrieve the target cluster ARN response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Retrieve the current user's account ID response = sts.get_caller_identity() account_id = response['Account'] # Create a resource policy granting access to source database and account ID response = redshift.put_resource_policy( ResourceArn=target_arn, Policy=''' { \"Version\":\"2012-10-17\", \"Statement\":[ {\"Effect\":\"Allow\", \"Principal\":{ \"Service\":\"redshift.amazonaws.com\" }, \"Action\":[\"redshift:AuthorizeInboundIntegration\"], \"Condition\":{ \"StringEquals\":{ \"aws:SourceArn\":\"%s\"} } }, {\"Effect\":\"Allow\", \"Principal\":{ \"AWS\":\"arn:aws:iam::%s:root\"}, \"Action\":\"redshift:CreateInboundIntegration\"} ] } ''' % (source_arn, account_id) ) return(response) def wait_for_db_availability(*args): """Waits for both databases to be available""" print('Waiting for source and target to be available...') response = rds.describe_db_instances( DBInstanceIdentifier=source_db_name ) source_status = response['DBInstances'][0]['DBInstanceStatus'] source_arn = response['DBInstances'][0]['DBInstanceArn'] response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_status = response['Clusters'][0]['ClusterStatus'] target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Every 60 seconds, check whether the databases are available if source_status != 'available' or target_status != 'available': time.sleep(60) response = wait_for_db_availability( source_db_name, target_cluster_name) else: print('Databases available. Ready to create zero-ETL integration.') create_integration(source_arn, target_arn) return def create_integration(source_arn, target_arn): """Creates a zero-ETL integration using the source and target databases""" response = rds.create_integration( SourceArn=source_arn, TargetArn=target_arn, IntegrationName='my-integration
' ) print('Creating integration: ' + response['IntegrationName']) def main(): """main function""" create_source_db(source_db_name, source_param_group_name) wait_for_db_availability(source_db_name, target_cluster_name) if __name__ == "__main__": main()
次のステップ
ソース RDS データベースと Amazon Redshift ターゲットデータウェアハウスを作成したので、ゼロ ETL 統合を作成してデータのレプリケーションを開始できます。手順については、「Amazon Redshift との Amazon RDS ゼロ ETL 統合の作成」を参照してください。