Lanyon 记录下日常工作与学习哈~,还有技术分享哦。。🎉

kafka client客户端实践及原理剖析

主要描述kafka java client的一些实践,以及对client操作数据的一些原理进行剖析。

kafka对集群部署环境的一些考虑,kafkaScala 语言和 Java 语言编写而成,编译之后的源代码就是普通的“.class”文件。本来部署到哪个操作系统应该都是一样的,但是不同操作系统的差异还是给 Kafka 集群带来了相当大的影响。

主流的操作系统有3种:windowslinuxmacOS,考虑到操作系统与kafka的适配性,linux系统显然要比其它两个更加合适部署kafka,主要在I/O模式的使用、数据网络传输效率、社区支持度三个方面支持比较好。

linux中的系统调用select函数属于I/O多路复用模型,大名鼎鼎的epoll系统调用则介于I/O 多路复用、信号驱动I/O模型。因此在这一点上将kafka 部署在Linux 上是有优势的,因为能够获得更高效的 I/O性能。零拷贝(Zero Copy)技术,就是当数据在磁盘和网络进行传输时避免昂贵的内核态数据拷贝从而实现快速的数据传输,Linux 平台实现了这样的零拷贝机制。

对于磁盘I/O性能,普通环境使用机械硬盘,不需要搭建RAID。对于磁盘容量,需根据消息数、留存时间预估磁盘容量,实际使用中建议预留20%30%的磁盘空间。对于网络带宽,需根据实际带宽速度和业务SLA预估服务器数量,对于千兆网络,建议每台服务器按照700mps来计算,避免大流量下的丢包问题。

集群配置中一些重要的参数Broker端的一些参数有:

1)log.dirs指定了broker需要使用的若干个文件目录路径,而log.dir结尾没有s,说明它只能表示单个路径,它是补充上一个参数用的。当挂载多个目录时,其好处在于提升读写性能、能够实现故障转移;

2)zookeeper的配置,zookeeper.connect可以指定它的值为zk1:2181,zk2:2181,zk3:2181

3)第三组是与broker连接相关的,listeners学名叫监听器,其实就是通过PLAINTEXT://localhost:9092协议连接kafka 服务的。advertised.listeners,和 listeners 相比多了个advertised,其是在外网连接kafka的地址。

4)第四组参数是关于 topic 管理的,auto.create.topics.enable,是否允许自动创建topicunclean.leader.election.enable:是否允许 unclean Leader 选举。auto.leader.rebalance.enable:是否允许定期进行 Leader选举。

看一些topic级别的参数,在启动kafka时设置jvm的一些参数:

1)retention.ms:规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值。

2)retention.bytes:规定了要为该 topic 预留多大的磁盘空间。当前默认值是-1,表示可以无限使用磁盘空间。

3)KAFKA_HEAP_OPTS:指定堆大小,行业经验kafka默认堆栈大小为6gKAFKA_JVM_PERFORMANCE_OPTS:指定 GC 参数。

$> export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g
$> export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
$> bin/kafka-server-start.sh config/server.properties

生产者消息分区机制原理剖析Kafka 的消息组织方式实际上是三级结构:主题 - 分区 - 消息。其实分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(scalability)。

所谓分区策略是决定生产者将消息发送到哪个分区的算法,常见的分区策略有轮询策略(Round-robin)、随机策略(Randomness)、按消息键保序策略(Key-ordering)。如下为自定义分区策略,从所有分区中找出哪些Leader 副本在南方的所有分区,然后随机挑选一个进行消息发送。

List partitions = cluster.partitionsForTopic(topic);
return partitions.stream()
  .filter(p ->isSouth(p.leader().host()))
  .map(PartitionInfo::partition).findAny().get();

kafka中,压缩可能发生在两个地方:生产者端和broker端。让broker端重新压缩消息有2种例外情况,broker端指定了和producer端不同的压缩算法,broker端发生了消息格式转换。一句话总结压缩和解压缩的话,producer端压缩、broker端保持、consumer端解压缩。

客户端一些高级功能interceptor,与spring中的拦截器原理是一样的(aop),不影响真实业务逻辑调用。生产者要想添加interceptor,只需继承ProducerInterceptor<String, String>类。

