LambdaFunctionProcessor

class aws_cdk.aws_kinesisfirehose.LambdaFunctionProcessor(lambda_function, *, buffer_interval=None, buffer_size=None, retries=None)

Bases: object

(experimental) Use an AWS Lambda function to transform records.

Stability:

experimental

Example:

import os.path as path
import aws_cdk.aws_kinesisfirehose as firehose
import aws_cdk.aws_kms as kms
import aws_cdk.aws_lambda_nodejs as lambdanodejs
import aws_cdk.aws_logs as logs
import aws_cdk.aws_s3 as s3
import aws_cdk.core as cdk
import aws_cdk.aws_kinesisfirehose_destinations as destinations

app = cdk.App()

stack = cdk.Stack(app, "aws-cdk-firehose-delivery-stream-s3-all-properties")

bucket = s3.Bucket(stack, "Bucket",
    removal_policy=cdk.RemovalPolicy.DESTROY,
    auto_delete_objects=True
)

backup_bucket = s3.Bucket(stack, "BackupBucket",
    removal_policy=cdk.RemovalPolicy.DESTROY,
    auto_delete_objects=True
)

log_group = logs.LogGroup(stack, "LogGroup",
    removal_policy=cdk.RemovalPolicy.DESTROY
)

data_processor_function = lambdanodejs.NodejsFunction(stack, "DataProcessorFunction",
    entry=path.join(path.dirname(__file__), "lambda-data-processor.js"),
    timeout=cdk.Duration.minutes(1)
)

processor = firehose.LambdaFunctionProcessor(data_processor_function,
    buffer_interval=cdk.Duration.seconds(60),
    buffer_size=cdk.Size.mebibytes(1),
    retries=1
)

key = kms.Key(stack, "Key",
    removal_policy=cdk.RemovalPolicy.DESTROY
)

backup_key = kms.Key(stack, "BackupKey",
    removal_policy=cdk.RemovalPolicy.DESTROY
)

firehose.DeliveryStream(stack, "Delivery Stream",
    destinations=[destinations.S3Bucket(bucket,
        logging=True,
        log_group=log_group,
        processor=processor,
        compression=destinations.Compression.GZIP,
        data_output_prefix="regularPrefix",
        error_output_prefix="errorPrefix",
        buffering_interval=cdk.Duration.seconds(60),
        buffering_size=cdk.Size.mebibytes(1),
        encryption_key=key,
        s3_backup=destinations.DestinationS3BackupProps(
            mode=destinations.BackupMode.ALL,
            bucket=backup_bucket,
            compression=destinations.Compression.ZIP,
            data_output_prefix="backupPrefix",
            error_output_prefix="backupErrorPrefix",
            buffering_interval=cdk.Duration.seconds(60),
            buffering_size=cdk.Size.mebibytes(1),
            encryption_key=backup_key
        )
    )]
)

app.synth()

Parameters:
  • lambda_function (IFunction)

  • buffer_interval (Optional[Duration]) – (experimental) The length of time Kinesis Data Firehose will buffer incoming data before calling the processor. Default: Duration.minutes(1)

  • buffer_size (Optional[Size]) – (experimental) The amount of incoming data Kinesis Data Firehose will buffer before calling the processor. Default: Size.mebibytes(3)

  • retries (Union[int, float, None]) – (experimental) The number of times Kinesis Data Firehose will retry the processor invocation after a failure due to network timeout or invocation limits. Default: 3

Stability:

experimental
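
Kinesis Data Firehose delivers the buffered records to lambda_function in batches. The following is a minimal sketch of a Python handler, assuming the standard Firehose data-transformation contract: each incoming record carries a recordId and base64-encoded data, and each returned record carries the same recordId, a result of Ok, Dropped, or ProcessingFailed, and base64-encoded data. The handler name and the upper-casing step are illustrative assumptions and are not part of this construct; the example above uses a Node.js function instead.

```python
import base64


def handler(event, context):
    """Transform every record in a Firehose batch and report a per-record result."""
    output = []
    for record in event["records"]:
        # Incoming payloads are base64-encoded bytes.
        payload = base64.b64decode(record["data"]).decode("utf-8")

        # Hypothetical transformation: upper-case the text and end it with a newline.
        transformed = payload.upper() + "\n"

        output.append({
            # The recordId must be echoed back unchanged so Firehose can match results.
            "recordId": record["recordId"],
            # "Ok" marks the record as transformed successfully.
            "result": "Ok",
            "data": base64.b64encode(transformed.encode("utf-8")).decode("ascii"),
        })
    return {"records": output}
```

Returning one entry per incoming recordId matters here: a record missing from the response is treated as a processing failure rather than silently dropped.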

Methods

bind(_scope, *, role)

(experimental) Binds this processor to a destination of a delivery stream.

Implementers should use this method to grant processor invocation permissions to the provided stream and return the necessary configuration to register as a processor.

Parameters:
  • _scope (Construct)

  • role (IRole) – (experimental) The IAM role assumed by Kinesis Data Firehose to write to the destination that this DataProcessor will bind to.

Stability:

experimental

Return type:

DataProcessorConfig

Attributes

props

(experimental) The constructor props of the LambdaFunctionProcessor.

Stability:

experimental