Migre tabelas externas da Oracle para Amazon Aurora SQL Postgre — compatível - Recomendações da AWS

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Migre tabelas externas da Oracle para Amazon Aurora SQL Postgre — compatível

Criado por anuradha chintha (AWS) e Rakesh Raghav () AWS

Ambiente: PoC ou piloto

Origem: Oracle

Alvo: Aurora Postgre SQL

Tipo R: redefinir arquitetura

Workload: código aberto

Tecnologias: migração; bancos de dados; modernização

AWSserviços: AWS Identity and Access Management; AWS Lambda; Amazon S3; Amazon; SNS Amazon Aurora

Resumo

As tabelas externas dão à Oracle a capacidade de consultar dados armazenados fora do banco de dados em arquivos simples. Você pode usar o LOADER driver ORACLE _ para acessar qualquer dado armazenado em qualquer formato que possa ser carregado pelo utilitário SQL *Loader. Você não pode usar a Linguagem de Manipulação de Dados (DML) em tabelas externas, mas pode usar tabelas externas para operações de consulta, junção e classificação.

A edição SQL compatível com o Amazon Aurora Postgre não fornece funcionalidade semelhante às tabelas externas no Oracle. Em vez disso, você deve usar a modernização para desenvolver uma solução escalável que atenda aos requisitos funcionais e seja econômica.

Esse padrão fornece etapas para migrar diferentes tipos de tabelas externas da Oracle para a edição compatível com o Aurora SQL Postgre na nuvem da Amazon Web Services AWS () usando a extensão. aws_s3

Recomendamos testar minuciosamente essa solução antes de implementá-la em um ambiente de produção.

Pré-requisitos e limitações

Pré-requisitos

  • Uma AWS conta ativa

  • AWSInterface de linha de comando (AWSCLI)

  • Uma instância de banco de dados compatível com o Aurora Postgre SQL disponível.

  • Um banco de dados da Oracle on-premises com uma tabela externa

  • PG.Cliente API

  • Arquivos de dados 

Limitações

  • Esse padrão não fornece a funcionalidade para atuar como um substituto para tabelas externas da Oracle. No entanto, as etapas e o código de amostra podem ser aprimorados ainda mais para atingir suas metas de modernização do banco de dados.

  • Os arquivos não devem conter o caractere que está sendo passado como delimitador nas funções de exportação e importação aws_s3.

Versões do produto

  • Para importar do Amazon S3 RDS para o Postgre, SQL o banco de dados deve estar executando a SQL versão 10.7 ou posterior do Postgre.

Arquitetura

Pilha de tecnologia de origem

  • Oracle

Arquitetura de origem

Diagrama dos arquivos de dados que vão para um diretório e uma tabela no banco de dados Oracle on-premises.

Pilha de tecnologias de destino

  • Compatível com Amazon Aurora Postgre SQL

  • Amazon CloudWatch

  • AWSLambda

  • AWS Secrets Manager

  • Serviço de notificação simples da Amazon (AmazonSNS)

  • Amazon Simple Storage Service (Amazon S3)

Arquitetura de destino

O diagrama a seguir mostra uma representação de alto nível da solução.

A descrição está após o diagrama.
  1. Os arquivos são enviados para o bucket do S3.

  2. A função do Lambda é iniciada.

  3. A função do Lambda inicia a chamada da função de banco de dados.

  4. O Secrets Manager fornece as credenciais para acesso ao banco de dados.

  5. Dependendo da função DB, um SNS alarme é criado.

Automação e escala

Qualquer adição ou alteração nas tabelas externas pode ser tratada com a manutenção de metadados.

