使用 AWS SDK for .NET 的 SageMaker 範例 - AWS SDK for .NET

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 AWS SDK for .NET 的 SageMaker 範例

下列程式碼範例示範如何使用 AWS SDK for .NET 搭配 SageMaker 來執行動作和實作常見案例。

動作是大型程式的程式碼摘錄,必須在完整的程式環境中執行。雖然動作會示範如何呼叫個別服務函數,但您可以在相關案例中查看動作在完整程式環境中的用法。

案例是程式碼範例,示範如何透過呼叫服務內的多個函數或與其他 AWS 服務結合來完成特定任務。

每個範例都包含完整原始程式碼的連結,您可以在其中找到如何在內容中設定和執行程式碼的指示。

開始使用

下列程式碼範例示範如何使用 SageMaker。

AWS SDK for .NET
注意

GitHub 上還有更多範例。請在 AWS 程式碼範例儲存庫中尋找完整範例,並了解如何設定和執行。

using Amazon.SageMaker;
using Amazon.SageMaker.Model;

namespace SageMakerActions;

/// <summary>
/// Minimal "hello" example that lists a few Amazon SageMaker notebook instances.
/// </summary>
public static class HelloSageMaker
{
    /// <summary>
    /// Entry point. Requests up to five notebook instances and prints the name,
    /// ARN, and creation date of each one.
    /// </summary>
    /// <param name="args">Command line arguments (not used).</param>
    /// <returns>Async task.</returns>
    static async Task Main(string[] args)
    {
        // The service client is disposable, so release it when Main finishes.
        using var sageMakerClient = new AmazonSageMakerClient();

        Console.WriteLine("Hello Amazon SageMaker! Let's list some of your notebook instances:");
        Console.WriteLine();

        // You can use await and any of the async methods to get a response.
        // Let's get the first five notebook instances.
        var response = await sageMakerClient.ListNotebookInstancesAsync(
            new ListNotebookInstancesRequest()
            {
                MaxResults = 5
            });

        // Treat a missing list the same way as an empty one instead of failing.
        var notebookInstances = response.NotebookInstances;
        if (notebookInstances is null || !notebookInstances.Any())
        {
            Console.WriteLine("No notebook instances found.");
            Console.WriteLine("See https://docs.aws.amazon.com/sagemaker/latest/dg/howitworks-create-ws.html to create one.");
            return;
        }

        foreach (var notebookInstance in notebookInstances)
        {
            Console.WriteLine($"\tInstance: {notebookInstance.NotebookInstanceName}");
            Console.WriteLine($"\tArn: {notebookInstance.NotebookInstanceArn}");
            Console.WriteLine($"\tCreation Date: {notebookInstance.CreationTime.ToShortDateString()}");
            Console.WriteLine();
        }
    }
}

動作

下列程式碼範例示範如何使用 CreatePipeline

AWS SDK for .NET
注意

GitHub 上還有更多範例。請在 AWS 程式碼範例儲存庫中尋找完整範例,並了解如何設定和執行。

/// <summary>
/// Update the named pipeline with the supplied JSON definition. If the update
/// reports that the pipeline does not exist, create it instead.
/// </summary>
/// <param name="pipelineJson">The pipeline definition, as JSON.</param>
/// <param name="roleArn">The role ARN sent with the pipeline request.</param>
/// <param name="name">The pipeline name.</param>
/// <param name="description">The pipeline description.</param>
/// <param name="displayName">The pipeline display name.</param>
/// <returns>The Amazon Resource Name (ARN) of the pipeline.</returns>
public async Task<string> SetupPipeline(string pipelineJson, string roleArn, string name, string description, string displayName)
{
    try
    {
        // Try the update first; a missing pipeline surfaces as ResourceNotFoundException.
        var updateRequest = new UpdatePipelineRequest
        {
            PipelineName = name,
            PipelineDisplayName = displayName,
            PipelineDescription = description,
            PipelineDefinition = pipelineJson,
            RoleArn = roleArn
        };
        var updated = await _amazonSageMaker.UpdatePipelineAsync(updateRequest);
        return updated.PipelineArn;
    }
    catch (Amazon.SageMaker.Model.ResourceNotFoundException)
    {
        // Nothing to update, so create the pipeline from the same inputs.
        var createRequest = new CreatePipelineRequest
        {
            PipelineName = name,
            PipelineDisplayName = displayName,
            PipelineDescription = description,
            PipelineDefinition = pipelineJson,
            RoleArn = roleArn
        };
        var created = await _amazonSageMaker.CreatePipelineAsync(createRequest);
        return created.PipelineArn;
    }
}
  • 如需 API 詳細資訊,請參閱《AWS SDK for .NET API 參考》中的 CreatePipeline。

下列程式碼範例示範如何使用 DeletePipeline

AWS SDK for .NET
注意

GitHub 上還有更多範例。請在 AWS 程式碼範例儲存庫中尋找完整範例,並了解如何設定和執行。

/// <summary>
/// Remove the SageMaker pipeline that has the given name.
/// </summary>
/// <param name="pipelineName">Name of the pipeline to remove.</param>
/// <returns>The pipeline ARN reported by the delete response.</returns>
public async Task<string> DeletePipelineByName(string pipelineName)
{
    var request = new DeletePipelineRequest { PipelineName = pipelineName };
    var response = await _amazonSageMaker.DeletePipelineAsync(request);
    return response.PipelineArn;
}
  • 如需 API 詳細資訊,請參閱《AWS SDK for .NET API 參考》中的 DeletePipeline。

下列程式碼範例示範如何使用 DescribePipelineExecution

AWS SDK for .NET
注意

GitHub 上還有更多範例。請在 AWS 程式碼範例儲存庫中尋找完整範例,並了解如何設定和執行。

/// <summary>
/// Look up the current status of a pipeline run.
/// </summary>
/// <param name="pipelineExecutionArn">The ARN of the pipeline run to describe.</param>
/// <returns>The status value from the describe response.</returns>
public async Task<PipelineExecutionStatus> CheckPipelineExecutionStatus(string pipelineExecutionArn)
{
    var request = new DescribePipelineExecutionRequest
    {
        PipelineExecutionArn = pipelineExecutionArn
    };
    var execution = await _amazonSageMaker.DescribePipelineExecutionAsync(request);
    return execution.PipelineExecutionStatus;
}

下列程式碼範例示範如何使用 StartPipelineExecution

AWS SDK for .NET
注意

