使用 AWS SDK 開始使用管道中的 SageMaker 地理空間任務 - AWS SDK 程式碼範例

AWS Doc SDK Examples GitHub 儲存庫中提供更多 AWS SDK 範例。

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

使用 AWS SDK 開始使用管道中的 SageMaker 地理空間任務

下列程式碼範例示範如何：

  • 設定管道的資源。

  • 設定執行地理空間任務的管道。

  • 啟動管道執行。

  • 監控執行的狀態。

  • 檢視管道的輸出。

  • 清除資源。

如需詳細資訊，請參閱 Community.aws 上的〈使用 AWS SDK 建立和執行 SageMaker 管道〉。

.NET
AWS SDK for .NET
注意

GitHub 上提供更多內容。請在 AWS 程式碼範例儲存庫中尋找完整範例，並了解如何設定和執行。

建立包裝 SageMaker 操作的類別。

using System.Text.Json;
using Amazon.SageMaker;
using Amazon.SageMaker.Model;
using Amazon.SageMakerGeospatial;
using Amazon.SageMakerGeospatial.Model;

namespace SageMakerActions;

/// <summary>
/// Groups the Amazon SageMaker client calls used by the geospatial pipeline
/// scenario: create/update, start, poll, and delete a pipeline.
/// </summary>
public class SageMakerWrapper
{
    private readonly IAmazonSageMaker _amazonSageMaker;

    /// <summary>
    /// Stores the SageMaker client that every wrapper method delegates to.
    /// </summary>
    /// <param name="amazonSageMaker">The SageMaker service client.</param>
    public SageMakerWrapper(IAmazonSageMaker amazonSageMaker) => _amazonSageMaker = amazonSageMaker;

    /// <summary>
    /// Applies a JSON pipeline definition. The pipeline is updated first; if the
    /// update reports that it does not exist, it is created with the same settings.
    /// </summary>
    /// <param name="pipelineJson">The pipeline definition JSON.</param>
    /// <param name="roleArn">The ARN of the role the pipeline uses.</param>
    /// <param name="name">The pipeline name.</param>
    /// <param name="description">The pipeline description.</param>
    /// <param name="displayName">The pipeline display name.</param>
    /// <returns>The Amazon Resource Name (ARN) of the pipeline.</returns>
    public async Task<string> SetupPipeline(string pipelineJson, string roleArn, string name,
        string description, string displayName)
    {
        try
        {
            var updateRequest = new UpdatePipelineRequest
            {
                PipelineName = name,
                PipelineDisplayName = displayName,
                PipelineDescription = description,
                PipelineDefinition = pipelineJson,
                RoleArn = roleArn
            };
            var updated = await _amazonSageMaker.UpdatePipelineAsync(updateRequest);
            return updated.PipelineArn;
        }
        catch (Amazon.SageMaker.Model.ResourceNotFoundException)
        {
            // Nothing to update yet, so create the pipeline from the same settings.
            var createRequest = new CreatePipelineRequest
            {
                PipelineName = name,
                PipelineDisplayName = displayName,
                PipelineDescription = description,
                PipelineDefinition = pipelineJson,
                RoleArn = roleArn
            };
            var created = await _amazonSageMaker.CreatePipelineAsync(createRequest);
            return created.PipelineArn;
        }
    }

    /// <summary>
    /// Starts a pipeline run whose parameters carry the vector enrichment job
    /// input, export and job configurations serialized as JSON.
    /// </summary>
    /// <param name="queueUrl">The URL for the queue to use for pipeline callbacks.</param>
    /// <param name="inputLocationUrl">The input location in Amazon Simple Storage Service (Amazon S3).</param>
    /// <param name="outputLocationUrl">The output location in Amazon S3.</param>
    /// <param name="pipelineName">The name of the pipeline.</param>
    /// <param name="executionRoleArn">The ARN of the role.</param>
    /// <returns>The ARN of the pipeline run.</returns>
    public async Task<string> ExecutePipeline(
        string queueUrl,
        string inputLocationUrl,
        string outputLocationUrl,
        string pipelineName,
        string executionRoleArn)
    {
        var inputConfig = new VectorEnrichmentJobInputConfig
        {
            DocumentType = VectorEnrichmentJobDocumentType.CSV,
            DataSourceConfig = new()
            {
                S3Data = new VectorEnrichmentJobS3Data { S3Uri = inputLocationUrl }
            }
        };

        var exportConfig = new ExportVectorEnrichmentJobOutputConfig
        {
            S3Data = new VectorEnrichmentJobS3Data { S3Uri = outputLocationUrl }
        };

        // Reverse geocoding reads the "Longitude" (X) and "Latitude" (Y) attributes.
        var jobConfig = new VectorEnrichmentJobConfig
        {
            ReverseGeocodingConfig = new ReverseGeocodingConfig
            {
                XAttributeName = "Longitude",
                YAttributeName = "Latitude"
            }
        };

        // Parameter names are expected to match those declared in the pipeline definition.
#pragma warning disable SageMaker1002 // Property value does not match required pattern is allowed here to match the pipeline definition.
        var pipelineParameters = new List<Parameter>
        {
            new Parameter { Name = "parameter_execution_role", Value = executionRoleArn },
            new Parameter { Name = "parameter_queue_url", Value = queueUrl },
            new Parameter { Name = "parameter_vej_input_config", Value = JsonSerializer.Serialize(inputConfig) },
            new Parameter { Name = "parameter_vej_export_config", Value = JsonSerializer.Serialize(exportConfig) },
            new Parameter { Name = "parameter_step_1_vej_config", Value = JsonSerializer.Serialize(jobConfig) }
        };
        var startRequest = new StartPipelineExecutionRequest
        {
            PipelineName = pipelineName,
            PipelineExecutionDisplayName = $"{pipelineName}-example-execution",
            PipelineParameters = pipelineParameters
        };
#pragma warning restore SageMaker1002

        var started = await _amazonSageMaker.StartPipelineExecutionAsync(startRequest);
        return started.PipelineExecutionArn;
    }

    /// <summary>
    /// Looks up the current status of a pipeline run.
    /// </summary>
    /// <param name="pipelineExecutionArn">The ARN of the pipeline run.</param>
    /// <returns>The status of the pipeline run.</returns>
    public async Task<PipelineExecutionStatus> CheckPipelineExecutionStatus(string pipelineExecutionArn)
    {
        var request = new DescribePipelineExecutionRequest { PipelineExecutionArn = pipelineExecutionArn };
        var description = await _amazonSageMaker.DescribePipelineExecutionAsync(request);
        return description.PipelineExecutionStatus;
    }

    /// <summary>
    /// Deletes a SageMaker pipeline identified by name.
    /// </summary>
    /// <param name="pipelineName">The name of the pipeline to delete.</param>
    /// <returns>The ARN of the deleted pipeline.</returns>
    public async Task<string> DeletePipelineByName(string pipelineName)
    {
        var request = new DeletePipelineRequest { PipelineName = pipelineName };
        var deleted = await _amazonSageMaker.DeletePipelineAsync(request);
        return deleted.PipelineArn;
    }
}

建立處理 SageMaker 管道回呼的函數。

using System.Text.Json;
using Amazon.Lambda.Core;
using Amazon.Lambda.SQSEvents;
using Amazon.SageMaker;
using Amazon.SageMaker.Model;
using Amazon.SageMakerGeospatial;
using Amazon.SageMakerGeospatial.Model;

// Assembly attribute to enable the AWS Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace SageMakerLambda;

/// <summary>
/// The AWS Lambda function handler for the Amazon SageMaker pipeline.
/// </summary>
public class SageMakerLambdaFunction
{
    // Clients are created once per function instance and reused across
    // invocations instead of being rebuilt (and never disposed) on every call.
    private readonly AmazonSageMakerGeospatialClient _geoSpatialClient;
    private readonly AmazonSageMakerClient _sageMakerClient;

    /// <summary>
    /// Default constructor. This constructor is used by AWS Lambda to construct the instance. When invoked in a Lambda environment
    /// the AWS credentials will come from the AWS Identity and Access Management (IAM) role associated with the function. The AWS Region will be set to the
    /// Region that the Lambda function is running in.
    /// </summary>
    public SageMakerLambdaFunction()
    {
        _geoSpatialClient = new AmazonSageMakerGeospatialClient();
        _sageMakerClient = new AmazonSageMakerClient();
    }

    /// <summary>
    /// Handles one invocation from the SageMaker pipeline. The request is one of:
    /// a queue event (records present) whose callback messages are checked;
    /// an export request (vej_export_config set) that starts a Vector Enrichment Job export;
    /// or a start request (vej_name set) that starts a Vector Enrichment Job.
    /// </summary>
    /// <param name="request">The custom SageMaker pipeline request object.</param>
    /// <param name="context">The Lambda context.</param>
    /// <returns>The dictionary of output parameters (empty for queue events).</returns>
    public async Task<Dictionary<string, string>> FunctionHandler(PipelineRequest request, ILambdaContext context)
    {
        var responseDictionary = new Dictionary<string, string>();
        context.Logger.LogInformation("Function handler started with request: " + JsonSerializer.Serialize(request));

        if (request.Records != null && request.Records.Any())
        {
            context.Logger.LogInformation("Records found, this is a queue event. Processing the queue records.");
            foreach (var message in request.Records)
            {
                await ProcessMessageAsync(message, context, _geoSpatialClient, _sageMakerClient);
            }
        }
        else if (!string.IsNullOrEmpty(request.vej_export_config))
        {
            context.Logger.LogInformation("Export configuration found, this is an export. Start the Vector Enrichment Job (VEJ) export.");
            var outputConfig = JsonSerializer.Deserialize<ExportVectorEnrichmentJobOutputConfig>(
                request.vej_export_config);
            var exportResponse = await _geoSpatialClient.ExportVectorEnrichmentJobAsync(
                new ExportVectorEnrichmentJobRequest()
                {
                    Arn = request.vej_arn,
                    ExecutionRoleArn = request.Role,
                    OutputConfig = outputConfig
                });
            context.Logger.LogInformation($"Export response: {JsonSerializer.Serialize(exportResponse)}");
            responseDictionary = new Dictionary<string, string>
            {
                { "export_eoj_status", exportResponse.ExportStatus.ToString() },
                { "vej_arn", exportResponse.Arn }
            };
        }
        else if (!string.IsNullOrEmpty(request.vej_name))
        {
            context.Logger.LogInformation("Vector Enrichment Job name found, starting the job.");
            var inputConfig = JsonSerializer.Deserialize<VectorEnrichmentJobInputConfig>(
                request.vej_input_config);
            var jobConfig = JsonSerializer.Deserialize<VectorEnrichmentJobConfig>(
                request.vej_config);
            var jobResponse = await _geoSpatialClient.StartVectorEnrichmentJobAsync(
                new StartVectorEnrichmentJobRequest()
                {
                    ExecutionRoleArn = request.Role,
                    InputConfig = inputConfig,
                    Name = request.vej_name,
                    JobConfig = jobConfig
                });
            context.Logger.LogInformation("Job response: " + JsonSerializer.Serialize(jobResponse));
            responseDictionary = new Dictionary<string, string>
            {
                { "vej_arn", jobResponse.Arn },
                { "statusCode", jobResponse.HttpStatusCode.ToString() }
            };
        }

        return responseDictionary;
    }

    /// <summary>
    /// Process a queue message and check the status of the Vector Enrichment Job it refers to.
    /// On COMPLETED the pipeline step is resumed with success. While the job is still
    /// initializing, running or stopping, an exception is thrown so the message is not
    /// consumed and can be processed again later. Any other terminal status resumes the
    /// pipeline step with a failure.
    /// </summary>
    /// <param name="message">The queue message.</param>
    /// <param name="context">The Lambda context.</param>
    /// <param name="geoClient">The SageMaker GeoSpatial client.</param>
    /// <param name="sageMakerClient">The SageMaker client.</param>
    /// <returns>Async task.</returns>
    /// <exception cref="InvalidOperationException">The message body could not be deserialized.</exception>
    /// <exception cref="Exception">The job has not finished yet; the message should be retried.</exception>
    private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context,
        AmazonSageMakerGeospatialClient geoClient, AmazonSageMakerClient sageMakerClient)
    {
        context.Logger.LogInformation($"Processed message {message.Body}");

        // Get information about the SageMaker job.
        var payload = JsonSerializer.Deserialize<QueuePayload>(message.Body);
        if (payload == null)
        {
            throw new InvalidOperationException("The queue message body did not contain a callback payload.");
        }

        context.Logger.LogInformation($"Payload token {payload.token}");
        var token = payload.token;

        if (payload.arguments.ContainsKey("vej_arn"))
        {
            // Use the job ARN and the token to get the job status.
            var job_arn = payload.arguments["vej_arn"];
            context.Logger.LogInformation($"Token: {token}, arn {job_arn}");

            // Await the call so the logged object is the job response, not a pending Task.
            var jobInfo = await geoClient.GetVectorEnrichmentJobAsync(
                new GetVectorEnrichmentJobRequest() { Arn = job_arn });
            context.Logger.LogInformation("Job info: " + JsonSerializer.Serialize(jobInfo));

            var status = jobInfo.Status;
            if (status == VectorEnrichmentJobStatus.COMPLETED)
            {
                context.Logger.LogInformation($"Status completed, resuming pipeline...");
                await sageMakerClient.SendPipelineExecutionStepSuccessAsync(
                    new SendPipelineExecutionStepSuccessRequest()
                    {
                        CallbackToken = token,
                        OutputParameters = new List<OutputParameter>()
                        {
                            new OutputParameter()
                            {
                                Name = "export_status",
                                Value = status
                            }
                        }
                    });
            }
            else if (status == VectorEnrichmentJobStatus.IN_PROGRESS
                     || status == VectorEnrichmentJobStatus.INITIALIZING
                     || status == VectorEnrichmentJobStatus.STOPPING)
            {
                // Put this message back in the queue to reprocess later.
                // Previously INITIALIZING fell through silently, so the message was consumed
                // and the pipeline never received a callback.
                context.Logger.LogInformation(
                    $"Status still in progress, check back later.");
                throw new Exception("Job still running.");
            }
            else
            {
                // FAILED, STOPPED, or any other terminal status: stop the pipeline step.
                context.Logger.LogInformation($"Status failed, stopping pipeline...");
                await sageMakerClient.SendPipelineExecutionStepFailureAsync(
                    new SendPipelineExecutionStepFailureRequest()
                    {
                        CallbackToken = token,
                        // ErrorDetails is not guaranteed to be populated for every terminal status.
                        FailureReason = jobInfo.ErrorDetails?.ErrorMessage
                                        ?? $"Vector Enrichment Job ended with status {status}."
                    });
            }
        }
    }
}

在命令提示字元中執行互動式案例。

/// <summary>
/// Interactive console scenario that sets up the IAM roles, Lambda function, SQS queue,
/// and S3 bucket used by a SageMaker geospatial pipeline, runs the pipeline, prints part
/// of its output, and then offers to delete the created resources.
/// </summary>
public static class PipelineWorkflow
{
    public static IAmazonIdentityManagementService _iamClient = null!;
    public static SageMakerWrapper _sageMakerWrapper = null!;
    public static IAmazonSQS _sqsClient = null!;
    public static IAmazonS3 _s3Client = null!;
    public static IAmazonLambda _lambdaClient = null!;
    public static IConfiguration _configuration = null!;
    public static string lambdaFunctionName = "SageMakerExampleFunction";
    public static string sageMakerRoleName = "SageMakerExampleRole";
    public static string lambdaRoleName = "SageMakerExampleLambdaRole";

    // Managed policy ARNs attached to each role. Set by CreateLambdaRole and
    // CreateSageMakerRole; they stay null if setup fails before those run.
    private static string[] lambdaRolePolicies = null!;
    private static string[] sageMakerRolePolicies = null!;

    // Trust policy shared by both example roles.
    private const string ExampleAssumeRolePolicy =
        "{" +
        "\"Version\": \"2012-10-17\"," +
        "\"Statement\": [{" +
        "\"Effect\": \"Allow\"," +
        "\"Principal\": {" +
        "\"Service\": [" +
        "\"sagemaker.amazonaws.com\"," +
        "\"sagemaker-geospatial.amazonaws.com\"," +
        "\"lambda.amazonaws.com\"," +
        "\"s3.amazonaws.com\"" +
        "]" +
        "}," +
        "\"Action\": \"sts:AssumeRole\"" +
        "}]" +
        "}";

