Cookie の設定を選択する

当社は、当社のサイトおよびサービスを提供するために必要な必須 Cookie および類似のツールを使用しています。当社は、パフォーマンス Cookie を使用して匿名の統計情報を収集することで、お客様が当社のサイトをどのように利用しているかを把握し、改善に役立てています。必須 Cookie は無効化できませんが、[カスタマイズ] または [拒否] をクリックしてパフォーマンス Cookie を拒否することはできます。

お客様が同意した場合、AWS および承認された第三者は、Cookie を使用して便利なサイト機能を提供したり、お客様の選択を記憶したり、関連する広告を含む関連コンテンツを表示したりします。すべての必須ではない Cookie を受け入れるか拒否するには、[受け入れる] または [拒否] をクリックしてください。より詳細な選択を行うには、[カスタマイズ] をクリックしてください。

Oracle 外部テーブルを Amazon Aurora PostgreSQL 互換に移行 - AWS 規範ガイダンス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Oracle 外部テーブルを Amazon Aurora PostgreSQL 互換に移行

作成者: anuradha chintha (AWS) と Rakesh Raghav (AWS)

概要

外部テーブルにより、Oracle がデータベースの外部にあるフラットファイルに保存されているデータをクエリできます。ORACLE_LOADER ドライバーを使用して、SQL*Loader ユーティリティでロードできるあらゆる形式で保存されているデータにアクセスできます。外部テーブルではデータ操作言語 (DML) を使用できませんが、クエリ、結合、ソート操作には外部テーブルを使用できます。

Amazon Aurora PostgreSQL 互換エディションには、Oracle の外部テーブルと類似する機能はありません。代わりに、モダナイゼーションを使用して、機能要件を満たし、かつ質素なスケーラブルなソリューションを開発する必要があります。

このパターンでは、aws_s3 拡張を使用して、さまざまなタイプの Oracle 外部テーブルをAmazon Web Services (AWS) クラウド上の Aurora PostgreSQL 互換エディションに移行する手順を示しています。

実稼働環境に実装する前に、このソリューションを徹底的にテストすることを推奨します。

前提条件と制限

前提条件

  • アクティブなAWS アカウント

  • AWS コマンドラインインターフェイス (AWS CLI)

  • 使用可能な Aurora PostgreSQL 互換データベースインスタンス。

  • 外部テーブルのあるオンプレミスの Oracle データベース

  • PG. クライアント API

  • データファイル 

機能制限 

  • このパターンには Oracle 外部テーブルの置き換えの機能がありません。ただし、手順とサンプルコードをさらに拡張して、データベースのモダナイゼーション目標を達成することができます。

  • ファイルには、aws_s3 エクスポート関数とインポート関数で区切り文字として渡される文字を含んではなりません。

製品バージョン

  • Amazon S3 から PostgreSQL の RDS にインポートするには、データベースで PostgreSQL バージョン 10.7 以降を実行する必要があります。

アーキテクチャ

ソーステクノロジースタック

  • Oracle

ソースアーキテクチャ

オンプレミスの Oracle データベースのディレクトリとテーブルに送られるデータファイルの図表。

ターゲットテクノロジースタック

  • Amazon Aurora PostgreSQL- 互換

  • Amazon CloudWatch

  • AWS Lambda

  • AWS Secrets Manager

  • Amazon Simple Notification Service (Amazon SNS)

  • Amazon Simple Storage Service (Amazon S3)

ターゲットアーキテクチャ 

以下の図表は、このソリューションの概要を示しています。

図表の後に説明があります。
  1. ファイルが S3 バケットにアップロードされます。

  2. Lambda 関数が初期化されます。

  3. Lambda 関数は DB 関数呼び出しを開始します。

  4. Secrets Manager は、データベースにアクセスするための認証情報を提供します。

  5. DB 関数によって、SNS アラームが作成されます。

自動化とスケール

外部テーブルへの追加や変更は、メタデータのメンテナンスで処理できます。

