There are more AWS SDK examples available in the AWS Doc SDK Examples GitHub repo.
Get started with HealthImaging image sets and image frames using an AWS SDK
The following code examples show how to import DICOM files and download image frames in HealthImaging.
The implementation is structured as a command-line workflow application. A minimal Python (Boto3) sketch of the core service calls follows the step list below.
Set up resources for a DICOM import.
Import DICOM files into a data store.
Retrieve the image set IDs for the import job.
Retrieve the image frame IDs for the image sets.
Download, decode and verify the image frames.
Clean up resources.
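Before the per-language listings, here is a minimal orientation sketch of the core service calls the workflow is built around, using Boto3. The bucket names, role ARN, and data store ID are placeholders; resource setup, manifest parsing, and frame verification are covered in the full examples below.

# A minimal sketch of the workflow's core HealthImaging calls (placeholder values).
import time

import boto3

medical_imaging = boto3.client("medical-imaging")
datastore_id = "12345678901234567890123456789012"  # Placeholder data store ID.

# 1. Start the DICOM import job.
job = medical_imaging.start_dicom_import_job(
    datastoreId=datastore_id,
    dataAccessRoleArn="arn:aws:iam::111122223333:role/ImportRole",  # Placeholder.
    inputS3Uri="s3://amzn-s3-demo-bucket/input/",
    outputS3Uri="s3://amzn-s3-demo-bucket/output/",
)

# 2. Poll until the job leaves the IN_PROGRESS state.
while True:
    props = medical_imaging.get_dicom_import_job(
        datastoreId=datastore_id, jobId=job["jobId"]
    )["jobProperties"]
    if props["jobStatus"] != "IN_PROGRESS":
        break
    time.sleep(10)

# 3. The job writes a manifest listing the created image sets to outputS3Uri.
#    Image frame IDs then come from get_image_set_metadata, and pixel data
#    from get_image_frame, as the full examples show.
print(props["jobStatus"])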
- C++
- SDK for C++
Create an AWS CloudFormation stack with the necessary resources.
Aws::String inputBucketName; Aws::String outputBucketName; Aws::String dataStoreId; Aws::String roleArn; Aws::String stackName; if (askYesNoQuestion( "Would you like to let this workflow create the resources for you? (y/n) ")) { stackName = askQuestion( "Enter a name for the AWS CloudFormation stack to create. "); Aws::String dataStoreName = askQuestion( "Enter a name for the HealthImaging datastore to create. "); Aws::Map<Aws::String, Aws::String> outputs = createCloudFormationStack( stackName, dataStoreName, clientConfiguration); if (!retrieveOutputs(outputs, dataStoreId, inputBucketName, outputBucketName, roleArn)) { return false; } std::cout << "The following resources have been created." << std::endl; std::cout << "A HealthImaging datastore with ID: " << dataStoreId << "." << std::endl; std::cout << "An Amazon S3 input bucket named: " << inputBucketName << "." << std::endl; std::cout << "An Amazon S3 output bucket named: " << outputBucketName << "." << std::endl; std::cout << "An IAM role with the ARN: " << roleArn << "." << std::endl; askQuestion("Enter return to continue.", alwaysTrueTest); } else { std::cout << "You have chosen to use preexisting resources:" << std::endl; dataStoreId = askQuestion( "Enter the data store ID of the HealthImaging datastore you wish to use: "); inputBucketName = askQuestion( "Enter the name of the S3 input bucket you wish to use: "); outputBucketName = askQuestion( "Enter the name of the S3 output bucket you wish to use: "); roleArn = askQuestion( "Enter the ARN for the IAM role with the proper permissions to import a DICOM series: "); }
Copy DICOM files to the Amazon S3 import bucket.
std::cout << "This workflow uses DICOM files from the National Cancer Institute Imaging Data\n" << "Commons (IDC) Collections." << std::endl; std::cout << "Here is the link to their website." << std::endl; std::cout << "https://registry.opendata.aws/nci-imaging-data-commons/" << std::endl; std::cout << "We will use DICOM files stored in an S3 bucket managed by the IDC." << std::endl; std::cout << "First one of the DICOM folders in the IDC collection must be copied to your\n" "input S3 bucket." << std::endl; std::cout << "You have the choice of one of the following " << IDC_ImageChoices.size() << " folders to copy." << std::endl; int index = 1; for (auto &idcChoice: IDC_ImageChoices) { std::cout << index << " - " << idcChoice.mDescription << std::endl; index++; } int choice = askQuestionForIntRange("Choose DICOM files to import: ", 1, 4); Aws::String fromDirectory = IDC_ImageChoices[choice - 1].mDirectory; Aws::String inputDirectory = "input"; std::cout << "The files in the directory '" << fromDirectory << "' in the bucket '" << IDC_S3_BucketName << "' will be copied " << std::endl; std::cout << "to the folder '" << inputDirectory << "/" << fromDirectory << "' in the bucket '" << inputBucketName << "'." << std::endl; askQuestion("Enter return to start the copy.", alwaysTrueTest); if (!AwsDoc::Medical_Imaging::copySeriesBetweenBuckets( IDC_S3_BucketName, fromDirectory, inputBucketName, inputDirectory, clientConfiguration)) { std::cerr << "This workflow will exit because of an error." << std::endl; cleanup(stackName, dataStoreId, clientConfiguration); return false; }
Import the DICOM files into the HealthImaging data store.
bool AwsDoc::Medical_Imaging::startDicomImport(const Aws::String &dataStoreID, const Aws::String &inputBucketName, const Aws::String &inputDirectory, const Aws::String &outputBucketName, const Aws::String &outputDirectory, const Aws::String &roleArn, Aws::String &importJobId, const Aws::Client::ClientConfiguration &clientConfiguration) { bool result = false; if (startDICOMImportJob(dataStoreID, inputBucketName, inputDirectory, outputBucketName, outputDirectory, roleArn, importJobId, clientConfiguration)) { std::cout << "DICOM import job started with job ID " << importJobId << "." << std::endl; result = waitImportJobCompleted(dataStoreID, importJobId, clientConfiguration); if (result) { std::cout << "DICOM import job completed." << std::endl; } } return result; } //! Routine which starts a HealthImaging import job. /*! \param dataStoreID: The HealthImaging data store ID. \param inputBucketName: The name of the Amazon S3 bucket containing the DICOM files. \param inputDirectory: The directory in the S3 bucket containing the DICOM files. \param outputBucketName: The name of the S3 bucket for the output. \param outputDirectory: The directory in the S3 bucket to store the output. \param roleArn: The ARN of the IAM role with permissions for the import. \param importJobId: A string to receive the import job ID. \param clientConfig: Aws client configuration. \return bool: Function succeeded. */ bool AwsDoc::Medical_Imaging::startDICOMImportJob( const Aws::String &dataStoreID, const Aws::String &inputBucketName, const Aws::String &inputDirectory, const Aws::String &outputBucketName, const Aws::String &outputDirectory, const Aws::String &roleArn, Aws::String &importJobId, const Aws::Client::ClientConfiguration &clientConfig) { Aws::MedicalImaging::MedicalImagingClient medicalImagingClient(clientConfig); Aws::String inputURI = "s3://" + inputBucketName + "/" + inputDirectory + "/"; Aws::String outputURI = "s3://" + outputBucketName + "/" + outputDirectory + "/"; Aws::MedicalImaging::Model::StartDICOMImportJobRequest startDICOMImportJobRequest; startDICOMImportJobRequest.SetDatastoreId(dataStoreID); startDICOMImportJobRequest.SetDataAccessRoleArn(roleArn); startDICOMImportJobRequest.SetInputS3Uri(inputURI); startDICOMImportJobRequest.SetOutputS3Uri(outputURI); Aws::MedicalImaging::Model::StartDICOMImportJobOutcome startDICOMImportJobOutcome = medicalImagingClient.StartDICOMImportJob( startDICOMImportJobRequest); if (startDICOMImportJobOutcome.IsSuccess()) { importJobId = startDICOMImportJobOutcome.GetResult().GetJobId(); } else { std::cerr << "Failed to start DICOM import job because " << startDICOMImportJobOutcome.GetError().GetMessage() << std::endl; } return startDICOMImportJobOutcome.IsSuccess(); } //! Routine which waits for a DICOM import job to complete. /*! * @param dataStoreID: The HealthImaging data store ID. * @param importJobId: The import job ID. * @param clientConfiguration : Aws client configuration. * @return bool: Function succeeded. 
*/ bool AwsDoc::Medical_Imaging::waitImportJobCompleted(const Aws::String &datastoreID, const Aws::String &importJobId, const Aws::Client::ClientConfiguration &clientConfiguration) { Aws::MedicalImaging::Model::JobStatus jobStatus = Aws::MedicalImaging::Model::JobStatus::IN_PROGRESS; while (jobStatus == Aws::MedicalImaging::Model::JobStatus::IN_PROGRESS) { std::this_thread::sleep_for(std::chrono::seconds(1)); Aws::MedicalImaging::Model::GetDICOMImportJobOutcome getDicomImportJobOutcome = getDICOMImportJob( datastoreID, importJobId, clientConfiguration); if (getDicomImportJobOutcome.IsSuccess()) { jobStatus = getDicomImportJobOutcome.GetResult().GetJobProperties().GetJobStatus(); std::cout << "DICOM import job status: " << Aws::MedicalImaging::Model::JobStatusMapper::GetNameForJobStatus( jobStatus) << std::endl; } else { std::cerr << "Failed to get import job status because " << getDicomImportJobOutcome.GetError().GetMessage() << std::endl; return false; } } return jobStatus == Aws::MedicalImaging::Model::JobStatus::COMPLETED; } //! Routine which gets a HealthImaging DICOM import job's properties. /*! \param dataStoreID: The HealthImaging data store ID. \param importJobID: The DICOM import job ID \param clientConfig: Aws client configuration. \return GetDICOMImportJobOutcome: The import job outcome. */ Aws::MedicalImaging::Model::GetDICOMImportJobOutcome AwsDoc::Medical_Imaging::getDICOMImportJob(const Aws::String &dataStoreID, const Aws::String &importJobID, const Aws::Client::ClientConfiguration &clientConfig) { Aws::MedicalImaging::MedicalImagingClient client(clientConfig); Aws::MedicalImaging::Model::GetDICOMImportJobRequest request; request.SetDatastoreId(dataStoreID); request.SetJobId(importJobID); Aws::MedicalImaging::Model::GetDICOMImportJobOutcome outcome = client.GetDICOMImportJob( request); if (!outcome.IsSuccess()) { std::cerr << "GetDICOMImportJob error: " << outcome.GetError().GetMessage() << std::endl; } return outcome; }
Get image sets created by the DICOM import job.
bool AwsDoc::Medical_Imaging::getImageSetsForDicomImportJob(const Aws::String &datastoreID, const Aws::String &importJobId, Aws::Vector<Aws::String> &imageSets, const Aws::Client::ClientConfiguration &clientConfiguration) { Aws::MedicalImaging::Model::GetDICOMImportJobOutcome getDicomImportJobOutcome = getDICOMImportJob( datastoreID, importJobId, clientConfiguration); bool result = false; if (getDicomImportJobOutcome.IsSuccess()) { auto outputURI = getDicomImportJobOutcome.GetResult().GetJobProperties().GetOutputS3Uri(); Aws::Http::URI uri(outputURI); const Aws::String &bucket = uri.GetAuthority(); Aws::String key = uri.GetPath(); Aws::S3::S3Client s3Client(clientConfiguration); Aws::S3::Model::GetObjectRequest objectRequest; objectRequest.SetBucket(bucket); objectRequest.SetKey(key + "/" + IMPORT_JOB_MANIFEST_FILE_NAME); auto getObjectOutcome = s3Client.GetObject(objectRequest); if (getObjectOutcome.IsSuccess()) { auto &data = getObjectOutcome.GetResult().GetBody(); std::stringstream stringStream; stringStream << data.rdbuf(); try { // Use JMESPath to extract the image set IDs. // https://jmespath.org/specification.html std::string jmesPathExpression = "jobSummary.imageSetsSummary[].imageSetId"; jsoncons::json doc = jsoncons::json::parse(stringStream.str()); jsoncons::json imageSetsJson = jsoncons::jmespath::search(doc, jmesPathExpression);\ for (auto &imageSet: imageSetsJson.array_range()) { imageSets.push_back(imageSet.as_string()); } result = true; } catch (const std::exception &e) { std::cerr << e.what() << '\n'; } } else { std::cerr << "Failed to get object because " << getObjectOutcome.GetError().GetMessage() << std::endl; } } else { std::cerr << "Failed to get import job status because " << getDicomImportJobOutcome.GetError().GetMessage() << std::endl; } return result; }
Get image frame information for image sets.
bool AwsDoc::Medical_Imaging::getImageFramesForImageSet(const Aws::String &dataStoreID, const Aws::String &imageSetID, const Aws::String &outDirectory, Aws::Vector<ImageFrameInfo> &imageFrames, const Aws::Client::ClientConfiguration &clientConfiguration) { Aws::String fileName = outDirectory + "/" + imageSetID + "_metadata.json.gzip"; bool result = false; if (getImageSetMetadata(dataStoreID, imageSetID, "", // Empty string for version ID. fileName, clientConfiguration)) { try { std::string metadataGZip; { std::ifstream inFileStream(fileName.c_str(), std::ios::binary); if (!inFileStream) { throw std::runtime_error("Failed to open file " + fileName); } std::stringstream stringStream; stringStream << inFileStream.rdbuf(); metadataGZip = stringStream.str(); } std::string metadataJson = gzip::decompress(metadataGZip.data(), metadataGZip.size()); // Use JMESPath to extract the image set IDs. // https://jmespath.org/specification.html jsoncons::json doc = jsoncons::json::parse(metadataJson); std::string jmesPathExpression = "Study.Series.*.Instances[].*[]"; jsoncons::json instances = jsoncons::jmespath::search(doc, jmesPathExpression); for (auto &instance: instances.array_range()) { jmesPathExpression = "DICOM.RescaleSlope"; std::string rescaleSlope = jsoncons::jmespath::search(instance, jmesPathExpression).to_string(); jmesPathExpression = "DICOM.RescaleIntercept"; std::string rescaleIntercept = jsoncons::jmespath::search(instance, jmesPathExpression).to_string(); jmesPathExpression = "ImageFrames[][]"; jsoncons::json imageFramesJson = jsoncons::jmespath::search(instance, jmesPathExpression); for (auto &imageFrame: imageFramesJson.array_range()) { ImageFrameInfo imageFrameIDs; imageFrameIDs.mImageSetId = imageSetID; imageFrameIDs.mImageFrameId = imageFrame.find( "ID")->value().as_string(); imageFrameIDs.mRescaleIntercept = rescaleIntercept; imageFrameIDs.mRescaleSlope = rescaleSlope; imageFrameIDs.MinPixelValue = imageFrame.find( "MinPixelValue")->value().as_string(); imageFrameIDs.MaxPixelValue = imageFrame.find( "MaxPixelValue")->value().as_string(); jmesPathExpression = "max_by(PixelDataChecksumFromBaseToFullResolution, &Width).Checksum"; jsoncons::json checksumJson = jsoncons::jmespath::search(imageFrame, jmesPathExpression); imageFrameIDs.mFullResolutionChecksum = checksumJson.as_integer<uint32_t>(); imageFrames.emplace_back(imageFrameIDs); } } result = true; } catch (const std::exception &e) { std::cerr << "getImageFramesForImageSet failed because " << e.what() << std::endl; } } return result; } //! Routine which gets a HealthImaging image set's metadata. /*! \param dataStoreID: The HealthImaging data store ID. \param imageSetID: The HealthImaging image set ID. \param versionID: The HealthImaging image set version ID, ignored if empty. \param outputFilePath: The path where the metadata will be stored as gzipped json. \param clientConfig: Aws client configuration. \\return bool: Function succeeded. 
*/ bool AwsDoc::Medical_Imaging::getImageSetMetadata(const Aws::String &dataStoreID, const Aws::String &imageSetID, const Aws::String &versionID, const Aws::String &outputFilePath, const Aws::Client::ClientConfiguration &clientConfig) { Aws::MedicalImaging::Model::GetImageSetMetadataRequest request; request.SetDatastoreId(dataStoreID); request.SetImageSetId(imageSetID); if (!versionID.empty()) { request.SetVersionId(versionID); } Aws::MedicalImaging::MedicalImagingClient client(clientConfig); Aws::MedicalImaging::Model::GetImageSetMetadataOutcome outcome = client.GetImageSetMetadata( request); if (outcome.IsSuccess()) { std::ofstream file(outputFilePath, std::ios::binary); auto &metadata = outcome.GetResult().GetImageSetMetadataBlob(); file << metadata.rdbuf(); } else { std::cerr << "Failed to get image set metadata: " << outcome.GetError().GetMessage() << std::endl; } return outcome.IsSuccess(); }
Download, decode and verify image frames.
bool AwsDoc::Medical_Imaging::downloadDecodeAndCheckImageFrames( const Aws::String &dataStoreID, const Aws::Vector<ImageFrameInfo> &imageFrames, const Aws::String &outDirectory, const Aws::Client::ClientConfiguration &clientConfiguration) { Aws::Client::ClientConfiguration clientConfiguration1(clientConfiguration); clientConfiguration1.executor = Aws::MakeShared<Aws::Utils::Threading::PooledThreadExecutor>( "executor", 25); Aws::MedicalImaging::MedicalImagingClient medicalImagingClient( clientConfiguration1); Aws::Utils::Threading::Semaphore semaphore(0, 1); std::atomic<size_t> count(imageFrames.size()); bool result = true; for (auto &imageFrame: imageFrames) { Aws::MedicalImaging::Model::GetImageFrameRequest getImageFrameRequest; getImageFrameRequest.SetDatastoreId(dataStoreID); getImageFrameRequest.SetImageSetId(imageFrame.mImageSetId); Aws::MedicalImaging::Model::ImageFrameInformation imageFrameInformation; imageFrameInformation.SetImageFrameId(imageFrame.mImageFrameId); getImageFrameRequest.SetImageFrameInformation(imageFrameInformation); auto getImageFrameAsyncLambda = [&semaphore, &result, &count, imageFrame, outDirectory]( const Aws::MedicalImaging::MedicalImagingClient *client, const Aws::MedicalImaging::Model::GetImageFrameRequest &request, Aws::MedicalImaging::Model::GetImageFrameOutcome outcome, const std::shared_ptr<const Aws::Client::AsyncCallerContext> &context) { if (!handleGetImageFrameResult(outcome, outDirectory, imageFrame)) { std::cerr << "Failed to download and convert image frame: " << imageFrame.mImageFrameId << " from image set: " << imageFrame.mImageSetId << std::endl; result = false; } count--; if (count <= 0) { semaphore.ReleaseAll(); } }; // End of 'getImageFrameAsyncLambda' lambda. medicalImagingClient.GetImageFrameAsync(getImageFrameRequest, getImageFrameAsyncLambda); } if (count > 0) { semaphore.WaitOne(); } if (result) { std::cout << imageFrames.size() << " image files were downloaded." << std::endl; } return result; } bool AwsDoc::Medical_Imaging::decodeJPHFileAndValidateWithChecksum( const Aws::String &jphFile, uint32_t crc32Checksum) { opj_image_t *outputImage = jphImageToOpjBitmap(jphFile); if (!outputImage) { return false; } bool result = true; if (!verifyChecksumForImage(outputImage, crc32Checksum)) { std::cerr << "The checksum for the image does not match the expected value." << std::endl; std::cerr << "File :" << jphFile << std::endl; result = false; } opj_image_destroy(outputImage); return result; } opj_image * AwsDoc::Medical_Imaging::jphImageToOpjBitmap(const Aws::String &jphFile) { opj_stream_t *inFileStream = nullptr; opj_codec_t *decompressorCodec = nullptr; opj_image_t *outputImage = nullptr; try { std::shared_ptr<opj_dparameters> decodeParameters = std::make_shared<opj_dparameters>(); memset(decodeParameters.get(), 0, sizeof(opj_dparameters)); opj_set_default_decoder_parameters(decodeParameters.get()); decodeParameters->decod_format = 1; // JP2 image format. decodeParameters->cod_format = 2; // BMP image format. 
std::strncpy(decodeParameters->infile, jphFile.c_str(), OPJ_PATH_LEN); inFileStream = opj_stream_create_default_file_stream( decodeParameters->infile, true); if (!inFileStream) { throw std::runtime_error( "Unable to create input file stream for file '" + jphFile + "'."); } decompressorCodec = opj_create_decompress(OPJ_CODEC_JP2); if (!decompressorCodec) { throw std::runtime_error("Failed to create decompression codec."); } int decodeMessageLevel = 1; if (!setupCodecLogging(decompressorCodec, &decodeMessageLevel)) { std::cerr << "Failed to setup codec logging." << std::endl; } if (!opj_setup_decoder(decompressorCodec, decodeParameters.get())) { throw std::runtime_error("Failed to setup decompression codec."); } if (!opj_codec_set_threads(decompressorCodec, 4)) { throw std::runtime_error("Failed to set decompression codec threads."); } if (!opj_read_header(inFileStream, decompressorCodec, &outputImage)) { throw std::runtime_error("Failed to read header."); } if (!opj_decode(decompressorCodec, inFileStream, outputImage)) { throw std::runtime_error("Failed to decode."); } if (DEBUGGING) { std::cout << "image width : " << outputImage->x1 - outputImage->x0 << std::endl; std::cout << "image height : " << outputImage->y1 - outputImage->y0 << std::endl; std::cout << "number of channels: " << outputImage->numcomps << std::endl; std::cout << "colorspace : " << outputImage->color_space << std::endl; } } catch (const std::exception &e) { std::cerr << e.what() << std::endl; if (outputImage) { opj_image_destroy(outputImage); outputImage = nullptr; } } if (inFileStream) { opj_stream_destroy(inFileStream); } if (decompressorCodec) { opj_destroy_codec(decompressorCodec); } return outputImage; } //! Template function which converts a planar image bitmap to an interleaved image bitmap and //! then verifies the checksum of the bitmap. /*! * @param image: The OpenJPEG image struct. * @param crc32Checksum: The CRC32 checksum. * @return bool: Function succeeded. */ template<class myType> bool verifyChecksumForImageForType(opj_image_t *image, uint32_t crc32Checksum) { uint32_t width = image->x1 - image->x0; uint32_t height = image->y1 - image->y0; uint32_t numOfChannels = image->numcomps; // Buffer for interleaved bitmap. std::vector<myType> buffer(width * height * numOfChannels); // Convert planar bitmap to interleaved bitmap. for (uint32_t channel = 0; channel < numOfChannels; channel++) { for (uint32_t row = 0; row < height; row++) { uint32_t fromRowStart = row / image->comps[channel].dy * width / image->comps[channel].dx; uint32_t toIndex = (row * width) * numOfChannels + channel; for (uint32_t col = 0; col < width; col++) { uint32_t fromIndex = fromRowStart + col / image->comps[channel].dx; buffer[toIndex] = static_cast<myType>(image->comps[channel].data[fromIndex]); toIndex += numOfChannels; } } } // Verify checksum. boost::crc_32_type crc32; crc32.process_bytes(reinterpret_cast<char *>(buffer.data()), buffer.size() * sizeof(myType)); bool result = crc32.checksum() == crc32Checksum; if (!result) { std::cerr << "verifyChecksumForImage, checksum mismatch, expected - " << crc32Checksum << ", actual - " << crc32.checksum() << std::endl; } return result; } //! Routine which verifies the checksum of an OpenJPEG image struct. /*! * @param image: The OpenJPEG image struct. * @param crc32Checksum: The CRC32 checksum. * @return bool: Function succeeded. 
*/ bool AwsDoc::Medical_Imaging::verifyChecksumForImage(opj_image_t *image, uint32_t crc32Checksum) { uint32_t channels = image->numcomps; bool result = false; if (0 < channels) { // Assume the precision is the same for all channels. uint32_t precision = image->comps[0].prec; bool signedData = image->comps[0].sgnd; uint32_t bytes = (precision + 7) / 8; if (signedData) { switch (bytes) { case 1 : result = verifyChecksumForImageForType<int8_t>(image, crc32Checksum); break; case 2 : result = verifyChecksumForImageForType<int16_t>(image, crc32Checksum); break; case 4 : result = verifyChecksumForImageForType<int32_t>(image, crc32Checksum); break; default: std::cerr << "verifyChecksumForImage, unsupported data type, signed bytes - " << bytes << std::endl; break; } } else { switch (bytes) { case 1 : result = verifyChecksumForImageForType<uint8_t>(image, crc32Checksum); break; case 2 : result = verifyChecksumForImageForType<uint16_t>(image, crc32Checksum); break; case 4 : result = verifyChecksumForImageForType<uint32_t>(image, crc32Checksum); break; default: std::cerr << "verifyChecksumForImage, unsupported data type, unsigned bytes - " << bytes << std::endl; break; } } if (!result) { std::cerr << "verifyChecksumForImage, error bytes " << bytes << " signed " << signedData << std::endl; } } else { std::cerr << "'verifyChecksumForImage', no channels in the image." << std::endl; } return result; }
Clean up resources.
bool AwsDoc::Medical_Imaging::cleanup(const Aws::String &stackName, const Aws::String &dataStoreId, const Aws::Client::ClientConfiguration &clientConfiguration) { bool result = true; if (!stackName.empty() && askYesNoQuestion( "Would you like to delete the stack " + stackName + "? (y/n)")) { std::cout << "Deleting the image sets in the stack." << std::endl; result &= emptyDatastore(dataStoreId, clientConfiguration); printAsterisksLine(); std::cout << "Deleting the stack." << std::endl; result &= deleteStack(stackName, clientConfiguration); } return result; } bool AwsDoc::Medical_Imaging::emptyDatastore(const Aws::String &datastoreID, const Aws::Client::ClientConfiguration &clientConfiguration) { Aws::MedicalImaging::Model::SearchCriteria emptyCriteria; Aws::Vector<Aws::String> imageSetIDs; bool result = false; if (searchImageSets(datastoreID, emptyCriteria, imageSetIDs, clientConfiguration)) { result = true; for (auto &imageSetID: imageSetIDs) { result &= deleteImageSet(datastoreID, imageSetID, clientConfiguration); } } return result; }
For API details, see the following topics in the AWS SDK for C++ API Reference.
Note
There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.
- JavaScript
- SDK for JavaScript (v3)
Orchestrate steps (index.js).
import { parseScenarioArgs, Scenario, } from "@aws-doc-sdk-examples/lib/scenario/index.js"; import { saveState, loadState, } from "@aws-doc-sdk-examples/lib/scenario/steps-common.js"; import { createStack, deployStack, getAccountId, getDatastoreName, getStackName, outputState, waitForStackCreation, } from "./deploy-steps.js"; import { doCopy, selectDataset, copyDataset, outputCopiedObjects, } from "./dataset-steps.js"; import { doImport, outputImportJobStatus, startDICOMImport, waitForImportJobCompletion, } from "./import-steps.js"; import { getManifestFile, outputImageSetIds, parseManifestFile, } from "./image-set-steps.js"; import { getImageSetMetadata, outputImageFrameIds, } from "./image-frame-steps.js"; import { decodeAndVerifyImages, doVerify } from "./verify-steps.js"; import { confirmCleanup, deleteImageSets, deleteStack, } from "./clean-up-steps.js"; const context = {}; const scenarios = { deploy: new Scenario( "Deploy Resources", [ deployStack, getStackName, getDatastoreName, getAccountId, createStack, waitForStackCreation, outputState, saveState, ], context, ), demo: new Scenario( "Run Demo", [ loadState, doCopy, selectDataset, copyDataset, outputCopiedObjects, doImport, startDICOMImport, waitForImportJobCompletion, outputImportJobStatus, getManifestFile, parseManifestFile, outputImageSetIds, getImageSetMetadata, outputImageFrameIds, doVerify, decodeAndVerifyImages, saveState, ], context, ), destroy: new Scenario( "Clean Up Resources", [loadState, confirmCleanup, deleteImageSets, deleteStack], context, ), }; // Call function if run directly import { fileURLToPath } from "node:url"; if (process.argv[1] === fileURLToPath(import.meta.url)) { parseScenarioArgs(scenarios, { name: "Health Imaging Workflow", description: "Work with DICOM images using an AWS Health Imaging data store.", synopsis: "node index.js --scenario <deploy | demo | destroy> [-h|--help] [-y|--yes] [-v|--verbose]", }); }
Deploy resources (deploy-steps.js).
import fs from "node:fs/promises"; import path from "node:path"; import { CloudFormationClient, CreateStackCommand, DescribeStacksCommand, } from "@aws-sdk/client-cloudformation"; import { STSClient, GetCallerIdentityCommand } from "@aws-sdk/client-sts"; import { ScenarioAction, ScenarioInput, ScenarioOutput, } from "@aws-doc-sdk-examples/lib/scenario/index.js"; import { retry } from "@aws-doc-sdk-examples/lib/utils/util-timers.js"; const cfnClient = new CloudFormationClient({}); const stsClient = new STSClient({}); const __dirname = path.dirname(new URL(import.meta.url).pathname); const cfnTemplatePath = path.join( __dirname, "../../../../../workflows/healthimaging_image_sets/resources/cfn_template.yaml", ); export const deployStack = new ScenarioInput( "deployStack", "Do you want to deploy the CloudFormation stack?", { type: "confirm" }, ); export const getStackName = new ScenarioInput( "getStackName", "Enter a name for the CloudFormation stack:", { type: "input", skipWhen: (/** @type {{}} */ state) => !state.deployStack }, ); export const getDatastoreName = new ScenarioInput( "getDatastoreName", "Enter a name for the HealthImaging datastore:", { type: "input", skipWhen: (/** @type {{}} */ state) => !state.deployStack }, ); export const getAccountId = new ScenarioAction( "getAccountId", async (/** @type {{}} */ state) => { const command = new GetCallerIdentityCommand({}); const response = await stsClient.send(command); state.accountId = response.Account; }, { skipWhen: (/** @type {{}} */ state) => !state.deployStack, }, ); export const createStack = new ScenarioAction( "createStack", async (/** @type {{}} */ state) => { const stackName = state.getStackName; const datastoreName = state.getDatastoreName; const accountId = state.accountId; const command = new CreateStackCommand({ StackName: stackName, TemplateBody: await fs.readFile(cfnTemplatePath, "utf8"), Capabilities: ["CAPABILITY_IAM"], Parameters: [ { ParameterKey: "datastoreName", ParameterValue: datastoreName, }, { ParameterKey: "userAccountID", ParameterValue: accountId, }, ], }); const response = await cfnClient.send(command); state.stackId = response.StackId; }, { skipWhen: (/** @type {{}} */ state) => !state.deployStack }, ); export const waitForStackCreation = new ScenarioAction( "waitForStackCreation", async (/** @type {{}} */ state) => { const command = new DescribeStacksCommand({ StackName: state.stackId, }); await retry({ intervalInMs: 10000, maxRetries: 60 }, async () => { const response = await cfnClient.send(command); const stack = response.Stacks?.find( (s) => s.StackName === state.getStackName, ); if (!stack || stack.StackStatus === "CREATE_IN_PROGRESS") { throw new Error("Stack creation is still in progress"); } if (stack.StackStatus === "CREATE_COMPLETE") { state.stackOutputs = stack.Outputs?.reduce((acc, output) => { acc[output.OutputKey] = output.OutputValue; return acc; }, {}); } else { throw new Error( `Stack creation failed with status: ${stack.StackStatus}`, ); } }); }, { skipWhen: (/** @type {{}} */ state) => !state.deployStack, }, ); export const outputState = new ScenarioOutput( "outputState", (/** @type {{}} */ state) => { /** * @type {{ stackOutputs: { DatastoreID: string, BucketName: string, RoleArn: string }}} */ const { stackOutputs } = state; return `Stack creation completed. Output values: Datastore ID: ${stackOutputs?.DatastoreID} Bucket Name: ${stackOutputs?.BucketName} Role ARN: ${stackOutputs?.RoleArn} `; }, { skipWhen: (/** @type {{}} */ state) => !state.deployStack }, );
Copy DICOM files (dataset-steps.js).
import { S3Client, CopyObjectCommand, ListObjectsV2Command, } from "@aws-sdk/client-s3"; import { ScenarioAction, ScenarioInput, ScenarioOutput, } from "@aws-doc-sdk-examples/lib/scenario/index.js"; const s3Client = new S3Client({}); const datasetOptions = [ { name: "CT of chest (2 images)", value: "00029d25-fb18-4d42-aaa5-a0897d1ac8f7", }, { name: "CT of pelvis (57 images)", value: "00025d30-ef8f-4135-a35a-d83eff264fc1", }, { name: "MRI of head (192 images)", value: "0002d261-8a5d-4e63-8e2e-0cbfac87b904", }, { name: "MRI of breast (92 images)", value: "0002dd07-0b7f-4a68-a655-44461ca34096", }, ]; /** * @typedef {{ stackOutputs: { * BucketName: string, * DatastoreID: string, * doCopy: boolean * }}} State */ export const selectDataset = new ScenarioInput( "selectDataset", (state) => { if (!state.doCopy) { process.exit(0); } return "Select a DICOM dataset to import:"; }, { type: "select", choices: datasetOptions, }, ); export const doCopy = new ScenarioInput( "doCopy", "Do you want to copy images from the public dataset into your bucket?", { type: "confirm", }, ); export const copyDataset = new ScenarioAction( "copyDataset", async (/** @type { State } */ state) => { const inputBucket = state.stackOutputs.BucketName; const inputPrefix = "input/"; const selectedDatasetId = state.selectDataset; const sourceBucket = "idc-open-data"; const sourcePrefix = `${selectedDatasetId}`; const listObjectsCommand = new ListObjectsV2Command({ Bucket: sourceBucket, Prefix: sourcePrefix, }); const objects = await s3Client.send(listObjectsCommand); const copyPromises = objects.Contents.map((object) => { const sourceKey = object.Key; const destinationKey = `${inputPrefix}${sourceKey .split("/") .slice(1) .join("/")}`; const copyCommand = new CopyObjectCommand({ Bucket: inputBucket, CopySource: `/${sourceBucket}/${sourceKey}`, Key: destinationKey, }); return s3Client.send(copyCommand); }); const results = await Promise.all(copyPromises); state.copiedObjects = results.length; }, ); export const outputCopiedObjects = new ScenarioOutput( "outputCopiedObjects", (state) => `${state.copiedObjects} DICOM files were copied.`, );
Start import into datastore (import-steps.js).
import { MedicalImagingClient, StartDICOMImportJobCommand, GetDICOMImportJobCommand, } from "@aws-sdk/client-medical-imaging"; import { ScenarioAction, ScenarioOutput, ScenarioInput, } from "@aws-doc-sdk-examples/lib/scenario/index.js"; import { retry } from "@aws-doc-sdk-examples/lib/utils/util-timers.js"; /** * @typedef {{ stackOutputs: { * BucketName: string, * DatastoreID: string, * RoleArn: string * }}} State */ export const doImport = new ScenarioInput( "doImport", "Do you want to import DICOM images into your datastore?", { type: "confirm", default: true, }, ); export const startDICOMImport = new ScenarioAction( "startDICOMImport", async (/** @type {State} */ state) => { if (!state.doImport) { process.exit(0); } const medicalImagingClient = new MedicalImagingClient({}); const inputS3Uri = `s3://${state.stackOutputs.BucketName}/input/`; const outputS3Uri = `s3://${state.stackOutputs.BucketName}/output/`; const command = new StartDICOMImportJobCommand({ dataAccessRoleArn: state.stackOutputs.RoleArn, datastoreId: state.stackOutputs.DatastoreID, inputS3Uri, outputS3Uri, }); const response = await medicalImagingClient.send(command); state.importJobId = response.jobId; }, ); export const waitForImportJobCompletion = new ScenarioAction( "waitForImportJobCompletion", async (/** @type {State} */ state) => { const medicalImagingClient = new MedicalImagingClient({}); const command = new GetDICOMImportJobCommand({ datastoreId: state.stackOutputs.DatastoreID, jobId: state.importJobId, }); await retry({ intervalInMs: 10000, maxRetries: 60 }, async () => { const response = await medicalImagingClient.send(command); const jobStatus = response.jobProperties?.jobStatus; if (!jobStatus || jobStatus === "IN_PROGRESS") { throw new Error("Import job is still in progress"); } if (jobStatus === "COMPLETED") { state.importJobOutputS3Uri = response.jobProperties.outputS3Uri; } else { throw new Error(`Import job failed with status: ${jobStatus}`); } }); }, ); export const outputImportJobStatus = new ScenarioOutput( "outputImportJobStatus", (state) => `DICOM import job completed. Output location: ${state.importJobOutputS3Uri}`, );
Get image set IDs (image-set-steps.js).
import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3"; import { ScenarioAction, ScenarioOutput, } from "@aws-doc-sdk-examples/lib/scenario/index.js"; /** * @typedef {{ stackOutputs: { * BucketName: string, * DatastoreID: string, * RoleArn: string * }, importJobId: string, * importJobOutputS3Uri: string, * imageSetIds: string[], * manifestContent: { jobSummary: { imageSetsSummary: { imageSetId: string }[] } } * }} State */ const s3Client = new S3Client({}); export const getManifestFile = new ScenarioAction( "getManifestFile", async (/** @type {State} */ state) => { const bucket = state.stackOutputs.BucketName; const prefix = `output/${state.stackOutputs.DatastoreID}-DicomImport-${state.importJobId}/`; const key = `${prefix}job-output-manifest.json`; const command = new GetObjectCommand({ Bucket: bucket, Key: key, }); const response = await s3Client.send(command); const manifestContent = await response.Body.transformToString(); state.manifestContent = JSON.parse(manifestContent); }, ); export const parseManifestFile = new ScenarioAction( "parseManifestFile", (/** @type {State} */ state) => { const imageSetIds = state.manifestContent.jobSummary.imageSetsSummary.reduce((ids, next) => { return Object.assign({}, ids, { [next.imageSetId]: next.imageSetId, }); }, {}); state.imageSetIds = Object.keys(imageSetIds); }, ); export const outputImageSetIds = new ScenarioOutput( "outputImageSetIds", (/** @type {State} */ state) => `The image sets created by this import job are: \n${state.imageSetIds .map((id) => `Image set: ${id}`) .join("\n")}`, );
Get image frame IDs (image-frame-steps.js).
import { MedicalImagingClient, GetImageSetMetadataCommand, } from "@aws-sdk/client-medical-imaging"; import { gunzip } from "node:zlib"; import { promisify } from "node:util"; import { ScenarioAction, ScenarioOutput, } from "@aws-doc-sdk-examples/lib/scenario/index.js"; const gunzipAsync = promisify(gunzip); /** * @typedef {Object} DICOMValueRepresentation * @property {string} name * @property {string} type * @property {string} value */ /** * @typedef {Object} ImageFrameInformation * @property {string} ID * @property {Array<{ Checksum: number, Height: number, Width: number }>} PixelDataChecksumFromBaseToFullResolution * @property {number} MinPixelValue * @property {number} MaxPixelValue * @property {number} FrameSizeInBytes */ /** * @typedef {Object} DICOMMetadata * @property {Object} DICOM * @property {DICOMValueRepresentation[]} DICOMVRs * @property {ImageFrameInformation[]} ImageFrames */ /** * @typedef {Object} Series * @property {{ [key: string]: DICOMMetadata }} Instances */ /** * @typedef {Object} Study * @property {Object} DICOM * @property {Series[]} Series */ /** * @typedef {Object} Patient * @property {Object} DICOM */ /** * @typedef {{ * SchemaVersion: string, * DatastoreID: string, * ImageSetID: string, * Patient: Patient, * Study: Study * }} ImageSetMetadata */ /** * @typedef {{ stackOutputs: { * BucketName: string, * DatastoreID: string, * RoleArn: string * }, imageSetIds: string[] }} State */ const medicalImagingClient = new MedicalImagingClient({}); export const getImageSetMetadata = new ScenarioAction( "getImageSetMetadata", async (/** @type {State} */ state) => { const outputMetadata = []; for (const imageSetId of state.imageSetIds) { const command = new GetImageSetMetadataCommand({ datastoreId: state.stackOutputs.DatastoreID, imageSetId, }); const response = await medicalImagingClient.send(command); const compressedMetadataBlob = await response.imageSetMetadataBlob.transformToByteArray(); const decompressedMetadata = await gunzipAsync(compressedMetadataBlob); const imageSetMetadata = JSON.parse(decompressedMetadata.toString()); outputMetadata.push(imageSetMetadata); } state.imageSetMetadata = outputMetadata; }, ); export const outputImageFrameIds = new ScenarioOutput( "outputImageFrameIds", (/** @type {State & { imageSetMetadata: ImageSetMetadata[] }} */ state) => { let output = ""; for (const metadata of state.imageSetMetadata) { const imageSetId = metadata.ImageSetID; /** @type {DICOMMetadata[]} */ const instances = Object.values(metadata.Study.Series).flatMap( (series) => { return Object.values(series.Instances); }, ); const imageFrameIds = instances.flatMap((instance) => instance.ImageFrames.map((frame) => frame.ID), ); output += `Image set ID: ${imageSetId}\nImage frame IDs:\n${imageFrameIds.join( "\n", )}\n\n`; } return output; }, );
Verify image frames (verify-steps.js). The AWS HealthImaging Pixel Data Verification library was used for verification.
import { spawn } from "node:child_process"; import { ScenarioAction, ScenarioInput, } from "@aws-doc-sdk-examples/lib/scenario/index.js"; /** * @typedef {Object} DICOMValueRepresentation * @property {string} name * @property {string} type * @property {string} value */ /** * @typedef {Object} ImageFrameInformation * @property {string} ID * @property {Array<{ Checksum: number, Height: number, Width: number }>} PixelDataChecksumFromBaseToFullResolution * @property {number} MinPixelValue * @property {number} MaxPixelValue * @property {number} FrameSizeInBytes */ /** * @typedef {Object} DICOMMetadata * @property {Object} DICOM * @property {DICOMValueRepresentation[]} DICOMVRs * @property {ImageFrameInformation[]} ImageFrames */ /** * @typedef {Object} Series * @property {{ [key: string]: DICOMMetadata }} Instances */ /** * @typedef {Object} Study * @property {Object} DICOM * @property {Series[]} Series */ /** * @typedef {Object} Patient * @property {Object} DICOM */ /** * @typedef {{ * SchemaVersion: string, * DatastoreID: string, * ImageSetID: string, * Patient: Patient, * Study: Study * }} ImageSetMetadata */ /** * @typedef {{ stackOutputs: { * BucketName: string, * DatastoreID: string, * RoleArn: string * }, imageSetMetadata: ImageSetMetadata[] }} State */ export const doVerify = new ScenarioInput( "doVerify", "Do you want to verify the imported images?", { type: "confirm", default: true, }, ); export const decodeAndVerifyImages = new ScenarioAction( "decodeAndVerifyImages", async (/** @type {State} */ state) => { if (!state.doVerify) { process.exit(0); } const verificationTool = "./pixel-data-verification/index.js"; for (const metadata of state.imageSetMetadata) { const datastoreId = state.stackOutputs.DatastoreID; const imageSetId = metadata.ImageSetID; for (const [seriesInstanceUid, series] of Object.entries( metadata.Study.Series, )) { for (const [sopInstanceUid, _] of Object.entries(series.Instances)) { console.log( `Verifying image set ${imageSetId} with series ${seriesInstanceUid} and sop ${sopInstanceUid}`, ); const child = spawn( "node", [ verificationTool, datastoreId, imageSetId, seriesInstanceUid, sopInstanceUid, ], { stdio: "inherit" }, ); await new Promise((resolve, reject) => { child.on("exit", (code) => { if (code === 0) { resolve(); } else { reject( new Error( `Verification tool exited with code ${code} for image set ${imageSetId}`, ), ); } }); }); } } } }, );
Destroy resources (clean-up-steps.js).
import { CloudFormationClient, DeleteStackCommand, } from "@aws-sdk/client-cloudformation"; import { MedicalImagingClient, DeleteImageSetCommand, } from "@aws-sdk/client-medical-imaging"; import { ScenarioAction, ScenarioInput, } from "@aws-doc-sdk-examples/lib/scenario/index.js"; /** * @typedef {Object} DICOMValueRepresentation * @property {string} name * @property {string} type * @property {string} value */ /** * @typedef {Object} ImageFrameInformation * @property {string} ID * @property {Array<{ Checksum: number, Height: number, Width: number }>} PixelDataChecksumFromBaseToFullResolution * @property {number} MinPixelValue * @property {number} MaxPixelValue * @property {number} FrameSizeInBytes */ /** * @typedef {Object} DICOMMetadata * @property {Object} DICOM * @property {DICOMValueRepresentation[]} DICOMVRs * @property {ImageFrameInformation[]} ImageFrames */ /** * @typedef {Object} Series * @property {{ [key: string]: DICOMMetadata }} Instances */ /** * @typedef {Object} Study * @property {Object} DICOM * @property {Series[]} Series */ /** * @typedef {Object} Patient * @property {Object} DICOM */ /** * @typedef {{ * SchemaVersion: string, * DatastoreID: string, * ImageSetID: string, * Patient: Patient, * Study: Study * }} ImageSetMetadata */ /** * @typedef {{ stackOutputs: { * BucketName: string, * DatastoreID: string, * RoleArn: string * }, imageSetMetadata: ImageSetMetadata[] }} State */ const cfnClient = new CloudFormationClient({}); const medicalImagingClient = new MedicalImagingClient({}); export const confirmCleanup = new ScenarioInput( "confirmCleanup", "Do you want to delete the created resources?", { type: "confirm" }, ); export const deleteImageSets = new ScenarioAction( "deleteImageSets", async (/** @type {State} */ state) => { const datastoreId = state.stackOutputs.DatastoreID; for (const metadata of state.imageSetMetadata) { const command = new DeleteImageSetCommand({ datastoreId, imageSetId: metadata.ImageSetID, }); try { await medicalImagingClient.send(command); console.log(`Successfully deleted image set ${metadata.ImageSetID}`); } catch (e) { if (e instanceof Error) { if (e.name === "ConflictException") { console.log(`Image set ${metadata.ImageSetID} already deleted`); } } } } }, { skipWhen: (/** @type {{}} */ state) => !state.confirmCleanup, }, ); export const deleteStack = new ScenarioAction( "deleteStack", async (/** @type {State} */ state) => { const stackName = state.getStackName; const command = new DeleteStackCommand({ StackName: stackName, }); await cfnClient.send(command); console.log(`Stack ${stackName} deletion initiated`); }, { skipWhen: (/** @type {{}} */ state) => !state.confirmCleanup, }, );
For API details, see the following topics in the AWS SDK for JavaScript API Reference.
Note
There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.
- Python
- SDK for Python (Boto3)
Create an AWS CloudFormation stack with the necessary resources.
def deploy(self):
    """
    Deploys prerequisite resources used by the scenario. The resources are
    defined in the associated `setup.yaml` AWS CloudFormation script and are
    deployed as a CloudFormation stack, so they can be easily managed and
    destroyed.
    """
    print("\t\tLet's deploy the stack for resource creation.")
    stack_name = q.ask("\t\tEnter a name for the stack: ", q.non_empty)
    data_store_name = q.ask(
        "\t\tEnter a name for the Health Imaging Data Store: ", q.non_empty
    )
    account_id = boto3.client("sts").get_caller_identity()["Account"]

    with open(
        "../../../../workflows/healthimaging_image_sets/resources/cfn_template.yaml"
    ) as setup_file:
        setup_template = setup_file.read()

    print(f"\t\tCreating {stack_name}.")
    stack = self.cf_resource.create_stack(
        StackName=stack_name,
        TemplateBody=setup_template,
        Capabilities=["CAPABILITY_NAMED_IAM"],
        Parameters=[
            {
                "ParameterKey": "datastoreName",
                "ParameterValue": data_store_name,
            },
            {
                "ParameterKey": "userAccountID",
                "ParameterValue": account_id,
            },
        ],
    )

    print("\t\tWaiting for stack to deploy. This typically takes a minute or two.")
    waiter = self.cf_resource.meta.client.get_waiter("stack_create_complete")
    waiter.wait(StackName=stack.name)
    stack.load()
    print(f"\t\tStack status: {stack.stack_status}")

    outputs_dictionary = {
        output["OutputKey"]: output["OutputValue"] for output in stack.outputs
    }
    self.input_bucket_name = outputs_dictionary["BucketName"]
    self.output_bucket_name = outputs_dictionary["BucketName"]
    self.role_arn = outputs_dictionary["RoleArn"]
    self.data_store_id = outputs_dictionary["DatastoreID"]
    return stack
Copy DICOM files to the Amazon S3 import bucket.
def copy_single_object(self, key, source_bucket, target_bucket, target_directory):
    """
    Copies a single object from a source to a target bucket.

    :param key: The key of the object to copy.
    :param source_bucket: The source bucket for the copy.
    :param target_bucket: The target bucket for the copy.
    :param target_directory: The target directory for the copy.
    """
    new_key = target_directory + "/" + key
    copy_source = {"Bucket": source_bucket, "Key": key}
    self.s3_client.copy_object(
        CopySource=copy_source, Bucket=target_bucket, Key=new_key
    )
    print(f"\n\t\tCopying {key}.")

def copy_images(
    self, source_bucket, source_directory, target_bucket, target_directory
):
    """
    Copies the images from the source to the target bucket.

    :param source_bucket: The source bucket for the images.
    :param source_directory: Directory within the source bucket.
    :param target_bucket: The target bucket for the images.
    :param target_directory: Directory within the target bucket.
    """
    # Get a list of the objects in the source bucket.
    list_response = self.s3_client.list_objects_v2(
        Bucket=source_bucket, Prefix=source_directory
    )
    objs = list_response["Contents"]
    keys = [obj["Key"] for obj in objs]

    # Copy the objects in the bucket.
    for key in keys:
        self.copy_single_object(key, source_bucket, target_bucket, target_directory)
    print("\t\tDone copying all objects.")
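The listing call above returns at most 1,000 keys per request, which is enough for the sample datasets. For larger prefixes, a hedged sketch using the S3 paginator (the bucket and prefix names are placeholders) might look like this:

# Collect every key under a prefix, however many pages the listing spans.
import boto3

s3_client = boto3.client("s3")

keys = []
paginator = s3_client.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="amzn-s3-demo-bucket", Prefix="some-prefix/"):
    # Pages with no matching objects omit the "Contents" key entirely.
    keys.extend(obj["Key"] for obj in page.get("Contents", []))
print(f"Found {len(keys)} objects.")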
Import the DICOM files into the HealthImaging data store.
class MedicalImagingWrapper:
    """Encapsulates AWS HealthImaging functionality."""

    def __init__(self, medical_imaging_client, s3_client):
        """
        :param medical_imaging_client: A Boto3 Amazon MedicalImaging client.
        :param s3_client: A Boto3 S3 client.
        """
        self.medical_imaging_client = medical_imaging_client
        self.s3_client = s3_client

    @classmethod
    def from_client(cls):
        medical_imaging_client = boto3.client("medical-imaging")
        s3_client = boto3.client("s3")
        return cls(medical_imaging_client, s3_client)

    def start_dicom_import_job(
        self,
        data_store_id,
        input_bucket_name,
        input_directory,
        output_bucket_name,
        output_directory,
        role_arn,
    ):
        """
        Routine which starts a HealthImaging import job.

        :param data_store_id: The HealthImaging data store ID.
        :param input_bucket_name: The name of the Amazon S3 bucket containing the DICOM files.
        :param input_directory: The directory in the S3 bucket containing the DICOM files.
        :param output_bucket_name: The name of the S3 bucket for the output.
        :param output_directory: The directory in the S3 bucket to store the output.
        :param role_arn: The ARN of the IAM role with permissions for the import.
        :return: The job ID of the import.
        """
        input_uri = f"s3://{input_bucket_name}/{input_directory}/"
        output_uri = f"s3://{output_bucket_name}/{output_directory}/"
        try:
            job = self.medical_imaging_client.start_dicom_import_job(
                jobName="examplejob",
                datastoreId=data_store_id,
                dataAccessRoleArn=role_arn,
                inputS3Uri=input_uri,
                outputS3Uri=output_uri,
            )
        except ClientError as err:
            logger.error(
                "Couldn't start DICOM import job. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return job["jobId"]
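The Python listing starts the job but does not show a wait loop; the C++ and JavaScript examples poll GetDICOMImportJob until the status is COMPLETED. A minimal sketch of the same polling in Python, assuming a MedicalImagingWrapper instance named `wrapper` (a hypothetical helper, not part of the listing above), might look like this:

# Poll the import job until it leaves the IN_PROGRESS state.
import time

def wait_for_import_job(wrapper, data_store_id, job_id, interval_seconds=10):
    """Return True if the job reaches COMPLETED, False for any other end state."""
    while True:
        job = wrapper.medical_imaging_client.get_dicom_import_job(
            datastoreId=data_store_id, jobId=job_id
        )
        status = job["jobProperties"]["jobStatus"]
        print(f"DICOM import job status: {status}")
        if status != "IN_PROGRESS":
            return status == "COMPLETED"
        time.sleep(interval_seconds)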
Get image sets created by the DICOM import job.
class MedicalImagingWrapper:
    """Encapsulates AWS HealthImaging functionality."""

    def __init__(self, medical_imaging_client, s3_client):
        """
        :param medical_imaging_client: A Boto3 Amazon MedicalImaging client.
        :param s3_client: A Boto3 S3 client.
        """
        self.medical_imaging_client = medical_imaging_client
        self.s3_client = s3_client

    @classmethod
    def from_client(cls):
        medical_imaging_client = boto3.client("medical-imaging")
        s3_client = boto3.client("s3")
        return cls(medical_imaging_client, s3_client)

    def get_image_sets_for_dicom_import_job(self, datastore_id, import_job_id):
        """
        Retrieves the image sets created for an import job.

        :param datastore_id: The HealthImaging data store ID.
        :param import_job_id: The import job ID.
        :return: List of image set IDs.
        """
        import_job = self.medical_imaging_client.get_dicom_import_job(
            datastoreId=datastore_id, jobId=import_job_id
        )
        output_uri = import_job["jobProperties"]["outputS3Uri"]
        bucket = output_uri.split("/")[2]
        key = "/".join(output_uri.split("/")[3:])

        # Try to get the manifest.
        retries = 3
        while retries > 0:
            try:
                obj = self.s3_client.get_object(
                    Bucket=bucket, Key=key + "job-output-manifest.json"
                )
                body = obj["Body"]
                break
            except ClientError as error:
                retries = retries - 1
                time.sleep(3)
        try:
            data = json.load(body)
            expression = jmespath.compile("jobSummary.imageSetsSummary[].imageSetId")
            image_sets = expression.search(data)
        except json.decoder.JSONDecodeError as error:
            image_sets = import_job["jobProperties"]

        return image_sets

    def get_image_set(self, datastore_id, image_set_id, version_id=None):
        """
        Get the properties of an image set.

        :param datastore_id: The ID of the data store.
        :param image_set_id: The ID of the image set.
        :param version_id: The optional version of the image set.
        :return: The image set properties.
        """
        try:
            if version_id:
                image_set = self.medical_imaging_client.get_image_set(
                    imageSetId=image_set_id,
                    datastoreId=datastore_id,
                    versionId=version_id,
                )
            else:
                image_set = self.medical_imaging_client.get_image_set(
                    imageSetId=image_set_id, datastoreId=datastore_id
                )
        except ClientError as err:
            logger.error(
                "Couldn't get image set. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return image_set
Get image frame information for image sets.
class MedicalImagingWrapper:
    """Encapsulates AWS HealthImaging functionality."""

    def __init__(self, medical_imaging_client, s3_client):
        """
        :param medical_imaging_client: A Boto3 Amazon MedicalImaging client.
        :param s3_client: A Boto3 S3 client.
        """
        self.medical_imaging_client = medical_imaging_client
        self.s3_client = s3_client

    @classmethod
    def from_client(cls):
        medical_imaging_client = boto3.client("medical-imaging")
        s3_client = boto3.client("s3")
        return cls(medical_imaging_client, s3_client)

    def get_image_frames_for_image_set(self, datastore_id, image_set_id, out_directory):
        """
        Get the image frames for an image set.

        :param datastore_id: The ID of the data store.
        :param image_set_id: The ID of the image set.
        :param out_directory: The directory to save the file.
        :return: The image frames.
        """
        image_frames = []
        file_name = os.path.join(out_directory, f"{image_set_id}_metadata.json.gzip")
        file_name = file_name.replace("/", "\\\\")
        self.get_image_set_metadata(file_name, datastore_id, image_set_id)
        try:
            with gzip.open(file_name, "rb") as f_in:
                doc = json.load(f_in)

            instances = jmespath.search("Study.Series.*.Instances[].*[]", doc)

            for instance in instances:
                rescale_slope = jmespath.search("DICOM.RescaleSlope", instance)
                rescale_intercept = jmespath.search("DICOM.RescaleIntercept", instance)
                image_frames_json = jmespath.search("ImageFrames[][]", instance)
                for image_frame in image_frames_json:
                    checksum_json = jmespath.search(
                        "max_by(PixelDataChecksumFromBaseToFullResolution, &Width)",
                        image_frame,
                    )
                    image_frame_info = {
                        "imageSetId": image_set_id,
                        "imageFrameId": image_frame["ID"],
                        "rescaleIntercept": rescale_intercept,
                        "rescaleSlope": rescale_slope,
                        "minPixelValue": image_frame["MinPixelValue"],
                        "maxPixelValue": image_frame["MaxPixelValue"],
                        "fullResolutionChecksum": checksum_json["Checksum"],
                    }
                    image_frames.append(image_frame_info)
            return image_frames
        except TypeError:
            return {}
        except ClientError as err:
            logger.error(
                "Couldn't get image frames for image set. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

        return image_frames

    def get_image_set_metadata(
        self, metadata_file, datastore_id, image_set_id, version_id=None
    ):
        """
        Get the metadata of an image set.

        :param metadata_file: The file to store the JSON gzipped metadata.
        :param datastore_id: The ID of the data store.
        :param image_set_id: The ID of the image set.
        :param version_id: The version of the image set.
        """
        try:
            if version_id:
                image_set_metadata = self.medical_imaging_client.get_image_set_metadata(
                    imageSetId=image_set_id,
                    datastoreId=datastore_id,
                    versionId=version_id,
                )
            else:
                image_set_metadata = self.medical_imaging_client.get_image_set_metadata(
                    imageSetId=image_set_id, datastoreId=datastore_id
                )
            with open(metadata_file, "wb") as f:
                for chunk in image_set_metadata["imageSetMetadataBlob"].iter_chunks():
                    if chunk:
                        f.write(chunk)
        except ClientError as err:
            logger.error(
                "Couldn't get image metadata. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
Download, decode and verify image frames.
class MedicalImagingWrapper:
    """Encapsulates AWS HealthImaging functionality."""

    def __init__(self, medical_imaging_client, s3_client):
        """
        :param medical_imaging_client: A Boto3 Amazon MedicalImaging client.
        :param s3_client: A Boto3 S3 client.
        """
        self.medical_imaging_client = medical_imaging_client
        self.s3_client = s3_client

    @classmethod
    def from_client(cls):
        medical_imaging_client = boto3.client("medical-imaging")
        s3_client = boto3.client("s3")
        return cls(medical_imaging_client, s3_client)

    def get_pixel_data(
        self, file_path_to_write, datastore_id, image_set_id, image_frame_id
    ):
        """
        Get an image frame's pixel data.

        :param file_path_to_write: The path to write the image frame's HTJ2K encoded pixel data.
        :param datastore_id: The ID of the data store.
        :param image_set_id: The ID of the image set.
        :param image_frame_id: The ID of the image frame.
        """
        try:
            image_frame = self.medical_imaging_client.get_image_frame(
                datastoreId=datastore_id,
                imageSetId=image_set_id,
                imageFrameInformation={"imageFrameId": image_frame_id},
            )
            with open(file_path_to_write, "wb") as f:
                for chunk in image_frame["imageFrameBlob"].iter_chunks():
                    f.write(chunk)
        except ClientError as err:
            logger.error(
                "Couldn't get image frame. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise

    def download_decode_and_check_image_frames(
        self, data_store_id, image_frames, out_directory
    ):
        """
        Downloads image frames, decodes them, and uses the checksum to validate
        the decoded images.

        :param data_store_id: The HealthImaging data store ID.
        :param image_frames: A list of dicts containing image frame information.
        :param out_directory: A directory for the downloaded images.
        :return: True if the function succeeded; otherwise, False.
        """
        total_result = True
        for image_frame in image_frames:
            image_file_path = f"{out_directory}/image_{image_frame['imageFrameId']}.jph"
            self.get_pixel_data(
                image_file_path,
                data_store_id,
                image_frame["imageSetId"],
                image_frame["imageFrameId"],
            )
            image_array = self.jph_image_to_opj_bitmap(image_file_path)
            crc32_checksum = image_frame["fullResolutionChecksum"]
            # Verify checksum.
            crc32_calculated = zlib.crc32(image_array)
            image_result = crc32_checksum == crc32_calculated
            print(
                f"\t\tImage checksum verified for {image_frame['imageFrameId']}: {image_result}"
            )
            total_result = total_result and image_result
        return total_result

    @staticmethod
    def jph_image_to_opj_bitmap(jph_file):
        """
        Decode the image to a bitmap using an OPENJPEG library.

        :param jph_file: The file to decode.
        :return: The decoded bitmap as an array.
        """
        # Use format 2 for the JPH file.
        params = openjpeg.utils.get_parameters(jph_file, 2)
        print(f"\n\t\tImage parameters for {jph_file}: \n\t\t{params}")
        image_array = openjpeg.utils.decode(jph_file, 2)
        return image_array
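As a hypothetical usage sketch tying these pieces together (the data store and image set IDs are placeholders produced by the earlier steps, and the output directory must already exist):

# Fetch frame info for one image set, then download, decode, and verify each frame.
wrapper = MedicalImagingWrapper.from_client()
data_store_id = "12345678901234567890123456789012"  # Placeholder data store ID.
image_set_id = "12345678901234567890123456789012"  # Placeholder image set ID.

frames = wrapper.get_image_frames_for_image_set(data_store_id, image_set_id, "output")
ok = wrapper.download_decode_and_check_image_frames(data_store_id, frames, "output")
print("All frames verified." if ok else "At least one frame failed verification.")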
Clean up resources.
def destroy(self, stack):
    """
    Destroys the resources managed by the CloudFormation stack, and the
    CloudFormation stack itself.

    :param stack: The CloudFormation stack that manages the example resources.
    """
    print(f"\t\tCleaning up resources and {stack.name}.")
    data_store_id = None
    for oput in stack.outputs:
        if oput["OutputKey"] == "DatastoreID":
            data_store_id = oput["OutputValue"]

    if data_store_id is not None:
        print(f"\t\tDeleting image sets in data store {data_store_id}.")
        image_sets = self.medical_imaging_wrapper.search_image_sets(
            data_store_id, {}
        )
        image_set_ids = [image_set["imageSetId"] for image_set in image_sets]

        for image_set_id in image_set_ids:
            self.medical_imaging_wrapper.delete_image_set(
                data_store_id, image_set_id
            )
            print(f"\t\tDeleted image set with id : {image_set_id}")

    print(f"\t\tDeleting {stack.name}.")
    stack.delete()
    print("\t\tWaiting for stack removal. This may take a few minutes.")
    waiter = self.cf_resource.meta.client.get_waiter("stack_delete_complete")
    waiter.wait(StackName=stack.name)
    print("\t\tStack delete complete.")


class MedicalImagingWrapper:
    """Encapsulates AWS HealthImaging functionality."""

    def __init__(self, medical_imaging_client, s3_client):
        """
        :param medical_imaging_client: A Boto3 Amazon MedicalImaging client.
        :param s3_client: A Boto3 S3 client.
        """
        self.medical_imaging_client = medical_imaging_client
        self.s3_client = s3_client

    @classmethod
    def from_client(cls):
        medical_imaging_client = boto3.client("medical-imaging")
        s3_client = boto3.client("s3")
        return cls(medical_imaging_client, s3_client)

    def search_image_sets(self, datastore_id, search_filter):
        """
        Search for image sets.

        :param datastore_id: The ID of the data store.
        :param search_filter: The search filter.
            For example: {"filters" : [{ "operator": "EQUAL", "values": [{"DICOMPatientId": "3524578"}]}]}.
        :return: The list of image sets.
        """
        try:
            paginator = self.medical_imaging_client.get_paginator("search_image_sets")
            page_iterator = paginator.paginate(
                datastoreId=datastore_id, searchCriteria=search_filter
            )
            metadata_summaries = []
            for page in page_iterator:
                metadata_summaries.extend(page["imageSetsMetadataSummaries"])
        except ClientError as err:
            logger.error(
                "Couldn't search image sets. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
        else:
            return metadata_summaries

    def delete_image_set(self, datastore_id, image_set_id):
        """
        Delete an image set.

        :param datastore_id: The ID of the data store.
        :param image_set_id: The ID of the image set.
        """
        try:
            delete_results = self.medical_imaging_client.delete_image_set(
                imageSetId=image_set_id, datastoreId=datastore_id
            )
        except ClientError as err:
            logger.error(
                "Couldn't delete image set. Here's why: %s: %s",
                err.response["Error"]["Code"],
                err.response["Error"]["Message"],
            )
            raise
For API details, see the following topics in the AWS SDK for Python (Boto3) API Reference.
Note
There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.