完整的 EventBridge 日程安排器计划事件工作流程,使用 AWS SDK - AWS SDK代码示例

AWS 文档 AWS SDK示例 GitHub 存储库中还有更多SDK示例

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

完整的 EventBridge 日程安排器计划事件工作流程,使用 AWS SDK

以下代码示例演示了如何:

  • 部署包含所需资源的 AWS CloudFormation 堆栈。

  • 创建 EventBridge 日程安排组。

  • 创建具有灵活时间窗口的一次性 EventBridge 日程安排。

  • 创建具有指定速率的定期 EventBridge 日程安排。

  • 删除 EventBridge 日程安排器的日程安排和日程安排组。

  • 清理资源并删除堆栈。

.NET
AWS SDK for .NET
注意

GitHub 上还有更多相关信息。请在 AWS 代码示例存储库中查找完整示例,了解如何进行设置和运行。

运行工作流程。

using System.Text.RegularExpressions;
using Amazon.CloudFormation;
using Amazon.CloudFormation.Model;
using Amazon.Scheduler;
using Amazon.Scheduler.Model;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Console;
using Microsoft.Extensions.Logging.Debug;
using SchedulerActions;
using Exception = System.Exception;

namespace SchedulerScenario;

/// <summary>
/// Console workflow that deploys a CloudFormation stack, creates a one-time and a
/// recurring EventBridge Scheduler schedule inside a schedule group, and then cleans up.
/// </summary>
public class SchedulerWorkflow
{
    /*
    Before running this .NET code example, set up your development environment, including your credentials.

    This .NET code example performs the following tasks for the Amazon EventBridge Scheduler workflow:

    1. Prepare the Application:
       - Prompt the user for an email address to use for the SNS topic subscription.
       - Prompt the user for a name for the Cloud Formation stack.
       - Deploy the Cloud Formation template in resources/cfn_template.yaml for resource creation.
       - Store the outputs of the stack into variables for use in the workflow.
       - Create a schedule group for all workflow schedules.

    2. Create one-time Schedule:
       - Create a one-time schedule to send an initial event.
       - Use a Flexible Time Window and set the schedule to delete after completion.

    3. Create a time-based schedule:
       - Prompt the user for a schedule name and a rate in minutes.
       - Create the recurring schedule at that rate.
       - Delete the schedule when the user is finished.

    4. Clean up:
       - Prompt the user for y/n answer if they want to destroy the stack and clean up all resources.
       - Delete the schedule group.
       - Destroy the Cloud Formation stack and wait until the stack has been removed.
    */

    public static ILogger<SchedulerWorkflow> _logger = null!;
    public static SchedulerWrapper _schedulerWrapper = null!;
    public static IAmazonCloudFormation _amazonCloudFormation = null!;

    private static string _roleArn = null!;
    private static string _snsTopicArn = null!;

    public static bool _interactive = true;
    private static string _stackName = "default-scheduler-workflow-stack-name";
    private static string _scheduleGroupName = "workflow-schedules-group";
    private static string _stackResourcePath = "../../../../../../workflows/eventbridge_scheduler/resources/cfn_template.yaml";

    // Anchored so the WHOLE input must match; an unanchored pattern accepts any
    // string that merely contains one valid fragment.
    private const string StackNamePattern = "^([a-zA-Z][-a-zA-Z0-9]*|arn:[-a-zA-Z0-9:/._+]*)$";
    private const string ResourceNamePattern = "^[0-9a-zA-Z_.-]+$";

    /// <summary>
    /// Entry point. Builds the host, resolves the services when running interactively,
    /// and runs the prepare / one-time / recurring / cleanup steps in order.
    /// </summary>
    /// <param name="args">Command line arguments passed to the host builder.</param>
    public static async Task Main(string[] args)
    {
        using var host = Host.CreateDefaultBuilder(args)
            .ConfigureLogging(logging =>
                logging.AddFilter("System", LogLevel.Debug)
                    .AddFilter<DebugLoggerProvider>("Microsoft", LogLevel.Information)
                    .AddFilter<ConsoleLoggerProvider>("Microsoft", LogLevel.Trace))
            .ConfigureServices((_, services) =>
                services.AddAWSService<IAmazonScheduler>()
                    .AddAWSService<IAmazonCloudFormation>()
                    .AddTransient<SchedulerWrapper>()
            )
            .Build();

        // When _interactive is false the static fields are left untouched so that a
        // caller can assign them before invoking the workflow steps.
        if (_interactive)
        {
            _logger = LoggerFactory.Create(builder => { builder.AddConsole(); })
                .CreateLogger<SchedulerWorkflow>();

            _schedulerWrapper = host.Services.GetRequiredService<SchedulerWrapper>();
            _amazonCloudFormation = host.Services.GetRequiredService<IAmazonCloudFormation>();
        }

        WriteSeparator();
        Console.WriteLine("Welcome to the Amazon EventBridge Scheduler Workflow.");
        WriteSeparator();

        try
        {
            WriteSeparator();
            var prepareSuccess = await PrepareApplication();
            WriteSeparator();

            if (prepareSuccess)
            {
                WriteSeparator();
                await CreateOneTimeSchedule();
                WriteSeparator();

                WriteSeparator();
                await CreateRecurringSchedule();
                WriteSeparator();
            }

            WriteSeparator();
            await Cleanup();
            WriteSeparator();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "There was a problem with the workflow, initiating cleanup...");
            // Switch off prompts so cleanup proceeds without asking for confirmation.
            _interactive = false;
            await Cleanup();
        }

