翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Oracle 外部テーブルを Amazon Aurora PostgreSQL 互換に移行
作成者: anuradha chintha (AWS) と Rakesh Raghav (AWS)
概要
外部テーブルにより、Oracle がデータベースの外部にあるフラットファイルに保存されているデータをクエリできます。ORACLE_LOADER ドライバーを使用して、SQL*Loader ユーティリティでロードできるあらゆる形式で保存されているデータにアクセスできます。外部テーブルではデータ操作言語 (DML) を使用できませんが、クエリ、結合、ソート操作には外部テーブルを使用できます。
Amazon Aurora PostgreSQL 互換エディションには、Oracle の外部テーブルと類似する機能はありません。代わりに、モダナイゼーションを使用して、機能要件を満たし、かつ質素なスケーラブルなソリューションを開発する必要があります。
このパターンでは、aws_s3
拡張を使用して、さまざまなタイプの Oracle 外部テーブルをAmazon Web Services (AWS) クラウド上の Aurora PostgreSQL 互換エディションに移行する手順を示しています。
実稼働環境に実装する前に、このソリューションを徹底的にテストすることを推奨します。
前提条件と制限
前提条件
アクティブなAWS アカウント
AWS コマンドラインインターフェイス (AWS CLI)
使用可能な Aurora PostgreSQL 互換データベースインスタンス。
外部テーブルのあるオンプレミスの Oracle データベース
PG. クライアント API
データファイル
機能制限
このパターンには Oracle 外部テーブルの置き換えの機能がありません。ただし、手順とサンプルコードをさらに拡張して、データベースのモダナイゼーション目標を達成することができます。
ファイルには、
aws_s3
エクスポート関数とインポート関数で区切り文字として渡される文字を含んではなりません。
製品バージョン
Amazon S3 から PostgreSQL の RDS にインポートするには、データベースで PostgreSQL バージョン 10.7 以降を実行する必要があります。
アーキテクチャ
ソーステクノロジースタック
Oracle
ソースアーキテクチャ

ターゲットテクノロジースタック
Amazon Aurora PostgreSQL- 互換
Amazon CloudWatch
AWS Lambda
AWS Secrets Manager
Amazon Simple Notification Service (Amazon SNS)
Amazon Simple Storage Service (Amazon S3)
ターゲットアーキテクチャ
以下の図表は、このソリューションの概要を示しています。