    static async Task Main(string[] args)
    {
        var options = new AWSOptions() { Region = RegionEndpoint.USWest2 };
        // Set up dependency injection for the AWS service.
        using var host = Host.CreateDefaultBuilder(args)
            .ConfigureLogging(logging =>
                logging.AddFilter("System", LogLevel.Debug)
                    .AddFilter<DebugLoggerProvider>("Microsoft", LogLevel.Information)
                    .AddFilter<ConsoleLoggerProvider>("Microsoft", LogLevel.Trace))
            .ConfigureServices((_, services) =>
                services.AddAWSService<IAmazonIdentityManagementService>(options)
                    .AddAWSService<IAmazonEC2>(options)
                    .AddAWSService<IAmazonSageMaker>(options)
                    .AddAWSService<IAmazonSageMakerGeospatial>(options)
                    .AddAWSService<IAmazonSQS>(options)
                    .AddAWSService<IAmazonS3>(options)
                    .AddAWSService<IAmazonLambda>(options)
                    .AddTransient<SageMakerWrapper>()
            )
            .Build();

        _configuration = new ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddJsonFile("settings.json") // Load settings from .json file.
            .AddJsonFile("settings.local.json", true) // Optionally, load local settings.
            .Build();

        ServicesSetup(host);
        string queueUrl = "";
        string queueName = _configuration["queueName"];
        string bucketName = _configuration["bucketName"];
        var pipelineName = _configuration["pipelineName"];

        try
        {
            Console.WriteLine(new string('-', 80));
            Console.WriteLine(
                "Welcome to the Amazon SageMaker pipeline example scenario.");
            Console.WriteLine(
                "\nThis example workflow will guide you through setting up and running an" +
                "\nAmazon SageMaker pipeline. The pipeline uses an AWS Lambda function and an" +
                "\nAmazon SQS Queue. It runs a vector enrichment reverse geocode job to" +
                "\nreverse geocode addresses in an input file and store the results in an export file.");
            Console.WriteLine(new string('-', 80));

            Console.WriteLine(new string('-', 80));
            Console.WriteLine(
                "First, we will set up the roles, functions, and queue needed by the SageMaker pipeline.");
            Console.WriteLine(new string('-', 80));

            var lambdaRoleArn = await CreateLambdaRole();
            var sageMakerRoleArn = await CreateSageMakerRole();
            var functionArn = await SetupLambda(lambdaRoleArn, true);
            queueUrl = await SetupQueue(queueName);
            await SetupBucket(bucketName);

            Console.WriteLine(new string('-', 80));
            Console.WriteLine("Now we can create and run our pipeline.");
            Console.WriteLine(new string('-', 80));

            await SetupPipeline(sageMakerRoleArn, functionArn, pipelineName);
            var executionArn = await ExecutePipeline(queueUrl, sageMakerRoleArn, pipelineName, bucketName);
            await WaitForPipelineExecution(executionArn);

            await GetOutputResults(bucketName);

            Console.WriteLine(new string('-', 80));
            Console.WriteLine("The pipeline has completed. To view the pipeline and runs " +
                              "in SageMaker Studio, follow these instructions:" +
                              "\nhttps://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-studio.html");
            Console.WriteLine(new string('-', 80));

            Console.WriteLine(new string('-', 80));
            Console.WriteLine("Finally, let's clean up our resources.");
            Console.WriteLine(new string('-', 80));

            await CleanupResources(true, queueUrl, pipelineName, bucketName);

            Console.WriteLine(new string('-', 80));
            Console.WriteLine("SageMaker pipeline scenario is complete.");
            Console.WriteLine(new string('-', 80));
        }
        catch (Exception ex)
        {
            Console.WriteLine(new string('-', 80));
            Console.WriteLine($"There was a problem running the scenario: {ex.Message}");
            await CleanupResources(true, queueUrl, pipelineName, bucketName);
            Console.WriteLine(new string('-', 80));
        }
    }

    /// <summary>
    /// Populate the services for use within the console application.
    /// </summary>
    /// <param name="host">The services host.</param>
    private static void ServicesSetup(IHost host)
    {
        _sageMakerWrapper = host.Services.GetRequiredService<SageMakerWrapper>();
        _iamClient = host.Services.GetRequiredService<IAmazonIdentityManagementService>();
        _sqsClient = host.Services.GetRequiredService<IAmazonSQS>();
        _s3Client = host.Services.GetRequiredService<IAmazonS3>();
        _lambdaClient = host.Services.GetRequiredService<IAmazonLambda>();
    }

    /// <summary>
    /// Set up AWS Lambda, either by updating an existing function or creating a new function.
    /// </summary>
    /// <param name="roleArn">The role Amazon Resource Name (ARN) to use for the Lambda function.</param>
    /// <param name="askUser">True to ask the user before updating.</param>
    /// <returns>The ARN of the function.</returns>
    public static async Task<string> SetupLambda(string roleArn, bool askUser)
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine("Setting up the Lambda function for the pipeline.");
        var handlerName = "SageMakerLambda::SageMakerLambda.SageMakerLambdaFunction::FunctionHandler";
        var functionArn = "";
        try
        {
            var functionInfo = await _lambdaClient.GetFunctionAsync(new GetFunctionRequest()
            {
                FunctionName = lambdaFunctionName
            });

            var updateFunction = true;
            if (askUser)
            {
                updateFunction = GetYesNoResponse(
                    $"\tThe Lambda function {lambdaFunctionName} already exists, do you want to update it?");
            }

            if (updateFunction)
            {
                // Update the Lambda function.
                using var zipMemoryStream = new MemoryStream(await File.ReadAllBytesAsync("SageMakerLambda.zip"));
                await _lambdaClient.UpdateFunctionCodeAsync(
                    new UpdateFunctionCodeRequest()
                    {
                        FunctionName = lambdaFunctionName,
                        ZipFile = zipMemoryStream,
                    });
            }

            functionArn = functionInfo.Configuration.FunctionArn;
        }
        catch (ResourceNotFoundException)
        {
            Console.WriteLine($"\tThe Lambda function {lambdaFunctionName} was not found, creating the new function.");

            // Create the function if it does not already exist.
            using var zipMemoryStream = new MemoryStream(await File.ReadAllBytesAsync("SageMakerLambda.zip"));
            var createResult = await _lambdaClient.CreateFunctionAsync(
                new CreateFunctionRequest()
                {
                    FunctionName = lambdaFunctionName,
                    Runtime = Runtime.Dotnet6,
                    Description = "SageMaker example function.",
                    Code = new FunctionCode()
                    {
                        ZipFile = zipMemoryStream
                    },
                    Handler = handlerName,
                    Role = roleArn,
                    Timeout = 30
                });

            functionArn = createResult.FunctionArn;
        }

        Console.WriteLine($"\tLambda ready with ARN {functionArn}.");
        Console.WriteLine(new string('-', 80));
        return functionArn;
    }

    /// <summary>
    /// Create a role to be used by AWS Lambda. Does not create the role if it already exists.
    /// </summary>
    /// <returns>The role ARN.</returns>
    public static async Task<string> CreateLambdaRole()
    {
        Console.WriteLine(new string('-', 80));

        lambdaRolePolicies = new string[]
        {
            "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
            "arn:aws:iam::aws:policy/AmazonSQSFullAccess",
            "arn:aws:iam::aws:policy/service-role/" + "AmazonSageMakerGeospatialFullAccess",
            "arn:aws:iam::aws:policy/service-role/" + "AmazonSageMakerServiceCatalogProductsLambdaServiceRolePolicy",
            "arn:aws:iam::aws:policy/service-role/" + "AWSLambdaSQSQueueExecutionRole"
        };

        var roleArn = await GetRoleArnIfExists(lambdaRoleName);
        if (!string.IsNullOrEmpty(roleArn))
        {
            return roleArn;
        }

        Console.WriteLine("\tCreating a role to for AWS Lambda to use.");
        return await CreateRoleWithPolicies(lambdaRoleName, lambdaRolePolicies);
    }

    /// <summary>
    /// Create a role to be used by SageMaker. Does not create the role if it already exists.
    /// </summary>
    /// <returns>The role Amazon Resource Name (ARN).</returns>
    public static async Task<string> CreateSageMakerRole()
    {
        Console.WriteLine(new string('-', 80));

        sageMakerRolePolicies = new string[]
        {
            "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
            // Same managed policy ARN (with the service-role/ path) as the Lambda role uses.
            "arn:aws:iam::aws:policy/service-role/" + "AmazonSageMakerGeospatialFullAccess",
        };

        var roleArn = await GetRoleArnIfExists(sageMakerRoleName);
        if (!string.IsNullOrEmpty(roleArn))
        {
            return roleArn;
        }

        Console.WriteLine("\tCreating a role to use with SageMaker.");
        return await CreateRoleWithPolicies(sageMakerRoleName, sageMakerRolePolicies);
    }

    /// <summary>
    /// Creates an IAM role with the shared example trust policy and attaches the given
    /// managed policies. Then waits briefly so the new role is usable.
    /// </summary>
    /// <param name="roleName">The name of the role to create.</param>
    /// <param name="policyArns">The managed policy ARNs to attach.</param>
    /// <returns>The ARN of the new role.</returns>
    private static async Task<string> CreateRoleWithPolicies(string roleName, string[] policyArns)
    {
        var roleResult = await _iamClient!.CreateRoleAsync(
            new CreateRoleRequest()
            {
                AssumeRolePolicyDocument = ExampleAssumeRolePolicy,
                Path = "/",
                RoleName = roleName
            });

        foreach (var policy in policyArns)
        {
            await _iamClient.AttachRolePolicyAsync(
                new AttachRolePolicyRequest()
                {
                    PolicyArn = policy,
                    RoleName = roleName
                });
        }

        // Allow time for the role to be ready (non-blocking wait).
        await Task.Delay(10000);
        Console.WriteLine($"\tRole ready with ARN {roleResult.Role.Arn}.");
        Console.WriteLine(new string('-', 80));
        return roleResult.Role.Arn;
    }

    /// <summary>
    /// Set up the SQS queue to use with the pipeline. An existing queue with the same name
    /// is reused as-is; a newly created queue is also connected to the Lambda function.
    /// </summary>
    /// <param name="queueName">The name for the queue.</param>
    /// <returns>The URL for the queue.</returns>
    public static async Task<string> SetupQueue(string queueName)
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"Setting up queue {queueName}.");

        try
        {
            var queueInfo = await _sqsClient.GetQueueUrlAsync(new GetQueueUrlRequest()
            { QueueName = queueName });
            return queueInfo.QueueUrl;
        }
        catch (QueueDoesNotExistException)
        {
            var attrs = new Dictionary<string, string>
            {
                {
                    QueueAttributeName.DelaySeconds,
                    "5"
                },
                {
                    QueueAttributeName.ReceiveMessageWaitTimeSeconds,
                    "5"
                },
                {
                    QueueAttributeName.VisibilityTimeout,
                    "300"
                },
            };

            var request = new CreateQueueRequest
            {
                Attributes = attrs,
                QueueName = queueName,
            };

            var response = await _sqsClient.CreateQueueAsync(request);
            await Task.Delay(10000);
            await ConnectLambda(response.QueueUrl);
            Console.WriteLine($"\tQueue ready with Url {response.QueueUrl}.");
            Console.WriteLine(new string('-', 80));
            return response.QueueUrl;
        }
    }

    /// <summary>
    /// Connect the queue to the Lambda function as an event source. A mapping is only
    /// created when the function has no event source mappings at all.
    /// </summary>
    /// <param name="queueUrl">The URL for the queue.</param>
    /// <returns>Async task.</returns>
    public static async Task ConnectLambda(string queueUrl)
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"Connecting the Lambda function and queue for the pipeline.");

        var queueAttributes = await _sqsClient.GetQueueAttributesAsync(
            new GetQueueAttributesRequest() { QueueUrl = queueUrl, AttributeNames = new List<string>() { "All" } });
        var queueArn = queueAttributes.QueueARN;

        var eventSource = await _lambdaClient.ListEventSourceMappingsAsync(
            new ListEventSourceMappingsRequest()
            {
                FunctionName = lambdaFunctionName
            });

        if (!eventSource.EventSourceMappings.Any())
        {
            // Only add the event source mapping if it does not already exist.
            await _lambdaClient.CreateEventSourceMappingAsync(
                new CreateEventSourceMappingRequest()
                {
                    EventSourceArn = queueArn,
                    FunctionName = lambdaFunctionName,
                    Enabled = true
                });
        }

        Console.WriteLine(new string('-', 80));
    }

    /// <summary>
    /// Set up the bucket to use for pipeline input and output. A new bucket also gets the
    /// sample input file uploaded; an existing bucket is left unchanged.
    /// </summary>
    /// <param name="bucketName">The name for the bucket.</param>
    /// <returns>Async task.</returns>
    public static async Task SetupBucket(string bucketName)
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"Setting up bucket {bucketName}.");

        var bucketExists = await Amazon.S3.Util.AmazonS3Util.DoesS3BucketExistV2Async(_s3Client,
            bucketName);

        if (!bucketExists)
        {
            await _s3Client.PutBucketAsync(new PutBucketRequest()
            {
                BucketName = bucketName,
                BucketRegion = S3Region.USWest2
            });

            await Task.Delay(5000);

            await _s3Client.PutObjectAsync(new PutObjectRequest()
            {
                BucketName = bucketName,
                Key = "samplefiles/latlongtest.csv",
                FilePath = "latlongtest.csv"
            });
        }

        Console.WriteLine($"\tBucket {bucketName} ready.");
        Console.WriteLine(new string('-', 80));
    }

    /// <summary>
    /// Display some results from the output directory: skips the first line of the
    /// most recently modified output file, then prints up to the next 10 lines.
    /// </summary>
    /// <param name="bucketName">The name for the bucket.</param>
    /// <returns>The key of the displayed output file, or an empty string if none was found.</returns>
    public static async Task<string> GetOutputResults(string bucketName)
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"Getting output results {bucketName}.");
        string outputKey = "";
        await Task.Delay(15000);
        var outputFiles = await _s3Client.ListObjectsAsync(
            new ListObjectsRequest()
            {
                BucketName = bucketName,
                Prefix = "outputfiles/"
            });

        if (outputFiles.S3Objects.Any())
        {
            var sampleOutput = outputFiles.S3Objects.OrderBy(s => s.LastModified).Last();

            Console.WriteLine($"\tOutput file: {sampleOutput.Key}");
            // Dispose the response and reader so the underlying HTTP stream is released.
            using var outputSampleResponse = await _s3Client.GetObjectAsync(
                new GetObjectRequest()
                {
                    BucketName = bucketName,
                    Key = sampleOutput.Key
                });
            outputKey = sampleOutput.Key;
            using var reader = new StreamReader(outputSampleResponse.ResponseStream);
            // Skip the first line (presumably a CSV header — TODO confirm).
            await reader.ReadLineAsync();
            Console.WriteLine("\tOutput file contents: \n");
            for (int i = 0; i < 10; i++)
            {
                if (!reader.EndOfStream)
                {
                    Console.WriteLine("\t" + await reader.ReadLineAsync());
                }
            }
        }

        Console.WriteLine(new string('-', 80));
        return outputKey;
    }

    /// <summary>
    /// Create a pipeline from the example pipeline JSON
    /// that includes the Lambda, callback, processing, and export jobs.
    /// </summary>
    /// <param name="roleArn">The ARN of the role for the pipeline.</param>
    /// <param name="functionArn">The ARN of the Lambda function for the pipeline.</param>
    /// <param name="pipelineName">The name for the pipeline.</param>
    /// <returns>The ARN of the pipeline.</returns>
    public static async Task<string> SetupPipeline(string roleArn, string functionArn,
        string pipelineName)
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"Setting up the pipeline.");

        var pipelineJson = await File.ReadAllTextAsync("GeoSpatialPipeline.json");

        // Add the correct function ARN instead of the placeholder.
        pipelineJson = pipelineJson.Replace("*FUNCTION_ARN*", functionArn);

        var pipelineArn = await _sageMakerWrapper.SetupPipeline(pipelineJson, roleArn, pipelineName,
            "sdk example pipeline", pipelineName);

        Console.WriteLine($"\tPipeline set up with ARN {pipelineArn}.");
        Console.WriteLine(new string('-', 80));

        return pipelineArn;
    }

    /// <summary>
    /// Start a pipeline run with job configurations.
    /// </summary>
    /// <param name="queueUrl">The URL for the queue used in the pipeline.</param>
    /// <param name="roleArn">The ARN of the role.</param>
    /// <param name="pipelineName">The name of the pipeline.</param>
    /// <param name="bucketName">The name of the bucket.</param>
    /// <returns>The pipeline run ARN.</returns>
    public static async Task<string> ExecutePipeline(
        string queueUrl,
        string roleArn,
        string pipelineName,
        string bucketName)
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"Starting pipeline execution.");

        var input = $"s3://{bucketName}/samplefiles/latlongtest.csv";
        var output = $"s3://{bucketName}/outputfiles/";

        var executionARN =
            await _sageMakerWrapper.ExecutePipeline(queueUrl, input, output,
                pipelineName, roleArn);

        Console.WriteLine($"\tRun started with ARN {executionARN}.");
        Console.WriteLine(new string('-', 80));

        return executionARN;
    }

    /// <summary>
    /// Wait for a pipeline run to complete, polling every 30 seconds while the run is executing.
    /// </summary>
    /// <param name="executionArn">The pipeline run ARN.</param>
    /// <returns>Async task.</returns>
    public static async Task WaitForPipelineExecution(string executionArn)
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"Waiting for pipeline to finish.");

        PipelineExecutionStatus status;
        do
        {
            status = await _sageMakerWrapper.CheckPipelineExecutionStatus(executionArn);
            // Non-blocking wait instead of Thread.Sleep inside an async method.
            await Task.Delay(30000);
            Console.WriteLine($"\tStatus is {status}.");
        } while (status == PipelineExecutionStatus.Executing);

        Console.WriteLine($"\tPipeline finished with status {status}.");
        Console.WriteLine(new string('-', 80));
    }

    /// <summary>
    /// Clean up the resources from the scenario.
    /// </summary>
    /// <param name="askUser">True to ask the user for cleanup.</param>
    /// <param name="queueUrl">The URL of the queue to clean up.</param>
    /// <param name="pipelineName">The name of the pipeline.</param>
    /// <param name="bucketName">The name of the bucket.</param>
    /// <returns>Always true once the selected deletions have run.</returns>
    public static async Task<bool> CleanupResources(
        bool askUser,
        string queueUrl,
        string pipelineName,
        string bucketName)
    {
        Console.WriteLine(new string('-', 80));
        Console.WriteLine($"Clean up resources.");

        if (!askUser || GetYesNoResponse($"\tDelete pipeline {pipelineName}? (y/n)"))
        {
            Console.WriteLine($"\tDeleting pipeline.");
            // Delete the pipeline.
            await _sageMakerWrapper.DeletePipelineByName(pipelineName);
        }

        if (!string.IsNullOrEmpty(queueUrl) && (!askUser || GetYesNoResponse($"\tDelete queue {queueUrl}? (y/n)")))
        {
            Console.WriteLine($"\tDeleting queue.");
            // Delete the queue.
            await _sqsClient.DeleteQueueAsync(new DeleteQueueRequest(queueUrl));
        }

        if (!askUser || GetYesNoResponse($"\tDelete Amazon S3 bucket {bucketName}? (y/n)"))
        {
            Console.WriteLine($"\tDeleting bucket.");
            // Delete all objects in the bucket.
            var deleteList = await _s3Client.ListObjectsV2Async(new ListObjectsV2Request()
            {
                BucketName = bucketName
            });
            if (deleteList.KeyCount > 0)
            {
                await _s3Client.DeleteObjectsAsync(new DeleteObjectsRequest()
                {
                    BucketName = bucketName,
                    Objects = deleteList.S3Objects
                        .Select(o => new KeyVersion { Key = o.Key }).ToList()
                });
            }

            // Now delete the bucket.
            await _s3Client.DeleteBucketAsync(new DeleteBucketRequest()
            {
                BucketName = bucketName
            });
        }

        if (!askUser || GetYesNoResponse($"\tDelete lambda {lambdaFunctionName}? (y/n)"))
        {
            Console.WriteLine($"\tDeleting lambda function.");

            await _lambdaClient.DeleteFunctionAsync(new DeleteFunctionRequest()
            {
                FunctionName = lambdaFunctionName
            });
        }

        if (!askUser || GetYesNoResponse($"\tDelete role {lambdaRoleName}? (y/n)"))
        {
            Console.WriteLine($"\tDetaching policies and deleting role.");

            // The policy list is null if setup failed before CreateLambdaRole ran.
            foreach (var policy in lambdaRolePolicies ?? Array.Empty<string>())
            {
                await _iamClient!.DetachRolePolicyAsync(new DetachRolePolicyRequest()
                {
                    RoleName = lambdaRoleName,
                    PolicyArn = policy
                });
            }

            await _iamClient!.DeleteRoleAsync(new DeleteRoleRequest()
            {
                RoleName = lambdaRoleName
            });
        }

        if (!askUser || GetYesNoResponse($"\tDelete role {sageMakerRoleName}? (y/n)"))
        {
            Console.WriteLine($"\tDetaching policies and deleting role.");

            // The policy list is null if setup failed before CreateSageMakerRole ran.
            foreach (var policy in sageMakerRolePolicies ?? Array.Empty<string>())
            {
                await _iamClient!.DetachRolePolicyAsync(new DetachRolePolicyRequest()
                {
                    RoleName = sageMakerRoleName,
                    PolicyArn = policy
                });
            }

            await _iamClient!.DeleteRoleAsync(new DeleteRoleRequest()
            {
                RoleName = sageMakerRoleName
            });
        }

        Console.WriteLine(new string('-', 80));
        return true;
    }

    /// <summary>
    /// Helper method to get a role's ARN if it already exists.
    /// </summary>
    /// <param name="roleName">The name of the AWS Identity and Access Management (IAM) Role to look for.</param>
    /// <returns>The role ARN if it exists, otherwise an empty string.</returns>
    private static async Task<string> GetRoleArnIfExists(string roleName)
    {
        Console.WriteLine($"Checking for role named {roleName}.");

        try
        {
            // Look up the requested role (previously this always queried the Lambda role,
            // so the SageMaker role was never created).
            var existingRole = await _iamClient.GetRoleAsync(new GetRoleRequest()
            {
                RoleName = roleName
            });
            return existingRole.Role.Arn;
        }
        catch (NoSuchEntityException)
        {
            return string.Empty;
        }
    }

    /// <summary>
    /// Helper method to get a yes or no response from the user.
    /// </summary>
    /// <param name="question">The question string to print on the console.</param>
    /// <returns>True if the user responds with a yes.</returns>
    private static bool GetYesNoResponse(string question)
    {
        Console.WriteLine(question);
        var ynResponse = Console.ReadLine();
        var response = ynResponse != null &&
                       ynResponse.Equals("y",
                           StringComparison.InvariantCultureIgnoreCase);
        return response;
    }
}
Java
適用於 Java 2.x 的 SDK
注意