ツール

  • Amazon Aurora PostgreSQL-互換」 — Amazon Aurora PostgreSQL 互換エディションは、フルマネージド型で PostgreSQL 互換で、ACID 準拠のリレーショナルデータベースエンジンです。ハイエンドの商用データベースのスピードと信頼性を、オープンソースデータベースの高いコスト効率を経験できます。

  • AWS CLI」 — AWS コマンドラインインターフェイス (AWS CLI)は、AWS のサービスを管理するための統合ツールです。ダウンロードおよび設定用の1つのツールのみを使用して、コマンドラインから複数の AWS サービスを制御し、スクリプトを使用してこれらを自動化することができます。

  • Amazon CloudWatch」 — Amazon CloudWatch は Amazon S3 のリソースと利用状況を監視します。

  • AWS Lambda」 — AWS Lambdaは、サーバーのプロビジョニングや管理、ワークロードに対応したクラスタースケーリングロジックの作成、イベント統合の維持、あるいはランタイムの管理などを行うことなくコードを実行でき、サーバーレスコンピューティングサービスです。このパターンでは、ファイルが Amazon S3 にアップロードされるたびに、Lambda がデータベース関数を実行します。

  • AWS Secrets Manager」 — AWS Secrets Manager は、認証情報を保存および取得するためのサービスです。Secrets Manager を使用して、コードにハードコードされた認証情報 (パスワードを含む) を Secrets Manager への API コールに置き換えて、シークレットをプログラムで取得できます。

  • Amazon S3」 — Amazon Simple Storage Service (Amazon S3) は、Aurora PostgreSQL 互換クラスターとの間で消費および送信するファイルを受信および保存するためのストレージレイヤーを提供します。

  • aws_s3」 — aws_s3 の拡張は Amazon S3 と Aurora PostgreSQL 互換を統合します。

  • Amazon SNS」 — Amazon Simple Notification Service (Amazon SNS)は、パブリッシャーやクライアントの間のメッセージ配信や送信を調整および管理します。このパターンでは、Amazon SNS を使用して通知を送信します。

Code

ファイルを S3 バケットに配置するたびに、DB 関数を作成して処理アプリケーションまたは Lambda 関数から呼び出す必要があります。詳細については、コード (添付) を参照してください。

エピック

タスク説明必要なスキル

外部ファイルをソースデータベースに追加します。

外部ファイルを作成し、oracle ディレクトリに移動します。

DBA

外部ファイルの作成

タスク説明必要なスキル

外部ファイルをソースデータベースに追加します。

外部ファイルを作成し、oracle ディレクトリに移動します。

DBA
タスク説明必要なスキル

Aurora PostgreSQL データベースを作成します。

Amazon Aurora PostgreSQL 互換クラスターに DB インスタンスを作成します。

DBA

スキーマ、aws_s3 エクステンション、テーブルを作成します。

追加情報」セクションの ext_tbl_scripts にあるコードを使用します。テーブルには、実際のテーブル、ステージングテーブル、エラーとログテーブル、およびメタテーブルが含まれます。

DBA、開発者

DB関数を作成します。

DB 関数を作成するには、追加情報セクションの load_external_table_latest 関数の下のコードを使用します。

DBA、開発者

ターゲットの設定 (Aurora PostgreSQL 互換の) 設定

タスク説明必要なスキル

Aurora PostgreSQL データベースを作成します。

Amazon Aurora PostgreSQL 互換クラスターに DB インスタンスを作成します。

DBA

スキーマ、aws_s3 エクステンション、テーブルを作成します。

追加情報」セクションの ext_tbl_scripts にあるコードを使用します。テーブルには、実際のテーブル、ステージングテーブル、エラーとログテーブル、およびメタテーブルが含まれます。

DBA、開発者

DB関数を作成します。

DB 関数を作成するには、追加情報セクションの load_external_table_latest 関数の下のコードを使用します。

DBA、開発者
タスク説明必要なスキル

ロールを作成します。

Amazon S3 と Amazon Relational Database Service (Amazon RDS) にアクセスする権限を持つロールを作成します。このロールは、パターンを実行するための Lambda に割り当てられます。

DBA

Lambda 関数を作成します。

Amazon S3 (例: file_key = info.get('object', {}).get('key'))からファイル名を読み取り、そのファイル名を入力パラメータとして DB 関数 (例、curs.callproc("load_external_tables", [file_key])) を呼び出す Lambda 関数を作成します。

