Use StartLiveTail with an AWS SDK - Amazon CloudWatch Logs

Use StartLiveTail with an AWS SDK

The following code examples show how to use StartLiveTail.

.NET
AWS SDK for .NET

Include the required files.

using Amazon; using Amazon.CloudWatchLogs; using Amazon.CloudWatchLogs.Model;

Start the Live Tail session.

var client = new AmazonCloudWatchLogsClient(); var request = new StartLiveTailRequest { LogGroupIdentifiers = logGroupIdentifiers, LogStreamNames = logStreamNames, LogEventFilterPattern = filterPattern, }; var response = await client.StartLiveTailAsync(request); // Catch if request fails if (response.HttpStatusCode != System.Net.HttpStatusCode.OK) { Console.WriteLine("Failed to start live tail session"); return; }

You can handle the events from the Live Tail session in two ways:

/* Method 1 * 1). Asynchronously loop through the event stream * 2). Set a timer to dispose the stream and stop the Live Tail session at the end. */ var eventStream = response.ResponseStream; var task = Task.Run(() => { foreach (var item in eventStream) { if (item is LiveTailSessionUpdate liveTailSessionUpdate) { foreach (var sessionResult in liveTailSessionUpdate.SessionResults) { Console.WriteLine("Message : {0}", sessionResult.Message); } } if (item is LiveTailSessionStart) { Console.WriteLine("Live Tail session started"); } // On-stream exceptions are processed here if (item is CloudWatchLogsEventStreamException) { Console.WriteLine($"ERROR: {item}"); } } }); // Close the stream to stop the session after a timeout if (!task.Wait(TimeSpan.FromSeconds(10))){ eventStream.Dispose(); Console.WriteLine("End of line"); }
/* Method 2 * 1). Add event handlers to each event variable * 2). Start processing the stream and wait for a timeout using AutoResetEvent */ AutoResetEvent endEvent = new AutoResetEvent(false); var eventStream = response.ResponseStream; using (eventStream) // automatically disposes the stream to stop the session after execution finishes { eventStream.SessionStartReceived += (sender, e) => { Console.WriteLine("LiveTail session started"); }; eventStream.SessionUpdateReceived += (sender, e) => { foreach (LiveTailSessionLogEvent logEvent in e.EventStreamEvent.SessionResults){ Console.WriteLine("Message: {0}", logEvent.Message); } }; // On-stream exceptions are captured here eventStream.ExceptionReceived += (sender, e) => { Console.WriteLine($"ERROR: {e.EventStreamException.Message}"); }; eventStream.StartProcessing(); // Stream events for this amount of time. endEvent.WaitOne(TimeSpan.FromSeconds(10)); Console.WriteLine("End of line"); }
  • For API details, see StartLiveTail in AWS SDK for .NET API Reference.

Go
SDK for Go V2

Include the required files.

import ( "context" "log" "time" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types" )

Handle the events from the Live Tail session.

func handleEventStreamAsync(stream *cloudwatchlogs.StartLiveTailEventStream) { eventsChan := stream.Events() for { event := <-eventsChan switch e := event.(type) { case *types.StartLiveTailResponseStreamMemberSessionStart: log.Println("Received SessionStart event") case *types.StartLiveTailResponseStreamMemberSessionUpdate: for _, logEvent := range e.Value.SessionResults { log.Println(*logEvent.Message) } default: // Handle on-stream exceptions if err := stream.Err(); err != nil { log.Fatalf("Error occured during streaming: %v", err) } else if event == nil { log.Println("Stream is Closed") return } else { log.Fatalf("Unknown event type: %T", e) } } } }

Start the Live Tail session.

cfg, err := config.LoadDefaultConfig(context.TODO()) if err != nil { panic("configuration error, " + err.Error()) } client := cloudwatchlogs.NewFromConfig(cfg) request := &cloudwatchlogs.StartLiveTailInput{ LogGroupIdentifiers: logGroupIdentifiers, LogStreamNames: logStreamNames, LogEventFilterPattern: logEventFilterPattern, } response, err := client.StartLiveTail(context.TODO(), request) // Handle pre-stream Exceptions if err != nil { log.Fatalf("Failed to start streaming: %v", err) } // Start a Goroutine to handle events over stream stream := response.GetStream() go handleEventStreamAsync(stream)

Stop the Live Tail session after a period of time has elapsed.

// Close the stream (which ends the session) after a timeout time.Sleep(10 * time.Second) stream.Close() log.Println("Event stream closed")
  • For API details, see StartLiveTail in AWS SDK for Go API Reference.

Java
SDK for Java 2.x

Include the required files.

import io.reactivex.FlowableSubscriber; import io.reactivex.annotations.NonNull; import org.reactivestreams.Subscription; import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsAsyncClient; import software.amazon.awssdk.services.cloudwatchlogs.model.LiveTailSessionLogEvent; import software.amazon.awssdk.services.cloudwatchlogs.model.LiveTailSessionStart; import software.amazon.awssdk.services.cloudwatchlogs.model.LiveTailSessionUpdate; import software.amazon.awssdk.services.cloudwatchlogs.model.StartLiveTailRequest; import software.amazon.awssdk.services.cloudwatchlogs.model.StartLiveTailResponseHandler; import software.amazon.awssdk.services.cloudwatchlogs.model.CloudWatchLogsException; import software.amazon.awssdk.services.cloudwatchlogs.model.StartLiveTailResponseStream; import java.util.Date; import java.util.List; import java.util.concurrent.atomic.AtomicReference;

Handle the events from the Live Tail session.

private static StartLiveTailResponseHandler getStartLiveTailResponseStreamHandler( AtomicReference<Subscription> subscriptionAtomicReference) { return StartLiveTailResponseHandler.builder() .onResponse(r -> System.out.println("Received initial response")) .onError(throwable -> { CloudWatchLogsException e = (CloudWatchLogsException) throwable.getCause(); System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); }) .subscriber(() -> new FlowableSubscriber<>() { @Override public void onSubscribe(@NonNull Subscription s) { subscriptionAtomicReference.set(s); s.request(Long.MAX_VALUE); } @Override public void onNext(StartLiveTailResponseStream event) { if (event instanceof LiveTailSessionStart) { LiveTailSessionStart sessionStart = (LiveTailSessionStart) event; System.out.println(sessionStart); } else if (event instanceof LiveTailSessionUpdate) { LiveTailSessionUpdate sessionUpdate = (LiveTailSessionUpdate) event; List<LiveTailSessionLogEvent> logEvents = sessionUpdate.sessionResults(); logEvents.forEach(e -> { long timestamp = e.timestamp(); Date date = new Date(timestamp); System.out.println("[" + date + "] " + e.message()); }); } else { throw CloudWatchLogsException.builder().message("Unknown event type").build(); } } @Override public void onError(Throwable throwable) { System.out.println(throwable.getMessage()); System.exit(1); } @Override public void onComplete() { System.out.println("Completed Streaming Session"); } }) .build(); }

Start the Live Tail session.

CloudWatchLogsAsyncClient cloudWatchLogsAsyncClient = CloudWatchLogsAsyncClient.builder() .credentialsProvider(ProfileCredentialsProvider.create()) .build(); StartLiveTailRequest request = StartLiveTailRequest.builder() .logGroupIdentifiers(logGroupIdentifiers) .logStreamNames(logStreamNames) .logEventFilterPattern(logEventFilterPattern) .build(); /* Create a reference to store the subscription */ final AtomicReference<Subscription> subscriptionAtomicReference = new AtomicReference<>(null); cloudWatchLogsAsyncClient.startLiveTail(request, getStartLiveTailResponseStreamHandler(subscriptionAtomicReference));

Stop the Live Tail session after a period of time has elapsed.

/* Set a timeout for the session and cancel the subscription. This will: * 1). Close the stream * 2). Stop the Live Tail session */ try { Thread.sleep(10000); } catch (InterruptedException e) { throw new RuntimeException(e); } if (subscriptionAtomicReference.get() != null) { subscriptionAtomicReference.get().cancel(); System.out.println("Subscription to stream closed"); }
  • For API details, see StartLiveTail in AWS SDK for Java 2.x API Reference.

JavaScript
SDK for JavaScript (v3)

Include the required files.

import { CloudWatchLogsClient, StartLiveTailCommand } from "@aws-sdk/client-cloudwatch-logs";

Handle the events from the Live Tail session.

async function handleResponseAsync(response) { try { for await (const event of response.responseStream) { if (event.sessionStart !== undefined) { console.log(event.sessionStart); } else if (event.sessionUpdate !== undefined) { for (const logEvent of event.sessionUpdate.sessionResults) { const timestamp = logEvent.timestamp; const date = new Date(timestamp); console.log("[" + date + "] " + logEvent.message); } } else { console.error("Unknown event type"); } } } catch (err) { // On-stream exceptions are captured here console.error(err) } }

Start the Live Tail session.

const client = new CloudWatchLogsClient(); const command = new StartLiveTailCommand({ logGroupIdentifiers: logGroupIdentifiers, logStreamNames: logStreamNames, logEventFilterPattern: filterPattern }); try{ const response = await client.send(command); handleResponseAsync(response); } catch (err){ // Pre-stream exceptions are captured here console.log(err); }

Stop the Live Tail session after a period of time has elapsed.

/* Set a timeout to close the client. This will stop the Live Tail session. */ setTimeout(function() { console.log("Client timeout"); client.destroy(); }, 10000);
  • For API details, see StartLiveTail in AWS SDK for JavaScript API Reference.

Kotlin
SDK for Kotlin

Include the required files.

import aws.sdk.kotlin.services.cloudwatchlogs.CloudWatchLogsClient import aws.sdk.kotlin.services.cloudwatchlogs.model.StartLiveTailRequest import aws.sdk.kotlin.services.cloudwatchlogs.model.StartLiveTailResponseStream import kotlinx.coroutines.flow.takeWhile

Start the Live Tail session.

val client = CloudWatchLogsClient.fromEnvironment() val request = StartLiveTailRequest { logGroupIdentifiers = logGroupIdentifiersVal logStreamNames = logStreamNamesVal logEventFilterPattern = logEventFilterPatternVal } val startTime = System.currentTimeMillis() try { client.startLiveTail(request) { response -> val stream = response.responseStream if (stream != null) { /* Set a timeout to unsubcribe from the flow. This will: * 1). Close the stream * 2). Stop the Live Tail session */ stream.takeWhile { System.currentTimeMillis() - startTime < 10000 }.collect { value -> if (value is StartLiveTailResponseStream.SessionStart) { println(value.asSessionStart()) } else if (value is StartLiveTailResponseStream.SessionUpdate) { for (e in value.asSessionUpdate().sessionResults!!) { println(e) } } else { throw IllegalArgumentException("Unknown event type") } } } else { throw IllegalArgumentException("No response stream") } } } catch (e: Exception) { println("Exception occurred during StartLiveTail: $e") System.exit(1) }
  • For API details, see StartLiveTail in AWS SDK for Kotlin API reference.

Python
SDK for Python (Boto3)

Include the required files.

import boto3 import time from datetime import datetime

Start the Live Tail session.

# Initialize the client client = boto3.client('logs') start_time = time.time() try: response = client.start_live_tail( logGroupIdentifiers=log_group_identifiers, logStreamNames=log_streams, logEventFilterPattern=filter_pattern ) event_stream = response['responseStream'] # Handle the events streamed back in the response for event in event_stream: # Set a timeout to close the stream. # This will end the Live Tail session. if (time.time() - start_time >= 10): event_stream.close() break # Handle when session is started if 'sessionStart' in event: session_start_event = event['sessionStart'] print(session_start_event) # Handle when log event is given in a session update elif 'sessionUpdate' in event: log_events = event['sessionUpdate']['sessionResults'] for log_event in log_events: print('[{date}] {log}'.format(date=datetime.fromtimestamp(log_event['timestamp']/1000),log=log_event['message'])) else: # On-stream exceptions are captured here raise RuntimeError(str(event)) except Exception as e: print(e)
  • For API details, see StartLiveTail in AWS SDK for Python (Boto3) API Reference.

For a complete list of AWS SDK developer guides and code examples, see Using CloudWatch Logs with an AWS SDK. This topic also includes information about getting started and details about previous SDK versions.