Analyzing a video stored in an Amazon S3 bucket with Java or Python (SDK)
This procedure shows you how to detect labels in a video by using Amazon Rekognition Video label detection operations, a video stored in an Amazon S3 bucket, and an Amazon SNS topic. The procedure also shows how to use an Amazon SQS queue to get the completion status from the Amazon SNS topic. For more information, see Calling Amazon Rekognition Video operations. You aren't restricted to using an Amazon SQS queue. For example, you can use an AWS Lambda function to get the completion status. For more information, see Invoking Lambda functions using Amazon SNS notifications.
The example code in this procedure shows you how to do the following:
-
Create the Amazon SNS topic.
-
Create the Amazon SQS queue.
-
Give Amazon Rekognition Video permission to publish the completion status of a video analysis operation to the Amazon SNS topic.
-
Subscribe the Amazon SQS queue to the Amazon SNS topic.
-
Start the video analysis request by calling StartLabelDetection.
-
Get the completion status from the Amazon SQS queue. The example tracks the job identifier (
JobId
) that's returned inStartLabelDetection
and only gets the results for matching job identifiers that are read from the completion status. This is an important consideration if other applications are using the same queue and topic. For simplicity, the example deletes jobs that don't match. Consider adding them to an Amazon SQS dead-letter queue for further investigation. -
Get and display the video analysis results by calling GetLabelDetection.
Prerequisites
The example code for this procedure is provided in Java and Python. You need to have the appropriate AWS SDK installed. For more information, see Getting started with Amazon Rekognition. The AWS account that you use must have access permissions to the Amazon Rekognition API. For more information, see Actions Defined by Amazon Rekognition.
To detect labels in a video
-
Configure user access to Amazon Rekognition Video and configure Amazon Rekognition Video access to Amazon SNS. For more information, see Configuring Amazon Rekognition Video. You don't need to do steps 3, 4, 5, and 6 because the example code creates and configures the Amazon SNS topic and Amazon SQS queue.
-
Upload an MOV or MPEG-4 format video file to an Amazon S3 Bucket. For test purposes, upload a video that's no longer than 30 seconds in length.
For instructions, see Uploading Objects into Amazon S3 in the Amazon Simple Storage Service User Guide.
-
Use the following code examples to detect labels in a video.
- Java
-
In the function
main
:-
Replace
roleArn
with the ARN of the IAM service role that you created in step 7 of To configure Amazon Rekognition Video. -
Replace the values of
bucket
andvideo
with the bucket and video file name that you specified in step 2.
//Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. //PDX-License-Identifier: MIT-0 (For details, see https://github.com/awsdocs/amazon-rekognition-developer-guide/blob/master/LICENSE-SAMPLECODE.) package com.amazonaws.samples; import com.amazonaws.auth.policy.Policy; import com.amazonaws.auth.policy.Condition; import com.amazonaws.auth.policy.Principal; import com.amazonaws.auth.policy.Resource; import com.amazonaws.auth.policy.Statement; import com.amazonaws.auth.policy.Statement.Effect; import com.amazonaws.auth.policy.actions.SQSActions; import com.amazonaws.services.rekognition.AmazonRekognition; import com.amazonaws.services.rekognition.AmazonRekognitionClientBuilder; import com.amazonaws.services.rekognition.model.CelebrityDetail; import com.amazonaws.services.rekognition.model.CelebrityRecognition; import com.amazonaws.services.rekognition.model.CelebrityRecognitionSortBy; import com.amazonaws.services.rekognition.model.ContentModerationDetection; import com.amazonaws.services.rekognition.model.ContentModerationSortBy; import com.amazonaws.services.rekognition.model.Face; import com.amazonaws.services.rekognition.model.FaceDetection; import com.amazonaws.services.rekognition.model.FaceMatch; import com.amazonaws.services.rekognition.model.FaceSearchSortBy; import com.amazonaws.services.rekognition.model.GetCelebrityRecognitionRequest; import com.amazonaws.services.rekognition.model.GetCelebrityRecognitionResult; import com.amazonaws.services.rekognition.model.GetContentModerationRequest; import com.amazonaws.services.rekognition.model.GetContentModerationResult; import com.amazonaws.services.rekognition.model.GetFaceDetectionRequest; import com.amazonaws.services.rekognition.model.GetFaceDetectionResult; import com.amazonaws.services.rekognition.model.GetFaceSearchRequest; import com.amazonaws.services.rekognition.model.GetFaceSearchResult; import com.amazonaws.services.rekognition.model.GetLabelDetectionRequest; import com.amazonaws.services.rekognition.model.GetLabelDetectionResult; import com.amazonaws.services.rekognition.model.GetPersonTrackingRequest; import com.amazonaws.services.rekognition.model.GetPersonTrackingResult; import com.amazonaws.services.rekognition.model.Instance; import com.amazonaws.services.rekognition.model.Label; import com.amazonaws.services.rekognition.model.LabelDetection; import com.amazonaws.services.rekognition.model.LabelDetectionSortBy; import com.amazonaws.services.rekognition.model.NotificationChannel; import com.amazonaws.services.rekognition.model.Parent; import com.amazonaws.services.rekognition.model.PersonDetection; import com.amazonaws.services.rekognition.model.PersonMatch; import com.amazonaws.services.rekognition.model.PersonTrackingSortBy; import com.amazonaws.services.rekognition.model.S3Object; import com.amazonaws.services.rekognition.model.StartCelebrityRecognitionRequest; import com.amazonaws.services.rekognition.model.StartCelebrityRecognitionResult; import com.amazonaws.services.rekognition.model.StartContentModerationRequest; import com.amazonaws.services.rekognition.model.StartContentModerationResult; import com.amazonaws.services.rekognition.model.StartFaceDetectionRequest; import com.amazonaws.services.rekognition.model.StartFaceDetectionResult; import com.amazonaws.services.rekognition.model.StartFaceSearchRequest; import com.amazonaws.services.rekognition.model.StartFaceSearchResult; import com.amazonaws.services.rekognition.model.StartLabelDetectionRequest; import com.amazonaws.services.rekognition.model.StartLabelDetectionResult; import com.amazonaws.services.rekognition.model.StartPersonTrackingRequest; import com.amazonaws.services.rekognition.model.StartPersonTrackingResult; import com.amazonaws.services.rekognition.model.Video; import com.amazonaws.services.rekognition.model.VideoMetadata; import com.amazonaws.services.sns.AmazonSNS; import com.amazonaws.services.sns.AmazonSNSClientBuilder; import com.amazonaws.services.sns.model.CreateTopicRequest; import com.amazonaws.services.sns.model.CreateTopicResult; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.QueueAttributeName; import com.amazonaws.services.sqs.model.SetQueueAttributesRequest; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import java.util.*; public class VideoDetect { private static String sqsQueueName=null; private static String snsTopicName=null; private static String snsTopicArn = null; private static String roleArn= null; private static String sqsQueueUrl = null; private static String sqsQueueArn = null; private static String startJobId = null; private static String bucket = null; private static String video = null; private static AmazonSQS sqs=null; private static AmazonSNS sns=null; private static AmazonRekognition rek = null; private static NotificationChannel channel= new NotificationChannel() .withSNSTopicArn(snsTopicArn) .withRoleArn(roleArn); public static void main(String[] args) throws Exception { video = ""; bucket = ""; roleArn= ""; sns = AmazonSNSClientBuilder.defaultClient(); sqs= AmazonSQSClientBuilder.defaultClient(); rek = AmazonRekognitionClientBuilder.defaultClient(); CreateTopicandQueue(); //================================================= StartLabelDetection(bucket, video); if (GetSQSMessageSuccess()==true) GetLabelDetectionResults(); //================================================= DeleteTopicandQueue(); System.out.println("Done!"); } static boolean GetSQSMessageSuccess() throws Exception { boolean success=false; System.out.println("Waiting for job: " + startJobId); //Poll queue for messages List<Message> messages=null; int dotLine=0; boolean jobFound=false; //loop until the job status is published. Ignore other messages in queue. do{ messages = sqs.receiveMessage(sqsQueueUrl).getMessages(); if (dotLine++<40){ System.out.print("."); }else{ System.out.println(); dotLine=0; } if (!messages.isEmpty()) { //Loop through messages received. for (Message message: messages) { String notification = message.getBody(); // Get status and job id from notification. ObjectMapper mapper = new ObjectMapper(); JsonNode jsonMessageTree = mapper.readTree(notification); JsonNode messageBodyText = jsonMessageTree.get("Message"); ObjectMapper operationResultMapper = new ObjectMapper(); JsonNode jsonResultTree = operationResultMapper.readTree(messageBodyText.textValue()); JsonNode operationJobId = jsonResultTree.get("JobId"); JsonNode operationStatus = jsonResultTree.get("Status"); System.out.println("Job found was " + operationJobId); // Found job. Get the results and display. if(operationJobId.asText().equals(startJobId)){ jobFound=true; System.out.println("Job id: " + operationJobId ); System.out.println("Status : " + operationStatus.toString()); if (operationStatus.asText().equals("SUCCEEDED")){ success=true; } else{ System.out.println("Video analysis failed"); } sqs.deleteMessage(sqsQueueUrl,message.getReceiptHandle()); } else{ System.out.println("Job received was not job " + startJobId); //Delete unknown message. Consider moving message to dead letter queue sqs.deleteMessage(sqsQueueUrl,message.getReceiptHandle()); } } } else { Thread.sleep(5000); } } while (!jobFound); System.out.println("Finished processing video"); return success; } private static void StartLabelDetection(String bucket, String video) throws Exception{ NotificationChannel channel= new NotificationChannel() .withSNSTopicArn(snsTopicArn) .withRoleArn(roleArn); StartLabelDetectionRequest req = new StartLabelDetectionRequest() .withVideo(new Video() .withS3Object(new S3Object() .withBucket(bucket) .withName(video))) .withMinConfidence(50F) .withJobTag("DetectingLabels") .withNotificationChannel(channel); StartLabelDetectionResult startLabelDetectionResult = rek.startLabelDetection(req); startJobId=startLabelDetectionResult.getJobId(); } private static void GetLabelDetectionResults() throws Exception{ int maxResults=10; String paginationToken=null; GetLabelDetectionResult labelDetectionResult=null; do { if (labelDetectionResult !=null){ paginationToken = labelDetectionResult.getNextToken(); } GetLabelDetectionRequest labelDetectionRequest= new GetLabelDetectionRequest() .withJobId(startJobId) .withSortBy(LabelDetectionSortBy.TIMESTAMP) .withMaxResults(maxResults) .withNextToken(paginationToken); labelDetectionResult = rek.getLabelDetection(labelDetectionRequest); VideoMetadata videoMetaData=labelDetectionResult.getVideoMetadata(); System.out.println("Format: " + videoMetaData.getFormat()); System.out.println("Codec: " + videoMetaData.getCodec()); System.out.println("Duration: " + videoMetaData.getDurationMillis()); System.out.println("FrameRate: " + videoMetaData.getFrameRate()); //Show labels, confidence and detection times List<LabelDetection> detectedLabels= labelDetectionResult.getLabels(); for (LabelDetection detectedLabel: detectedLabels) { long seconds=detectedLabel.getTimestamp(); Label label=detectedLabel.getLabel(); System.out.println("Millisecond: " + Long.toString(seconds) + " "); System.out.println(" Label:" + label.getName()); System.out.println(" Confidence:" + detectedLabel.getLabel().getConfidence().toString()); List<Instance> instances = label.getInstances(); System.out.println(" Instances of " + label.getName()); if (instances.isEmpty()) { System.out.println(" " + "None"); } else { for (Instance instance : instances) { System.out.println(" Confidence: " + instance.getConfidence().toString()); System.out.println(" Bounding box: " + instance.getBoundingBox().toString()); } } System.out.println(" Parent labels for " + label.getName() + ":"); List<Parent> parents = label.getParents(); if (parents.isEmpty()) { System.out.println(" None"); } else { for (Parent parent : parents) { System.out.println(" " + parent.getName()); } } System.out.println(); } } while (labelDetectionResult !=null && labelDetectionResult.getNextToken() != null); } // Creates an SNS topic and SQS queue. The queue is subscribed to the topic. static void CreateTopicandQueue() { //create a new SNS topic snsTopicName="AmazonRekognitionTopic" + Long.toString(System.currentTimeMillis()); CreateTopicRequest createTopicRequest = new CreateTopicRequest(snsTopicName); CreateTopicResult createTopicResult = sns.createTopic(createTopicRequest); snsTopicArn=createTopicResult.getTopicArn(); //Create a new SQS Queue sqsQueueName="AmazonRekognitionQueue" + Long.toString(System.currentTimeMillis()); final CreateQueueRequest createQueueRequest = new CreateQueueRequest(sqsQueueName); sqsQueueUrl = sqs.createQueue(createQueueRequest).getQueueUrl(); sqsQueueArn = sqs.getQueueAttributes(sqsQueueUrl, Arrays.asList("QueueArn")).getAttributes().get("QueueArn"); //Subscribe SQS queue to SNS topic String sqsSubscriptionArn = sns.subscribe(snsTopicArn, "sqs", sqsQueueArn).getSubscriptionArn(); // Authorize queue Policy policy = new Policy().withStatements( new Statement(Effect.Allow) .withPrincipals(Principal.AllUsers) .withActions(SQSActions.SendMessage) .withResources(new Resource(sqsQueueArn)) .withConditions(new Condition().withType("ArnEquals").withConditionKey("aws:SourceArn").withValues(snsTopicArn)) ); Map queueAttributes = new HashMap(); queueAttributes.put(QueueAttributeName.Policy.toString(), policy.toJson()); sqs.setQueueAttributes(new SetQueueAttributesRequest(sqsQueueUrl, queueAttributes)); System.out.println("Topic arn: " + snsTopicArn); System.out.println("Queue arn: " + sqsQueueArn); System.out.println("Queue url: " + sqsQueueUrl); System.out.println("Queue sub arn: " + sqsSubscriptionArn ); } static void DeleteTopicandQueue() { if (sqs !=null) { sqs.deleteQueue(sqsQueueUrl); System.out.println("SQS queue deleted"); } if (sns!=null) { sns.deleteTopic(snsTopicArn); System.out.println("SNS topic deleted"); } } }
-
- Python
-
In the function
main
:-
Replace
roleArn
with the ARN of the IAM service role that you created in step 7 of To configure Amazon Rekognition Video. -
Replace the values of
bucket
andvideo
with the bucket and video file name that you specified in step 2. -
Replace the value of
profile_name
in the line that creates the Rekognition session with the name of your developer profile. -
You can also include filtration criteria in the settings paramter. For example, you can use a
LabelsInclusionFilter
or aLabelsExclusionFilter
alongside a list of desired values. In the code below, you can uncomment theFeatures
andSettings
section and provide your own values to limit the returned results to just the labels your are interested in. -
In the call to
GetLabelDetection
, you can provide values for theSortBy
andAggregateBy
arguments. To sort by time, set the value of theSortBy
input parameter toTIMESTAMP
. To sort by entity, use theSortBy
input parameter with the value that's appropriate for the operation you're performing. To aggregate results by timestamp, set the value of theAggregateBy
parameter toTIMESTAMPS
. To aggregate by video segment, useSEGMENTS
.
## Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. # PDX-License-Identifier: MIT-0 (For details, see https://github.com/awsdocs/amazon-rekognition-developer-guide/blob/master/LICENSE-SAMPLECODE.) import boto3 import json import sys import time class VideoDetect: jobId = '' roleArn = '' bucket = '' video = '' startJobId = '' sqsQueueUrl = '' snsTopicArn = '' processType = '' def __init__(self, role, bucket, video, client, rek, sqs, sns): self.roleArn = role self.bucket = bucket self.video = video self.client = client self.rek = rek self.sqs = sqs self.sns = sns def GetSQSMessageSuccess(self): jobFound = False succeeded = False dotLine = 0 while jobFound == False: sqsResponse = self.sqs.receive_message(QueueUrl=self.sqsQueueUrl, MessageAttributeNames=['ALL'], MaxNumberOfMessages=10) if sqsResponse: if 'Messages' not in sqsResponse: if dotLine < 40: print('.', end='') dotLine = dotLine + 1 else: print() dotLine = 0 sys.stdout.flush() time.sleep(5) continue for message in sqsResponse['Messages']: notification = json.loads(message['Body']) rekMessage = json.loads(notification['Message']) print(rekMessage['JobId']) print(rekMessage['Status']) if rekMessage['JobId'] == self.startJobId: print('Matching Job Found:' + rekMessage['JobId']) jobFound = True if (rekMessage['Status'] == 'SUCCEEDED'): succeeded = True self.sqs.delete_message(QueueUrl=self.sqsQueueUrl, ReceiptHandle=message['ReceiptHandle']) else: print("Job didn't match:" + str(rekMessage['JobId']) + ' : ' + self.startJobId) # Delete the unknown message. Consider sending to dead letter queue self.sqs.delete_message(QueueUrl=self.sqsQueueUrl, ReceiptHandle=message['ReceiptHandle']) return succeeded def StartLabelDetection(self): response = self.rek.start_label_detection(Video={'S3Object': {'Bucket': self.bucket, 'Name': self.video}}, NotificationChannel={'RoleArn': self.roleArn, 'SNSTopicArn': self.snsTopicArn}, MinConfidence=90, # Filtration options, uncomment and add desired labels to filter returned labels # Features=['GENERAL_LABELS'], # Settings={ # 'GeneralLabels': { # 'LabelInclusionFilters': ['Clothing'] # }} ) self.startJobId = response['JobId'] print('Start Job Id: ' + self.startJobId) def GetLabelDetectionResults(self): maxResults = 10 paginationToken = '' finished = False while finished == False: response = self.rek.get_label_detection(JobId=self.startJobId, MaxResults=maxResults, NextToken=paginationToken, SortBy='TIMESTAMP', AggregateBy="TIMESTAMPS") print('Codec: ' + response['VideoMetadata']['Codec']) print('Duration: ' + str(response['VideoMetadata']['DurationMillis'])) print('Format: ' + response['VideoMetadata']['Format']) print('Frame rate: ' + str(response['VideoMetadata']['FrameRate'])) print() for labelDetection in response['Labels']: label = labelDetection['Label'] print("Timestamp: " + str(labelDetection['Timestamp'])) print(" Label: " + label['Name']) print(" Confidence: " + str(label['Confidence'])) print(" Instances:") for instance in label['Instances']: print(" Confidence: " + str(instance['Confidence'])) print(" Bounding box") print(" Top: " + str(instance['BoundingBox']['Top'])) print(" Left: " + str(instance['BoundingBox']['Left'])) print(" Width: " + str(instance['BoundingBox']['Width'])) print(" Height: " + str(instance['BoundingBox']['Height'])) print() print() print("Parents:") for parent in label['Parents']: print(" " + parent['Name']) print("Aliases:") for alias in label['Aliases']: print(" " + alias['Name']) print("Categories:") for category in label['Categories']: print(" " + category['Name']) print("----------") print() if 'NextToken' in response: paginationToken = response['NextToken'] else: finished = True def CreateTopicandQueue(self): millis = str(int(round(time.time() * 1000))) # Create SNS topic snsTopicName = "AmazonRekognitionExample" + millis topicResponse = self.sns.create_topic(Name=snsTopicName) self.snsTopicArn = topicResponse['TopicArn'] # create SQS queue sqsQueueName = "AmazonRekognitionQueue" + millis self.sqs.create_queue(QueueName=sqsQueueName) self.sqsQueueUrl = self.sqs.get_queue_url(QueueName=sqsQueueName)['QueueUrl'] attribs = self.sqs.get_queue_attributes(QueueUrl=self.sqsQueueUrl, AttributeNames=['QueueArn'])['Attributes'] sqsQueueArn = attribs['QueueArn'] # Subscribe SQS queue to SNS topic self.sns.subscribe( TopicArn=self.snsTopicArn, Protocol='sqs', Endpoint=sqsQueueArn) # Authorize SNS to write SQS queue policy = """{{ "Version":"2012-10-17", "Statement":[ {{ "Sid":"MyPolicy", "Effect":"Allow", "Principal" : {{"AWS" : "*"}}, "Action":"SQS:SendMessage", "Resource": "{}", "Condition":{{ "ArnEquals":{{ "aws:SourceArn": "{}" }} }} }} ] }}""".format(sqsQueueArn, self.snsTopicArn) response = self.sqs.set_queue_attributes( QueueUrl=self.sqsQueueUrl, Attributes={ 'Policy': policy }) def DeleteTopicandQueue(self): self.sqs.delete_queue(QueueUrl=self.sqsQueueUrl) self.sns.delete_topic(TopicArn=self.snsTopicArn) def main(): roleArn = 'role-arn' bucket = 'bucket-name' video = 'video-name' session = boto3.Session(profile_name='profile-name') client = session.client('rekognition') rek = boto3.client('rekognition') sqs = boto3.client('sqs') sns = boto3.client('sns') analyzer = VideoDetect(roleArn, bucket, video, client, rek, sqs, sns) analyzer.CreateTopicandQueue() analyzer.StartLabelDetection() if analyzer.GetSQSMessageSuccess() == True: analyzer.GetLabelDetectionResults() analyzer.DeleteTopicandQueue() if __name__ == "__main__": main()
-
- Node.Js
-
In the following sample code:
-
Replace the value of
REGION
with the name of your account's operating region. -
Replace the value of
bucket
with the name of the Amazon S3 bucket containing your video file. -
Replace the value of
videoName
with the name of the video file in your Amazon S3 bucket. -
Replace the value of
profile_name
in the line that creates the Rekognition session with the name of your developer profile. -
Replace
roleArn
with the ARN of the IAM service role that you created in step 7 of To configure Amazon Rekognition Video.
import { CreateQueueCommand, GetQueueAttributesCommand, GetQueueUrlCommand, SetQueueAttributesCommand, DeleteQueueCommand, ReceiveMessageCommand, DeleteMessageCommand } from "@aws-sdk/client-sqs"; import {CreateTopicCommand, SubscribeCommand, DeleteTopicCommand } from "@aws-sdk/client-sns"; import { SQSClient } from "@aws-sdk/client-sqs"; import { SNSClient } from "@aws-sdk/client-sns"; import { RekognitionClient, StartLabelDetectionCommand, GetLabelDetectionCommand } from "@aws-sdk/client-rekognition"; import { stdout } from "process"; import {fromIni} from '@aws-sdk/credential-providers'; // Set the AWS Region. const REGION = "region-name"; //e.g. "us-east-1" const profileName = "profile-name" // Create SNS service object. const sqsClient = new SQSClient({ region: REGION, credentials: fromIni({profile: profileName,}), }); const snsClient = new SNSClient({ region: REGION, credentials: fromIni({profile: profileName,}), }); const rekClient = new RekognitionClient({region: REGION, credentials: fromIni({profile: profileName,}), }); // Set bucket and video variables const bucket = "bucket-name"; const videoName = "video-name"; const roleArn = "role-arn" var startJobId = "" var ts = Date.now(); const snsTopicName = "AmazonRekognitionExample" + ts; const snsTopicParams = {Name: snsTopicName} const sqsQueueName = "AmazonRekognitionQueue-" + ts; // Set the parameters const sqsParams = { QueueName: sqsQueueName, //SQS_QUEUE_URL Attributes: { DelaySeconds: "60", // Number of seconds delay. MessageRetentionPeriod: "86400", // Number of seconds delay. }, }; const createTopicandQueue = async () => { try { // Create SNS topic const topicResponse = await snsClient.send(new CreateTopicCommand(snsTopicParams)); const topicArn = topicResponse.TopicArn console.log("Success", topicResponse); // Create SQS Queue const sqsResponse = await sqsClient.send(new CreateQueueCommand(sqsParams)); console.log("Success", sqsResponse); const sqsQueueCommand = await sqsClient.send(new GetQueueUrlCommand({QueueName: sqsQueueName})) const sqsQueueUrl = sqsQueueCommand.QueueUrl const attribsResponse = await sqsClient.send(new GetQueueAttributesCommand({QueueUrl: sqsQueueUrl, AttributeNames: ['QueueArn']})) const attribs = attribsResponse.Attributes console.log(attribs) const queueArn = attribs.QueueArn // subscribe SQS queue to SNS topic const subscribed = await snsClient.send(new SubscribeCommand({TopicArn: topicArn, Protocol:'sqs', Endpoint: queueArn})) const policy = { Version: "2012-10-17", Statement: [ { Sid: "MyPolicy", Effect: "Allow", Principal: {AWS: "*"}, Action: "SQS:SendMessage", Resource: queueArn, Condition: { ArnEquals: { 'aws:SourceArn': topicArn } } } ] }; const response = sqsClient.send(new SetQueueAttributesCommand({QueueUrl: sqsQueueUrl, Attributes: {Policy: JSON.stringify(policy)}})) console.log(response) console.log(sqsQueueUrl, topicArn) return [sqsQueueUrl, topicArn] } catch (err) { console.log("Error", err); } }; const startLabelDetection = async (roleArn, snsTopicArn) => { try { //Initiate label detection and update value of startJobId with returned Job ID const labelDetectionResponse = await rekClient.send(new StartLabelDetectionCommand({Video:{S3Object:{Bucket:bucket, Name:videoName}}, NotificationChannel:{RoleArn: roleArn, SNSTopicArn: snsTopicArn}})); startJobId = labelDetectionResponse.JobId console.log(`JobID: ${startJobId}`) return startJobId } catch (err) { console.log("Error", err); } }; const getLabelDetectionResults = async(startJobId) => { console.log("Retrieving Label Detection results") // Set max results, paginationToken and finished will be updated depending on response values var maxResults = 10 var paginationToken = '' var finished = false // Begin retrieving label detection results while (finished == false){ var response = await rekClient.send(new GetLabelDetectionCommand({JobId: startJobId, MaxResults: maxResults, NextToken: paginationToken, SortBy:'TIMESTAMP'})) // Log metadata console.log(`Codec: ${response.VideoMetadata.Codec}`) console.log(`Duration: ${response.VideoMetadata.DurationMillis}`) console.log(`Format: ${response.VideoMetadata.Format}`) console.log(`Frame Rate: ${response.VideoMetadata.FrameRate}`) console.log() // For every detected label, log label, confidence, bounding box, and timestamp response.Labels.forEach(labelDetection => { var label = labelDetection.Label console.log(`Timestamp: ${labelDetection.Timestamp}`) console.log(`Label: ${label.Name}`) console.log(`Confidence: ${label.Confidence}`) console.log("Instances:") label.Instances.forEach(instance =>{ console.log(`Confidence: ${instance.Confidence}`) console.log("Bounding Box:") console.log(`Top: ${instance.Confidence}`) console.log(`Left: ${instance.Confidence}`) console.log(`Width: ${instance.Confidence}`) console.log(`Height: ${instance.Confidence}`) console.log() }) console.log() // Log parent if found console.log(" Parents:") label.Parents.forEach(parent =>{ console.log(` ${parent.Name}`) }) console.log() // Searh for pagination token, if found, set variable to next token if (String(response).includes("NextToken")){ paginationToken = response.NextToken }else{ finished = true } }) } } // Checks for status of job completion const getSQSMessageSuccess = async(sqsQueueUrl, startJobId) => { try { // Set job found and success status to false initially var jobFound = false var succeeded = false var dotLine = 0 // while not found, continue to poll for response while (jobFound == false){ var sqsReceivedResponse = await sqsClient.send(new ReceiveMessageCommand({QueueUrl:sqsQueueUrl, MaxNumberOfMessages:'ALL', MaxNumberOfMessages:10})); if (sqsReceivedResponse){ var responseString = JSON.stringify(sqsReceivedResponse) if (!responseString.includes('Body')){ if (dotLine < 40) { console.log('.') dotLine = dotLine + 1 }else { console.log('') dotLine = 0 }; stdout.write('', () => { console.log(''); }); await new Promise(resolve => setTimeout(resolve, 5000)); continue } } // Once job found, log Job ID and return true if status is succeeded for (var message of sqsReceivedResponse.Messages){ console.log("Retrieved messages:") var notification = JSON.parse(message.Body) var rekMessage = JSON.parse(notification.Message) var messageJobId = rekMessage.JobId if (String(rekMessage.JobId).includes(String(startJobId))){ console.log('Matching job found:') console.log(rekMessage.JobId) jobFound = true console.log(rekMessage.Status) if (String(rekMessage.Status).includes(String("SUCCEEDED"))){ succeeded = true console.log("Job processing succeeded.") var sqsDeleteMessage = await sqsClient.send(new DeleteMessageCommand({QueueUrl:sqsQueueUrl, ReceiptHandle:message.ReceiptHandle})); } }else{ console.log("Provided Job ID did not match returned ID.") var sqsDeleteMessage = await sqsClient.send(new DeleteMessageCommand({QueueUrl:sqsQueueUrl, ReceiptHandle:message.ReceiptHandle})); } } } return succeeded } catch(err) { console.log("Error", err); } }; // Start label detection job, sent status notification, check for success status // Retrieve results if status is "SUCEEDED", delete notification queue and topic const runLabelDetectionAndGetResults = async () => { try { const sqsAndTopic = await createTopicandQueue(); const startLabelDetectionRes = await startLabelDetection(roleArn, sqsAndTopic[1]); const getSQSMessageStatus = await getSQSMessageSuccess(sqsAndTopic[0], startLabelDetectionRes) console.log(getSQSMessageSuccess) if (getSQSMessageSuccess){ console.log("Retrieving results:") const results = await getLabelDetectionResults(startLabelDetectionRes) } const deleteQueue = await sqsClient.send(new DeleteQueueCommand({QueueUrl: sqsAndTopic[0]})); const deleteTopic = await snsClient.send(new DeleteTopicCommand({TopicArn: sqsAndTopic[1]})); console.log("Successfully deleted.") } catch (err) { console.log("Error", err); } }; runLabelDetectionAndGetResults()
-
- Java V2
-
This code is taken from the AWS Documentation SDK examples GitHub repository. See the full example here
. import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.rekognition.RekognitionClient; import software.amazon.awssdk.services.rekognition.model.StartLabelDetectionResponse; import software.amazon.awssdk.services.rekognition.model.NotificationChannel; import software.amazon.awssdk.services.rekognition.model.S3Object; import software.amazon.awssdk.services.rekognition.model.Video; import software.amazon.awssdk.services.rekognition.model.StartLabelDetectionRequest; import software.amazon.awssdk.services.rekognition.model.GetLabelDetectionRequest; import software.amazon.awssdk.services.rekognition.model.GetLabelDetectionResponse; import software.amazon.awssdk.services.rekognition.model.RekognitionException; import software.amazon.awssdk.services.rekognition.model.LabelDetectionSortBy; import software.amazon.awssdk.services.rekognition.model.VideoMetadata; import software.amazon.awssdk.services.rekognition.model.LabelDetection; import software.amazon.awssdk.services.rekognition.model.Label; import software.amazon.awssdk.services.rekognition.model.Instance; import software.amazon.awssdk.services.rekognition.model.Parent; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; import java.util.List; //snippet-end:[rekognition.java2.recognize_video_detect.import] /** * Before running this Java V2 code example, set up your development environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class VideoDetect { private static String startJobId =""; public static void main(String[] args) { final String usage = "\n" + "Usage: " + " <bucket> <video> <queueUrl> <topicArn> <roleArn>\n\n" + "Where:\n" + " bucket - The name of the bucket in which the video is located (for example, (for example, myBucket). \n\n"+ " video - The name of the video (for example, people.mp4). \n\n" + " queueUrl- The URL of a SQS queue. \n\n" + " topicArn - The ARN of the Amazon Simple Notification Service (Amazon SNS) topic. \n\n" + " roleArn - The ARN of the AWS Identity and Access Management (IAM) role to use. \n\n" ; if (args.length != 5) { System.out.println(usage); System.exit(1); } String bucket = args[0]; String video = args[1]; String queueUrl = args[2]; String topicArn = args[3]; String roleArn = args[4]; Region region = Region.US_WEST_2; RekognitionClient rekClient = RekognitionClient.builder() .region(region) .credentialsProvider(ProfileCredentialsProvider.create("profile-name")) .build(); SqsClient sqs = SqsClient.builder() .region(Region.US_WEST_2) .credentialsProvider(ProfileCredentialsProvider.create("profile-name")) .build(); NotificationChannel channel = NotificationChannel.builder() .snsTopicArn(topicArn) .roleArn(roleArn) .build(); startLabels(rekClient, channel, bucket, video); getLabelJob(rekClient, sqs, queueUrl); System.out.println("This example is done!"); sqs.close(); rekClient.close(); } // snippet-start:[rekognition.java2.recognize_video_detect.main] public static void startLabels(RekognitionClient rekClient, NotificationChannel channel, String bucket, String video) { try { S3Object s3Obj = S3Object.builder() .bucket(bucket) .name(video) .build(); Video vidOb = Video.builder() .s3Object(s3Obj) .build(); StartLabelDetectionRequest labelDetectionRequest = StartLabelDetectionRequest.builder() .jobTag("DetectingLabels") .notificationChannel(channel) .video(vidOb) .minConfidence(50F) .build(); StartLabelDetectionResponse labelDetectionResponse = rekClient.startLabelDetection(labelDetectionRequest); startJobId = labelDetectionResponse.jobId(); boolean ans = true; String status = ""; int yy = 0; while (ans) { GetLabelDetectionRequest detectionRequest = GetLabelDetectionRequest.builder() .jobId(startJobId) .maxResults(10) .build(); GetLabelDetectionResponse result = rekClient.getLabelDetection(detectionRequest); status = result.jobStatusAsString(); if (status.compareTo("SUCCEEDED") == 0) ans = false; else System.out.println(yy +" status is: "+status); Thread.sleep(1000); yy++; } System.out.println(startJobId +" status is: "+status); } catch(RekognitionException | InterruptedException e) { e.getMessage(); System.exit(1); } } public static void getLabelJob(RekognitionClient rekClient, SqsClient sqs, String queueUrl) { List<Message> messages; ReceiveMessageRequest messageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .build(); try { messages = sqs.receiveMessage(messageRequest).messages(); if (!messages.isEmpty()) { for (Message message: messages) { String notification = message.body(); // Get the status and job id from the notification ObjectMapper mapper = new ObjectMapper(); JsonNode jsonMessageTree = mapper.readTree(notification); JsonNode messageBodyText = jsonMessageTree.get("Message"); ObjectMapper operationResultMapper = new ObjectMapper(); JsonNode jsonResultTree = operationResultMapper.readTree(messageBodyText.textValue()); JsonNode operationJobId = jsonResultTree.get("JobId"); JsonNode operationStatus = jsonResultTree.get("Status"); System.out.println("Job found in JSON is " + operationJobId); DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder() .queueUrl(queueUrl) .build(); String jobId = operationJobId.textValue(); if (startJobId.compareTo(jobId)==0) { System.out.println("Job id: " + operationJobId ); System.out.println("Status : " + operationStatus.toString()); if (operationStatus.asText().equals("SUCCEEDED")) GetResultsLabels(rekClient); else System.out.println("Video analysis failed"); sqs.deleteMessage(deleteMessageRequest); } else{ System.out.println("Job received was not job " + startJobId); sqs.deleteMessage(deleteMessageRequest); } } } } catch(RekognitionException e) { e.getMessage(); System.exit(1); } catch (JsonMappingException e) { e.printStackTrace(); } catch (JsonProcessingException e) { e.printStackTrace(); } } // Gets the job results by calling GetLabelDetection private static void GetResultsLabels(RekognitionClient rekClient) { int maxResults=10; String paginationToken=null; GetLabelDetectionResponse labelDetectionResult=null; try { do { if (labelDetectionResult !=null) paginationToken = labelDetectionResult.nextToken(); GetLabelDetectionRequest labelDetectionRequest= GetLabelDetectionRequest.builder() .jobId(startJobId) .sortBy(LabelDetectionSortBy.TIMESTAMP) .maxResults(maxResults) .nextToken(paginationToken) .build(); labelDetectionResult = rekClient.getLabelDetection(labelDetectionRequest); VideoMetadata videoMetaData=labelDetectionResult.videoMetadata(); System.out.println("Format: " + videoMetaData.format()); System.out.println("Codec: " + videoMetaData.codec()); System.out.println("Duration: " + videoMetaData.durationMillis()); System.out.println("FrameRate: " + videoMetaData.frameRate()); List<LabelDetection> detectedLabels= labelDetectionResult.labels(); for (LabelDetection detectedLabel: detectedLabels) { long seconds=detectedLabel.timestamp(); Label label=detectedLabel.label(); System.out.println("Millisecond: " + seconds + " "); System.out.println(" Label:" + label.name()); System.out.println(" Confidence:" + detectedLabel.label().confidence().toString()); List<Instance> instances = label.instances(); System.out.println(" Instances of " + label.name()); if (instances.isEmpty()) { System.out.println(" " + "None"); } else { for (Instance instance : instances) { System.out.println(" Confidence: " + instance.confidence().toString()); System.out.println(" Bounding box: " + instance.boundingBox().toString()); } } System.out.println(" Parent labels for " + label.name() + ":"); List<Parent> parents = label.parents(); if (parents.isEmpty()) { System.out.println(" None"); } else { for (Parent parent : parents) { System.out.println(" " + parent.name()); } } System.out.println(); } } while (labelDetectionResult !=null && labelDetectionResult.nextToken() != null); } catch(RekognitionException e) { e.getMessage(); System.exit(1); } } // snippet-end:[rekognition.java2.recognize_video_detect.main] }
-
Build and run the code. The operation might take a while to finish. After it's finished, a list of the labels detected in the video is displayed. For more information, see Detecting labels in a video.