GitHub 上提供更多內容。請在 AWS 程式碼範例儲存庫中尋找完整範例，並了解如何設定和執行。

public class SagemakerWorkflow { public static final String DASHES = new String(new char[80]).replace("\0", "-"); private static String eventSourceMapping = ""; public static void main(String[] args) throws InterruptedException { final String usage = "\n" + "Usage:\n" + " <sageMakerRoleName> <lambdaRoleName> <functionFileLocation> <functionName> <queueName> <bucketName> <lnglatData> <spatialPipelinePath> <pipelineName>\n\n" + "Where:\n" + " sageMakerRoleName - The name of the Amazon SageMaker role.\n\n" + " lambdaRoleName - The name of the AWS Lambda role.\n\n" + " functionFileLocation - The file location where the JAR file that represents the AWS Lambda function is located.\n\n" + " functionName - The name of the AWS Lambda function (for example,SageMakerExampleFunction).\n\n" + " queueName - The name of the Amazon Simple Queue Service (Amazon SQS) queue.\n\n" + " bucketName - The name of the Amazon Simple Storage Service (Amazon S3) bucket.\n\n" + " lnglatData - The file location of the latlongtest.csv file required for this use case.\n\n" + " spatialPipelinePath - The file location of the GeoSpatialPipeline.json file required for this use case.\n\n" + " pipelineName - The name of the pipeline to create (for example, sagemaker-sdk-example-pipeline).\n\n"; if (args.length != 9) { System.out.println(usage); System.exit(1); } String sageMakerRoleName = args[0]; String lambdaRoleName = args[1]; String functionFileLocation = args[2]; String functionName = args[3]; String queueName = args[4]; String bucketName = args[5]; String lnglatData = args[6]; String spatialPipelinePath = args[7]; String pipelineName = args[8]; String handlerName = "org.example.SageMakerLambdaFunction::handleRequest"; Region region = Region.US_WEST_2; SageMakerClient sageMakerClient = SageMakerClient.builder() .region(region) .build(); IamClient iam = IamClient.builder() .region(region) .build(); LambdaClient lambdaClient = LambdaClient.builder() .region(region) .build(); SqsClient sqsClient = 
SqsClient.builder() .region(region) .build(); S3Client s3Client = S3Client.builder() .region(region) .build(); System.out.println(DASHES); System.out.println("Welcome to the Amazon SageMaker pipeline example scenario."); System.out.println( "\nThis example workflow will guide you through setting up and running an" + "\nAmazon SageMaker pipeline. The pipeline uses an AWS Lambda function and an" + "\nAmazon SQS Queue. It runs a vector enrichment reverse geocode job to" + "\nreverse geocode addresses in an input file and store the results in an export file."); System.out.println(DASHES); System.out.println(DASHES); System.out.println("First, we will set up the roles, functions, and queue needed by the SageMaker pipeline."); String lambdaRoleArn = checkLambdaRole(iam, lambdaRoleName); String sageMakerRoleArn = checkSageMakerRole(iam, sageMakerRoleName); String functionArn = checkFunction(lambdaClient, functionName, functionFileLocation, lambdaRoleArn, handlerName); String queueUrl = checkQueue(sqsClient, lambdaClient, queueName, functionName); System.out.println("The queue URL is " + queueUrl); System.out.println(DASHES); System.out.println(DASHES); System.out.println("Setting up bucket " + bucketName); if (!checkBucket(s3Client, bucketName)) { setupBucket(s3Client, bucketName); System.out.println("Put " + lnglatData + " into " + bucketName); putS3Object(s3Client, bucketName, "latlongtest.csv", lnglatData); } System.out.println(DASHES); System.out.println(DASHES); System.out.println("Now we can create and run our pipeline."); setupPipeline(sageMakerClient, spatialPipelinePath, sageMakerRoleArn, functionArn, pipelineName); String pipelineExecutionARN = executePipeline(sageMakerClient, bucketName, queueUrl, sageMakerRoleArn, pipelineName); System.out.println("The pipeline execution ARN value is " + pipelineExecutionARN); waitForPipelineExecution(sageMakerClient, pipelineExecutionARN); System.out.println("Getting output results " + bucketName); getOutputResults(s3Client, 
bucketName); System.out.println(DASHES); System.out.println(DASHES); System.out.println("The pipeline has completed. To view the pipeline and runs " + "in SageMaker Studio, follow these instructions:" + "\nhttps://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-studio.html"); System.out.println(DASHES); System.out.println(DASHES); System.out.println("Do you want to delete the AWS resources used in this Workflow? (y/n)"); Scanner in = new Scanner(System.in); String delResources = in.nextLine(); if (delResources.compareTo("y") == 0) { System.out.println("Lets clean up the AWS resources. Wait 30 seconds"); TimeUnit.SECONDS.sleep(30); deleteEventSourceMapping(lambdaClient); deleteSQSQueue(sqsClient, queueName); listBucketObjects(s3Client, bucketName); deleteBucket(s3Client, bucketName); deleteLambdaFunction(lambdaClient, functionName); deleteLambdaRole(iam, lambdaRoleName); deleteSagemakerRole(iam, sageMakerRoleName); deletePipeline(sageMakerClient, pipelineName); } else { System.out.println("The AWS Resources were not deleted!"); } System.out.println(DASHES); System.out.println(DASHES); System.out.println("SageMaker pipeline scenario is complete."); System.out.println(DASHES); } private static void readObject(S3Client s3Client, String bucketName, String key) { System.out.println("Output file contents: \n"); GetObjectRequest objectRequest = GetObjectRequest.builder() .bucket(bucketName) .key(key) .build(); ResponseBytes<GetObjectResponse> objectBytes = s3Client.getObjectAsBytes(objectRequest); byte[] byteArray = objectBytes.asByteArray(); String text = new String(byteArray, StandardCharsets.UTF_8); System.out.println("Text output: " + text); } // Display some results from the output directory. 
public static void getOutputResults(S3Client s3Client, String bucketName) { System.out.println("Getting output results {bucketName}."); ListObjectsRequest listObjectsRequest = ListObjectsRequest.builder() .bucket(bucketName) .prefix("outputfiles/") .build(); ListObjectsResponse response = s3Client.listObjects(listObjectsRequest); List<S3Object> s3Objects = response.contents(); for (S3Object object : s3Objects) { readObject(s3Client, bucketName, object.key()); } } // Check the status of a pipeline execution. public static void waitForPipelineExecution(SageMakerClient sageMakerClient, String executionArn) throws InterruptedException { String status; int index = 0; do { DescribePipelineExecutionRequest pipelineExecutionRequest = DescribePipelineExecutionRequest.builder() .pipelineExecutionArn(executionArn) .build(); DescribePipelineExecutionResponse response = sageMakerClient .describePipelineExecution(pipelineExecutionRequest); status = response.pipelineExecutionStatusAsString(); System.out.println(index + ". The Status of the pipeline is " + status); TimeUnit.SECONDS.sleep(4); index++; } while ("Executing".equals(status)); System.out.println("Pipeline finished with status " + status); } // Delete a SageMaker pipeline by name. public static void deletePipeline(SageMakerClient sageMakerClient, String pipelineName) { DeletePipelineRequest pipelineRequest = DeletePipelineRequest.builder() .pipelineName(pipelineName) .build(); sageMakerClient.deletePipeline(pipelineRequest); System.out.println("*** Successfully deleted " + pipelineName); } // Create a pipeline from the example pipeline JSON. public static void setupPipeline(SageMakerClient sageMakerClient, String filePath, String roleArn, String functionArn, String pipelineName) { System.out.println("Setting up the pipeline."); JSONParser parser = new JSONParser(); // Read JSON and get pipeline definition. 
try (FileReader reader = new FileReader(filePath)) { Object obj = parser.parse(reader); JSONObject jsonObject = (JSONObject) obj; JSONArray stepsArray = (JSONArray) jsonObject.get("Steps"); for (Object stepObj : stepsArray) { JSONObject step = (JSONObject) stepObj; if (step.containsKey("FunctionArn")) { step.put("FunctionArn", functionArn); } } System.out.println(jsonObject); // Create the pipeline. CreatePipelineRequest pipelineRequest = CreatePipelineRequest.builder() .pipelineDescription("Java SDK example pipeline") .roleArn(roleArn) .pipelineName(pipelineName) .pipelineDefinition(jsonObject.toString()) .build(); sageMakerClient.createPipeline(pipelineRequest); } catch (IamException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } catch (IOException | ParseException e) { throw new RuntimeException(e); } } // Start a pipeline run with job configurations. public static String executePipeline(SageMakerClient sageMakerClient, String bucketName, String queueUrl, String roleArn, String pipelineName) { System.out.println("Starting pipeline execution."); String inputBucketLocation = "s3://" + bucketName + "/samplefiles/latlongtest.csv"; String output = "s3://" + bucketName + "/outputfiles/"; Gson gson = new GsonBuilder() .setFieldNamingPolicy(FieldNamingPolicy.UPPER_CAMEL_CASE) .setPrettyPrinting().create(); // Set up all parameters required to start the pipeline. 
List<Parameter> parameters = new ArrayList<>(); Parameter para1 = Parameter.builder() .name("parameter_execution_role") .value(roleArn) .build(); Parameter para2 = Parameter.builder() .name("parameter_queue_url") .value(queueUrl) .build(); String inputJSON = "{\n" + " \"DataSourceConfig\": {\n" + " \"S3Data\": {\n" + " \"S3Uri\": \"s3://" + bucketName + "/samplefiles/latlongtest.csv\"\n" + " },\n" + " \"Type\": \"S3_DATA\"\n" + " },\n" + " \"DocumentType\": \"CSV\"\n" + "}"; System.out.println(inputJSON); Parameter para3 = Parameter.builder() .name("parameter_vej_input_config") .value(inputJSON) .build(); // Create an ExportVectorEnrichmentJobOutputConfig object. VectorEnrichmentJobS3Data jobS3Data = VectorEnrichmentJobS3Data.builder() .s3Uri(output) .build(); ExportVectorEnrichmentJobOutputConfig outputConfig = ExportVectorEnrichmentJobOutputConfig.builder() .s3Data(jobS3Data) .build(); String gson4 = gson.toJson(outputConfig); Parameter para4 = Parameter.builder() .name("parameter_vej_export_config") .value(gson4) .build(); System.out.println("parameter_vej_export_config:" + gson.toJson(outputConfig)); // Create a VectorEnrichmentJobConfig object. 
ReverseGeocodingConfig reverseGeocodingConfig = ReverseGeocodingConfig.builder() .xAttributeName("Longitude") .yAttributeName("Latitude") .build(); VectorEnrichmentJobConfig jobConfig = VectorEnrichmentJobConfig.builder() .reverseGeocodingConfig(reverseGeocodingConfig) .build(); String para5JSON = "{\"MapMatchingConfig\":null,\"ReverseGeocodingConfig\":{\"XAttributeName\":\"Longitude\",\"YAttributeName\":\"Latitude\"}}"; Parameter para5 = Parameter.builder() .name("parameter_step_1_vej_config") .value(para5JSON) .build(); System.out.println("parameter_step_1_vej_config:" + gson.toJson(jobConfig)); parameters.add(para1); parameters.add(para2); parameters.add(para3); parameters.add(para4); parameters.add(para5); StartPipelineExecutionRequest pipelineExecutionRequest = StartPipelineExecutionRequest.builder() .pipelineExecutionDescription("Created using Java SDK") .pipelineExecutionDisplayName(pipelineName + "-example-execution") .pipelineParameters(parameters) .pipelineName(pipelineName) .build(); StartPipelineExecutionResponse response = sageMakerClient.startPipelineExecution(pipelineExecutionRequest); return response.pipelineExecutionArn(); } public static void deleteEventSourceMapping(LambdaClient lambdaClient) { DeleteEventSourceMappingRequest eventSourceMappingRequest = DeleteEventSourceMappingRequest.builder() .uuid(eventSourceMapping) .build(); lambdaClient.deleteEventSourceMapping(eventSourceMappingRequest); } public static void deleteSagemakerRole(IamClient iam, String roleName) { String[] sageMakerRolePolicies = getSageMakerRolePolicies(); try { for (String policy : sageMakerRolePolicies) { // First the policy needs to be detached. DetachRolePolicyRequest rolePolicyRequest = DetachRolePolicyRequest.builder() .policyArn(policy) .roleName(roleName) .build(); iam.detachRolePolicy(rolePolicyRequest); } // Delete the role. 
DeleteRoleRequest roleRequest = DeleteRoleRequest.builder() .roleName(roleName) .build(); iam.deleteRole(roleRequest); System.out.println("*** Successfully deleted " + roleName); } catch (IamException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void deleteLambdaRole(IamClient iam, String roleName) { String[] lambdaRolePolicies = getLambdaRolePolicies(); try { for (String policy : lambdaRolePolicies) { // First the policy needs to be detached. DetachRolePolicyRequest rolePolicyRequest = DetachRolePolicyRequest.builder() .policyArn(policy) .roleName(roleName) .build(); iam.detachRolePolicy(rolePolicyRequest); } // Delete the role. DeleteRoleRequest roleRequest = DeleteRoleRequest.builder() .roleName(roleName) .build(); iam.deleteRole(roleRequest); System.out.println("*** Successfully deleted " + roleName); } catch (IamException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } // Delete the specific AWS Lambda function. public static void deleteLambdaFunction(LambdaClient awsLambda, String functionName) { try { DeleteFunctionRequest request = DeleteFunctionRequest.builder() .functionName(functionName) .build(); awsLambda.deleteFunction(request); System.out.println("*** " + functionName + " was deleted"); } catch (LambdaException e) { System.err.println(e.getMessage()); System.exit(1); } } // Delete the specific S3 bucket. 
public static void deleteBucket(S3Client s3Client, String bucketName) { DeleteBucketRequest deleteBucketRequest = DeleteBucketRequest.builder() .bucket(bucketName) .build(); s3Client.deleteBucket(deleteBucketRequest); System.out.println("*** " + bucketName + " was deleted."); } public static void listBucketObjects(S3Client s3, String bucketName) { try { ListObjectsRequest listObjects = ListObjectsRequest .builder() .bucket(bucketName) .build(); ListObjectsResponse res = s3.listObjects(listObjects); List<S3Object> objects = res.contents(); for (S3Object myValue : objects) { System.out.print("\n The name of the key is " + myValue.key()); deleteBucketObjects(s3, bucketName, myValue.key()); } } catch (S3Exception e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void deleteBucketObjects(S3Client s3, String bucketName, String objectName) { ArrayList<ObjectIdentifier> toDelete = new ArrayList<>(); toDelete.add(ObjectIdentifier.builder() .key(objectName) .build()); try { DeleteObjectsRequest dor = DeleteObjectsRequest.builder() .bucket(bucketName) .delete(Delete.builder() .objects(toDelete).build()) .build(); s3.deleteObjects(dor); System.out.println("*** " + bucketName + " objects were deleted."); } catch (S3Exception e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } // Delete the specific Amazon SQS queue. 
public static void deleteSQSQueue(SqsClient sqsClient, String queueName) { try { GetQueueUrlRequest getQueueRequest = GetQueueUrlRequest.builder() .queueName(queueName) .build(); String queueUrl = sqsClient.getQueueUrl(getQueueRequest).queueUrl(); DeleteQueueRequest deleteQueueRequest = DeleteQueueRequest.builder() .queueUrl(queueUrl) .build(); sqsClient.deleteQueue(deleteQueueRequest); } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void putS3Object(S3Client s3, String bucketName, String objectKey, String objectPath) { try { Map<String, String> metadata = new HashMap<>(); metadata.put("x-amz-meta-myVal", "test"); PutObjectRequest putOb = PutObjectRequest.builder() .bucket(bucketName) .key("samplefiles/" + objectKey) .metadata(metadata) .build(); s3.putObject(putOb, RequestBody.fromFile(new File(objectPath))); System.out.println("Successfully placed " + objectKey + " into bucket " + bucketName); } catch (S3Exception e) { System.err.println(e.getMessage()); System.exit(1); } } public static void setupBucket(S3Client s3Client, String bucketName) { try { S3Waiter s3Waiter = s3Client.waiter(); CreateBucketRequest bucketRequest = CreateBucketRequest.builder() .bucket(bucketName) .build(); s3Client.createBucket(bucketRequest); HeadBucketRequest bucketRequestWait = HeadBucketRequest.builder() .bucket(bucketName) .build(); // Wait until the bucket is created and print out the response. WaiterResponse<HeadBucketResponse> waiterResponse = s3Waiter.waitUntilBucketExists(bucketRequestWait); waiterResponse.matched().response().ifPresent(System.out::println); System.out.println(bucketName + " is ready"); } catch (S3Exception e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } // Set up the SQS queue to use with the pipeline. 
public static String setupQueue(SqsClient sqsClient, LambdaClient lambdaClient, String queueName, String lambdaName) { System.out.println("Setting up queue named " + queueName); try { Map<QueueAttributeName, String> queueAtt = new HashMap<>(); queueAtt.put(QueueAttributeName.DELAY_SECONDS, "5"); queueAtt.put(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS, "5"); queueAtt.put(QueueAttributeName.VISIBILITY_TIMEOUT, "300"); CreateQueueRequest createQueueRequest = CreateQueueRequest.builder() .queueName(queueName) .attributes(queueAtt) .build(); sqsClient.createQueue(createQueueRequest); System.out.println("\nGet queue url"); GetQueueUrlResponse getQueueUrlResponse = sqsClient .getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()); TimeUnit.SECONDS.sleep(15); connectLambda(sqsClient, lambdaClient, getQueueUrlResponse.queueUrl(), lambdaName); System.out.println("Queue ready with Url " + getQueueUrlResponse.queueUrl()); return getQueueUrlResponse.queueUrl(); } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } catch (InterruptedException e) { throw new RuntimeException(e); } return ""; } // Connect the queue to the Lambda function as an event source. public static void connectLambda(SqsClient sqsClient, LambdaClient lambdaClient, String queueUrl, String lambdaName) { System.out.println("Connecting the Lambda function and queue for the pipeline."); String queueArn = ""; // Specify the attributes to retrieve. 
List<QueueAttributeName> atts = new ArrayList<>(); atts.add(QueueAttributeName.QUEUE_ARN); GetQueueAttributesRequest attributesRequest = GetQueueAttributesRequest.builder() .queueUrl(queueUrl) .attributeNames(atts) .build(); GetQueueAttributesResponse response = sqsClient.getQueueAttributes(attributesRequest); Map<String, String> queueAtts = response.attributesAsStrings(); for (Map.Entry<String, String> queueAtt : queueAtts.entrySet()) { System.out.println("Key = " + queueAtt.getKey() + ", Value = " + queueAtt.getValue()); queueArn = queueAtt.getValue(); } CreateEventSourceMappingRequest eventSourceMappingRequest = CreateEventSourceMappingRequest.builder() .eventSourceArn(queueArn) .functionName(lambdaName) .build(); CreateEventSourceMappingResponse response1 = lambdaClient.createEventSourceMapping(eventSourceMappingRequest); eventSourceMapping = response1.uuid(); System.out.println("The mapping between the event source and Lambda function was successful"); } // Create an AWS Lambda function. public static String createLambdaFunction(LambdaClient awsLambda, String functionName, String filePath, String role, String handler) { try { LambdaWaiter waiter = awsLambda.waiter(); InputStream is = new FileInputStream(filePath); SdkBytes fileToUpload = SdkBytes.fromInputStream(is); FunctionCode code = FunctionCode.builder() .zipFile(fileToUpload) .build(); CreateFunctionRequest functionRequest = CreateFunctionRequest.builder() .functionName(functionName) .description("SageMaker example function.") .code(code) .handler(handler) .runtime(Runtime.JAVA11) .timeout(200) .memorySize(1024) .role(role) .build(); // Create a Lambda function using a waiter. 
CreateFunctionResponse functionResponse = awsLambda.createFunction(functionRequest); GetFunctionRequest getFunctionRequest = GetFunctionRequest.builder() .functionName(functionName) .build(); WaiterResponse<GetFunctionResponse> waiterResponse = waiter.waitUntilFunctionExists(getFunctionRequest); waiterResponse.matched().response().ifPresent(System.out::println); System.out.println("The function ARN is " + functionResponse.functionArn()); return functionResponse.functionArn(); } catch (LambdaException | FileNotFoundException e) { System.err.println(e.getMessage()); System.exit(1); } return ""; } public static String createSageMakerRole(IamClient iam, String roleName) { String[] sageMakerRolePolicies = getSageMakerRolePolicies(); System.out.println("Creating a role to use with SageMaker."); String assumeRolePolicy = "{" + "\"Version\": \"2012-10-17\"," + "\"Statement\": [{" + "\"Effect\": \"Allow\"," + "\"Principal\": {" + "\"Service\": [" + "\"sagemaker.amazonaws.com\"," + "\"sagemaker-geospatial.amazonaws.com\"," + "\"lambda.amazonaws.com\"," + "\"s3.amazonaws.com\"" + "]" + "}," + "\"Action\": \"sts:AssumeRole\"" + "}]" + "}"; try { CreateRoleRequest request = CreateRoleRequest.builder() .roleName(roleName) .assumeRolePolicyDocument(assumeRolePolicy) .description("Created using the AWS SDK for Java") .build(); CreateRoleResponse roleResult = iam.createRole(request); // Attach the policies to the role. for (String policy : sageMakerRolePolicies) { AttachRolePolicyRequest attachRequest = AttachRolePolicyRequest.builder() .roleName(roleName) .policyArn(policy) .build(); iam.attachRolePolicy(attachRequest); } // Allow time for the role to be ready. 
TimeUnit.SECONDS.sleep(15); System.out.println("Role ready with ARN " + roleResult.role().arn()); return roleResult.role().arn(); } catch (IamException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } catch (InterruptedException e) { throw new RuntimeException(e); } return ""; } private static String createLambdaRole(IamClient iam, String roleName) { String[] lambdaRolePolicies = getLambdaRolePolicies(); String assumeRolePolicy = "{" + "\"Version\": \"2012-10-17\"," + "\"Statement\": [{" + "\"Effect\": \"Allow\"," + "\"Principal\": {" + "\"Service\": [" + "\"sagemaker.amazonaws.com\"," + "\"sagemaker-geospatial.amazonaws.com\"," + "\"lambda.amazonaws.com\"," + "\"s3.amazonaws.com\"" + "]" + "}," + "\"Action\": \"sts:AssumeRole\"" + "}]" + "}"; try { CreateRoleRequest request = CreateRoleRequest.builder() .roleName(roleName) .assumeRolePolicyDocument(assumeRolePolicy) .description("Created using the AWS SDK for Java") .build(); CreateRoleResponse roleResult = iam.createRole(request); // Attach the policies to the role. for (String policy : lambdaRolePolicies) { AttachRolePolicyRequest attachRequest = AttachRolePolicyRequest.builder() .roleName(roleName) .policyArn(policy) .build(); iam.attachRolePolicy(attachRequest); } // Allow time for the role to be ready. TimeUnit.SECONDS.sleep(15); System.out.println("Role ready with ARN " + roleResult.role().arn()); return roleResult.role().arn(); } catch (IamException e) { System.err.println(e.awsErrorDetails().errorMessage()); } catch (InterruptedException e) { throw new RuntimeException(e); } return ""; } public static String checkFunction(LambdaClient lambdaClient, String functionName, String filePath, String role, String handler) { System.out.println("Create an AWS Lambda function used in this workflow."); String functionArn; try { // Does this function already exist. 
GetFunctionRequest functionRequest = GetFunctionRequest.builder() .functionName(functionName) .build(); GetFunctionResponse response = lambdaClient.getFunction(functionRequest); functionArn = response.configuration().functionArn(); } catch (LambdaException e) { System.err.println(e.awsErrorDetails().errorMessage()); functionArn = createLambdaFunction(lambdaClient, functionName, filePath, role, handler); } return functionArn; } // Check to see if the specific S3 bucket exists. If the S3 bucket exists, this // method returns true. public static boolean checkBucket(S3Client s3, String bucketName) { try { HeadBucketRequest headBucketRequest = HeadBucketRequest.builder() .bucket(bucketName) .build(); s3.headBucket(headBucketRequest); System.out.println(bucketName + " exists"); return true; } catch (S3Exception e) { System.err.println(e.awsErrorDetails().errorMessage()); } return false; } // Checks to see if the Amazon SQS queue exists. If not, this method creates a // new queue // and returns the ARN value. public static String checkQueue(SqsClient sqsClient, LambdaClient lambdaClient, String queueName, String lambdaName) { System.out.println("Creating a queue for this use case."); String queueUrl; try { GetQueueUrlRequest request = GetQueueUrlRequest.builder() .queueName(queueName) .build(); GetQueueUrlResponse response = sqsClient.getQueueUrl(request); queueUrl = response.queueUrl(); System.out.println(queueUrl); } catch (SqsException e) { System.err.println(e.awsErrorDetails().errorMessage()); queueUrl = setupQueue(sqsClient, lambdaClient, queueName, lambdaName); } return queueUrl; } // Checks to see if the Lambda role exists. If not, this method creates it. 
public static String checkLambdaRole(IamClient iam, String roleName) { System.out.println("Creating a role to for AWS Lambda to use."); String roleArn; try { GetRoleRequest roleRequest = GetRoleRequest.builder() .roleName(roleName) .build(); GetRoleResponse response = iam.getRole(roleRequest); roleArn = response.role().arn(); System.out.println(roleArn); } catch (IamException e) { System.err.println(e.awsErrorDetails().errorMessage()); roleArn = createLambdaRole(iam, roleName); } return roleArn; } // Checks to see if the SageMaker role exists. If not, this method creates it. public static String checkSageMakerRole(IamClient iam, String roleName) { System.out.println("Creating a role to for AWS SageMaker to use."); String roleArn; try { GetRoleRequest roleRequest = GetRoleRequest.builder() .roleName(roleName) .build(); GetRoleResponse response = iam.getRole(roleRequest); roleArn = response.role().arn(); System.out.println(roleArn); } catch (IamException e) { System.err.println(e.awsErrorDetails().errorMessage()); roleArn = createSageMakerRole(iam, roleName); } return roleArn; } private static String[] getSageMakerRolePolicies() { String[] sageMakerRolePolicies = new String[3]; sageMakerRolePolicies[0] = "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess"; sageMakerRolePolicies[1] = "arn:aws:iam::aws:policy/" + "AmazonSageMakerGeospatialFullAccess"; sageMakerRolePolicies[2] = "arn:aws:iam::aws:policy/AmazonSQSFullAccess"; return sageMakerRolePolicies; } private static String[] getLambdaRolePolicies() { String[] lambdaRolePolicies = new String[5]; lambdaRolePolicies[0] = "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess"; lambdaRolePolicies[1] = "arn:aws:iam::aws:policy/AmazonSQSFullAccess"; lambdaRolePolicies[2] = "arn:aws:iam::aws:policy/service-role/" + "AmazonSageMakerGeospatialFullAccess"; lambdaRolePolicies[3] = "arn:aws:iam::aws:policy/service-role/" + "AmazonSageMakerServiceCatalogProductsLambdaServiceRolePolicy"; lambdaRolePolicies[4] = 
"arn:aws:iam::aws:policy/service-role/" + "AWSLambdaSQSQueueExecutionRole"; return lambdaRolePolicies; } }
JavaScript
適用於 JavaScript (v3) 的 SDK
注意

GitHub 上提供更多內容。尋找完整範例，並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

下列檔案摘錄包含使用 SageMaker 用戶端管理管道的函數。

import { readFileSync } from "node:fs"; import { CreateRoleCommand, DeleteRoleCommand, CreatePolicyCommand, DeletePolicyCommand, AttachRolePolicyCommand, DetachRolePolicyCommand, GetRoleCommand, ListPoliciesCommand, } from "@aws-sdk/client-iam"; import { PublishLayerVersionCommand, DeleteLayerVersionCommand, CreateFunctionCommand, Runtime, DeleteFunctionCommand, CreateEventSourceMappingCommand, DeleteEventSourceMappingCommand, GetFunctionCommand, } from "@aws-sdk/client-lambda"; import { PutObjectCommand, CreateBucketCommand, DeleteBucketCommand, DeleteObjectCommand, GetObjectCommand, ListObjectsV2Command, } from "@aws-sdk/client-s3"; import { CreatePipelineCommand, DeletePipelineCommand, DescribePipelineCommand, DescribePipelineExecutionCommand, PipelineExecutionStatus, StartPipelineExecutionCommand, } from "@aws-sdk/client-sagemaker"; import { VectorEnrichmentJobDocumentType } from "@aws-sdk/client-sagemaker-geospatial"; import { CreateQueueCommand, DeleteQueueCommand, GetQueueAttributesCommand, GetQueueUrlCommand, } from "@aws-sdk/client-sqs"; import { dirnameFromMetaUrl } from "@aws-doc-sdk-examples/lib/utils/util-fs.js"; import { retry } from "@aws-doc-sdk-examples/lib/utils/util-timers.js"; /** * Create the AWS IAM role that will be assumed by AWS Lambda. 
* @param {{ name: string, iamClient: import('@aws-sdk/client-iam').IAMClient }} props */ export async function createLambdaExecutionRole({ name, iamClient }) { const createRole = () => iamClient.send( new CreateRoleCommand({ RoleName: name, AssumeRolePolicyDocument: JSON.stringify({ Version: "2012-10-17", Statement: [ { Effect: "Allow", Action: ["sts:AssumeRole"], Principal: { Service: ["lambda.amazonaws.com"] }, }, ], }), }), ); let role = null; try { const { Role } = await createRole(); role = Role; } catch (caught) { if ( caught instanceof Error && caught.name === "EntityAlreadyExistsException" ) { const { Role } = await iamClient.send( new GetRoleCommand({ RoleName: name }), ); role = Role; } else { throw caught; } } return { arn: role.Arn, cleanUp: async () => { await iamClient.send(new DeleteRoleCommand({ RoleName: name })); }, }; } /** * Create an AWS IAM policy that will be attached to the AWS IAM role assumed by the AWS Lambda function. * The policy grants permission to work with Amazon SQS, Amazon CloudWatch, and Amazon SageMaker. * @param {{name: string, iamClient: import('@aws-sdk/client-iam').IAMClient, pipelineExecutionRoleArn: string}} props */ export async function createLambdaExecutionPolicy({ name, iamClient, pipelineExecutionRoleArn, }) { const policyConfig = { Version: "2012-10-17", Statement: [ { Effect: "Allow", Action: [ "sqs:ReceiveMessage", "sqs:DeleteMessage", "sqs:GetQueueAttributes", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "sagemaker-geospatial:StartVectorEnrichmentJob", "sagemaker-geospatial:GetVectorEnrichmentJob", "sagemaker:SendPipelineExecutionStepFailure", "sagemaker:SendPipelineExecutionStepSuccess", "sagemaker-geospatial:ExportVectorEnrichmentJob", ], Resource: "*", }, { Effect: "Allow", // The AWS Lambda function needs permission to pass the pipeline execution role to // the StartVectorEnrichmentCommand. This restriction prevents an AWS Lambda function // from elevating privileges. 
For more information, see: // https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_use_passrole.html Action: ["iam:PassRole"], Resource: `${pipelineExecutionRoleArn}`, Condition: { StringEquals: { "iam:PassedToService": [ "sagemaker.amazonaws.com", "sagemaker-geospatial.amazonaws.com", ], }, }, }, ], }; const createPolicy = () => iamClient.send( new CreatePolicyCommand({ PolicyDocument: JSON.stringify(policyConfig), PolicyName: name, }), ); let policy = null; try { const { Policy } = await createPolicy(); policy = Policy; } catch (caught) { if ( caught instanceof Error && caught.name === "EntityAlreadyExistsException" ) { const { Policies } = await iamClient.send(new ListPoliciesCommand({})); if (Policies) { policy = Policies.find((p) => p.PolicyName === name); } else { throw new Error("No policies found."); } } else { throw caught; } } return { arn: policy?.Arn, policyConfig, cleanUp: async () => { await iamClient.send(new DeletePolicyCommand({ PolicyArn: policy?.Arn })); }, }; } /** * Attach an AWS IAM policy to an AWS IAM role. * @param {{roleName: string, policyArn: string, iamClient: import('@aws-sdk/client-iam').IAMClient}} props */ export async function attachPolicy({ roleName, policyArn, iamClient }) { const attachPolicyCommand = new AttachRolePolicyCommand({ RoleName: roleName, PolicyArn: policyArn, }); await iamClient.send(attachPolicyCommand); return { cleanUp: async () => { await iamClient.send( new DetachRolePolicyCommand({ RoleName: roleName, PolicyArn: policyArn, }), ); }, }; } /** * Create an AWS Lambda layer that contains the Amazon SageMaker and Amazon SageMaker Geospatial clients * in the runtime. The default runtime supports v3.188.0 of the JavaScript SDK. The Amazon SageMaker * Geospatial client wasn't introduced until v3.221.0. 
* @param {{ name: string, lambdaClient: import('@aws-sdk/client-lambda').LambdaClient }} props */ export async function createLambdaLayer({ name, lambdaClient }) { const layerPath = `${dirnameFromMetaUrl(import.meta.url)}lambda/nodejs.zip`; const { LayerVersionArn, Version } = await lambdaClient.send( new PublishLayerVersionCommand({ LayerName: name, Content: { ZipFile: Uint8Array.from(readFileSync(layerPath)), }, }), ); return { versionArn: LayerVersionArn, version: Version, cleanUp: async () => { await lambdaClient.send( new DeleteLayerVersionCommand({ LayerName: name, VersionNumber: Version, }), ); }, }; } /** * Deploy the AWS Lambda function that will be used to respond to Amazon SageMaker pipeline * execution steps. * @param {{roleArn: string, name: string, lambdaClient: import('@aws-sdk/client-lambda').LambdaClient, layerVersionArn: string}} props */ export async function createLambdaFunction({ name, roleArn, lambdaClient, layerVersionArn, }) { const lambdaPath = `${dirnameFromMetaUrl( import.meta.url, )}lambda/dist/index.mjs.zip`; // If a function of the same name already exists, return that // function's ARN instead. By default this is // "sagemaker-wkflw-lambda-function", so collisions are // unlikely. const createFunction = async () => { try { return await lambdaClient.send( new CreateFunctionCommand({ Code: { ZipFile: Uint8Array.from(readFileSync(lambdaPath)), }, Runtime: Runtime.nodejs18x, Handler: "index.handler", Layers: [layerVersionArn], FunctionName: name, Role: roleArn, }), ); } catch (caught) { if ( caught instanceof Error && caught.name === "ResourceConflictException" ) { const { Configuration } = await lambdaClient.send( new GetFunctionCommand({ FunctionName: name }), ); return Configuration; } throw caught; } }; // Function creation fails if the Role is not ready. This retries // function creation until it succeeds or it times out. 
const { FunctionArn } = await retry( { intervalInMs: 1000, maxRetries: 60 }, createFunction, ); return { arn: FunctionArn, cleanUp: async () => { await lambdaClient.send( new DeleteFunctionCommand({ FunctionName: name }), ); }, }; } /** * This uploads some sample coordinate data to an Amazon S3 bucket. * The Amazon SageMaker Geospatial vector enrichment job will take the simple Lat/Long * coordinates in this file and augment them with more detailed location data. * @param {{bucketName: string, s3Client: import('@aws-sdk/client-s3').S3Client}} props */ export async function uploadCSVDataToS3({ bucketName, s3Client }) { const s3Path = `${dirnameFromMetaUrl( import.meta.url, )}../../../../../workflows/sagemaker_pipelines/resources/latlongtest.csv`; await s3Client.send( new PutObjectCommand({ Bucket: bucketName, Key: "input/sample_data.csv", Body: readFileSync(s3Path), }), ); } /** * Create the AWS IAM role that will be assumed by the Amazon SageMaker pipeline. * @param {{name: string, iamClient: import('@aws-sdk/client-iam').IAMClient, wait: (ms: number) => Promise<void>}} props */ export async function createSagemakerRole({ name, iamClient, wait }) { let role = null; const createRole = () => iamClient.send( new CreateRoleCommand({ RoleName: name, AssumeRolePolicyDocument: JSON.stringify({ Version: "2012-10-17", Statement: [ { Effect: "Allow", Action: ["sts:AssumeRole"], Principal: { Service: [ "sagemaker.amazonaws.com", "sagemaker-geospatial.amazonaws.com", ], }, }, ], }), }), ); try { const { Role } = await createRole(); role = Role; // Wait for the role to be ready. 
await wait(10); } catch (caught) { if ( caught instanceof Error && caught.name === "EntityAlreadyExistsException" ) { const { Role } = await iamClient.send( new GetRoleCommand({ RoleName: name }), ); role = Role; } else { throw caught; } } return { arn: role.Arn, cleanUp: async () => { await iamClient.send(new DeleteRoleCommand({ RoleName: name })); }, }; } /** * Create the Amazon SageMaker execution policy. This policy grants permission to * invoke the AWS Lambda function, read/write to the Amazon S3 bucket, and send messages to * the Amazon SQS queue. * @param {{ name: string, sqsQueueArn: string, lambdaArn: string, iamClient: import('@aws-sdk/client-iam').IAMClient, s3BucketName: string}} props */ export async function createSagemakerExecutionPolicy({ sqsQueueArn, lambdaArn, iamClient, name, s3BucketName, }) { const policyConfig = { Version: "2012-10-17", Statement: [ { Effect: "Allow", Action: ["lambda:InvokeFunction"], Resource: lambdaArn, }, { Effect: "Allow", Action: ["s3:*"], Resource: [ `arn:aws:s3:::${s3BucketName}`, `arn:aws:s3:::${s3BucketName}/*`, ], }, { Effect: "Allow", Action: ["sqs:SendMessage"], Resource: sqsQueueArn, }, ], }; const createPolicy = () => iamClient.send( new CreatePolicyCommand({ PolicyDocument: JSON.stringify(policyConfig), PolicyName: name, }), ); let policy = null; try { const { Policy } = await createPolicy(); policy = Policy; } catch (caught) { if ( caught instanceof Error && caught.name === "EntityAlreadyExistsException" ) { const { Policies } = await iamClient.send(new ListPoliciesCommand({})); if (Policies) { policy = Policies.find((p) => p.PolicyName === name); } else { throw new Error("No policies found."); } } else { throw caught; } } return { arn: policy?.Arn, policyConfig, cleanUp: async () => { await iamClient.send(new DeletePolicyCommand({ PolicyArn: policy?.Arn })); }, }; } /** * Create the Amazon SageMaker pipeline using a JSON pipeline definition. 
The definition * can also be provided as an Amazon S3 object using PipelineDefinitionS3Location. * @param {{roleArn: string, name: string, sagemakerClient: import('@aws-sdk/client-sagemaker').SageMakerClient}} props */ export async function createSagemakerPipeline({ // Assumes an AWS IAM role has been created for this pipeline. roleArn, name, // Assumes an AWS Lambda function has been created for this pipeline. functionArn, sagemakerClient, }) { const pipelineDefinition = readFileSync( // dirnameFromMetaUrl is a local utility function. You can find its implementation // on GitHub. `${dirnameFromMetaUrl( import.meta.url, )}../../../../../workflows/sagemaker_pipelines/resources/GeoSpatialPipeline.json`, ) .toString() .replace(/\*FUNCTION_ARN\*/g, functionArn); let arn = null; const createPipeline = () => sagemakerClient.send( new CreatePipelineCommand({ PipelineName: name, PipelineDefinition: pipelineDefinition, RoleArn: roleArn, }), ); try { const { PipelineArn } = await createPipeline(); arn = PipelineArn; } catch (caught) { if ( caught instanceof Error && caught.name === "ValidationException" && caught.message.includes( "Pipeline names must be unique within an AWS account and region", ) ) { const { PipelineArn } = await sagemakerClient.send( new DescribePipelineCommand({ PipelineName: name }), ); arn = PipelineArn; } else { throw caught; } } return { arn, cleanUp: async () => { await sagemakerClient.send( new DeletePipelineCommand({ PipelineName: name }), ); }, }; } /** * Create an Amazon SQS queue. The Amazon SageMaker pipeline will send messages * to this queue that are then processed by the AWS Lambda function. 
* @param {{name: string, sqsClient: import('@aws-sdk/client-sqs').SQSClient}} props */ export async function createSQSQueue({ name, sqsClient }) { const createSqsQueue = () => sqsClient.send( new CreateQueueCommand({ QueueName: name, Attributes: { DelaySeconds: "5", ReceiveMessageWaitTimeSeconds: "5", VisibilityTimeout: "300", }, }), ); let queueUrl = null; try { const { QueueUrl } = await createSqsQueue(); queueUrl = QueueUrl; } catch (caught) { if (caught instanceof Error && caught.name === "QueueNameExists") { const { QueueUrl } = await sqsClient.send( new GetQueueUrlCommand({ QueueName: name }), ); queueUrl = QueueUrl; } else { throw caught; } } const { Attributes } = await retry( { intervalInMs: 1000, maxRetries: 60 }, () => sqsClient.send( new GetQueueAttributesCommand({ QueueUrl: queueUrl, AttributeNames: ["QueueArn"], }), ), ); return { queueUrl, queueArn: Attributes.QueueArn, cleanUp: async () => { await sqsClient.send(new DeleteQueueCommand({ QueueUrl: queueUrl })); }, }; } /** * Configure the AWS Lambda function to long poll for messages from the Amazon SQS * queue. 
* @param {{ * paginateListEventSourceMappings: () => Generator<import('@aws-sdk/client-lambda').ListEventSourceMappingsCommandOutput>, * lambdaName: string, * queueArn: string, * lambdaClient: import('@aws-sdk/client-lambda').LambdaClient}} props */ export async function configureLambdaSQSEventSource({ lambdaName, queueArn, lambdaClient, paginateListEventSourceMappings, }) { let uuid = null; const createEvenSourceMapping = () => lambdaClient.send( new CreateEventSourceMappingCommand({ EventSourceArn: queueArn, FunctionName: lambdaName, }), ); try { const { UUID } = await createEvenSourceMapping(); uuid = UUID; } catch (caught) { if ( caught instanceof Error && caught.name === "ResourceConflictException" ) { const paginator = paginateListEventSourceMappings( { client: lambdaClient }, {}, ); /** * @type {import('@aws-sdk/client-lambda').EventSourceMappingConfiguration[]} */ const eventSourceMappings = []; for await (const page of paginator) { eventSourceMappings.concat(page.EventSourceMappings || []); } const { Configuration } = await lambdaClient.send( new GetFunctionCommand({ FunctionName: lambdaName }), ); uuid = eventSourceMappings.find( (mapping) => mapping.EventSourceArn === queueArn && mapping.FunctionArn === Configuration.FunctionArn, ).UUID; } else { throw caught; } } return { cleanUp: async () => { await lambdaClient.send( new DeleteEventSourceMappingCommand({ UUID: uuid, }), ); }, }; } /** * Create an Amazon S3 bucket that will store the simple coordinate file as input * and the output of the Amazon SageMaker Geospatial vector enrichment job. 
* @param {{ * s3Client: import('@aws-sdk/client-s3').S3Client, * name: string, * paginateListObjectsV2: () => Generator<import('@aws-sdk/client-s3').ListObjectsCommandOutput> * }} props */ export async function createS3Bucket({ name, s3Client, paginateListObjectsV2, }) { await s3Client.send(new CreateBucketCommand({ Bucket: name })); return { cleanUp: async () => { const paginator = paginateListObjectsV2( { client: s3Client }, { Bucket: name }, ); for await (const page of paginator) { const objects = page.Contents; if (objects) { for (const object of objects) { await s3Client.send( new DeleteObjectCommand({ Bucket: name, Key: object.Key }), ); } } } await s3Client.send(new DeleteBucketCommand({ Bucket: name })); }, }; } /** * Start the execution of the Amazon SageMaker pipeline. Parameters that are * passed in are used in the AWS Lambda function. * @param {{ * name: string, * sagemakerClient: import('@aws-sdk/client-sagemaker').SageMakerClient, * roleArn: string, * queueUrl: string, * s3InputBucketName: string, * }} props */ export async function startPipelineExecution({ sagemakerClient, name, bucketName, roleArn, queueUrl, }) { /** * The Vector Enrichment Job requests CSV data. This configuration points to a CSV * file in an Amazon S3 bucket. * @type {import("@aws-sdk/client-sagemaker-geospatial").VectorEnrichmentJobInputConfig} */ const inputConfig = { DataSourceConfig: { S3Data: { S3Uri: `s3://${bucketName}/input/sample_data.csv`, }, }, DocumentType: VectorEnrichmentJobDocumentType.CSV, }; /** * The Vector Enrichment Job adds additional data to the source CSV. This configuration points * to an Amazon S3 prefix where the output will be stored. * @type {import("@aws-sdk/client-sagemaker-geospatial").ExportVectorEnrichmentJobOutputConfig} */ const outputConfig = { S3Data: { S3Uri: `s3://${bucketName}/output/`, }, }; /** * This job will be a Reverse Geocoding Vector Enrichment Job. Reverse Geocoding requires * latitude and longitude values. 
* @type {import("@aws-sdk/client-sagemaker-geospatial").VectorEnrichmentJobConfig} */ const jobConfig = { ReverseGeocodingConfig: { XAttributeName: "Longitude", YAttributeName: "Latitude", }, }; const { PipelineExecutionArn } = await sagemakerClient.send( new StartPipelineExecutionCommand({ PipelineName: name, PipelineExecutionDisplayName: `${name}-example-execution`, PipelineParameters: [ { Name: "parameter_execution_role", Value: roleArn }, { Name: "parameter_queue_url", Value: queueUrl }, { Name: "parameter_vej_input_config", Value: JSON.stringify(inputConfig), }, { Name: "parameter_vej_export_config", Value: JSON.stringify(outputConfig), }, { Name: "parameter_step_1_vej_config", Value: JSON.stringify(jobConfig), }, ], }), ); return { arn: PipelineExecutionArn, }; } /** * Poll the executing pipeline until the status is 'SUCCEEDED', 'STOPPED', or 'FAILED'. * @param {{ arn: string, sagemakerClient: import('@aws-sdk/client-sagemaker').SageMakerClient, wait: (ms: number) => Promise<void>}} props */ export async function waitForPipelineComplete({ arn, sagemakerClient, wait }) { const command = new DescribePipelineExecutionCommand({ PipelineExecutionArn: arn, }); let complete = false; const intervalInSeconds = 15; const COMPLETION_STATUSES = [ PipelineExecutionStatus.FAILED, PipelineExecutionStatus.STOPPED, PipelineExecutionStatus.SUCCEEDED, ]; do { const { PipelineExecutionStatus: status, FailureReason } = await sagemakerClient.send(command); complete = COMPLETION_STATUSES.includes(status); if (!complete) { console.log( `Pipeline is ${status}. 
Waiting ${intervalInSeconds} seconds before checking again.`, ); await wait(intervalInSeconds); } else if (status === PipelineExecutionStatus.FAILED) { throw new Error(`Pipeline failed because: ${FailureReason}`); } else if (status === PipelineExecutionStatus.STOPPED) { throw new Error("Pipeline was forcefully stopped."); } else { console.log(`Pipeline execution ${status}.`); } } while (!complete); } /** * Return the string value of an Amazon S3 object. * @param {{ bucket: string, key: string, s3Client: import('@aws-sdk/client-s3').S3Client}} param0 */ export async function getObject({ bucket, s3Client }) { const prefix = "output/"; const { Contents } = await s3Client.send( new ListObjectsV2Command({ MaxKeys: 1, Bucket: bucket, Prefix: prefix }), ); if (!Contents.length) { throw new Error("No objects found in bucket."); } // Find the CSV file. const outputObject = Contents.find((obj) => obj.Key.endsWith(".csv")); if (!outputObject) { throw new Error(`No CSV file found in bucket with the prefix "${prefix}".`); } const { Body } = await s3Client.send( new GetObjectCommand({ Bucket: bucket, Key: outputObject.Key, }), ); return Body.transformToString(); }

下列程式碼是來自檔案的摘錄,該檔案使用上述程式庫函數來設定 SageMaker 管道、執行管道,以及刪除所有建立的資源。

import { retry, wait } from "@aws-doc-sdk-examples/lib/utils/util-timers.js";
// The paginators are required by configureLambdaSQSEventSource (conflict path)
// and by the S3 bucket clean up; without them those paths throw a TypeError.
import { paginateListEventSourceMappings } from "@aws-sdk/client-lambda";
import { paginateListObjectsV2 } from "@aws-sdk/client-s3";

import {
  attachPolicy,
  configureLambdaSQSEventSource,
  createLambdaExecutionPolicy,
  createLambdaExecutionRole,
  createLambdaFunction,
  createLambdaLayer,
  createS3Bucket,
  createSQSQueue,
  createSagemakerExecutionPolicy,
  createSagemakerPipeline,
  createSagemakerRole,
  getObject,
  startPipelineExecution,
  uploadCSVDataToS3,
  waitForPipelineComplete,
} from "./lib.js";
import { MESSAGES } from "./messages.js";

/**
 * Interactive workflow that creates the resources for an Amazon SageMaker
 * geospatial pipeline, runs the pipeline, prints part of its output, and
 * optionally removes everything it created.
 */
export class SageMakerPipelinesWkflw {
  names = {
    LAMBDA_EXECUTION_ROLE: "sagemaker-wkflw-lambda-execution-role",
    LAMBDA_EXECUTION_ROLE_POLICY:
      "sagemaker-wkflw-lambda-execution-role-policy",
    LAMBDA_FUNCTION: "sagemaker-wkflw-lambda-function",
    LAMBDA_LAYER: "sagemaker-wkflw-lambda-layer",
    SAGE_MAKER_EXECUTION_ROLE: "sagemaker-wkflw-pipeline-execution-role",
    SAGE_MAKER_EXECUTION_ROLE_POLICY:
      "sagemaker-wkflw-pipeline-execution-role-policy",
    SAGE_MAKER_PIPELINE: "sagemaker-wkflw-pipeline",
    SQS_QUEUE: "sagemaker-wkflw-sqs-queue",
    // Bucket names are global, so a timestamp keeps each run's bucket unique.
    S3_BUCKET: `sagemaker-wkflw-s3-bucket-${Date.now()}`,
  };

  // Stack of clean up callbacks, one per created resource, run in reverse order.
  cleanUpFunctions = [];

  /**
   * @param {import("@aws-doc-sdk-examples/lib/prompter.js").Prompter} prompter
   * @param {import("@aws-doc-sdk-examples/lib/logger.js").Logger} logger
   * @param {{ IAM: import("@aws-sdk/client-iam").IAMClient, Lambda: import("@aws-sdk/client-lambda").LambdaClient, SageMaker: import("@aws-sdk/client-sagemaker").SageMakerClient, S3: import("@aws-sdk/client-s3").S3Client, SQS: import("@aws-sdk/client-sqs").SQSClient }} clients
   */
  constructor(prompter, logger, clients) {
    this.prompter = prompter;
    this.logger = logger;
    this.clients = clients;
  }

  /**
   * Run the workflow. Whether it succeeds or fails, the user is asked whether
   * to clean up the resources that were created. Errors are rethrown.
   */
  async run() {
    try {
      await this.startWorkflow();
    } catch (err) {
      console.error(err);
      throw err;
    } finally {
      this.logger.logSeparator();
      const doCleanUp = await this.prompter.confirm({
        message: "Clean up resources?",
      });
      if (doCleanUp) {
        await this.cleanUp();
      }
    }
  }

  /**
   * Run every registered clean up function, newest first. Each one is retried;
   * with swallowError: true, a step that keeps failing does not stop the
   * remaining steps from running.
   */
  async cleanUp() {
    for (let i = this.cleanUpFunctions.length - 1; i >= 0; i--) {
      await retry(
        { intervalInMs: 1000, maxRetries: 60, swallowError: true },
        this.cleanUpFunctions[i],
      );
    }
  }

  async startWorkflow() {
    this.logger.logSeparator(MESSAGES.greetingHeader);
    await this.logger.log(MESSAGES.greeting);

    this.logger.logSeparator();
    await this.logger.log(
      MESSAGES.creatingRole.replace(
        "${ROLE_NAME}",
        this.names.LAMBDA_EXECUTION_ROLE,
      ),
    );

    // Create an IAM role that will be assumed by the AWS Lambda function. This function
    // is triggered by Amazon SQS messages and calls SageMaker and SageMaker GeoSpatial actions.
    const { arn: lambdaExecutionRoleArn, cleanUp: lambdaExecutionRoleCleanUp } =
      await createLambdaExecutionRole({
        name: this.names.LAMBDA_EXECUTION_ROLE,
        iamClient: this.clients.IAM,
      });
    // Add a clean up step to a stack for every resource created.
    this.cleanUpFunctions.push(lambdaExecutionRoleCleanUp);

    await this.logger.log(
      MESSAGES.roleCreated.replace(
        "${ROLE_NAME}",
        this.names.LAMBDA_EXECUTION_ROLE,
      ),
    );

    this.logger.logSeparator();

    await this.logger.log(
      MESSAGES.creatingRole.replace(
        "${ROLE_NAME}",
        this.names.SAGE_MAKER_EXECUTION_ROLE,
      ),
    );

    // Create an IAM role that will be assumed by the SageMaker pipeline. The pipeline
    // sends messages to an Amazon SQS queue and puts/retrieves Amazon S3 objects.
    const {
      arn: pipelineExecutionRoleArn,
      cleanUp: pipelineExecutionRoleCleanUp,
    } = await createSagemakerRole({
      iamClient: this.clients.IAM,
      name: this.names.SAGE_MAKER_EXECUTION_ROLE,
      wait,
    });
    this.cleanUpFunctions.push(pipelineExecutionRoleCleanUp);

    await this.logger.log(
      MESSAGES.roleCreated.replace(
        "${ROLE_NAME}",
        this.names.SAGE_MAKER_EXECUTION_ROLE,
      ),
    );

    this.logger.logSeparator();

    // Create an IAM policy that allows the AWS Lambda function to invoke SageMaker APIs.
    // The policy helpers return the policy document as `policyConfig`.
    const {
      arn: lambdaExecutionPolicyArn,
      policyConfig: lambdaPolicy,
      cleanUp: lambdaExecutionPolicyCleanUp,
    } = await createLambdaExecutionPolicy({
      name: this.names.LAMBDA_EXECUTION_ROLE_POLICY,
      s3BucketName: this.names.S3_BUCKET,
      iamClient: this.clients.IAM,
      pipelineExecutionRoleArn,
    });
    this.cleanUpFunctions.push(lambdaExecutionPolicyCleanUp);

    console.log(JSON.stringify(lambdaPolicy, null, 2), "\n");

    await this.logger.log(
      MESSAGES.attachPolicy
        .replace("${POLICY_NAME}", this.names.LAMBDA_EXECUTION_ROLE_POLICY)
        .replace("${ROLE_NAME}", this.names.LAMBDA_EXECUTION_ROLE),
    );

    await this.prompter.checkContinue();

    // Attach the Lambda execution policy to the execution role.
    const { cleanUp: lambdaExecutionRolePolicyCleanUp } = await attachPolicy({
      roleName: this.names.LAMBDA_EXECUTION_ROLE,
      policyArn: lambdaExecutionPolicyArn,
      iamClient: this.clients.IAM,
    });
    this.cleanUpFunctions.push(lambdaExecutionRolePolicyCleanUp);

    await this.logger.log(MESSAGES.policyAttached);

    this.logger.logSeparator();

    // Create Lambda layer for SageMaker packages.
    const { versionArn: layerVersionArn, cleanUp: lambdaLayerCleanUp } =
      await createLambdaLayer({
        name: this.names.LAMBDA_LAYER,
        lambdaClient: this.clients.Lambda,
      });
    this.cleanUpFunctions.push(lambdaLayerCleanUp);

    await this.logger.log(
      MESSAGES.creatingFunction.replace(
        "${FUNCTION_NAME}",
        this.names.LAMBDA_FUNCTION,
      ),
    );

    // Create the Lambda function with the execution role.
    const { arn: lambdaArn, cleanUp: lambdaCleanUp } =
      await createLambdaFunction({
        roleArn: lambdaExecutionRoleArn,
        lambdaClient: this.clients.Lambda,
        name: this.names.LAMBDA_FUNCTION,
        layerVersionArn,
      });
    this.cleanUpFunctions.push(lambdaCleanUp);

    await this.logger.log(
      MESSAGES.functionCreated.replace(
        "${FUNCTION_NAME}",
        this.names.LAMBDA_FUNCTION,
      ),
    );

    this.logger.logSeparator();

    await this.logger.log(
      MESSAGES.creatingSQSQueue.replace("${QUEUE_NAME}", this.names.SQS_QUEUE),
    );

    // Create an SQS queue for the SageMaker pipeline.
    const {
      queueUrl,
      queueArn,
      cleanUp: queueCleanUp,
    } = await createSQSQueue({
      name: this.names.SQS_QUEUE,
      sqsClient: this.clients.SQS,
    });
    this.cleanUpFunctions.push(queueCleanUp);

    await this.logger.log(
      MESSAGES.sqsQueueCreated.replace("${QUEUE_NAME}", this.names.SQS_QUEUE),
    );

    this.logger.logSeparator();

    await this.logger.log(
      MESSAGES.configuringLambdaSQSEventSource
        .replace("${LAMBDA_NAME}", this.names.LAMBDA_FUNCTION)
        .replace("${QUEUE_NAME}", this.names.SQS_QUEUE),
    );

    // Configure the SQS queue as an event source for the Lambda.
    const { cleanUp: lambdaSQSEventSourceCleanUp } =
      await configureLambdaSQSEventSource({
        lambdaName: this.names.LAMBDA_FUNCTION,
        queueArn,
        lambdaClient: this.clients.Lambda,
        paginateListEventSourceMappings,
      });
    this.cleanUpFunctions.push(lambdaSQSEventSourceCleanUp);

    await this.logger.log(
      MESSAGES.lambdaSQSEventSourceConfigured
        .replace("${LAMBDA_NAME}", this.names.LAMBDA_FUNCTION)
        .replace("${QUEUE_NAME}", this.names.SQS_QUEUE),
    );

    this.logger.logSeparator();

    // Create an IAM policy that allows the SageMaker pipeline to invoke AWS Lambda
    // and send messages to the Amazon SQS queue.
    const {
      arn: pipelineExecutionPolicyArn,
      policyConfig: sagemakerPolicy,
      cleanUp: pipelineExecutionPolicyCleanUp,
    } = await createSagemakerExecutionPolicy({
      sqsQueueArn: queueArn,
      lambdaArn,
      iamClient: this.clients.IAM,
      name: this.names.SAGE_MAKER_EXECUTION_ROLE_POLICY,
      s3BucketName: this.names.S3_BUCKET,
    });
    this.cleanUpFunctions.push(pipelineExecutionPolicyCleanUp);

    console.log(JSON.stringify(sagemakerPolicy, null, 2));

    await this.logger.log(
      MESSAGES.attachPolicy
        .replace("${POLICY_NAME}", this.names.SAGE_MAKER_EXECUTION_ROLE_POLICY)
        .replace("${ROLE_NAME}", this.names.SAGE_MAKER_EXECUTION_ROLE),
    );

    await this.prompter.checkContinue();

    // Attach the SageMaker execution policy to the execution role.
    const { cleanUp: pipelineExecutionRolePolicyCleanUp } = await attachPolicy({
      roleName: this.names.SAGE_MAKER_EXECUTION_ROLE,
      policyArn: pipelineExecutionPolicyArn,
      iamClient: this.clients.IAM,
    });
    this.cleanUpFunctions.push(pipelineExecutionRolePolicyCleanUp);
    // Wait for the role to be ready. If the role is used immediately,
    // the pipeline will fail.
    await wait(5);

    await this.logger.log(MESSAGES.policyAttached);

    this.logger.logSeparator();

    await this.logger.log(
      MESSAGES.creatingPipeline.replace(
        "${PIPELINE_NAME}",
        this.names.SAGE_MAKER_PIPELINE,
      ),
    );

    // Create the SageMaker pipeline.
    const { cleanUp: pipelineCleanUp } = await createSagemakerPipeline({
      roleArn: pipelineExecutionRoleArn,
      functionArn: lambdaArn,
      sagemakerClient: this.clients.SageMaker,
      name: this.names.SAGE_MAKER_PIPELINE,
    });
    this.cleanUpFunctions.push(pipelineCleanUp);

    await this.logger.log(
      MESSAGES.pipelineCreated.replace(
        "${PIPELINE_NAME}",
        this.names.SAGE_MAKER_PIPELINE,
      ),
    );

    this.logger.logSeparator();

    await this.logger.log(
      MESSAGES.creatingS3Bucket.replace("${BUCKET_NAME}", this.names.S3_BUCKET),
    );

    // Create an S3 bucket for storing inputs and outputs. The paginator lets
    // the clean up step empty the bucket before deleting it.
    const { cleanUp: s3BucketCleanUp } = await createS3Bucket({
      name: this.names.S3_BUCKET,
      s3Client: this.clients.S3,
      paginateListObjectsV2,
    });
    this.cleanUpFunctions.push(s3BucketCleanUp);

    await this.logger.log(
      MESSAGES.s3BucketCreated.replace("${BUCKET_NAME}", this.names.S3_BUCKET),
    );

    this.logger.logSeparator();

    await this.logger.log(
      MESSAGES.uploadingInputData.replace(
        "${BUCKET_NAME}",
        this.names.S3_BUCKET,
      ),
    );

    // Upload CSV Lat/Long data to S3.
    await uploadCSVDataToS3({
      bucketName: this.names.S3_BUCKET,
      s3Client: this.clients.S3,
    });

    await this.logger.log(MESSAGES.inputDataUploaded);

    this.logger.logSeparator();

    await this.prompter.checkContinue(MESSAGES.executePipeline);

    // Execute the SageMaker pipeline.
    const { arn: pipelineExecutionArn } = await startPipelineExecution({
      name: this.names.SAGE_MAKER_PIPELINE,
      sagemakerClient: this.clients.SageMaker,
      roleArn: pipelineExecutionRoleArn,
      bucketName: this.names.S3_BUCKET,
      queueUrl,
    });

    // Wait for the pipeline execution to finish.
    await waitForPipelineComplete({
      arn: pipelineExecutionArn,
      sagemakerClient: this.clients.SageMaker,
      wait,
    });

    this.logger.logSeparator();

    await this.logger.log(MESSAGES.outputDelay);

    // getObject throws if the output is not found yet. The retry function
    // retries the call every 10 seconds, up to 12 times (about 2 minutes).
    const output = await retry({ intervalInMs: 10000, maxRetries: 12 }, () =>
      getObject({
        bucket: this.names.S3_BUCKET,
        s3Client: this.clients.S3,
      }),
    );

    this.logger.logSeparator();
    await this.logger.log(MESSAGES.outputDataRetrieved);
    // Show only the first few lines of the output CSV.
    console.log(output.split("\n").slice(0, 6).join("\n"));
  }
}
Kotlin
適用於 Kotlin 的 SDK
注意

GitHub 上還有更多內容。尋找完整範例,並了解如何在 AWS 程式碼範例儲存庫中設定和執行。

/** Separator line (80 dashes) printed between the sections of the scenario output. */
val DASHES = "-".repeat(80)

// UUID of the Lambda event source mapping between the SQS queue and the function.
// deleteEventSourceMapping looks the UUID up itself when this is still empty;
// presumably it is populated while the queue is configured (checkQueue) - confirm.
private var eventSourceMapping = ""

/**
 * Entry point for the Amazon SageMaker geospatial pipeline scenario.
 *
 * Sets up the IAM roles, Lambda function, SQS queue, and S3 bucket used by the pipeline,
 * creates and runs the pipeline, prints its output, and optionally deletes the resources.
 *
 * @param args ten positional arguments, in the order shown in the usage text.
 */
suspend fun main(args: Array<String>) {
    // The argument order in this text must match the args[] indexes read below
    // (functionKey is args[2], functionName is args[3]).
    val usage = """
    Usage:
        <sageMakerRoleName> <lambdaRoleName> <functionKey> <functionName> <queueName> <bucketName> <bucketFunction> <lnglatData> <spatialPipelinePath> <pipelineName>

    Where:
        sageMakerRoleName - The name of the Amazon SageMaker role.
        lambdaRoleName - The name of the AWS Lambda role.
        functionKey - The name of the Amazon S3 key name that represents the Lambda function (for example, SageMakerLambda.zip).
        functionName - The name of the AWS Lambda function (for example,SageMakerExampleFunction).
        queueName - The name of the Amazon Simple Queue Service (Amazon SQS) queue.
        bucketName - The name of the Amazon Simple Storage Service (Amazon S3) bucket.
        bucketFunction - The name of the Amazon S3 bucket that contains the Lambda ZIP file.
        lnglatData - The file location of the latlongtest.csv file required for this use case.
        spatialPipelinePath - The file location of the GeoSpatialPipeline.json file required for this use case.
        pipelineName - The name of the pipeline to create (for example, sagemaker-sdk-example-pipeline).
    """

    if (args.size != 10) {
        println(usage)
        exitProcess(1)
    }

    val sageMakerRoleName = args[0]
    val lambdaRoleName = args[1]
    val functionKey = args[2]
    val functionName = args[3]
    val queueName = args[4]
    val bucketName = args[5]
    val bucketFunction = args[6]
    val lnglatData = args[7]
    val spatialPipelinePath = args[8]
    val pipelineName = args[9]
    val handlerName = "org.example.SageMakerLambdaFunction::handleRequest"

    println(DASHES)
    println("Welcome to the Amazon SageMaker pipeline example scenario.")
    println(
        """
        This example workflow will guide you through setting up and running an
        Amazon SageMaker pipeline. The pipeline uses an AWS Lambda function and an
        Amazon SQS Queue. It runs a vector enrichment reverse geocode job to
        reverse geocode addresses in an input file and store the results in an export file.
        """.trimIndent(),
    )
    println(DASHES)

    println(DASHES)
    println("First, we will set up the roles, functions, and queue needed by the SageMaker pipeline.")
    val lambdaRoleArn: String = checkLambdaRole(lambdaRoleName)
    val sageMakerRoleArn: String = checkSageMakerRole(sageMakerRoleName)

    val functionArn = checkFunction(functionName, bucketFunction, functionKey, handlerName, lambdaRoleArn)
    val queueUrl = checkQueue(queueName, functionName)
    println(DASHES)

    println(DASHES)
    println("Setting up bucket $bucketName")
    // The sample data is uploaded only when the bucket is newly created.
    if (!checkBucket(bucketName)) {
        setupBucket(bucketName)
        println("Put $lnglatData into $bucketName")
        val objectKey = "samplefiles/latlongtest.csv"
        putS3Object(bucketName, objectKey, lnglatData)
    }
    println(DASHES)

    println(DASHES)
    println("Now we can create and run our pipeline.")
    setupPipeline(spatialPipelinePath, sageMakerRoleArn, functionArn, pipelineName)
    val pipelineExecutionARN = executePipeline(bucketName, queueUrl, sageMakerRoleArn, pipelineName)
    println("The pipeline execution ARN value is $pipelineExecutionARN")
    waitForPipelineExecution(pipelineExecutionARN)
    println("Wait 30 secs to get output results $bucketName")
    TimeUnit.SECONDS.sleep(30)
    getOutputResults(bucketName)
    println(DASHES)

    println(DASHES)
    println(
        """
        The pipeline has completed. To view the pipeline and runs in SageMaker Studio, follow these instructions:
        https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-studio.html
        """.trimIndent(),
    )
    println(DASHES)

    println(DASHES)
    println("Do you want to delete the AWS resources used in this Workflow? (y/n)")
    val scanner = Scanner(System.`in`)
    // Closed or empty input counts as "no" instead of throwing NoSuchElementException;
    // the answer is accepted case-insensitively and without surrounding whitespace.
    val delResources = if (scanner.hasNextLine()) scanner.nextLine().trim() else ""
    if (delResources.equals("y", ignoreCase = true)) {
        println("Lets clean up the AWS resources. Wait 30 seconds")
        TimeUnit.SECONDS.sleep(30)
        deleteEventSourceMapping(functionName)
        deleteSQSQueue(queueName)
        listBucketObjects(bucketName)
        deleteBucket(bucketName)
        delLambdaFunction(functionName)
        deleteLambdaRole(lambdaRoleName)
        deleteSagemakerRole(sageMakerRoleName)
        deletePipeline(pipelineName)
    } else {
        println("The AWS Resources were not deleted!")
    }
    println(DASHES)

    println(DASHES)
    println("SageMaker pipeline scenario is complete.")
    println(DASHES)
}

// Delete a SageMaker pipeline by name.
suspend fun deletePipeline(pipelineNameVal: String) {
    val pipelineRequest = DeletePipelineRequest {
        pipelineName = pipelineNameVal
    }
    SageMakerClient { region = "us-west-2" }.use { sageMakerClient ->
        sageMakerClient.deletePipeline(pipelineRequest)
        println("*** Successfully deleted $pipelineNameVal")
    }
}

// Detach the SageMaker role's policies, then delete the role.
suspend fun deleteSagemakerRole(roleNameVal: String) {
    detachPoliciesAndDeleteRole(roleNameVal, getSageMakerRolePolicies().toList())
}

// Detach the Lambda role's policies, then delete the role.
suspend fun deleteLambdaRole(roleNameVal: String) {
    detachPoliciesAndDeleteRole(roleNameVal, getLambdaRolePolicies().toList())
}

// Shared by the two role-deletion functions: each policy must be detached
// before the role itself can be deleted.
private suspend fun detachPoliciesAndDeleteRole(roleNameVal: String, policyArns: List<String?>) {
    IamClient { region = "us-west-2" }.use { iam ->
        policyArns.forEach { arn ->
            iam.detachRolePolicy(
                DetachRolePolicyRequest {
                    policyArn = arn
                    roleName = roleNameVal
                },
            )
        }

        iam.deleteRole(DeleteRoleRequest { roleName = roleNameVal })
        println("*** Successfully deleted $roleNameVal")
    }
}

// Delete a Lambda function by name.
suspend fun delLambdaFunction(myFunctionName: String) {
    val request = DeleteFunctionRequest {
        functionName = myFunctionName
    }
    LambdaClient { region = "us-west-2" }.use { awsLambda ->
        awsLambda.deleteFunction(request)
        println("$myFunctionName was deleted")
    }
}

// Delete an S3 bucket. The bucket must already be empty (see listBucketObjects).
suspend fun deleteBucket(bucketName: String?) {
    val request = DeleteBucketRequest {
        bucket = bucketName
    }
    S3Client { region = "us-east-1" }.use { s3 ->
        s3.deleteBucket(request)
        println("The $bucketName was successfully deleted!")
    }
}

// Delete a single object from an S3 bucket.
suspend fun deleteBucketObjects(bucketName: String, objectName: String?) {
    val dor = DeleteObjectsRequest {
        bucket = bucketName
        delete = Delete {
            objects = listOf(ObjectIdentifier { key = objectName })
        }
    }

    S3Client { region = "us-east-1" }.use { s3Client ->
        s3Client.deleteObjects(dor)
        println("*** $bucketName objects were deleted.")
    }
}

// List the objects in the bucket (first page of results) and delete each one.
suspend fun listBucketObjects(bucketNameVal: String) {
    val listObjects = ListObjectsRequest {
        bucket = bucketNameVal
    }

    S3Client { region = "us-east-1" }.use { s3Client ->
        val res = s3Client.listObjects(listObjects)
        res.contents?.forEach { s3Object ->
            println("The name of the key is ${s3Object.key}")
            deleteBucketObjects(bucketNameVal, s3Object.key)
        }
    }
}

// Delete the specific Amazon SQS queue, resolving its URL from the name first.
suspend fun deleteSQSQueue(queueNameVal: String?) {
    val getQueueRequest = GetQueueUrlRequest {
        queueName = queueNameVal
    }

    SqsClient { region = "us-west-2" }.use { sqsClient ->
        val urlVal = sqsClient.getQueueUrl(getQueueRequest).queueUrl
        val deleteQueueRequest = DeleteQueueRequest {
            queueUrl = urlVal
        }
        sqsClient.deleteQueue(deleteQueueRequest)
    }
}

// Delete the queue event mapping.
suspend fun deleteEventSourceMapping(functionNameVal: String) {
    // `eventSourceMapping` is file-level state; connectLambda stores the UUID of the mapping it
    // creates there. When it is still empty, look the mapping up from the function instead.
    LambdaClient { region = "us-west-2" }.use { lambdaClient ->
        if (eventSourceMapping.isEmpty()) {
            val request = ListEventSourceMappingsRequest {
                functionName = functionNameVal
            }
            // If several mappings exist, the last one returned is the one deleted.
            lambdaClient.listEventSourceMappings(request).eventSourceMappings?.lastOrNull()?.let { mapping ->
                eventSourceMapping = mapping.uuid.toString()
            }
        }
        if (eventSourceMapping.isEmpty()) {
            // Nothing to delete: an empty UUID does not identify any mapping.
            println("No event source mapping was found for $functionNameVal.")
        } else {
            val eventSourceMappingRequest = DeleteEventSourceMappingRequest {
                uuid = eventSourceMapping
            }
            lambdaClient.deleteEventSourceMapping(eventSourceMappingRequest)
            println("The event mapping is deleted!")
        }
    }
}

// Downloads one object from the bucket and prints its body decoded as UTF-8
// (prints "Text output: null" when the response has no body).
private suspend fun readObject(bucketName: String, keyVal: String?) {
    println("Output file contents: \n")
    val objectRequest = GetObjectRequest {
        bucket = bucketName
        key = keyVal
    }
    S3Client { region = "us-east-1" }.use { s3Client ->
        s3Client.getObject(objectRequest) { resp ->
            val text = resp.body?.toByteArray()?.let { String(it, StandardCharsets.UTF_8) }
            println("Text output: $text")
        }
    }
}

// Display the results from the output directory: prints every object under "outputfiles/".
// NOTE(review): only the first ListObjects page is read.
suspend fun getOutputResults(bucketName: String?) {
    println("Getting output results $bucketName.")
    val listObjectsRequest = ListObjectsRequest {
        bucket = bucketName
        prefix = "outputfiles/"
    }
    S3Client { region = "us-east-1" }.use { s3Client ->
        val outputObjects = s3Client.listObjects(listObjectsRequest).contents.orEmpty()
        if (bucketName != null) {
            for (outputObject in outputObjects) {
                readObject(bucketName, outputObject.key)
            }
        }
    }
}

// Polls the pipeline run roughly every 4 seconds while its status is "Executing", then prints
// the final status.
suspend fun waitForPipelineExecution(executionArn: String?) {
    val pipelineExecutionRequest = DescribePipelineExecutionRequest {
        pipelineExecutionArn = executionArn
    }
    // One client is reused for every poll instead of building a new one per iteration.
    val finalStatus = SageMakerClient { region = "us-west-2" }.use { sageMakerClient ->
        var status: String
        var index = 0
        do {
            val response = sageMakerClient.describePipelineExecution(pipelineExecutionRequest)
            status = response.pipelineExecutionStatus.toString()
            println("$index. The status of the pipeline is $status")
            // NOTE(review): blocks the thread inside a suspend function; kotlinx.coroutines.delay would not.
            TimeUnit.SECONDS.sleep(4)
            index++
        } while ("Executing" == status)
        status
    }
    println("Pipeline finished with status $status".replace("\$status", finalStatus))
}

// Start a pipeline run with job configurations. Returns the ARN of the new pipeline execution.
suspend fun executePipeline(bucketName: String, queueUrl: String?, roleArn: String?, pipelineNameVal: String): String? {
    println("Starting pipeline execution.")
    val inputBucketLocation = "s3://$bucketName/samplefiles/latlongtest.csv"
    val output = "s3://$bucketName/outputfiles/"
    val gson = GsonBuilder()
        .setFieldNamingPolicy(FieldNamingPolicy.UPPER_CAMEL_CASE)
        .setPrettyPrinting()
        .create()

    // Set up all parameters required to start the pipeline.
    val para1 = Parameter {
        name = "parameter_execution_role"
        value = roleArn
    }
    val para2 = Parameter {
        name = "parameter_queue_url"
        value = queueUrl
    }
    val inputJSON = """{ "DataSourceConfig": { "S3Data": { "S3Uri": "$inputBucketLocation" }, "Type": "S3_DATA" }, "DocumentType": "CSV" }"""
    println(inputJSON)
    val para3 = Parameter {
        name = "parameter_vej_input_config"
        value = inputJSON
    }

    // Serialize an ExportVectorEnrichmentJobOutputConfig with UpperCamelCase field names.
    val jobS3Data = VectorEnrichmentJobS3Data {
        s3Uri = output
    }
    val outputConfig = ExportVectorEnrichmentJobOutputConfig {
        s3Data = jobS3Data
    }
    val gson4: String = gson.toJson(outputConfig)
    val para4 = Parameter {
        name = "parameter_vej_export_config"
        value = gson4
    }
    println("parameter_vej_export_config:$gson4")

    val para5JSON = "{\"MapMatchingConfig\":null,\"ReverseGeocodingConfig\":{\"XAttributeName\":\"Longitude\",\"YAttributeName\":\"Latitude\"}}"
    val para5 = Parameter {
        name = "parameter_step_1_vej_config"
        value = para5JSON
    }

    val pipelineExecutionRequest = StartPipelineExecutionRequest {
        pipelineExecutionDescription = "Created using Kotlin SDK"
        // Use the function parameter: inside this builder a bare `pipelineName` resolves to the
        // builder's own property, which is still unset here and would render as "null".
        pipelineExecutionDisplayName = "$pipelineNameVal-example-execution"
        pipelineParameters = listOf(para1, para2, para3, para4, para5)
        pipelineName = pipelineNameVal
    }
    return SageMakerClient { region = "us-west-2" }.use { sageMakerClient ->
        sageMakerClient.startPipelineExecution(pipelineExecutionRequest).pipelineExecutionArn
    }
}

// Create a pipeline from the example pipeline JSON, pointing every step that declares a
// "FunctionArn" at the given Lambda function.
suspend fun setupPipeline(filePath: String?, roleArnVal: String?, functionArnVal: String?, pipelineNameVal: String?) {
    println("Setting up the pipeline.")
    val parser = JSONParser()
    // Read JSON and get the pipeline definition; the reader is closed before any network call.
    val jsonObject: JSONObject = FileReader(filePath).use { reader -> parser.parse(reader) as JSONObject }
    val stepsArray: JSONArray = jsonObject.get("Steps") as JSONArray
    for (stepObj in stepsArray) {
        val step: JSONObject = stepObj as JSONObject
        if (step.containsKey("FunctionArn")) {
            step.put("FunctionArn", functionArnVal)
        }
    }
    println(jsonObject)

    // Create the pipeline.
    val pipelineRequest = CreatePipelineRequest {
        pipelineDescription = "Kotlin SDK example pipeline"
        roleArn = roleArnVal
        pipelineName = pipelineNameVal
        pipelineDefinition = jsonObject.toString()
    }
    SageMakerClient { region = "us-west-2" }.use { sageMakerClient ->
        sageMakerClient.createPipeline(pipelineRequest)
    }
}

// Uploads the local file at objectPath to the bucket under objectKey.
suspend fun putS3Object(bucketName: String, objectKey: String, objectPath: String) {
    val request = PutObjectRequest {
        bucket = bucketName
        key = objectKey
        body = File(objectPath).asByteStream()
    }
    S3Client { region = "us-east-1" }.use { s3 ->
        s3.putObject(request)
        println("Successfully placed $objectKey into bucket $bucketName")
    }
}

// Creates the bucket in us-east-1.
suspend fun setupBucket(bucketName: String) {
    val request = CreateBucketRequest {
        bucket = bucketName
    }
    S3Client { region = "us-east-1" }.use { s3 ->
        s3.createBucket(request)
        println("$bucketName is ready")
    }
}

// Returns true when HeadBucket succeeds. Any S3Exception is reported as "does not exist".
suspend fun checkBucket(bucketName: String): Boolean =
    try {
        val headBucketRequest = HeadBucketRequest {
            bucket = bucketName
        }
        S3Client { region = "us-east-1" }.use { s3Client ->
            s3Client.headBucket(headBucketRequest)
            println("$bucketName exists")
            true
        }
    } catch (e: S3Exception) {
        println("Bucket does not exist")
        false
    }

// Connect the queue to the Lambda function as an event source, recording the new mapping's
// UUID in the file-level `eventSourceMapping`.
suspend fun connectLambda(queueUrlVal: String?, lambdaNameVal: String?) {
    println("Connecting the Lambda function and queue for the pipeline.")
    var queueArn = ""
    // Only the queue ARN attribute is requested.
    val attributesRequest = GetQueueAttributesRequest {
        queueUrl = queueUrlVal
        attributeNames = listOf(QueueAttributeName.QueueArn)
    }
    SqsClient { region = "us-west-2" }.use { sqsClient ->
        sqsClient.getQueueAttributes(attributesRequest).attributes?.forEach { (key, value) ->
            println("Key = $key, Value = $value")
            queueArn = value
        }
    }
    val eventSourceMappingRequest = CreateEventSourceMappingRequest {
        eventSourceArn = queueArn
        functionName = lambdaNameVal
    }
    LambdaClient { region = "us-west-2" }.use { lambdaClient ->
        val response1 = lambdaClient.createEventSourceMapping(eventSourceMappingRequest)
        eventSourceMapping = response1.uuid.toString()
        println("The mapping between the event source and Lambda function was successful")
    }
}

// Set up the SQS queue to use with the pipeline, connect it to the Lambda function, and return
// its URL.
suspend fun setupQueue(queueNameVal: String, lambdaNameVal: String): String {
    println("Setting up queue named $queueNameVal")
    val queueAtt: Map<String, String> = mapOf(
        "DelaySeconds" to "5",
        "ReceiveMessageWaitTimeSeconds" to "5",
        "VisibilityTimeout" to "300"
    )
    val createQueueRequest = CreateQueueRequest {
        queueName = queueNameVal
        attributes = queueAtt
    }
    return SqsClient { region = "us-west-2" }.use { sqsClient ->
        sqsClient.createQueue(createQueueRequest)
        println("\nGet queue url")
        val queueUrlVal = sqsClient.getQueueUrl(GetQueueUrlRequest { queueName = queueNameVal }).queueUrl
        // Presumably gives the new queue time to propagate before it is wired to Lambda.
        TimeUnit.SECONDS.sleep(15)
        connectLambda(queueUrlVal, lambdaNameVal)
        println("Queue ready with Url " + queueUrlVal)
        queueUrlVal.toString()
    }
}

// Checks to see if the Amazon SQS queue exists. If not, this method creates a new queue.
// Either way it returns the queue URL.
suspend fun checkQueue(queueNameVal: String, lambdaNameVal: String): String? {
    println("Checking to see if the queue exists. If not, a new queue will be created for use in this workflow.")
    return try {
        val request = GetQueueUrlRequest {
            queueName = queueNameVal
        }
        SqsClient { region = "us-west-2" }.use { sqsClient ->
            val queueUrl = sqsClient.getQueueUrl(request).queueUrl.toString()
            println(queueUrl)
            queueUrl
        }
    } catch (e: SqsException) {
        println(e.message + " A new queue will be created")
        setupQueue(queueNameVal, lambdaNameVal)
    }
}

// Creates a Java 11 Lambda function from code in S3, waits until it is active, and returns its ARN.
suspend fun createNewFunction(myFunctionName: String, s3BucketName: String, myS3Key: String, myHandler: String, myRole: String): String {
    val functionCode = FunctionCode {
        s3Bucket = s3BucketName
        s3Key = myS3Key
    }
    val request = CreateFunctionRequest {
        functionName = myFunctionName
        code = functionCode
        description = "Created by the Lambda Kotlin API"
        handler = myHandler
        role = myRole
        runtime = Runtime.Java11
        memorySize = 1024
        timeout = 200
    }
    return LambdaClient { region = "us-west-2" }.use { awsLambda ->
        val functionResponse = awsLambda.createFunction(request)
        awsLambda.waitUntilFunctionActive {
            functionName = myFunctionName
        }
        println("${functionResponse.functionArn} was created")
        functionResponse.functionArn.toString()
    }
}

// Returns the ARN of the Lambda function, creating it first if GetFunction fails with a LambdaException.
suspend fun checkFunction(myFunctionName: String, s3BucketName: String, myS3Key: String, myHandler: String, myRole: String): String {
    println("Checking to see if the function exists. If not, a new AWS Lambda function will be created for use in this workflow.")
    return try {
        // Does this function already exist?
        val functionRequest = GetFunctionRequest {
            functionName = myFunctionName
        }
        LambdaClient { region = "us-west-2" }.use { lambdaClient ->
            val functionArn = lambdaClient.getFunction(functionRequest).configuration?.functionArn.toString()
            println("$functionArn exists")
            functionArn
        }
    } catch (e: LambdaException) {
        println(e.message + " A new function will be created")
        createNewFunction(myFunctionName, s3BucketName, myS3Key, myHandler, myRole)
    }
}

// Checks to see if the SageMaker role exists. If not, this method creates it. Returns the role ARN.
suspend fun checkSageMakerRole(roleNameVal: String): String {
    println("Checking to see if the role exists. If not, a new role will be created for AWS SageMaker to use.")
    return try {
        val roleRequest = GetRoleRequest {
            roleName = roleNameVal
        }
        IamClient { region = "AWS_GLOBAL" }.use { iamClient ->
            val roleArn = iamClient.getRole(roleRequest).role?.arn.toString()
            println(roleArn)
            roleArn
        }
    } catch (e: IamException) {
        println(e.message + " A new role will be created")
        createSageMakerRole(roleNameVal)
    }
}

// Creates the SageMaker workflow role with the SageMaker, geospatial, and SQS managed policies.
suspend fun createSageMakerRole(roleNameVal: String): String {
    val sageMakerRolePolicies = getSageMakerRolePolicies()
    println("Creating a role to use with SageMaker.")
    return createRoleWithPolicies(roleNameVal, sageMakerRolePolicies)
}

// Checks to see if the Lambda role exists. If not, this method creates it. Returns the role ARN.
suspend fun checkLambdaRole(roleNameVal: String): String {
    println("Checking to see if the role exists. If not, a new role will be created for AWS Lambda to use.")
    val roleRequest = GetRoleRequest {
        roleName = roleNameVal
    }
    return try {
        IamClient { region = "AWS_GLOBAL" }.use { iamClient ->
            val roleArn = iamClient.getRole(roleRequest).role?.arn.toString()
            println(roleArn)
            roleArn
        }
    } catch (e: IamException) {
        println(e.message + " A new role will be created")
        createLambdaRole(roleNameVal)
    }
}

// Creates the Lambda workflow role with the policies from getLambdaRolePolicies.
private suspend fun createLambdaRole(roleNameVal: String): String {
    val lambdaRolePolicies = getLambdaRolePolicies()
    return createRoleWithPolicies(roleNameVal, lambdaRolePolicies)
}

// Trust policy shared by both workflow roles: SageMaker, SageMaker geospatial, Lambda, and S3
// may assume the role.
private const val WORKFLOW_ASSUME_ROLE_POLICY: String =
    "{" +
        "\"Version\": \"2012-10-17\"," +
        "\"Statement\": [{" +
        "\"Effect\": \"Allow\"," +
        "\"Principal\": {" +
        "\"Service\": [" +
        "\"sagemaker.amazonaws.com\"," +
        "\"sagemaker-geospatial.amazonaws.com\"," +
        "\"lambda.amazonaws.com\"," +
        "\"s3.amazonaws.com\"" +
        "]" +
        "}," +
        "\"Action\": \"sts:AssumeRole\"" +
        "}]" +
        "}"

// Creates an IAM role with the shared trust policy, attaches each managed policy ARN, waits
// 15 seconds, and returns the role ARN (the string "null" if IAM returned none).
private suspend fun createRoleWithPolicies(roleNameVal: String, policies: Array<String?>): String {
    val request = CreateRoleRequest {
        roleName = roleNameVal
        assumeRolePolicyDocument = WORKFLOW_ASSUME_ROLE_POLICY
        description = "Created using the AWS SDK for Kotlin"
    }
    return IamClient { region = "AWS_GLOBAL" }.use { iamClient ->
        val roleResult = iamClient.createRole(request)
        // Attach the policies to the role.
        for (policy in policies) {
            val attachRequest = AttachRolePolicyRequest {
                roleName = roleNameVal
                policyArn = policy
            }
            iamClient.attachRolePolicy(attachRequest)
        }
        // Allow time for the role to be ready.
        // NOTE(review): blocks the thread inside a suspend function; kotlinx.coroutines.delay would not.
        TimeUnit.SECONDS.sleep(15)
        println("Role ready with ARN ${roleResult.role?.arn}")
        roleResult.role?.arn.toString()
    }
}

// Managed policy ARNs attached to the Lambda workflow role.
fun getLambdaRolePolicies(): Array<String?> = arrayOf<String?>(
    "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
    "arn:aws:iam::aws:policy/AmazonSQSFullAccess",
    "arn:aws:iam::aws:policy/service-role/AmazonSageMakerGeospatialFullAccess",
    "arn:aws:iam::aws:policy/service-role/AmazonSageMakerServiceCatalogProductsLambdaServiceRolePolicy",
    "arn:aws:iam::aws:policy/service-role/AWSLambdaSQSQueueExecutionRole"
)

// Managed policy ARNs attached to the SageMaker workflow role.
fun getSageMakerRolePolicies(): Array<String?> = arrayOf<String?>(
    "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
    "arn:aws:iam::aws:policy/service-role/AmazonSageMakerGeospatialFullAccess",
    "arn:aws:iam::aws:policy/AmazonSQSFullAccess"
)