「チュートリアル: DynamoDB Streams Kinesis Adapter」で説明したタスクを実行する、完成した Java プログラムを以下に示します。実行すると、次のような出力が表示されます。
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
重要
このプログラムを実行するには、クライアントアプリケーションがポリシーを使用して DynamoDB および Amazon CloudWatch にアクセスできることを確認します。詳細については、「DynamoDB のアイデンティティベースのポリシー」を参照してください。
ソースコードは、4 つの .java ファイルから構成されています。
- StreamsAdapterDemo.java
- StreamsRecordProcessor.java
- StreamsRecordProcessorFactory.java
- StreamsAdapterDemoHelper.java
StreamsAdapterDemo.java
package com.amazonaws.codesamples;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClientBuilder;
import com.amazonaws.services.dynamodbv2.model.DeleteTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
/**
 * Entry point of the DynamoDB Streams Kinesis Adapter demo.
 *
 * <p>The program creates a source and a destination table, performs a few writes against the
 * source table, runs a KCL worker (through the DynamoDB Streams adapter client) whose record
 * processors are built by {@link StreamsRecordProcessorFactory} for the destination table,
 * compares a scan of both tables, and finally deletes both tables and exits.
 */
public class StreamsAdapterDemo {
    /** Maximum number of DescribeTable polls while waiting for a table to become active. */
    private static final int MAX_DESCRIBE_ATTEMPTS = 100;
    /** Pause between two DescribeTable polls, in milliseconds. */
    private static final long DESCRIBE_POLL_INTERVAL_MILLIS = 1000L;
    /** How long the worker is left running before it is shut down, in milliseconds. */
    private static final long WORKER_RUN_TIME_MILLIS = 25000L;

    private static Worker worker;
    private static KinesisClientLibConfiguration workerConfig;
    private static IRecordProcessorFactory recordProcessorFactory;

    private static AmazonDynamoDB dynamoDBClient;
    private static AmazonCloudWatch cloudWatchClient;
    private static AmazonDynamoDBStreams dynamoDBStreamsClient;
    private static AmazonDynamoDBStreamsAdapterClient adapterClient;

    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    private static Regions awsRegion = Regions.US_EAST_2;
    private static AWSCredentialsProvider awsCredentialsProvider = DefaultAWSCredentialsProviderChain.getInstance();

    /**
     * Runs the demo from start to finish and terminates the JVM.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the main thread is interrupted while the worker is running, or if
     *                   any service call fails
     */
    public static void main(String[] args) throws Exception {
        System.out.println("Starting demo...");
        dynamoDBClient = AmazonDynamoDBClientBuilder.standard()
                .withRegion(awsRegion)
                .build();
        cloudWatchClient = AmazonCloudWatchClientBuilder.standard()
                .withRegion(awsRegion)
                .build();
        dynamoDBStreamsClient = AmazonDynamoDBStreamsClientBuilder.standard()
                .withRegion(awsRegion)
                .build();
        adapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoDBStreamsClient);

        String srcTable = srcTableName();
        String destTable = destTableName();
        recordProcessorFactory = new StreamsRecordProcessorFactory(dynamoDBClient, destTable);

        // Creates both tables, waits for them, and writes the test data; also sets streamArn.
        setUpTables();

        workerConfig = new KinesisClientLibConfiguration("streams-adapter-demo",
                streamArn,
                awsCredentialsProvider,
                "streams-demo-worker")
                .withMaxRecords(1000)
                .withIdleTimeBetweenReadsInMillis(500)
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

        System.out.println("Creating worker for stream: " + streamArn);
        worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(recordProcessorFactory, workerConfig,
                adapterClient, dynamoDBClient, cloudWatchClient);

        System.out.println("Starting worker...");
        Thread t = new Thread(worker);
        t.start();