GitHub 上還有更多範例。請在 AWS 程式碼範例儲存庫中尋找完整範例,並了解如何設定和執行。

/// <summary>
/// Start a pipeline run, passing the queue, role, and Vector Enrichment Job
/// settings to the pipeline as parameters.
/// </summary>
/// <param name="queueUrl">The URL for the queue to use for pipeline callbacks.</param>
/// <param name="inputLocationUrl">The input location in Amazon Simple Storage Service (Amazon S3).</param>
/// <param name="outputLocationUrl">The output location in Amazon S3.</param>
/// <param name="pipelineName">The name of the pipeline.</param>
/// <param name="executionRoleArn">The ARN of the role.</param>
/// <returns>The ARN of the pipeline run.</returns>
public async Task<string> ExecutePipeline(
    string queueUrl,
    string inputLocationUrl,
    string outputLocationUrl,
    string pipelineName,
    string executionRoleArn)
{
    // Input settings: a CSV document read from the input S3 location.
    var vejInput = new VectorEnrichmentJobInputConfig
    {
        DocumentType = VectorEnrichmentJobDocumentType.CSV,
        DataSourceConfig = new()
        {
            S3Data = new VectorEnrichmentJobS3Data { S3Uri = inputLocationUrl }
        }
    };

    // Export settings: results are written to the output S3 location.
    var vejExport = new ExportVectorEnrichmentJobOutputConfig
    {
        S3Data = new VectorEnrichmentJobS3Data { S3Uri = outputLocationUrl }
    };

    // Job settings: reverse geocoding over the named X and Y attributes.
    var vejJob = new VectorEnrichmentJobConfig
    {
        ReverseGeocodingConfig = new ReverseGeocodingConfig
        {
            XAttributeName = "Longitude",
            YAttributeName = "Latitude"
        }
    };

    // The configurations are passed to the pipeline as JSON strings.
    var inputJson = JsonSerializer.Serialize(vejInput);
    var exportJson = JsonSerializer.Serialize(vejExport);
    var jobJson = JsonSerializer.Serialize(vejJob);

#pragma warning disable SageMaker1002 // Property value does not match required pattern is allowed here to match the pipeline definition.
    var pipelineParameters = new List<Parameter>
    {
        new Parameter { Name = "parameter_execution_role", Value = executionRoleArn },
        new Parameter { Name = "parameter_queue_url", Value = queueUrl },
        new Parameter { Name = "parameter_vej_input_config", Value = inputJson },
        new Parameter { Name = "parameter_vej_export_config", Value = exportJson },
        new Parameter { Name = "parameter_step_1_vej_config", Value = jobJson }
    };

    var startRequest = new StartPipelineExecutionRequest
    {
        PipelineName = pipelineName,
        PipelineExecutionDisplayName = pipelineName + "-example-execution",
        PipelineParameters = pipelineParameters
    };

    var started = await _amazonSageMaker.StartPipelineExecutionAsync(startRequest);
#pragma warning restore SageMaker1002
    return started.PipelineExecutionArn;
}

下列程式碼範例示範如何使用 UpdatePipeline

AWS SDK for .NET
注意

GitHub 上還有更多範例。請在 AWS 程式碼範例儲存庫中尋找完整範例,並了解如何設定和執行。

/// <summary>
/// Update the named pipeline with the supplied JSON definition. If the update
/// reports that the pipeline does not exist, create it instead.
/// </summary>
/// <param name="pipelineJson">The pipeline definition, as JSON.</param>
/// <param name="roleArn">The role ARN sent with the pipeline request.</param>
/// <param name="name">The pipeline name.</param>
/// <param name="description">The pipeline description.</param>
/// <param name="displayName">The pipeline display name.</param>
/// <returns>The Amazon Resource Name (ARN) of the pipeline.</returns>
public async Task<string> SetupPipeline(string pipelineJson, string roleArn, string name, string description, string displayName)
{
    try
    {
        // Try the update first; a missing pipeline surfaces as ResourceNotFoundException.
        var updateRequest = new UpdatePipelineRequest
        {
            PipelineName = name,
            PipelineDisplayName = displayName,
            PipelineDescription = description,
            PipelineDefinition = pipelineJson,
            RoleArn = roleArn
        };
        var updated = await _amazonSageMaker.UpdatePipelineAsync(updateRequest);
        return updated.PipelineArn;
    }
    catch (Amazon.SageMaker.Model.ResourceNotFoundException)
    {
        // Nothing to update, so create the pipeline from the same inputs.
        var createRequest = new CreatePipelineRequest
        {
            PipelineName = name,
            PipelineDisplayName = displayName,
            PipelineDescription = description,
            PipelineDefinition = pipelineJson,
            RoleArn = roleArn
        };
        var created = await _amazonSageMaker.CreatePipelineAsync(createRequest);
        return created.PipelineArn;
    }
}
  • 如需 API 詳細資訊,請參閱《AWS SDK for .NET API 參考》中的 UpdatePipeline。

案例

以下程式碼範例顯示做法:

  • 設定管道的資源。

  • 設定執行地理空間任務的管道。

  • 啟動管道執行。

  • 監控執行的狀態。

  • 檢視管道的輸出。

  • 清除資源。

如需詳細資訊,請參閱 Community.aws 上的「使用 AWS SDK 建立和執行 SageMaker 管道」。

AWS SDK for .NET
注意

GitHub 上還有更多範例。請在 AWS 程式碼範例儲存庫中尋找完整範例,並了解如何設定和執行。

建立包裝 SageMaker 操作的類別。

using System.Text.Json;
using Amazon.SageMaker;
using Amazon.SageMaker.Model;
using Amazon.SageMakerGeospatial;
using Amazon.SageMakerGeospatial.Model;

namespace SageMakerActions;

/// <summary>
/// Groups the Amazon SageMaker pipeline calls used by the example behind one class.
/// </summary>
public class SageMakerWrapper
{
    private readonly IAmazonSageMaker _amazonSageMaker;

    /// <summary>
    /// Create the wrapper around an existing SageMaker client.
    /// </summary>
    /// <param name="amazonSageMaker">The SageMaker client used for every call.</param>
    public SageMakerWrapper(IAmazonSageMaker amazonSageMaker)
    {
        _amazonSageMaker = amazonSageMaker;
    }

