自定义 Amazon MSK 配置 - Amazon Managed Streaming for Apache Kafka

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

自定义 Amazon MSK 配置

您可以使用 Amazon MSK 创建自定义MSK配置,在其中设置以下属性。未显式设置的属性将获得其在 亚马逊的默认MSK配置 中具有的值。有关配置属性的更多信息,请参阅 Apache Kafka 配置

Apache Kafka 配置属性
名称 描述
allow.everyone.if.no.acl.found 如果要将此属性设置为false,请先确保为集群定义 Apache Ka ACLs fka。如果将此属性设置为,false但没有先定义 Apache KafkaACLs,则将失去对集群的访问权限。如果发生这种情况,您可以再次更新配置并将此属性设置为 true,以重新获得对集群的访问权限。
auto.create.topics.enable 在服务器上启用主题自动创建。
compression.type 给定主题的最终压缩类型。可以将此属性设置为标准压缩编解码器(gzipsnappylz4zstd)。它还接受 uncompressed。此值等同于不压缩。如果将该值设置为 producer,则意味着保留生成器设置的原始压缩编解码器。

connections.max.idle.ms

空闲连接超时(以毫秒为单位)。如果连接的空闲时间超过您为此属性设置的值,服务器套接字处理器线程会关闭这些连接。
default.replication.factor 自动创建的主题的默认复制因子。
delete.topic.enable 启用删除主题操作。如果禁用此设置,则无法通过管理工具删除主题。
group.initial.rebalance.delay.ms 在组协调器执行第一次重新平衡之前,组协调器等待更多数据使用器加入新组的时间。更长的延迟时间意味着重新平衡可能会更少,但这会增加处理开始之前的时间。
group.max.session.timeout.ms 注册使用器的最长会话超时时间。超时时间越长,可供使用器用来处理检测信号之间的消息的时间就越多,但这会导致需要花更多时间来检测故障。
group.min.session.timeout.ms 注册使用器的最短会话超时时间。超时时间越短,故障检测的速度就会越快,但需要更频繁的使用器检测信号。这可能会使代理资源不堪重负。
leader.imbalance.per.broker.percentage 各代理允许的领导节点不平衡比率。如果各代理超过了此值,则控制器将触发领导平衡操作。此值以百分比的形式指定。
log.cleaner.delete.retention.ms 您希望 Apache Kafka 保留已删除的记录的时间量。最小值为 0。
log.cleaner.min.cleanable.ratio

此配置属性的值可介于 0 到 1 之间。此值决定日志压缩器尝试清理日志的频率(如果日志压缩已启用)。默认情况下,如果已压缩超过 50% 的日志,Apache Kafka 会避免清理日志。这一比率限制了日志因重复项浪费的最大空间(该值为 50%,这意味着最多有 50% 的日志可能是重复的)。更高的比率意味着更少、更高效的清理,但也意味着会浪费更多的日志空间。

log.cleanup.policy 超出保留时段的分段的默认清除策略。有效策略的逗号分隔列表。有效策略为 deletecompact。对于启用了分层存储的集群,有效策略为仅 delete
log.flush.interval.messages 将消息刷新到磁盘之前,日志分区上累积的消息的数量。
log.flush.interval.ms 任何主题中的消息在刷新到磁盘之前保存在内存中的最长时间(以毫秒为单位)。如果未设置此值,则使用 log.flush.scheduler.interval.ms 中的值。最小值为 0。
log.message.timestamp.difference.max.ms 代理收到消息时的时间戳与消息中指定的时间戳之间的最大时间差。如果 log.message.timestamp.type=CreateTime,则如果时间戳的差异超过此阈值,则消息将被拒绝。如果 log.message.timestamp.type LogAppendTime =,则忽略此配置。
log.message.timestamp.type 指定消息中的时间戳是消息创建时间还是日志追加时间。允许的值是 CreateTimeLogAppendTime
log.retention.bytes 删除日志前的最大日志大小。
log.retention.hours 删除日志文件前保留日志文件的小时数,它是 log.retention.ms 属性的三级属性。
log.retention.minutes 删除日志文件前保留日志文件的分钟数,它是 log.retention.ms 属性的二级属性。如果未设置此值,则使用 log.retention.hours 中的值。
log.retention.ms 删除日志文件前保留日志文件的毫秒数(以毫秒为单位)。如果未设置,则使用 log.retention.minutes 中的值。
log.roll.ms 推出新日志段之前的最长时间(以毫秒为单位)。如果未设置此属性,则使用 log.roll.hours 中的值。此属性的最小可能值为 1。
log.segment.bytes 单个日志文件的最大大小。
max.incremental.fetch.session.cache.slots 维护的增量提取会话的最大数量。
message.max.bytes

