Kafka

Kafka

概念

Producer 消息的生产者

Consumer 消息的消费者

ConsumerGroup 消费者组,实现单播和广播的手段

Broker kafak服务集群节点,Kafka集群中的一台或多台服务器统称broker.

Topic Kafka处理资源的消息源(feeds of messages)的不同分类

Partition Topic 物理上的分组,一个topic可以分为多个partion,每个partion是一个有序 的队列。partion中每条消息都会被分配一个有序的Id(offset)

Message 消息,是通信的基本单位,每个producer可以向一个topic(主题)发布 一些消息

Producers 消息和数据生成者,向Kafka的一个topic发布消息的 过程叫做producers

Consumers 消息和数据的消费者,订阅topic并处理其发布的消费过程叫做consumers


kafak高吞吐原因分析

  1. 借助操作系统的文件处理,底层使用page cache 加快读写,所以kafka 服务器要加大page cache 设置提高吞吐
  2. 本身顺序读写,降低复杂度,提高吞吐
  3. 二进制格式而不是json避免序列化反序列化
  4. 对消息的批处理和压缩

特征

重播场景

高吞吐

拉数据模式,可高自定义的消费逻辑

适用于所有发布订阅的场景


Kafka 使用场景
RabbitMQ场景

生产者:

生产者生产消息不仅必须指定Topic,还可按照需求指定发往特定的分区

消费者:
Kafak消费消息后不会删除消息
 
== 消费者是通过offset偏移量来控制消费消息,offset持久化在消费者一方 ==
 
一个Topic可被一个或多个消费者消费
 
一个消费者可消费不同的多个topic
 
消费者不仅可以指定要消费的Topic,还可指定消费的分区
 
同一个Group可以定义一个或多个消费者
 
同一个Group中的多个消费者只会有一个收到消息
 
不同Group相同Topic的消费者都会收到消息(fanout)
Kafka 只保证分区内的记录是有序的,而不保证主题中不同分区的顺序
· Kafka作为一个集群,运行在一台或者多台服务器上.
· Kafka 通过 topic 对存储的流数据进行分类。
· 每条记录中包含一个key,一个value和一个timestamp(时间戳)。
Kafak争抢模式实现
多个消费者,同一个Topic同一个Group
Kafak广播模式实现
多个消费者,同一个topic,不同Group

C#

生产者
 public static async Task Main()
        {
            var config = new ProducerConfig
            {
                BootstrapServers = "172.16.126.69:9092",
                ClientId= Dns.GetHostName(),
            };

            // Create a producer that can be used to send messages to kafka that have no key and a value of type string 
            using var producer  = new ProducerBuilder<string, string>(config).Build();

            var i = 0;
            while (true)
            {    
                // 输出内容到控制台
                Console.WriteLine("请输入Key");
                var key= Console.ReadLine();
                Console.WriteLine("请输入value");
                var value= Console.ReadLine();
                
                var message = new Message<string, string>
                {
                    Key = $"{key}",
                    Value = $"{value}"
                };

                // Send the message to our test topic in Kafka                
                var dr = await producer.ProduceAsync(new TopicPartition(topic: "test", partition: new Partition(0)), message);
            
                Console.WriteLine($"Produced message '{dr.Value}' to topic {dr.Topic}, partition {dr.Partition}, offset {dr.Offset}");
            }                                                                                              
        }
