@aws-cdk/aws-kinesisanalytics-flink-alpha module
| Language | Package | 
|---|---|
|  .NET | Amazon.CDK.AWS.Kinesisanalytics.Flink.Alpha | 
|  Go | github.com/aws/aws-cdk-go/awscdkkinesisanalyticsflinkalpha/v2 | 
|  Java | software.amazon.awscdk.services.kinesisanalytics.flink.alpha | 
|  Python | aws_cdk.aws_kinesisanalytics_flink_alpha | 
|  TypeScript | @aws-cdk/aws-kinesisanalytics-flink-alpha | 
Kinesis Analytics Flink
The APIs of higher level constructs in this module are experimental and under active development. They are subject to non-backward compatible changes or removal in any future version. These are not subject to the Semantic Versioning model and breaking changes will be announced in the release notes. This means that while you may use them, you may need to update your source code when upgrading to a newer version of this package.
This package provides constructs for creating Kinesis Analytics Flink applications. To learn more about using using managed Flink applications, see the AWS developer guide.
Creating Flink Applications
To create a new Flink application, use the Application construct:
import * as path from 'path';
import * as cloudwatch from 'aws-cdk-lib/aws-cloudwatch';
import * as core from 'aws-cdk-lib';
import * as integ from '@aws-cdk/integ-tests-alpha';
import * as flink from '../lib';
const app = new core.App();
const stack = new core.Stack(app, 'FlinkAppTest');
const flinkRuntimes = [
  flink.Runtime.FLINK_1_6,
  flink.Runtime.FLINK_1_8,
  flink.Runtime.FLINK_1_11,
  flink.Runtime.FLINK_1_13,
  flink.Runtime.FLINK_1_15,
  flink.Runtime.FLINK_1_18,
  flink.Runtime.FLINK_1_19,
  flink.Runtime.FLINK_1_20,
];
flinkRuntimes.forEach((runtime) => {
  const flinkApp = new flink.Application(stack, `App-${runtime.value}`, {
    code: flink.ApplicationCode.fromAsset(path.join(__dirname, 'code-asset')),
    runtime: runtime,
  });
  new cloudwatch.Alarm(stack, `Alarm-${runtime.value}`, {
    metric: flinkApp.metricFullRestarts(),
    evaluationPeriods: 1,
    threshold: 3,
  });
});
new integ.IntegTest(app, 'ApplicationTest', {
  testCases: [stack],
});
The code property can use fromAsset as shown above to reference a local jar
file in s3 or fromBucket to reference a file in s3.
new flink.Application(stack, 'App', {
  code: flink.ApplicationCode.fromBucket(bucket, fileKey),
  runtime: flink.Runtime.FLINK_1_19,
});
The propertyGroups property provides a way of passing arbitrary runtime
properties to your Flink application. You can use the
aws-kinesisanalytics-runtime library to retrieve these
properties.
declare const bucket: s3.Bucket;
const flinkApp = new flink.Application(this, 'Application', {
  propertyGroups: {
    FlinkApplicationProperties: {
      inputStreamName: 'my-input-kinesis-stream',
      outputStreamName: 'my-output-kinesis-stream',
    },
  },
  // ...
  runtime: flink.Runtime.FLINK_1_20,
  code: flink.ApplicationCode.fromBucket(bucket, 'my-app.jar'),
});
Flink applications also have specific configuration for passing parameters when the Flink job starts. These include parameters for checkpointing, snapshotting, monitoring, and parallelism.
declare const bucket: s3.Bucket;
const flinkApp = new flink.Application(this, 'Application', {
  code: flink.ApplicationCode.fromBucket(bucket, 'my-app.jar'),
  runtime: flink.Runtime.FLINK_1_20,
  checkpointingEnabled: true, // default is true
  checkpointInterval: Duration.seconds(30), // default is 1 minute
  minPauseBetweenCheckpoints: Duration.seconds(10), // default is 5 seconds
  logLevel: flink.LogLevel.ERROR, // default is INFO
  metricsLevel: flink.MetricsLevel.PARALLELISM, // default is APPLICATION
  autoScalingEnabled: false, // default is true
  parallelism: 32, // default is 1
  parallelismPerKpu: 2, // default is 1
  snapshotsEnabled: false, // default is true
  logGroup: new logs.LogGroup(this, 'LogGroup'), // by default, a new LogGroup will be created
});
Flink applications can optionally be deployed in a VPC:
declare const bucket: s3.Bucket;
declare const vpc: ec2.Vpc;
const flinkApp = new flink.Application(this, 'Application', {
  code: flink.ApplicationCode.fromBucket(bucket, 'my-app.jar'),
  runtime: flink.Runtime.FLINK_1_20,
  vpc,
});