Kafka 允许的最大记录批处理大小。如果增加此值,并且存在大于 0.10.2 的使用器,则使用器的提取大小也必须增加,以便它们能够提取如此大的记录批处理。

最新的消息格式版本总是将消息分组到批处理中来提高效率。以前的消息格式版本不会将未压缩的记录分组到批处理中,在此情况下,此限制仅适用于单条记录。

可使用主题级别 max.message.bytes 配置为每个主题设置此值。

min.insync.replicas

当生成器将 acks 设置为 "all"(或 "-1")时,min.insync.replicas 中的值会指定为使写入被视为成功而必须确认写入的最小副本数。如果无法达到此最低限度,则生产者会引发异常( NotEnoughReplicas 或 NotEnoughReplicasAfterAppend)。

您可以使用 min.insync.replicas 和 acks 中的值来强制执行更大的持久性保证。例如,您可以创建复制因子为 3 的主题,将 min.insync.replicas 设置为 2,并在 acks 为 "all" 的情况下进行生成。这可确保在大多数副本未收到写操作时,创建器将引发异常。

num.io.threads 服务器用于处理请求的线程的数目,其中可能包括磁盘 I/O。
num.network.threads 服务器用于接收来自网络的请求并向其发送响应的线程数量。
num.partitions 每个主题的默认日志分区数。
num.recovery.threads.per.data.dir 在启动时用于日志恢复以及在关闭时用于刷新的每个数据目录的线程数量。
num.replica.fetchers 用于从源代理复制消息的提取器线程数。增大此值会增加跟踪器代理中的 I/O 并行度。
offsets.retention.minutes 当一个使用器组丢失其所有使用器(即变空)后,其偏移量将在此保留期内保留,然后被丢弃。对于独立使用器(即,使用手动分配的使用器),偏移量会在最后一次提交时间加上此保留期后过期。
offsets.topic.replication.factor 偏移量主题的复制因子。将此值设置得更高可以确保可用性。内部主题创建失败,直到集群大小满足此复制因子要求。
replica.fetch.max.bytes 尝试为每个分区提取的消息的字节数。这不是绝对最大值。如果提取的第一个非空分区中的第一个记录批处理大于此值,则将返回该记录批处理以确保取得进展。message.max.bytes(代理配置)或 max.message.bytes(主题配置)定义代理接受的最大记录批处理大小。
replica.fetch.response.max.bytes 整个提取响应预期的最大字节数。记录是分批提取的,如果提取的第一个非空分区中的第一个记录批处理大于此值,则仍将返回该记录批处理以确保取得进展。这不是绝对最大值。message.max.bytes(代理配置)或 max.message.bytes(主题配置)属性指定代理接受的最大记录批处理大小。
replica.lag.time.max.ms 如果关注者没有发送任何提取请求,或者至少在这段毫秒内没有消耗到领导者的日志结束偏移量,则领导者将该关注者从中移除。ISR

MinValue: 10000

MaxValue = 30000

