Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Migrer des tables externes Oracle vers des tables compatibles avec Amazon Aurora PostgreSQL
Créée par anuradha chintha (AWS) et Rakesh Raghav (AWS)
Récapitulatif
Les tables externes permettent à Oracle d'interroger des données stockées en dehors de la base de données dans des fichiers plats. Vous pouvez utiliser le pilote ORACLE_LOADER pour accéder à toutes les données stockées dans n'importe quel format pouvant être chargées par l'utilitaire SQL*Loader. Vous ne pouvez pas utiliser le langage de manipulation de données (DML) sur des tables externes, mais vous pouvez utiliser des tables externes pour les opérations de requête, de jointure et de tri.
L'édition compatible avec Amazon Aurora PostgreSQL ne fournit pas de fonctionnalités similaires à celles des tables externes d'Oracle. Vous devez plutôt recourir à la modernisation pour développer une solution évolutive qui répond aux exigences fonctionnelles et qui soit économe.
Ce modèle décrit les étapes à suivre pour migrer différents types de tables externes Oracle vers l'édition compatible Aurora PostgreSQL sur le cloud Amazon Web Services (AWS) à l'aide de l'extension. aws_s3
Nous vous recommandons de tester minutieusement cette solution avant de l'implémenter dans un environnement de production.
Conditions préalables et limitations
Prérequis
Un compte AWS actif
Interface de ligne de commande AWS (AWS CLI)
Une instance de base de données compatible Aurora PostgreSQL disponible.
Une base de données Oracle sur site avec une table externe
API PG.Client
Fichiers de données
Limites
Ce modèle ne fournit pas les fonctionnalités nécessaires pour remplacer les tables externes Oracle. Cependant, les étapes et les exemples de code peuvent être encore améliorés pour atteindre les objectifs de modernisation de votre base de données.
Les fichiers ne doivent pas contenir le caractère utilisé comme délimiteur dans les fonctions
aws_s3
d'exportation et d'importation.
Versions du produit
Pour effectuer une importation depuis Amazon S3 vers RDS pour PostgreSQL, la base de données doit exécuter PostgreSQL version 10.7 ou ultérieure.
Architecture
Pile technologique source
Oracle
Architecture source

Pile technologique cible
Compatible avec Amazon Aurora PostgreSQL
Amazon CloudWatch
AWS Lambda
AWS Secrets Manager
Amazon Simple Notification Service (Amazon SNS)
Amazon Simple Storage Service (Amazon S3)
Architecture cible
Le schéma suivant montre une représentation de haut niveau de la solution.