    /// <summary>
    /// Update the named pipeline with the supplied JSON definition. If the update
    /// reports that the pipeline does not exist, create it instead.
    /// </summary>
    /// <param name="pipelineJson">The pipeline definition, as JSON.</param>
    /// <param name="roleArn">The role ARN sent with the pipeline request.</param>
    /// <param name="name">The pipeline name.</param>
    /// <param name="description">The pipeline description.</param>
    /// <param name="displayName">The pipeline display name.</param>
    /// <returns>The Amazon Resource Name (ARN) of the pipeline.</returns>
    public async Task<string> SetupPipeline(string pipelineJson, string roleArn, string name, string description, string displayName)
    {
        try
        {
            // Try the update first; a missing pipeline surfaces as ResourceNotFoundException.
            var updateRequest = new UpdatePipelineRequest
            {
                PipelineName = name,
                PipelineDisplayName = displayName,
                PipelineDescription = description,
                PipelineDefinition = pipelineJson,
                RoleArn = roleArn
            };
            var updated = await _amazonSageMaker.UpdatePipelineAsync(updateRequest);
            return updated.PipelineArn;
        }
        catch (Amazon.SageMaker.Model.ResourceNotFoundException)
        {
            // Nothing to update, so create the pipeline from the same inputs.
            var createRequest = new CreatePipelineRequest
            {
                PipelineName = name,
                PipelineDisplayName = displayName,
                PipelineDescription = description,
                PipelineDefinition = pipelineJson,
                RoleArn = roleArn
            };
            var created = await _amazonSageMaker.CreatePipelineAsync(createRequest);
            return created.PipelineArn;
        }
    }

    /// <summary>
    /// Start a pipeline run, passing the queue, role, and Vector Enrichment Job
    /// settings to the pipeline as parameters.
    /// </summary>
    /// <param name="queueUrl">The URL for the queue to use for pipeline callbacks.</param>
    /// <param name="inputLocationUrl">The input location in Amazon Simple Storage Service (Amazon S3).</param>
    /// <param name="outputLocationUrl">The output location in Amazon S3.</param>
    /// <param name="pipelineName">The name of the pipeline.</param>
    /// <param name="executionRoleArn">The ARN of the role.</param>
    /// <returns>The ARN of the pipeline run.</returns>
    public async Task<string> ExecutePipeline(
        string queueUrl,
        string inputLocationUrl,
        string outputLocationUrl,
        string pipelineName,
        string executionRoleArn)
    {
        // Input settings: a CSV document read from the input S3 location.
        var vejInput = new VectorEnrichmentJobInputConfig
        {
            DocumentType = VectorEnrichmentJobDocumentType.CSV,
            DataSourceConfig = new()
            {
                S3Data = new VectorEnrichmentJobS3Data { S3Uri = inputLocationUrl }
            }
        };

        // Export settings: results are written to the output S3 location.
        var vejExport = new ExportVectorEnrichmentJobOutputConfig
        {
            S3Data = new VectorEnrichmentJobS3Data { S3Uri = outputLocationUrl }
        };

        // Job settings: reverse geocoding over the named X and Y attributes.
        var vejJob = new VectorEnrichmentJobConfig
        {
            ReverseGeocodingConfig = new ReverseGeocodingConfig
            {
                XAttributeName = "Longitude",
                YAttributeName = "Latitude"
            }
        };

        // The configurations are passed to the pipeline as JSON strings.
        var inputJson = JsonSerializer.Serialize(vejInput);
        var exportJson = JsonSerializer.Serialize(vejExport);
        var jobJson = JsonSerializer.Serialize(vejJob);

#pragma warning disable SageMaker1002 // Property value does not match required pattern is allowed here to match the pipeline definition.
        var pipelineParameters = new List<Parameter>
        {
            new Parameter { Name = "parameter_execution_role", Value = executionRoleArn },
            new Parameter { Name = "parameter_queue_url", Value = queueUrl },
            new Parameter { Name = "parameter_vej_input_config", Value = inputJson },
            new Parameter { Name = "parameter_vej_export_config", Value = exportJson },
            new Parameter { Name = "parameter_step_1_vej_config", Value = jobJson }
        };

        var startRequest = new StartPipelineExecutionRequest
        {
            PipelineName = pipelineName,
            PipelineExecutionDisplayName = pipelineName + "-example-execution",
            PipelineParameters = pipelineParameters
        };

        var started = await _amazonSageMaker.StartPipelineExecutionAsync(startRequest);
#pragma warning restore SageMaker1002
        return started.PipelineExecutionArn;
    }

    /// <summary>
    /// Look up the current status of a pipeline run.
    /// </summary>
    /// <param name="pipelineExecutionArn">The ARN of the pipeline run to describe.</param>
    /// <returns>The status value from the describe response.</returns>
    public async Task<PipelineExecutionStatus> CheckPipelineExecutionStatus(string pipelineExecutionArn)
    {
        var request = new DescribePipelineExecutionRequest
        {
            PipelineExecutionArn = pipelineExecutionArn
        };
        var execution = await _amazonSageMaker.DescribePipelineExecutionAsync(request);
        return execution.PipelineExecutionStatus;
    }

    /// <summary>
    /// Remove the SageMaker pipeline that has the given name.
    /// </summary>
    /// <param name="pipelineName">Name of the pipeline to remove.</param>
    /// <returns>The pipeline ARN reported by the delete response.</returns>
    public async Task<string> DeletePipelineByName(string pipelineName)
    {
        var request = new DeletePipelineRequest { PipelineName = pipelineName };
        var response = await _amazonSageMaker.DeletePipelineAsync(request);
        return response.PipelineArn;
    }
}

建立處理 SageMaker 管道回呼的函數。

using System.Text.Json;
using Amazon.Lambda.Core;
using Amazon.Lambda.SQSEvents;
using Amazon.SageMaker;
using Amazon.SageMaker.Model;
using Amazon.SageMakerGeospatial;
using Amazon.SageMakerGeospatial.Model;

// Assembly attribute to enable the AWS Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace SageMakerLambda;

/// <summary>
/// The AWS Lambda function handler for the Amazon SageMaker pipeline.
/// </summary>
public class SageMakerLambdaFunction
{
    /// <summary>
    /// Default constructor. This constructor is used by AWS Lambda to construct the instance. When invoked in a Lambda environment
    /// the AWS credentials will come from the AWS Identity and Access Management (IAM) role associated with the function. The AWS Region will be set to the
    /// Region that the Lambda function is running in.
    /// </summary>
    public SageMakerLambdaFunction()
    {
    }