Ferramentas

  • Compatível com Amazon Aurora Postgre SQL — O Amazon Aurora Postgre SQL -Compatible Edition é um mecanismo de banco de dados relacional totalmente gerenciado, compatível com Postgre e SQL compatível que combina a velocidade e a confiabilidade ACID de bancos de dados comerciais de alto nível com a economia de bancos de dados de código aberto.

  • AWSCLI— A interface de linha de AWS comando (AWSCLI) é uma ferramenta unificada para gerenciar seus AWS serviços. Com apenas uma ferramenta para baixar e configurar, você pode controlar vários AWS serviços na linha de comando e automatizá-los por meio de scripts.

  • Amazon CloudWatch — A Amazon CloudWatch monitora os recursos e a utilização do Amazon S3.

  • AWSLambda — O AWS Lambda é um serviço de computação sem servidor que suporta a execução de código sem provisionar ou gerenciar servidores, criar uma lógica de escalabilidade de cluster com reconhecimento de carga de trabalho, manter integrações de eventos ou gerenciar tempos de execução. Nesse padrão, o Lambda executa a função de banco de dados sempre que um arquivo é carregado no Amazon S3.

  • AWSSecrets Manager — AWS Secrets Manager é um serviço para armazenamento e recuperação de credenciais. Usando o Secrets Manager, você pode substituir as credenciais codificadas em seu código, incluindo senhas, por uma API chamada para o Secrets Manager para recuperar o segredo programaticamente.

  • Amazon S3 — O Amazon Simple Storage Service (Amazon S3) fornece uma camada de armazenamento para receber e armazenar arquivos para consumo e transmissão de e para o cluster compatível com o Aurora Postgre. SQL

  • aws_s3 — A extensão aws_s3 integra Amazon S3 e Aurora Postgre -Compatível. SQL

  • Amazon SNS — O Amazon Simple Notification Service (AmazonSNS) coordena e gerencia a entrega ou o envio de mensagens entre editores e clientes. Nesse padrão, a Amazon SNS é usada para enviar notificações.

Código

Sempre que um arquivo é colocado no bucket do S3, uma função de banco de dados deve ser criada e chamada a partir do aplicativo de processamento ou da função do Lambda. Para obter detalhes, consulte o código (em anexo).

Épicos

TarefaDescriçãoHabilidades necessárias

Adicione um arquivo externo ao banco de dados de origem.

Crie um arquivo externo e mova-o para o diretório oracle.

DBA
TarefaDescriçãoHabilidades necessárias

Crie um banco de dados Aurora Postgre. SQL

Crie uma instância de banco de dados em seu cluster compatível com Amazon Aurora PostgreSQL.

DBA

Crie um esquema, uma extensão aws_s3 e tabelas.

Use o código em ext_tbl_scripts na seção Informações adicionais. As tabelas incluem tabelas reais, tabelas intermediárias, tabelas de erros e logs e uma metatabela.

DBA, Desenvolvedor

Criar a função de banco de dados.

Para criar a função de banco de dados (DB), use o código na função load_external_table_latest da seção Informações adicionais.

DBA, Desenvolvedor
TarefaDescriçãoHabilidades necessárias

Crie uma função.

Crie uma função com permissões para acessar o Amazon S3 e o Amazon Relational Database Service (Amazon). RDS Essa função será atribuída ao Lambda para executar o padrão.

DBA

Criar a função do Lambda.

Crie uma função do Lambda que leia o nome do arquivo do Amazon S3 (por exemplo file_key = info.get('object', {}).get('key'),) e chame a função de banco de dados (por exemplo, curs.callproc("load_external_tables", [file_key])) com o nome do arquivo como parâmetro de entrada.

Dependendo do resultado da chamada de função, uma SNS notificação será iniciada (por exemplo,client.publish(TopicArn='arn:',Message='fileloadsuccess',Subject='fileloadsuccess')).

Com base nas necessidades da sua empresa, você pode criar uma função do Lambda com código extra, se necessário. Para obter mais informações, consulte a documentação do Lambda.

DBA

Configurar um gatilho do evento do bucket do S3.

Configure um mecanismo para chamar a função do Lambda para todos os eventos de criação de objetos no bucket do S3.

DBA

Criar um segredo.

Crie um nome secreto para as credenciais do banco de dados usando o Secrets Manager. Passe o segredo na função do Lambda.

DBA

Faça upload dos arquivos de suporte do Lambda.

Faça upload de um arquivo.zip que contenha os pacotes de suporte do Lambda e o script Python anexado para conexão com o Aurora Postgre -Compatível. SQL O código Python chama a função que você criou no banco de dados.

DBA

Criar um tópico do SNS.

Crie um SNS tópico para enviar e-mails sobre o sucesso ou a falha do carregamento de dados.

DBA
TarefaDescriçãoHabilidades necessárias

Criar um bucket do S3.

No console do Amazon S3, você criará um bucket do S3 com um nome exclusivo que não contenha barras iniciais. O nome de um bucket do S3 é globalmente exclusivo e o namespace é compartilhado por todas as contas. AWS