関数呼び出しの結果に応じて、SNS 通知が開始されます (例、client.publish(TopicArn='arn:',Message='fileloadsuccess',Subject='fileloadsuccess') )。

ビジネスニーズに基づいて、必要に応じて追加のコードを使用して Lambda 関数を作成できます。詳細については、Lambda の「ドキュメント」 を参照してください。

DBA

S3 バケットイベントトリガーを設定します。

S3 バケットのすべてのオブジェクト作成イベントで Lambda 関数を呼び出すメカニズムを設定します。

DBA

シークレットを作成します。

Secrets Manager を使用して、データベース認証情報のシークレット名を作成します。Lambda 関数にシークレットを渡します。

DBA

Lambda サポートファイルをアップロードします。

Lambda サポートパッケージを Aurora PostgreSQL 互換に接続するための添付の Python スクリプトを含む.zip ファイルをアップロードします。Python コードは、データベースで作成した関数を呼び出します。

DBA

SNS トピックを作成します。

SNS トピックを作成して、データロードの成功または失敗のメールを送信します。

DBA

Lambda 関数の作成と設定

タスク説明必要なスキル

ロールを作成します。

Amazon S3 と Amazon Relational Database Service (Amazon RDS) にアクセスする権限を持つロールを作成します。このロールは、パターンを実行するための Lambda に割り当てられます。

DBA

Lambda 関数を作成します。

Amazon S3 (例: file_key = info.get('object', {}).get('key'))からファイル名を読み取り、そのファイル名を入力パラメータとして DB 関数 (例、curs.callproc("load_external_tables", [file_key])) を呼び出す Lambda 関数を作成します。

関数呼び出しの結果に応じて、SNS 通知が開始されます (例、client.publish(TopicArn='arn:',Message='fileloadsuccess',Subject='fileloadsuccess') )。

ビジネスニーズに基づいて、必要に応じて追加のコードを使用して Lambda 関数を作成できます。詳細については、Lambda の「ドキュメント」 を参照してください。

DBA

S3 バケットイベントトリガーを設定します。

S3 バケットのすべてのオブジェクト作成イベントで Lambda 関数を呼び出すメカニズムを設定します。

DBA

シークレットを作成します。

Secrets Manager を使用して、データベース認証情報のシークレット名を作成します。Lambda 関数にシークレットを渡します。

DBA

Lambda サポートファイルをアップロードします。

Lambda サポートパッケージを Aurora PostgreSQL 互換に接続するための添付の Python スクリプトを含む.zip ファイルをアップロードします。Python コードは、データベースで作成した関数を呼び出します。

DBA

SNS トピックを作成します。

SNS トピックを作成して、データロードの成功または失敗のメールを送信します。

DBA
タスク説明必要なスキル

S3 バケットを作成する。

Amazon S3 コンソールで、先頭にスラッシュを含まない一意の名前で S3 バケットを作成します。S3 バケット名はグローバルに一意であり、名前空間はすべての AWS アカウントによって共有されます。

DBA

IAM ポリシーを作成します。

AWS 識別とアクセス管理(IAM) ポリシーを作成するには、追加情報セクションの s3bucketpolicy_for_import の配下のコードを使用します。

DBA

ロールを作成します。

Aurora PostgreSQL 互換用に 2 つのロールを作成します。1 つはインポート用で、もう 1 つはエクスポート用です。対応するポリシーをロールに割り当てます。

DBA

ロールを Aurora PostgreSQL 互換クラスターにアタッチします。

ロールの管理で、Aurora PostgreSQL クラスターにインポートロールとエクスポートロールをアタッチします。

DBA

Aurora PostgreSQL 互換のサポートオブジェクトを作成します。

テーブルスクリプトについては、追加情報セクション ext_tbl_scripts の配下にあるコードを使用します。

カスタム関数については、追加情報セクションの load_external_Table_latest の配下にあるコードを使用します。

DBA

Amazon S3 との統合を追加する

タスク説明必要なスキル

S3 バケットを作成する。

