There are more AWS SDK examples available in the AWS Doc SDK Examples
Firehose examples using SDK for Java 2.x
The following code examples show you how to perform actions and implement common scenarios by using the AWS SDK for Java 2.x with Firehose.
Actions are code excerpts from larger programs and must be run in context. While actions show you how to call individual service functions, you can see actions in context in their related scenarios.
Scenarios are code examples that show you how to accomplish specific tasks by calling multiple functions within a service or combined with other AWS services.
Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.
Actions
The following code example shows how to use `PutRecord`.
- SDK for Java 2.x
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. /** * Puts a record to the specified Amazon Kinesis Data Firehose delivery stream. * * @param record The record to be put to the delivery stream. The record must be a {@link Map} of String keys and Object values. * @param deliveryStreamName The name of the Amazon Kinesis Data Firehose delivery stream to which the record should be put. * @throws IllegalArgumentException if the input record or delivery stream name is null or empty. * @throws RuntimeException if there is an error putting the record to the delivery stream. */ public static void putRecord(Map<String, Object> record, String deliveryStreamName) { if (record == null || deliveryStreamName == null || deliveryStreamName.isEmpty()) { throw new IllegalArgumentException("Invalid input: record or delivery stream name cannot be null/empty"); } try { String jsonRecord = new ObjectMapper().writeValueAsString(record); Record firehoseRecord = Record.builder() .data(SdkBytes.fromByteArray(jsonRecord.getBytes(StandardCharsets.UTF_8))) .build(); PutRecordRequest putRecordRequest = PutRecordRequest.builder() .deliveryStreamName(deliveryStreamName) .record(firehoseRecord) .build(); getFirehoseClient().putRecord(putRecordRequest); System.out.println("Record sent: " + jsonRecord); } catch (Exception e) { throw new RuntimeException("Failed to put record: " + e.getMessage(), e); } }
-
For API details, see PutRecord in AWS SDK for Java 2.x API Reference.
-
The following code example shows how to use `PutRecordBatch`.
- SDK for Java 2.x
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. /** * Puts a batch of records to an Amazon Kinesis Data Firehose delivery stream. * * @param records a list of maps representing the records to be sent * @param batchSize the maximum number of records to include in each batch * @param deliveryStreamName the name of the Kinesis Data Firehose delivery stream * @throws IllegalArgumentException if the input parameters are invalid (null or empty) * @throws RuntimeException if there is an error putting the record batch */ public static void putRecordBatch(List<Map<String, Object>> records, int batchSize, String deliveryStreamName) { if (records == null || records.isEmpty() || deliveryStreamName == null || deliveryStreamName.isEmpty()) { throw new IllegalArgumentException("Invalid input: records or delivery stream name cannot be null/empty"); } ObjectMapper objectMapper = new ObjectMapper(); try { for (int i = 0; i < records.size(); i += batchSize) { List<Map<String, Object>> batch = records.subList(i, Math.min(i + batchSize, records.size())); List<Record> batchRecords = batch.stream().map(record -> { try { String jsonRecord = objectMapper.writeValueAsString(record); return Record.builder() .data(SdkBytes.fromByteArray(jsonRecord.getBytes(StandardCharsets.UTF_8))) .build(); } catch (Exception e) { throw new RuntimeException("Error creating Firehose record", e); } }).collect(Collectors.toList()); PutRecordBatchRequest request = PutRecordBatchRequest.builder() .deliveryStreamName(deliveryStreamName) .records(batchRecords) .build(); PutRecordBatchResponse response = getFirehoseClient().putRecordBatch(request); if (response.failedPutCount() > 0) { response.requestResponses().stream() .filter(r -> r.errorCode() != null) .forEach(r -> System.err.println("Failed record: " + r.errorMessage())); } System.out.println("Batch sent with size: " + batchRecords.size()); } } catch (Exception e) { throw new RuntimeException("Failed to put record batch: " + e.getMessage(), e); } }
-
For API details, see PutRecordBatch in AWS SDK for Java 2.x API Reference.
-
Scenarios
The following code example shows how to use Firehose to process individual and batch records.
- SDK for Java 2.x
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.
This example puts individual and batch records to Firehose.
/** * Amazon Firehose Scenario example using Java V2 SDK. * * Demonstrates individual and batch record processing, * and monitoring Firehose delivery stream metrics. */ public class FirehoseScenario { private static FirehoseClient firehoseClient; private static CloudWatchClient cloudWatchClient; public static void main(String[] args) { final String usage = """ Usage: <deliveryStreamName> Where: deliveryStreamName - The Firehose delivery stream name. """; if (args.length != 1) { System.out.println(usage); return; } String deliveryStreamName = args[0]; try { // Read and parse sample data. String jsonContent = readJsonFile("sample_records.json"); ObjectMapper objectMapper = new ObjectMapper(); List<Map<String, Object>> sampleData = objectMapper.readValue(jsonContent, new TypeReference<>() {}); // Process individual records. System.out.println("Processing individual records..."); sampleData.subList(0, 100).forEach(record -> { try { putRecord(record, deliveryStreamName); } catch (Exception e) { System.err.println("Error processing record: " + e.getMessage()); } }); // Monitor metrics. monitorMetrics(deliveryStreamName); // Process batch records. System.out.println("Processing batch records..."); putRecordBatch(sampleData.subList(100, sampleData.size()), 500, deliveryStreamName); monitorMetrics(deliveryStreamName); } catch (Exception e) { System.err.println("Scenario failed: " + e.getMessage()); } finally { closeClients(); } } private static FirehoseClient getFirehoseClient() { if (firehoseClient == null) { firehoseClient = FirehoseClient.create(); } return firehoseClient; } private static CloudWatchClient getCloudWatchClient() { if (cloudWatchClient == null) { cloudWatchClient = CloudWatchClient.create(); } return cloudWatchClient; } /** * Puts a record to the specified Amazon Kinesis Data Firehose delivery stream. * * @param record The record to be put to the delivery stream. The record must be a {@link Map} of String keys and Object values. 
* @param deliveryStreamName The name of the Amazon Kinesis Data Firehose delivery stream to which the record should be put. * @throws IllegalArgumentException if the input record or delivery stream name is null or empty. * @throws RuntimeException if there is an error putting the record to the delivery stream. */ public static void putRecord(Map<String, Object> record, String deliveryStreamName) { if (record == null || deliveryStreamName == null || deliveryStreamName.isEmpty()) { throw new IllegalArgumentException("Invalid input: record or delivery stream name cannot be null/empty"); } try { String jsonRecord = new ObjectMapper().writeValueAsString(record); Record firehoseRecord = Record.builder() .data(SdkBytes.fromByteArray(jsonRecord.getBytes(StandardCharsets.UTF_8))) .build(); PutRecordRequest putRecordRequest = PutRecordRequest.builder() .deliveryStreamName(deliveryStreamName) .record(firehoseRecord) .build(); getFirehoseClient().putRecord(putRecordRequest); System.out.println("Record sent: " + jsonRecord); } catch (Exception e) { throw new RuntimeException("Failed to put record: " + e.getMessage(), e); } } /** * Puts a batch of records to an Amazon Kinesis Data Firehose delivery stream. 
* * @param records a list of maps representing the records to be sent * @param batchSize the maximum number of records to include in each batch * @param deliveryStreamName the name of the Kinesis Data Firehose delivery stream * @throws IllegalArgumentException if the input parameters are invalid (null or empty) * @throws RuntimeException if there is an error putting the record batch */ public static void putRecordBatch(List<Map<String, Object>> records, int batchSize, String deliveryStreamName) { if (records == null || records.isEmpty() || deliveryStreamName == null || deliveryStreamName.isEmpty()) { throw new IllegalArgumentException("Invalid input: records or delivery stream name cannot be null/empty"); } ObjectMapper objectMapper = new ObjectMapper(); try { for (int i = 0; i < records.size(); i += batchSize) { List<Map<String, Object>> batch = records.subList(i, Math.min(i + batchSize, records.size())); List<Record> batchRecords = batch.stream().map(record -> { try { String jsonRecord = objectMapper.writeValueAsString(record); return Record.builder() .data(SdkBytes.fromByteArray(jsonRecord.getBytes(StandardCharsets.UTF_8))) .build(); } catch (Exception e) { throw new RuntimeException("Error creating Firehose record", e); } }).collect(Collectors.toList()); PutRecordBatchRequest request = PutRecordBatchRequest.builder() .deliveryStreamName(deliveryStreamName) .records(batchRecords) .build(); PutRecordBatchResponse response = getFirehoseClient().putRecordBatch(request); if (response.failedPutCount() > 0) { response.requestResponses().stream() .filter(r -> r.errorCode() != null) .forEach(r -> System.err.println("Failed record: " + r.errorMessage())); } System.out.println("Batch sent with size: " + batchRecords.size()); } } catch (Exception e) { throw new RuntimeException("Failed to put record batch: " + e.getMessage(), e); } } public static void monitorMetrics(String deliveryStreamName) { Instant endTime = Instant.now(); Instant startTime = 
endTime.minusSeconds(600); List<String> metrics = List.of("IncomingBytes", "IncomingRecords", "FailedPutCount"); metrics.forEach(metric -> monitorMetric(metric, startTime, endTime, deliveryStreamName)); } private static void monitorMetric(String metricName, Instant startTime, Instant endTime, String deliveryStreamName) { try { GetMetricStatisticsRequest request = GetMetricStatisticsRequest.builder() .namespace("AWS/Firehose") .metricName(metricName) .dimensions(Dimension.builder().name("DeliveryStreamName").value(deliveryStreamName).build()) .startTime(startTime) .endTime(endTime) .period(60) .statistics(Statistic.SUM) .build(); GetMetricStatisticsResponse response = getCloudWatchClient().getMetricStatistics(request); double totalSum = response.datapoints().stream().mapToDouble(Datapoint::sum).sum(); System.out.println(metricName + ": " + totalSum); } catch (Exception e) { System.err.println("Failed to monitor metric " + metricName + ": " + e.getMessage()); } } public static String readJsonFile(String fileName) throws IOException { try (InputStream inputStream = FirehoseScenario.class.getResourceAsStream("/" + fileName); Scanner scanner = new Scanner(inputStream, StandardCharsets.UTF_8)) { return scanner.useDelimiter("\\\\A").next(); } catch (Exception e) { throw new RuntimeException("Error reading file: " + fileName, e); } } private static void closeClients() { try { if (firehoseClient != null) firehoseClient.close(); if (cloudWatchClient != null) cloudWatchClient.close(); } catch (Exception e) { System.err.println("Error closing clients: " + e.getMessage()); } } }
-
For API details, see the following topics in AWS SDK for Java 2.x API Reference.
-