    /// <summary>
    /// The AWS Lambda function handler that processes events from the SageMaker pipeline and starts a job or export.
    /// The request is handled as exactly one of the following, checked in this order:
    /// queue records (each record is processed as a status check), an export request
    /// (when an export configuration is present), or a job start (when a job name is present).
    /// </summary>
    /// <param name="request">The custom SageMaker pipeline request object.</param>
    /// <param name="context">The Lambda context.</param>
    /// <returns>The dictionary of output parameters. Empty when queue records were processed or nothing matched.</returns>
    public async Task<Dictionary<string, string>> FunctionHandler(PipelineRequest request, ILambdaContext context)
    {
        // Both clients are disposable and only used for this invocation.
        using var geoSpatialClient = new AmazonSageMakerGeospatialClient();
        using var sageMakerClient = new AmazonSageMakerClient();
        var responseDictionary = new Dictionary<string, string>();
        context.Logger.LogInformation("Function handler started with request: " + JsonSerializer.Serialize(request));

        if (request.Records != null && request.Records.Any())
        {
            context.Logger.LogInformation("Records found, this is a queue event. Processing the queue records.");
            foreach (var message in request.Records)
            {
                await ProcessMessageAsync(message, context, geoSpatialClient, sageMakerClient);
            }
        }
        else if (!string.IsNullOrEmpty(request.vej_export_config))
        {
            context.Logger.LogInformation("Export configuration found, this is an export. Start the Vector Enrichment Job (VEJ) export.");

            var outputConfig = JsonSerializer.Deserialize<ExportVectorEnrichmentJobOutputConfig>(
                request.vej_export_config);

            var exportResponse = await geoSpatialClient.ExportVectorEnrichmentJobAsync(
                new ExportVectorEnrichmentJobRequest()
                {
                    Arn = request.vej_arn,
                    ExecutionRoleArn = request.Role,
                    OutputConfig = outputConfig
                });
            context.Logger.LogInformation($"Export response: {JsonSerializer.Serialize(exportResponse)}");
            responseDictionary = new Dictionary<string, string>
            {
                { "export_eoj_status", exportResponse.ExportStatus.ToString() },
                { "vej_arn", exportResponse.Arn }
            };
        }
        else if (!string.IsNullOrEmpty(request.vej_name))
        {
            context.Logger.LogInformation("Vector Enrichment Job name found, starting the job.");

            var inputConfig = JsonSerializer.Deserialize<VectorEnrichmentJobInputConfig>(
                request.vej_input_config);
            var jobConfig = JsonSerializer.Deserialize<VectorEnrichmentJobConfig>(
                request.vej_config);

            var jobResponse = await geoSpatialClient.StartVectorEnrichmentJobAsync(
                new StartVectorEnrichmentJobRequest()
                {
                    ExecutionRoleArn = request.Role,
                    InputConfig = inputConfig,
                    Name = request.vej_name,
                    JobConfig = jobConfig
                });
            context.Logger.LogInformation("Job response: " + JsonSerializer.Serialize(jobResponse));
            responseDictionary = new Dictionary<string, string>
            {
                { "vej_arn", jobResponse.Arn },
                { "statusCode", jobResponse.HttpStatusCode.ToString() }
            };
        }

        return responseDictionary;
    }

    /// <summary>
    /// Process a queue message and check the status of a SageMaker job.
    /// A completed job reports step success to the pipeline, a failed job reports
    /// step failure, and a job that is still in progress throws.
    /// </summary>
    /// <param name="message">The queue message.</param>
    /// <param name="context">The Lambda context.</param>
    /// <param name="geoClient">The SageMaker GeoSpatial client.</param>
    /// <param name="sageMakerClient">The SageMaker client.</param>
    /// <returns>Async task.</returns>
    /// <exception cref="InvalidOperationException">Thrown when the job is still in progress.</exception>
    private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context,
        AmazonSageMakerGeospatialClient geoClient, AmazonSageMakerClient sageMakerClient)
    {
        context.Logger.LogInformation($"Processed message {message.Body}");

        // Get information about the SageMaker job.
        var payload = JsonSerializer.Deserialize<QueuePayload>(message.Body);
        context.Logger.LogInformation($"Payload token {payload!.token}");
        var token = payload.token;

        if (payload.arguments.ContainsKey("vej_arn"))
        {
            // Use the job ARN and the token to get the job status.
            var jobArn = payload.arguments["vej_arn"];
            context.Logger.LogInformation($"Token: {token}, arn {jobArn}");

            // Await the call so the response itself (not the pending Task) is logged
            // and inspected, and no thread is blocked waiting on .Result.
            var jobInfo = await geoClient.GetVectorEnrichmentJobAsync(
                new GetVectorEnrichmentJobRequest()
                {
                    Arn = jobArn
                });
            context.Logger.LogInformation("Job info: " + JsonSerializer.Serialize(jobInfo));

            if (jobInfo.Status == VectorEnrichmentJobStatus.COMPLETED)
            {
                context.Logger.LogInformation("Status completed, resuming pipeline...");
                await sageMakerClient.SendPipelineExecutionStepSuccessAsync(
                    new SendPipelineExecutionStepSuccessRequest()
                    {
                        CallbackToken = token,
                        OutputParameters = new List<OutputParameter>()
                        {
                            new OutputParameter()
                            {
                                Name = "export_status",
                                Value = jobInfo.Status
                            }
                        }
                    });
            }
            else if (jobInfo.Status == VectorEnrichmentJobStatus.FAILED)
            {
                context.Logger.LogInformation("Status failed, stopping pipeline...");
                await sageMakerClient.SendPipelineExecutionStepFailureAsync(
                    new SendPipelineExecutionStepFailureRequest()
                    {
                        CallbackToken = token,
                        FailureReason = jobInfo.ErrorDetails.ErrorMessage
                    });
            }
            else if (jobInfo.Status == VectorEnrichmentJobStatus.IN_PROGRESS)
            {
                // Put this message back in the queue to reprocess later.
                // Throwing fails this invocation on purpose.
                context.Logger.LogInformation("Status still in progress, check back later.");
                throw new InvalidOperationException("Job still running.");
            }
        }
    }
}

在命令提示中執行互動式案例。