Amazon S3 コンソールで、先頭にスラッシュを含まない一意の名前で S3 バケットを作成します。S3 バケット名はグローバルに一意であり、名前空間はすべての AWS アカウントによって共有されます。

DBA

IAM ポリシーを作成します。

AWS 識別とアクセス管理(IAM) ポリシーを作成するには、追加情報セクションの s3bucketpolicy_for_import の配下のコードを使用します。

DBA

ロールを作成します。

Aurora PostgreSQL 互換用に 2 つのロールを作成します。1 つはインポート用で、もう 1 つはエクスポート用です。対応するポリシーをロールに割り当てます。

DBA

ロールを Aurora PostgreSQL 互換クラスターにアタッチします。

ロールの管理で、Aurora PostgreSQL クラスターにインポートロールとエクスポートロールをアタッチします。

DBA

Aurora PostgreSQL 互換のサポートオブジェクトを作成します。

テーブルスクリプトについては、追加情報セクション ext_tbl_scripts の配下にあるコードを使用します。

カスタム関数については、追加情報セクションの load_external_Table_latest の配下にあるコードを使用します。

DBA
タスク説明必要なスキル

S3 バケットにファイルをアップロードします。

テストファイルを S3 バケットにアップロードするには、コンソールまたは AWS CLIにある以下のコマンドを使用します。 

aws s3 cp /Users/Desktop/ukpost/exttbl/"testing files"/aps s3://s3importtest/inputext/aps

ファイルがアップロードされるとすぐに、バケットイベントによって Lambda 関数が開始されますが、それによりAurora PostgreSQL 互換関数が実行されます。

DBA

データ、ログ、エラーファイルを確認してください。

Aurora PostgreSQL 互換関数はファイルをメインテーブルに読み込み、S3 バケットに .log.bad のファイルを作成します。

DBA

ソリューションを監視します。

Amazon CloudWatch コンソールで、Lambda 関数をモニタリングします。

DBA

テストファイルを処理

タスク説明必要なスキル

S3 バケットにファイルをアップロードします。

テストファイルを S3 バケットにアップロードするには、コンソールまたは AWS CLIにある以下のコマンドを使用します。 

aws s3 cp /Users/Desktop/ukpost/exttbl/"testing files"/aps s3://s3importtest/inputext/aps

ファイルがアップロードされるとすぐに、バケットイベントによって Lambda 関数が開始されますが、それによりAurora PostgreSQL 互換関数が実行されます。

DBA

データ、ログ、エラーファイルを確認してください。

Aurora PostgreSQL 互換関数はファイルをメインテーブルに読み込み、S3 バケットに .log.bad のファイルを作成します。

DBA

ソリューションを監視します。

Amazon CloudWatch コンソールで、Lambda 関数をモニタリングします。

DBA

関連リソース

追加情報

ext_table_scripts

CREATE EXTENSION aws_s3 CASCADE; CREATE TABLE IF NOT EXISTS meta_EXTERNAL_TABLE (     table_name_stg character varying(100) ,     table_name character varying(100)  ,     col_list character varying(1000)  ,     data_type character varying(100)  ,     col_order numeric,     start_pos numeric,     end_pos numeric,     no_position character varying(100)  ,     date_mask character varying(100)  ,     delimeter character(1)  ,     directory character varying(100)  ,     file_name character varying(100)  ,     header_exist character varying(5) ); CREATE TABLE IF NOT EXISTS ext_tbl_stg (     col1 text ); CREATE TABLE IF NOT EXISTS error_table (     error_details text,     file_name character varying(100),     processed_time timestamp without time zone ); CREATE TABLE IF NOT EXISTS log_table (     file_name character varying(50) COLLATE pg_catalog."default",     processed_date timestamp without time zone,     tot_rec_count numeric,     proc_rec_count numeric,     error_rec_count numeric ); sample insert scripts of meta data: INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'source_filename', 'character varying', 2, 8, 27, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'record_type_identifier', 'character varying', 3, 28, 30, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'fad_code', 'numeric', 4, 31, 36, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'session_sequence_number', 'numeric', 5, 37, 42, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'transaction_sequence_number', 'numeric', 6, 43, 48, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');

