Hay más AWS SDK ejemplos disponibles en el GitHub repositorio de AWS Doc SDK Examples
Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Ejemplos de Managed Service for Apache Flink que se utilizan SDK para Python (Boto3)
Los siguientes ejemplos de código muestran cómo realizar acciones e implementar escenarios comunes mediante el uso del AWS SDK for Python (Boto3) servicio administrado para Apache Flink.
Las acciones son extractos de código de programas más grandes y deben ejecutarse en contexto. Mientras las acciones muestran cómo llamar a las funciones de servicio individuales, es posible ver las acciones en contexto en los escenarios relacionados.
Cada ejemplo incluye un enlace al código fuente completo, donde puede encontrar instrucciones sobre cómo configurar y ejecutar el código en su contexto.
Acciones
En el siguiente ejemplo de código se muestra cómo usar AddApplicationInput
.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def add_input(self, input_prefix, stream_arn, input_schema): """ Adds an input stream to the application. The input stream data is mapped to an in-application stream that can be processed by your code running in Kinesis Data Analytics. :param input_prefix: The prefix prepended to in-application input stream names. :param stream_arn: The ARN of the input stream. :param input_schema: A schema that maps the data in the input stream to the runtime environment. This can be automatically generated by using `discover_input_schema` or you can create it yourself. :return: Metadata about the newly added input. """ try: response = self.analytics_client.add_application_input( ApplicationName=self.name, CurrentApplicationVersionId=self.version_id, Input={ "NamePrefix": input_prefix, "KinesisStreamsInput": {"ResourceARN": stream_arn}, "InputSchema": input_schema, }, ) self.version_id = response["ApplicationVersionId"] logger.info("Add input stream %s to application %s.", stream_arn, self.name) except ClientError: logger.exception( "Couldn't add input stream %s to application %s.", stream_arn, self.name ) raise else: return response
-
Para API obtener más información, consulte AddApplicationInputla AWS SDKreferencia de Python (Boto3). API
-
En el siguiente ejemplo de código se muestra cómo usar AddApplicationOutput
.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def add_output(self, in_app_stream_name, output_arn): """ Adds an output stream to the application. Kinesis Data Analytics maps data from the specified in-application stream to the output stream. :param in_app_stream_name: The name of the in-application stream to map to the output stream. :param output_arn: The ARN of the output stream. :return: A list of metadata about the output resources currently assigned to the application. """ try: response = self.analytics_client.add_application_output( ApplicationName=self.name, CurrentApplicationVersionId=self.version_id, Output={ "Name": in_app_stream_name, "KinesisStreamsOutput": {"ResourceARN": output_arn}, "DestinationSchema": {"RecordFormatType": "JSON"}, }, ) outputs = response["OutputDescriptions"] self.version_id = response["ApplicationVersionId"] logging.info( "Added output %s to %s, which now has %s outputs.", output_arn, self.name, len(outputs), ) except ClientError: logger.exception("Couldn't add output %s to %s.", output_arn, self.name) raise else: return outputs
-
Para API obtener más información, consulte AddApplicationOutputla AWS SDKreferencia de Python (Boto3). API
-
En el siguiente ejemplo de código se muestra cómo usar CreateApplication
.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def create(self, app_name, role_arn, env="SQL-1_0"): """ Creates a Kinesis Data Analytics application. :param app_name: The name of the application. :param role_arn: The ARN of a role that can be assumed by Kinesis Data Analytics and grants needed permissions. :param env: The runtime environment of the application, such as SQL. Code uploaded to the application runs in this environment. :return: Metadata about the newly created application. """ try: response = self.analytics_client.create_application( ApplicationName=app_name, RuntimeEnvironment=env, ServiceExecutionRole=role_arn, ) details = response["ApplicationDetail"] self._update_details(details) logger.info("Application %s created.", app_name) except ClientError: logger.exception("Couldn't create application %s.", app_name) raise else: return details
-
Para API obtener más información, consulte CreateApplicationla AWS SDKreferencia de Python (Boto3). API
-
En el siguiente ejemplo de código se muestra cómo usar DeleteApplication
.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def delete(self): """ Deletes an application. """ try: self.analytics_client.delete_application( ApplicationName=self.name, CreateTimestamp=self.create_timestamp ) logger.info("Deleted application %s.", self.name) except ClientError: logger.exception("Couldn't delete application %s.", self.name) raise
-
Para API obtener más información, consulte DeleteApplicationla AWS SDKreferencia de Python (Boto3). API
-
En el siguiente ejemplo de código se muestra cómo usar DescribeApplication
.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def describe(self, name): """ Gets metadata about an application. :param name: The name of the application to look up. :return: Metadata about the application. """ try: response = self.analytics_client.describe_application(ApplicationName=name) details = response["ApplicationDetail"] self._update_details(details) logger.info("Got metadata for application %s.", name) except ClientError: logger.exception("Couldn't get metadata for application %s.", name) raise else: return details
-
Para API obtener más información, consulte DescribeApplicationla AWS SDKreferencia de Python (Boto3). API
-
En el siguiente ejemplo de código se muestra cómo usar DescribeApplicationSnapshot
.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def describe_snapshot(self, application_name, snapshot_name): """ Gets metadata about a previously saved application snapshot. :param application_name: The name of the application. :param snapshot_name: The name of the snapshot. :return: Metadata about the snapshot. """ try: response = self.analytics_client.describe_application_snapshot( ApplicationName=application_name, SnapshotName=snapshot_name ) snapshot = response["SnapshotDetails"] logger.info( "Got metadata for snapshot %s of application %s.", snapshot_name, application_name, ) except ClientError: logger.exception( "Couldn't get metadata for snapshot %s of application %s.", snapshot_name, application_name, ) raise else: return snapshot
-
Para API obtener más información, consulte DescribeApplicationSnapshotla AWS SDKreferencia de Python (Boto3). API
-
En el siguiente ejemplo de código se muestra cómo usar DiscoverInputSchema
.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def discover_input_schema(self, stream_arn, role_arn): """ Discovers a schema that maps data in a stream to a format that is usable by an application's runtime environment. The stream must be active and have enough data moving through it for the service to sample. The returned schema can be used when you add the stream as an input to the application or you can write your own schema. :param stream_arn: The ARN of the stream to map. :param role_arn: A role that lets Kinesis Data Analytics read from the stream. :return: The discovered schema of the data in the input stream. """ try: response = self.analytics_client.discover_input_schema( ResourceARN=stream_arn, ServiceExecutionRole=role_arn, InputStartingPositionConfiguration={"InputStartingPosition": "NOW"}, ) schema = response["InputSchema"] logger.info("Discovered input schema for stream %s.", stream_arn) except ClientError: logger.exception( "Couldn't discover input schema for stream %s.", stream_arn ) raise else: return schema
-
Para API obtener más información, consulte DiscoverInputSchemala AWS SDKreferencia de Python (Boto3). API
-
En el siguiente ejemplo de código se muestra cómo usar StartApplication
.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def start(self, input_id): """ Starts an application. After the application is running, it reads from the specified input stream and runs the application code on the incoming data. :param input_id: The ID of the input to read. """ try: self.analytics_client.start_application( ApplicationName=self.name, RunConfiguration={ "SqlRunConfigurations": [ { "InputId": input_id, "InputStartingPositionConfiguration": { "InputStartingPosition": "NOW" }, } ] }, ) logger.info("Started application %s.", self.name) except ClientError: logger.exception("Couldn't start application %s.", self.name) raise
-
Para API obtener más información, consulte StartApplicationla AWS SDKreferencia de Python (Boto3). API
-
En el siguiente ejemplo de código se muestra cómo usar StopApplication
.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def stop(self): """ Stops an application. This stops the application from processing data but does not delete any resources. """ try: self.analytics_client.stop_application(ApplicationName=self.name) logger.info("Stopping application %s.", self.name) except ClientError: logger.exception("Couldn't stop application %s.", self.name) raise
-
Para API obtener más información, consulte StopApplicationla AWS SDKreferencia de Python (Boto3). API
-
En el siguiente ejemplo de código se muestra cómo usar UpdateApplication
.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. En este ejemplo se actualiza el código que se ejecuta en una aplicación existente.
class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def update_code(self, code): """ Updates the code that runs in the application. The code must run in the runtime environment of the application, such as SQL. Application code typically reads data from in-application streams and transforms it in some way. :param code: The code to upload. This completely replaces any existing code in the application. :return: Metadata about the application. """ try: response = self.analytics_client.update_application( ApplicationName=self.name, CurrentApplicationVersionId=self.version_id, ApplicationConfigurationUpdate={ "ApplicationCodeConfigurationUpdate": { "CodeContentTypeUpdate": "PLAINTEXT", "CodeContentUpdate": {"TextContentUpdate": code}, } }, ) details = response["ApplicationDetail"] self.version_id = details["ApplicationVersionId"] logger.info("Update code for application %s.", self.name) except ClientError: logger.exception("Couldn't update code for application %s.", self.name) raise else: return details
-
Para API obtener más información, consulte UpdateApplicationla AWS SDKreferencia de Python (Boto3). API
-
Generador de datos
En el siguiente ejemplo de código se muestra cómo generar una transmisión con un referente
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return {"REFERRER": "http://www.amazon.com"} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
En el siguiente ejemplo de código se muestra cómo generar una transmisión de Kinesis con anomalías en la presión arterial.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. from enum import Enum import json import random import boto3 STREAM_NAME = "ExampleInputStream" class PressureType(Enum): low = "LOW" normal = "NORMAL" high = "HIGH" def get_blood_pressure(pressure_type): pressure = {"BloodPressureLevel": pressure_type.value} if pressure_type == PressureType.low: pressure["Systolic"] = random.randint(50, 80) pressure["Diastolic"] = random.randint(30, 50) elif pressure_type == PressureType.normal: pressure["Systolic"] = random.randint(90, 120) pressure["Diastolic"] = random.randint(60, 80) elif pressure_type == PressureType.high: pressure["Systolic"] = random.randint(130, 200) pressure["Diastolic"] = random.randint(90, 150) else: raise TypeError return pressure def generate(stream_name, kinesis_client): while True: rnd = random.random() pressure_type = ( PressureType.low if rnd < 0.005 else PressureType.high if rnd > 0.995 else PressureType.normal ) blood_pressure = get_blood_pressure(pressure_type) print(blood_pressure) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(blood_pressure), PartitionKey="partitionkey", ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
El siguiente ejemplo de código muestra cómo generar una transmisión de Kinesis con datos en columnas.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return {"Col_A": "a", "Col_B": "b", "Col_C": "c", "Col_E_Unstructured": "x,y,z"} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
El siguiente ejemplo de código muestra cómo generar una transmisión de Kinesis con anomalías en la presión arterial.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. from enum import Enum import json import random import boto3 STREAM_NAME = "ExampleInputStream" class RateType(Enum): normal = "NORMAL" high = "HIGH" def get_heart_rate(rate_type): if rate_type == RateType.normal: rate = random.randint(60, 100) elif rate_type == RateType.high: rate = random.randint(150, 200) else: raise TypeError return {"heartRate": rate, "rateType": rate_type.value} def generate(stream_name, kinesis_client, output=True): while True: rnd = random.random() rate_type = RateType.high if rnd < 0.01 else RateType.normal heart_rate = get_heart_rate(rate_type) if output: print(heart_rate) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(heart_rate), PartitionKey="partitionkey", ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
El siguiente ejemplo de código muestra cómo generar una transmisión de Kinesis con puntos de acceso.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. import json from pprint import pprint import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_hotspot(field, spot_size): hotspot = { "left": field["left"] + random.random() * (field["width"] - spot_size), "width": spot_size, "top": field["top"] + random.random() * (field["height"] - spot_size), "height": spot_size, } return hotspot def get_record(field, hotspot, hotspot_weight): rectangle = hotspot if random.random() < hotspot_weight else field point = { "x": rectangle["left"] + random.random() * rectangle["width"], "y": rectangle["top"] + random.random() * rectangle["height"], "is_hot": "Y" if rectangle is hotspot else "N", } return {"Data": json.dumps(point), "PartitionKey": "partition_key"} def generate( stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client ): """ Generates points used as input to a hotspot detection algorithm. With probability hotspot_weight (20%), a point is drawn from the hotspot; otherwise, it is drawn from the base field. The location of the hotspot changes for every 1000 points generated. """ points_generated = 0 hotspot = None while True: if points_generated % 1000 == 0: hotspot = get_hotspot(field, hotspot_size) records = [ get_record(field, hotspot, hotspot_weight) for _ in range(batch_size) ] points_generated += len(records) pprint(records) kinesis_client.put_records(StreamName=stream_name, Records=records) time.sleep(0.1) if __name__ == "__main__": generate( stream_name=STREAM_NAME, field={"left": 0, "width": 10, "top": 0, "height": 10}, hotspot_size=1, hotspot_weight=0.2, batch_size=10, kinesis_client=boto3.client("kinesis"), )
El siguiente ejemplo de código muestra cómo generar una transmisión de Kinesis con entradas de registro.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] " '"GET /index.php HTTP/1.1" 200 125 "-" ' '"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0"' } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
El siguiente ejemplo de código muestra cómo generar una transmisión de Kinesis con datos escalonados.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. import datetime import json import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): event_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=10) return { "EVENT_TIME": event_time.isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), } def generate(stream_name, kinesis_client): while True: data = get_data() # Send six records, ten seconds apart, with the same event time and ticker for _ in range(6): print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey", ) time.sleep(10) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
El siguiente ejemplo de código muestra cómo generar una transmisión de Kinesis con datos de teletipos de bolsa.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
El siguiente ejemplo de código muestra cómo generar una transmisión de Kinesis con dos tipos de datos.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. import json import random import boto3 STREAM_NAME = "OrdersAndTradesStream" PARTITION_KEY = "partition_key" def get_order(order_id, ticker): return { "RecordType": "Order", "Oid": order_id, "Oticker": ticker, "Oprice": random.randint(500, 10000), "Otype": "Sell", } def get_trade(order_id, trade_id, ticker): return { "RecordType": "Trade", "Tid": trade_id, "Toid": order_id, "Tticker": ticker, "Tprice": random.randint(0, 3000), } def generate(stream_name, kinesis_client): order_id = 1 while True: ticker = random.choice(["AAAA", "BBBB", "CCCC"]) order = get_order(order_id, ticker) print(order) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(order), PartitionKey=PARTITION_KEY ) for trade_id in range(1, random.randint(0, 6)): trade = get_trade(order_id, trade_id, ticker) print(trade) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(trade), PartitionKey=PARTITION_KEY, ) order_id += 1 if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
En el siguiente ejemplo de código, se muestra cómo generar una secuencia de Kinesis con datos de registros web.
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "log": "192.168.254.30 - John [24/May/2004:22:01:02 -0700] " '"GET /icons/apache_pb.gif HTTP/1.1" 304 0' } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))