AWS Data Pipeline non è più disponibile per i nuovi clienti. I clienti esistenti di AWS Data Pipeline possono continuare a utilizzare il servizio normalmente. Ulteriori informazioni
Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
RedshiftCopyActivity
Copia i dati da DynamoDB o Amazon S3 su Amazon Redshift. È possibile caricare i dati in una nuova tabella, oppure unirli facilmente alla tabella esistente.
Questa è una panoramica di un caso d'uso in cui utilizzare RedshiftCopyActivity
:
-
Inizia a AWS Data Pipeline utilizzarlo per lo staging dei dati in Amazon S3.
-
RedshiftCopyActivity
Utilizzalo per spostare i dati da Amazon RDS e Amazon EMR ad Amazon Redshift.In questo modo puoi caricare i dati in Amazon Redshift dove puoi analizzarli.
-
SqlActivityUtilizzalo per eseguire SQL query sui dati che hai caricato in Amazon Redshift.
Inoltre, RedshiftCopyActivity
consente di lavorare con un S3DataNode
, poiché supporta un file manifest. Per ulteriori informazioni, consulta S3 DataNode.
Esempio
Di seguito è illustrato un esempio di questo tipo di oggetto.
Per garantire la conversione dei formati, questo esempio utilizza parametri EMPTYASNULLdi conversione IGNOREBLANKLINESspeciali in. commandOptions
Per informazioni, consulta i parametri di conversione dei dati nella Amazon Redshift Database Developer Guide.
{ "id" : "S3ToRedshiftCopyActivity", "type" : "RedshiftCopyActivity", "input" : { "ref": "MyS3DataNode" }, "output" : { "ref": "MyRedshiftDataNode" }, "insertMode" : "KEEP_EXISTING", "schedule" : { "ref": "Hour" }, "runsOn" : { "ref": "MyEc2Resource" }, "commandOptions": ["EMPTYASNULL", "IGNOREBLANKLINES"] }
L'esempio seguente di definizione di pipeline mostra un'attività che utilizza la modalità di inserimento APPEND
:
{ "objects": [ { "id": "CSVId1", "name": "DefaultCSV1", "type": "CSV" }, { "id": "RedshiftDatabaseId1", "databaseName": "dbname", "username": "user", "name": "DefaultRedshiftDatabase1", "*password": "password", "type": "RedshiftDatabase", "clusterId": "redshiftclusterId" }, { "id": "Default", "scheduleType": "timeseries", "failureAndRerunMode": "CASCADE", "name": "Default", "role": "DataPipelineDefaultRole", "resourceRole": "DataPipelineDefaultResourceRole" }, { "id": "RedshiftDataNodeId1", "schedule": { "ref": "ScheduleId1" }, "tableName": "orders", "name": "DefaultRedshiftDataNode1", "createTableSql": "create table StructuredLogs (requestBeginTime CHAR(30) PRIMARY KEY DISTKEY SORTKEY, requestEndTime CHAR(30), hostname CHAR(100), requestDate varchar(20));", "type": "RedshiftDataNode", "database": { "ref": "RedshiftDatabaseId1" } }, { "id": "Ec2ResourceId1", "schedule": { "ref": "ScheduleId1" }, "securityGroups": "MySecurityGroup", "name": "DefaultEc2Resource1", "role": "DataPipelineDefaultRole", "logUri": "s3://myLogs", "resourceRole": "DataPipelineDefaultResourceRole", "type": "Ec2Resource" }, { "id": "ScheduleId1", "startDateTime": "yyyy-mm-ddT00:00:00", "name": "DefaultSchedule1", "type": "Schedule", "period": "period", "endDateTime": "yyyy-mm-ddT00:00:00" }, { "id": "S3DataNodeId1", "schedule": { "ref": "ScheduleId1" }, "filePath": "s3://datapipeline-us-east-1/samples/hive-ads-samples.csv", "name": "DefaultS3DataNode1", "dataFormat": { "ref": "CSVId1" }, "type": "S3DataNode" }, { "id": "RedshiftCopyActivityId1", "input": { "ref": "S3DataNodeId1" }, "schedule": { "ref": "ScheduleId1" }, "insertMode": "APPEND", "name": "DefaultRedshiftCopyActivity1", "runsOn": { "ref": "Ec2ResourceId1" }, "type": "RedshiftCopyActivity", "output": { "ref": "RedshiftDataNodeId1" } } ] }
L'operazione APPEND
aggiunge gli elementi a una tabella indipendentemente dalle chiavi di ordinamento o primarie. Ad esempio, se si ha la seguente tabella, è possibile aggiungere un record con lo stesso ID e valore utente.
ID(PK) USER 1 aaa 2 bbb
È possibile aggiungere un record con lo stesso ID e valore utente:
ID(PK) USER 1 aaa 2 bbb 1 aaa
Nota
Se un'operazione APPEND
viene interrotta e riprovata, la risultante pipeline rieseguita viene potenzialmente aggiunta dall'inizio. L'operazione potrebbe causare un ulteriore doppione, quindi è necessario essere a conoscenza di questo comportamento, soprattutto se si ha una logica che conteggia il numero di righe.
Per un tutorial, vedere Copia i dati su Amazon Redshift utilizzando AWS Data Pipeline.
Sintassi
Campi obbligatori | Descrizione | Tipo di slot |
---|---|---|
insertMode |
Determina AWS Data Pipeline cosa fare con i dati preesistenti nella tabella di destinazione che si sovrappongono alle righe dei dati da caricare. I valori validi sono:
|
Enumerazione |
Campi Object Invocation | Descrizione | Tipo di slot |
---|---|---|
schedule |
Questo oggetto viene richiamato entro l'esecuzione di un intervallo di pianificazione. Specificare un riferimento alla pianificazione di un altro oggetto per impostare l'ordine di esecuzione delle dipendenze per questo oggetto. Nella maggior parte dei casi, è preferibile inserire il riferimento alla pianificazione nell'oggetto pipeline di default, in modo che tutti gli oggetti possano ereditare tale pianificazione. Ad esempio, è possibile impostare una pianificazione esplicitamente sull'oggetto, specificando Se la pianificazione master nella pipeline contiene pianificazioni nidificate, è possibile creare un oggetto padre che dispone di un riferimento alla pianificazione. Per ulteriori informazioni sulle configurazioni di pianificazione opzionali di esempio, consulta Pianificazione. |
Reference Object, ad esempio: "schedule":{"ref":"myScheduleId"} |
Gruppo richiesto (uno dei seguenti è obbligatorio) | Descrizione | Tipo di slot |
---|---|---|
runsOn | Le risorse di calcolo per eseguire l'attività o il comando. Ad esempio, un'EC2istanza Amazon o un EMR cluster Amazon. | Oggetto di riferimento, ad esempio "runsOn«: {" ref»:» myResourceId «} |
workerGroup | Il gruppo di lavoro. Utilizzato per le attività di routing. Se si fornisce un runsOn valore ed workerGroup esiste, workerGroup viene ignorato. |
Stringa |
Campi opzionali | Descrizione | Tipo di slot |
---|---|---|
attemptStatus | Lo stato segnalato più di recente dall'attività remota. | Stringa |
attemptTimeout | Timeout per il completamento del lavoro in remoto. Se questo campo è impostato, un'attività remota che non viene completata entro il tempo impostato di avvio viene tentata di nuovo. | Periodo |
commandOptions |
Richiede i parametri da passare al nodo di dati Amazon Redshift durante l' Mentre carica la tabella, Se un formato di dati è associato al nodo di dati in ingresso o in uscita, allora i parametri forniti vengono ignorati. Poiché l'operazione di copia utilizza per prima cosa Inoltre, in alcuni casi, quando deve scaricare dati dal cluster Amazon Redshift e creare file in Amazon S3, si affida Per migliorare le prestazioni durante la copia e lo scaricamento, specificare il parametro |
Stringa |
dependsOn | Specifica una dipendenza su un altro oggetto eseguibile. | Reference Object: "dependsOn":{"ref":"myActivityId"} |
failureAndRerunModalità | Descrive il comportamento del nodo consumer quando le dipendenze presentano un errore o vengono di nuovo eseguite | Enumerazione |
input | Nodo dei dati di input. L'origine dati può essere Amazon S3, DynamoDB o Amazon Redshift. | Reference Object: "input":{"ref":"myDataNodeId"} |
lateAfterTimeout | Il tempo trascorso dopo l'inizio della pipeline entro il quale l'oggetto deve essere completato. Viene attivato solo quando il tipo di pianificazione non è impostato su. ondemand |
Periodo |
maxActiveInstances | Il numero massimo di istanze attive simultanee di un componente. Le riesecuzioni non contano ai fini del numero di istanze attive. | Numero intero |
maximumRetries | Numero massimo di tentativi in caso di errore | Numero intero |
onFail | Un'azione da eseguire quando l'oggetto corrente ha esito negativo. | Reference Object: "onFail":{"ref":"myActionId"} |
onLateAction | Azioni che devono essere attivate se un oggetto non è stato ancora pianificato o non è ancora completo. | Reference Object: "onLateAction":{"ref":"myActionId"} |
onSuccess | Un'operazione da eseguire quando l'oggetto corrente ha esito positivo. | Reference Object: "onSuccess":{"ref":"myActionId"} |
output | Nodo dei dati di output. Il percorso di output può essere Amazon S3 o Amazon Redshift. | Reference Object: "output":{"ref":"myDataNodeId"} |
parent | Padre dell'oggetto corrente da cui saranno ereditati gli slot. | Reference Object: "parent":{"ref":"myBaseObjectId"} |
pipelineLogUri | L'S3 URI (come 's3://BucketName/Key/ ') per caricare i log per la pipeline. | Stringa |
precondizione | Definisce eventualmente una precondizione. Un nodo di dati non è contrassegnato con "READY" finché non sono state soddisfatte tutte le condizioni preliminari. | Reference Object: "precondition":{"ref":"myPreconditionId"} |
coda |
Corrisponde all' Amazon Redshift limita il numero di connessioni simultanee a 15. Per ulteriori informazioni, consulta Assigning Queries to Queues nella Amazon RDS Database Developer Guide. |
Stringa |
reportProgressTimeout |
Timeout per chiamate successive di attività in remoto a Se impostato, le attività in remoto che non presentano avanzamenti nel periodo specificato potrebbero essere considerate bloccate e sono quindi oggetto di un altro tentativo. |
Periodo |
retryDelay | La durata del timeout tra due tentativi. | Periodo |
scheduleType |
Consente di specificare se la pianificazione per gli oggetti è nella pipeline. I valori sono La pianificazione La pianificazione Una pianificazione Per utilizzare le pipeline Se utilizzi una pianificazione |
Enumerazione |
transformSql |
L'espressione Esegui l'espressione Quando copi dati da DynamoDB o Amazon S3 AWS Data Pipeline , crea una tabella chiamata «staging» e inizialmente carica i dati al suo interno. I dati di questa tabella vengono utilizzati per aggiornare la tabella di destinazione. Lo schema di output di Se si specifica l' |
Stringa |
Campi Runtime | Descrizione | Tipo di slot |
---|---|---|
@activeInstances | Elenco di oggetti di istanze attive attualmente programmate. | Reference Object: "activeInstances":{"ref":"myRunnableObjectId"} |
@actualEndTime | L'ora in cui è terminata l'esecuzione di questo oggetto. | DateTime |
@actualStartTime | L'ora in cui è stata avviata l'esecuzione di questo oggetto. | DateTime |
cancellationReason | Il cancellationReason se questo oggetto è stato annullato. | Stringa |
@cascadeFailedOn | Descrizione della catena di dipendenza che ha generato l'errore dell'oggetto. | Reference Object: "cascadeFailedOn":{"ref":"myRunnableObjectId"} |
emrStepLog | EMRi registri dei passaggi sono disponibili solo in caso di tentativi di attività EMR | Stringa |
errorId | Il errorId se questo oggetto ha fallito. | Stringa |
errorMessage | Il errorMessage se questo oggetto ha avuto esito negativo. | Stringa |
errorStackTrace | Traccia dello stack di errore se l'oggetto non è riuscito. | Stringa |
@finishedTime | L'ora in cui è terminata l'esecuzione di questo oggetto. | DateTime |
hadoopJobLog | I log dei job Hadoop sono disponibili per EMR i tentativi di attività basate su di esse. | Stringa |
@healthStatus | Lo stato di integrità dell'oggetto che riflette l'esito positivo o negativo dell'ultima istanza dell'oggetto che ha raggiunto lo stato di un'istanza terminata. | Stringa |
@healthStatusFromInstanceId | Id dell'ultimo oggetto dell'istanza che ha raggiunto lo stato terminato. | Stringa |
@ Ora healthStatusUpdated | L'ora in cui lo stato di integrità è stato aggiornato l'ultima volta. | DateTime |
hostname | Il nome host del client che si è aggiudicato il tentativo dell'attività. | Stringa |
@lastDeactivatedTime | L'ora in cui l'oggetto è stato disattivato. | DateTime |
@ latestCompletedRun Ora | L'orario dell'esecuzione più recente durante il quale l'esecuzione è stata completata. | DateTime |
@latestRunTime | L'orario dell'esecuzione più recente durante il quale l'esecuzione è stata pianificata. | DateTime |
@nextRunTime | L'orario dell'esecuzione da programmare come successiva. | DateTime |
reportProgressTime | Il periodo di tempo più recente in cui l'attività remota ha segnalato un progresso. | DateTime |
@scheduledEndTime | L'orario di termine della pianificazione per un oggetto. | DateTime |
@scheduledStartTime | L'orario di inizio della pianificazione per l'oggetto. | DateTime |
@status | Lo stato di questo oggetto. | Stringa |
@version | Versione della pipeline con cui l'oggetto è stato creato. | Stringa |
@waitingOn | Descrizione dell'elenco di dipendenze per cui questo oggetto è in attesa. | Reference Object: "waitingOn":{"ref":"myRunnableObjectId"} |
Campi di sistema | Descrizione | Tipo di slot |
---|---|---|
@error | Errore che descrive il formato oggetto errato. | Stringa |
@pipelineId | L'id della pipeline a cui appartiene questo oggetto. | Stringa |
@sphere | La sfera di un oggetto. Indica la propria posizione nel ciclo di vita. Ad esempio, i Component Objects generano Instance Objects che eseguono Attempt Objects. | Stringa |