如何解析 OpenTelemetry 1.0.0 消息 - Amazon CloudWatch

如何解析 OpenTelemetry 1.0.0 消息

本节提供了有助于您开始解析 OpenTelemetry 1.0.0 的信息。

首先,您应获得特定于语言的绑定,这样您才能够以首选语言解析 OpenTelemetry 1.0.0 消息。

获取特定于语言的绑定
  • 根据您的首选语言选择以下步骤。

    • 若要使用 Java,请将以下 Maven 依赖项添加到您的 Java 项目中:OpenTelemetry Java >> 0.14.1

    • 若要使用任何其他语言,请按照下列步骤操作:

      1. 检查生成类中的列表,确保您的语言受支持。

      2. 按照下载协议缓冲区中的步骤安装 Protobuf 编译器。

      3. Release version 1.0.0 中下载 OpenTelemetry 1.0.0 ProtoBuf 定义。

      4. 确认您位于下载的 OpenTelemetry 1.0.0 ProtoBuf 定义的根文件夹中。创建一个 src 文件夹,然后运行命令以生成特定于语言的绑定。有关更多信息,请参阅生成类

        以下示例展示了如何生成 Javascript 绑定。

        protoc --proto_path=./ --js_out=import_style=commonjs,binary:src \ opentelemetry/proto/common/v1/common.proto \ opentelemetry/proto/resource/v1/resource.proto \ opentelemetry/proto/metrics/v1/metrics.proto \ opentelemetry/proto/collector/metrics/v1/metrics_service.proto

以下部分包括使用特定于语言的绑定的示例,您可以使用前面的说明来构建这些绑定。

Java

package com.example; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.List; public class MyOpenTelemetryParser { public List<ExportMetricsServiceRequest> parse(InputStream inputStream) throws IOException { List<ExportMetricsServiceRequest> result = new ArrayList<>(); ExportMetricsServiceRequest request; /* A Kinesis record can contain multiple `ExportMetricsServiceRequest` records, each of them starting with a header with an UnsignedVarInt32 indicating the record length in bytes: ------ --------------------------- ------ ----------------------- |UINT32|ExportMetricsServiceRequest|UINT32|ExportMetricsService... ------ --------------------------- ------ ----------------------- */ while ((request = ExportMetricsServiceRequest.parseDelimitedFrom(inputStream)) != null) { // Do whatever we want with the parsed message result.add(request); } return result; } }

Javascript

本示例假定具有所生成的绑定的根文件夹为 ./

函数 parseRecord 的数据参数可以是以下类型之一:

  • Uint8Array,此类型最佳

  • Buffer,在节点下最佳

  • Array.number,8 位整数

const pb = require('google-protobuf') const pbMetrics = require('./opentelemetry/proto/collector/metrics/v1/metrics_service_pb') function parseRecord(data) { const result = [] // Loop until we've read all the data from the buffer while (data.length) { /* A Kinesis record can contain multiple `ExportMetricsServiceRequest` records, each of them starting with a header with an UnsignedVarInt32 indicating the record length in bytes: ------ --------------------------- ------ ----------------------- |UINT32|ExportMetricsServiceRequest|UINT32|ExportMetricsService... ------ --------------------------- ------ ----------------------- */ const reader = new pb.BinaryReader(data) const messageLength = reader.decoder_.readUnsignedVarint32() const messageFrom = reader.decoder_.cursor_ const messageTo = messageFrom + messageLength // Extract the current `ExportMetricsServiceRequest` message to parse const message = data.subarray(messageFrom, messageTo) // Parse the current message using the ProtoBuf library const parsed = pbMetrics.ExportMetricsServiceRequest.deserializeBinary(message) // Do whatever we want with the parsed message result.push(parsed.toObject()) // Shrink the remaining buffer, removing the already parsed data data = data.subarray(messageTo) } return result }

Python

您必须自行读取 var-int 分隔符或使用内部方法 _VarintBytes(size)_DecodeVarint32(buffer, position)。它们会返回刚好位于缓冲区中大小字节之后的位置。读取端构建一个新的缓冲区,该缓冲区仅限于读取消息的字节。

size = my_metric.ByteSize() f.write(_VarintBytes(size)) f.write(my_metric.SerializeToString()) msg_len, new_pos = _DecodeVarint32(buf, 0) msg_buf = buf[new_pos:new_pos+msg_len] request = metrics_service_pb.ExportMetricsServiceRequest() request.ParseFromString(msg_buf)

Go

使用 Buffer.DecodeMessage()

C#

使用 CodedInputStream。此类可以读取分隔了大小的消息。

C++

google/protobuf/util/delimited_message_util.h 中描述的函数可以读取分隔了大小的消息。

其他语言

有关使用其他语言,请参阅下载协议缓冲区

在实现解析器时,请注意,一条 Kinesis 记录可以包含多个 ExportMetricsServiceRequest 协议缓冲区消息,每个消息都以具有指示记录长度(字节)的 UnsignedVarInt32 的标头开头。