        Console.WriteLine("Amazon EventBridge Scheduler workflow completed.");
    }

    /// <summary>
    /// Prepares the application by creating the necessary resources.
    /// </summary>
    /// <returns>True if the stack was deployed and the schedule group was created.</returns>
    public static async Task<bool> PrepareApplication()
    {
        Console.WriteLine("Preparing the application...");
        try
        {
            // Prompt the user for an email address to use for the subscription.
            Console.WriteLine("\nThis example creates resources in a CloudFormation stack, including an SNS topic" +
                              "\nthat will be subscribed to the EventBridge Scheduler events. " +
                              "\n\nYou will need to confirm the subscription in order to receive event emails. ");

            var emailAddress = PromptUserForEmail();

            // Prompt the user for a name for the CloudFormation stack.
            _stackName = PromptUserForStackName();

            // Deploy the CloudFormation stack.
            var deploySuccess = await DeployCloudFormationStack(_stackName, emailAddress);

            if (deploySuccess)
            {
                // Create a schedule group for all workflow schedules. The wrapper reports
                // failure through its return value, so it must be checked here.
                var groupCreated = await _schedulerWrapper.CreateScheduleGroupAsync(_scheduleGroupName);
                if (groupCreated)
                {
                    Console.WriteLine("Application preparation complete.");
                    return true;
                }
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "An error occurred while preparing the application.");
        }

        Console.WriteLine("Application preparation failed.");
        return false;
    }

    /// <summary>
    /// Deploys the CloudFormation stack with the necessary resources.
    /// </summary>
    /// <param name="stackName">The name of the CloudFormation stack.</param>
    /// <param name="email">The email to use for the subscription.</param>
    /// <returns>True if the stack was deployed successfully.</returns>
    private static async Task<bool> DeployCloudFormationStack(string stackName, string email)
    {
        Console.WriteLine($"\nDeploying CloudFormation stack: {stackName}");

        try
        {
            var request = new CreateStackRequest
            {
                StackName = stackName,
                TemplateBody = await File.ReadAllTextAsync(_stackResourcePath),
                Capabilities = { Capability.CAPABILITY_NAMED_IAM }
            };

            // If an email is provided, set the parameter.
            if (!string.IsNullOrWhiteSpace(email))
            {
                request.Parameters = new List<Parameter>()
                {
                    new() { ParameterKey = "email", ParameterValue = email }
                };
            }

            var response = await _amazonCloudFormation.CreateStackAsync(request);

            if (response.HttpStatusCode != System.Net.HttpStatusCode.OK)
            {
                _logger.LogError("Failed to create CloudFormation stack: {StackName}", stackName);
                return false;
            }

            Console.WriteLine($"CloudFormation stack creation started: {stackName}");

            // Wait for the stack to be in CREATE_COMPLETE state.
            bool stackCreated = await WaitForStackCompletion(response.StackId);
            if (!stackCreated)
            {
                _logger.LogError("CloudFormation stack creation failed: {StackName}", stackName);
                return false;
            }

            // Retrieve the output values.
            return await GetStackOutputs(response.StackId);
        }
        catch (AlreadyExistsException)
        {
            _logger.LogWarning(
                "CloudFormation stack '{StackName}' already exists. Please provide a unique name.",
                stackName);

            // Without prompts the same default name would be returned again and the
            // retry would recurse forever.
            if (!_interactive)
            {
                return false;
            }

            var newStackName = PromptUserForStackName();
            // Track the name actually deployed so Cleanup deletes the right stack.
            _stackName = newStackName;
            return await DeployCloudFormationStack(newStackName, email);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "An error occurred while deploying the CloudFormation stack: {StackName}", stackName);
            return false;
        }
    }

    /// <summary>
    /// Waits for the CloudFormation stack to be in the CREATE_COMPLETE state.
    /// Polls up to 10 times with a 30 second delay between attempts.
    /// </summary>
    /// <param name="stackId">The ID of the CloudFormation stack.</param>
    /// <returns>True if the stack was created successfully.</returns>
    private static async Task<bool> WaitForStackCompletion(string stackId)
    {
        const int maxRetries = 10;
        const int retryDelay = 30000; // 30 seconds.

        for (int retryCount = 0; retryCount < maxRetries; retryCount++)
        {
            var describeStacksRequest = new DescribeStacksRequest
            {
                StackName = stackId
            };

            var describeStacksResponse = await _amazonCloudFormation.DescribeStacksAsync(describeStacksRequest);

            if (describeStacksResponse.Stacks.Count > 0)
            {
                var status = describeStacksResponse.Stacks[0].StackStatus;

                if (status == StackStatus.CREATE_COMPLETE)
                {
                    Console.WriteLine("CloudFormation stack creation complete.");
                    return true;
                }

                // Terminal failure states: stop polling instead of waiting for the timeout.
                if (status == StackStatus.CREATE_FAILED ||
                    status == StackStatus.ROLLBACK_COMPLETE ||
                    status == StackStatus.ROLLBACK_FAILED)
                {
                    Console.WriteLine("CloudFormation stack creation failed.");
                    return false;
                }
            }

            Console.WriteLine("Waiting for CloudFormation stack creation to complete...");
            await Task.Delay(retryDelay);
        }

        _logger.LogError("Timed out waiting for CloudFormation stack creation to complete.");
        return false;
    }

    /// <summary>
    /// Retrieves the output values from the CloudFormation stack and stores the
    /// role ARN and SNS topic ARN for later steps.
    /// </summary>
    /// <param name="stackId">The ID of the CloudFormation stack.</param>
    /// <returns>True if the stack was found and both outputs were read.</returns>
    private static async Task<bool> GetStackOutputs(string stackId)
    {
        try
        {
            var describeStacksRequest = new DescribeStacksRequest { StackName = stackId };

            var describeStacksResponse =
                await _amazonCloudFormation.DescribeStacksAsync(describeStacksRequest);

            if (describeStacksResponse.Stacks.Count == 0)
            {
                _logger.LogError("No stack found for stack outputs: {StackId}", stackId);
                return false;
            }

            var stack = describeStacksResponse.Stacks[0];
            _roleArn = GetStackOutputValue(stack, "RoleARN");
            _snsTopicArn = GetStackOutputValue(stack, "SNStopicARN");
            return true;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to retrieve CloudFormation stack outputs: {StackId}", stackId);
            return false;
        }
    }

    /// <summary>
    /// Get an output value by key from a CloudFormation stack.
    /// </summary>
    /// <param name="stack">The CloudFormation stack.</param>
    /// <param name="outputKey">The key of the output.</param>
    /// <returns>The value as a string.</returns>
    /// <exception cref="InvalidOperationException">The stack has no output with that key.</exception>
    private static string GetStackOutputValue(Stack stack, string outputKey)
    {
        var output = stack.Outputs.FirstOrDefault(o => o.OutputKey == outputKey)
                     ?? throw new InvalidOperationException(
                         $"Stack output '{outputKey}' was not found.");
        var outputValue = output.OutputValue;
        Console.WriteLine($"Stack output {outputKey}: {outputValue}");
        return outputValue;
    }

    /// <summary>
    /// Creates a one-time schedule to send an initial event.
    /// </summary>
    /// <returns>True if the one-time schedule was created successfully.</returns>
    public static async Task<bool> CreateOneTimeSchedule()
    {
        var scheduleName = PromptUserForResourceName("Enter a name for the one-time schedule:");

        Console.WriteLine($"Creating a one-time schedule named '{scheduleName}' " +
                          $"\nto send an initial event in 1 minute with a flexible time window...");
        try
        {
            // Create a one-time schedule with a flexible time
            // window set to delete after completion.
            // You may also set a timezone instead of using UTC.
            var scheduledTime = DateTime.UtcNow.AddMinutes(1).ToString("s");

            var createSuccess = await _schedulerWrapper.CreateScheduleAsync(
                scheduleName,
                $"at({scheduledTime})",
                _scheduleGroupName,
                _snsTopicArn,
                _roleArn,
                $"One time scheduled event test from schedule {scheduleName}.",
                true,
                useFlexibleTimeWindow: true);

            // The wrapper reports failures via its return value, so only announce
            // success when the schedule really exists.
            if (createSuccess)
            {
                Console.WriteLine("Subscription email will receive an email from this event.");
                Console.WriteLine("You must confirm your subscription to receive event emails.");
                Console.WriteLine($"One-time schedule '{scheduleName}' created successfully.");
            }
            else
            {
                Console.WriteLine($"One-time schedule '{scheduleName}' was not created.");
            }

            return createSuccess;
        }
        catch (ResourceNotFoundException ex)
        {
            _logger.LogError(ex, "The target with ARN '{TargetArn}' was not found.", _snsTopicArn);
            return false;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "An error occurred while creating the one-time schedule '{ScheduleName}'.", scheduleName);
            return false;
        }
    }

    /// <summary>
    /// Create a recurring schedule to send events at a specified rate in minutes.
    /// </summary>
    /// <returns>True if the recurring schedule was created successfully.</returns>
    public static async Task<bool> CreateRecurringSchedule()
    {
        Console.WriteLine("Creating a recurring schedule to send events for one hour...");

        try
        {
            // Prompt the user for a schedule name.
            var scheduleName = PromptUserForResourceName("Enter a name for the recurring schedule: ");

            // Prompt the user for the schedule rate (in minutes).
            var scheduleRateInMinutes = PromptUserForInteger("Enter the desired schedule rate (in minutes): ");

            // Create the recurring schedule.
            var createSuccess = await _schedulerWrapper.CreateScheduleAsync(
                scheduleName,
                $"rate({scheduleRateInMinutes} minutes)",
                _scheduleGroupName,
                _snsTopicArn,
                _roleArn,
                $"Recurrent event test from schedule {scheduleName}.");

            if (!createSuccess)
            {
                // Nothing was created, so there is nothing to announce or delete.
                Console.WriteLine($"Recurring schedule '{scheduleName}' was not created.");
                return false;
            }

            Console.WriteLine("Subscription email will receive an email from this event.");
            Console.WriteLine("You must confirm your subscription to receive event emails.");

            // Delete the schedule when the user is finished.
            if (!_interactive || GetYesNoResponse($"Are you ready to delete the '{scheduleName}' schedule? (y/n)"))
            {
                await _schedulerWrapper.DeleteScheduleAsync(scheduleName, _scheduleGroupName);
            }

            return true;
        }
        catch (ResourceNotFoundException ex)
        {
            _logger.LogError(ex, "The target with ARN '{TargetArn}' was not found.", _snsTopicArn);
            return false;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "An error occurred while creating the recurring schedule.");
            return false;
        }
    }

    /// <summary>
    /// Cleans up the resources created during the workflow.
    /// </summary>
    /// <returns>True if the cleanup was successful, or if the user declined cleanup.</returns>
    public static async Task<bool> Cleanup()
    {
        // Prompt the user to confirm cleanup.
        var cleanup = !_interactive || GetYesNoResponse(
            "Do you want to delete all resources created by this workflow? (y/n) ");

        if (!cleanup)
        {
            _logger.LogInformation("EventBridge Scheduler workflow is complete.");
            return true;
        }

        try
        {
            // Delete the schedule group.
            var groupDeleteSuccess = await _schedulerWrapper.DeleteScheduleGroupAsync(_scheduleGroupName);

            // Destroy the CloudFormation stack and wait for it to be removed.
            var stackDeleteSuccess = await DeleteCloudFormationStack(_stackName, false);

            return groupDeleteSuccess && stackDeleteSuccess;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "An error occurred while cleaning up the resources.");
            return false;
        }
    }

    /// <summary>
    /// Delete the resources in the stack and wait for confirmation.
    /// </summary>
    /// <param name="stackName">The name of the stack.</param>
    /// <param name="forceDelete">True to force delete the stack.</param>
    /// <returns>True if successful.</returns>
    private static async Task<bool> DeleteCloudFormationStack(string stackName, bool forceDelete)
    {
        var request = new DeleteStackRequest
        {
            StackName = stackName,
        };

        if (forceDelete)
        {
            request.DeletionMode = DeletionMode.FORCE_DELETE_STACK;
        }

        await _amazonCloudFormation.DeleteStackAsync(request);
        Console.WriteLine($"CloudFormation stack '{stackName}' is being deleted. This may take a few minutes.");

        bool stackDeleted = await WaitForStackDeletion(stackName, forceDelete);

        if (stackDeleted)
        {
            Console.WriteLine($"CloudFormation stack '{stackName}' has been deleted.");
            return true;
        }

        _logger.LogError("Failed to delete CloudFormation stack '{StackName}'.", stackName);
        return false;
    }

    /// <summary>
    /// Wait for the stack to be deleted. Polls up to 10 times with a 30 second delay.
    /// </summary>
    /// <param name="stackName">The name of the stack.</param>
    /// <param name="forceDelete">True if the pending delete was already a force delete.</param>
    /// <returns>True if successful.</returns>
    private static async Task<bool> WaitForStackDeletion(string stackName, bool forceDelete)
    {
        const int maxRetries = 10;
        const int retryDelay = 30000; // 30 seconds

        for (int retryCount = 0; retryCount < maxRetries; retryCount++)
        {
            var describeStacksRequest = new DescribeStacksRequest
            {
                StackName = stackName
            };

            try
            {
                var describeStacksResponse = await _amazonCloudFormation.DescribeStacksAsync(describeStacksRequest);

                if (describeStacksResponse.Stacks.Count == 0 ||
                    describeStacksResponse.Stacks[0].StackStatus == StackStatus.DELETE_COMPLETE)
                {
                    return true;
                }

                if (!forceDelete &&
                    describeStacksResponse.Stacks[0].StackStatus == StackStatus.DELETE_FAILED)
                {
                    // Try one time to force delete.
                    return await DeleteCloudFormationStack(stackName, true);
                }
            }
            catch (AmazonCloudFormationException ex) when (ex.ErrorCode == "ValidationError")
            {
                // Stack does not exist, so it has been successfully deleted.
                return true;
            }

            Console.WriteLine($"Waiting for CloudFormation stack '{stackName}' to be deleted...");
            await Task.Delay(retryDelay);
        }

        _logger.LogError("Timed out waiting for CloudFormation stack '{StackName}' to be deleted.", stackName);
        return false;
    }

    /// <summary>
    /// Writes an 80-character dashed separator line to the console.
    /// </summary>
    private static void WriteSeparator() => Console.WriteLine(new string('-', 80));

    /// <summary>
    /// Reads one line from the console.
    /// </summary>
    /// <returns>The line that was read; never null.</returns>
    /// <exception cref="InvalidOperationException">Console input has reached end of stream.</exception>
    private static string ReadConsoleLine() =>
        Console.ReadLine()
        ?? throw new InvalidOperationException("Console input ended before a response was received.");

    /// <summary>
    /// Helper method to get a yes or no response from the user.
    /// </summary>
    /// <param name="question">The question string to print on the console.</param>
    /// <returns>True if the user responds with a yes.</returns>
    private static bool GetYesNoResponse(string question)
    {
        Console.WriteLine(question);
        var ynResponse = Console.ReadLine();
        return ynResponse != null &&
               ynResponse.Equals("y", StringComparison.InvariantCultureIgnoreCase);
    }

    /// <summary>
    /// Prompt the user for a valid email address.
    /// </summary>
    /// <returns>The valid email address, or an empty string when running without prompts.</returns>
    private static string PromptUserForEmail()
    {
        if (!_interactive)
        {
            // Used when running without user prompts.
            return "";
        }

        while (true)
        {
            Console.WriteLine("Enter an email address to use for event subscriptions: ");
            var email = ReadConsoleLine();
            if (IsValidEmail(email))
            {
                return email;
            }

            Console.WriteLine("Invalid email address. Please try again.");
        }
    }

    /// <summary>
    /// Prompt the user for a stack name that fully matches the stack name pattern.
    /// </summary>
    /// <returns>The valid stack name, or the current default when running without prompts.</returns>
    private static string PromptUserForStackName()
    {
        if (!_interactive)
        {
            // Used when running without user prompts.
            return _stackName;
        }

        while (true)
        {
            Console.WriteLine("Enter a name for the AWS Cloud Formation Stack: ");
            var stackName = ReadConsoleLine();
            if (Regex.IsMatch(stackName, StackNamePattern))
            {
                return stackName;
            }

            Console.WriteLine(
                $"Invalid stack name. Please use a name that matches the pattern {StackNamePattern}.");
        }
    }

    /// <summary>
    /// Prompt the user for a resource name that fully matches the resource name pattern.
    /// </summary>
    /// <param name="prompt">The prompt to print on the console.</param>
    /// <returns>The valid resource name, or a generated name when running without prompts.</returns>
    private static string PromptUserForResourceName(string prompt)
    {
        if (!_interactive)
        {
            // Used when running without user prompts.
            return "resource-" + Guid.NewGuid();
        }

        while (true)
        {
            Console.WriteLine(prompt);
            var resourceName = ReadConsoleLine();
            if (Regex.IsMatch(resourceName, ResourceNamePattern))
            {
                return resourceName;
            }

            Console.WriteLine(
                $"Invalid resource name. Please use a name that matches the pattern {ResourceNamePattern}.");
        }
    }

    /// <summary>
    /// Prompt the user for a positive whole number.
    /// </summary>
    /// <param name="prompt">The prompt to print on the console.</param>
    /// <returns>The parsed value, or 1 when running without prompts.</returns>
    private static int PromptUserForInteger(string prompt)
    {
        if (!_interactive)
        {
            // Used when running without user prompts.
            return 1;
        }

        while (true)
        {
            Console.WriteLine(prompt);
            var stringResponse = ReadConsoleLine();
            // Zero and negative values would produce an invalid rate expression.
            if (int.TryParse(stringResponse, out var intResponse) && intResponse > 0)
            {
                return intResponse;
            }

            Console.WriteLine("Invalid integer. Please enter a whole number greater than zero.");
        }
    }

    /// <summary>
    /// Use System Mail to check for a valid email address.
    /// </summary>
    /// <param name="email">The string to verify.</param>
    /// <returns>True if the string parses as a mail address whose address part equals the input.</returns>
    private static bool IsValidEmail(string email) =>
        System.Net.Mail.MailAddress.TryCreate(email, out var mailAddress) &&
        mailAddress.Address == email;
}