replica.selector.class 实现 ReplicaSelector的完全限定类名。代理使用此值来查找首选读取副本。如果您使用的是 Apache Kafka 版本 2.4.1 或更高版本,并且希望允许使用器从最近的副本提取,请将此属性设置为 org.apache.kafka.common.replica.RackAwareReplicaSelector。有关更多信息,请参阅 Apache Kafka 版本 2.4.1(改用 2.4.1.1 版)
replica.socket.receive.buffer.bytes 网络请求的套接字接收缓冲区。
socket.receive.buffer.bytes 套接字服务器套接字的 SO_ RCVBUF 缓冲区。可为此属性设置的最小值为 -1。如果该值为 -1,则 Amazon MSK 使用默认操作系统。
socket.request.max.bytes 套接字请求中的最大字节数。
socket.send.buffer.bytes 套接字服务器套接字的 SO_ SNDBUF 缓冲区。可为此属性设置的最小值为 -1。如果该值为 -1,则 Amazon MSK 使用默认操作系统。
transaction.max.timeout.ms 事务的最大超时时间。如果客户请求的交易时间超过此值,则经纪商会在中返回错误 InitProducerIdRequest。这可防止客户端的超时时间过长,且此情况可能会导致使用器无法阅读事务中包含的主题。
transaction.state.log.min.isr 事务主题的已覆盖 min.insync.replicas 配置。
transaction.state.log.replication.factor 事务主题的复制因子。将此属性设置为较高的值可提高可用性。内部主题创建失败,直到集群大小满足此复制因子要求。
transactional.id.expiration.ms 事务协调器在其事务 ID 过期之前,等待接收当前事务的任何事务状态更新的时间(以毫秒为单位)。此设置还会影响生产者 ID 的过IDs期,因为它会导致生产者在最后一次使用给定生产者 ID 写入后过期时过期。如果由于主题的保留设置而删除了制作人 ID 的最后一次写入内容,则制作人IDs可能会提前过期。此属性的最小值为 1 毫秒。
unclean.leader.election.enable 表示不在ISR集合中的副本是否应作为主导作为最后手段,即使这可能会导致数据丢失。
zookeeper.connection.timeout.ms

ZooKeeper 模式集群。客户端等待与之建立连接的最长时间。 ZooKeeper如果未设置此值,则使用 zookeeper.session.timeout.ms 中的值。

MinValue = 6000

MaxValue (含)= 18000

zookeeper.session.timeout.ms

ZooKeeper 模式集群。Apache ZooKeeper 会话超时时间(以毫秒为单位)。

MinValue = 6000

MaxValue (含)= 18000

要了解如何创建自定义MSK配置、列出所有配置或对其进行描述,请参阅亚马逊MSK配置操作。要使用自定义MSK配置创建MSK集群,或者要使用新的自定义配置更新集群,请参阅亚马逊MSK:它是如何运作的

当您使用自定义MSK配置更新现有MSK集群时,MSKAmazon 会在必要时进行滚动重启,并使用最佳实践来最大限度地减少客户停机时间。例如,在 Amazon MSK 重启每个经纪商之后,Amazon MSK 会尝试让经纪商在配置更新期间获取该经纪商可能遗漏的数据,然后再移至下一个经纪商。

动态亚马逊MSK配置

除了 Amazon MSK 提供的配置属性外,您还可以动态设置不需要重启代理的集群级别和代理级别的配置属性。您可以动态设置一些配置属性。这些是 Apache Kafka 文档中代理配置下的表中未标记为只读的属性。有关动态配置和示例命令的信息,请参阅 Apache Kafka 文档中的更新代理配置

注意

您可以设置 advertised.listeners 属性,但不能设置 listeners 属性。

主题级的 Amazon 配置 MSK

您可以使用 Apache Kafka 命令为新主题和现有主题设置或修改主题级别的配置属性。有关主题级别的配置属性以及如何设置这些属性之示例的更多信息,请参阅 Apache Kafka 文档中的 Topic-Level Configs

亚马逊MSK配置状态

Amazon MSK 配置可以处于以下状态之一。要对配置执行操作,该配置必须处于 ACTIVEDELETE_FAILED 状态:

  • ACTIVE

  • DELETING

  • DELETE_FAILED