Les fichiers sont chargés dans le compartiment S3.
La fonction Lambda est lancée.
La fonction Lambda lance l'appel de fonction de base de données.
Secrets Manager fournit les informations d'identification pour accéder à la base de données.
En fonction de la fonction de base de données, une alarme SNS est créée.
Automatisation et mise à l'échelle
Tout ajout ou modification aux tables externes peut être géré grâce à la maintenance des métadonnées.
Outils
Compatible avec Amazon Aurora PostgreSQL — Amazon Aurora PostgreSQL Compatible Edition est un moteur de base de données relationnelle entièrement géré, compatible avec PostgreSQL et conforme à l'ACID qui associe la vitesse et la fiabilité des bases de données commerciales haut de gamme à la rentabilité des bases de données open source.
AWS CLI — AWS Command Line Interface (AWS CLI) est un outil unifié permettant de gérer vos services AWS. Avec un seul outil à télécharger et à configurer, vous pouvez contrôler plusieurs services AWS depuis la ligne de commande et les automatiser par le biais de scripts.
Amazon CloudWatch — Amazon CloudWatch surveille les ressources et l'utilisation d'Amazon S3.
AWS Lambda — AWS Lambda est un service de calcul sans serveur qui permet d'exécuter du code sans provisionner ni gérer de serveurs, de créer une logique de dimensionnement des clusters adaptée à la charge de travail, de gérer les intégrations d'événements ou de gérer les temps d'exécution. Dans ce modèle, Lambda exécute la fonction de base de données chaque fois qu'un fichier est chargé sur Amazon S3.
AWS Secrets Manager — AWS Secrets Manager est un service de stockage et de récupération des informations d'identification. À l'aide de Secrets Manager, vous pouvez remplacer les informations d'identification codées en dur dans votre code, y compris les mots de passe, par un appel d'API à Secrets Manager pour récupérer le secret par programmation.
Amazon S3 — Amazon Simple Storage Service (Amazon S3) fournit une couche de stockage permettant de recevoir et de stocker des fichiers destinés à être consommés et transmis vers et depuis le cluster compatible Aurora PostgreSQL.
aws_s3 — L'
aws_s3
extension intègre la compatibilité avec Amazon S3 et Aurora PostgreSQL.Amazon SNS — Amazon Simple Notification Service (Amazon SNS) coordonne et gère la distribution ou l'envoi de messages entre les éditeurs et les clients. Dans ce modèle, Amazon SNS est utilisé pour envoyer des notifications.
Code
Chaque fois qu'un fichier est placé dans le compartiment S3, une fonction de base de données doit être créée et appelée depuis l'application de traitement ou la fonction Lambda. Pour plus de détails, consultez le code (ci-joint).
Épopées
Tâche | Description | Compétences requises |
---|---|---|
Ajoutez un fichier externe à la base de données source. | Créez un fichier externe et déplacez-le vers le | DBA |
Tâche | Description | Compétences requises |
---|---|---|
Créez une base de données Aurora PostgreSQL. | Créez une instance de base de données dans votre cluster compatible Amazon Aurora PostgreSQL. | DBA |
Créez un schéma, une extension aws_s3 et des tables. | Utilisez le code ci-dessous | DBA, Développeur |
Créez la fonction de base de données. | Pour créer la fonction de base de données, utilisez le code sous | DBA, Développeur |
Tâche | Description | Compétences requises |
---|---|---|
Créez un rôle. | Créez un rôle avec des autorisations pour accéder à Amazon S3 et Amazon Relational Database Service (Amazon RDS). Ce rôle sera attribué à Lambda pour exécuter le modèle. | DBA |
Créez la fonction Lambda. | Créez une fonction Lambda qui lit le nom du fichier depuis Amazon S3 (par exemple En fonction du résultat de l'appel de fonction, une notification SNS sera lancée (par exemple, En fonction des besoins de votre entreprise, vous pouvez créer une fonction Lambda avec du code supplémentaire si nécessaire. Pour plus d'informations, consultez la documentation Lambda. | DBA |
Configurez un déclencheur d'événement du compartiment S3. | Configurez un mécanisme pour appeler la fonction Lambda pour tous les événements de création d'objets dans le compartiment S3. | DBA |
Créez un secret. | Créez un nom secret pour les informations d'identification de la base de données à l'aide de Secrets Manager. Transmettez le secret dans la fonction Lambda. | DBA |
Téléchargez les fichiers de support Lambda. | Téléchargez un fichier .zip contenant les packages de support Lambda et le script Python joint pour vous connecter à Aurora PostgreSQL compatible. Le code Python appelle la fonction que vous avez créée dans la base de données. | DBA |
Créez une rubrique SNS. | Créez une rubrique SNS pour envoyer un e-mail en cas de réussite ou d'échec du chargement des données. | DBA |
Tâche | Description | Compétences requises |
---|---|---|
Créez un compartiment S3. | Sur la console Amazon S3, créez un compartiment S3 avec un nom unique qui ne contient pas de barres obliques. Le nom d'un compartiment S3 est unique au monde et l'espace de noms est partagé par tous les comptes AWS. | DBA |
Créez des politiques IAM. | Pour créer les politiques AWS Identity and Access Management (IAM), utilisez le code ci-dessous | DBA |
Créez des rôles. | Créez deux rôles pour la compatibilité avec Aurora PostgreSQL, un rôle pour l'importation et un rôle pour l'exportation. Assignez les politiques correspondantes aux rôles. | DBA |
Associez les rôles au cluster compatible avec Aurora PostgreSQL. | Sous Gérer les rôles, attachez les rôles d'importation et d'exportation au cluster Aurora PostgreSQL. | DBA |
Créez des objets de support compatibles avec Aurora PostgreSQL. | Pour les scripts de table, utilisez le code ci-dessous Pour la fonction personnalisée, utilisez le code ci-dessous | DBA |
Tâche | Description | Compétences requises |
---|---|---|
Téléchargez un fichier dans le compartiment S3. | Pour télécharger un fichier de test dans le compartiment S3, utilisez la console ou la commande suivante dans l'AWS CLI.
Dès que le fichier est chargé, un événement de bucket lance la fonction Lambda, qui exécute la fonction compatible avec Aurora PostgreSQL. | DBA |
Vérifiez les données, le journal et les fichiers d'erreurs. | La fonction compatible avec Aurora PostgreSQL charge les fichiers dans la table principale | DBA |
Surveillez la solution. | Dans la CloudWatch console Amazon, surveillez la fonction Lambda. | DBA |
Ressources connexes
Informations supplémentaires
ext_table_scripts
CREATE EXTENSION aws_s3 CASCADE;
CREATE TABLE IF NOT EXISTS meta_EXTERNAL_TABLE
(
table_name_stg character varying(100) ,
table_name character varying(100) ,
col_list character varying(1000) ,
data_type character varying(100) ,
col_order numeric,
start_pos numeric,
end_pos numeric,
no_position character varying(100) ,
date_mask character varying(100) ,
delimeter character(1) ,
directory character varying(100) ,
file_name character varying(100) ,
header_exist character varying(5)
);
CREATE TABLE IF NOT EXISTS ext_tbl_stg
(
col1 text
);
CREATE TABLE IF NOT EXISTS error_table
(
error_details text,
file_name character varying(100),
processed_time timestamp without time zone
);
CREATE TABLE IF NOT EXISTS log_table
(
file_name character varying(50) COLLATE pg_catalog."default",
processed_date timestamp without time zone,
tot_rec_count numeric,
proc_rec_count numeric,
error_rec_count numeric
);
sample insert scripts of meta data:
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'source_filename', 'character varying', 2, 8, 27, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'record_type_identifier', 'character varying', 3, 28, 30, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'fad_code', 'numeric', 4, 31, 36, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'session_sequence_number', 'numeric', 5, 37, 42, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'transaction_sequence_number', 'numeric', 6, 43, 48, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');
s3bucketpolicy_for import
---Import role policy
--Create an IAM policy to allow, Get, and list actions on S3 bucket
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "s3import",
"Action": [
"s3:GetObject",
"s3:ListBucket"
],
"Effect": "Allow",
"Resource": [
"arn:aws:s3:::s3importtest",
"arn:aws:s3:::s3importtest/*"
]
}
]
}
--Export Role policy
--Create an IAM policy to allow, put, and list actions on S3 bucket
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "s3export",
"Action": [
"S3:PutObject",
"s3:ListBucket"
],
"Effect": "Allow",
"Resource": [
"arn:aws:s3:::s3importtest/*"
]
}
]
}
Exemple de fonction de base de données load_external_tables_latest
CREATE OR REPLACE FUNCTION public.load_external_tables(pi_filename text)
RETURNS character varying
LANGUAGE plpgsql
AS $function$
/* Loading data from S3 bucket into a APG table */
DECLARE
v_final_sql TEXT;
pi_ext_table TEXT;
r refCURSOR;
v_sqlerrm text;
v_chunk numeric;
i integer;
v_col_list TEXT;
v_postion_list CHARACTER VARYING(1000);
v_len integer;
v_delim varchar;
v_file_name CHARACTER VARYING(1000);
v_directory CHARACTER VARYING(1000);
v_table_name_stg CHARACTER VARYING(1000);
v_sql_col TEXT;
v_sql TEXT;
v_sql1 TEXT;
v_sql2 TEXT;
v_sql3 TEXT;
v_cnt integer;
v_sql_dynamic TEXT;
v_sql_ins TEXT;
proc_rec_COUNT integer;
error_rec_COUNT integer;
tot_rec_COUNT integer;
v_rec_val integer;
rec record;
v_col_cnt integer;
kv record;
v_val text;
v_header text;
j integer;
ERCODE VARCHAR(5);
v_region text;
cr CURSOR FOR
SELECT distinct DELIMETER,
FILE_NAME,
DIRECTORY
FROM meta_EXTERNAL_TABLE
WHERE table_name = pi_ext_table
AND DELIMETER IS NOT NULL;
cr1 CURSOR FOR
SELECT col_list,
data_type,
start_pos,
END_pos,
concat_ws('',' ',TABLE_NAME_STG) as TABLE_NAME_STG,
no_position,date_mask
FROM meta_EXTERNAL_TABLE
WHERE table_name = pi_ext_table
order by col_order asc;
cr2 cursor FOR
SELECT distinct table_name,table_name_stg
FROM meta_EXTERNAL_TABLE
WHERE upper(file_name) = upper(pi_filename);
BEGIN
-- PERFORM utl_file_utility.init();
v_region := 'us-east-1';
/* find tab details from file name */
--DELETE FROM ERROR_TABLE WHERE file_name= pi_filename;
-- DELETE FROM log_table WHERE file_name= pi_filename;
BEGIN
SELECT distinct table_name,table_name_stg INTO strict pi_ext_table,v_table_name_stg
FROM meta_EXTERNAL_TABLE
WHERE upper(file_name) = upper(pi_filename);
EXCEPTION
WHEN NO_DATA_FOUND THEN
raise notice 'error 1,%',sqlerrm;
pi_ext_table := null;
v_table_name_stg := null;
RAISE USING errcode = 'NTFIP' ;
when others then
raise notice 'error others,%',sqlerrm;
END;
j :=1 ;
for rec in cr2
LOOP
pi_ext_table := rec.table_name;
v_table_name_stg := rec.table_name_stg;
v_col_list := null;
IF pi_ext_table IS NOT NULL
THEN
--EXECUTE concat_ws('','truncate table ' ,pi_ext_table) ;
EXECUTE concat_ws('','truncate table ' ,v_table_name_stg) ;
SELECT distinct DELIMETER INTO STRICT v_delim
FROM meta_EXTERNAL_TABLE
WHERE table_name = pi_ext_table;
IF v_delim IS NOT NULL THEN
SELECT distinct DELIMETER,
FILE_NAME,
DIRECTORY ,
concat_ws('',' ',table_name_stg),
case header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist
INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header
FROM meta_EXTERNAL_TABLE
WHERE table_name = pi_ext_table
AND DELIMETER IS NOT NULL;
IF upper(v_delim) = 'CSV'
THEN
v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3 ( ''',
v_table_name_stg,''','''',
''DELIMITER '''','''' CSV HEADER QUOTE ''''"'''''', aws_commons.create_s3_uri ( ''',
v_directory,''',''',v_file_name,''', ''',v_region,'''))');
ELSE
v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',
v_table_name_stg, ''','''', ''DELIMITER AS ''''^''''',''',','
aws_commons.create_s3_uri
( ''',v_directory, ''',''',
v_file_name, ''',',
'''',v_region,''')
)');
raise notice 'v_sql , %',v_sql;
begin
EXECUTE v_sql;
EXCEPTION
WHEN OTHERS THEN
raise notice 'error 1';
RAISE USING errcode = 'S3IMP' ;
END;
select count(col_list) INTO v_col_cnt
from meta_EXTERNAL_TABLE where table_name = pi_ext_table;
-- raise notice 'v_sql 2, %',concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');
execute concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');
i :=1;
FOR rec in cr1
loop
v_sql1 := concat_ws('',v_sql1,'split_part(col1,''',v_delim,''',', i,')',' as ',rec.col_list,',');
v_sql2 := concat_ws('',v_sql2,rec.col_list,',');
-- v_sql3 := concat_ws('',v_sql3,'rec.',rec.col_list,'::',rec.data_type,',');
case
WHEN upper(rec.data_type) = 'NUMERIC'
THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
THEN null
ELSE
coalesce((trim(split_part(col1,''',v_delim,''',', i,')))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;
WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'
THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
THEN null
ELSE
to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');
WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'MM/DD/YYYY hh24:mi:ss'
THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
THEN null
ELSE
to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''01/01/9999 0024:00:00''),''MM/DD/YYYY hh24:mi:ss'')::',rec.data_type,' END as ',rec.col_list,',');
ELSE
v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0
THEN null
ELSE
coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;
END case;
i :=i+1;
end loop;
-- raise notice 'v_sql 3, %',v_sql3;
SELECT trim(trailing ' ' FROM v_sql1) INTO v_sql1;
SELECT trim(trailing ',' FROM v_sql1) INTO v_sql1;
SELECT trim(trailing ' ' FROM v_sql2) INTO v_sql2;
SELECT trim(trailing ',' FROM v_sql2) INTO v_sql2;
SELECT trim(trailing ' ' FROM v_sql3) INTO v_sql3;
SELECT trim(trailing ',' FROM v_sql3) INTO v_sql3;
END IF;
raise notice 'v_delim , %',v_delim;
EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO v_cnt;
raise notice 'stg cnt , %',v_cnt;
/* if upper(v_delim) = 'CSV' then
v_sql_ins := concat_ws('', ' SELECT * from ' ,v_table_name_stg );
else
-- v_sql_ins := concat_ws('',' SELECT ',v_sql1,' from (select col1 from ' ,v_table_name_stg , ')sub ');
v_sql_ins := concat_ws('',' SELECT ',v_sql3,' from (select col1 from ' ,v_table_name_stg , ')sub ');
END IF;*/
v_chunk := v_cnt/100;
for i in 1..101
loop
BEGIN
-- raise notice 'v_sql , %',v_sql;
-- raise notice 'Chunk number , %',i;
v_sql_ins := concat_ws('',' SELECT ',v_sql3,' from (select col1 from ' ,v_table_name_stg , ' offset ',v_chunk*(i-1), ' limit ',v_chunk,') sub ');
v_sql := concat_ws('','insert into ', pi_ext_table ,' ', v_sql_ins);
-- raise notice 'select statement , %',v_sql_ins;
-- v_sql := null;
-- EXECUTE concat_ws('','insert into ', pi_ext_table ,' ', v_sql_ins, 'offset ',v_chunk*(i-1), ' limit ',v_chunk );
--v_sql := concat_ws('','insert into ', pi_ext_table ,' ', v_sql_ins );
-- raise notice 'insert statement , %',v_sql;
raise NOTICE 'CHUNK START %',v_chunk*(i-1);
raise NOTICE 'CHUNK END %',v_chunk;
EXECUTE v_sql;
EXCEPTION
WHEN OTHERS THEN
-- v_sql_ins := concat_ws('',' SELECT ',v_sql1, ' from (select col1 from ' ,v_table_name_stg , ' )sub ');
-- raise notice 'Chunk number for cursor , %',i;
raise NOTICE 'Cursor - CHUNK START %',v_chunk*(i-1);
raise NOTICE 'Cursor - CHUNK END %',v_chunk;
v_sql_ins := concat_ws('',' SELECT ',v_sql3, ' from (select col1 from ' ,v_table_name_stg , ' )sub ');
v_final_sql := REPLACE (v_sql_ins, ''''::text, ''''''::text);
-- raise notice 'v_final_sql %',v_final_sql;
v_sql :=concat_ws('','do $a$ declare r refcursor;v_sql text; i numeric;v_conname text; v_typ ',pi_ext_table,'[]; v_rec ','record',';
begin
open r for execute ''select col1 from ',v_table_name_stg ,' offset ',v_chunk*(i-1), ' limit ',v_chunk,''';
loop
begin
fetch r into v_rec;
EXIT WHEN NOT FOUND;
v_sql := concat_ws('''',''insert into ',pi_ext_table,' SELECT ',REPLACE (v_sql3, ''''::text, ''''''::text) , ' from ( select '''''',v_rec.col1,'''''' as col1) v'');
execute v_sql;
exception
when others then
v_sql := ''INSERT INTO ERROR_TABLE VALUES (concat_ws('''''''',''''Error Name: '''',$$''||SQLERRM||''$$,''''Error State: '''',''''''||SQLSTATE||'''''',''''record : '''',$$''||v_rec.col1||''$$),'''''||pi_filename||''''',now())'';
execute v_sql;
continue;
end ;
end loop;
close r;
exception
when others then
raise;
end ; $a$');
-- raise notice ' inside excp v_sql %',v_sql;
execute v_sql;
-- raise notice 'v_sql %',v_sql;
END;
END LOOP;
ELSE
SELECT distinct DELIMETER,FILE_NAME,DIRECTORY ,concat_ws('',' ',table_name_stg),
case header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist
INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header
FROM meta_EXTERNAL_TABLE
WHERE table_name = pi_ext_table ;
v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',
v_table_name_stg, ''','''', ''DELIMITER AS ''''#'''' ',v_header,' '',','
aws_commons.create_s3_uri
( ''',v_directory, ''',''',
v_file_name, ''',',
'''',v_region,''')
)');
EXECUTE v_sql;
FOR rec in cr1
LOOP
IF rec.start_pos IS NULL AND rec.END_pos IS NULL AND rec.no_position = 'recnum'
THEN
v_rec_val := 1;
ELSE
case
WHEN upper(rec.data_type) = 'NUMERIC'
THEN v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
THEN null
ELSE
coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1)))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;
WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'
THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
THEN null
ELSE
to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');
WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDDHH24MISS'
THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
THEN null
ELSE
to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''9999010100240000''),''YYYYMMDDHH24MISS'')::',rec.data_type,' END as ',rec.col_list,',');
ELSE
v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0
THEN null
ELSE
coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;
END case;
END IF;
v_col_list := concat_ws('',v_col_list ,v_sql1);
END LOOP;
SELECT trim(trailing ' ' FROM v_col_list) INTO v_col_list;
SELECT trim(trailing ',' FROM v_col_list) INTO v_col_list;
v_sql_col := concat_ws('',trim(trailing ',' FROM v_col_list) , ' FROM ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 ');
v_sql_dynamic := v_sql_col;
EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO v_cnt;
IF v_rec_val = 1 THEN
v_sql_ins := concat_ws('',' select row_number() over(order by ctid) as line_number ,' ,v_sql_dynamic) ;
ELSE
v_sql_ins := concat_ws('',' SELECT' ,v_sql_dynamic) ;
END IF;
BEGIN
EXECUTE concat_ws('','insert into ', pi_ext_table ,' ', v_sql_ins);
EXCEPTION
WHEN OTHERS THEN
IF v_rec_val = 1 THEN
v_final_sql := ' select row_number() over(order by ctid) as line_number ,col1 from ';
ELSE
v_final_sql := ' SELECT col1 from';
END IF;
v_sql :=concat_ws('','do $a$ declare r refcursor;v_rec_val numeric := ',coalesce(v_rec_val,0),';line_number numeric; col1 text; v_typ ',pi_ext_table,'[]; v_rec ',pi_ext_table,';
begin
open r for execute ''',v_final_sql, ' ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 '' ;
loop
begin
if v_rec_val = 1 then
fetch r into line_number,col1;
else
fetch r into col1;
end if;
EXIT WHEN NOT FOUND;
if v_rec_val = 1 then
select line_number,',trim(trailing ',' FROM v_col_list) ,' into v_rec;
else
select ',trim(trailing ',' FROM v_col_list) ,' into v_rec;
end if;
insert into ',pi_ext_table,' select v_rec.*;
exception
when others then
INSERT INTO ERROR_TABLE VALUES (concat_ws('''',''Error Name: '',SQLERRM,''Error State: '',SQLSTATE,''record : '',v_rec),''',pi_filename,''',now());
continue;
end ;
end loop;
close r;
exception
when others then
raise;
end ; $a$');
execute v_sql;
END;
END IF;
EXECUTE concat_ws('','SELECT COUNT(*) FROM ' ,pi_ext_table) INTO proc_rec_COUNT;
EXECUTE concat_ws('','SELECT COUNT(*) FROM error_table WHERE file_name =''',pi_filename,''' and processed_time::date = clock_timestamp()::date') INTO error_rec_COUNT;
EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO tot_rec_COUNT;
INSERT INTO log_table values(pi_filename,now(),tot_rec_COUNT,proc_rec_COUNT, error_rec_COUNT);
raise notice 'v_directory, %',v_directory;
raise notice 'pi_filename, %',pi_filename;
raise notice 'v_region, %',v_region;
perform aws_s3.query_export_to_s3('SELECT replace(trim(substring(error_details,position(''('' in error_details)+1),'')''),'','','';''),file_name,processed_time FROM error_table WHERE file_name = '''||pi_filename||'''',
aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),
options :='FORmat csv, header, delimiter $$,$$'
);
raise notice 'v_directory, %',v_directory;
raise notice 'pi_filename, %',pi_filename;
raise notice 'v_region, %',v_region;
perform aws_s3.query_export_to_s3('SELECT * FROM log_table WHERE file_name = '''||pi_filename||'''',
aws_commons.create_s3_uri(v_directory, pi_filename||'.log', v_region),
options :='FORmat csv, header, delimiter $$,$$'
);
END IF;
j := j+1;
END LOOP;
RETURN 'OK';
EXCEPTION
WHEN OTHERS THEN
raise notice 'error %',sqlerrm;
ERCODE=SQLSTATE;
IF ERCODE = 'NTFIP' THEN
v_sqlerrm := concat_Ws('',sqlerrm,'No data for the filename');
ELSIF ERCODE = 'S3IMP' THEN
v_sqlerrm := concat_Ws('',sqlerrm,'Error While exporting the file from S3');
ELSE
v_sqlerrm := sqlerrm;
END IF;
select distinct directory into v_directory from meta_EXTERNAL_TABLE;
raise notice 'exc v_directory, %',v_directory;
raise notice 'exc pi_filename, %',pi_filename;
raise notice 'exc v_region, %',v_region;
perform aws_s3.query_export_to_s3('SELECT * FROM error_table WHERE file_name = '''||pi_filename||'''',
aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),
options :='FORmat csv, header, delimiter $$,$$'
);
RETURN null;
END;
$function$