服务操作的封装器。

using Amazon.Scheduler;
using Amazon.Scheduler.Model;
using Microsoft.Extensions.Logging;

namespace SchedulerActions;

/// <summary>
/// Wrapper class for Amazon EventBridge Scheduler operations.
/// Each method catches service errors, logs them, and reports the outcome as a bool.
/// </summary>
public class SchedulerWrapper
{
    private readonly IAmazonScheduler _amazonScheduler;
    private readonly ILogger<SchedulerWrapper> _logger;

    /// <summary>
    /// Constructor for the SchedulerWrapper class.
    /// </summary>
    /// <param name="amazonScheduler">The injected EventBridge Scheduler client.</param>
    /// <param name="logger">The injected logger.</param>
    public SchedulerWrapper(IAmazonScheduler amazonScheduler, ILogger<SchedulerWrapper> logger)
    {
        _amazonScheduler = amazonScheduler;
        _logger = logger;
    }

    /// <summary>
    /// Creates a new schedule in Amazon EventBridge Scheduler.
    /// </summary>
    /// <param name="name">The name of the schedule.</param>
    /// <param name="scheduleExpression">The schedule expression that defines when the schedule should run.</param>
    /// <param name="scheduleGroupName">The name of the schedule group to which the schedule should be added.</param>
    /// <param name="targetArn">ARN of the event target.</param>
    /// <param name="roleArn">Execution Role ARN.</param>
    /// <param name="input">The input text passed to the target.</param>
    /// <param name="deleteAfterCompletion">Indicates whether to delete the schedule after completion.</param>
    /// <param name="useFlexibleTimeWindow">Indicates whether to use a flexible time window for the schedule.</param>
    /// <returns>True if the schedule was created successfully, false otherwise.</returns>
    public async Task<bool> CreateScheduleAsync(
        string name,
        string scheduleExpression,
        string scheduleGroupName,
        string targetArn,
        string roleArn,
        string input,
        bool deleteAfterCompletion = false,
        bool useFlexibleTimeWindow = false)
    {
        try
        {
            const int hoursToRun = 1;
            const int flexibleTimeWindowMinutes = 10;

            // Read the clock once so the start and end dates share the same base time.
            var utcNow = DateTime.UtcNow;

            var request = new CreateScheduleRequest
            {
                Name = name,
                ScheduleExpression = scheduleExpression,
                GroupName = scheduleGroupName,
                Target = new Target { Arn = targetArn, RoleArn = roleArn, Input = input },
                ActionAfterCompletion = deleteAfterCompletion
                    ? ActionAfterCompletion.DELETE
                    : ActionAfterCompletion.NONE,
                StartDate = utcNow, // Ignored for one-time schedules.
                EndDate = utcNow.AddHours(hoursToRun), // Ignored for one-time schedules.
                // Allow a flexible time window if the caller specifies it.
                FlexibleTimeWindow = new FlexibleTimeWindow
                {
                    Mode = useFlexibleTimeWindow
                        ? FlexibleTimeWindowMode.FLEXIBLE
                        : FlexibleTimeWindowMode.OFF,
                    MaximumWindowInMinutes = useFlexibleTimeWindow
                        ? flexibleTimeWindowMinutes
                        : null
                }
            };

            var response = await _amazonScheduler.CreateScheduleAsync(request);

            Console.WriteLine($"Successfully created schedule '{name}' " +
                              $"in schedule group '{scheduleGroupName}': {response.ScheduleArn}.");
            return true;
        }
        catch (ConflictException ex)
        {
            // If the name is not unique, a ConflictException will be thrown.
            _logger.LogError(ex,
                "Failed to create schedule '{ScheduleName}' due to a conflict. {Message}",
                name, ex.Message);
            return false;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "An error occurred while creating schedule '{ScheduleName}' " +
                "in schedule group '{ScheduleGroupName}': {Message}",
                name, scheduleGroupName, ex.Message);
            return false;
        }
    }

    /// <summary>
    /// Creates a new schedule group in Amazon EventBridge Scheduler.
    /// </summary>
    /// <param name="name">The name of the schedule group.</param>
    /// <returns>True if the schedule group was created successfully, false otherwise.</returns>
    public async Task<bool> CreateScheduleGroupAsync(string name)
    {
        try
        {
            var request = new CreateScheduleGroupRequest { Name = name };

            var response = await _amazonScheduler.CreateScheduleGroupAsync(request);

            Console.WriteLine($"Successfully created schedule group '{name}': {response.ScheduleGroupArn}.");
            return true;
        }
        catch (ConflictException ex)
        {
            // If the name is not unique, a ConflictException will be thrown.
            _logger.LogError(ex,
                "Failed to create schedule group '{ScheduleGroupName}' due to a conflict. {Message}",
                name, ex.Message);
            return false;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "An error occurred while creating schedule group '{ScheduleGroupName}': {Message}",
                name, ex.Message);
            return false;
        }
    }

    /// <summary>
    /// Deletes an existing schedule from Amazon EventBridge Scheduler.
    /// </summary>
    /// <param name="name">The name of the schedule to delete.</param>
    /// <param name="groupName">The group name of the schedule to delete.</param>
    /// <returns>
    /// True if the schedule was deleted or was not found; false on any other error.
    /// </returns>
    public async Task<bool> DeleteScheduleAsync(string name, string groupName)
    {
        try
        {
            var request = new DeleteScheduleRequest
            {
                Name = name,
                GroupName = groupName
            };

            await _amazonScheduler.DeleteScheduleAsync(request);

            Console.WriteLine($"Successfully deleted schedule with name '{name}'.");
            return true;
        }
        catch (ResourceNotFoundException ex)
        {
            // A missing schedule is logged but still reported as success to the caller.
            _logger.LogError(ex,
                "Failed to delete schedule with ID '{ScheduleName}' because the resource was not found: {Message}",
                name, ex.Message);
            return true;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "An error occurred while deleting schedule with ID '{ScheduleName}': {Message}",
                name, ex.Message);
            return false;
        }
    }

    /// <summary>
    /// Deletes an existing schedule group from Amazon EventBridge Scheduler.
    /// </summary>
    /// <param name="name">The name of the schedule group to delete.</param>
    /// <returns>
    /// True if the schedule group was deleted or was not found; false on any other error.
    /// </returns>
    public async Task<bool> DeleteScheduleGroupAsync(string name)
    {
        try
        {
            var request = new DeleteScheduleGroupRequest { Name = name };

            await _amazonScheduler.DeleteScheduleGroupAsync(request);

            Console.WriteLine($"Successfully deleted schedule group '{name}'.");
            return true;
        }
        catch (ResourceNotFoundException ex)
        {
            // A missing group is logged but still reported as success to the caller.
            _logger.LogError(ex,
                "Failed to delete schedule group '{ScheduleGroupName}' because the resource was not found: {Message}",
                name, ex.Message);
            return true;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "An error occurred while deleting schedule group '{ScheduleGroupName}': {Message}",
                name, ex.Message);
            return false;
        }
    }
}
Java
SDK适用于 Java 2.x
注意