public static class PipelineWorkflow { public static IAmazonIdentityManagementService _iamClient = null!; public static SageMakerWrapper _sageMakerWrapper = null!; public static IAmazonSQS _sqsClient = null!; public static IAmazonS3 _s3Client = null!; public static IAmazonLambda _lambdaClient = null!; public static IConfiguration _configuration = null!; public static string lambdaFunctionName = "SageMakerExampleFunction"; public static string sageMakerRoleName = "SageMakerExampleRole"; public static string lambdaRoleName = "SageMakerExampleLambdaRole"; private static string[] lambdaRolePolicies = null!; private static string[] sageMakerRolePolicies = null!; static async Task Main(string[] args) { var options = new AWSOptions() { Region = RegionEndpoint.USWest2 }; // Set up dependency injection for the AWS service. using var host = Host.CreateDefaultBuilder(args) .ConfigureLogging(logging => logging.AddFilter("System", LogLevel.Debug) .AddFilter<DebugLoggerProvider>("Microsoft", LogLevel.Information) .AddFilter<ConsoleLoggerProvider>("Microsoft", LogLevel.Trace)) .ConfigureServices((_, services) => services.AddAWSService<IAmazonIdentityManagementService>(options) .AddAWSService<IAmazonEC2>(options) .AddAWSService<IAmazonSageMaker>(options) .AddAWSService<IAmazonSageMakerGeospatial>(options) .AddAWSService<IAmazonSQS>(options) .AddAWSService<IAmazonS3>(options) .AddAWSService<IAmazonLambda>(options) .AddTransient<SageMakerWrapper>() ) .Build(); _configuration = new ConfigurationBuilder() .SetBasePath(Directory.GetCurrentDirectory()) .AddJsonFile("settings.json") // Load settings from .json file. .AddJsonFile("settings.local.json", true) // Optionally, load local settings. 
.Build(); ServicesSetup(host); string queueUrl = ""; string queueName = _configuration["queueName"]; string bucketName = _configuration["bucketName"]; var pipelineName = _configuration["pipelineName"]; try { Console.WriteLine(new string('-', 80)); Console.WriteLine( "Welcome to the Amazon SageMaker pipeline example scenario."); Console.WriteLine( "\nThis example workflow will guide you through setting up and running an" + "\nAmazon SageMaker pipeline. The pipeline uses an AWS Lambda function and an" + "\nAmazon SQS Queue. It runs a vector enrichment reverse geocode job to" + "\nreverse geocode addresses in an input file and store the results in an export file."); Console.WriteLine(new string('-', 80)); Console.WriteLine(new string('-', 80)); Console.WriteLine( "First, we will set up the roles, functions, and queue needed by the SageMaker pipeline."); Console.WriteLine(new string('-', 80)); var lambdaRoleArn = await CreateLambdaRole(); var sageMakerRoleArn = await CreateSageMakerRole(); var functionArn = await SetupLambda(lambdaRoleArn, true); queueUrl = await SetupQueue(queueName); await SetupBucket(bucketName); Console.WriteLine(new string('-', 80)); Console.WriteLine("Now we can create and run our pipeline."); Console.WriteLine(new string('-', 80)); await SetupPipeline(sageMakerRoleArn, functionArn, pipelineName); var executionArn = await ExecutePipeline(queueUrl, sageMakerRoleArn, pipelineName, bucketName); await WaitForPipelineExecution(executionArn); await GetOutputResults(bucketName); Console.WriteLine(new string('-', 80)); Console.WriteLine("The pipeline has completed. 
To view the pipeline and runs " + "in SageMaker Studio, follow these instructions:" + "\nhttps://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-studio.html"); Console.WriteLine(new string('-', 80)); Console.WriteLine(new string('-', 80)); Console.WriteLine("Finally, let's clean up our resources."); Console.WriteLine(new string('-', 80)); await CleanupResources(true, queueUrl, pipelineName, bucketName); Console.WriteLine(new string('-', 80)); Console.WriteLine("SageMaker pipeline scenario is complete."); Console.WriteLine(new string('-', 80)); } catch (Exception ex) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"There was a problem running the scenario: {ex.Message}"); await CleanupResources(true, queueUrl, pipelineName, bucketName); Console.WriteLine(new string('-', 80)); } } /// <summary> /// Populate the services for use within the console application. /// </summary> /// <param name="host">The services host.</param> private static void ServicesSetup(IHost host) { _sageMakerWrapper = host.Services.GetRequiredService<SageMakerWrapper>(); _iamClient = host.Services.GetRequiredService<IAmazonIdentityManagementService>(); _sqsClient = host.Services.GetRequiredService<IAmazonSQS>(); _s3Client = host.Services.GetRequiredService<IAmazonS3>(); _lambdaClient = host.Services.GetRequiredService<IAmazonLambda>(); } /// <summary> /// Set up AWS Lambda, either by updating an existing function or creating a new function. 
/// </summary> /// <param name="roleArn">The role Amazon Resource Name (ARN) to use for the Lambda function.</param> /// <param name="askUser">True to ask the user before updating.</param> /// <returns>The ARN of the function.</returns> public static async Task<string> SetupLambda(string roleArn, bool askUser) { Console.WriteLine(new string('-', 80)); Console.WriteLine("Setting up the Lambda function for the pipeline."); var handlerName = "SageMakerLambda::SageMakerLambda.SageMakerLambdaFunction::FunctionHandler"; var functionArn = ""; try { var functionInfo = await _lambdaClient.GetFunctionAsync(new GetFunctionRequest() { FunctionName = lambdaFunctionName }); var updateFunction = true; if (askUser) { updateFunction = GetYesNoResponse( $"\tThe Lambda function {lambdaFunctionName} already exists, do you want to update it?"); } if (updateFunction) { // Update the Lambda function. using var zipMemoryStream = new MemoryStream(await File.ReadAllBytesAsync("SageMakerLambda.zip")); await _lambdaClient.UpdateFunctionCodeAsync( new UpdateFunctionCodeRequest() { FunctionName = lambdaFunctionName, ZipFile = zipMemoryStream, }); } functionArn = functionInfo.Configuration.FunctionArn; } catch (ResourceNotFoundException) { Console.WriteLine($"\tThe Lambda function {lambdaFunctionName} was not found, creating the new function."); // Create the function if it does not already exist. 
using var zipMemoryStream = new MemoryStream(await File.ReadAllBytesAsync("SageMakerLambda.zip")); var createResult = await _lambdaClient.CreateFunctionAsync( new CreateFunctionRequest() { FunctionName = lambdaFunctionName, Runtime = Runtime.Dotnet6, Description = "SageMaker example function.", Code = new FunctionCode() { ZipFile = zipMemoryStream }, Handler = handlerName, Role = roleArn, Timeout = 30 }); functionArn = createResult.FunctionArn; } Console.WriteLine($"\tLambda ready with ARN {functionArn}."); Console.WriteLine(new string('-', 80)); return functionArn; } /// <summary> /// Create a role to be used by AWS Lambda. Does not create the role if it already exists. /// </summary> /// <returns>The role ARN.</returns> public static async Task<string> CreateLambdaRole() { Console.WriteLine(new string('-', 80)); lambdaRolePolicies = new string[]{ "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess", "arn:aws:iam::aws:policy/AmazonSQSFullAccess", "arn:aws:iam::aws:policy/service-role/" + "AmazonSageMakerGeospatialFullAccess", "arn:aws:iam::aws:policy/service-role/" + "AmazonSageMakerServiceCatalogProductsLambdaServiceRolePolicy", "arn:aws:iam::aws:policy/service-role/" + "AWSLambdaSQSQueueExecutionRole" }; var roleArn = await GetRoleArnIfExists(lambdaRoleName); if (!string.IsNullOrEmpty(roleArn)) { return roleArn; } Console.WriteLine("\tCreating a role to for AWS Lambda to use."); var assumeRolePolicy = "{" + "\"Version\": \"2012-10-17\"," + "\"Statement\": [{" + "\"Effect\": \"Allow\"," + "\"Principal\": {" + $"\"Service\": [" + "\"sagemaker.amazonaws.com\"," + "\"sagemaker-geospatial.amazonaws.com\"," + "\"lambda.amazonaws.com\"," + "\"s3.amazonaws.com\"" + "]" + "}," + "\"Action\": \"sts:AssumeRole\"" + "}]" + "}"; var roleResult = await _iamClient!.CreateRoleAsync( new CreateRoleRequest() { AssumeRolePolicyDocument = assumeRolePolicy, Path = "/", RoleName = lambdaRoleName }); foreach 
(var policy in lambdaRolePolicies) { await _iamClient.AttachRolePolicyAsync( new AttachRolePolicyRequest() { PolicyArn = policy, RoleName = lambdaRoleName }); } // Allow time for the role to be ready. Thread.Sleep(10000); Console.WriteLine($"\tRole ready with ARN {roleResult.Role.Arn}."); Console.WriteLine(new string('-', 80)); return roleResult.Role.Arn; } /// <summary> /// Create a role to be used by SageMaker. /// </summary> /// <returns>The role Amazon Resource Name (ARN).</returns> public static async Task<string> CreateSageMakerRole() { Console.WriteLine(new string('-', 80)); sageMakerRolePolicies = new string[]{ "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess", "arn:aws:iam::aws:policy/AmazonSageMakerGeospatialFullAccess", }; var roleArn = await GetRoleArnIfExists(sageMakerRoleName); if (!string.IsNullOrEmpty(roleArn)) { return roleArn; } Console.WriteLine("\tCreating a role to use with SageMaker."); var assumeRolePolicy = "{" + "\"Version\": \"2012-10-17\"," + "\"Statement\": [{" + "\"Effect\": \"Allow\"," + "\"Principal\": {" + $"\"Service\": [" + "\"sagemaker.amazonaws.com\"," + "\"sagemaker-geospatial.amazonaws.com\"," + "\"lambda.amazonaws.com\"," + "\"s3.amazonaws.com\"" + "]" + "}," + "\"Action\": \"sts:AssumeRole\"" + "}]" + "}"; var roleResult = await _iamClient!.CreateRoleAsync( new CreateRoleRequest() { AssumeRolePolicyDocument = assumeRolePolicy, Path = "/", RoleName = sageMakerRoleName }); foreach (var policy in sageMakerRolePolicies) { await _iamClient.AttachRolePolicyAsync( new AttachRolePolicyRequest() { PolicyArn = policy, RoleName = sageMakerRoleName }); } // Allow time for the role to be ready. Thread.Sleep(10000); Console.WriteLine($"\tRole ready with ARN {roleResult.Role.Arn}."); Console.WriteLine(new string('-', 80)); return roleResult.Role.Arn; } /// <summary> /// Set up the SQS queue to use with the pipeline. 
/// </summary>
    /// <param name="queueName">The name for the queue.</param>
    /// <returns>The URL for the queue.</returns>
    /// <remarks>
    /// If the queue already exists its URL is returned as is. Otherwise the queue is
    /// created and connected to the Lambda function through <see cref="ConnectLambda"/>.
    /// </remarks>
    public static async Task<string> SetupQueue(string queueName)
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"Setting up queue {queueName}.");