消费者

 class Program
    {
        static void Main()
        {
            var conf = new ConsumerConfig
            {
                GroupId = "test",
                BootstrapServers = "172.16.126.69:9092",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoCommit = false,
                //EnableAutoOffsetStore = false
                //SessionTimeoutMs = 1000 * 60 * 30, //30min
                //MaxPollIntervalMs = 1000 * 60 * 30,  //30min,30分钟不轮询kafak服务器将移除所在Group
            };

            using var consumer = new ConsumerBuilder<Ignore, string>(conf).Build();

            ////无法指定分区进行消费
            consumer.Subscribe("^admin");
            //var s = consumer.Assignment;

            //consumer.Assign(new TopicPartition("admin", new Partition(4)));

            //获取指定分区的当前偏移量
            //var unused = consumer.Position(new TopicPartition("", new Partition(0)));
            var cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true;
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    //consumer.Assign(new TopicPartitionOffset("admin", new Partition(0),new Offset(391))); 
                    //consumer.Assign(new TopicPartition("admin", new Partition(0)));
                    var consumeResult = consumer.Consume(cts.Token);
                    if (consumeResult.Message.Headers.TryGetLastBytes("GameId", out byte[] lastHeader))
                    {
                        var header = Encoding.Default.GetString(lastHeader);
                        Console.WriteLine($"头为:{header}");
                    }

                    Console.WriteLine($"Consumed message '{consumeResult.Message.Value}' from topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}");

                    //consumer.StoreOffset(consumeResult);
                    //获取当前topic分区数
                    //var count = consumer.Assignment.Count;
                    //重置分区 offset,可实现 回读,跳过
                    //consumeResult.TopicPartitionOffset = new TopicPartitionOffset("admin", new Partition(0), new Offset(321));
                    try
                    {
                        //业务执行成功后可能kafka事务提交失败,故拿到消费数据后先判断消息是否消费过
                        //(方案:为每条消息加一个标识符,业务执行成功后标识符状态也要同时(事务)改为已消费,可解决重复消费问题)
                        
                        consumer.Assign(new TopicPartitionOffset(consumeResult.Topic, new Partition(consumeResult.Partition), new Offset(consumeResult.Offset+1)));//设置本地偏移量+1,可省略
                        consumer.Commit(consumeResult);//将本地偏移量+1提交到kafka服务端
                        //等价于手动ACK,即使不提交,但是本地offset始终会+1,kafka服务端不会+1,
                        //为了让本地offset与服务端offset一致,需手动重置本地TopicPartitionOffset并调用Assign()方法重置本地kafka数据
                        
                        //consumer.StoreOffset(consumeResult);
                    }
                    catch (KafkaException e)
                    {
                        Console.WriteLine($"Commit error: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
            }
            finally
            {
                consumer.Close();
            }
        }
    }

指定分区消费

 using var consumer = new ConsumerBuilder<Ignore, string>(conf).Build();
  consumer.Assign(new TopicPartition("admin", new Partition(4)));//指定消费admin 主题中的4分区;指定分区消费后会变成广播模式,其他消费者依然会消费,不受此影响
  

ConsumerConfig属性说明
AutoCommitIntervalMs 自动提交频率,不建议使用,会较大概率发生消息丢失或者重复消费
EnableAutoCommit 是否自动提交偏移量,默认自动

注意

重新分配分区后,新增分区要等几分钟后才可被触发使用 多分区场景下,kafka服务端lag 有负数情况,目前官方修复为最多-1,此bug并不影响数据的准确性,客户端消费依然是正常

生成的消息指定了相同的key,此详细将发往同一个分区

消费者数量一定要小于分区数,否则会发生个多出来的消费者永远无法消费到消息image


为什么Kafka使用的是磁盘反而最终强于依靠内存的rabbitmq —

Kafka数据存储

索引文件中包含若干个索引条目,每个条目表示数据文件中一条Message的索引。索引包含两个部分(均为4个字节的数字),分别为相对offset和position。

相对offset:因为数据文件分段以后,每个数据文件的起始offset不为0,相对offset表示这条Message相对于其所属数据文件中最小的offset的大小。举例,分段后的一个数据文件的offset是从20开始,那么offset为25的Message在index文件中的相对offset就是25-20 = 5。存储相对offset可以减小索引文件占用的空间。

position,表示该条Message在数据文件中的绝对位置。只要打开文件并移动文件指针到这个position就可以读取对应的Message了。

index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。


rabbitmq与kafka区别


参考 —

官方文档

C#客户端使用

Apache kafka 工作原理介绍

第四章 从Kafka读取数据

Kafka为什么这么快

kafka磁盘为什么比内存快

page cache 对文件读写的影响

page cache 与kafka之间的那些事