This sample uses the OpenSearchService
import boto3
from botocore.config import Config
import time
# Build the client using the default credential configuration.
# You can use the CLI and run 'aws configure' to set access key, secret
# key, and default Region.
DOMAIN_NAME = '' # The name of the domain to upgrade
TARGET_VERSION = '' # The version you want to upgrade the domain to. For example, OpenSearch_1.1
my_config = Config(
# Optionally lets you specify a Region other than your default.
region_name='us-east-1'
)
client = boto3.client('opensearch', config=my_config)
def check_versions():
"""Determine whether domain is eligible for upgrade"""
response = client.get_compatible_versions(
DomainName=DOMAIN_NAME
)
compatible_versions = response['CompatibleVersions']
for i in range(len(compatible_versions)):
if TARGET_VERSION in compatible_versions[i]["TargetVersions"]:
print('Domain is eligible for upgrade to ' + TARGET_VERSION)
upgrade_domain()
print(response)
else:
print('Domain not eligible for upgrade to ' + TARGET_VERSION)
def upgrade_domain():
"""Upgrades the domain"""
response = client.upgrade_domain(
DomainName=DOMAIN_NAME,
TargetVersion=TARGET_VERSION
)
print('Upgrading domain to ' + TARGET_VERSION + '...' + response)
time.sleep(5)
wait_for_upgrade()
def wait_for_upgrade():
"""Get the status of the upgrade"""
response = client.get_upgrade_status(
DomainName=DOMAIN_NAME
)
if (response['UpgradeStep']) == 'UPGRADE' and (response['StepStatus']) == 'SUCCEEDED':
print('Domain successfully upgraded to ' + TARGET_VERSION)
elif (response['StepStatus']) == 'FAILED':
print('Upgrade failed. Please try again.')
elif (response['StepStatus']) == 'SUCCEEDED_WITH_ISSUES':
print('Upgrade succeeded with issues')
elif (response['StepStatus']) == 'IN_PROGRESS':
time.sleep(30)
wait_for_upgrade()
def main():
check_versions()
if __name__ == "__main__":
main()