ファイルが S3 バケットにアップロードされます。
Lambda 関数が初期化されます。
Lambda 関数は DB 関数呼び出しを開始します。
Secrets Manager は、データベースにアクセスするための認証情報を提供します。
DB 関数によって、SNS アラームが作成されます。
自動化とスケール
外部テーブルへの追加や変更は、メタデータのメンテナンスで処理できます。
ツール
「Amazon Aurora PostgreSQL-互換」 — Amazon Aurora PostgreSQL 互換エディションは、フルマネージド型で PostgreSQL 互換で、ACID 準拠のリレーショナルデータベースエンジンです。ハイエンドの商用データベースのスピードと信頼性を、オープンソースデータベースの高いコスト効率を経験できます。
「AWS CLI」 — AWS コマンドラインインターフェイス (AWS CLI)は、AWS のサービスを管理するための統合ツールです。ダウンロードおよび設定用の1つのツールのみを使用して、コマンドラインから複数の AWS サービスを制御し、スクリプトを使用してこれらを自動化することができます。
「Amazon CloudWatch」 — Amazon CloudWatch は Amazon S3 のリソースと利用状況を監視します。
「AWS Lambda」 — AWS Lambdaは、サーバーのプロビジョニングや管理、ワークロードに対応したクラスタースケーリングロジックの作成、イベント統合の維持、あるいはランタイムの管理などを行うことなくコードを実行でき、サーバーレスコンピューティングサービスです。このパターンでは、ファイルが Amazon S3 にアップロードされるたびに、Lambda がデータベース関数を実行します。
「AWS Secrets Manager」 — AWS Secrets Manager は、認証情報を保存および取得するためのサービスです。Secrets Manager を使用して、コードにハードコードされた認証情報 (パスワードを含む) を Secrets Manager への API コールに置き換えて、シークレットをプログラムで取得できます。
「Amazon S3」 — Amazon Simple Storage Service (Amazon S3) は、Aurora PostgreSQL 互換クラスターとの間で消費および送信するファイルを受信および保存するためのストレージレイヤーを提供します。
「aws_s3」 —
aws_s3
の拡張は Amazon S3 と Aurora PostgreSQL 互換を統合します。「Amazon SNS」 — Amazon Simple Notification Service (Amazon SNS)は、パブリッシャーやクライアントの間のメッセージ配信や送信を調整および管理します。このパターンでは、Amazon SNS を使用して通知を送信します。
Code
ファイルを S3 バケットに配置するたびに、DB 関数を作成して処理アプリケーションまたは Lambda 関数から呼び出す必要があります。詳細については、コード (添付) を参照してください。
エピック
タスク | 説明 | 必要なスキル |
---|---|---|
外部ファイルをソースデータベースに追加します。 | 外部ファイルを作成し、 | DBA |
タスク | 説明 | 必要なスキル |
---|---|---|
Aurora PostgreSQL データベースを作成します。 | Amazon Aurora PostgreSQL 互換クラスターに DB インスタンスを作成します。 | DBA |
スキーマ、aws_s3 エクステンション、テーブルを作成します。 | 「追加情報」セクションの | DBA、開発者 |
DB関数を作成します。 | DB 関数を作成するには、追加情報セクションの | DBA、開発者 |
タスク | 説明 | 必要なスキル |
---|---|---|
ロールを作成します。 | Amazon S3 と Amazon Relational Database Service (Amazon RDS) にアクセスする権限を持つロールを作成します。このロールは、パターンを実行するための Lambda に割り当てられます。 | DBA |
Lambda 関数を作成します。 | Amazon S3 (例: 関数呼び出しの結果に応じて、SNS 通知が開始されます (例、 ビジネスニーズに基づいて、必要に応じて追加のコードを使用して Lambda 関数を作成できます。詳細については、Lambda の「ドキュメント」 を参照してください。 | DBA |
S3 バケットイベントトリガーを設定します。 | S3 バケットのすべてのオブジェクト作成イベントで Lambda 関数を呼び出すメカニズムを設定します。 | DBA |
シークレットを作成します。 | Secrets Manager を使用して、データベース認証情報のシークレット名を作成します。Lambda 関数にシークレットを渡します。 | DBA |
Lambda サポートファイルをアップロードします。 | Lambda サポートパッケージを Aurora PostgreSQL 互換に接続するための添付の Python スクリプトを含む.zip ファイルをアップロードします。Python コードは、データベースで作成した関数を呼び出します。 | DBA |
SNS トピックを作成します。 | SNS トピックを作成して、データロードの成功または失敗のメールを送信します。 | DBA |
タスク | 説明 | 必要なスキル |
---|---|---|
S3 バケットを作成する。 | Amazon S3 コンソールで、先頭にスラッシュを含まない一意の名前で S3 バケットを作成します。S3 バケット名はグローバルに一意であり、名前空間はすべての AWS アカウントによって共有されます。 | DBA |
IAM ポリシーを作成します。 | AWS 識別とアクセス管理(IAM) ポリシーを作成するには、追加情報セクションの | DBA |
ロールを作成します。 | Aurora PostgreSQL 互換用に 2 つのロールを作成します。1 つはインポート用で、もう 1 つはエクスポート用です。対応するポリシーをロールに割り当てます。 | DBA |
ロールを Aurora PostgreSQL 互換クラスターにアタッチします。 | ロールの管理で、Aurora PostgreSQL クラスターにインポートロールとエクスポートロールをアタッチします。 | DBA |
Aurora PostgreSQL 互換のサポートオブジェクトを作成します。 | テーブルスクリプトについては、追加情報セクション カスタム関数については、追加情報セクションの | DBA |
タスク | 説明 | 必要なスキル |
---|---|---|
S3 バケットにファイルをアップロードします。 | テストファイルを S3 バケットにアップロードするには、コンソールまたは AWS CLIにある以下のコマンドを使用します。
ファイルがアップロードされるとすぐに、バケットイベントによって Lambda 関数が開始されますが、それによりAurora PostgreSQL 互換関数が実行されます。 | DBA |
データ、ログ、エラーファイルを確認してください。 | Aurora PostgreSQL 互換関数はファイルをメインテーブルに読み込み、S3 バケットに | DBA |
ソリューションを監視します。 | Amazon CloudWatch コンソールで、Lambda 関数をモニタリングします。 | DBA |
関連リソース
追加情報
ext_table_scripts
CREATE EXTENSION aws_s3 CASCADE;
CREATE TABLE IF NOT EXISTS meta_EXTERNAL_TABLE
(
table_name_stg character varying(100) ,
table_name character varying(100) ,
col_list character varying(1000) ,
data_type character varying(100) ,
col_order numeric,
start_pos numeric,
end_pos numeric,
no_position character varying(100) ,
date_mask character varying(100) ,
delimeter character(1) ,
directory character varying(100) ,
file_name character varying(100) ,
header_exist character varying(5)
);
CREATE TABLE IF NOT EXISTS ext_tbl_stg
(
col1 text
);
CREATE TABLE IF NOT EXISTS error_table
(
error_details text,
file_name character varying(100),
processed_time timestamp without time zone
);
CREATE TABLE IF NOT EXISTS log_table
(
file_name character varying(50) COLLATE pg_catalog."default",
processed_date timestamp without time zone,
tot_rec_count numeric,
proc_rec_count numeric,
error_rec_count numeric
);
sample insert scripts of meta data:
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'source_filename', 'character varying', 2, 8, 27, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'record_type_identifier', 'character varying', 3, 28, 30, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'fad_code', 'numeric', 4, 31, 36, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'session_sequence_number', 'numeric', 5, 37, 42, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'transaction_sequence_number', 'numeric', 6, 43, 48, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
s3bucketpolicy_for import
---Import role policy
--Create an IAM policy to allow, Get, and list actions on S3 bucket
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "s3import",
"Action": [
"s3:GetObject",
"s3:ListBucket"
],
"Effect": "Allow",
"Resource": [
"arn:aws:s3:::s3importtest",
"arn:aws:s3:::s3importtest/*"
]
}
]
}
--Export Role policy
--Create an IAM policy to allow, put, and list actions on S3 bucket
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "s3export",
"Action": [
"S3:PutObject",
"s3:ListBucket"
],
"Effect": "Allow",
"Resource": [
"arn:aws:s3:::s3importtest/*"
]
}
]
}
Sample DB function load_external_tables_latest
CREATE OR REPLACE FUNCTION public.load_external_tables(pi_filename text)
RETURNS character varying
LANGUAGE plpgsql
AS $function$
/* Loading data from S3 bucket into a APG table */
DECLARE
v_final_sql TEXT;
pi_ext_table TEXT;
r refCURSOR;
v_sqlerrm text;
v_chunk numeric;
i integer;
v_col_list TEXT;
v_postion_list CHARACTER VARYING(1000);
v_len integer;
v_delim varchar;
v_file_name CHARACTER VARYING(1000);
v_directory CHARACTER VARYING(1000);
v_table_name_stg CHARACTER VARYING(1000);
v_sql_col TEXT;
v_sql TEXT;
v_sql1 TEXT;
v_sql2 TEXT;
v_sql3 TEXT;
v_cnt integer;
v_sql_dynamic TEXT;
v_sql_ins TEXT;
proc_rec_COUNT integer;
error_rec_COUNT integer;
tot_rec_COUNT integer;
v_rec_val integer;
rec record;
v_col_cnt integer;
kv record;
v_val text;
v_header text;
j integer;
ERCODE VARCHAR(5);
v_region text;
cr CURSOR FOR
SELECT distinct DELIMETER,
FILE_NAME,
DIRECTORY
FROM meta_EXTERNAL_TABLE
WHERE table_name = pi_ext_table
AND DELIMETER IS NOT NULL;
cr1 CURSOR FOR
SELECT col_list,
data_type,
start_pos,
END_pos,
concat_ws('',' ',TABLE_NAME_STG) as TABLE_NAME_STG,
no_position,date_mask
FROM meta_EXTERNAL_TABLE
WHERE table_name = pi_ext_table
order by col_order asc;
cr2 cursor FOR
SELECT distinct table_name,table_name_stg
FROM meta_EXTERNAL_TABLE
WHERE upper(file_name) = upper(pi_filename);
BEGIN
-- PERFORM utl_file_utility.init();
v_region := 'us-east-1';
/* find tab details from file name */
--DELETE FROM ERROR_TABLE WHERE file_name= pi_filename;
-- DELETE FROM log_table WHERE file_name= pi_filename;
BEGIN
SELECT distinct table_name,table_name_stg INTO strict pi_ext_table,v_table_name_stg
FROM meta_EXTERNAL_TABLE
WHERE upper(file_name) = upper(pi_filename);
EXCEPTION
WHEN NO_DATA_FOUND THEN
raise notice 'error 1,%',sqlerrm;
pi_ext_table := null;
v_table_name_stg := null;
RAISE USING errcode = 'NTFIP' ;
when others then
raise notice 'error others,%',sqlerrm;
END;
j :=1 ;
for rec in cr2
LOOP
pi_ext_table := rec.table_name;
v_table_name_stg := rec.table_name_stg;
v_col_list := null;
IF pi_ext_table IS NOT NULL
THEN
--EXECUTE concat_ws('','truncate table ' ,pi_ext_table) ;
EXECUTE concat_ws('','truncate table ' ,v_table_name_stg) ;
SELECT distinct DELIMETER INTO STRICT v_delim
FROM meta_EXTERNAL_TABLE
WHERE table_name = pi_ext_table;
IF v_delim IS NOT NULL THEN
SELECT distinct DELIMETER,
FILE_NAME,
DIRECTORY ,
concat_ws('',' ',table_name_stg),
case header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist
INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header
FROM meta_EXTERNAL_TABLE
WHERE table_name = pi_ext_table
AND DELIMETER IS NOT NULL;
IF upper(v_delim) = 'CSV'
THEN
v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3 ( ''',
v_table_name_stg,''','''',
''DELIMITER '''','''' CSV HEADER QUOTE ''''"'''''', aws_commons.create_s3_uri ( ''',
v_directory,''',''',v_file_name,''', ''',v_region,'''))');
ELSE
v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',
v_table_name_stg, ''','''', ''DELIMITER AS ''''^''''',''',','
aws_commons.create_s3_uri
( ''',v_directory, ''',''',
v_file_name, ''',',
'''',v_region,''')
)');
raise notice 'v_sql , %',v_sql;
begin
EXECUTE v_sql;
EXCEPTION
WHEN OTHERS THEN
raise notice 'error 1';
RAISE USING errcode = 'S3IMP' ;
END;
select count(col_list) INTO v_col_cnt
from meta_EXTERNAL_TABLE where table_name = pi_ext_table;
-- raise notice 'v_sql 2, %',concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');
execute concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');
i :=1;
FOR rec in cr1
loop
v_sql1 := concat_ws('',v_sql1,'split_part(col1,''',v_delim,''',', i,')',' as ',rec.col_list,',');
v_sql2 := concat_ws('',v_sql2,rec.col_list,',');
-- v_sql3 := concat_ws('',v_sql3,'rec.',rec.col_list,'::',rec.data_type,',');
case
WHEN upper(rec.data_type) = 'NUMERIC'
THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
THEN null
ELSE
coalesce((trim(split_part(col1,''',v_delim,''',', i,')))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;
WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'
THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
THEN null
ELSE
to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');
WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'MM/DD/YYYY hh24:mi:ss'
THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
THEN null
ELSE
to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''01/01/9999 0024:00:00''),''MM/DD/YYYY hh24:mi:ss'')::',rec.data_type,' END as ',rec.col_list,',');
ELSE
v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
THEN null
ELSE
coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;
END case;
i :=i+1;
end loop;
-- raise notice 'v_sql 3, %',v_sql3;
SELECT trim(trailing ' ' FROM v_sql1) INTO v_sql1;
SELECT trim(trailing ',' FROM v_sql1) INTO v_sql1;
SELECT trim(trailing ' ' FROM v_sql2) INTO v_sql2;
SELECT trim(trailing ',' FROM v_sql2) INTO v_sql2;
SELECT trim(trailing ' ' FROM v_sql3) INTO v_sql3;
SELECT trim(trailing ',' FROM v_sql3) INTO v_sql3;
END IF;
raise notice 'v_delim , %',v_delim;
EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO v_cnt;
raise notice 'stg cnt , %',v_cnt;
/* if upper(v_delim) = 'CSV' then
v_sql_ins := concat_ws('', ' SELECT * from ' ,v_table_name_stg );
else
-- v_sql_ins := concat_ws('',' SELECT ',v_sql1,' from (select col1 from ' ,v_table_name_stg , ')sub ');
v_sql_ins := concat_ws('',' SELECT ',v_sql3,' from (select col1 from ' ,v_table_name_stg , ')sub ');
END IF;*/
v_chunk := v_cnt/100;
for i in 1..101
loop
BEGIN
-- raise notice 'v_sql , %',v_sql;
-- raise notice 'Chunk number , %',i;
v_sql_ins := concat_ws('',' SELECT ',v_sql3,' from (select col1 from ' ,v_table_name_stg , ' offset ',v_chunk*(i-1), ' limit ',v_chunk,') sub ');
v_sql := concat_ws('','insert into ', pi_ext_table ,' ', v_sql_ins);
-- raise notice 'select statement , %',v_sql_ins;
-- v_sql := null;
-- EXECUTE concat_ws('','insert into ', pi_ext_table ,' ', v_sql_ins, 'offset ',v_chunk*(i-1), ' limit ',v_chunk );
--v_sql := concat_ws('','insert into ', pi_ext_table ,' ', v_sql_ins );
-- raise notice 'insert statement , %',v_sql;
raise NOTICE 'CHUNK START %',v_chunk*(i-1);
raise NOTICE 'CHUNK END %',v_chunk;
EXECUTE v_sql;
EXCEPTION
WHEN OTHERS THEN
-- v_sql_ins := concat_ws('',' SELECT ',v_sql1, ' from (select col1 from ' ,v_table_name_stg , ' )sub ');
-- raise notice 'Chunk number for cursor , %',i;
raise NOTICE 'Cursor - CHUNK START %',v_chunk*(i-1);
raise NOTICE 'Cursor - CHUNK END %',v_chunk;
v_sql_ins := concat_ws('',' SELECT ',v_sql3, ' from (select col1 from ' ,v_table_name_stg , ' )sub ');
v_final_sql := REPLACE (v_sql_ins, ''''::text, ''''''::text);
-- raise notice 'v_final_sql %',v_final_sql;
v_sql :=concat_ws('','do $a$ declare r refcursor;v_sql text; i numeric;v_conname text; v_typ ',pi_ext_table,'[]; v_rec ','record',';
begin
open r for execute ''select col1 from ',v_table_name_stg ,' offset ',v_chunk*(i-1), ' limit ',v_chunk,''';
loop
begin
fetch r into v_rec;
EXIT WHEN NOT FOUND;
v_sql := concat_ws('''',''insert into ',pi_ext_table,' SELECT ',REPLACE (v_sql3, ''''::text, ''''''::text) , ' from ( select '''''',v_rec.col1,'''''' as col1) v'');
execute v_sql;
exception
when others then
v_sql := ''INSERT INTO ERROR_TABLE VALUES (concat_ws('''''''',''''Error Name: '''',$$''||SQLERRM||''$$,''''Error State: '''',''''''||SQLSTATE||'''''',''''record : '''',$$''||v_rec.col1||''$$),'''''||pi_filename||''''',now())'';
execute v_sql;
continue;
end ;
end loop;
close r;
exception
when others then
raise;
end ; $a$');
-- raise notice ' inside excp v_sql %',v_sql;
execute v_sql;
-- raise notice 'v_sql %',v_sql;
END;
END LOOP;
ELSE
SELECT distinct DELIMETER,FILE_NAME,DIRECTORY ,concat_ws('',' ',table_name_stg),
case header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist
INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header
FROM meta_EXTERNAL_TABLE
WHERE table_name = pi_ext_table ;
v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',
v_table_name_stg, ''','''', ''DELIMITER AS ''''#'''' ',v_header,' '',','
aws_commons.create_s3_uri
( ''',v_directory, ''',''',
v_file_name, ''',',
'''',v_region,''')
)');
EXECUTE v_sql;
FOR rec in cr1
LOOP
IF rec.start_pos IS NULL AND rec.END_pos IS NULL AND rec.no_position = 'recnum'
THEN
v_rec_val := 1;
ELSE
case
WHEN upper(rec.data_type) = 'NUMERIC'
THEN v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
THEN null
ELSE
coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1)))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;
WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'
THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
THEN null
ELSE
to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');
WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDDHH24MISS'
THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
THEN null
ELSE
to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''9999010100240000''),''YYYYMMDDHH24MISS'')::',rec.data_type,' END as ',rec.col_list,',');
ELSE
v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
THEN null
ELSE
coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;
END case;
END IF;
v_col_list := concat_ws('',v_col_list ,v_sql1);
END LOOP;
SELECT trim(trailing ' ' FROM v_col_list) INTO v_col_list;
SELECT trim(trailing ',' FROM v_col_list) INTO v_col_list;
v_sql_col := concat_ws('',trim(trailing ',' FROM v_col_list) , ' FROM ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 ');
v_sql_dynamic := v_sql_col;
EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO v_cnt;
IF v_rec_val = 1 THEN
v_sql_ins := concat_ws('',' select row_number() over(order by ctid) as line_number ,' ,v_sql_dynamic) ;
ELSE
v_sql_ins := concat_ws('',' SELECT' ,v_sql_dynamic) ;
END IF;
BEGIN
EXECUTE concat_ws('','insert into ', pi_ext_table ,' ', v_sql_ins);
EXCEPTION
WHEN OTHERS THEN
IF v_rec_val = 1 THEN
v_final_sql := ' select row_number() over(order by ctid) as line_number ,col1 from ';
ELSE
v_final_sql := ' SELECT col1 from';
END IF;
v_sql :=concat_ws('','do $a$ declare r refcursor;v_rec_val numeric := ',coalesce(v_rec_val,0),';line_number numeric; col1 text; v_typ ',pi_ext_table,'[]; v_rec ',pi_ext_table,';
begin
open r for execute ''',v_final_sql, ' ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 '' ;
loop
begin
if v_rec_val = 1 then
fetch r into line_number,col1;
else
fetch r into col1;
end if;
EXIT WHEN NOT FOUND;
if v_rec_val = 1 then
select line_number,',trim(trailing ',' FROM v_col_list) ,' into v_rec;
else
select ',trim(trailing ',' FROM v_col_list) ,' into v_rec;
end if;
insert into ',pi_ext_table,' select v_rec.*;
exception
when others then
INSERT INTO ERROR_TABLE VALUES (concat_ws('''',''Error Name: '',SQLERRM,''Error State: '',SQLSTATE,''record : '',v_rec),''',pi_filename,''',now());
continue;
end ;
end loop;
close r;
exception
when others then
raise;
end ; $a$');
execute v_sql;
END;
END IF;
EXECUTE concat_ws('','SELECT COUNT(*) FROM ' ,pi_ext_table) INTO proc_rec_COUNT;
EXECUTE concat_ws('','SELECT COUNT(*) FROM error_table WHERE file_name =''',pi_filename,''' and processed_time::date = clock_timestamp()::date') INTO error_rec_COUNT;
EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO tot_rec_COUNT;
INSERT INTO log_table values(pi_filename,now(),tot_rec_COUNT,proc_rec_COUNT, error_rec_COUNT);
raise notice 'v_directory, %',v_directory;
raise notice 'pi_filename, %',pi_filename;
raise notice 'v_region, %',v_region;
perform aws_s3.query_export_to_s3('SELECT replace(trim(substring(error_details,position(''('' in error_details)+1),'')''),'','','';''),file_name,processed_time FROM error_table WHERE file_name = '''||pi_filename||'''',
aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),
options :='FORmat csv, header, delimiter $$,$$'
);
raise notice 'v_directory, %',v_directory;
raise notice 'pi_filename, %',pi_filename;
raise notice 'v_region, %',v_region;
perform aws_s3.query_export_to_s3('SELECT * FROM log_table WHERE file_name = '''||pi_filename||'''',
aws_commons.create_s3_uri(v_directory, pi_filename||'.log', v_region),
options :='FORmat csv, header, delimiter $$,$$'
);
END IF;
j := j+1;
END LOOP;
RETURN 'OK';
EXCEPTION
WHEN OTHERS THEN
raise notice 'error %',sqlerrm;
ERCODE=SQLSTATE;
IF ERCODE = 'NTFIP' THEN
v_sqlerrm := concat_Ws('',sqlerrm,'No data for the filename');
ELSIF ERCODE = 'S3IMP' THEN
v_sqlerrm := concat_Ws('',sqlerrm,'Error While exporting the file from S3');
ELSE
v_sqlerrm := sqlerrm;
END IF;
select distinct directory into v_directory from meta_EXTERNAL_TABLE;
raise notice 'exc v_directory, %',v_directory;
raise notice 'exc pi_filename, %',pi_filename;
raise notice 'exc v_region, %',v_region;
perform aws_s3.query_export_to_s3('SELECT * FROM error_table WHERE file_name = '''||pi_filename||'''',
aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),
options :='FORmat csv, header, delimiter $$,$$'
);
RETURN null;
END;
$function$