s3bucketpolicy_for import

---Import role policy --Create an IAM policy to allow, Get,  and list actions on S3 bucket  {     "Version": "2012-10-17",     "Statement": [         {             "Sid": "s3import",             "Action": [                 "s3:GetObject",                 "s3:ListBucket"             ],             "Effect": "Allow",             "Resource": [                 "arn:aws:s3:::s3importtest",                 "arn:aws:s3:::s3importtest/*"             ]         }     ] } --Export Role policy --Create an IAM policy to allow, put,  and list actions on S3 bucket {     "Version": "2012-10-17",     "Statement": [         {             "Sid": "s3export",             "Action": [                 "S3:PutObject",                 "s3:ListBucket"             ],             "Effect": "Allow",             "Resource": [                 "arn:aws:s3:::s3importtest/*"             ]         }     ] }

Sample DB function load_external_tables_latest

CREATE OR REPLACE FUNCTION public.load_external_tables(pi_filename text)  RETURNS character varying  LANGUAGE plpgsql AS $function$ /* Loading data from S3 bucket into a APG table */ DECLARE  v_final_sql TEXT;  pi_ext_table TEXT;  r refCURSOR;  v_sqlerrm text;  v_chunk numeric;  i integer;  v_col_list TEXT;  v_postion_list CHARACTER VARYING(1000);  v_len  integer;  v_delim varchar;  v_file_name CHARACTER VARYING(1000);  v_directory CHARACTER VARYING(1000);  v_table_name_stg CHARACTER VARYING(1000);  v_sql_col TEXT;  v_sql TEXT;  v_sql1 TEXT;  v_sql2 TEXT;  v_sql3 TEXT;  v_cnt integer;  v_sql_dynamic TEXT;  v_sql_ins TEXT;  proc_rec_COUNT integer;  error_rec_COUNT integer;  tot_rec_COUNT integer;  v_rec_val integer;  rec record;  v_col_cnt integer;  kv record;  v_val text;  v_header text;  j integer;  ERCODE VARCHAR(5);  v_region text;  cr CURSOR FOR  SELECT distinct DELIMETER,    FILE_NAME,    DIRECTORY  FROM  meta_EXTERNAL_TABLE  WHERE table_name = pi_ext_table    AND DELIMETER IS NOT NULL;  cr1 CURSOR FOR    SELECT   col_list,    data_type,    start_pos,    END_pos,    concat_ws('',' ',TABLE_NAME_STG) as TABLE_NAME_STG,    no_position,date_mask  FROM  meta_EXTERNAL_TABLE  WHERE table_name = pi_ext_table  order by col_order asc; cr2 cursor FOR SELECT  distinct table_name,table_name_stg    FROM  meta_EXTERNAL_TABLE    WHERE upper(file_name) = upper(pi_filename); BEGIN  -- PERFORM utl_file_utility.init();    v_region := 'us-east-1';    /* find tab details from file name */    --DELETE FROM  ERROR_TABLE WHERE file_name= pi_filename;   -- DELETE FROM  log_table WHERE file_name= pi_filename;  BEGIN    SELECT distinct table_name,table_name_stg INTO strict pi_ext_table,v_table_name_stg    FROM  meta_EXTERNAL_TABLE    WHERE upper(file_name) = upper(pi_filename);  EXCEPTION    WHEN NO_DATA_FOUND THEN     raise notice 'error 1,%',sqlerrm;     pi_ext_table := null;     v_table_name_stg := null;       RAISE USING errcode = 'NTFIP' ;     when others then         raise notice 'error others,%',sqlerrm;  END;  j :=1 ;    for rec in  cr2  LOOP   pi_ext_table     := rec.table_name;   v_table_name_stg := rec.table_name_stg;   v_col_list := null;  IF pi_ext_table IS NOT NULL   THEN     --EXECUTE concat_ws('','truncate table  ' ,pi_ext_table) ;    EXECUTE concat_ws('','truncate table  ' ,v_table_name_stg) ;        SELECT distinct DELIMETER INTO STRICT v_delim        FROM  meta_EXTERNAL_TABLE        WHERE table_name = pi_ext_table;        IF v_delim IS NOT NULL THEN      SELECT distinct DELIMETER,        FILE_NAME,        DIRECTORY ,        concat_ws('',' ',table_name_stg),        case  header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist      INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header      FROM  meta_EXTERNAL_TABLE      WHERE table_name = pi_ext_table        AND DELIMETER IS NOT NULL;      IF    upper(v_delim) = 'CSV'      THEN        v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3 ( ''',        v_table_name_stg,''','''',        ''DELIMITER '''','''' CSV HEADER QUOTE ''''"'''''', aws_commons.create_s3_uri ( ''',        v_directory,''',''',v_file_name,''', ''',v_region,'''))');        ELSE        v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',            v_table_name_stg, ''','''', ''DELIMITER AS ''''^''''',''',','           aws_commons.create_s3_uri            ( ''',v_directory, ''',''',            v_file_name, ''',',             '''',v_region,''')           )');           raise notice 'v_sql , %',v_sql;        begin         EXECUTE  v_sql;        EXCEPTION          WHEN OTHERS THEN            raise notice 'error 1';          RAISE USING errcode = 'S3IMP' ;        END;        select count(col_list) INTO v_col_cnt        from  meta_EXTERNAL_TABLE where table_name = pi_ext_table;         -- raise notice 'v_sql 2, %',concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');        execute concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');        i :=1;        FOR rec in cr1        loop        v_sql1 := concat_ws('',v_sql1,'split_part(col1,''',v_delim,''',', i,')',' as ',rec.col_list,',');        v_sql2 := concat_ws('',v_sql2,rec.col_list,',');    --    v_sql3 := concat_ws('',v_sql3,'rec.',rec.col_list,'::',rec.data_type,',');        case          WHEN upper(rec.data_type) = 'NUMERIC'          THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0                 THEN null                  ELSE                  coalesce((trim(split_part(col1,''',v_delim,''',', i,')))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;          WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'          THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0                 THEN null                  ELSE                  to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');          WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask =  'MM/DD/YYYY hh24:mi:ss'          THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0                 THEN null                  ELSE                  to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''01/01/9999 0024:00:00''),''MM/DD/YYYY hh24:mi:ss'')::',rec.data_type,' END as ',rec.col_list,',');           ELSE         v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0                 THEN null                  ELSE                   coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;        END case;        i :=i+1;        end loop;          -- raise notice 'v_sql 3, %',v_sql3;        SELECT trim(trailing ' ' FROM v_sql1) INTO v_sql1;        SELECT trim(trailing ',' FROM v_sql1) INTO v_sql1;        SELECT trim(trailing ' ' FROM v_sql2) INTO v_sql2;        SELECT trim(trailing ',' FROM v_sql2) INTO v_sql2;        SELECT trim(trailing ' ' FROM v_sql3) INTO v_sql3;        SELECT trim(trailing ',' FROM v_sql3) INTO v_sql3;        END IF;       raise notice 'v_delim , %',v_delim;      EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg)  INTO v_cnt;     raise notice 'stg cnt , %',v_cnt;     /* if upper(v_delim) = 'CSV' then        v_sql_ins := concat_ws('', ' SELECT * from ' ,v_table_name_stg );      else       -- v_sql_ins := concat_ws('',' SELECT ',v_sql1,'  from (select col1 from ' ,v_table_name_stg , ')sub ');        v_sql_ins := concat_ws('',' SELECT ',v_sql3,'  from (select col1 from ' ,v_table_name_stg , ')sub ');        END IF;*/ v_chunk := v_cnt/100; for i in 1..101 loop      BEGIN     -- raise notice 'v_sql , %',v_sql;        -- raise notice 'Chunk number , %',i;        v_sql_ins := concat_ws('',' SELECT ',v_sql3,'  from (select col1 from ' ,v_table_name_stg , ' offset ',v_chunk*(i-1), ' limit ',v_chunk,') sub ');      v_sql := concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins);      -- raise notice 'select statement , %',v_sql_ins;           -- v_sql := null;      -- EXECUTE concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins, 'offset ',v_chunk*(i-1), ' limit ',v_chunk );      --v_sql := concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins );      -- raise notice 'insert statement , %',v_sql;     raise NOTICE 'CHUNK START %',v_chunk*(i-1);    raise NOTICE 'CHUNK END %',v_chunk;      EXECUTE v_sql;   EXCEPTION        WHEN OTHERS THEN        -- v_sql_ins := concat_ws('',' SELECT ',v_sql1, '  from (select col1 from ' ,v_table_name_stg , ' )sub ');          -- raise notice 'Chunk number for cursor , %',i;     raise NOTICE 'Cursor - CHUNK START %',v_chunk*(i-1);    raise NOTICE 'Cursor -  CHUNK END %',v_chunk;          v_sql_ins := concat_ws('',' SELECT ',v_sql3, '  from (select col1 from ' ,v_table_name_stg , ' )sub ');          v_final_sql := REPLACE (v_sql_ins, ''''::text, ''''''::text);         -- raise notice 'v_final_sql %',v_final_sql;          v_sql :=concat_ws('','do $a$ declare  r refcursor;v_sql text; i numeric;v_conname text;  v_typ  ',pi_ext_table,'[]; v_rec  ','record',';            begin            open r for execute ''select col1 from ',v_table_name_stg ,'  offset ',v_chunk*(i-1), ' limit ',v_chunk,''';            loop            begin            fetch r into v_rec;            EXIT WHEN NOT FOUND;            v_sql := concat_ws('''',''insert into  ',pi_ext_table,' SELECT ',REPLACE (v_sql3, ''''::text, ''''''::text) , '  from ( select '''''',v_rec.col1,'''''' as col1) v'');             execute v_sql;            exception             when others then           v_sql := ''INSERT INTO  ERROR_TABLE VALUES (concat_ws('''''''',''''Error Name: '''',$$''||SQLERRM||''$$,''''Error State: '''',''''''||SQLSTATE||'''''',''''record : '''',$$''||v_rec.col1||''$$),'''''||pi_filename||''''',now())'';                execute v_sql;              continue;            end ;            end loop;            close r;            exception            when others then          raise;            end ; $a$');       -- raise notice ' inside excp v_sql %',v_sql;           execute v_sql;       --  raise notice 'v_sql %',v_sql;        END;   END LOOP;      ELSE      SELECT distinct DELIMETER,FILE_NAME,DIRECTORY ,concat_ws('',' ',table_name_stg),        case  header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist        INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header      FROM  meta_EXTERNAL_TABLE      WHERE table_name = pi_ext_table                  ;      v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',        v_table_name_stg, ''','''', ''DELIMITER AS ''''#'''' ',v_header,' '',','       aws_commons.create_s3_uri        ( ''',v_directory, ''',''',        v_file_name, ''',',         '''',v_region,''')       )');          EXECUTE  v_sql;      FOR rec in cr1      LOOP       IF rec.start_pos IS NULL AND rec.END_pos IS NULL AND rec.no_position = 'recnum'       THEN         v_rec_val := 1;       ELSE        case          WHEN upper(rec.data_type) = 'NUMERIC'          THEN v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0                 THEN null                  ELSE                  coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1)))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;          WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'          THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0                 THEN null                  ELSE                  to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');          WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDDHH24MISS'          THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0                 THEN null                  ELSE                  to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''9999010100240000''),''YYYYMMDDHH24MISS'')::',rec.data_type,' END as ',rec.col_list,',');           ELSE         v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0                 THEN null                  ELSE                   coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;        END case;       END IF;       v_col_list := concat_ws('',v_col_list ,v_sql1);      END LOOP;            SELECT trim(trailing ' ' FROM v_col_list) INTO v_col_list;            SELECT trim(trailing ',' FROM v_col_list) INTO v_col_list;            v_sql_col   :=  concat_ws('',trim(trailing ',' FROM v_col_list) , ' FROM  ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 ');            v_sql_dynamic := v_sql_col;            EXECUTE  concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO v_cnt;          IF v_rec_val = 1 THEN              v_sql_ins := concat_ws('',' select row_number() over(order by ctid) as line_number ,' ,v_sql_dynamic) ;          ELSE                v_sql_ins := concat_ws('',' SELECT' ,v_sql_dynamic) ;            END IF;      BEGIN        EXECUTE concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins);            EXCEPTION               WHEN OTHERS THEN           IF v_rec_val = 1 THEN                   v_final_sql := ' select row_number() over(order by ctid) as line_number ,col1 from ';                 ELSE                  v_final_sql := ' SELECT col1 from';                END IF;        v_sql :=concat_ws('','do $a$ declare  r refcursor;v_rec_val numeric := ',coalesce(v_rec_val,0),';line_number numeric; col1 text; v_typ  ',pi_ext_table,'[]; v_rec  ',pi_ext_table,';              begin              open r for execute ''',v_final_sql, ' ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 '' ;              loop              begin              if   v_rec_val = 1 then              fetch r into line_number,col1;              else              fetch r into col1;              end if;              EXIT WHEN NOT FOUND;               if v_rec_val = 1 then               select line_number,',trim(trailing ',' FROM v_col_list) ,' into v_rec;               else                 select ',trim(trailing ',' FROM v_col_list) ,' into v_rec;               end if;              insert into  ',pi_ext_table,' select v_rec.*;               exception               when others then                INSERT INTO  ERROR_TABLE VALUES (concat_ws('''',''Error Name: '',SQLERRM,''Error State: '',SQLSTATE,''record : '',v_rec),''',pi_filename,''',now());                continue;               end ;                end loop;              close r;               exception               when others then               raise;               end ; $a$');          execute v_sql;      END;          END IF;    EXECUTE concat_ws('','SELECT COUNT(*) FROM  ' ,pi_ext_table)   INTO proc_rec_COUNT;    EXECUTE concat_ws('','SELECT COUNT(*) FROM  error_table WHERE file_name =''',pi_filename,''' and processed_time::date = clock_timestamp()::date')  INTO error_rec_COUNT;    EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg)   INTO tot_rec_COUNT;    INSERT INTO  log_table values(pi_filename,now(),tot_rec_COUNT,proc_rec_COUNT, error_rec_COUNT);    raise notice 'v_directory, %',v_directory;    raise notice 'pi_filename, %',pi_filename;    raise notice 'v_region, %',v_region;   perform aws_s3.query_export_to_s3('SELECT replace(trim(substring(error_details,position(''('' in error_details)+1),'')''),'','','';''),file_name,processed_time FROM  error_table WHERE file_name = '''||pi_filename||'''',    aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),    options :='FORmat csv, header, delimiter $$,$$'    ); raise notice 'v_directory, %',v_directory;    raise notice 'pi_filename, %',pi_filename;    raise notice 'v_region, %',v_region;   perform aws_s3.query_export_to_s3('SELECT * FROM  log_table WHERE file_name = '''||pi_filename||'''',    aws_commons.create_s3_uri(v_directory, pi_filename||'.log', v_region),    options :='FORmat csv, header, delimiter $$,$$'    );    END IF;  j := j+1;  END LOOP;        RETURN 'OK'; EXCEPTION     WHEN  OTHERS THEN   raise notice 'error %',sqlerrm;    ERCODE=SQLSTATE;    IF ERCODE = 'NTFIP' THEN      v_sqlerrm := concat_Ws('',sqlerrm,'No data for the filename');    ELSIF ERCODE = 'S3IMP' THEN     v_sqlerrm := concat_Ws('',sqlerrm,'Error While exporting the file from S3');    ELSE       v_sqlerrm := sqlerrm;    END IF;  select distinct directory into v_directory from  meta_EXTERNAL_TABLE;  raise notice 'exc v_directory, %',v_directory;    raise notice 'exc pi_filename, %',pi_filename;    raise notice 'exc v_region, %',v_region;   perform aws_s3.query_export_to_s3('SELECT * FROM  error_table WHERE file_name = '''||pi_filename||'''',    aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),    options :='FORmat csv, header, delimiter $$,$$'    );     RETURN null; END; $function$
プライバシーサイト規約Cookie の設定
© 2025, Amazon Web Services, Inc. or its affiliates.All rights reserved.