There are more AWS SDK examples available in the AWS Doc SDK Examples
Managed Service for Apache Flink examples using SDK for Python (Boto3)
The following code examples show you how to perform actions and implement common scenarios by using the AWS SDK for Python (Boto3) with Managed Service for Apache Flink.
Actions are code excerpts from larger programs and must be run in context. While actions show you how to call individual service functions, you can see actions in context in their related scenarios.
Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.
Topics
Actions
The following code example shows how to use AddApplicationInput
.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def add_input(self, input_prefix, stream_arn, input_schema): """ Adds an input stream to the application. The input stream data is mapped to an in-application stream that can be processed by your code running in Kinesis Data Analytics. :param input_prefix: The prefix prepended to in-application input stream names. :param stream_arn: The ARN of the input stream. :param input_schema: A schema that maps the data in the input stream to the runtime environment. This can be automatically generated by using `discover_input_schema` or you can create it yourself. :return: Metadata about the newly added input. """ try: response = self.analytics_client.add_application_input( ApplicationName=self.name, CurrentApplicationVersionId=self.version_id, Input={ "NamePrefix": input_prefix, "KinesisStreamsInput": {"ResourceARN": stream_arn}, "InputSchema": input_schema, }, ) self.version_id = response["ApplicationVersionId"] logger.info("Add input stream %s to application %s.", stream_arn, self.name) except ClientError: logger.exception( "Couldn't add input stream %s to application %s.", stream_arn, self.name ) raise else: return response
-
For API details, see AddApplicationInput in AWS SDK for Python (Boto3) API Reference.
-
The following code example shows how to use AddApplicationOutput
.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def add_output(self, in_app_stream_name, output_arn): """ Adds an output stream to the application. Kinesis Data Analytics maps data from the specified in-application stream to the output stream. :param in_app_stream_name: The name of the in-application stream to map to the output stream. :param output_arn: The ARN of the output stream. :return: A list of metadata about the output resources currently assigned to the application. """ try: response = self.analytics_client.add_application_output( ApplicationName=self.name, CurrentApplicationVersionId=self.version_id, Output={ "Name": in_app_stream_name, "KinesisStreamsOutput": {"ResourceARN": output_arn}, "DestinationSchema": {"RecordFormatType": "JSON"}, }, ) outputs = response["OutputDescriptions"] self.version_id = response["ApplicationVersionId"] logging.info( "Added output %s to %s, which now has %s outputs.", output_arn, self.name, len(outputs), ) except ClientError: logger.exception("Couldn't add output %s to %s.", output_arn, self.name) raise else: return outputs
-
For API details, see AddApplicationOutput in AWS SDK for Python (Boto3) API Reference.
-
The following code example shows how to use CreateApplication
.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def create(self, app_name, role_arn, env="SQL-1_0"): """ Creates a Kinesis Data Analytics application. :param app_name: The name of the application. :param role_arn: The ARN of a role that can be assumed by Kinesis Data Analytics and grants needed permissions. :param env: The runtime environment of the application, such as SQL. Code uploaded to the application runs in this environment. :return: Metadata about the newly created application. """ try: response = self.analytics_client.create_application( ApplicationName=app_name, RuntimeEnvironment=env, ServiceExecutionRole=role_arn, ) details = response["ApplicationDetail"] self._update_details(details) logger.info("Application %s created.", app_name) except ClientError: logger.exception("Couldn't create application %s.", app_name) raise else: return details
-
For API details, see CreateApplication in AWS SDK for Python (Boto3) API Reference.
-
The following code example shows how to use DeleteApplication
.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def delete(self): """ Deletes an application. """ try: self.analytics_client.delete_application( ApplicationName=self.name, CreateTimestamp=self.create_timestamp ) logger.info("Deleted application %s.", self.name) except ClientError: logger.exception("Couldn't delete application %s.", self.name) raise
-
For API details, see DeleteApplication in AWS SDK for Python (Boto3) API Reference.
-
The following code example shows how to use DescribeApplication
.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def describe(self, name): """ Gets metadata about an application. :param name: The name of the application to look up. :return: Metadata about the application. """ try: response = self.analytics_client.describe_application(ApplicationName=name) details = response["ApplicationDetail"] self._update_details(details) logger.info("Got metadata for application %s.", name) except ClientError: logger.exception("Couldn't get metadata for application %s.", name) raise else: return details
-
For API details, see DescribeApplication in AWS SDK for Python (Boto3) API Reference.
-
The following code example shows how to use DescribeApplicationSnapshot
.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def describe_snapshot(self, application_name, snapshot_name): """ Gets metadata about a previously saved application snapshot. :param application_name: The name of the application. :param snapshot_name: The name of the snapshot. :return: Metadata about the snapshot. """ try: response = self.analytics_client.describe_application_snapshot( ApplicationName=application_name, SnapshotName=snapshot_name ) snapshot = response["SnapshotDetails"] logger.info( "Got metadata for snapshot %s of application %s.", snapshot_name, application_name, ) except ClientError: logger.exception( "Couldn't get metadata for snapshot %s of application %s.", snapshot_name, application_name, ) raise else: return snapshot
-
For API details, see DescribeApplicationSnapshot in AWS SDK for Python (Boto3) API Reference.
-
The following code example shows how to use DiscoverInputSchema
.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def discover_input_schema(self, stream_arn, role_arn): """ Discovers a schema that maps data in a stream to a format that is usable by an application's runtime environment. The stream must be active and have enough data moving through it for the service to sample. The returned schema can be used when you add the stream as an input to the application or you can write your own schema. :param stream_arn: The ARN of the stream to map. :param role_arn: A role that lets Kinesis Data Analytics read from the stream. :return: The discovered schema of the data in the input stream. """ try: response = self.analytics_client.discover_input_schema( ResourceARN=stream_arn, ServiceExecutionRole=role_arn, InputStartingPositionConfiguration={"InputStartingPosition": "NOW"}, ) schema = response["InputSchema"] logger.info("Discovered input schema for stream %s.", stream_arn) except ClientError: logger.exception( "Couldn't discover input schema for stream %s.", stream_arn ) raise else: return schema
-
For API details, see DiscoverInputSchema in AWS SDK for Python (Boto3) API Reference.
-
The following code example shows how to use StartApplication
.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def start(self, input_id): """ Starts an application. After the application is running, it reads from the specified input stream and runs the application code on the incoming data. :param input_id: The ID of the input to read. """ try: self.analytics_client.start_application( ApplicationName=self.name, RunConfiguration={ "SqlRunConfigurations": [ { "InputId": input_id, "InputStartingPositionConfiguration": { "InputStartingPosition": "NOW" }, } ] }, ) logger.info("Started application %s.", self.name) except ClientError: logger.exception("Couldn't start application %s.", self.name) raise
-
For API details, see StartApplication in AWS SDK for Python (Boto3) API Reference.
-
The following code example shows how to use StopApplication
.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def stop(self): """ Stops an application. This stops the application from processing data but does not delete any resources. """ try: self.analytics_client.stop_application(ApplicationName=self.name) logger.info("Stopping application %s.", self.name) except ClientError: logger.exception("Couldn't stop application %s.", self.name) raise
-
For API details, see StopApplication in AWS SDK for Python (Boto3) API Reference.
-
The following code example shows how to use UpdateApplication
.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. This example updates the code that runs in an existing application.
class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def update_code(self, code): """ Updates the code that runs in the application. The code must run in the runtime environment of the application, such as SQL. Application code typically reads data from in-application streams and transforms it in some way. :param code: The code to upload. This completely replaces any existing code in the application. :return: Metadata about the application. """ try: response = self.analytics_client.update_application( ApplicationName=self.name, CurrentApplicationVersionId=self.version_id, ApplicationConfigurationUpdate={ "ApplicationCodeConfigurationUpdate": { "CodeContentTypeUpdate": "PLAINTEXT", "CodeContentUpdate": {"TextContentUpdate": code}, } }, ) details = response["ApplicationDetail"] self.version_id = details["ApplicationVersionId"] logger.info("Update code for application %s.", self.name) except ClientError: logger.exception("Couldn't update code for application %s.", self.name) raise else: return details
-
For API details, see UpdateApplication in AWS SDK for Python (Boto3) API Reference.
-
Data generator
The following code example shows how to generate a Kinesis stream with a referrer.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return {"REFERRER": "http://www.amazon.com"} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
The following code example shows how to generate a Kinesis stream with blood pressure anomalies.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. from enum import Enum import json import random import boto3 STREAM_NAME = "ExampleInputStream" class PressureType(Enum): low = "LOW" normal = "NORMAL" high = "HIGH" def get_blood_pressure(pressure_type): pressure = {"BloodPressureLevel": pressure_type.value} if pressure_type == PressureType.low: pressure["Systolic"] = random.randint(50, 80) pressure["Diastolic"] = random.randint(30, 50) elif pressure_type == PressureType.normal: pressure["Systolic"] = random.randint(90, 120) pressure["Diastolic"] = random.randint(60, 80) elif pressure_type == PressureType.high: pressure["Systolic"] = random.randint(130, 200) pressure["Diastolic"] = random.randint(90, 150) else: raise TypeError return pressure def generate(stream_name, kinesis_client): while True: rnd = random.random() pressure_type = ( PressureType.low if rnd < 0.005 else PressureType.high if rnd > 0.995 else PressureType.normal ) blood_pressure = get_blood_pressure(pressure_type) print(blood_pressure) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(blood_pressure), PartitionKey="partitionkey", ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
The following code example shows how to generate a Kinesis stream with data in columns.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return {"Col_A": "a", "Col_B": "b", "Col_C": "c", "Col_E_Unstructured": "x,y,z"} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
The following code example shows how to generate a Kinesis stream with heart rate anomalies.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. from enum import Enum import json import random import boto3 STREAM_NAME = "ExampleInputStream" class RateType(Enum): normal = "NORMAL" high = "HIGH" def get_heart_rate(rate_type): if rate_type == RateType.normal: rate = random.randint(60, 100) elif rate_type == RateType.high: rate = random.randint(150, 200) else: raise TypeError return {"heartRate": rate, "rateType": rate_type.value} def generate(stream_name, kinesis_client, output=True): while True: rnd = random.random() rate_type = RateType.high if rnd < 0.01 else RateType.normal heart_rate = get_heart_rate(rate_type) if output: print(heart_rate) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(heart_rate), PartitionKey="partitionkey", ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
The following code example shows how to generate a Kinesis stream with hotspots.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. import json from pprint import pprint import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_hotspot(field, spot_size): hotspot = { "left": field["left"] + random.random() * (field["width"] - spot_size), "width": spot_size, "top": field["top"] + random.random() * (field["height"] - spot_size), "height": spot_size, } return hotspot def get_record(field, hotspot, hotspot_weight): rectangle = hotspot if random.random() < hotspot_weight else field point = { "x": rectangle["left"] + random.random() * rectangle["width"], "y": rectangle["top"] + random.random() * rectangle["height"], "is_hot": "Y" if rectangle is hotspot else "N", } return {"Data": json.dumps(point), "PartitionKey": "partition_key"} def generate( stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client ): """ Generates points used as input to a hotspot detection algorithm. With probability hotspot_weight (20%), a point is drawn from the hotspot; otherwise, it is drawn from the base field. The location of the hotspot changes for every 1000 points generated. """ points_generated = 0 hotspot = None while True: if points_generated % 1000 == 0: hotspot = get_hotspot(field, hotspot_size) records = [ get_record(field, hotspot, hotspot_weight) for _ in range(batch_size) ] points_generated += len(records) pprint(records) kinesis_client.put_records(StreamName=stream_name, Records=records) time.sleep(0.1) if __name__ == "__main__": generate( stream_name=STREAM_NAME, field={"left": 0, "width": 10, "top": 0, "height": 10}, hotspot_size=1, hotspot_weight=0.2, batch_size=10, kinesis_client=boto3.client("kinesis"), )
The following code example shows how to generate a Kinesis stream with log entries.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] " '"GET /index.php HTTP/1.1" 200 125 "-" ' '"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0"' } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
The following code example shows how to generate a Kinesis stream with stagger data.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. import datetime import json import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): event_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=10) return { "EVENT_TIME": event_time.isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), } def generate(stream_name, kinesis_client): while True: data = get_data() # Send six records, ten seconds apart, with the same event time and ticker for _ in range(6): print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey", ) time.sleep(10) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
The following code example shows how to generate a Kinesis stream with stock ticker data.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
The following code example shows how to generate a Kinesis stream with two data types.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. import json import random import boto3 STREAM_NAME = "OrdersAndTradesStream" PARTITION_KEY = "partition_key" def get_order(order_id, ticker): return { "RecordType": "Order", "Oid": order_id, "Oticker": ticker, "Oprice": random.randint(500, 10000), "Otype": "Sell", } def get_trade(order_id, trade_id, ticker): return { "RecordType": "Trade", "Tid": trade_id, "Toid": order_id, "Tticker": ticker, "Tprice": random.randint(0, 3000), } def generate(stream_name, kinesis_client): order_id = 1 while True: ticker = random.choice(["AAAA", "BBBB", "CCCC"]) order = get_order(order_id, ticker) print(order) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(order), PartitionKey=PARTITION_KEY ) for trade_id in range(1, random.randint(0, 6)): trade = get_trade(order_id, trade_id, ticker) print(trade) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(trade), PartitionKey=PARTITION_KEY, ) order_id += 1 if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
The following code example shows how to generate a Kinesis stream with web log data.
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository
. import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "log": "192.168.254.30 - John [24/May/2004:22:01:02 -0700] " '"GET /icons/apache_pb.gif HTTP/1.1" 304 0' } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))