Amazon Cognito Identity Provider examples using SDK for Python (Boto3) - AWS SDK Code Examples

There are more AWS SDK examples available in the AWS Doc SDK Examples GitHub repo.


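Amazon Cognito Identity Provider examples using SDK for Python (Boto3)

The following code examples show you how to perform actions and implement common scenarios by using the AWS SDK for Python (Boto3) with Amazon Cognito Identity Provider.

Actions are code excerpts from larger programs and must be run in context. Actions show you how to call individual service functions, and you can also see each action in context in its related scenarios.

Scenarios are code examples that show you how to accomplish a specific task by calling multiple functions within one service or by combining it with other AWS services.

Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.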

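Get started

The following code example shows how to get started using Amazon Cognito.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.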

import boto3

# Create a Cognito Identity Provider client
cognitoidp = boto3.client("cognito-idp")

# Initialize a paginator for the list_user_pools operation
paginator = cognitoidp.get_paginator("list_user_pools")

# Create a PageIterator from the paginator
page_iterator = paginator.paginate(MaxResults=10)

# Initialize variables for pagination
user_pools = []

# Handle pagination
for page in page_iterator:
    user_pools.extend(page.get("UserPools", []))

# Print the list of user pools
print("User Pools for the account:")
if user_pools:
    for pool in user_pools:
        print(f"Name: {pool['Name']}, ID: {pool['Id']}")
else:
    print("No user pools found.")
  • For API details, see ListUserPools in AWS SDK for Python (Boto3) API Reference.

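Actions

The following code example shows how to use AdminGetUser.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.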

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class CognitoIdentityProviderWrapper:
    """Encapsulates Amazon Cognito actions"""

    def __init__(self, cognito_idp_client, user_pool_id, client_id, client_secret=None):
        """
        :param cognito_idp_client: A Boto3 Amazon Cognito Identity Provider client.
        :param user_pool_id: The ID of an existing Amazon Cognito user pool.
        :param client_id: The ID of a client application registered with the user pool.
        :param client_secret: The client secret, if the client has a secret.
        """
        self.cognito_idp_client = cognito_idp_client
        self.user_pool_id = user_pool_id
        self.client_id = client_id
        self.client_secret = client_secret

    def sign_up_user(self, user_name, password, user_email):
        """
        Signs up a new user with Amazon Cognito. This action prompts Amazon Cognito
        to send an email to the specified email address. The email contains a code that
        can be used to confirm the user.

        When the user already exists, the user status is checked to determine whether
        the user has been confirmed.

        :param user_name: The user name that identifies the new user.
        :param password: The password for the new user.
        :param user_email: The email address for the new user.
        :return: True when the user is already confirmed with Amazon Cognito.
                 Otherwise, false.
        """
        try:
            kwargs = {
                "ClientId": self.client_id,
                "Username": user_name,
                "Password": password,
                "UserAttributes": [{"Name": "email", "Value": user_email}],
            }
            if self.client_secret is not None:
                kwargs["SecretHash"] = self._secret_hash(user_name)
            response = self.cognito_idp_client.sign_up(**kwargs)
            confirmed = response["UserConfirmed"]
        except ClientError as err:
            if err.response["Error"]["Code"] == "UsernameExistsException":
                # The user already exists, so look up its status instead.
                response = self.cognito_idp_client.admin_get_user(
                    UserPoolId=self.user_pool_id, Username=user_name
                )
                logger.warning(
                    "User %s exists and is %s.", user_name, response["UserStatus"]
                )
                confirmed = response["UserStatus"] == "CONFIRMED"
            else:
                logger.error(
                    "Couldn't sign up %s. Here's why: %s: %s",
                    user_name,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        return confirmed
  • For API details, see AdminGetUser in AWS SDK for Python (Boto3) API Reference.
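The excerpt above calls self._secret_hash, which is defined only in the scenario class later on this page. A minimal standalone sketch of that calculation follows, assuming the same HMAC-SHA256 construction the scenario class uses (user name plus client ID, keyed by the client secret). The function name secret_hash and the sample values are hypothetical and are not part of the AWS example.

```python
import base64
import hashlib
import hmac


def secret_hash(user_name, client_id, client_secret):
    """Return Base64(HMAC-SHA256(key=client_secret, msg=user_name + client_id))."""
    key = client_secret.encode("utf-8")
    msg = (user_name + client_id).encode("utf-8")
    digest = hmac.new(key, msg, digestmod=hashlib.sha256).digest()
    return base64.b64encode(digest).decode("utf-8")


# Hypothetical values, for illustration only.
example_hash = secret_hash("alice", "example-client-id", "example-client-secret")
```

The hash depends on the user name, so it has to be recomputed for every user rather than cached per client.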

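The following code example shows how to use AdminInitiateAuth.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.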

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class CognitoIdentityProviderWrapper:
    """Encapsulates Amazon Cognito actions"""

    def __init__(self, cognito_idp_client, user_pool_id, client_id, client_secret=None):
        """
        :param cognito_idp_client: A Boto3 Amazon Cognito Identity Provider client.
        :param user_pool_id: The ID of an existing Amazon Cognito user pool.
        :param client_id: The ID of a client application registered with the user pool.
        :param client_secret: The client secret, if the client has a secret.
        """
        self.cognito_idp_client = cognito_idp_client
        self.user_pool_id = user_pool_id
        self.client_id = client_id
        self.client_secret = client_secret

    def start_sign_in(self, user_name, password):
        """
        Starts the sign-in process for a user by using administrator credentials.
        This method of signing in is appropriate for code running on a secure server.

        If the user pool is configured to require MFA and this is the first sign-in
        for the user, Amazon Cognito returns a challenge response to set up an
        MFA application. When this occurs, this function gets an MFA secret from
        Amazon Cognito and returns it to the caller.

        :param user_name: The name of the user to sign in.
        :param password: The user's password.
        :return: The result of the sign-in attempt. When sign-in is successful, this
                 returns an access token that can be used to get AWS credentials.
                 Otherwise, Amazon Cognito returns a challenge to set up an MFA
                 application, or a challenge to enter an MFA code from a registered
                 MFA application.
        """
        try:
            kwargs = {
                "UserPoolId": self.user_pool_id,
                "ClientId": self.client_id,
                "AuthFlow": "ADMIN_USER_PASSWORD_AUTH",
                "AuthParameters": {"USERNAME": user_name, "PASSWORD": password},
            }
            if self.client_secret is not None:
                kwargs["AuthParameters"]["SECRET_HASH"] = self._secret_hash(user_name)
            response = self.cognito_idp_client.admin_initiate_auth(**kwargs)
            challenge_name = response.get("ChallengeName", None)
            if challenge_name == "MFA_SETUP":
                if (
                    "SOFTWARE_TOKEN_MFA"
                    in response["ChallengeParameters"]["MFAS_CAN_SETUP"]
                ):
                    response.update(self.get_mfa_secret(response["Session"]))
                else:
                    raise RuntimeError(
                        "The user pool requires MFA setup, but the user pool is not "
                        "configured for TOTP MFA. This example requires TOTP MFA."
                    )
        except ClientError as err:
            logger.error(
                "Couldn't start sign in for %s. Here's why: %s: %s",
                user_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            response.pop("ResponseMetadata", None)
            return response
  • For API details, see AdminInitiateAuth in AWS SDK for Python (Boto3) API Reference.
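The docstring above describes three possible outcomes of start_sign_in: an MFA setup challenge, an MFA code challenge, or a completed sign-in. As a rough sketch, a caller might classify the returned dictionary as shown here. The function name next_sign_in_step and its return labels are hypothetical; only the ChallengeName and AuthenticationResult keys come from the example above.

```python
def next_sign_in_step(response):
    """Classify a sign-in response by its challenge, if any."""
    challenge = response.get("ChallengeName")
    if challenge == "MFA_SETUP":
        # The user must first associate and verify an MFA application.
        return "set_up_mfa"
    if challenge == "SOFTWARE_TOKEN_MFA":
        # The user must enter a code from an already registered MFA application.
        return "enter_mfa_code"
    if "AuthenticationResult" in response:
        # No challenge was issued and tokens were returned directly.
        return "signed_in"
    return "unsupported"
```

Using response.get for the challenge name avoids a KeyError when Amazon Cognito returns tokens without issuing a challenge.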

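The following code example shows how to use AdminRespondToAuthChallenge.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

Respond to an MFA challenge by providing a code generated by an associated MFA application.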

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class CognitoIdentityProviderWrapper:
    """Encapsulates Amazon Cognito actions"""

    def __init__(self, cognito_idp_client, user_pool_id, client_id, client_secret=None):
        """
        :param cognito_idp_client: A Boto3 Amazon Cognito Identity Provider client.
        :param user_pool_id: The ID of an existing Amazon Cognito user pool.
        :param client_id: The ID of a client application registered with the user pool.
        :param client_secret: The client secret, if the client has a secret.
        """
        self.cognito_idp_client = cognito_idp_client
        self.user_pool_id = user_pool_id
        self.client_id = client_id
        self.client_secret = client_secret

    def respond_to_mfa_challenge(self, user_name, session, mfa_code):
        """
        Responds to a challenge for an MFA code. This completes the second step of
        a two-factor sign-in. When sign-in is successful, it returns an access token
        that can be used to get AWS credentials from Amazon Cognito.

        :param user_name: The name of the user who is signing in.
        :param session: Session information returned from a previous call to initiate
                        authentication.
        :param mfa_code: A code generated by the associated MFA application.
        :return: The result of the authentication. When successful, this contains an
                 access token for the user.
        """
        try:
            kwargs = {
                "UserPoolId": self.user_pool_id,
                "ClientId": self.client_id,
                "ChallengeName": "SOFTWARE_TOKEN_MFA",
                "Session": session,
                "ChallengeResponses": {
                    "USERNAME": user_name,
                    "SOFTWARE_TOKEN_MFA_CODE": mfa_code,
                },
            }
            if self.client_secret is not None:
                kwargs["ChallengeResponses"]["SECRET_HASH"] = self._secret_hash(
                    user_name
                )
            response = self.cognito_idp_client.admin_respond_to_auth_challenge(**kwargs)
            auth_result = response["AuthenticationResult"]
        except ClientError as err:
            if err.response["Error"]["Code"] == "ExpiredCodeException":
                # An expired code is logged but not re-raised; the method returns None.
                logger.warning(
                    "Your MFA code has expired or has been used already. You might have "
                    "to wait a few seconds until your app shows you a new code."
                )
            else:
                logger.error(
                    "Couldn't respond to mfa challenge for %s. Here's why: %s: %s",
                    user_name,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        else:
            return auth_result

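The following code example shows how to use AssociateSoftwareToken.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.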

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class CognitoIdentityProviderWrapper:
    """Encapsulates Amazon Cognito actions"""

    def __init__(self, cognito_idp_client, user_pool_id, client_id, client_secret=None):
        """
        :param cognito_idp_client: A Boto3 Amazon Cognito Identity Provider client.
        :param user_pool_id: The ID of an existing Amazon Cognito user pool.
        :param client_id: The ID of a client application registered with the user pool.
        :param client_secret: The client secret, if the client has a secret.
        """
        self.cognito_idp_client = cognito_idp_client
        self.user_pool_id = user_pool_id
        self.client_id = client_id
        self.client_secret = client_secret

    def get_mfa_secret(self, session):
        """
        Gets a token that can be used to associate an MFA application with the user.

        :param session: Session information returned from a previous call to initiate
                        authentication.
        :return: An MFA token that can be used to set up an MFA application.
        """
        try:
            response = self.cognito_idp_client.associate_software_token(Session=session)
        except ClientError as err:
            logger.error(
                "Couldn't get MFA secret. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            response.pop("ResponseMetadata", None)
            return response
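The token returned above is normally entered into an authenticator app, which then produces the time-based codes that the MFA challenges ask for. To illustrate what such an app computes, here is a minimal sketch of the TOTP algorithm from RFC 6238 using only the standard library. It assumes the secret is a Base32 string (as in the SecretCode field of the AssociateSoftwareToken response) and the common authenticator defaults of HMAC-SHA1, a 30-second period, and 6 digits. The function name totp is hypothetical, and this is not a substitute for a real MFA application.

```python
import base64
import hashlib
import hmac
import struct
import time


def totp(secret_b32, for_time=None, digits=6, period=30):
    """Compute an RFC 6238 time-based one-time password from a Base32 secret."""
    if for_time is None:
        for_time = time.time()
    # Base32 input must be padded to a multiple of 8 characters before decoding.
    padded = secret_b32.upper() + "=" * (-len(secret_b32) % 8)
    key = base64.b32decode(padded)
    # The moving factor is the number of whole periods since the Unix epoch.
    counter = struct.pack(">Q", int(for_time) // period)
    digest = hmac.new(key, counter, hashlib.sha1).digest()
    # Dynamic truncation (RFC 4226): pick 4 bytes at an offset taken from the digest.
    offset = digest[-1] & 0x0F
    code = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(code % 10**digits).zfill(digits)
```

Because the code changes every period, a caller that receives an expired-code error can usually wait for the next period and try again with a fresh code.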

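The following code example shows how to use ConfirmDevice.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.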

import base64
import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class CognitoIdentityProviderWrapper:
    """Encapsulates Amazon Cognito actions"""

    def __init__(self, cognito_idp_client, user_pool_id, client_id, client_secret=None):
        """
        :param cognito_idp_client: A Boto3 Amazon Cognito Identity Provider client.
        :param user_pool_id: The ID of an existing Amazon Cognito user pool.
        :param client_id: The ID of a client application registered with the user pool.
        :param client_secret: The client secret, if the client has a secret.
        """
        self.cognito_idp_client = cognito_idp_client
        self.user_pool_id = user_pool_id
        self.client_id = client_id
        self.client_secret = client_secret

    def confirm_mfa_device(
        self,
        user_name,
        device_key,
        device_group_key,
        device_password,
        access_token,
        aws_srp,
    ):
        """
        Confirms an MFA device to be tracked by Amazon Cognito. When a device is
        tracked, its key and password can be used to sign in without requiring a new
        MFA code from the MFA application.

        :param user_name: The user that is associated with the device.
        :param device_key: The key of the device, returned by Amazon Cognito.
        :param device_group_key: The group key of the device, returned by Amazon Cognito.
        :param device_password: The password that is associated with the device.
        :param access_token: The user's access token.
        :param aws_srp: A class that helps with Secure Remote Password (SRP)
                        calculations. The scenario associated with this example uses
                        the warrant package.
        :return: True when the user must confirm the device. Otherwise, False. When
                 False, the device is automatically confirmed and tracked.
        """
        srp_helper = aws_srp.AWSSRP(
            username=user_name,
            password=device_password,
            pool_id="_",
            client_id=self.client_id,
            client_secret=None,
            client=self.cognito_idp_client,
        )
        # Derive the SRP password verifier and salt for the device.
        device_and_pw = f"{device_group_key}{device_key}:{device_password}"
        device_and_pw_hash = aws_srp.hash_sha256(device_and_pw.encode("utf-8"))
        salt = aws_srp.pad_hex(aws_srp.get_random(16))
        x_value = aws_srp.hex_to_long(aws_srp.hex_hash(salt + device_and_pw_hash))
        verifier = aws_srp.pad_hex(pow(srp_helper.val_g, x_value, srp_helper.big_n))
        device_secret_verifier_config = {
            "PasswordVerifier": base64.standard_b64encode(
                bytearray.fromhex(verifier)
            ).decode("utf-8"),
            "Salt": base64.standard_b64encode(bytearray.fromhex(salt)).decode("utf-8"),
        }
        try:
            response = self.cognito_idp_client.confirm_device(
                AccessToken=access_token,
                DeviceKey=device_key,
                DeviceSecretVerifierConfig=device_secret_verifier_config,
            )
            user_confirm = response["UserConfirmationNecessary"]
        except ClientError as err:
            logger.error(
                "Couldn't confirm mfa device %s. Here's why: %s: %s",
                device_key,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return user_confirm
  • For API details, see ConfirmDevice in AWS SDK for Python (Boto3) API Reference.
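In the example above, the verifier and salt are hexadecimal strings that are converted to Base64 before being sent as DeviceSecretVerifierConfig. The conversion step can be sketched on its own as follows. The helper names hex_to_b64 and build_device_secret_verifier_config are hypothetical, and the SRP math that produces the hex values is left to the SRP helper used in the example.

```python
import base64


def hex_to_b64(hex_string):
    """Decode a hex string to raw bytes and return them as Base64 text."""
    return base64.standard_b64encode(bytes.fromhex(hex_string)).decode("utf-8")


def build_device_secret_verifier_config(verifier_hex, salt_hex):
    """Build the dictionary shape passed as DeviceSecretVerifierConfig above."""
    return {
        "PasswordVerifier": hex_to_b64(verifier_hex),
        "Salt": hex_to_b64(salt_hex),
    }
```

Note that bytes.fromhex requires an even number of hex digits, which is one reason the example pads its hex values before encoding them.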

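The following code example shows how to use ConfirmSignUp.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.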

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class CognitoIdentityProviderWrapper:
    """Encapsulates Amazon Cognito actions"""

    def __init__(self, cognito_idp_client, user_pool_id, client_id, client_secret=None):
        """
        :param cognito_idp_client: A Boto3 Amazon Cognito Identity Provider client.
        :param user_pool_id: The ID of an existing Amazon Cognito user pool.
        :param client_id: The ID of a client application registered with the user pool.
        :param client_secret: The client secret, if the client has a secret.
        """
        self.cognito_idp_client = cognito_idp_client
        self.user_pool_id = user_pool_id
        self.client_id = client_id
        self.client_secret = client_secret

    def confirm_user_sign_up(self, user_name, confirmation_code):
        """
        Confirms a previously created user. A user must be confirmed before they
        can sign in to Amazon Cognito.

        :param user_name: The name of the user to confirm.
        :param confirmation_code: The confirmation code sent to the user's registered
                                  email address.
        :return: True when the confirmation succeeds.
        """
        try:
            kwargs = {
                "ClientId": self.client_id,
                "Username": user_name,
                "ConfirmationCode": confirmation_code,
            }
            if self.client_secret is not None:
                kwargs["SecretHash"] = self._secret_hash(user_name)
            self.cognito_idp_client.confirm_sign_up(**kwargs)
        except ClientError as err:
            logger.error(
                "Couldn't confirm sign up for %s. Here's why: %s: %s",
                user_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return True
  • For API details, see ConfirmSignUp in AWS SDK for Python (Boto3) API Reference.

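The following code example shows how to use InitiateAuth.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

This example shows you how to start authentication with a tracked device. To complete sign-in, the client must respond correctly to Secure Remote Password (SRP) challenges.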

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class CognitoIdentityProviderWrapper:
    """Encapsulates Amazon Cognito actions"""

    def __init__(self, cognito_idp_client, user_pool_id, client_id, client_secret=None):
        """
        :param cognito_idp_client: A Boto3 Amazon Cognito Identity Provider client.
        :param user_pool_id: The ID of an existing Amazon Cognito user pool.
        :param client_id: The ID of a client application registered with the user pool.
        :param client_secret: The client secret, if the client has a secret.
        """
        self.cognito_idp_client = cognito_idp_client
        self.user_pool_id = user_pool_id
        self.client_id = client_id
        self.client_secret = client_secret

    def sign_in_with_tracked_device(
        self,
        user_name,
        password,
        device_key,
        device_group_key,
        device_password,
        aws_srp,
    ):
        """
        Signs in to Amazon Cognito as a user who has a tracked device. Signing in
        with a tracked device lets a user sign in without entering a new MFA code.

        Signing in with a tracked device requires that the client respond to the SRP
        protocol. The scenario associated with this example uses the warrant package
        to help with SRP calculations.

        For more information on SRP, see
        https://en.wikipedia.org/wiki/Secure_Remote_Password_protocol.

        :param user_name: The user that is associated with the device.
        :param password: The user's password.
        :param device_key: The key of a tracked device.
        :param device_group_key: The group key of a tracked device.
        :param device_password: The password that is associated with the device.
        :param aws_srp: A class that helps with SRP calculations. The scenario
                        associated with this example uses the warrant package.
        :return: The result of the authentication. When successful, this contains an
                 access token for the user.
        """
        try:
            srp_helper = aws_srp.AWSSRP(
                username=user_name,
                password=device_password,
                pool_id="_",
                client_id=self.client_id,
                client_secret=None,
                client=self.cognito_idp_client,
            )

            # Step 1: start authentication with the user's password and device key.
            response_init = self.cognito_idp_client.initiate_auth(
                ClientId=self.client_id,
                AuthFlow="USER_PASSWORD_AUTH",
                AuthParameters={
                    "USERNAME": user_name,
                    "PASSWORD": password,
                    "DEVICE_KEY": device_key,
                },
            )
            if response_init["ChallengeName"] != "DEVICE_SRP_AUTH":
                raise RuntimeError(
                    f"Expected DEVICE_SRP_AUTH challenge but got {response_init['ChallengeName']}."
                )

            # Step 2: answer the DEVICE_SRP_AUTH challenge.
            auth_params = srp_helper.get_auth_params()
            auth_params["DEVICE_KEY"] = device_key
            response_auth = self.cognito_idp_client.respond_to_auth_challenge(
                ClientId=self.client_id,
                ChallengeName="DEVICE_SRP_AUTH",
                ChallengeResponses=auth_params,
            )
            if response_auth["ChallengeName"] != "DEVICE_PASSWORD_VERIFIER":
                # Report the challenge from this step (response_auth), not step 1.
                raise RuntimeError(
                    f"Expected DEVICE_PASSWORD_VERIFIER challenge but got "
                    f"{response_auth['ChallengeName']}."
                )

            # Step 3: answer the DEVICE_PASSWORD_VERIFIER challenge.
            challenge_params = response_auth["ChallengeParameters"]
            challenge_params["USER_ID_FOR_SRP"] = device_group_key + device_key
            cr = srp_helper.process_challenge(challenge_params, {"USERNAME": user_name})
            cr["USERNAME"] = user_name
            cr["DEVICE_KEY"] = device_key
            response_verifier = self.cognito_idp_client.respond_to_auth_challenge(
                ClientId=self.client_id,
                ChallengeName="DEVICE_PASSWORD_VERIFIER",
                ChallengeResponses=cr,
            )
            auth_tokens = response_verifier["AuthenticationResult"]
        except ClientError as err:
            logger.error(
                "Couldn't start client sign in for %s. Here's why: %s: %s",
                user_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return auth_tokens
  • For API details, see InitiateAuth in AWS SDK for Python (Boto3) API Reference.
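The tracked-device flow above checks after each call that Amazon Cognito returned the expected challenge. One possible way to factor that check into a helper is sketched here. The name expect_challenge is hypothetical and is not part of the AWS example.

```python
def expect_challenge(response, expected):
    """Return the challenge parameters if the response carries the expected challenge.

    Raises RuntimeError otherwise, including when no challenge was returned at all.
    """
    actual = response.get("ChallengeName")
    if actual != expected:
        raise RuntimeError(f"Expected {expected} challenge but got {actual}.")
    return response.get("ChallengeParameters", {})
```

Reading the name with response.get means a response that contains no challenge produces the descriptive RuntimeError instead of a KeyError.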

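The following code example shows how to use ListUsers.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.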

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class CognitoIdentityProviderWrapper:
    """Encapsulates Amazon Cognito actions"""

    def __init__(self, cognito_idp_client, user_pool_id, client_id, client_secret=None):
        """
        :param cognito_idp_client: A Boto3 Amazon Cognito Identity Provider client.
        :param user_pool_id: The ID of an existing Amazon Cognito user pool.
        :param client_id: The ID of a client application registered with the user pool.
        :param client_secret: The client secret, if the client has a secret.
        """
        self.cognito_idp_client = cognito_idp_client
        self.user_pool_id = user_pool_id
        self.client_id = client_id
        self.client_secret = client_secret

    def list_users(self):
        """
        Returns a list of the users in the current user pool.

        :return: The list of users.
        """
        try:
            response = self.cognito_idp_client.list_users(UserPoolId=self.user_pool_id)
            users = response["Users"]
        except ClientError as err:
            logger.error(
                "Couldn't list users for %s. Here's why: %s: %s",
                self.user_pool_id,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return users
  • For API details, see ListUsers in AWS SDK for Python (Boto3) API Reference.
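The list_users method above makes a single call, so it returns only the first page of results. For larger user pools, a sketch like the following could collect every page by passing the PaginationToken from each response into the next request. The function name list_all_users is hypothetical, and FakeClient is a stand-in used only so the sketch runs without AWS access.

```python
def list_all_users(cognito_idp_client, user_pool_id):
    """Collect users from every page of ListUsers results."""
    users = []
    kwargs = {"UserPoolId": user_pool_id}
    while True:
        response = cognito_idp_client.list_users(**kwargs)
        users.extend(response.get("Users", []))
        token = response.get("PaginationToken")
        if not token:
            return users
        kwargs["PaginationToken"] = token


class FakeClient:
    """Hypothetical stand-in for a Boto3 client; serves pre-built pages in order."""

    def __init__(self, pages):
        self._pages = pages

    def list_users(self, UserPoolId, PaginationToken=None):
        index = int(PaginationToken) if PaginationToken else 0
        page = {"Users": self._pages[index]}
        if index + 1 < len(self._pages):
            page["PaginationToken"] = str(index + 1)
        return page


all_users = list_all_users(
    FakeClient([[{"Username": "a"}], [{"Username": "b"}]]), "example-pool-id"
)
```

With a real client, cognito_idp_client.get_paginator("list_users") offers the same behavior, in the same way the Get started example paginates list_user_pools.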

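The following code example shows how to use ResendConfirmationCode.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.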

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class CognitoIdentityProviderWrapper:
    """Encapsulates Amazon Cognito actions"""

    def __init__(self, cognito_idp_client, user_pool_id, client_id, client_secret=None):
        """
        :param cognito_idp_client: A Boto3 Amazon Cognito Identity Provider client.
        :param user_pool_id: The ID of an existing Amazon Cognito user pool.
        :param client_id: The ID of a client application registered with the user pool.
        :param client_secret: The client secret, if the client has a secret.
        """
        self.cognito_idp_client = cognito_idp_client
        self.user_pool_id = user_pool_id
        self.client_id = client_id
        self.client_secret = client_secret

    def resend_confirmation(self, user_name):
        """
        Prompts Amazon Cognito to resend an email with a new confirmation code.

        :param user_name: The name of the user who will receive the email.
        :return: Delivery information about where the email is sent.
        """
        try:
            kwargs = {"ClientId": self.client_id, "Username": user_name}
            if self.client_secret is not None:
                kwargs["SecretHash"] = self._secret_hash(user_name)
            response = self.cognito_idp_client.resend_confirmation_code(**kwargs)
            delivery = response["CodeDeliveryDetails"]
        except ClientError as err:
            logger.error(
                "Couldn't resend confirmation to %s. Here's why: %s: %s",
                user_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return delivery
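The delivery information returned above is a dictionary. As a small sketch, a caller might turn it into a message for the user as shown here. The function name describe_delivery is hypothetical, and the DeliveryMedium and Destination keys are assumed from the CodeDeliveryDetails structure in the API reference; the defaults cover the case where either key is absent.

```python
def describe_delivery(delivery):
    """Format delivery details as a short, user-facing message."""
    medium = delivery.get("DeliveryMedium", "an unknown medium")
    destination = delivery.get("Destination", "an unknown destination")
    return f"Confirmation code sent by {medium} to {destination}."
```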

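The following code example shows how to use RespondToAuthChallenge.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

Sign in with a tracked device. To complete sign-in, the client must respond correctly to Secure Remote Password (SRP) challenges.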

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class CognitoIdentityProviderWrapper:
    """Encapsulates Amazon Cognito actions"""

    def __init__(self, cognito_idp_client, user_pool_id, client_id, client_secret=None):
        """
        :param cognito_idp_client: A Boto3 Amazon Cognito Identity Provider client.
        :param user_pool_id: The ID of an existing Amazon Cognito user pool.
        :param client_id: The ID of a client application registered with the user pool.
        :param client_secret: The client secret, if the client has a secret.
        """
        self.cognito_idp_client = cognito_idp_client
        self.user_pool_id = user_pool_id
        self.client_id = client_id
        self.client_secret = client_secret

    def sign_in_with_tracked_device(
        self,
        user_name,
        password,
        device_key,
        device_group_key,
        device_password,
        aws_srp,
    ):
        """
        Signs in to Amazon Cognito as a user who has a tracked device. Signing in
        with a tracked device lets a user sign in without entering a new MFA code.

        Signing in with a tracked device requires that the client respond to the SRP
        protocol. The scenario associated with this example uses the warrant package
        to help with SRP calculations.

        For more information on SRP, see
        https://en.wikipedia.org/wiki/Secure_Remote_Password_protocol.

        :param user_name: The user that is associated with the device.
        :param password: The user's password.
        :param device_key: The key of a tracked device.
        :param device_group_key: The group key of a tracked device.
        :param device_password: The password that is associated with the device.
        :param aws_srp: A class that helps with SRP calculations. The scenario
                        associated with this example uses the warrant package.
        :return: The result of the authentication. When successful, this contains an
                 access token for the user.
        """
        try:
            srp_helper = aws_srp.AWSSRP(
                username=user_name,
                password=device_password,
                pool_id="_",
                client_id=self.client_id,
                client_secret=None,
                client=self.cognito_idp_client,
            )

            response_init = self.cognito_idp_client.initiate_auth(
                ClientId=self.client_id,
                AuthFlow="USER_PASSWORD_AUTH",
                AuthParameters={
                    "USERNAME": user_name,
                    "PASSWORD": password,
                    "DEVICE_KEY": device_key,
                },
            )
            if response_init["ChallengeName"] != "DEVICE_SRP_AUTH":
                raise RuntimeError(
                    f"Expected DEVICE_SRP_AUTH challenge but got {response_init['ChallengeName']}."
                )

            # First challenge response: send the SRP auth parameters for the device.
            auth_params = srp_helper.get_auth_params()
            auth_params["DEVICE_KEY"] = device_key
            response_auth = self.cognito_idp_client.respond_to_auth_challenge(
                ClientId=self.client_id,
                ChallengeName="DEVICE_SRP_AUTH",
                ChallengeResponses=auth_params,
            )
            if response_auth["ChallengeName"] != "DEVICE_PASSWORD_VERIFIER":
                # Report the challenge from this step (response_auth), not the first call.
                raise RuntimeError(
                    f"Expected DEVICE_PASSWORD_VERIFIER challenge but got "
                    f"{response_auth['ChallengeName']}."
                )

            # Second challenge response: prove knowledge of the device password.
            challenge_params = response_auth["ChallengeParameters"]
            challenge_params["USER_ID_FOR_SRP"] = device_group_key + device_key
            cr = srp_helper.process_challenge(challenge_params, {"USERNAME": user_name})
            cr["USERNAME"] = user_name
            cr["DEVICE_KEY"] = device_key
            response_verifier = self.cognito_idp_client.respond_to_auth_challenge(
                ClientId=self.client_id,
                ChallengeName="DEVICE_PASSWORD_VERIFIER",
                ChallengeResponses=cr,
            )
            auth_tokens = response_verifier["AuthenticationResult"]
        except ClientError as err:
            logger.error(
                "Couldn't start client sign in for %s. Here's why: %s: %s",
                user_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return auth_tokens

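The following code example shows how to use SignUp.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.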

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class CognitoIdentityProviderWrapper:
    """Encapsulates Amazon Cognito actions"""

    def __init__(self, cognito_idp_client, user_pool_id, client_id, client_secret=None):
        """
        :param cognito_idp_client: A Boto3 Amazon Cognito Identity Provider client.
        :param user_pool_id: The ID of an existing Amazon Cognito user pool.
        :param client_id: The ID of a client application registered with the user pool.
        :param client_secret: The client secret, if the client has a secret.
        """
        self.cognito_idp_client = cognito_idp_client
        self.user_pool_id = user_pool_id
        self.client_id = client_id
        self.client_secret = client_secret

    def sign_up_user(self, user_name, password, user_email):
        """
        Signs up a new user with Amazon Cognito. This action prompts Amazon Cognito
        to send an email to the specified email address. The email contains a code that
        can be used to confirm the user.

        When the user already exists, the user status is checked to determine whether
        the user has been confirmed.

        :param user_name: The user name that identifies the new user.
        :param password: The password for the new user.
        :param user_email: The email address for the new user.
        :return: True when the user is already confirmed with Amazon Cognito.
                 Otherwise, false.
        """
        try:
            kwargs = {
                "ClientId": self.client_id,
                "Username": user_name,
                "Password": password,
                "UserAttributes": [{"Name": "email", "Value": user_email}],
            }
            if self.client_secret is not None:
                kwargs["SecretHash"] = self._secret_hash(user_name)
            response = self.cognito_idp_client.sign_up(**kwargs)
            confirmed = response["UserConfirmed"]
        except ClientError as err:
            if err.response["Error"]["Code"] == "UsernameExistsException":
                response = self.cognito_idp_client.admin_get_user(
                    UserPoolId=self.user_pool_id, Username=user_name
                )
                logger.warning(
                    "User %s exists and is %s.", user_name, response["UserStatus"]
                )
                confirmed = response["UserStatus"] == "CONFIRMED"
            else:
                logger.error(
                    "Couldn't sign up %s. Here's why: %s: %s",
                    user_name,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        return confirmed
  • For API details, see SignUp in AWS SDK for Python (Boto3) API Reference.

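The following code example shows how to use VerifySoftwareToken.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.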

import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class CognitoIdentityProviderWrapper:
    """Encapsulates Amazon Cognito actions"""

    def __init__(self, cognito_idp_client, user_pool_id, client_id, client_secret=None):
        """
        :param cognito_idp_client: A Boto3 Amazon Cognito Identity Provider client.
        :param user_pool_id: The ID of an existing Amazon Cognito user pool.
        :param client_id: The ID of a client application registered with the user pool.
        :param client_secret: The client secret, if the client has a secret.
        """
        self.cognito_idp_client = cognito_idp_client
        self.user_pool_id = user_pool_id
        self.client_id = client_id
        self.client_secret = client_secret

    def verify_mfa(self, session, user_code):
        """
        Verify a new MFA application that is associated with a user.

        :param session: Session information returned from a previous call to initiate
                        authentication.
        :param user_code: A code generated by the associated MFA application.
        :return: Status that indicates whether the MFA application is verified.
        """
        try:
            response = self.cognito_idp_client.verify_software_token(
                Session=session, UserCode=user_code
            )
        except ClientError as err:
            logger.error(
                "Couldn't verify MFA. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            response.pop("ResponseMetadata", None)
            return response
  • For API details, see VerifySoftwareToken in AWS SDK for Python (Boto3) API Reference.

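Scenarios

The following code example shows how to:

  • Sign up and confirm a user with a username, password, and email address.

  • Set up multi-factor authentication by associating an MFA application with the user.

  • Sign in by using a password and an MFA code.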
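The last two steps above can be sketched as a small driver that calls the wrapper methods shown in the actions on this page (start_sign_in, verify_mfa, and respond_to_mfa_challenge). This is a simplified outline under stated assumptions, not the scenario's actual driver code: the function sign_in_with_mfa, the get_code callback, and the FakeWrapper stand-in are all hypothetical, and FakeWrapper exists only so the sketch runs without AWS access.

```python
def sign_in_with_mfa(wrapper, user_name, password, get_code):
    """Sign in with a password, setting up MFA first when Amazon Cognito asks for it.

    get_code is a callback that receives the MFA secret (or None when MFA is
    already set up) and returns the current code from the user's MFA application.
    """
    response = wrapper.start_sign_in(user_name, password)
    if response.get("ChallengeName") == "MFA_SETUP":
        # start_sign_in merges the AssociateSoftwareToken result into the response.
        setup_code = get_code(response.get("SecretCode"))
        status = wrapper.verify_mfa(response["Session"], setup_code)
        if status.get("Status") != "SUCCESS":
            raise RuntimeError("The MFA application was not verified.")
        # Sign in again now that an MFA application is registered.
        response = wrapper.start_sign_in(user_name, password)
    if response.get("ChallengeName") == "SOFTWARE_TOKEN_MFA":
        return wrapper.respond_to_mfa_challenge(
            user_name, response["Session"], get_code(None)
        )
    return response.get("AuthenticationResult")


class FakeWrapper:
    """Hypothetical stand-in for CognitoIdentityProviderWrapper; makes no AWS calls."""

    def __init__(self):
        self.mfa_set_up = False

    def start_sign_in(self, user_name, password):
        if not self.mfa_set_up:
            return {"ChallengeName": "MFA_SETUP", "Session": "s1", "SecretCode": "SECRET"}
        return {"ChallengeName": "SOFTWARE_TOKEN_MFA", "Session": "s2"}

    def verify_mfa(self, session, user_code):
        self.mfa_set_up = True
        return {"Status": "SUCCESS"}

    def respond_to_mfa_challenge(self, user_name, session, mfa_code):
        return {"AccessToken": "fake-token"}


tokens = sign_in_with_mfa(FakeWrapper(), "alice", "example-password", lambda secret: "123456")
```

Passing the code source as a callback keeps the driver independent of how the code is obtained, whether typed by the user or computed in a test.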

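SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

Create a class that wraps the Amazon Cognito functions used in the scenario.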

import base64
import hashlib
import hmac
import logging

from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


class CognitoIdentityProviderWrapper:
    """Encapsulates Amazon Cognito actions"""

    def __init__(self, cognito_idp_client, user_pool_id, client_id, client_secret=None):
        """
        :param cognito_idp_client: A Boto3 Amazon Cognito Identity Provider client.
        :param user_pool_id: The ID of an existing Amazon Cognito user pool.
        :param client_id: The ID of a client application registered with the user pool.
        :param client_secret: The client secret, if the client has a secret.
        """
        self.cognito_idp_client = cognito_idp_client
        self.user_pool_id = user_pool_id
        self.client_id = client_id
        self.client_secret = client_secret

    def _secret_hash(self, user_name):
        """
        Calculates a secret hash from a user name and a client secret.

        :param user_name: The user name to use when calculating the hash.
        :return: The secret hash.
        """
        key = self.client_secret.encode()
        msg = bytes(user_name + self.client_id, "utf-8")
        secret_hash = base64.b64encode(
            hmac.new(key, msg, digestmod=hashlib.sha256).digest()
        ).decode()
        logger.info("Made secret hash for %s: %s.", user_name, secret_hash)
        return secret_hash

    def sign_up_user(self, user_name, password, user_email):
        """
        Signs up a new user with Amazon Cognito. This action prompts Amazon Cognito
        to send an email to the specified email address. The email contains a code that
        can be used to confirm the user.

        When the user already exists, the user status is checked to determine whether
        the user has been confirmed.

        :param user_name: The user name that identifies the new user.
        :param password: The password for the new user.
        :param user_email: The email address for the new user.
        :return: True when the user is already confirmed with Amazon Cognito.
                 Otherwise, false.
        """
        try:
            kwargs = {
                "ClientId": self.client_id,
                "Username": user_name,
                "Password": password,
                "UserAttributes": [{"Name": "email", "Value": user_email}],
            }
            if self.client_secret is not None:
                kwargs["SecretHash"] = self._secret_hash(user_name)
            response = self.cognito_idp_client.sign_up(**kwargs)
            confirmed = response["UserConfirmed"]
        except ClientError as err:
            if err.response["Error"]["Code"] == "UsernameExistsException":
                response = self.cognito_idp_client.admin_get_user(
                    UserPoolId=self.user_pool_id, Username=user_name
                )
                logger.warning(
                    "User %s exists and is %s.", user_name, response["UserStatus"]
                )
                confirmed = response["UserStatus"] == "CONFIRMED"
            else:
                logger.error(
                    "Couldn't sign up %s. Here's why: %s: %s",
                    user_name,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        return confirmed

    def resend_confirmation(self, user_name):
        """
        Prompts Amazon Cognito to resend an email with a new confirmation code.

        :param user_name: The name of the user who will receive the email.
        :return: Delivery information about where the email is sent.
        """
        try:
            kwargs = {"ClientId": self.client_id, "Username": user_name}
            if self.client_secret is not None:
                kwargs["SecretHash"] = self._secret_hash(user_name)
            response = self.cognito_idp_client.resend_confirmation_code(**kwargs)
            delivery = response["CodeDeliveryDetails"]
        except ClientError as err:
            logger.error(
                "Couldn't resend confirmation to %s. Here's why: %s: %s",
                user_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return delivery

    def confirm_user_sign_up(self, user_name, confirmation_code):
        """
        Confirms a previously created user. A user must be confirmed before they
        can sign in to Amazon Cognito.

        :param user_name: The name of the user to confirm.
        :param confirmation_code: The confirmation code sent to the user's registered
                                  email address.
        :return: True when the confirmation succeeds.
        """
        try:
            kwargs = {
                "ClientId": self.client_id,
                "Username": user_name,
                "ConfirmationCode": confirmation_code,
            }
            if self.client_secret is not None:
                kwargs["SecretHash"] = self._secret_hash(user_name)
            self.cognito_idp_client.confirm_sign_up(**kwargs)
        except ClientError as err:
            logger.error(
                "Couldn't confirm sign up for %s. Here's why: %s: %s",
                user_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return True

    def list_users(self):
        """
        Returns a list of the users in the current user pool.

        :return: The list of users.
        """
        try:
            response = self.cognito_idp_client.list_users(UserPoolId=self.user_pool_id)
            users = response["Users"]
        except ClientError as err:
            logger.error(
                "Couldn't list users for %s. Here's why: %s: %s",
                self.user_pool_id,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return users

    def start_sign_in(self, user_name, password):
        """
        Starts the sign-in process for a user by using administrator credentials.
        This method of signing in is appropriate for code running on a secure server.

        If the user pool is configured to require MFA and this is the first sign-in
        for the user, Amazon Cognito returns a challenge response to set up an
        MFA application. When this occurs, this function gets an MFA secret from
        Amazon Cognito and returns it to the caller.

        :param user_name: The name of the user to sign in.
        :param password: The user's password.
        :return: The result of the sign-in attempt. When sign-in is successful, this
                 returns an access token that can be used to get AWS credentials.
                 Otherwise, Amazon Cognito returns a challenge to set up an MFA
                 application, or a challenge to enter an MFA code from a registered
                 MFA application.
        """
        try:
            kwargs = {
                "UserPoolId": self.user_pool_id,
                "ClientId": self.client_id,
                "AuthFlow": "ADMIN_USER_PASSWORD_AUTH",
                "AuthParameters": {"USERNAME": user_name, "PASSWORD": password},
            }
            if self.client_secret is not None:
                kwargs["AuthParameters"]["SECRET_HASH"] = self._secret_hash(user_name)
            response = self.cognito_idp_client.admin_initiate_auth(**kwargs)
            challenge_name = response.get("ChallengeName", None)
            if challenge_name == "MFA_SETUP":
                if (
                    "SOFTWARE_TOKEN_MFA"
                    in response["ChallengeParameters"]["MFAS_CAN_SETUP"]
                ):
                    response.update(self.get_mfa_secret(response["Session"]))
                else:
                    raise RuntimeError(
                        "The user pool requires MFA setup, but the user pool is not "
                        "configured for TOTP MFA. This example requires TOTP MFA."
                    )
        except ClientError as err:
            logger.error(
                "Couldn't start sign in for %s. Here's why: %s: %s",
                user_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            response.pop("ResponseMetadata", None)
            return response

    def get_mfa_secret(self, session):
        """
        Gets a token that can be used to associate an MFA application with the user.

        :param session: Session information returned from a previous call to initiate
                        authentication.
        :return: An MFA token that can be used to set up an MFA application.
        """
        try:
            response = self.cognito_idp_client.associate_software_token(Session=session)
        except ClientError as err:
            logger.error(
                "Couldn't get MFA secret. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            response.pop("ResponseMetadata", None)
            return response

    def verify_mfa(self, session, user_code):
        """
        Verify a new MFA application that is associated with a user.

        :param session: Session information returned from a previous call to initiate
                        authentication.
        :param user_code: A code generated by the associated MFA application.
        :return: Status that indicates whether the MFA application is verified.
        """
        try:
            response = self.cognito_idp_client.verify_software_token(
                Session=session, UserCode=user_code
            )
        except ClientError as err:
            logger.error(
                "Couldn't verify MFA. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            response.pop("ResponseMetadata", None)
            return response

    def respond_to_mfa_challenge(self, user_name, session, mfa_code):
        """
        Responds to a challenge for an MFA code. This completes the second step of
        a two-factor sign-in. When sign-in is successful, it returns an access token
        that can be used to get AWS credentials from Amazon Cognito.

        :param user_name: The name of the user who is signing in.
        :param session: Session information returned from a previous call to initiate
                        authentication.
        :param mfa_code: A code generated by the associated MFA application.
        :return: The result of the authentication. When successful, this contains an
                 access token for the user.
        """
        try:
            kwargs = {
                "UserPoolId": self.user_pool_id,
                "ClientId": self.client_id,
                "ChallengeName": "SOFTWARE_TOKEN_MFA",
                "Session": session,
                "ChallengeResponses": {
                    "USERNAME": user_name,
                    "SOFTWARE_TOKEN_MFA_CODE": mfa_code,
                },
            }
            if self.client_secret is not None:
                kwargs["ChallengeResponses"]["SECRET_HASH"] = self._secret_hash(
                    user_name
                )
            response = self.cognito_idp_client.admin_respond_to_auth_challenge(**kwargs)
            auth_result = response["AuthenticationResult"]
        except ClientError as err:
            if err.response["Error"]["Code"] == "ExpiredCodeException":
                logger.warning(
                    "Your MFA code has expired or has been used already. You might have "
                    "to wait a few seconds until your app shows you a new code."
                )
            else:
                logger.error(
                    "Couldn't respond to mfa challenge for %s. Here's why: %s: %s",
                    user_name,
                    err.response["Error"]["Code"],
                    err.response["Error"]["Message"],
                )
                raise
        else:
            return auth_result

    def confirm_mfa_device(
        self,
        user_name,
        device_key,
        device_group_key,
        device_password,
        access_token,
        aws_srp,
    ):
        """
        Confirms an MFA device to be tracked by Amazon Cognito.
        When a device is tracked, its key and password can be used to sign in without
        requiring a new MFA code from the MFA application.

        :param user_name: The user that is associated with the device.
        :param device_key: The key of the device, returned by Amazon Cognito.
        :param device_group_key: The group key of the device, returned by Amazon Cognito.
        :param device_password: The password that is associated with the device.
        :param access_token: The user's access token.
        :param aws_srp: A class that helps with Secure Remote Password (SRP)
                        calculations. The scenario associated with this example uses
                        the warrant package.
        :return: True when the user must confirm the device. Otherwise, False. When
                 False, the device is automatically confirmed and tracked.
        """
        srp_helper = aws_srp.AWSSRP(
            username=user_name,
            password=device_password,
            pool_id="_",
            client_id=self.client_id,
            client_secret=None,
            client=self.cognito_idp_client,
        )
        device_and_pw = f"{device_group_key}{device_key}:{device_password}"
        device_and_pw_hash = aws_srp.hash_sha256(device_and_pw.encode("utf-8"))
        salt = aws_srp.pad_hex(aws_srp.get_random(16))
        x_value = aws_srp.hex_to_long(aws_srp.hex_hash(salt + device_and_pw_hash))
        verifier = aws_srp.pad_hex(pow(srp_helper.val_g, x_value, srp_helper.big_n))
        device_secret_verifier_config = {
            "PasswordVerifier": base64.standard_b64encode(
                bytearray.fromhex(verifier)
            ).decode("utf-8"),
            "Salt": base64.standard_b64encode(bytearray.fromhex(salt)).decode("utf-8"),
        }
        try:
            response = self.cognito_idp_client.confirm_device(
                AccessToken=access_token,
                DeviceKey=device_key,
                DeviceSecretVerifierConfig=device_secret_verifier_config,
            )
            user_confirm = response["UserConfirmationNecessary"]
        except ClientError as err:
            logger.error(
                "Couldn't confirm mfa device %s. Here's why: %s: %s",
                device_key,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return user_confirm

    def sign_in_with_tracked_device(
        self,
        user_name,
        password,
        device_key,
        device_group_key,
        device_password,
        aws_srp,
    ):
        """
        Signs in to Amazon Cognito as a user who has a tracked device. Signing in
        with a tracked device lets a user sign in without entering a new MFA code.

        Signing in with a tracked device requires that the client respond to the SRP
        protocol. The scenario associated with this example uses the warrant package
        to help with SRP calculations.

        For more information on SRP, see https://en.wikipedia.org/wiki/Secure_Remote_Password_protocol.

        :param user_name: The user that is associated with the device.
        :param password: The user's password.
        :param device_key: The key of a tracked device.
        :param device_group_key: The group key of a tracked device.
        :param device_password: The password that is associated with the device.
        :param aws_srp: A class that helps with SRP calculations. The scenario
                        associated with this example uses the warrant package.
        :return: The result of the authentication. When successful, this contains an
                 access token for the user.
        """
        try:
            srp_helper = aws_srp.AWSSRP(
                username=user_name,
                password=device_password,
                pool_id="_",
                client_id=self.client_id,
                client_secret=None,
                client=self.cognito_idp_client,
            )

            response_init = self.cognito_idp_client.initiate_auth(
                ClientId=self.client_id,
                AuthFlow="USER_PASSWORD_AUTH",
                AuthParameters={
                    "USERNAME": user_name,
                    "PASSWORD": password,
                    "DEVICE_KEY": device_key,
                },
            )
            if response_init["ChallengeName"] != "DEVICE_SRP_AUTH":
                raise RuntimeError(
                    f"Expected DEVICE_SRP_AUTH challenge but got {response_init['ChallengeName']}."
                )

            auth_params = srp_helper.get_auth_params()
            auth_params["DEVICE_KEY"] = device_key
            response_auth = self.cognito_idp_client.respond_to_auth_challenge(
                ClientId=self.client_id,
                ChallengeName="DEVICE_SRP_AUTH",
                ChallengeResponses=auth_params,
            )
            if response_auth["ChallengeName"] != "DEVICE_PASSWORD_VERIFIER":
                raise RuntimeError(
                    f"Expected DEVICE_PASSWORD_VERIFIER challenge but got "
                    f"{response_auth['ChallengeName']}."
                )

            # The device verifier step identifies the device by its group key and key.
            challenge_params = response_auth["ChallengeParameters"]
            challenge_params["USER_ID_FOR_SRP"] = device_group_key + device_key
            cr = srp_helper.process_challenge(challenge_params, {"USERNAME": user_name})
            cr["USERNAME"] = user_name
            cr["DEVICE_KEY"] = device_key
            response_verifier = self.cognito_idp_client.respond_to_auth_challenge(
                ClientId=self.client_id,
                ChallengeName="DEVICE_PASSWORD_VERIFIER",
                ChallengeResponses=cr,
            )
            auth_tokens = response_verifier["AuthenticationResult"]
        except ClientError as err:
            logger.error(
                "Couldn't start client sign in for %s. Here's why: %s: %s",
                user_name,
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return auth_tokens
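
The wrapper methods above call `self._secret_hash(user_name)` whenever the app client has a secret, but that helper does not appear in this excerpt. The following is a minimal sketch of how such a value can be computed, assuming the formula Amazon Cognito documents for app clients with a secret: the Base64-encoded HMAC-SHA256 of the user name concatenated with the client ID, keyed with the client secret. The standalone function name `compute_secret_hash` and the sample values are hypothetical and are not taken from the example's source.

```python
import base64
import hashlib
import hmac


def compute_secret_hash(user_name, client_id, client_secret):
    """
    Computes a secret hash for an app client that has a client secret.

    Assumption: the hash is Base64(HMAC_SHA256(key=client_secret,
    message=user_name + client_id)).
    """
    message = (user_name + client_id).encode("utf-8")
    key = client_secret.encode("utf-8")
    digest = hmac.new(key, message, digestmod=hashlib.sha256).digest()
    return base64.b64encode(digest).decode("utf-8")


if __name__ == "__main__":
    # Placeholder values for illustration only.
    print(compute_secret_hash("demo-user", "example-client-id", "example-client-secret"))
```

Inside the wrapper class, a method with this body could read `self.client_id` and `self.client_secret` instead of taking them as parameters.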

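
The `list_users` method above makes a single `list_users` call, so it returns only the first page of results when a user pool holds more users than one response can carry. A minimal sketch of a paginated variant follows, using the same paginator pattern shown in the getting-started example for `list_user_pools`. The function name `list_all_users` is hypothetical; it assumes the client passed in is a Boto3 `cognito-idp` client.

```python
def list_all_users(cognito_idp_client, user_pool_id):
    """
    Returns every user in a user pool by following pagination.

    :param cognito_idp_client: A Boto3 Amazon Cognito Identity Provider client.
    :param user_pool_id: The ID of an existing Amazon Cognito user pool.
    :return: The list of all users across all pages.
    """
    paginator = cognito_idp_client.get_paginator("list_users")
    users = []
    # Each page is one ListUsers response; collect its Users list if present.
    for page in paginator.paginate(UserPoolId=user_pool_id):
        users.extend(page.get("Users", []))
    return users
```

For example, `list_all_users(boto3.client("cognito-idp"), user_pool_id)` could stand in for `cog_wrapper.list_users()` when the pool is large.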
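Create a function that runs the scenario. This example also registers an MFA device to be tracked by Amazon Cognito and shows you how to sign in by using a password and information from the tracked device. This avoids the need to enter a new MFA code.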

import argparse
import base64
import logging
import os
import webbrowser
from pprint import pp

import boto3
import qrcode
from warrant import aws_srp

# `q` is the interactive-prompt helper module used by this example (it provides
# ask, non_empty, and is_yesno). Import it from the AWS code examples repository.


def run_scenario(cognito_idp_client, user_pool_id, client_id):
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    print("-" * 88)
    print("Welcome to the Amazon Cognito user signup with MFA demo.")
    print("-" * 88)

    cog_wrapper = CognitoIdentityProviderWrapper(
        cognito_idp_client, user_pool_id, client_id
    )

    user_name = q.ask("Let's sign up a new user. Enter a user name: ", q.non_empty)
    password = q.ask("Enter a password for the user: ", q.non_empty)
    email = q.ask("Enter a valid email address that you own: ", q.non_empty)
    confirmed = cog_wrapper.sign_up_user(user_name, password, email)
    while not confirmed:
        print(
            f"User {user_name} requires confirmation. Check {email} for "
            f"a verification code."
        )
        confirmation_code = q.ask("Enter the confirmation code from the email: ")
        if not confirmation_code:
            if q.ask("Do you need another confirmation code (y/n)? ", q.is_yesno):
                delivery = cog_wrapper.resend_confirmation(user_name)
                print(
                    f"Confirmation code sent by {delivery['DeliveryMedium']} "
                    f"to {delivery['Destination']}."
                )
        else:
            confirmed = cog_wrapper.confirm_user_sign_up(user_name, confirmation_code)
    print(f"User {user_name} is confirmed and ready to use.")
    print("-" * 88)

    print("Let's get a list of users in the user pool.")
    q.ask("Press Enter when you're ready.")
    users = cog_wrapper.list_users()
    if users:
        print(f"Found {len(users)} users:")
        pp(users)
    else:
        print("No users found.")
    print("-" * 88)

    print("Let's sign in and get an access token.")
    auth_tokens = None
    challenge = "ADMIN_USER_PASSWORD_AUTH"
    response = {}
    # Loop until Amazon Cognito stops returning challenges.
    while challenge is not None:
        if challenge == "ADMIN_USER_PASSWORD_AUTH":
            response = cog_wrapper.start_sign_in(user_name, password)
            challenge = response["ChallengeName"]
        elif response["ChallengeName"] == "MFA_SETUP":
            print("First, we need to set up an MFA application.")
            qr_img = qrcode.make(
                f"otpauth://totp/{user_name}?secret={response['SecretCode']}"
            )
            qr_img.save("qr.png")
            q.ask(
                "Press Enter to see a QR code on your screen. Scan it into an MFA "
                "application, such as Google Authenticator."
            )
            webbrowser.open("qr.png")
            mfa_code = q.ask(
                "Enter the verification code from your MFA application: ", q.non_empty
            )
            response = cog_wrapper.verify_mfa(response["Session"], mfa_code)
            print(f"MFA device setup {response['Status']}")
            print("Now that an MFA application is set up, let's sign in again.")
            print(
                "You might have to wait a few seconds for a new MFA code to appear in "
                "your MFA application."
            )
            challenge = "ADMIN_USER_PASSWORD_AUTH"
        elif response["ChallengeName"] == "SOFTWARE_TOKEN_MFA":
            auth_tokens = None
            # respond_to_mfa_challenge returns None for an expired code, so retry.
            while auth_tokens is None:
                mfa_code = q.ask(
                    "Enter a verification code from your MFA application: ", q.non_empty
                )
                auth_tokens = cog_wrapper.respond_to_mfa_challenge(
                    user_name, response["Session"], mfa_code
                )
            print(f"You're signed in as {user_name}.")
            print("Here's your access token:")
            pp(auth_tokens["AccessToken"])
            print("And your device information:")
            pp(auth_tokens["NewDeviceMetadata"])
            challenge = None
        else:
            raise Exception(f"Got unexpected challenge {response['ChallengeName']}")
    print("-" * 88)

    device_group_key = auth_tokens["NewDeviceMetadata"]["DeviceGroupKey"]
    device_key = auth_tokens["NewDeviceMetadata"]["DeviceKey"]
    device_password = base64.standard_b64encode(os.urandom(40)).decode("utf-8")

    print("Let's confirm your MFA device so you don't have to re-enter MFA tokens for it.")
    q.ask("Press Enter when you're ready.")
    cog_wrapper.confirm_mfa_device(
        user_name,
        device_key,
        device_group_key,
        device_password,
        auth_tokens["AccessToken"],
        aws_srp,
    )
    print(f"Your device {device_key} is confirmed.")
    print("-" * 88)

    print(
        f"Now let's sign in as {user_name} from your confirmed device {device_key}.\n"
        f"Because this device is tracked by Amazon Cognito, you won't have to re-enter an MFA code."
    )
    q.ask("Press Enter when ready.")
    auth_tokens = cog_wrapper.sign_in_with_tracked_device(
        user_name, password, device_key, device_group_key, device_password, aws_srp
    )
    print("You're signed in. Your access token is:")
    pp(auth_tokens["AccessToken"])
    print("-" * 88)

    print("Don't forget to delete your user pool when you're done with this example.")
    print("\nThanks for watching!")
    print("-" * 88)


def main():
    parser = argparse.ArgumentParser(
        description="Shows how to sign up a new user with Amazon Cognito and associate "
        "the user with an MFA application for multi-factor authentication."
    )
    parser.add_argument(
        "user_pool_id", help="The ID of the user pool to use for the example."
    )
    parser.add_argument(
        "client_id", help="The ID of the client application to use for the example."
    )
    args = parser.parse_args()
    try:
        run_scenario(boto3.client("cognito-idp"), args.user_pool_id, args.client_id)
    except Exception:
        logging.exception("Something went wrong with the demo.")


if __name__ == "__main__":
    main()
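
The scenario places `SecretCode` in an `otpauth://totp/` URI so that an MFA application can generate verification codes from it. As a rough illustration of what such an application computes, here is a minimal sketch of the time-based one-time password algorithm from RFC 6238. It assumes the secret is Base32-encoded and that the common authenticator defaults apply (HMAC-SHA1, 6 digits, 30-second time step). The function name `totp_code` is hypothetical and is not part of Boto3 or the example's source.

```python
import base64
import hashlib
import hmac
import struct
import time


def totp_code(secret_b32, for_time=None, digits=6, step=30):
    """
    Computes a time-based one-time password (RFC 6238) with HMAC-SHA1.

    :param secret_b32: The shared secret, Base32-encoded (assumed format).
    :param for_time: Unix time to compute the code for. Defaults to now.
    :param digits: The number of digits in the code.
    :param step: The time step in seconds.
    :return: The code as a zero-padded string.
    """
    # Base32 input must be padded to a multiple of 8 characters before decoding.
    padded = secret_b32.upper() + "=" * (-len(secret_b32) % 8)
    key = base64.b32decode(padded)
    now = time.time() if for_time is None else for_time
    counter = int(now // step)
    digest = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()
    # Dynamic truncation: the low 4 bits of the last byte select a 4-byte window.
    offset = digest[-1] & 0x0F
    value = struct.unpack(">I", digest[offset : offset + 4])[0] & 0x7FFFFFFF
    return str(value % 10**digits).zfill(digits)
```

A script could call `totp_code(response["SecretCode"])` to produce a code for `verify_mfa` during automated testing, although in the interactive scenario the code comes from the MFA application.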