        try
        {
            // Reuse the queue when it already exists.
            var queueInfo = await _sqsClient.GetQueueUrlAsync(
                new GetQueueUrlRequest() { QueueName = queueName });
            return queueInfo.QueueUrl;
        }
        catch (QueueDoesNotExistException)
        {
            var attrs = new Dictionary<string, string>
            {
                { QueueAttributeName.DelaySeconds, "5" },
                { QueueAttributeName.ReceiveMessageWaitTimeSeconds, "5" },
                { QueueAttributeName.VisibilityTimeout, "300" },
            };

            var request = new CreateQueueRequest
            {
                Attributes = attrs,
                QueueName = queueName,
            };

            var response = await _sqsClient.CreateQueueAsync(request);

            // Pause before wiring up the event source (presumably so the new queue
            // has time to become available). Task.Delay keeps the async method
            // from blocking its thread the way Thread.Sleep would.
            await Task.Delay(TimeSpan.FromSeconds(10));
            await ConnectLambda(response.QueueUrl);

            Console.WriteLine($"\tQueue ready with Url {response.QueueUrl}.");
            Console.WriteLine(new string('-', 80));
            return response.QueueUrl;
        }
    }

    /// <summary>
    /// Connect the queue to the Lambda function as an event source.
    /// </summary>
    /// <param name="queueUrl">The URL for the queue.</param>
    /// <returns>Async task.</returns>
    public static async Task ConnectLambda(string queueUrl)
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"Connecting the Lambda function and queue for the pipeline.");

        // The event source mapping is keyed by the queue ARN, so look it up from the URL.
        var queueAttributes = await _sqsClient.GetQueueAttributesAsync(
            new GetQueueAttributesRequest()
            {
                QueueUrl = queueUrl,
                AttributeNames = new List<string>() { "All" }
            });
        var queueArn = queueAttributes.QueueARN;

        // Only look at mappings between this function and this queue. Without the
        // EventSourceArn filter, a mapping to any other source would be mistaken
        // for this one and the new queue would never be connected.
        var eventSource = await _lambdaClient.ListEventSourceMappingsAsync(
            new ListEventSourceMappingsRequest()
            {
                FunctionName = lambdaFunctionName,
                EventSourceArn = queueArn
            });

        if (!eventSource.EventSourceMappings.Any())
        {
            // Only add the event source mapping if it does not already exist.
            await _lambdaClient.CreateEventSourceMappingAsync(
                new CreateEventSourceMappingRequest()
                {
                    EventSourceArn = queueArn,
                    FunctionName = lambdaFunctionName,
                    Enabled = true
                });
        }

        Console.WriteLine(new string('-', 80));
    }

    /// <summary>
    /// Set up the bucket to use for pipeline input and output.
    /// </summary>
    /// <param name="bucketName">The name for the bucket.</param>
    /// <returns>Async task.</returns>
    /// <remarks>
    /// When the bucket does not exist it is created and the local file
    /// <c>latlongtest.csv</c> is uploaded to <c>samplefiles/latlongtest.csv</c>.
    /// An existing bucket is left untouched.
    /// </remarks>
    public static async Task SetupBucket(string bucketName)
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"Setting up bucket {bucketName}.");

        var bucketExists = await Amazon.S3.Util.AmazonS3Util.DoesS3BucketExistV2Async(
            _s3Client, bucketName);

        if (!bucketExists)
        {
            await _s3Client.PutBucketAsync(new PutBucketRequest()
            {
                BucketName = bucketName,
                BucketRegion = S3Region.USWest2
            });

            // Pause between creating the bucket and uploading to it, without
            // blocking the thread.
            await Task.Delay(TimeSpan.FromSeconds(5));

            await _s3Client.PutObjectAsync(new PutObjectRequest()
            {
                BucketName = bucketName,
                Key = "samplefiles/latlongtest.csv",
                FilePath = "latlongtest.csv"
            });
        }

        Console.WriteLine($"\tBucket {bucketName} ready.");
        Console.WriteLine(new string('-', 80));
    }

    /// <summary>
    /// Display some results from the output directory.
    /// </summary>
    /// <param name="bucketName">The name for the bucket.</param>
    /// <returns>
    /// The key of the most recently modified object under <c>outputfiles/</c>,
    /// or an empty string if there are no output objects.
    /// </returns>
    public static async Task<string> GetOutputResults(string bucketName)
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"Getting output results {bucketName}.");
        string outputKey = "";

        // Pause before listing the output objects, without blocking the thread.
        await Task.Delay(TimeSpan.FromSeconds(15));

        var outputFiles = await _s3Client.ListObjectsAsync(
            new ListObjectsRequest()
            {
                BucketName = bucketName,
                Prefix = "outputfiles/"
            });

        if (outputFiles.S3Objects.Any())
        {
            // Show the newest output object.
            var sampleOutput = outputFiles.S3Objects.OrderBy(s => s.LastModified).Last();
            Console.WriteLine($"\tOutput file: {sampleOutput.Key}");

            // Dispose the response and reader so the underlying stream is released.
            using var outputSampleResponse = await _s3Client.GetObjectAsync(
                new GetObjectRequest()
                {
                    BucketName = bucketName,
                    Key = sampleOutput.Key
                });
            outputKey = sampleOutput.Key;

            using var reader = new StreamReader(outputSampleResponse.ResponseStream);

            // Skip the first line (presumably a header row) and print up to 10 more.
            await reader.ReadLineAsync();
            Console.WriteLine("\tOutput file contents: \n");
            for (var i = 0; i < 10; i++)
            {
                // ReadLineAsync returns null at end of stream; this avoids the
                // synchronous EndOfStream check inside async code.
                var line = await reader.ReadLineAsync();
                if (line is null)
                {
                    break;
                }

                Console.WriteLine("\t" + line);
            }
        }

        Console.WriteLine(new string('-', 80));
        return outputKey;
    }

    /// <summary>
    /// Create a pipeline from the example pipeline JSON
    /// that includes the Lambda, callback, processing, and export jobs.
    /// </summary>
    /// <param name="roleArn">The ARN of the role for the pipeline.</param>
    /// <param name="functionArn">The ARN of the Lambda function for the pipeline.</param>
    /// <param name="pipelineName">The name for the pipeline.</param>
    /// <returns>The ARN of the pipeline.</returns>
    public static async Task<string> SetupPipeline(string roleArn, string functionArn, string pipelineName)
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"Setting up the pipeline.");

        var pipelineJson = await File.ReadAllTextAsync("GeoSpatialPipeline.json");

        // Add the correct function ARN instead of the placeholder.
        pipelineJson = pipelineJson.Replace("*FUNCTION_ARN*", functionArn);

        // The pipeline name is also used as the display name.
        var pipelineArn = await _sageMakerWrapper.SetupPipeline(
            pipelineJson, roleArn, pipelineName, "sdk example pipeline", pipelineName);

        Console.WriteLine($"\tPipeline set up with ARN {pipelineArn}.");
        Console.WriteLine(new string('-', 80));

        return pipelineArn;
    }

    /// <summary>
    /// Start a pipeline run with job configurations.
    /// </summary>
    /// <param name="queueUrl">The URL for the queue used in the pipeline.</param>
    /// <param name="roleArn">The ARN of the role.</param>
    /// <param name="pipelineName">The name of the pipeline.</param>
    /// <param name="bucketName">The name of the bucket.</param>
    /// <returns>The pipeline run ARN.</returns>
    public static async Task<string> ExecutePipeline(
        string queueUrl,
        string roleArn,
        string pipelineName,
        string bucketName)
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"Starting pipeline execution.");

        // Input is the sample file under samplefiles/; output goes under outputfiles/.
        var input = $"s3://{bucketName}/samplefiles/latlongtest.csv";
        var output = $"s3://{bucketName}/outputfiles/";

        var executionARN = await _sageMakerWrapper.ExecutePipeline(
            queueUrl, input, output, pipelineName, roleArn);

        Console.WriteLine($"\tRun started with ARN {executionARN}.");
        Console.WriteLine(new string('-', 80));

        return executionARN;
    }

    /// <summary>
    /// Wait for a pipeline run to complete.
    /// </summary>
    /// <param name="executionArn">The pipeline run ARN.</param>
    /// <returns>Async task.</returns>
    /// <remarks>
    /// Polls every 30 seconds while the status is
    /// <c>PipelineExecutionStatus.Executing</c> and returns as soon as any other
    /// status is reported.
    /// </remarks>
    public static async Task WaitForPipelineExecution(string executionArn)
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"Waiting for pipeline to finish.");

        var status = await _sageMakerWrapper.CheckPipelineExecutionStatus(executionArn);
        Console.WriteLine($"\tStatus is {status}.");

        // Only wait between polls; once the run has left the Executing state
        // there is no reason to delay another 30 seconds.
        while (status == PipelineExecutionStatus.Executing)
        {
            await Task.Delay(TimeSpan.FromSeconds(30));
            status = await _sageMakerWrapper.CheckPipelineExecutionStatus(executionArn);
            Console.WriteLine($"\tStatus is {status}.");
        }

        Console.WriteLine($"\tPipeline finished with status {status}.");
        Console.WriteLine(new string('-', 80));
    }

    /// <summary>
    /// Clean up the resources from the scenario.
    /// </summary>
    /// <param name="askUser">True to ask the user for cleanup.</param>
    /// <param name="queueUrl">The URL of the queue to clean up.</param>
    /// <param name="pipelineName">The name of the pipeline.</param>
    /// <param name="bucketName">The name of the bucket.</param>
    /// <returns>True when the cleanup steps have run to completion.</returns>
    public static async Task<bool> CleanupResources(
        bool askUser,
        string queueUrl,
        string pipelineName,
        string bucketName)
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"Clean up resources.");

        if (!askUser || GetYesNoResponse($"\tDelete pipeline {pipelineName}? (y/n)"))
        {
            Console.WriteLine($"\tDeleting pipeline.");
            // Delete the pipeline.
            await _sageMakerWrapper.DeletePipelineByName(pipelineName);
        }

        // The queue is skipped entirely when no queue URL was provided.
        if (!string.IsNullOrEmpty(queueUrl)
            && (!askUser || GetYesNoResponse($"\tDelete queue {queueUrl}? (y/n)")))
        {
            Console.WriteLine($"\tDeleting queue.");
            // Delete the queue.
            await _sqsClient.DeleteQueueAsync(new DeleteQueueRequest(queueUrl));
        }

        if (!askUser || GetYesNoResponse($"\tDelete Amazon S3 bucket {bucketName}? (y/n)"))
        {
            Console.WriteLine($"\tDeleting bucket.");

            // Delete all objects in the bucket. A single ListObjectsV2 call returns
            // only one page of keys, so follow the continuation token until the
            // listing is exhausted; a bucket that still holds objects cannot be deleted.
            var listRequest = new ListObjectsV2Request() { BucketName = bucketName };
            do
            {
                var page = await _s3Client.ListObjectsV2Async(listRequest);
                if (page.KeyCount > 0)
                {
                    await _s3Client.DeleteObjectsAsync(new DeleteObjectsRequest()
                    {
                        BucketName = bucketName,
                        Objects = page.S3Objects
                            .Select(o => new KeyVersion { Key = o.Key })
                            .ToList()
                    });
                }

                listRequest.ContinuationToken = page.NextContinuationToken;
            } while (!string.IsNullOrEmpty(listRequest.ContinuationToken));

            // Now delete the bucket.
            await _s3Client.DeleteBucketAsync(new DeleteBucketRequest()
            {
                BucketName = bucketName
            });
        }

        if (!askUser || GetYesNoResponse($"\tDelete lambda {lambdaFunctionName}? (y/n)"))
        {
            Console.WriteLine($"\tDeleting lambda function.");

            await _lambdaClient.DeleteFunctionAsync(new DeleteFunctionRequest()
            {
                FunctionName = lambdaFunctionName
            });
        }

        if (!askUser || GetYesNoResponse($"\tDelete role {lambdaRoleName}? (y/n)"))
        {
            Console.WriteLine($"\tDetaching policies and deleting role.");

            // Policies are detached before the role itself is deleted.
            foreach (var policy in lambdaRolePolicies)
            {
                await _iamClient!.DetachRolePolicyAsync(new DetachRolePolicyRequest()
                {
                    RoleName = lambdaRoleName,
                    PolicyArn = policy
                });
            }

            await _iamClient!.DeleteRoleAsync(new DeleteRoleRequest()
            {
                RoleName = lambdaRoleName
            });
        }

        if (!askUser || GetYesNoResponse($"\tDelete role {sageMakerRoleName}? (y/n)"))
        {
            Console.WriteLine($"\tDetaching policies and deleting role.");

            // Policies are detached before the role itself is deleted.
            foreach (var policy in sageMakerRolePolicies)
            {
                await _iamClient!.DetachRolePolicyAsync(new DetachRolePolicyRequest()
                {
                    RoleName = sageMakerRoleName,
                    PolicyArn = policy
                });
            }

            await _iamClient!.DeleteRoleAsync(new DeleteRoleRequest()
            {
                RoleName = sageMakerRoleName
            });
        }

        Console.WriteLine(new string('-', 80));
        return true;
    }

    /// <summary>
    /// Helper method to get a role's ARN if it already exists.
    /// </summary>
    /// <param name="roleName">The name of the AWS Identity and Access Management (IAM) Role to look for.</param>
    /// <returns>The role ARN if it exists, otherwise an empty string.</returns>
    private static async Task<string> GetRoleArnIfExists(string roleName)
    {
        Console.WriteLine($"Checking for role named {roleName}.");

        try
        {
            // Look up the role that was asked for. The previous code always queried
            // lambdaRoleName, so every caller got the Lambda role's ARN back.
            var existingRole = await _iamClient.GetRoleAsync(new GetRoleRequest()
            {
                RoleName = roleName
            });
            return existingRole.Role.Arn;
        }
        catch (NoSuchEntityException)
        {
            // A missing role is an expected outcome, reported as an empty string.
            return string.Empty;
        }
    }

    /// <summary>
    /// Helper method to get a yes or no response from the user.
    /// </summary>
    /// <param name="question">The question string to print on the console.</param>
    /// <returns>True if the user responds with a yes.</returns>
    private static bool GetYesNoResponse(string question)
    {
        Console.WriteLine(question);
        var ynResponse = Console.ReadLine();

        // Only "y" (any casing) counts as yes; null (end of input) or any other text is a no.
        return ynResponse != null &&
               ynResponse.Equals("y", StringComparison.InvariantCultureIgnoreCase);
    }
}