        // Fixed time window during which the worker consumes the stream.
        Thread.sleep(WORKER_RUN_TIME_MILLIS);
        worker.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
            System.out.println("Scan result is equal.");
        } else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    /** Returns the name of the table whose stream is consumed. */
    private static String srcTableName() {
        return tablePrefix + "-src";
    }

    /** Returns the name of the table the record processors write to. */
    private static String destTableName() {
        return tablePrefix + "-dest";
    }

    /**
     * Creates the source and destination tables, waits until BOTH are active, and then writes
     * the test data to the source table. The stream ARN of the source table is stored in
     * {@code streamArn}.
     */
    private static void setUpTables() {
        String srcTable = srcTableName();
        String destTable = destTableName();
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDBClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDBClient, destTable);

        awaitTableCreation(srcTable);
        // The record processor writes to the destination table, so it has to be usable before
        // the worker starts. "Table is active." is therefore printed once per table.
        awaitTableCreation(destTable);

        performOps(srcTable);
    }

    /**
     * Polls DescribeTable until the given table reports the status {@code ACTIVE}.
     *
     * <p>If the table is not active after {@code MAX_DESCRIBE_ATTEMPTS} polls, or if the thread
     * is interrupted while waiting, the demo tables are deleted and the JVM exits with status 1.
     *
     * @param tableName name of the table to wait for
     */
    private static void awaitTableCreation(String tableName) {
        for (int attempt = 0; attempt < MAX_DESCRIBE_ATTEMPTS; attempt++) {
            DescribeTableResult result = StreamsAdapterDemoHelper.describeTable(dynamoDBClient, tableName);
            if ("ACTIVE".equals(result.getTable().getTableStatus())) {
                System.out.println("Table is active.");
                return;
            }
            try {
                Thread.sleep(DESCRIBE_POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // An interrupt is a request to stop: give up waiting instead of ignoring it.
                // The process exits right away, so the interrupt flag is not restored.
                System.out.println("Interrupted while waiting for table " + tableName + ". Exiting...");
                cleanupAndExit(1);
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    /**
     * Writes the demo data: for each of the ids 101 and 102 an item is put, updated and deleted.
     *
     * @param tableName table to write to
     */
    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
    }

    /**
     * Deletes both demo tables (best effort) and terminates the JVM.
     *
     * @param returnValue process exit status
     */
    private static void cleanupAndExit(int returnValue) {
        deleteTableQuietly(srcTableName());
        deleteTableQuietly(destTableName());
        System.exit(returnValue);
    }

    /**
     * Deletes one table and reports, rather than propagates, a failure, so that the remaining
     * cleanup and the call to {@code System.exit} still run.
     *
     * @param tableName table to delete
     */
    private static void deleteTableQuietly(String tableName) {
        try {
            dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(tableName));
        } catch (RuntimeException e) {
            System.err.println("Could not delete table " + tableName + ": " + e.getMessage());
        }
    }
}
StreamsRecordProcessor.java
package com.amazonaws.codesamples;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
import java.nio.charset.Charset;
/**
 * Record processor that applies each DynamoDB stream record it receives to a target table:
 * {@code INSERT} and {@code MODIFY} events put the record's new image, {@code REMOVE} events
 * delete the item identified by the numeric {@code Id} key.
 */
public class StreamsRecordProcessor implements IRecordProcessor {
    /** A checkpoint is requested each time the record counter reaches a multiple of this value. */
    private static final int CHECKPOINT_INTERVAL = 10;

    private final AmazonDynamoDB dynamoDBClient;
    private final String tableName;
    /** Number of records seen since {@link #initialize}; assigned there, not at construction. */
    private Integer checkpointCounter;

    /**
     * @param dynamoDBClient2 client used to write to the target table
     * @param tableName       name of the target table
     */
    public StreamsRecordProcessor(AmazonDynamoDB dynamoDBClient2, String tableName) {
        this.tableName = tableName;
        this.dynamoDBClient = dynamoDBClient2;
    }

    /** Resets the record counter. */
    @Override
    public void initialize(InitializationInput initializationInput) {
        checkpointCounter = 0;
    }

    /**
     * Prints every record's payload, applies records that are {@link RecordAdapter} instances
     * to the target table, and requests a checkpoint after every tenth record.
     *
     * <p>NOTE(review): {@code checkpoint()} is called without arguments in the middle of a
     * batch - confirm against the KCL documentation which position that records.
     */
    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            System.out.println(new String(record.getData().array(), Charset.forName("UTF-8")));

            if (record instanceof RecordAdapter) {
                applyToTargetTable(((RecordAdapter) record).getInternalObject());
            }

            checkpointCounter++;
            if (checkpointCounter % CHECKPOINT_INTERVAL != 0) {
                continue;
            }
            try {
                processRecordsInput.getCheckpointer().checkpoint();
            } catch (Exception e) {
                // A failed checkpoint is only reported; processing continues.
                e.printStackTrace();
            }
        }
    }

    /**
     * Mirrors a single stream record into the target table according to its event name.
     * Event names other than INSERT, MODIFY and REMOVE are ignored.
     */
    private void applyToTargetTable(com.amazonaws.services.dynamodbv2.model.Record streamRecord) {
        switch (streamRecord.getEventName()) {
            case "REMOVE":
                StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                        streamRecord.getDynamodb().getKeys().get("Id").getN());
                break;
            case "INSERT":
            case "MODIFY":
                StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                        streamRecord.getDynamodb().getNewImage());
                break;
            default:
                break;
        }
    }

    /** Requests a final checkpoint, but only when the shutdown reason is {@code TERMINATE}. */
    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() != ShutdownReason.TERMINATE) {
            return;
        }
        try {
            shutdownInput.getCheckpointer().checkpoint();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
StreamsRecordProcessorFactory.java
package com.amazonaws.codesamples;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
/**
 * Factory that builds {@link StreamsRecordProcessor} instances, all sharing the same DynamoDB
 * client and target table name.
 */
public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    private final AmazonDynamoDB dynamoDBClient;
    private final String tableName;

    /**
     * @param dynamoDBClient client handed to every processor created by this factory
     * @param tableName      table name handed to every processor created by this factory
     */
    public StreamsRecordProcessorFactory(AmazonDynamoDB dynamoDBClient, String tableName) {
        this.dynamoDBClient = dynamoDBClient;
        this.tableName = tableName;
    }

    /**
     * @return a new {@link StreamsRecordProcessor} for the configured client and table
     */
    @Override
    public IRecordProcessor createProcessor() {
        IRecordProcessor processor = new StreamsRecordProcessor(dynamoDBClient, tableName);
        return processor;
    }
}
StreamsAdapterDemoHelper.java
package com.amazonaws.codesamples;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.CreateTableResult;
import com.amazonaws.services.dynamodbv2.model.DeleteItemRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.PutItemRequest;
import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import com.amazonaws.services.dynamodbv2.model.ScanResult;
import com.amazonaws.services.dynamodbv2.model.StreamSpecification;
import com.amazonaws.services.dynamodbv2.model.StreamViewType;
import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest;
/**
 * Static convenience wrappers around the DynamoDB calls used by the demo. All items are keyed
 * by a numeric partition key named {@code Id}.
 */