DBA

Crie IAM políticas.

Para criar as políticas de AWS Identity and Access Management (IAM), use o código abaixo s3bucketpolicy_for_import na seção Informações adicionais.

DBA

Criar funções.

Crie duas funções para Aurora Postgre SQL -Compatível, uma função para Importar e outra para Exportar. Atribua as políticas correspondentes às funções.

DBA

Anexe as funções ao cluster compatível com o Aurora PostgreSQL.

Em Gerenciar funções, anexe as funções de importação e exportação ao cluster Aurora SQL Postgre.

DBA

Crie objetos de suporte para Aurora SQL Postgre -Compatível.

Para os scripts de tabela, use o código em ext_tbl_scripts na seção Informações adicionais.

Para a função personalizada, use o código em load_external_Table_latest na seção Informações adicionais.

DBA
TarefaDescriçãoHabilidades necessárias

Faça upload de um arquivo no bucket do S3.

Para fazer upload de um arquivo de teste no bucket do S3, use o console ou o comando a seguir no AWSCLI. 

aws s3 cp /Users/Desktop/ukpost/exttbl/"testing files"/aps s3://s3importtest/inputext/aps

Assim que o arquivo é carregado, um evento de bucket inicia a função Lambda, que executa a função SQL Aurora Postgre -Compatible.

DBA

Verifique os dados e os arquivos de log e erro.

A função Aurora Postgre SQL -Compatible carrega os arquivos na tabela principal .log e cria .bad arquivos no bucket do S3.

DBA

Monitore a solução.

No CloudWatch console da Amazon, monitore a função Lambda.

DBA

Recursos relacionados

Mais informações

ext_table_scripts

CREATE EXTENSION aws_s3 CASCADE; CREATE TABLE IF NOT EXISTS meta_EXTERNAL_TABLE (     table_name_stg character varying(100) ,     table_name character varying(100)  ,     col_list character varying(1000)  ,     data_type character varying(100)  ,     col_order numeric,     start_pos numeric,     end_pos numeric,     no_position character varying(100)  ,     date_mask character varying(100)  ,     delimeter character(1)  ,     directory character varying(100)  ,     file_name character varying(100)  ,     header_exist character varying(5) ); CREATE TABLE IF NOT EXISTS ext_tbl_stg (     col1 text ); CREATE TABLE IF NOT EXISTS error_table (     error_details text,     file_name character varying(100),     processed_time timestamp without time zone ); CREATE TABLE IF NOT EXISTS log_table (     file_name character varying(50) COLLATE pg_catalog."default",     processed_date timestamp without time zone,     tot_rec_count numeric,     proc_rec_count numeric,     error_rec_count numeric ); sample insert scripts of meta data: INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'source_filename', 'character varying', 2, 8, 27, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'record_type_identifier', 'character varying', 3, 28, 30, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'fad_code', 'numeric', 4, 31, 36, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'session_sequence_number', 'numeric', 5, 37, 42, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO'); INSERT INTO meta_EXTERNAL_TABLE (table_name_stg, table_name, col_list, data_type, col_order, start_pos, end_pos, no_position, date_mask, delimeter, directory, file_name, header_exist) VALUES ('F_EX_APS_TRANSACTIONS_STG', 'F_EX_APS_TRANSACTIONS', 'transaction_sequence_number', 'numeric', 6, 43, 48, NULL, NULL, NULL, 'databasedev', 'externalinterface/loaddir/APS', 'NO');

s3bucketpolicy_for import

---Import role policy --Create an IAM policy to allow, Get,  and list actions on S3 bucket  {     "Version": "2012-10-17",     "Statement": [         {             "Sid": "s3import",             "Action": [                 "s3:GetObject",                 "s3:ListBucket"             ],             "Effect": "Allow",             "Resource": [                 "arn:aws:s3:::s3importtest",                 "arn:aws:s3:::s3importtest/*"             ]         }     ] } --Export Role policy --Create an IAM policy to allow, put,  and list actions on S3 bucket {     "Version": "2012-10-17",     "Statement": [         {             "Sid": "s3export",             "Action": [                 "S3:PutObject",                 "s3:ListBucket"             ],             "Effect": "Allow",             "Resource": [                 "arn:aws:s3:::s3importtest/*"             ]         }     ] }

