用于 Python 的 Amaz SDK on Keyspaces 示例 (Boto3) - AWS SDK代码示例

AWS 文档 AWS SDK示例 GitHub 存储库中还有更多SDK示例

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

用于 Python 的 Amaz SDK on Keyspaces 示例 (Boto3)

以下代码示例向您展示了如何使用 AWS SDK for Python (Boto3) 与 Amazon Keyspaces 配合使用来执行操作和实现常见场景。

基础知识是向您展示如何在服务中执行基本操作的代码示例。

操作是大型程序的代码摘录,必须在上下文中运行。您可以通过操作了解如何调用单个服务函数,还可以通过函数相关场景的上下文查看操作。

每个示例都包含一个指向完整源代码的链接,您可以在其中找到有关如何在上下文中设置和运行代码的说明。

开始使用

以下代码示例展示了如何开始使用 Amazon Keyspaces。

SDK适用于 Python (Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

import boto3 def hello_keyspaces(keyspaces_client): """ Use the AWS SDK for Python (Boto3) to create an Amazon Keyspaces (for Apache Cassandra) client and list the keyspaces in your account. This example uses the default settings specified in your shared credentials and config files. :param keyspaces_client: A Boto3 Amazon Keyspaces Client object. This object wraps the low-level Amazon Keyspaces service API. """ print("Hello, Amazon Keyspaces! Let's list some of your keyspaces:\n") for ks in keyspaces_client.list_keyspaces(maxResults=5).get("keyspaces", []): print(ks["keyspaceName"]) print(f"\t{ks['resourceArn']}") if __name__ == "__main__": hello_keyspaces(boto3.client("keyspaces"))
  • 有关API详细信息,请参阅ListKeyspaces中的 AWS SDKPython (Boto3) API 参考。

基础知识

以下代码示例展示了如何:

  • 创建密钥空间和表。表架构保存电影数据并启用了 point-in-time恢复。

  • 使用带有 Sigv4 身份验证的安全TLS连接连接到密钥空间。

  • 查询表。添加、检索和更新电影数据。

  • 更新表。添加一列来跟踪观看的电影。

  • 将表还原到以前的状态并清理资源。

SDK适用于 Python (Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

在命令提示符中运行交互式场景。

class KeyspaceScenario: """Runs an interactive scenario that shows how to get started using Amazon Keyspaces.""" def __init__(self, ks_wrapper): """ :param ks_wrapper: An object that wraps Amazon Keyspace actions. """ self.ks_wrapper = ks_wrapper @demo_func def create_keyspace(self): """ 1. Creates a keyspace. 2. Lists up to 10 keyspaces in your account. """ print("Let's create a keyspace.") ks_name = q.ask( "Enter a name for your new keyspace.\nThe name can contain only letters, " "numbers and underscores: ", q.non_empty, ) if self.ks_wrapper.exists_keyspace(ks_name): print(f"A keyspace named {ks_name} exists.") else: ks_arn = self.ks_wrapper.create_keyspace(ks_name) ks_exists = False while not ks_exists: wait(3) ks_exists = self.ks_wrapper.exists_keyspace(ks_name) print(f"Created a new keyspace.\n\t{ks_arn}.") print("The first 10 keyspaces in your account are:\n") self.ks_wrapper.list_keyspaces(10) @demo_func def create_table(self): """ 1. Creates a table in the keyspace. The table is configured with a schema to hold movie data and has point-in-time recovery enabled. 2. Waits for the table to be in an active state. 3. Displays schema information for the table. 4. Lists tables in the keyspace. """ print("Let's create a table for movies in your keyspace.") table_name = q.ask("Enter a name for your table: ", q.non_empty) table = self.ks_wrapper.get_table(table_name) if table is not None: print( f"A table named {table_name} already exists in keyspace " f"{self.ks_wrapper.ks_name}." ) else: table_arn = self.ks_wrapper.create_table(table_name) print(f"Created table {table_name}:\n\t{table_arn}") table = {"status": None} print("Waiting for your table to be ready...") while table["status"] != "ACTIVE": wait(5) table = self.ks_wrapper.get_table(table_name) print(f"Your table is {table['status']}. Its schema is:") pp(table["schemaDefinition"]) print("\nThe tables in your keyspace are:\n") self.ks_wrapper.list_tables() @demo_func def ensure_tls_cert(self): """ Ensures you have a TLS certificate available to use to secure the connection to the keyspace. This function downloads a default certificate or lets you specify your own. """ print("To connect to your keyspace, you must have a TLS certificate.") print("Checking for TLS certificate...") cert_path = os.path.join( os.path.dirname(__file__), QueryManager.DEFAULT_CERT_FILE ) if not os.path.exists(cert_path): cert_choice = q.ask( f"Press enter to download a certificate from {QueryManager.CERT_URL} " f"or enter the full path to the certificate you want to use: " ) if cert_choice: cert_path = cert_choice else: cert = requests.get(QueryManager.CERT_URL).text with open(cert_path, "w") as cert_file: cert_file.write(cert) else: q.ask(f"Certificate {cert_path} found. Press Enter to continue.") print( f"Certificate {cert_path} will be used to secure the connection to your keyspace." ) return cert_path @demo_func def query_table(self, qm, movie_file): """ 1. Adds movies to the table from a sample movie data file. 2. Gets a list of movies from the table and lets you select one. 3. Displays more information about the selected movie. """ qm.add_movies(self.ks_wrapper.table_name, movie_file) movies = qm.get_movies(self.ks_wrapper.table_name) print(f"Added {len(movies)} movies to the table:") sel = q.choose("Pick one to learn more about it: ", [m.title for m in movies]) movie_choice = qm.get_movie( self.ks_wrapper.table_name, movies[sel].title, movies[sel].year ) print(movie_choice.title) print(f"\tReleased: {movie_choice.release_date}") print(f"\tPlot: {movie_choice.plot}") @demo_func def update_and_restore_table(self, qm): """ 1. Updates the table by adding a column to track watched movies. 2. Marks some of the movies as watched. 3. Gets the list of watched movies from the table. 4. Restores to a movies_restored table at a previous point in time. 5. Gets the list of movies from the restored table. """ print("Let's add a column to record which movies you've watched.") pre_update_timestamp = datetime.utcnow() print( f"Recorded the current UTC time of {pre_update_timestamp} so we can restore the table later." ) self.ks_wrapper.update_table() print("Waiting for your table to update...") table = {"status": "UPDATING"} while table["status"] != "ACTIVE": wait(5) table = self.ks_wrapper.get_table(self.ks_wrapper.table_name) print("Column 'watched' added to table.") q.ask( "Let's mark some of the movies as watched. Press Enter when you're ready.\n" ) movies = qm.get_movies(self.ks_wrapper.table_name) for movie in movies[:10]: qm.watched_movie(self.ks_wrapper.table_name, movie.title, movie.year) print(f"Marked {movie.title} as watched.") movies = qm.get_movies(self.ks_wrapper.table_name, watched=True) print("-" * 88) print("The watched movies in our table are:\n") for movie in movies: print(movie.title) print("-" * 88) if q.ask( "Do you want to restore the table to the way it was before all of these\n" "updates? Keep in mind, this can take up to 20 minutes. (y/n) ", q.is_yesno, ): starting_table_name = self.ks_wrapper.table_name table_name_restored = self.ks_wrapper.restore_table(pre_update_timestamp) table = {"status": "RESTORING"} while table["status"] != "ACTIVE": wait(10) table = self.ks_wrapper.get_table(table_name_restored) print( f"Restored {starting_table_name} to {table_name_restored} " f"at a point in time of {pre_update_timestamp}." ) movies = qm.get_movies(table_name_restored) print("Now the movies in our table are:") for movie in movies: print(movie.title) def cleanup(self, cert_path): """ 1. Deletes the table and waits for it to be removed. 2. Deletes the keyspace. :param cert_path: The path of the TLS certificate used in the demo. If the certificate was downloaded during the demo, it is removed. """ if q.ask( f"Do you want to delete your {self.ks_wrapper.table_name} table and " f"{self.ks_wrapper.ks_name} keyspace? (y/n) ", q.is_yesno, ): table_name = self.ks_wrapper.table_name self.ks_wrapper.delete_table() table = self.ks_wrapper.get_table(table_name) print("Waiting for the table to be deleted.") while table is not None: wait(5) table = self.ks_wrapper.get_table(table_name) print("Table deleted.") self.ks_wrapper.delete_keyspace() print( "Keyspace deleted. If you chose to restore your table during the " "demo, the original table is also deleted." ) if cert_path == os.path.join( os.path.dirname(__file__), QueryManager.DEFAULT_CERT_FILE ) and os.path.exists(cert_path): os.remove(cert_path) print("Removed certificate that was downloaded for this demo.") def run_scenario(self): logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") print("-" * 88) print("Welcome to the Amazon Keyspaces (for Apache Cassandra) demo.") print("-" * 88) self.create_keyspace() self.create_table() cert_file_path = self.ensure_tls_cert() # Use a context manager to ensure the connection to the keyspace is closed. with QueryManager( cert_file_path, boto3.DEFAULT_SESSION, self.ks_wrapper.ks_name ) as qm: self.query_table(qm, "../../../resources/sample_files/movies.json") self.update_and_restore_table(qm) self.cleanup(cert_file_path) print("\nThanks for watching!") print("-" * 88) if __name__ == "__main__": try: scenario = KeyspaceScenario(KeyspaceWrapper.from_client()) scenario.run_scenario() except Exception: logging.exception("Something went wrong with the demo.")

定义一个包装密钥空间和表操作的类。

class KeyspaceWrapper: """Encapsulates Amazon Keyspaces (for Apache Cassandra) keyspace and table actions.""" def __init__(self, keyspaces_client): """ :param keyspaces_client: A Boto3 Amazon Keyspaces client. """ self.keyspaces_client = keyspaces_client self.ks_name = None self.ks_arn = None self.table_name = None @classmethod def from_client(cls): keyspaces_client = boto3.client("keyspaces") return cls(keyspaces_client) def create_keyspace(self, name): """ Creates a keyspace. :param name: The name to give the keyspace. :return: The Amazon Resource Name (ARN) of the new keyspace. """ try: response = self.keyspaces_client.create_keyspace(keyspaceName=name) self.ks_name = name self.ks_arn = response["resourceArn"] except ClientError as err: logger.error( "Couldn't create %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return self.ks_arn def exists_keyspace(self, name): """ Checks whether a keyspace exists. :param name: The name of the keyspace to look up. :return: True when the keyspace exists. Otherwise, False. """ try: response = self.keyspaces_client.get_keyspace(keyspaceName=name) self.ks_name = response["keyspaceName"] self.ks_arn = response["resourceArn"] exists = True except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": logger.info("Keyspace %s does not exist.", name) exists = False else: logger.error( "Couldn't verify %s exists. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise return exists def list_keyspaces(self, limit): """ Lists the keyspaces in your account. :param limit: The maximum number of keyspaces to list. """ try: ks_paginator = self.keyspaces_client.get_paginator("list_keyspaces") for page in ks_paginator.paginate(PaginationConfig={"MaxItems": limit}): for ks in page["keyspaces"]: print(ks["keyspaceName"]) print(f"\t{ks['resourceArn']}") except ClientError as err: logger.error( "Couldn't list keyspaces. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def create_table(self, table_name): """ Creates a table in the keyspace. The table is created with a schema for storing movie data and has point-in-time recovery enabled. :param table_name: The name to give the table. :return: The ARN of the new table. """ try: response = self.keyspaces_client.create_table( keyspaceName=self.ks_name, tableName=table_name, schemaDefinition={ "allColumns": [ {"name": "title", "type": "text"}, {"name": "year", "type": "int"}, {"name": "release_date", "type": "timestamp"}, {"name": "plot", "type": "text"}, ], "partitionKeys": [{"name": "year"}, {"name": "title"}], }, pointInTimeRecovery={"status": "ENABLED"}, ) except ClientError as err: logger.error( "Couldn't create table %s. Here's why: %s: %s", table_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["resourceArn"] def get_table(self, table_name): """ Gets data about a table in the keyspace. :param table_name: The name of the table to look up. :return: Data about the table. """ try: response = self.keyspaces_client.get_table( keyspaceName=self.ks_name, tableName=table_name ) self.table_name = table_name except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": logger.info("Table %s does not exist.", table_name) self.table_name = None response = None else: logger.error( "Couldn't verify %s exists. Here's why: %s: %s", table_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise return response def list_tables(self): """ Lists the tables in the keyspace. """ try: table_paginator = self.keyspaces_client.get_paginator("list_tables") for page in table_paginator.paginate(keyspaceName=self.ks_name): for table in page["tables"]: print(table["tableName"]) print(f"\t{table['resourceArn']}") except ClientError as err: logger.error( "Couldn't list tables in keyspace %s. Here's why: %s: %s", self.ks_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def update_table(self): """ Updates the schema of the table. This example updates a table of movie data by adding a new column that tracks whether the movie has been watched. """ try: self.keyspaces_client.update_table( keyspaceName=self.ks_name, tableName=self.table_name, addColumns=[{"name": "watched", "type": "boolean"}], ) except ClientError as err: logger.error( "Couldn't update table %s. Here's why: %s: %s", self.table_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def restore_table(self, restore_timestamp): """ Restores the table to a previous point in time. The table is restored to a new table in the same keyspace. :param restore_timestamp: The point in time to restore the table. This time must be in UTC format. :return: The name of the restored table. """ try: restored_table_name = f"{self.table_name}_restored" self.keyspaces_client.restore_table( sourceKeyspaceName=self.ks_name, sourceTableName=self.table_name, targetKeyspaceName=self.ks_name, targetTableName=restored_table_name, restoreTimestamp=restore_timestamp, ) except ClientError as err: logger.error( "Couldn't restore table %s. Here's why: %s: %s", restore_timestamp, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return restored_table_name def delete_table(self): """ Deletes the table from the keyspace. """ try: self.keyspaces_client.delete_table( keyspaceName=self.ks_name, tableName=self.table_name ) self.table_name = None except ClientError as err: logger.error( "Couldn't delete table %s. Here's why: %s: %s", self.table_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def delete_keyspace(self): """ Deletes the keyspace. """ try: self.keyspaces_client.delete_keyspace(keyspaceName=self.ks_name) self.ks_name = None except ClientError as err: logger.error( "Couldn't delete keyspace %s. Here's why: %s: %s", self.ks_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise

定义一个用于创建与密钥空间的TLS连接、使用 Sigv4 进行身份验证以及向密钥空间中的表发送CQL查询的类。

class QueryManager: """ Manages queries to an Amazon Keyspaces (for Apache Cassandra) keyspace. Queries are secured by TLS and authenticated by using the Signature V4 (SigV4) AWS signing protocol. This is more secure than sending username and password with a plain-text authentication provider. This example downloads a default certificate to secure TLS, or lets you specify your own. This example uses a table of movie data to demonstrate basic queries. """ DEFAULT_CERT_FILE = "sf-class2-root.crt" CERT_URL = f"https://certs.secureserver.net/repository/sf-class2-root.crt" def __init__(self, cert_file_path, boto_session, keyspace_name): """ :param cert_file_path: The path and file name of the certificate used for TLS. :param boto_session: A Boto3 session. This is used to acquire your AWS credentials. :param keyspace_name: The name of the keyspace to connect. """ self.cert_file_path = cert_file_path self.boto_session = boto_session self.ks_name = keyspace_name self.cluster = None self.session = None def __enter__(self): """ Creates a session connection to the keyspace that is secured by TLS and authenticated by SigV4. """ ssl_context = SSLContext(PROTOCOL_TLSv1_2) ssl_context.load_verify_locations(self.cert_file_path) ssl_context.verify_mode = CERT_REQUIRED auth_provider = SigV4AuthProvider(self.boto_session) contact_point = f"cassandra.{self.boto_session.region_name}.amazonaws.com" exec_profile = ExecutionProfile( consistency_level=ConsistencyLevel.LOCAL_QUORUM, load_balancing_policy=DCAwareRoundRobinPolicy(), ) self.cluster = Cluster( [contact_point], ssl_context=ssl_context, auth_provider=auth_provider, port=9142, execution_profiles={EXEC_PROFILE_DEFAULT: exec_profile}, protocol_version=4, ) self.cluster.__enter__() self.session = self.cluster.connect(self.ks_name) return self def __exit__(self, *args): """ Exits the cluster. This shuts down all existing session connections. """ self.cluster.__exit__(*args) def add_movies(self, table_name, movie_file_path): """ Gets movies from a JSON file and adds them to a table in the keyspace. :param table_name: The name of the table. :param movie_file_path: The path and file name of a JSON file that contains movie data. """ with open(movie_file_path, "r") as movie_file: movies = json.loads(movie_file.read()) stmt = self.session.prepare( f"INSERT INTO {table_name} (year, title, release_date, plot) VALUES (?, ?, ?, ?);" ) for movie in movies[:20]: self.session.execute( stmt, parameters=[ movie["year"], movie["title"], date.fromisoformat(movie["info"]["release_date"].partition("T")[0]), movie["info"]["plot"], ], ) def get_movies(self, table_name, watched=None): """ Gets the title and year of the full list of movies from the table. :param table_name: The name of the movie table. :param watched: When specified, the returned list of movies is filtered to either movies that have been watched or movies that have not been watched. Otherwise, all movies are returned. :return: A list of movies in the table. """ if watched is None: stmt = SimpleStatement(f"SELECT title, year from {table_name}") params = None else: stmt = SimpleStatement( f"SELECT title, year from {table_name} WHERE watched = %s ALLOW FILTERING" ) params = [watched] return self.session.execute(stmt, parameters=params).all() def get_movie(self, table_name, title, year): """ Gets a single movie from the table, by title and year. :param table_name: The name of the movie table. :param title: The title of the movie. :param year: The year of the movie's release. :return: The requested movie. """ return self.session.execute( SimpleStatement( f"SELECT * from {table_name} WHERE title = %s AND year = %s" ), parameters=[title, year], ).one() def watched_movie(self, table_name, title, year): """ Updates a movie as having been watched. :param table_name: The name of the movie table. :param title: The title of the movie. :param year: The year of the movie's release. """ self.session.execute( SimpleStatement( f"UPDATE {table_name} SET watched=true WHERE title = %s AND year = %s" ), parameters=[title, year], )

操作

以下代码示例演示如何使用 CreateKeyspace

SDK适用于 Python (Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class KeyspaceWrapper: """Encapsulates Amazon Keyspaces (for Apache Cassandra) keyspace and table actions.""" def __init__(self, keyspaces_client): """ :param keyspaces_client: A Boto3 Amazon Keyspaces client. """ self.keyspaces_client = keyspaces_client self.ks_name = None self.ks_arn = None self.table_name = None @classmethod def from_client(cls): keyspaces_client = boto3.client("keyspaces") return cls(keyspaces_client) def create_keyspace(self, name): """ Creates a keyspace. :param name: The name to give the keyspace. :return: The Amazon Resource Name (ARN) of the new keyspace. """ try: response = self.keyspaces_client.create_keyspace(keyspaceName=name) self.ks_name = name self.ks_arn = response["resourceArn"] except ClientError as err: logger.error( "Couldn't create %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return self.ks_arn
  • 有关API详细信息,请参阅CreateKeyspace中的 AWS SDKPython (Boto3) API 参考。

以下代码示例演示如何使用 CreateTable

SDK适用于 Python (Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class KeyspaceWrapper: """Encapsulates Amazon Keyspaces (for Apache Cassandra) keyspace and table actions.""" def __init__(self, keyspaces_client): """ :param keyspaces_client: A Boto3 Amazon Keyspaces client. """ self.keyspaces_client = keyspaces_client self.ks_name = None self.ks_arn = None self.table_name = None @classmethod def from_client(cls): keyspaces_client = boto3.client("keyspaces") return cls(keyspaces_client) def create_table(self, table_name): """ Creates a table in the keyspace. The table is created with a schema for storing movie data and has point-in-time recovery enabled. :param table_name: The name to give the table. :return: The ARN of the new table. """ try: response = self.keyspaces_client.create_table( keyspaceName=self.ks_name, tableName=table_name, schemaDefinition={ "allColumns": [ {"name": "title", "type": "text"}, {"name": "year", "type": "int"}, {"name": "release_date", "type": "timestamp"}, {"name": "plot", "type": "text"}, ], "partitionKeys": [{"name": "year"}, {"name": "title"}], }, pointInTimeRecovery={"status": "ENABLED"}, ) except ClientError as err: logger.error( "Couldn't create table %s. Here's why: %s: %s", table_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["resourceArn"]
  • 有关API详细信息,请参阅CreateTable中的 AWS SDKPython (Boto3) API 参考。

以下代码示例演示如何使用 DeleteKeyspace

SDK适用于 Python (Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class KeyspaceWrapper: """Encapsulates Amazon Keyspaces (for Apache Cassandra) keyspace and table actions.""" def __init__(self, keyspaces_client): """ :param keyspaces_client: A Boto3 Amazon Keyspaces client. """ self.keyspaces_client = keyspaces_client self.ks_name = None self.ks_arn = None self.table_name = None @classmethod def from_client(cls): keyspaces_client = boto3.client("keyspaces") return cls(keyspaces_client) def delete_keyspace(self): """ Deletes the keyspace. """ try: self.keyspaces_client.delete_keyspace(keyspaceName=self.ks_name) self.ks_name = None except ClientError as err: logger.error( "Couldn't delete keyspace %s. Here's why: %s: %s", self.ks_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise
  • 有关API详细信息,请参阅DeleteKeyspace中的 AWS SDKPython (Boto3) API 参考。

以下代码示例演示如何使用 DeleteTable

SDK适用于 Python (Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class KeyspaceWrapper: """Encapsulates Amazon Keyspaces (for Apache Cassandra) keyspace and table actions.""" def __init__(self, keyspaces_client): """ :param keyspaces_client: A Boto3 Amazon Keyspaces client. """ self.keyspaces_client = keyspaces_client self.ks_name = None self.ks_arn = None self.table_name = None @classmethod def from_client(cls): keyspaces_client = boto3.client("keyspaces") return cls(keyspaces_client) def delete_table(self): """ Deletes the table from the keyspace. """ try: self.keyspaces_client.delete_table( keyspaceName=self.ks_name, tableName=self.table_name ) self.table_name = None except ClientError as err: logger.error( "Couldn't delete table %s. Here's why: %s: %s", self.table_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise
  • 有关API详细信息,请参阅DeleteTable中的 AWS SDKPython (Boto3) API 参考。

以下代码示例演示如何使用 GetKeyspace

SDK适用于 Python (Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class KeyspaceWrapper: """Encapsulates Amazon Keyspaces (for Apache Cassandra) keyspace and table actions.""" def __init__(self, keyspaces_client): """ :param keyspaces_client: A Boto3 Amazon Keyspaces client. """ self.keyspaces_client = keyspaces_client self.ks_name = None self.ks_arn = None self.table_name = None @classmethod def from_client(cls): keyspaces_client = boto3.client("keyspaces") return cls(keyspaces_client) def exists_keyspace(self, name): """ Checks whether a keyspace exists. :param name: The name of the keyspace to look up. :return: True when the keyspace exists. Otherwise, False. """ try: response = self.keyspaces_client.get_keyspace(keyspaceName=name) self.ks_name = response["keyspaceName"] self.ks_arn = response["resourceArn"] exists = True except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": logger.info("Keyspace %s does not exist.", name) exists = False else: logger.error( "Couldn't verify %s exists. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise return exists
  • 有关API详细信息,请参阅GetKeyspace中的 AWS SDKPython (Boto3) API 参考。

以下代码示例演示如何使用 GetTable

SDK适用于 Python (Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class KeyspaceWrapper: """Encapsulates Amazon Keyspaces (for Apache Cassandra) keyspace and table actions.""" def __init__(self, keyspaces_client): """ :param keyspaces_client: A Boto3 Amazon Keyspaces client. """ self.keyspaces_client = keyspaces_client self.ks_name = None self.ks_arn = None self.table_name = None @classmethod def from_client(cls): keyspaces_client = boto3.client("keyspaces") return cls(keyspaces_client) def get_table(self, table_name): """ Gets data about a table in the keyspace. :param table_name: The name of the table to look up. :return: Data about the table. """ try: response = self.keyspaces_client.get_table( keyspaceName=self.ks_name, tableName=table_name ) self.table_name = table_name except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": logger.info("Table %s does not exist.", table_name) self.table_name = None response = None else: logger.error( "Couldn't verify %s exists. Here's why: %s: %s", table_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise return response
  • 有关API详细信息,请参阅GetTable中的 AWS SDKPython (Boto3) API 参考。

以下代码示例演示如何使用 ListKeyspaces

SDK适用于 Python (Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class KeyspaceWrapper: """Encapsulates Amazon Keyspaces (for Apache Cassandra) keyspace and table actions.""" def __init__(self, keyspaces_client): """ :param keyspaces_client: A Boto3 Amazon Keyspaces client. """ self.keyspaces_client = keyspaces_client self.ks_name = None self.ks_arn = None self.table_name = None @classmethod def from_client(cls): keyspaces_client = boto3.client("keyspaces") return cls(keyspaces_client) def list_keyspaces(self, limit): """ Lists the keyspaces in your account. :param limit: The maximum number of keyspaces to list. """ try: ks_paginator = self.keyspaces_client.get_paginator("list_keyspaces") for page in ks_paginator.paginate(PaginationConfig={"MaxItems": limit}): for ks in page["keyspaces"]: print(ks["keyspaceName"]) print(f"\t{ks['resourceArn']}") except ClientError as err: logger.error( "Couldn't list keyspaces. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise
  • 有关API详细信息,请参阅ListKeyspaces中的 AWS SDKPython (Boto3) API 参考。

以下代码示例演示如何使用 ListTables

SDK适用于 Python (Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class KeyspaceWrapper: """Encapsulates Amazon Keyspaces (for Apache Cassandra) keyspace and table actions.""" def __init__(self, keyspaces_client): """ :param keyspaces_client: A Boto3 Amazon Keyspaces client. """ self.keyspaces_client = keyspaces_client self.ks_name = None self.ks_arn = None self.table_name = None @classmethod def from_client(cls): keyspaces_client = boto3.client("keyspaces") return cls(keyspaces_client) def list_tables(self): """ Lists the tables in the keyspace. """ try: table_paginator = self.keyspaces_client.get_paginator("list_tables") for page in table_paginator.paginate(keyspaceName=self.ks_name): for table in page["tables"]: print(table["tableName"]) print(f"\t{table['resourceArn']}") except ClientError as err: logger.error( "Couldn't list tables in keyspace %s. Here's why: %s: %s", self.ks_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise
  • 有关API详细信息,请参阅ListTables中的 AWS SDKPython (Boto3) API 参考。

以下代码示例演示如何使用 RestoreTable

SDK适用于 Python (Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class KeyspaceWrapper: """Encapsulates Amazon Keyspaces (for Apache Cassandra) keyspace and table actions.""" def __init__(self, keyspaces_client): """ :param keyspaces_client: A Boto3 Amazon Keyspaces client. """ self.keyspaces_client = keyspaces_client self.ks_name = None self.ks_arn = None self.table_name = None @classmethod def from_client(cls): keyspaces_client = boto3.client("keyspaces") return cls(keyspaces_client) def restore_table(self, restore_timestamp): """ Restores the table to a previous point in time. The table is restored to a new table in the same keyspace. :param restore_timestamp: The point in time to restore the table. This time must be in UTC format. :return: The name of the restored table. """ try: restored_table_name = f"{self.table_name}_restored" self.keyspaces_client.restore_table( sourceKeyspaceName=self.ks_name, sourceTableName=self.table_name, targetKeyspaceName=self.ks_name, targetTableName=restored_table_name, restoreTimestamp=restore_timestamp, ) except ClientError as err: logger.error( "Couldn't restore table %s. Here's why: %s: %s", restore_timestamp, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return restored_table_name
  • 有关API详细信息,请参阅RestoreTable中的 AWS SDKPython (Boto3) API 参考。

以下代码示例演示如何使用 UpdateTable

SDK适用于 Python (Boto3)
注意

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class KeyspaceWrapper: """Encapsulates Amazon Keyspaces (for Apache Cassandra) keyspace and table actions.""" def __init__(self, keyspaces_client): """ :param keyspaces_client: A Boto3 Amazon Keyspaces client. """ self.keyspaces_client = keyspaces_client self.ks_name = None self.ks_arn = None self.table_name = None @classmethod def from_client(cls): keyspaces_client = boto3.client("keyspaces") return cls(keyspaces_client) def update_table(self): """ Updates the schema of the table. This example updates a table of movie data by adding a new column that tracks whether the movie has been watched. """ try: self.keyspaces_client.update_table( keyspaceName=self.ks_name, tableName=self.table_name, addColumns=[{"name": "watched", "type": "boolean"}], ) except ClientError as err: logger.error( "Couldn't update table %s. Here's why: %s: %s", self.table_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise
  • 有关API详细信息,请参阅UpdateTable中的 AWS SDKPython (Boto3) API 参考。