Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Programme complet : adaptateur DynamoDB Streams Kinesis
Voici le programme Java complet qui effectue les tâches décrites dans Démonstration : adaptateur Kinesis DynamoDB Streams. Lorsque vous l'exécutez, vous devez visualiser une sortie similaire à ce qui suit.
Creating table KCL-Demo-src Creating table KCL-Demo-dest Table is active. Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601 Starting worker... Scan result is equal. Done.
Important
Pour exécuter ce programme, assurez-vous que l'application cliente a accès à DynamoDB et à CloudWatch Amazon à l'aide de politiques. Pour de plus amples informations, veuillez consulter Politiques basées sur l'identité pour DynamoDB.
Le code source se compose de quatre fichiers .java
:
-
StreamsAdapterDemo.java
-
StreamsRecordProcessor.java
-
StreamsRecordProcessorFactory.java
-
StreamsAdapterDemoHelper.java
StreamsAdapterDemo.java
package com.amazonaws.codesamples; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.regions.Regions; import com.amazonaws.services.cloudwatch.AmazonCloudWatch; import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder; import com.amazonaws.services.dynamodbv2.model.DeleteTableRequest; import com.amazonaws.services.dynamodbv2.model.DescribeTableResult; import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; public class StreamsAdapterDemo { private static Worker worker; private static KinesisClientLibConfiguration workerConfig; private static IRecordProcessorFactory recordProcessorFactory; private static AmazonDynamoDB dynamoDBClient; private static AmazonCloudWatch cloudWatchClient; private static AmazonDynamoDBStreams dynamoDBStreamsClient; private static AmazonDynamoDBStreamsAdapterClient adapterClient; private static String tablePrefix = "KCL-Demo"; private static String streamArn; private static Regions awsRegion = Regions.US_EAST_2; private static AWSCredentialsProvider awsCredentialsProvider = DefaultAWSCredentialsProviderChain.getInstance(); /** * @param args */ public static void main(String[] args) throws Exception { System.out.println("Starting demo..."); dynamoDBClient = AmazonDynamoDBClientBuilder.standard() .withRegion(awsRegion) .build(); cloudWatchClient = AmazonCloudWatchClientBuilder.standard() .withRegion(awsRegion) .build(); dynamoDBStreamsClient = AmazonDynamoDBStreamsClientBuilder.standard() .withRegion(awsRegion) .build(); adapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoDBStreamsClient); String srcTable = tablePrefix + "-src"; String destTable = tablePrefix + "-dest"; recordProcessorFactory = new StreamsRecordProcessorFactory(dynamoDBClient, destTable); setUpTables(); workerConfig = new KinesisClientLibConfiguration("streams-adapter-demo", streamArn, awsCredentialsProvider, "streams-demo-worker") .withMaxRecords(1000) .withIdleTimeBetweenReadsInMillis(500) .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON); System.out.println("Creating worker for stream: " + streamArn); worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(recordProcessorFactory, workerConfig, adapterClient, dynamoDBClient, cloudWatchClient); System.out.println("Starting worker..."); Thread t = new Thread(worker); t.start(); Thread.sleep(25000); worker.shutdown(); t.join(); if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); } System.out.println("Done."); cleanupAndExit(0); } private static void setUpTables() { String srcTable = tablePrefix + "-src"; String destTable = tablePrefix + "-dest"; streamArn = StreamsAdapterDemoHelper.createTable(dynamoDBClient, srcTable); StreamsAdapterDemoHelper.createTable(dynamoDBClient, destTable); awaitTableCreation(srcTable); performOps(srcTable); } private static void awaitTableCreation(String tableName) { Integer retries = 0; Boolean created = false; while (!created && retries < 100) { DescribeTableResult result = StreamsAdapterDemoHelper.describeTable(dynamoDBClient, tableName); created = result.getTable().getTableStatus().equals("ACTIVE"); if (created) { System.out.println("Table is active."); return; } else { retries++; try { Thread.sleep(1000); } catch (InterruptedException e) { // do nothing } } } System.out.println("Timeout after table creation. Exiting..."); cleanupAndExit(1); } private static void performOps(String tableName) { StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102"); } private static void cleanupAndExit(Integer returnValue) { String srcTable = tablePrefix + "-src"; String destTable = tablePrefix + "-dest"; dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable)); dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable)); System.exit(returnValue); } }
StreamsRecordProcessor.java
package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.model.Record; import java.nio.charset.Charset; public class StreamsRecordProcessor implements IRecordProcessor { private Integer checkpointCounter; private final AmazonDynamoDB dynamoDBClient; private final String tableName; public StreamsRecordProcessor(AmazonDynamoDB dynamoDBClient2, String tableName) { this.dynamoDBClient = dynamoDBClient2; this.tableName = tableName; } @Override public void initialize(InitializationInput initializationInput) { checkpointCounter = 0; } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record) .getInternalObject(); switch (streamRecord.getEventName()) { case "INSERT": case "MODIFY": StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getNewImage()); break; case "REMOVE": StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getKeys().get("Id").getN()); } } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { processRecordsInput.getCheckpointer().checkpoint(); } catch (Exception e) { e.printStackTrace(); } } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (Exception e) { e.printStackTrace(); } } } }
StreamsRecordProcessorFactory.java
package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { private final String tableName; private final AmazonDynamoDB dynamoDBClient; public StreamsRecordProcessorFactory(AmazonDynamoDB dynamoDBClient, String tableName) { this.tableName = tableName; this.dynamoDBClient = dynamoDBClient; } @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }
StreamsAdapterDemoHelper.java
package com.amazonaws.codesamples; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.model.AttributeAction; import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate; import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; import com.amazonaws.services.dynamodbv2.model.CreateTableResult; import com.amazonaws.services.dynamodbv2.model.DeleteItemRequest; import com.amazonaws.services.dynamodbv2.model.DescribeTableRequest; import com.amazonaws.services.dynamodbv2.model.DescribeTableResult; import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; import com.amazonaws.services.dynamodbv2.model.KeyType; import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; import com.amazonaws.services.dynamodbv2.model.PutItemRequest; import com.amazonaws.services.dynamodbv2.model.ResourceInUseException; import com.amazonaws.services.dynamodbv2.model.ScanRequest; import com.amazonaws.services.dynamodbv2.model.ScanResult; import com.amazonaws.services.dynamodbv2.model.StreamSpecification; import com.amazonaws.services.dynamodbv2.model.StreamViewType; import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest; public class StreamsAdapterDemoHelper { /** * @return StreamArn */ public static String createTable(AmazonDynamoDB client, String tableName) { java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N")); java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition // key ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L) .withWriteCapacityUnits(2L); StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE); CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName) .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema) .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification); try { System.out.println("Creating table " + tableName); CreateTableResult result = client.createTable(createTableRequest); return result.getTableDescription().getLatestStreamArn(); } catch (ResourceInUseException e) { System.out.println("Table already exists."); return describeTable(client, tableName).getTable().getLatestStreamArn(); } } public static DescribeTableResult describeTable(AmazonDynamoDB client, String tableName) { return client.describeTable(new DescribeTableRequest().withTableName(tableName)); } public static ScanResult scanTable(AmazonDynamoDB dynamoDBClient, String tableName) { return dynamoDBClient.scan(new ScanRequest().withTableName(tableName)); } public static void putItem(AmazonDynamoDB dynamoDBClient, String tableName, String id, String val) { java.util.Map<String, AttributeValue> item = new HashMap<String, AttributeValue>(); item.put("Id", new AttributeValue().withN(id)); item.put("attribute-1", new AttributeValue().withS(val)); PutItemRequest putItemRequest = new PutItemRequest().withTableName(tableName).withItem(item); dynamoDBClient.putItem(putItemRequest); } public static void putItem(AmazonDynamoDB dynamoDBClient, String tableName, java.util.Map<String, AttributeValue> items) { PutItemRequest putItemRequest = new PutItemRequest().withTableName(tableName).withItem(items); dynamoDBClient.putItem(putItemRequest); } public static void updateItem(AmazonDynamoDB dynamoDBClient, String tableName, String id, String val) { java.util.Map<String, AttributeValue> key = new HashMap<String, AttributeValue>(); key.put("Id", new AttributeValue().withN(id)); Map<String, AttributeValueUpdate> attributeUpdates = new HashMap<String, AttributeValueUpdate>(); AttributeValueUpdate update = new AttributeValueUpdate().withAction(AttributeAction.PUT) .withValue(new AttributeValue().withS(val)); attributeUpdates.put("attribute-2", update); UpdateItemRequest updateItemRequest = new UpdateItemRequest().withTableName(tableName).withKey(key) .withAttributeUpdates(attributeUpdates); dynamoDBClient.updateItem(updateItemRequest); } public static void deleteItem(AmazonDynamoDB dynamoDBClient, String tableName, String id) { java.util.Map<String, AttributeValue> key = new HashMap<String, AttributeValue>(); key.put("Id", new AttributeValue().withN(id)); DeleteItemRequest deleteItemRequest = new DeleteItemRequest().withTableName(tableName).withKey(key); dynamoDBClient.deleteItem(deleteItemRequest); } }