Exemplo de função de banco de dados load_external_tables_latest

CREATE OR REPLACE FUNCTION public.load_external_tables(pi_filename text)  RETURNS character varying  LANGUAGE plpgsql AS $function$ /* Loading data from S3 bucket into a APG table */ DECLARE  v_final_sql TEXT;  pi_ext_table TEXT;  r refCURSOR;  v_sqlerrm text;  v_chunk numeric;  i integer;  v_col_list TEXT;  v_postion_list CHARACTER VARYING(1000);  v_len  integer;  v_delim varchar;  v_file_name CHARACTER VARYING(1000);  v_directory CHARACTER VARYING(1000);  v_table_name_stg CHARACTER VARYING(1000);  v_sql_col TEXT;  v_sql TEXT;  v_sql1 TEXT;  v_sql2 TEXT;  v_sql3 TEXT;  v_cnt integer;  v_sql_dynamic TEXT;  v_sql_ins TEXT;  proc_rec_COUNT integer;  error_rec_COUNT integer;  tot_rec_COUNT integer;  v_rec_val integer;  rec record;  v_col_cnt integer;  kv record;  v_val text;  v_header text;  j integer;  ERCODE VARCHAR(5);  v_region text;  cr CURSOR FOR  SELECT distinct DELIMETER,    FILE_NAME,    DIRECTORY  FROM  meta_EXTERNAL_TABLE  WHERE table_name = pi_ext_table    AND DELIMETER IS NOT NULL;  cr1 CURSOR FOR    SELECT   col_list,    data_type,    start_pos,    END_pos,    concat_ws('',' ',TABLE_NAME_STG) as TABLE_NAME_STG,    no_position,date_mask  FROM  meta_EXTERNAL_TABLE  WHERE table_name = pi_ext_table  order by col_order asc; cr2 cursor FOR SELECT  distinct table_name,table_name_stg    FROM  meta_EXTERNAL_TABLE    WHERE upper(file_name) = upper(pi_filename); BEGIN  -- PERFORM utl_file_utility.init();    v_region := 'us-east-1';    /* find tab details from file name */    --DELETE FROM  ERROR_TABLE WHERE file_name= pi_filename;   -- DELETE FROM  log_table WHERE file_name= pi_filename;  BEGIN    SELECT distinct table_name,table_name_stg INTO strict pi_ext_table,v_table_name_stg    FROM  meta_EXTERNAL_TABLE    WHERE upper(file_name) = upper(pi_filename);  EXCEPTION    WHEN NO_DATA_FOUND THEN     raise notice 'error 1,%',sqlerrm;     pi_ext_table := null;     v_table_name_stg := null;       RAISE USING errcode = 'NTFIP' ;     when others then         raise notice 'error others,%',sqlerrm;  END;  j :=1 ;    for rec in  cr2  LOOP   pi_ext_table     := rec.table_name;   v_table_name_stg := rec.table_name_stg;   v_col_list := null;  IF pi_ext_table IS NOT NULL   THEN     --EXECUTE concat_ws('','truncate table  ' ,pi_ext_table) ;    EXECUTE concat_ws('','truncate table  ' ,v_table_name_stg) ;        SELECT distinct DELIMETER INTO STRICT v_delim        FROM  meta_EXTERNAL_TABLE        WHERE table_name = pi_ext_table;        IF v_delim IS NOT NULL THEN      SELECT distinct DELIMETER,        FILE_NAME,        DIRECTORY ,        concat_ws('',' ',table_name_stg),        case  header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist      INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header      FROM  meta_EXTERNAL_TABLE      WHERE table_name = pi_ext_table        AND DELIMETER IS NOT NULL;      IF    upper(v_delim) = 'CSV'      THEN        v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3 ( ''',        v_table_name_stg,''','''',        ''DELIMITER '''','''' CSV HEADER QUOTE ''''"'''''', aws_commons.create_s3_uri ( ''',        v_directory,''',''',v_file_name,''', ''',v_region,'''))');        ELSE        v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',            v_table_name_stg, ''','''', ''DELIMITER AS ''''^''''',''',','           aws_commons.create_s3_uri            ( ''',v_directory, ''',''',            v_file_name, ''',',             '''',v_region,''')           )');           raise notice 'v_sql , %',v_sql;        begin         EXECUTE  v_sql;        EXCEPTION          WHEN OTHERS THEN            raise notice 'error 1';          RAISE USING errcode = 'S3IMP' ;        END;        select count(col_list) INTO v_col_cnt        from  meta_EXTERNAL_TABLE where table_name = pi_ext_table;         -- raise notice 'v_sql 2, %',concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');        execute concat_ws('','update ',v_table_name_stg, ' set col1 = col1||''',v_delim,'''');        i :=1;        FOR rec in cr1        loop        v_sql1 := concat_ws('',v_sql1,'split_part(col1,''',v_delim,''',', i,')',' as ',rec.col_list,',');        v_sql2 := concat_ws('',v_sql2,rec.col_list,',');    --    v_sql3 := concat_ws('',v_sql3,'rec.',rec.col_list,'::',rec.data_type,',');        case          WHEN upper(rec.data_type) = 'NUMERIC'          THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0                 THEN null                  ELSE                  coalesce((trim(split_part(col1,''',v_delim,''',', i,')))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;          WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'          THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0                 THEN null                  ELSE                  to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');          WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask =  'MM/DD/YYYY hh24:mi:ss'          THEN v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0                 THEN null                  ELSE                  to_date(coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),''01/01/9999 0024:00:00''),''MM/DD/YYYY hh24:mi:ss'')::',rec.data_type,' END as ',rec.col_list,',');           ELSE         v_sql3 := concat_ws('',v_sql3,' case WHEN length(trim(split_part(col1,''',v_delim,''',', i,'))) =0                 THEN null                  ELSE                   coalesce((trim(split_part(col1,''',v_delim,''',', i,'))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;        END case;        i :=i+1;        end loop;          -- raise notice 'v_sql 3, %',v_sql3;        SELECT trim(trailing ' ' FROM v_sql1) INTO v_sql1;        SELECT trim(trailing ',' FROM v_sql1) INTO v_sql1;        SELECT trim(trailing ' ' FROM v_sql2) INTO v_sql2;        SELECT trim(trailing ',' FROM v_sql2) INTO v_sql2;        SELECT trim(trailing ' ' FROM v_sql3) INTO v_sql3;        SELECT trim(trailing ',' FROM v_sql3) INTO v_sql3;        END IF;       raise notice 'v_delim , %',v_delim;      EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg)  INTO v_cnt;     raise notice 'stg cnt , %',v_cnt;     /* if upper(v_delim) = 'CSV' then        v_sql_ins := concat_ws('', ' SELECT * from ' ,v_table_name_stg );      else       -- v_sql_ins := concat_ws('',' SELECT ',v_sql1,'  from (select col1 from ' ,v_table_name_stg , ')sub ');        v_sql_ins := concat_ws('',' SELECT ',v_sql3,'  from (select col1 from ' ,v_table_name_stg , ')sub ');        END IF;*/ v_chunk := v_cnt/100; for i in 1..101 loop      BEGIN     -- raise notice 'v_sql , %',v_sql;        -- raise notice 'Chunk number , %',i;        v_sql_ins := concat_ws('',' SELECT ',v_sql3,'  from (select col1 from ' ,v_table_name_stg , ' offset ',v_chunk*(i-1), ' limit ',v_chunk,') sub ');      v_sql := concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins);      -- raise notice 'select statement , %',v_sql_ins;           -- v_sql := null;      -- EXECUTE concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins, 'offset ',v_chunk*(i-1), ' limit ',v_chunk );      --v_sql := concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins );      -- raise notice 'insert statement , %',v_sql;     raise NOTICE 'CHUNK START %',v_chunk*(i-1);    raise NOTICE 'CHUNK END %',v_chunk;      EXECUTE v_sql;   EXCEPTION        WHEN OTHERS THEN        -- v_sql_ins := concat_ws('',' SELECT ',v_sql1, '  from (select col1 from ' ,v_table_name_stg , ' )sub ');          -- raise notice 'Chunk number for cursor , %',i;     raise NOTICE 'Cursor - CHUNK START %',v_chunk*(i-1);    raise NOTICE 'Cursor -  CHUNK END %',v_chunk;          v_sql_ins := concat_ws('',' SELECT ',v_sql3, '  from (select col1 from ' ,v_table_name_stg , ' )sub ');          v_final_sql := REPLACE (v_sql_ins, ''''::text, ''''''::text);         -- raise notice 'v_final_sql %',v_final_sql;          v_sql :=concat_ws('','do $a$ declare  r refcursor;v_sql text; i numeric;v_conname text;  v_typ  ',pi_ext_table,'[]; v_rec  ','record',';            begin            open r for execute ''select col1 from ',v_table_name_stg ,'  offset ',v_chunk*(i-1), ' limit ',v_chunk,''';            loop            begin            fetch r into v_rec;            EXIT WHEN NOT FOUND;            v_sql := concat_ws('''',''insert into  ',pi_ext_table,' SELECT ',REPLACE (v_sql3, ''''::text, ''''''::text) , '  from ( select '''''',v_rec.col1,'''''' as col1) v'');             execute v_sql;            exception             when others then           v_sql := ''INSERT INTO  ERROR_TABLE VALUES (concat_ws('''''''',''''Error Name: '''',$$''||SQLERRM||''$$,''''Error State: '''',''''''||SQLSTATE||'''''',''''record : '''',$$''||v_rec.col1||''$$),'''''||pi_filename||''''',now())'';                execute v_sql;              continue;            end ;            end loop;            close r;            exception            when others then          raise;            end ; $a$');       -- raise notice ' inside excp v_sql %',v_sql;           execute v_sql;       --  raise notice 'v_sql %',v_sql;        END;   END LOOP;      ELSE      SELECT distinct DELIMETER,FILE_NAME,DIRECTORY ,concat_ws('',' ',table_name_stg),        case  header_exist when 'YES' then 'CSV HEADER' else 'CSV' end as header_exist        INTO STRICT v_delim,v_file_name,v_directory,v_table_name_stg,v_header      FROM  meta_EXTERNAL_TABLE      WHERE table_name = pi_ext_table                  ;      v_sql := concat_ws('','SELECT aws_s3.table_import_FROM_s3(''',        v_table_name_stg, ''','''', ''DELIMITER AS ''''#'''' ',v_header,' '',','       aws_commons.create_s3_uri        ( ''',v_directory, ''',''',        v_file_name, ''',',         '''',v_region,''')       )');          EXECUTE  v_sql;      FOR rec in cr1      LOOP       IF rec.start_pos IS NULL AND rec.END_pos IS NULL AND rec.no_position = 'recnum'       THEN         v_rec_val := 1;       ELSE        case          WHEN upper(rec.data_type) = 'NUMERIC'          THEN v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0                 THEN null                  ELSE                  coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1)))::NUMERIC,0)::',rec.data_type,' END as ',rec.col_list,',') ;          WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDD'          THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0                 THEN null                  ELSE                  to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''99990101''),''YYYYMMDD'')::',rec.data_type,' END as ',rec.col_list,',');          WHEN UPPER(rec.data_type) = 'TIMESTAMP WITHOUT TIME ZONE' AND rec.date_mask = 'YYYYMMDDHH24MISS'          THEN v_sql1 := concat_ws('','case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0                 THEN null                  ELSE                  to_date(coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),''9999010100240000''),''YYYYMMDDHH24MISS'')::',rec.data_type,' END as ',rec.col_list,',');           ELSE         v_sql1 := concat_ws('',' case WHEN length(trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))) =0                 THEN null                  ELSE                   coalesce((trim(substring(COL1, ',rec.start_pos ,',', rec.END_pos,'-',rec.start_pos ,'+1))),'''')::',rec.data_type,' END as ',rec.col_list,',') ;        END case;       END IF;       v_col_list := concat_ws('',v_col_list ,v_sql1);      END LOOP;            SELECT trim(trailing ' ' FROM v_col_list) INTO v_col_list;            SELECT trim(trailing ',' FROM v_col_list) INTO v_col_list;            v_sql_col   :=  concat_ws('',trim(trailing ',' FROM v_col_list) , ' FROM  ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 ');            v_sql_dynamic := v_sql_col;            EXECUTE  concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg) INTO v_cnt;          IF v_rec_val = 1 THEN              v_sql_ins := concat_ws('',' select row_number() over(order by ctid) as line_number ,' ,v_sql_dynamic) ;          ELSE                v_sql_ins := concat_ws('',' SELECT' ,v_sql_dynamic) ;            END IF;      BEGIN        EXECUTE concat_ws('','insert into  ', pi_ext_table ,' ', v_sql_ins);            EXCEPTION               WHEN OTHERS THEN           IF v_rec_val = 1 THEN                   v_final_sql := ' select row_number() over(order by ctid) as line_number ,col1 from ';                 ELSE                  v_final_sql := ' SELECT col1 from';                END IF;        v_sql :=concat_ws('','do $a$ declare  r refcursor;v_rec_val numeric := ',coalesce(v_rec_val,0),';line_number numeric; col1 text; v_typ  ',pi_ext_table,'[]; v_rec  ',pi_ext_table,';              begin              open r for execute ''',v_final_sql, ' ',v_table_name_stg,' WHERE col1 IS NOT NULL AND length(col1)>0 '' ;              loop              begin              if   v_rec_val = 1 then              fetch r into line_number,col1;              else              fetch r into col1;              end if;              EXIT WHEN NOT FOUND;               if v_rec_val = 1 then               select line_number,',trim(trailing ',' FROM v_col_list) ,' into v_rec;               else                 select ',trim(trailing ',' FROM v_col_list) ,' into v_rec;               end if;              insert into  ',pi_ext_table,' select v_rec.*;               exception               when others then                INSERT INTO  ERROR_TABLE VALUES (concat_ws('''',''Error Name: '',SQLERRM,''Error State: '',SQLSTATE,''record : '',v_rec),''',pi_filename,''',now());                continue;               end ;                end loop;              close r;               exception               when others then               raise;               end ; $a$');          execute v_sql;      END;          END IF;    EXECUTE concat_ws('','SELECT COUNT(*) FROM  ' ,pi_ext_table)   INTO proc_rec_COUNT;    EXECUTE concat_ws('','SELECT COUNT(*) FROM  error_table WHERE file_name =''',pi_filename,''' and processed_time::date = clock_timestamp()::date')  INTO error_rec_COUNT;    EXECUTE concat_ws('','SELECT COUNT(*) FROM ',v_table_name_stg)   INTO tot_rec_COUNT;    INSERT INTO  log_table values(pi_filename,now(),tot_rec_COUNT,proc_rec_COUNT, error_rec_COUNT);    raise notice 'v_directory, %',v_directory;    raise notice 'pi_filename, %',pi_filename;    raise notice 'v_region, %',v_region;   perform aws_s3.query_export_to_s3('SELECT replace(trim(substring(error_details,position(''('' in error_details)+1),'')''),'','','';''),file_name,processed_time FROM  error_table WHERE file_name = '''||pi_filename||'''',    aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),    options :='FORmat csv, header, delimiter $$,$$'    ); raise notice 'v_directory, %',v_directory;    raise notice 'pi_filename, %',pi_filename;    raise notice 'v_region, %',v_region;   perform aws_s3.query_export_to_s3('SELECT * FROM  log_table WHERE file_name = '''||pi_filename||'''',    aws_commons.create_s3_uri(v_directory, pi_filename||'.log', v_region),    options :='FORmat csv, header, delimiter $$,$$'    );    END IF;  j := j+1;  END LOOP;        RETURN 'OK'; EXCEPTION     WHEN  OTHERS THEN   raise notice 'error %',sqlerrm;    ERCODE=SQLSTATE;    IF ERCODE = 'NTFIP' THEN      v_sqlerrm := concat_Ws('',sqlerrm,'No data for the filename');    ELSIF ERCODE = 'S3IMP' THEN     v_sqlerrm := concat_Ws('',sqlerrm,'Error While exporting the file from S3');    ELSE       v_sqlerrm := sqlerrm;    END IF;  select distinct directory into v_directory from  meta_EXTERNAL_TABLE;  raise notice 'exc v_directory, %',v_directory;    raise notice 'exc pi_filename, %',pi_filename;    raise notice 'exc v_region, %',v_region;   perform aws_s3.query_export_to_s3('SELECT * FROM  error_table WHERE file_name = '''||pi_filename||'''',    aws_commons.create_s3_uri(v_directory, pi_filename||'.bad', v_region),    options :='FORmat csv, header, delimiter $$,$$'    );     RETURN null; END; $function$