GitHub 上还有更多相关信息。请在 AWS 代码示例存储库中查找完整示例,了解如何进行设置和运行。

运行工作流程。

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.scheduler.model.SchedulerException; import javax.mail.internet.AddressException; import javax.mail.internet.InternetAddress; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.Map; import java.util.Scanner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; /** * This Java code example performs the following tasks for the Amazon EventBridge Scheduler workflow: * <p> * 1. Prepare the Application: * - Prompt the user for an email address to use for the subscription for the SNS topic subscription. * - Deploy the Cloud Formation template in resources/cfn_template.yaml for resource creation. * - Store the outputs of the stack into variables for use in the workflow. * - Create a schedule group for all workflow schedules. * <p> * 2. Create one-time Schedule: * - Create a one-time schedule to send an initial event. * - Use a Flexible Time Window and set the schedule to delete after completion. * - Wait for the user to receive the event email from SNS. * <p> * 3. Create a time-based schedule: * - Prompt the user for how many X times per Y hours a recurring event should be scheduled. * - Create the scheduled event for X times per hour for Y hours. * - Wait for the user to receive the event email from SNS. * - Delete the schedule when the user is finished. * <p> * 4. Clean up: * - Prompt the user for y/n answer if they want to destroy the stack and clean up all resources. * - Delete the schedule group. * - Destroy the Cloud Formation stack and wait until the stack has been removed. 
*/ public class EventbridgeSchedulerScenario { private static final Logger logger = LoggerFactory.getLogger(EventbridgeSchedulerScenario.class); private static final Scanner scanner = new Scanner(System.in); private static String STACK_NAME = "workflow-stack-name"; private static final String scheduleGroupName = "schedules-group"; private static String recurringScheduleName = ""; private static String oneTimeScheduleName = ""; private static final EventbridgeSchedulerActions eventbridgeActions = new EventbridgeSchedulerActions(); public static final String DASHES = new String(new char[80]).replace("\0", "-"); public static String roleArn = ""; public static String snsTopicArn = ""; public static void main(String[] args) { logger.info(DASHES); logger.info("Welcome to the Amazon EventBridge Scheduler Workflow."); logger.info(""" Amazon EventBridge Scheduler is a fully managed service that helps you schedule and execute a wide range of tasks and events in the cloud. It's designed to simplify the process of scheduling and managing recurring or one-time events, making it easier for developers and businesses to automate various workflows and processes. One of the key features of Amazon EventBridge Scheduler is its ability to schedule events based on a variety of triggers, including time-based schedules, custom event patterns, or even integration with other AWS services. For example, you can use EventBridge Scheduler to schedule a report generation task to run every weekday at 9 AM, or to trigger a Lambda function when a specific Amazon S3 object is created. This flexibility allows you to build complex and dynamic event-driven architectures that adapt to your business needs. Lets get started... """); waitForInputToContinue(); logger.info(DASHES); logger.info(DASHES); logger.info("1. Prepare the application."); waitForInputToContinue(); try { boolean prepareSuccess = prepareApplication(); logger.info(DASHES); if (prepareSuccess) { logger.info("2. 
Create one-time schedule."); logger.info(""" A one-time schedule in Amazon EventBridge Scheduler is an event trigger that allows you to schedule a one-time event to run at a specific date and time. This is useful for executing a specific task or workflow at a predetermined time, without the need for recurring or complex scheduling. """); waitForInputToContinue(); createOneTimeSchedule(); logger.info("Do you want to delete the schedule {} (y/n) ?", oneTimeScheduleName); String ans = scanner.nextLine().trim(); if (ans.equalsIgnoreCase("y")) { eventbridgeActions.deleteScheduleAsync(oneTimeScheduleName,scheduleGroupName); } logger.info(DASHES); logger.info("3. Create a recurring schedule."); logger.info(""" A recurring schedule is a feature that allows you to schedule and manage the execution of your serverless applications or workloads on a recurring basis. For example, with EventBridge Scheduler, you can create custom schedules for your AWS Lambda functions, AWS Step Functions, and other supported event sources, enabling you to automate tasks and workflows without the need for complex infrastructure management. """); waitForInputToContinue(); createRecurringSchedule(); logger.info("Do you want to delete the schedule {} (y/n) ?", oneTimeScheduleName); String ans2 = scanner.nextLine().trim(); if (ans2.equalsIgnoreCase("y")) { eventbridgeActions.deleteScheduleAsync(recurringScheduleName,scheduleGroupName); } logger.info(DASHES); } } catch (Exception ex) { logger.info("There was a problem with the workflow {}, initiating cleanup...", ex.getMessage()); cleanUp(); } logger.info(DASHES); logger.info("4. 
Clean up the resources."); logger.info("Do you want to delete these AWS resources (y/n) ?"); String delAns = scanner.nextLine().trim(); if (delAns.equalsIgnoreCase("y")) { cleanUp(); } else { logger.info("The AWS resources will not be deleted."); } logger.info("Amazon EventBridge Scheduler workflow completed."); logger.info(DASHES); } /** * Cleans up the resources associated with the EventBridge scheduler. * If any errors occur during the cleanup process, the corresponding error messages are logged. */ public static void cleanUp() { logger.info("First, delete the schedule group."); logger.info("When the schedule group is deleted, schedules that are part of that group are deleted."); waitForInputToContinue(); try { eventbridgeActions.deleteScheduleGroupAsync(scheduleGroupName).join(); } catch (CompletionException ce) { Throwable cause = ce.getCause(); if (cause instanceof SchedulerException schedulerException) { logger.error("Scheduler error occurred: Error message: {}, Error code {}", schedulerException.getMessage(), schedulerException.awsErrorDetails().errorCode(), schedulerException); } else { logger.error("An unexpected error occurred: {}", cause.getMessage()); } return; } logger.info("Destroy the CloudFormation stack"); waitForInputToContinue(); CloudFormationHelper.destroyCloudFormationStack(STACK_NAME); } /** * Prepares the application by creating resources in a CloudFormation stack, including an SNS topic * that will be subscribed to the EventBridge Scheduler events. The user will need to confirm the subscription * in order to receive event emails. * * @return true if the application preparation was successful, false otherwise */ public static boolean prepareApplication() { logger.info(""" This example creates resources in a CloudFormation stack, including an SNS topic that will be subscribed to the EventBridge Scheduler events. You will need to confirm the subscription in order to receive event emails. 
"""); String emailAddress = promptUserForEmail(); logger.info("You entered {}", emailAddress); logger.info("Do you want to use a custom Stack name (y/n) ?"); String ans = scanner.nextLine().trim(); if (ans.equalsIgnoreCase("y")) { String newStackName = scanner.nextLine(); logger.info("You entered {} for the new stack name", newStackName); waitForInputToContinue(); STACK_NAME = newStackName; } logger.info("Get the roleArn and snsTopicArn values using a Cloudformation template."); waitForInputToContinue(); CloudFormationHelper.deployCloudFormationStack(STACK_NAME, emailAddress); Map<String, String> stackOutputs = CloudFormationHelper.getStackOutputs(STACK_NAME); roleArn = stackOutputs.get("RoleARN"); snsTopicArn = stackOutputs.get("SNStopicARN"); logger.info("The roleARN is {}", roleArn); logger.info("The snsTopicArn is {}", snsTopicArn); try { eventbridgeActions.createScheduleGroup(scheduleGroupName).join(); logger.info("createScheduleGroupAsync completed successfully."); } catch (RuntimeException e) { logger.error("Error occurred: {} ", e.getMessage()); return false; } logger.info("Application preparation complete."); return true; } /** * Waits for the user to enter 'c' followed by <ENTER> to continue the program. * This method is used to pause the program execution and wait for user input before * proceeding. */ private static void waitForInputToContinue() { while (true) { logger.info(""); logger.info("Enter 'c' followed by <ENTER> to continue:"); String input = scanner.nextLine(); if (input.trim().equalsIgnoreCase("c")) { logger.info("Continuing with the program..."); logger.info(""); break; } else { // Handle invalid input. logger.info("Invalid input. Please try again."); } } } /** * Prompts the user to enter an email address and validates the input. * If the provided email address is invalid, the method will prompt the user to try again. 
* * @return the valid email address entered by the user */ private static String promptUserForEmail() { logger.info("Enter an email address to use for event subscriptions: "); String email = scanner.nextLine(); if (!isValidEmail(email)) { logger.info("Invalid email address. Please try again."); return promptUserForEmail(); } return email; } /** * Checks if the given email address is valid. * * @param email the email address to be validated * @return {@code true} if the email address is valid, {@code false} otherwise */ private static boolean isValidEmail(String email) { try { InternetAddress emailAddress = new InternetAddress(email); emailAddress.validate(); return true; } catch (AddressException e) { return false; } } /** * Creates a one-time schedule to send an initial event in 1 minute with a flexible time window. * * @return {@code true} if the schedule was created successfully, {@code false} otherwise */ public static Boolean createOneTimeSchedule() { oneTimeScheduleName = promptUserForResourceName("Enter a name for the one-time schedule:"); logger.info("Creating a one-time schedule named {} to send an initial event in 1 minute with a flexible time window...", oneTimeScheduleName); LocalDateTime scheduledTime = LocalDateTime.now(); DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"); String scheduleExpression = "at(" + scheduledTime.format(formatter) + ")"; return eventbridgeActions.createScheduleAsync( oneTimeScheduleName, scheduleExpression, scheduleGroupName, snsTopicArn, roleArn, "One time scheduled event test from schedule", true, true).join(); } /** * Creates a recurring schedule to send events based on a specific time. * * @return A {@link CompletableFuture} that completes with a boolean value indicating the success or failure of the operation. 
*/ public static Boolean createRecurringSchedule() { logger.info("Creating a recurring schedule to send events for one hour..."); recurringScheduleName = promptUserForResourceName("Enter a name for the recurring schedule:"); // Prompt the user for the schedule rate (in minutes). int scheduleRateInMinutes = promptUserForInteger("Enter the desired schedule rate (in minutes): "); String scheduleExpression = "rate(" + scheduleRateInMinutes + " minutes)"; return eventbridgeActions.createScheduleAsync( recurringScheduleName, scheduleExpression, scheduleGroupName, snsTopicArn, roleArn, "Recurrent event test from schedule " + recurringScheduleName, true, true).join(); } /** * Prompts the user for a resource name and validates the input. * * @param prompt the message to display to the user when prompting for the resource name * @return the valid resource name entered by the user */ private static String promptUserForResourceName(String prompt) { logger.info(prompt); String resourceName = scanner.nextLine(); String regex = "[0-9a-zA-Z-_.]+"; if (!resourceName.matches(regex)) { logger.info("Invalid resource name. Please use a name that matches the pattern " + regex + "."); return promptUserForResourceName(prompt); } return resourceName; } /** * Prompts the user for an integer input and returns the integer value. * * @param prompt the message to be displayed to the user when prompting for input * @return the integer value entered by the user */ private static int promptUserForInteger(String prompt) { logger.info(prompt); String stringResponse = scanner.nextLine(); if (stringResponse == null || stringResponse.trim().isEmpty() || !isInteger(stringResponse)) { logger.info("Invalid integer."); return promptUserForInteger(prompt); } return Integer.parseInt(stringResponse); } /** * Checks if the given string represents a valid integer. 
* * @param str the string to be checked * @return {@code true} if the string represents a valid integer, {@code false} otherwise */ private static boolean isInteger(String str) { try { Integer.parseInt(str); return true; } catch (NumberFormatException e) { return false; } } }

服务操作的封装器。

import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.retry.RetryMode; import software.amazon.awssdk.http.async.SdkAsyncHttpClient; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.scheduler.SchedulerAsyncClient; import software.amazon.awssdk.services.scheduler.model.ActionAfterCompletion; import software.amazon.awssdk.services.scheduler.model.ConflictException; import software.amazon.awssdk.services.scheduler.model.CreateScheduleGroupRequest; import software.amazon.awssdk.services.scheduler.model.CreateScheduleGroupResponse; import software.amazon.awssdk.services.scheduler.model.CreateScheduleRequest; import software.amazon.awssdk.services.scheduler.model.DeleteScheduleGroupRequest; import software.amazon.awssdk.services.scheduler.model.DeleteScheduleRequest; import software.amazon.awssdk.services.scheduler.model.DeleteScheduleResponse; import software.amazon.awssdk.services.scheduler.model.FlexibleTimeWindow; import software.amazon.awssdk.services.scheduler.model.FlexibleTimeWindowMode; import software.amazon.awssdk.services.scheduler.model.ResourceNotFoundException; import software.amazon.awssdk.services.scheduler.model.Target; import java.time.Instant; import java.util.concurrent.CompletableFuture; import java.time.Duration; import java.util.concurrent.CompletionException; public class EventbridgeSchedulerActions { private static SchedulerAsyncClient schedulerClient; private static final Logger logger = LoggerFactory.getLogger(EventbridgeSchedulerActions.class); public static SchedulerAsyncClient getAsyncClient() { if (schedulerClient == null) { /* The `NettyNioAsyncHttpClient` class is part of the AWS SDK for Java, version 2, and it is designed to provide a high-performance, asynchronous HTTP client for interacting with AWS services. 
It uses the Netty framework to handle the underlying network communication and the Java NIO API to provide a non-blocking, event-driven approach to HTTP requests and responses. */ SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder() .maxConcurrency(50) // Adjust as needed. .connectionTimeout(Duration.ofSeconds(60)) // Set the connection timeout. .readTimeout(Duration.ofSeconds(60)) // Set the read timeout. .writeTimeout(Duration.ofSeconds(60)) // Set the write timeout. .build(); ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder() .apiCallTimeout(Duration.ofMinutes(2)) // Set the overall API call timeout. .apiCallAttemptTimeout(Duration.ofSeconds(90)) // Set the individual call attempt timeout. .retryStrategy(RetryMode.STANDARD) .build(); schedulerClient = SchedulerAsyncClient.builder() .region(Region.US_EAST_1) .httpClient(httpClient) .overrideConfiguration(overrideConfig) .build(); } return schedulerClient; } /** * Creates a new schedule group. 
* * @param name the name of the schedule group to be created * @return a {@link CompletableFuture} representing the asynchronous operation of creating the schedule group */ public CompletableFuture<CreateScheduleGroupResponse> createScheduleGroup(String name) { CreateScheduleGroupRequest request = CreateScheduleGroupRequest.builder() .name(name) .build(); logger.info("Initiating createScheduleGroup call for group: {}", name); CompletableFuture<CreateScheduleGroupResponse> futureResponse = getAsyncClient().createScheduleGroup(request); futureResponse.whenComplete((response, ex) -> { if (ex != null) { if (ex instanceof CompletionException && ex.getCause() instanceof ConflictException) { // Rethrow the ConflictException throw (ConflictException) ex.getCause(); } else { throw new CompletionException("Failed to create schedule group: " + name, ex); } } else if (response == null) { throw new RuntimeException("Failed to create schedule group: response was null"); } else { logger.info("Successfully created schedule group '{}': {}", name, response.scheduleGroupArn()); } }); return futureResponse; } /** * Creates a new schedule for a target task. * * @param name the name of the schedule * @param scheduleExpression The schedule expression that defines when the schedule should run. 
* @param scheduleGroupName the name of the schedule group to which the schedule belongs * @param targetArn the Amazon Resource Name (ARN) of the target task * @param roleArn the ARN of the IAM role to be used for the schedule * @param input the input data for the target task * @param deleteAfterCompletion whether to delete the schedule after it's executed * @param useFlexibleTimeWindow whether to use a flexible time window for the schedule execution * @return true if the schedule was successfully created, false otherwise */ public CompletableFuture<Boolean> createScheduleAsync( String name, String scheduleExpression, String scheduleGroupName, String targetArn, String roleArn, String input, boolean deleteAfterCompletion, boolean useFlexibleTimeWindow) { int hoursToRun = 1; int flexibleTimeWindowMinutes = 10; Target target = Target.builder() .arn(targetArn) .roleArn(roleArn) .input(input) .build(); FlexibleTimeWindow flexibleTimeWindow = FlexibleTimeWindow.builder() .mode(useFlexibleTimeWindow ? FlexibleTimeWindowMode.FLEXIBLE : FlexibleTimeWindowMode.OFF) .maximumWindowInMinutes(useFlexibleTimeWindow ? flexibleTimeWindowMinutes : null) .build(); Instant startDate = Instant.now(); Instant endDate = startDate.plus(Duration.ofHours(hoursToRun)); CreateScheduleRequest request = CreateScheduleRequest.builder() .name(name) .scheduleExpression(scheduleExpression) .groupName(scheduleGroupName) .target(target) .actionAfterCompletion(deleteAfterCompletion ? 
ActionAfterCompletion.DELETE : ActionAfterCompletion.NONE) .startDate(startDate) .endDate(endDate) .flexibleTimeWindow(flexibleTimeWindow) .build(); return getAsyncClient().createSchedule(request) .thenApply(response -> { logger.info("Successfully created schedule {} in schedule group {}, The ARN is {} ", name, scheduleGroupName, response.scheduleArn()); return true; }) .whenComplete((result, ex) -> { if (ex != null) { if (ex instanceof ConflictException) { // Handle ConflictException logger.error("A conflict exception occurred while creating the schedule: {}", ex.getMessage()); throw new CompletionException("A conflict exception occurred while creating the schedule: " + ex.getMessage(), ex); } else { throw new CompletionException("Error creating schedule: " + ex.getMessage(), ex); } } }); } /** * Deletes the specified schedule group. * * @param name the name of the schedule group to delete * @return a {@link CompletableFuture} that completes when the schedule group has been deleted * @throws CompletionException if an error occurs while deleting the schedule group */ public CompletableFuture<Void> deleteScheduleGroupAsync(String name) { DeleteScheduleGroupRequest request = DeleteScheduleGroupRequest.builder() .name(name) .build(); return getAsyncClient().deleteScheduleGroup(request) .thenRun(() -> { logger.info("Successfully deleted schedule group {}", name); }) .whenComplete((result, ex) -> { if (ex != null) { if (ex instanceof ResourceNotFoundException) { throw new CompletionException("The resource was not found: " + ex.getMessage(), ex); } else { throw new CompletionException("Error deleting schedule group: " + ex.getMessage(), ex); } } }); } /** * Deletes a schedule with the specified name and group name. 
* * @param name the name of the schedule to be deleted * @param groupName the group name of the schedule to be deleted * @return a {@link CompletableFuture} that, when completed, indicates whether the schedule was successfully deleted * @throws CompletionException if an error occurs while deleting the schedule, except for the case where the schedule is not found */ public CompletableFuture<Boolean> deleteScheduleAsync(String name, String groupName) { DeleteScheduleRequest request = DeleteScheduleRequest.builder() .name(name) .groupName(groupName) .build(); CompletableFuture<DeleteScheduleResponse> response = getAsyncClient().deleteSchedule(request); return response.handle((result, ex) -> { if (ex != null) { if (ex instanceof ResourceNotFoundException) { throw new CompletionException("Resource not found while deleting schedule with ID: " + name, ex); } else { throw new CompletionException("Failed to delete schedule.", ex); } } logger.info("Successfully deleted schedule with name {}.", name); return true; }); } }
Python
适用于 Python 的 SDK (Boto3)
注意

在 GitHub 上还有更多相关信息。请在 AWS 代码示例存储库中查找完整示例,并了解如何进行设置和运行。

在命令提示符中运行交互式场景。

class SchedulerScenario:
    """
    A scenario that demonstrates how to use Boto3 to schedule and receive events using
    the Amazon EventBridge Scheduler.
    """

    def __init__(
        self,
        scheduler_wrapper: SchedulerWrapper,
        cloud_formation_resource: ServiceResource,
    ):
        """
        :param scheduler_wrapper: Wrapper used for all EventBridge Scheduler calls.
        :param cloud_formation_resource: CloudFormation resource used to create and delete the stack.
        """
        self.eventbridge_scheduler = scheduler_wrapper
        self.cloud_formation_resource = cloud_formation_resource
        # Populated by prepare_application(); cleanup() only acts on what was actually created.
        self.stack: ServiceResource = None
        self.schedule_group_name = None
        self.sns_topic_arn = None
        self.role_arn = None

    def run(self) -> None:
        """
        Runs the scenario: prepares the application, creates a one-time and a recurring
        schedule, and optionally cleans up.
        """
        print(DASHES)
        print("Welcome to the Amazon EventBridge Scheduler Workflow.")
        print(DASHES)

        print(DASHES)
        self.prepare_application()
        print(DASHES)

        print(DASHES)
        self.create_one_time_schedule()
        print(DASHES)

        print(DASHES)
        self.create_recurring_schedule()
        print(DASHES)

        print(DASHES)
        if q.ask(
            "Do you want to delete all resources created by this workflow? (y/n) ",
            q.is_yesno,
        ):
            self.cleanup()
        print(DASHES)

        print("Amazon EventBridge Scheduler workflow completed.")

    def prepare_application(self) -> None:
        """
        Prepares the application by prompting the user setup information, deploying a CloudFormation stack
        and creating a schedule group.

        :raises ValueError: If the stack outputs do not contain both 'RoleARN' and 'SNStopicARN'.
        """
        print("Preparing the application...")
        print(
            "\nThis example creates resources in a CloudFormation stack, including an SNS topic"
            + "\nthat will be subscribed to the EventBridge Scheduler events. "
            + "\n\nYou will need to confirm the subscription in order to receive event emails. "
        )

        email_address = q.ask("Enter an email address to use for event subscriptions: ")
        stack_name = q.ask("Enter a name for the AWS Cloud Formation Stack: ")

        template_file = SchedulerScenario.get_template_as_string()

        parameters = [{"ParameterKey": "email", "ParameterValue": email_address}]

        self.stack = self.deploy_cloudformation_stack(
            stack_name, template_file, parameters
        )
        outputs = self.stack.outputs
        for output in outputs:
            if output.get("OutputKey") == "RoleARN":
                self.role_arn = output.get("OutputValue")
            elif output.get("OutputKey") == "SNStopicARN":
                self.sns_topic_arn = output.get("OutputValue")

        if not self.sns_topic_arn or not self.role_arn:
            error_string = f"""
            Failed to retrieve required outputs from CloudFormation stack.
            'sns_topic_arn'={self.sns_topic_arn}, 'role_arn'={self.role_arn}
            """
            logger.error(error_string)
            raise ValueError(error_string)

        print(f"Stack output RoleARN: {self.role_arn}")
        print(f"Stack output SNStopicARN: {self.sns_topic_arn}")

        schedule_group_name = "workflow-schedules-group"
        schedule_group_arn = self.eventbridge_scheduler.create_schedule_group(
            schedule_group_name
        )
        # Record the name only after the group exists, so cleanup() never tries to
        # delete a group that was not created; report it from the local variable.
        self.schedule_group_name = schedule_group_name
        print(
            f"Successfully created schedule group '{schedule_group_name}': {schedule_group_arn}."
        )
        print("Application preparation complete.")

    def create_one_time_schedule(self) -> None:
        """
        Creates a one-time schedule to send an initial event one minute from now (UTC),
        with a flexible time window and deletion after completion.
        """
        schedule_name = q.ask("Enter a name for the one-time schedule:")

        scheduled_time = datetime.now(timezone.utc) + timedelta(minutes=1)
        formatted_scheduled_time = scheduled_time.strftime("%Y-%m-%dT%H:%M:%S")

        print(
            f"Creating a one-time schedule named '{schedule_name}' "
            + "\nto send an initial event in 1 minute with a flexible time window..."
        )

        schedule_arn = self.eventbridge_scheduler.create_schedule(
            schedule_name,
            f"at({formatted_scheduled_time})",
            self.schedule_group_name,
            self.sns_topic_arn,
            self.role_arn,
            f"One time scheduled event test from schedule {schedule_name}.",
            delete_after_completion=True,
            use_flexible_time_window=True,
        )
        print(
            f"Successfully created schedule '{schedule_name}' in schedule group "
            f"'{self.schedule_group_name}': {schedule_arn}."
        )
        print("Subscription email will receive an email from this event.")
        print("You must confirm your subscription to receive event emails.")
        print(f"One-time schedule '{schedule_name}' created successfully.")

    def create_recurring_schedule(self) -> None:
        """
        Create a recurring schedule to send events at a specified rate in minutes,
        then offer to delete it.
        """
        print("Creating a recurring schedule to send events for one hour...")
        schedule_name = q.ask("Enter a name for the recurring schedule: ")
        schedule_rate_in_minutes = q.ask(
            "Enter the desired schedule rate (in minutes): ", q.is_int
        )

        schedule_arn = self.eventbridge_scheduler.create_schedule(
            schedule_name,
            f"rate({schedule_rate_in_minutes} minutes)",
            self.schedule_group_name,
            self.sns_topic_arn,
            self.role_arn,
            f"Recurrent event test from schedule {schedule_name}.",
        )

        print(
            f"Successfully created schedule '{schedule_name}' in schedule group "
            f"'{self.schedule_group_name}': {schedule_arn}."
        )
        print("Subscription email will receive an email from this event.")
        print("You must confirm your subscription to receive event emails.")

        if q.ask(
            f"Are you ready to delete the '{schedule_name}' schedule? (y/n)", q.is_yesno
        ):
            self.eventbridge_scheduler.delete_schedule(
                schedule_name, self.schedule_group_name
            )

    def deploy_cloudformation_stack(
        self, stack_name: str, cfn_template: str, parameters: list[dict[str, str]]
    ) -> ServiceResource:
        """
        Deploys prerequisite resources used by the scenario. The resources are
        defined in the associated `cfn_template.yaml` AWS CloudFormation script and are deployed
        as a CloudFormation stack, so they can be easily managed and destroyed.

        :param stack_name: The name of the CloudFormation stack.
        :param cfn_template: The CloudFormation template as a string.
        :param parameters: The parameters for the CloudFormation stack.
        :return: The CloudFormation stack resource.
        """
        print(f"Deploying CloudFormation stack: {stack_name}.")
        stack = self.cloud_formation_resource.create_stack(
            StackName=stack_name,
            TemplateBody=cfn_template,
            Capabilities=["CAPABILITY_NAMED_IAM"],
            Parameters=parameters,
        )
        print(f"CloudFormation stack creation started: {stack_name}")
        print("Waiting for CloudFormation stack creation to complete...")
        waiter = self.cloud_formation_resource.meta.client.get_waiter(
            "stack_create_complete"
        )
        waiter.wait(StackName=stack.name)
        # Refresh the resource so its outputs are available to the caller.
        stack.load()
        print("CloudFormation stack creation complete.")

        return stack

    def destroy_cloudformation_stack(self, stack: ServiceResource) -> None:
        """
        Destroys the resources managed by the CloudFormation stack, and the CloudFormation
        stack itself, and waits until the deletion has completed.

        :param stack: The CloudFormation stack that manages the example resources.
        """
        print(
            f"CloudFormation stack '{stack.name}' is being deleted. This may take a few minutes."
        )
        stack.delete()
        waiter = self.cloud_formation_resource.meta.client.get_waiter(
            "stack_delete_complete"
        )
        waiter.wait(StackName=stack.name)
        print(f"CloudFormation stack '{stack.name}' has been deleted.")

    def cleanup(self) -> None:
        """
        Deletes the schedule group and the CloudFormation stack created for the demo.
        Each reference is cleared before its delete call, so a second call to cleanup()
        does not try to delete the same resource again.
        """
        if self.schedule_group_name:
            schedule_group_name = self.schedule_group_name
            self.schedule_group_name = None
            self.eventbridge_scheduler.delete_schedule_group(schedule_group_name)
            print(f"Successfully deleted schedule group '{schedule_group_name}'.")

        if self.stack is not None:
            stack = self.stack
            self.stack = None
            self.destroy_cloudformation_stack(stack)
        print("Stack deleted, demo complete.")

    @staticmethod
    def get_template_as_string() -> str:
        """
        Returns a string containing this scenario's CloudFormation template, read from
        `cfn_template.yaml` in the directory of this script.
        """
        script_directory = os.path.dirname(os.path.abspath(__file__))
        template_file_path = os.path.join(script_directory, "cfn_template.yaml")
        # Context manager so the file handle is closed once the template is read.
        with open(template_file_path, "r") as template_file:
            return template_file.read()


if __name__ == "__main__":
    demo: SchedulerScenario = None
    try:
        scheduler_wrapper = SchedulerWrapper.from_client()
        cloud_formation_resource = resource("cloudformation")
        demo = SchedulerScenario(scheduler_wrapper, cloud_formation_resource)
        demo.run()

    except Exception:
        logging.exception("Something went wrong with the demo!")
        # Best-effort cleanup of whatever was created before the failure.
        if demo is not None:
            demo.cleanup()

封装 Amazon EventBridge Scheduler 操作的 SchedulerWrapper 类。

class SchedulerWrapper:
    """
    Wraps an EventBridge Scheduler client and exposes the schedule and
    schedule-group operations used by this example. Each operation logs a
    description of any ClientError it encounters and then re-raises it.
    """

    # The annotation is quoted so that it is not evaluated when the class is defined.
    def __init__(self, eventbridge_scheduler_client: "client"):
        """
        :param eventbridge_scheduler_client: The client object on which the
                                             schedule operations are invoked.
        """
        self.scheduler_client = eventbridge_scheduler_client

    @classmethod
    def from_client(cls) -> "SchedulerWrapper":
        """
        Creates a SchedulerWrapper instance with a default EventBridge Scheduler client.

        :return: An instance of SchedulerWrapper initialized with the default EventBridge Scheduler client.
        """
        eventbridge_scheduler_client = boto3.client("scheduler")
        return cls(eventbridge_scheduler_client)

    def create_schedule(
        self,
        name: str,
        schedule_expression: str,
        schedule_group_name: str,
        target_arn: str,
        role_arn: str,
        input: str,
        delete_after_completion: bool = False,
        use_flexible_time_window: bool = False,
        hours_to_run: float = 1,
        flexible_time_window_minutes: int = 10,
    ) -> str:
        """
        Creates a new schedule with the specified parameters. The schedule starts
        at the current UTC time and ends `hours_to_run` hours later.

        :param name: The name of the schedule.
        :param schedule_expression: The expression that defines when the schedule runs.
        :param schedule_group_name: The name of the schedule group.
        :param target_arn: The Amazon Resource Name (ARN) of the target.
        :param role_arn: The Amazon Resource Name (ARN) of the execution IAM role.
        :param input: The input for the target.
        :param delete_after_completion: Whether to delete the schedule after it completes.
        :param use_flexible_time_window: Whether to use a flexible time window.
        :param hours_to_run: The number of hours between the schedule's start and end dates.
        :param flexible_time_window_minutes: The maximum flexible window, in minutes. Only
                                             used when `use_flexible_time_window` is True.
        :return: The ARN of the created schedule.
        :raises ClientError: If the service call fails; the error is logged and re-raised.
        """
        try:
            # Read the clock once so EndDate is exactly `hours_to_run` after StartDate.
            start_date = datetime.now(timezone.utc)
            parameters = {
                "Name": name,
                "ScheduleExpression": schedule_expression,
                "GroupName": schedule_group_name,
                "Target": {"Arn": target_arn, "RoleArn": role_arn, "Input": input},
                "StartDate": start_date,
                "EndDate": start_date + timedelta(hours=hours_to_run),
            }

            if delete_after_completion:
                parameters["ActionAfterCompletion"] = "DELETE"

            if use_flexible_time_window:
                parameters["FlexibleTimeWindow"] = {
                    "Mode": "FLEXIBLE",
                    "MaximumWindowInMinutes": flexible_time_window_minutes,
                }
            else:
                parameters["FlexibleTimeWindow"] = {"Mode": "OFF"}

            response = self.scheduler_client.create_schedule(**parameters)
            return response["ScheduleArn"]
        except ClientError as err:
            if err.response["Error"]["Code"] == "ConflictException":
                logger.error(
                    "Failed to create schedule '%s' due to a conflict. %s",
                    name,
                    err.response["Error"]["Message"],
                )
            else:
                logger.error(
                    "Error creating schedule: %s", err.response["Error"]["Message"]
                )
            raise

    def delete_schedule(self, name: str, schedule_group_name: str) -> None:
        """
        Deletes the schedule with the specified name and schedule group.

        :param name: The name of the schedule.
        :param schedule_group_name: The name of the schedule group.
        :raises ClientError: If the service call fails; the error is logged and re-raised.
        """
        try:
            self.scheduler_client.delete_schedule(
                Name=name, GroupName=schedule_group_name
            )
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error(
                    "Failed to delete schedule with ID '%s' because the resource was not found: %s",
                    name,
                    err.response["Error"]["Message"],
                )
            else:
                logger.error(
                    "Error deleting schedule: %s", err.response["Error"]["Message"]
                )
            raise

    def create_schedule_group(self, name: str) -> str:
        """
        Creates a new schedule group with the specified name.

        :param name: The name of the schedule group.
        :return: The ARN of the created schedule group.
        :raises ClientError: If the service call fails; the error is logged and re-raised.
        """
        try:
            response = self.scheduler_client.create_schedule_group(Name=name)
            return response["ScheduleGroupArn"]
        except ClientError as err:
            if err.response["Error"]["Code"] == "ConflictException":
                logger.error(
                    "Failed to create schedule group '%s' due to a conflict. %s",
                    name,
                    err.response["Error"]["Message"],
                )
            else:
                logger.error(
                    "Error creating schedule group: %s",
                    err.response["Error"]["Message"],
                )
            raise

    def delete_schedule_group(self, name: str) -> None:
        """
        Deletes the schedule group with the specified name.

        :param name: The name of the schedule group.
        :raises ClientError: If the service call fails; the error is logged and re-raised.
        """
        try:
            self.scheduler_client.delete_schedule_group(Name=name)
            logger.info("Schedule group %s deleted successfully.", name)
        except ClientError as err:
            if err.response["Error"]["Code"] == "ResourceNotFoundException":
                logger.error(
                    "Failed to delete schedule group with ID '%s' because the resource was not found: %s",
                    name,
                    err.response["Error"]["Message"],
                )
            else:
                logger.error(
                    "Error deleting schedule group: %s",
                    err.response["Error"]["Message"],
                )
            raise