public class StreamsAdapterDemoHelper {

    /**
     * Requests creation of a table with a numeric {@code Id} partition key, provisioned
     * throughput of 2 read / 2 write capacity units, and a stream with the {@code NEW_IMAGE}
     * view type. If the request fails with {@link ResourceInUseException}, the stream ARN is
     * looked up with DescribeTable instead.
     *
     * @param client    DynamoDB client to use
     * @param tableName name of the table to create
     * @return StreamArn
     */
    public static String createTable(AmazonDynamoDB client, String tableName) {
        java.util.List<AttributeDefinition> attributes = new ArrayList<>();
        attributes.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

        // "Id" is the partition key.
        java.util.List<KeySchemaElement> schema = new ArrayList<>();
        schema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH));

        CreateTableRequest request = new CreateTableRequest()
                .withTableName(tableName)
                .withAttributeDefinitions(attributes)
                .withKeySchema(schema)
                .withProvisionedThroughput(new ProvisionedThroughput()
                        .withReadCapacityUnits(2L)
                        .withWriteCapacityUnits(2L))
                .withStreamSpecification(new StreamSpecification()
                        .withStreamEnabled(true)
                        .withStreamViewType(StreamViewType.NEW_IMAGE));

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResult created = client.createTable(request);
            return created.getTableDescription().getLatestStreamArn();
        } catch (ResourceInUseException e) {
            System.out.println("Table already exists.");
            return describeTable(client, tableName).getTable().getLatestStreamArn();
        }
    }

    /**
     * @param client    DynamoDB client to use
     * @param tableName table to describe
     * @return the DescribeTable result for the table
     */
    public static DescribeTableResult describeTable(AmazonDynamoDB client, String tableName) {
        DescribeTableRequest request = new DescribeTableRequest().withTableName(tableName);
        return client.describeTable(request);
    }

    /**
     * Issues a single Scan request against the table.
     *
     * @param dynamoDBClient DynamoDB client to use
     * @param tableName      table to scan
     * @return the result of that one Scan call
     */
    public static ScanResult scanTable(AmazonDynamoDB dynamoDBClient, String tableName) {
        ScanRequest request = new ScanRequest().withTableName(tableName);
        return dynamoDBClient.scan(request);
    }

    /**
     * Puts an item consisting of the numeric key {@code Id} and the string attribute
     * {@code attribute-1}.
     *
     * @param dynamoDBClient DynamoDB client to use
     * @param tableName      table to write to
     * @param id             value of the numeric {@code Id} key
     * @param val            value stored in {@code attribute-1}
     */
    public static void putItem(AmazonDynamoDB dynamoDBClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = idKey(id);
        item.put("attribute-1", new AttributeValue().withS(val));
        putItem(dynamoDBClient, tableName, item);
    }

    /**
     * Puts the given attribute map as one item.
     *
     * @param dynamoDBClient DynamoDB client to use
     * @param tableName      table to write to
     * @param items          attributes of the item to put
     */
    public static void putItem(AmazonDynamoDB dynamoDBClient, String tableName,
            java.util.Map<String, AttributeValue> items) {
        dynamoDBClient.putItem(new PutItemRequest().withTableName(tableName).withItem(items));
    }

    /**
     * Sets the string attribute {@code attribute-2} of the item with the given id, using the
     * {@code PUT} attribute action.
     *
     * @param dynamoDBClient DynamoDB client to use
     * @param tableName      table to write to
     * @param id             value of the numeric {@code Id} key
     * @param val            value stored in {@code attribute-2}
     */
    public static void updateItem(AmazonDynamoDB dynamoDBClient, String tableName, String id, String val) {
        Map<String, AttributeValueUpdate> changes = new HashMap<>();
        changes.put("attribute-2", new AttributeValueUpdate()
                .withAction(AttributeAction.PUT)
                .withValue(new AttributeValue().withS(val)));

        UpdateItemRequest request = new UpdateItemRequest()
                .withTableName(tableName)
                .withKey(idKey(id))
                .withAttributeUpdates(changes);
        dynamoDBClient.updateItem(request);
    }

    /**
     * Deletes the item with the given id.
     *
     * @param dynamoDBClient DynamoDB client to use
     * @param tableName      table to delete from
     * @param id             value of the numeric {@code Id} key
     */
    public static void deleteItem(AmazonDynamoDB dynamoDBClient, String tableName, String id) {
        DeleteItemRequest request = new DeleteItemRequest()
                .withTableName(tableName)
                .withKey(idKey(id));
        dynamoDBClient.deleteItem(request);
    }

    /** Builds a new mutable attribute map holding only the numeric {@code Id} key. */
    private static Map<String, AttributeValue> idKey(String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", new AttributeValue().withN(id));
        return key;
    }
}