无消息丢失配置如何实现?producer 永远要使用带有回调通知的发送 API,也就是说不要使用producer.send(msg),而要使用 producer.send(msg, callback)。Kafka 中consumer 端的消息丢失就是这么一回事。要对抗这种消息丢失,办法很简单:维持先消费消息(阅读),再更新位移(书签)的顺序即可。

设置acks = allacksProducer 的一个参数,代表了你对“已提交”消息的定义。

设置retries 为一个较大的值。这里的retries 同样是Producer 的参数,对应前面提到的Producer自动重试。

确保消息消费完成再提交。consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。

设置unclean.leader.election.enable = false、设置replication.factor >= 3、设置 min.insync.replicas > 1的配置。

public class ProducerClient {

    /* kafka用于防止消息丢失的因素: */
    // 1) 维持先消费消息(阅读),再更新位移(书签)的顺序即可。这样就能最大限度地保证消息不丢失。(消费者端 维持先消费, 再提交offset)

    // 2) unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。
    // 如果一个Broker落后原先的 Leader 太多,那么

    public static void main(String[] args) {
        Properties kafkaProp = new Properties();
        kafkaProp.put("bootstrap.servers", "localhost:9092");
        // 则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”
        kafkaProp.put("acks", "all");
        kafkaProp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 开启kafka的gzip压缩, 向broker发送的每条message都是压缩的
        kafkaProp.put("compression.type", "gzip");

        // 开启生产者消息的幂等性, 保证底层message消息只会发送一次(用空间换,msg会多传一个字段 用于去重)
        kafkaProp.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // 2. producer生产者启用事务(在kafka 0.11开始的支持)
        kafkaProp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "kafka-transactional");

        // 设置interceptor用于统计生产者发送消息延时
        List<String> interceptor = new ArrayList<>();
        interceptor.add("com.example.kakfa.interceptor.AvgLatencyProducerInterceptor");
        kafkaProp.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptor);

        Producer<String, String> client = new KafkaProducer<>(kafkaProp);
        // 1. send调用时使用回调函数callback, exception 可判断消息是否提交成功,消费者 “位移”类似于我们看书时使用的书签
        client.send(new ProducerRecord<>("", ""), (recordMetadata, exception) -> {
//            RecordMetadata var1, Exception var2
        });

        // 2. 在kafka-client客户端中使用transactional事务机制, 用于提交kafka message消息
        client.initTransactions();
        try {
            client.beginTransaction();
            client.send(new ProducerRecord<>("topicA", ""));
            client.send(new ProducerRecord<>("topicB", ""));
            client.commitTransaction();
        } catch (ProducerFencedException ex) {
            client.abortTransaction();
        }
    }
}

kafka社区决定采用tcp而不是http,能够利用TCP 本身提供的一些高级功能,比如多路复用请求以及同时轮询多个连接的能力,目前已知的HTTP 库在很多编程语言中都略显简陋。

何时创建TCP 连接?目前我们的结论是这样的,TCP 连接是在创建 KafkaProducer 实例时建立的。TCP 连接还可能在两个地方被创建:一个是在更新元数据后,另一个是在消息发送时。

何时关闭 TCP 连接?Producer 端关闭TCP连接的方式有两种:一种是用户主动关闭,一种是 Kafka 自动关闭。

开启kafka生产者消息幂等性、producer生产者启用事务需要在producerproperties中设置以下配置:

// 开启生产者消息的幂等性, 保证底层message消息只会发送一次(用空间换,msg会多传一个字段 用于去重)
kafkaProp.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 2. producer生产者启用事务(在kafka 0.11开始的支持)
kafkaProp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "kafka-transactional");

Consumer GroupKafka 提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(Consumer Instance),它们共享一个公共的 ID,这个 ID 被称为 Group ID。组内的所有消费者协调在一起来消费订阅主题(Subscribed Topics)的所有分区(Partition)

Rebalance 本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。比如某个 Group 下有 20 个 Consumer 实例,它订阅了一个具有 100 个分区的 Topic。正常情况下,Kafka 平均会为每个 Consumer 分配 5 个分区。这个分配的过程就叫 Rebalance。

那么 Consumer Group 何时进行 Rebalance 呢?Rebalance 的触发条件有 3 个。

1)组成员数发生变更。比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组。

2)订阅主题数发生变更。Consumer Group 可以使用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile("t.*c")) 就表明该 Group 订阅所有以字母 t 开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,你新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance

3)订阅主题的分区数发生变更。Kafka 当前只能允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance