0%

Java基于C++实现,与C++的一大区别在于它能够进行自动内存管理。

先从垃圾说起,所谓的垃圾就是无用的对象,主要存放在堆区中。对于C++来说,创建对象的时候需要malloc,用完对象之后需要free,以免无用对象占据内存空间,这一切的操作都是手动的。对于Java来说,通过Java虚拟机垃圾收集器可以自动化的实现对象内存的分配及回收

jvm-gc

那么,如何标识哪些内存对象是无用的呢?

判断对象是否应该存活的算法有:引用计数算法、根搜索算法。

引用计数算法,给每一个对象添加一个引用计数器,只要一个对象被引用了,那么计数值就加1,如果引用失效了,那么计数值就减1,直到为0,那么这个对象就可以被回收了,这种方式还是比较简单高效的。但是,如果存在循环引用引用计数算法就无能为力了,引用会一直存在,计数值不会为0,从而造成内存无法被回收。

jvm-ref-d

根搜索算法,通过一系列名为GC Roots的对象为起始点,从这些节点向下搜索,如果对象是可达的,那么这个对象就是存活的,反之,就是不存活的,这个对象就是可回收的。

jvm-ref-r

决定对象是否应该存活的直接影响因素就是引用,在Java中存在有四种引用:强引用、软引用、弱引用、虚引用。

强引用 普遍存在,类似Object obj = new Object(),只要对象引用还存在,对象的内存就不会被回收。
软引用 表示有用但非必需的对象,当内存不够的时候,软引用对象就可以被回收。
弱引用 表示非必需对象,只要发生了GC,弱引用对象就可能被回收。
虚引用 用于在对象被回收的时候能够收到通知。

在标识对象存活的基础上,需要利用垃圾收集算法来实现内存的回收。

首先是标记-清除算法,分为标记清除两个阶段。标记的对象是可回收对象,先进行标记,然后再去执行清除。这种算法的缺点就是会产生大量的内存碎片,使得连续内存不足,导致GC的触发,并且标记、清除阶段效率都不高。

jvm-ms

为了解决标记-清除算法的缺点,又产生了复制算法。将内存空间分为2个区域,各占50%,1个区域正常使用,1个区域空闲着,每次发生GC的时候,会将使用着的区域的存活对象复制到另一个区域,并按照顺序存放。通过这种方式,解决了内存碎片的问题,但同时又浪费了50%的内存空间。

jvm-gc-cp由于新生代的对象,98%左右都是朝生夕死的,将新生代分为Eden区、Surviror0区、Survivor1区,并且比例为8:1:1,使用复制算法也不至于浪费过多内存空间,并且不会产生内存碎片。

对于老年代里头的对象,一般存活率都是比较高的,如果采用标记-清除算法,那还不如复制算法呢,但如果采用复制算法,又因为老年代对象存活率高,频繁的移动内存中的对象,难免会造成回收效率的下降。因此,又产生了一种标记-整理算法,这种算法不会产生内存碎片,但效率比起复制算法来说也不高,但适用于老年代。

jvm-mc

根据以上信息可以得知,复制算法更适用于新生代的内存回收,而标记-清除算法标记-整理算法更适用于老年代的内存回收,因此,垃圾收集器基本都是基于分代收集算法,将内存区域划分为不同年代,按照每个区域合适的垃圾回收算法回收内存。

实际上,垃圾收集器在分代收集算法的基础上以串行、并行、并发方式提供。
新生代 老年代
串行 Serial收集器 Serial Old收集器
并行 ParNew收集器、Parallel Scavenge收集器 Parallel Old收集器
并发 CMS收集器
其它 G1收集器 G1收集器

新生代的内存回收(YGC、Minor GC、Young GC)使用的复制算法。Serial收集器是单线程的,适用于Client模式,而ParNew收集器与Serial收集器相比除了采用多线程没有多大区别,适用于Server模式,Parallel Scavenge收集器表面上看起来和ParNew收集器没有多大区别,但实际上Parallel Scavenge收集器更关注的是吞吐量,通过减少Stop The World时间来提高吞吐量,同时可能导致GC次数更加频繁

jvm-gc-new

老年代的内存回收(Major GC、FGC、Full GC同时也会回收新生代)使用的标记-整理算法、标记-清除算法。Serial Old收集器是单线程的,采用标记-整理算法,适用于Client模式。Parallel Old收集器是Parallel Scavenge收集器的老年代版本,是多线程的,注重吞吐量,采用的标记-整理算法。CMS收集器采用的标记-清除算法,注重低停顿,在初始标记重新标记阶段会Stop The World,而并发标记并发清除阶段与用户线程并行存在,由于采用的标记-清除算法,因此会产生内存碎片,导致出现Concurrent Mode Failure,触发Serial Old收集器来执行标记-整理。

jvm-gc-old在CMS收集器的基础上又产生了G1收集器,与之不同的是G1收集器采用的标记-整理算法,不会产生内存碎片并且没有明显的分代概念,而是将内存划分为若干个固定大小区域,可以在保证吞吐量的同时完成低停顿的内存回收,在回收内存的时候不会全区域的去回收,而是优先回收内存垃圾比较多的区域。

jvm-gc-g1

内存的分配也随着JDK的发展与各种技术的提升而更加智能。

对象优先在Eden区分配
大部分的对象都具备朝生夕死的特点,更适合在新生代中分配,并且Minor GC速度比较快。

大对象直接进入老年代
由于新生代采用的复制算法,Minor GC会比较频繁,因此大对象最好直接进老年代,避免发生频繁的内存复制。当然,也有一些朝生夕死的大对象,如果过多这种大对象进入了老年代可能会导致Major GC的频繁发生,甚至导致Full GC的出现。可以通过配置-XX:PretenureSizeThreshold参数来定义直接进入老年代的大对象的大小门槛。

长期存活对象将进入老年代
如果对象长期存活,那么可能是有用的,最好是晋升到老年代,否则随着多次的Minor GC会不断的被复制来复制去,同时也比较占据新生代的内存空间。可以通过配置-XX:MaxTenuringThreshold参数来修改新生代中的对象晋升到老年代的历经GC次数,默认是15次。

动态年龄判断
通过动态年龄判断,只要Surviror区中相同年龄的对象大小总和超过Surviror区大小的一半,那么就允许新生代对象不必历经MaxTenuringThreshold配置的GC次数提前晋升到老年代。

空间分配担保
在发生新生代Minor GC的时候,会去判断要晋升到老年代的对象大小总和是否超过老年代剩余空间的大小,如果超过了,并且配置了HandlePromotionFailure,那么就会进行Minor GC,否则的话,会执行一次Full GC。

逃逸分析、栈上分配
逃逸分析分为线程逃逸、方法逃逸。对于线程逃逸来说,如果一个对象只在一个线程中使用,那么这个对象就逃逸了,不需要在堆中分配内存。对于方法逃逸来说,如果一个对象只在方法内部使用,并且被外部方法所引用,那么这个对象就逃逸了,不需要在堆中分配内存。

Java虚拟机在执行Java程序的过程中会把它所管理的内存划分为若干个不同的数据区域。

jvm

这些区域包含:虚拟机栈、本地方法栈、程序计数器、堆、方法区。
每个区域都有各自的生命周期以及各自的作用范围及功能。

首先是虚拟机栈,它表示的是Java方法在运行时候的内存模型

jvm-stack

方法被调用的时候都会去创建栈帧,从方法执行的开始结束又对应着栈帧虚拟机栈中的入栈出栈
栈帧被用来保存局部变量表操作数栈动态链接方法出口等信息。
jvm-stack-1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.sankuai.test;

import com.alibaba.fastjson.JSON;

public class Test {
public static void main(String[] args) {
String json = "{\"id\":1,\"name\":\"1\"}";
int a = 1, b = 2;
int c = a + b + 6;
System.out.println(a + b);
System.out.println(c);
System.out.println(JSON.parseObject(json, User.class));
}
}

局部变量表中包含了基本数据类型对象引用类型返回地址

  • 基本数据类型:byte、short、int、long、float、double、boolean、char。
  • 对象引用类型:对象的起始地址(数组)或者对象的句柄(类)。
  • 返回地址:指向的是一条字节码指令的地址。

操作数栈,本质也是一个栈,入栈和出栈代表着变量的操作,比如:两个变量相加。
动态链接,所调用的对象会在常量池中存储并用标号标识,而常量池会指向对象的真实地址。
jvm-cp-0jvm-cp方法出口,当栈帧出栈之后,对应的这个方法应该返回到调用它的地方。

接着是本地方法栈,它跟虚拟机栈是类似的,只不过它调用的不是Java方法,而是Native本地方法
另外,虚拟机栈本地方法栈都是线程私有的,生命周期跟随着线程。
当线程越多、栈的深度越深,就可能造成栈溢出StackOverFlowError,甚至是OutOfMemoryError

程序计数器标识的是线程正在执行的字节码地址,由于多线程的存在及CPU可能会来回切换线程。
多个线程之间的程序计数器是相互独立的,可以看作是各自的行号指示器

是Java虚拟机中内存占用最大的一块区域,也被所有的线程所共享,主要是用来存放对象实例。所有的对象实例和数组都要在上分配内存,不过后来出现了栈上分配、标量替换、逃逸分析等技术,使得对象不一定需要在堆上分配内存。一个堆包含了:新生代(Eden、S0、S1)老年代,比例通常是1:2

jvm-heap也是垃圾收集器GC的重点区域,目前普遍采用的是分代收集算法

最后是方法区,它跟堆一样,是被所有线程所共享的,主要存储的是:被虚拟机所加载的类的信息、常量、静态变量、即使编译器编译之后的代码等信息,在JDK1.8之后,方法区元空间所替代,并且被挪到了堆外
方法区中存在着一个运行时常量池,它用来存储字面量符号引用
jvm-cp-2

MySQL的复制是一种高性能高可用的解决方案,在主从之间进行复制、重放数据。

mysql-repl

复制(replication)首先会把服务器的数据更改写入binlog二进制文件中,服务器的I/O线程binlog拉取数据,存入本地的relay log中继日志中,然后由SQL线程读取并重放到从服务器中。

由于整个复制的过程是异步实时的,所以可能会存在在主从同步的延迟现象,这个延迟可能是一秒、一分、一小时,甚至一天,数据量越大,主服务器的压力就越大。

MySQL支持的复制方式包含有:row(行)、statement(语句)、mixed(行+语句)。

首先是基于的复制方式,这种方式只会在binlog文件中记录最终的修改的记录的结果,在一些复杂查询下却返回较少数据情况下比较适用,但兼容性也不好,一旦修改列的话,就会出现问题,并且随着行数的增多,binlog文件也会膨胀

接着是基于语句的复制方式,这种方式会在binlog文件中记录操作的SQL语句,相比基于行的方式来说,SQL语句更容易理解且遇到问题的时候也较好定位,在占用的体积上来说也了许多。当然,基于语句的复制方式也存在着数据操作范围不可控的风险,同样,一旦修改列的话,也可能会出现问题。

最后是mixed这种方式,结合了基于的复制方式和基于语句的复制方式,取二者的优点。

MySQL本身也是在文件系统的基础上发展而来,因为锁的存在使之有所不同。
MySQL作为一种数据库软件,难免会存在对其共享资源并发访问,为了协调和管理不同资源的并发访问,也就产生了锁机制,因为锁机制的存在为数据库提供了数据的完整性一致性

锁的级别来分锁可分为:行级锁、表级锁、页级锁。
锁的类型来分锁可分为:共享锁、排它锁(独占锁)。
为了协调行锁、表锁产生了:意向锁(表级锁)。

共享锁,允许事务去读取数据。
排它锁,允许事务去修改或删除数据。
意向锁,获取行级锁的时候,自动添加的表级锁,包含:意向共享锁、意向排它锁。

对于MyISAM存储引擎,只支持表锁,而InnoDB存储引擎则支持行锁、表锁

MyISAM存储引擎修改、删除数据的时候,会产生排它锁,锁定的整张表,并发写入性能较差,而读取的时候产生的是共享锁,不会锁定表,读取性能就比较好。

InnoDB存储引擎修改、删除数据的时候,会产生排它锁,锁定的特定索引记录,一般不会影响表中的其它行,并发写入性能较好,而读取的时候产生的是共享锁,不会锁定表和行,读取性能较好。

行锁锁定的是索引记录,而不是记录行,如果没有索引,则使用隐式索引进行锁定。

当一张表某些行已经获取了排它锁,在表中会产生一个意向排它锁,如果此时有一个事务要来锁定整张表,那么一看有意向排它锁的存在,该事务就会被阻塞,通过意向锁直接就可以知道能不能锁定表,不需要逐行去遍历检测是否有排它锁,通过意向锁高效地协调了行锁和表锁的关系。

行级锁按照锁定范围来分,又分为三种:

  • Record Lock 单行记录上的锁。
  • Gap Lock 间隙锁,锁定一个范围,不包含记录本身。
  • Next-Key Lock 锁定一个范围,包含记录本身,用于解决幻读问题。

当然,锁也是有利有弊的,也可能出现死锁的情况。
两个或两个以上的事务在执行过程中,因争夺资源而造成一种相互等待的现象,称为死锁

最后,也是因为锁的存在,丰富了后续事务的功能。

MySQL通过设计一种机制,使得数据能够完整地从一种一致性状态切换到另一种一致性状态,这种机制称为事务。

事务包含有四大特性:原子性(A)、一致性(C)、隔离性(I)、持久性(D),简称酸性。
原子性:事务中的操作,要么全部成功,要么全部失败,不可切分。
一致性:事务将数据库从一种一致性状态转变成另外一种一致性状态,并且保证数据的完整性。
隔离性:又称并发控制,事务在提交之前对于其它事务是处于不可见的状态的。
持久性:事务一旦提交,结果就是永久性的,不会因为数据库宕机而丢失数据。

原子性持久性是通过redo日志实现的,一致性是通过undo日志实现的,隔离性是通过锁机制实现的。

从本质上来说,原子性也是为了配合持久性而存在的,当事务的一部分写入redo日志后,发生了崩溃、断电,那么根据原子性来说,该次事务应当恢复,那么对于已经持久化到日志文件中的数据,就必须要通过回溯来撤销。在InnoDB存储引擎中,redo重做日志对应的就是ib_logfile0ib_logfile1
redo

接着,事务要进行回滚,那就需要通过一致性来保障,而undo日志就是用来实现一致性的,在undo日志中保存了多个版本的事务的一些信息,通过undo日志,将事务rollback到修改之前的样子。

在此,不得不提的是MySQL的MVCC多版本并发控制,它也是通过undo日志来实现的。
MVCC是通过在每一数据行后头添加2个隐藏字段create versiondelete version以及每次开启一个事务会初始化一个事务id。新增一条数据的时候,create version的值就等于事务id,删除数据的时候,delete version就等于事务id,更新数据的时候会先删后增,在undo日志中就会存在2条数据,一条delete version就等于事务id,一条create version的值等于事务id

在事务执行过程中,可能会同时存在其它的事务,而多个事务之前需要相互隔离,也就是要做到并发控制,锁就是用来实现隔离性的。MySQL的事务的隔离级别包含:Read Uncommitted读未提交、Read Committed读已提交、Read Repeatable可重复读、Serializable串行化。其中,读已提交可重复读是基于MVCC多版本并发控制来实现的。

锁,为事务的并发控制带来了好处,同时也带来了坏处,包括:脏读、不可重复读、幻读。

脏读,指的是一个事务读到了另一个事务未提交的内容,一旦另一个事务回滚了,就出现了脏数据
不可重复读,指的是同一个事务使用同一句SQL进行多次读取,返回不同的结果。
幻读,指的是一个事务在进行增删的时候,某些已经确定不会出现的记录突然出现。

要解决脏读,那就需要至少设置隔离级别为:Read Committed读已提交。
要解决不可重复读,那就需要至少设置隔离级别为:Read Repeatable可重复读。
要解决幻读,那就需要设置隔离级别为:Serializable串行化或者采用Next-Key Lock间隙锁。

在MySQL数据库中,索引是一种很重要的提升查询性能的存储结构。

索引,就像是书本的目录一样,通过查询索引,可以快速的定位到数据所处位置,减少磁盘I/O次数,提高性能。
InnoDB存储引擎中,支持的索引包含:B+树全文索引哈希索引

本文将主要聚焦在由B+树实现的索引结构,全文索引基于倒排索引实现,哈希索引基于链表、哈希函数实现。

b+

首先,看一下二分查找,要从一个有序数组中查找一个元素,通过每次的折半再折半,时间复杂度为O(logn),如果直接顺序查找元素的话,时间复杂度为O(n)

mbs

接着,拓展到二叉查找树,如果二叉查找树足够平衡,那么时间复杂度还可以接近二分查找

mbst

反之,二叉查找树呈现线性状态的话,那么时间复杂度就接近于顺序查找了。

mbst2

因此,B+树既然能够提升查询性能的话,那必然是一棵平衡查找树,不能出现线性状态。
二叉查找树定义:左边节点值小于根节点值,右边节点值大于根节点值,中序遍历后为从小到大排序的列表。
平衡二叉树定义:在二叉查找树的条件下,又满足左右子树任何节点高度差不超过1。

B+树的所有记录节点都是按照顺序排列,从小到大排列在叶子节点,并且每个叶子节点之间有指针进行连接。

b+

B+树的叶子节点中,包含了所有的节点及数据,其它非叶子节点仅仅包含指针引用。

在执行范围查找的时候,只需要在B+树上进行二分查找,然后找到底层叶子节点的起始位置,再沿着叶子节点间的双向指针顺序遍历即可,一般B+树的高度在2 ~ 4之间。

除了这个优点之外,由于非叶子节点不存储数据,经过一次磁盘I/O操作,就可以加载大量的节点,然后在内存中高效执行二分查找,再去找到数据所在叶子节点即可。

由B+树来实现的索引包含:聚集索引、非聚集索引。

每张表有且仅有一个聚集索引,通常情况下,是采用主键来作为聚集索引。
聚集索引根据每张表的主键来构造B+树,在叶子节点则存储整张表的行记录数据
聚集索引是逻辑连续的,因此,在范围查找、根据主键排序的时候,效率特别高。

与聚集索引相反的是非聚集索引,又称为辅助索引,它的叶子节点并不存储行记录数据
每张表可以拥有多个非聚集索引,通过非聚集索引查找数据会遍历找到叶子节点,再通过叶子节点指向的聚集索引的指针,再去聚集索引上遍历然后查找对应的行记录数据,也因此通过非聚集索引查找数据比通过聚集索引慢了一倍的速度。

msindex

基于B+树两类索引实现的应用包含有:联合索引、覆盖索引。

索引既可以使用单独的列,又可以使用多个列组合起来使用,这就是联合索引
比如有这么一张表,包含有主键id及name、age组成的联合索引及一个addr普通字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
mysql> desc test;
+-------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+--------------+------+-----+---------+-------+
| id | int(11) | NO | PRI | 0 | |
| name | varchar(30) | YES | MUL | NULL | |
| age | int(11) | YES | | NULL | |
| addr | varchar(200) | YES | | NULL | |
+-------+--------------+------+-----+---------+-------+

mysql> show index from test;
+-------+------------+----------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+
| Table | Non_unique | Key_name | Seq_in_index | Column_name | Collation | Cardinality | Sub_part | Packed | Null | Index_type | Comment | Index_comment |
+-------+------------+----------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+
| test | 0 | PRIMARY | 1 | id | A | 0 | NULL | NULL | | BTREE | | |
| test | 1 | union_x | 1 | name | A | 0 | NULL | NULL | YES | BTREE | | |
| test | 1 | union_x | 2 | age | A | 0 | NULL | NULL | YES | BTREE | | |
+-------+------------+----------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+

当然,由于采用的B+树实现的,联合索引底层叶子节点的数据也是按多列顺序排列。
对于联合索引,需要符合最左原则,才可以使用到索引。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
mysql> explain select * from test where name = 1 and age = 1;
+----+-------------+-------+------+---------------+------+---------+------+--------+-------------+
| id | select_type | table | type | possible_keys | key | key_len | ref | rows | Extra |
+----+-------------+-------+------+---------------+------+---------+------+--------+-------------+
| 1 | SIMPLE | test | ALL | union_x | NULL | NULL | NULL | 997182 | Using where |
+----+-------------+-------+------+---------------+------+---------+------+--------+-------------+

mysql> explain select * from test where age = 1 and name = 1;
+----+-------------+-------+------+---------------+------+---------+------+--------+-------------+
| id | select_type | table | type | possible_keys | key | key_len | ref | rows | Extra |
+----+-------------+-------+------+---------------+------+---------+------+--------+-------------+
| 1 | SIMPLE | test | ALL | union_x | NULL | NULL | NULL | 997182 | Using where |
+----+-------------+-------+------+---------------+------+---------+------+--------+-------------+

mysql> explain select * from test where name = 1;
+----+-------------+-------+------+---------------+------+---------+------+--------+-------------+
| id | select_type | table | type | possible_keys | key | key_len | ref | rows | Extra |
+----+-------------+-------+------+---------------+------+---------+------+--------+-------------+
| 1 | SIMPLE | test | ALL | union_x | NULL | NULL | NULL | 997182 | Using where |
+----+-------------+-------+------+---------------+------+---------+------+--------+-------------+

mysql> explain select * from test where age = 1;
+----+-------------+-------+------+---------------+------+---------+------+--------+-------------+
| id | select_type | table | type | possible_keys | key | key_len | ref | rows | Extra |
+----+-------------+-------+------+---------------+------+---------+------+--------+-------------+
| 1 | SIMPLE | test | ALL | NULL | NULL | NULL | NULL | 997182 | Using where |
+----+-------------+-------+------+---------------+------+---------+------+--------+-------------+

通过以上四个例子可以看到,联合索引如果按照其中一个列去查询的话,那就必须按照最左原则。
现在有联合索引<name,age>,通过name去查询可以用到索引,而通过age无法使用索引。
如果同时使用name和age的话,在查询的where条件后面可以调整name和age的顺序且不影响索引的使用。

在联合索引的基础上,又产生了一种覆盖索引,也就是直接从非聚集索引上就可以查到数据。

1
2
3
4
5
6
mysql>  explain select name,age from test where age = 1;
+----+-------------+-------+-------+---------------+---------+---------+------+--------+--------------------------+
| id | select_type | table | type | possible_keys | key | key_len | ref | rows | Extra |
+----+-------------+-------+-------+---------------+---------+---------+------+--------+--------------------------+
| 1 | SIMPLE | test | index | NULL | union_x | 38 | NULL | 997182 | Using where; Using index |
+----+-------------+-------+-------+---------------+---------+---------+------+--------+--------------------------+

由于name、age组成了联合索引,通过非聚集索引(辅助索引)就可以找到name和age。
通过覆盖索引查询则无需再去聚集索引查询数据,可以减少磁盘I/O。

上文提到B+树高度一般在2 ~ 4层,这是由InnoDB存储引擎数据组织方式决定的。

磁盘中,最小的存储单元扇区,每一个扇区大小为512字节。
文件系统中,最小的存储单元是,每一个块大小为4KB。
InnoDB中,最小的存储单元是,每一页大小为16KB。

innodb-driver

在数据表中,数据都是存储在页上,一页16KB,如果一行数据1KB,那么一页可以存储16行数据。
假设主键大小为8字节,指针大小为6字节,则一个非叶子节点页可以存储16*1024/(8+6)=1170个单元。

如果高度为2,每个单元都指向一个页,那么可以存储1170*16=18720条记录。
如果高度为3,那么可以存储1170*1170*16=21902400条记录,大概2000万条。
如果高度为4,那么可以存储1170*1170*1170*16=25625808000条记录,大概200亿条。

一般来说,一张表数据量到了2000万条算多的了,不至于到200亿条。因此,B+树高度一般在2-4层,对于磁盘来说,1秒可以进行100次磁盘I/O操作,加载3层B+树,需要3次磁盘I/O,也就是0.03秒

在计算机通信中,I/O在数据交换中起到了至关重要的作用。    

最初,只有BIO这种同步阻塞式的I/O编程通信模型。
随着计算机普及及流量的上涨,产生了NIO这种同步非阻塞式的I/O编程通信模型。
再后来,NIO出现了瓶颈,便产生了AIO这种异步非阻塞式的I/O编程通信模型。

对于I/O类型,分为:文件I/O、网络I/O。

文件I/O:基本本地磁盘,在内核空间与用户进程空间之间交换数据。
网络I/O:基本网络Socket通信,在不同(主机)进程之间交换数据。

对于I/O方式,分为:同步、异步。

同步方式:后续的操作需要等待前面操作完成才可以继续执行。
异步方式:后续的操作不需要等待前面操作完成,可以直接返回,通过event、callback调用。

对于I/O状态,分为:阻塞、非阻塞。

阻塞状态:当前执行的线程将处于阻塞状态,无法继续执行其他的任务。
非阻塞状态:当前执行的线程不会处于阻塞状态,可以继续其它任务,I/O操作由后台去处理。

I/O通信模型围绕着上述方式和状态分为:BIO、NIO、AIO。

首先,是BIO,同步阻塞I/O,最为原始,设计最简单,适用于并发线程少于1000的情况。
在服务端,绑定IP、监听端口,启动一个Acceptor线程,阻塞等待accept。
当客户端进发起连接请求时,创建一个线程进行Socket连接,等待与客户端之间进行读写I/O流操作。
客户端使用连接完毕之后,断开连接,销毁线程。
当然,由于创建线程成本过高,可以采用线程池进行优化线程成本。
但是,如果线程内部I/O阻塞,线程池资源也会带来瓶颈,也因此无法同时承受太多并发连接。

linux-bio

os-bio

接着,是NIO,同步非阻塞I/O,底层基于Reactor模型来实现。
BIO不同的是,在NIO中,客户端与服务端的Channel建立连接后,由Selector线程不断的去轮询,获取当前就绪的Channel。
而不是来一个请求直接启动一个线程。Selector是一个多路复用器,Channel被注册到Selector上。
客户端只与Channel进行交互,所有的读写不是基于流,而是基于Buffer,有I/O操作时才会创建线程。
通过多路复用器的轮询,而不是Acceptor的阻塞accept,实现了非阻塞I/O。
对于多路复用器轮询出可用Channel的操作,根据操作系统实现又包含有:select、poll、epoll、kqueue。

linux-nio

os-nio

AIO作为NIO的升级版,是异步非阻塞I/O,底层基于Proactor模型来实现,适用高并发场景。
整体上基于事件和回调机制,文件通道、套接字通道都是异步的,读写返回的是Future
不需要再去注册相关感兴趣的key,只需要等待事件和I/O操作结果返回即可。

linux-aio

总的来说:
BIO,基于同步阻塞,适用连接较少且连接使用时间均匀的场景,耗费服务器资源较多。
NIO,基于同步非阻塞,适用连接较多且连接使用时间短的场景,充分利用服务器资源。
AIO,基于异步非阻塞,适用连接较多且连接使用时间长的场景,充分利用操作系统来完成并发操作。

Kafka的高性能在于设计巧妙及借助操作系统特性。

Kafka消息的生产和消费都是需要指定Topic的。

首先,从表面设计方面来看。

一个Topic可以拆分为多个Partition,而这些Partition可以均匀部署在多个Broker上,充分发挥集群的作用,实现机器并行处理。每个Partition再分为多个Segment,每次只有一个Segment可以进行日志的顺序写入,其它Segment可以根据offset进行读取。

对于日志的存储路径,Kafka支持多磁盘路径,通过配置log.dirs,按逗号分隔,可以实现磁盘并行处理。

kafka-hp

接着,从底层实现方面来看。

Kafka采用了顺序写的方式,对于一些场景,顺序写磁盘随机写内存来得快。

另外,由于Segment的存在,使得Kafka删除旧数据的时候更简单,直接删除老的Segment文件,而不需要操作一个文件去删除内容,也避免了随机写的操作。

Kafka充分利用了Page Cache,如果读写速率相当,只需要操作Page Cache即可,而不需要操作磁盘,数据会由I/O调度器定时组装成大块刷入磁盘中。

通过Page Cache的方式,Kafka不需要使用JVM的堆内存,也减少了GC的负担。

数据通过网络传输然后持久化到磁盘中,也从磁盘传输到网络中,Kafka采用了NIO零拷贝机制,减少了内核空间、用户空间的数据拷贝过程以及上下文切换次数。

Kafka从Producer到Broker发送的数据并没有直接发送过去,而是先缓存起来,积累一定条数或者等待一定时间,然后合并、压缩、序列化批量发送到Broker,降低了网络负载,提高了传输效率。

Kafka的高可用主要围绕着:Data Replication、Leader Election 两方面来实现的。

Data Replication(数据备份),这种冗余数据的方式比较多见,一份数据不安全,那就来多份,再把这些数据均匀分散在多台主机上,即便是部分主机宕机,也不会影响整体。

这种方式虽好,却存在着数据一致性的问题。

kafka-ha

副本与源头数据,由于网络延迟等原因导致数据不一致。

在Kafka中,源头称为Leader,副本称为Replica或者Follower,所有的读、写只会在Leader上操作,通过这种方式,可以保证数据的有序性,副本只从Leader拉取数据,可以减少数据同步通路数,降低副本设计的复杂度。当然,这种方式仍然存在数据一致性问题。

为了解决这个问题,Kafka定义了一个ISR(In-Sync Replica),所有数据同步完成或者没有落后Leader过多的副本都会被记入ISR中。当Leader宕机之后,新的Leader只会从ISR中选举。

当然,极端情况下,可能ISR中没有任何副本,这个时候就需要在一致性可用性之间选择。

如果选择一致性,那么就需要等待ISR中的副本活过来,会暂时不可用
如果选择可用性,那么就选择第一个活过来的副本,可能出现数据丢失

为了实现更好的高可用,需要将TopicPartition负载均衡到多个Broker,并且要保证Partition的个数大于等于拥有的Broker的个数,保证单BrokerPartition可靠性。

可以自定义Partition算法,默认算法是:

  • 将所有的Broker,假设n个和Partition排序。
  • 将第i个Partition分配到第 i mod n 个Broker上。
  • 将第i个Partition的第j个Replica分配到第 (i+j)mod n 个Broker上。

kafka-repl

对于Leader Election可以简单通过Zookeeper来完成,最先创建ZNode节点的就作为Leader。

但是,这种方式可能出现脑裂惊群现象,造成多主、ZK压力过大。

还有流行的少数服从多数算法,但是这种要求至少一半存活,比较不靠谱。

Kafka在初始化的时候,启动了一个KafkaController,将Leader Election交给这个控制器来完成。

只有最先在Zookeeper上创建/controller节点的主机才可以作为控制器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
 /**
* This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
* It does the following things on the become-controller state change -
* 1. Registers controller epoch changed listener
* 2. Increments the controller epoch
* 3. Initializes the controller's context object that holds cache objects for current topics, live brokers and
* leaders for all existing partitions.
* 4. Starts the controller's channel manager
* 5. Starts the replica state machine
* 6. Starts the partition state machine
* If it encounters any unexpected exception/error while becoming controller, it resigns as the current controller.
* This ensures another controller election will be triggered and there will always be an actively serving controller
*/
private def onControllerFailover() {
info("Reading controller epoch from ZooKeeper")
//读取zk上/controller_epoch的值
   readControllerEpochFromZooKeeper()
   info("Incrementing controller epoch in ZooKeeper")
//将/controller_epoch的值 +1
   incrementControllerEpoch()
   info("Registering handlers")

//注册监听器,监听Topic、Broker、ISR等的变化
// before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
val childChangeHandlers = Seq(brokerChangeHandler,topicChangeHandler, topicDeletionHandler,logDirEventNotificationHandler,isrChangeNotificationHandler)
childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)
   //删除监听器,isr变更或者日志目录变更会触发监听器,进行重分配
   info("Deleting log dir event notifications")
zkClient.deleteLogDirEventNotifications()
info("Deleting isr change notifications")
zkClient.deleteIsrChangeNotifications()
info("Initializing controller context")
//初始化控制器上下文
   initializeControllerContext()
   info("Fetching topic deletions in progress")
val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
   info("Initializing topic deletion manager")
//初始化Topic管理器
   topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)

// We need to send UpdateMetadataRequest after the controller context is initialized and before the state machines
// are started. The is because brokers need to receive the list of live brokers from UpdateMetadataRequest before
// they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and
// partitionStateMachine.startup().
info("Sending update metadata request")
   sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)

   replicaStateMachine.startup()//启动replica状态机
   partitionStateMachine.startup()//启动partition状态机

info(s"Ready to serve as the new controller with epoch $epoch")
   //触发重新分配
   maybeTriggerPartitionReassignment(controllerContext.partitionsBeingReassigned.keySet)
   //尝试删除无效Topic
   topicDeletionManager.tryTopicDeletion()
val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
   //执行选举
   onPreferredReplicaElection(pendingPreferredReplicaElections)
info("Starting the controller scheduler")
   //启动kafka重分配定时任务
   kafkaScheduler.startup()
if (config.autoLeaderRebalanceEnable) {
scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
}
   //启动token认证定时任务
   if (config.tokenAuthEnabled) {
info("starting the token expiry check scheduler")
tokenCleanScheduler.startup()
tokenCleanScheduler.schedule(name = "delete-expired-tokens",
fun = tokenManager.expireTokens,
period = config.delegationTokenExpiryCheckIntervalMs,
unit = TimeUnit.MILLISECONDS)
}
}

Kafka的网络通信模型是基于Java NIO的Reactor多线程模型实现的。

从Kakfa的SocketServer.scala中可以看到一段关于Kafka网络模型的说明。

1
2
3
4
5
6
/**
* An NIO socket server. The threading model is
* 1 Acceptor thread that handles new connections
* Acceptor has N Processor threads that each have their own selector and read requests from sockets
* M Handler threads that handle requests and produce responses back to the processor threads for writing.
*/

Kafka包含了1个Acceptor线程用来接收新连接N个Processor线程用来处理Socket请求、M个Handler线程用来处理业务逻辑。

首先,对比一下几种NIO模型。

  • 普通NIO
    common-nio

  • 高并发NIO
    advance-nio

  • Kafka NIO
    kafka-nio

接着,从源码层面来分析。

Broker启动的时候,会创建Acceptor以及Processor,并初始化KafkaApis请求处理池

1
2
3
4
5
6
// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
//启动Acceptor,绑定端口
socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup(startupProcessors = false)
1
2
3
4
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager)
1
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time, config.numIoThreads)

startup方法创建了连接数管理器、启动Acceptor线程及Processor线程。

1
2
3
4
5
6
7
8
9
10
def startup(startupProcessors: Boolean = true) {
this.synchronized {
   //用于维护单IP下的连接数,防止资源过载
   connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
if (startupProcessors) {
startProcessors()
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private def createAcceptorAndProcessors(processorsPerListener: Int,
endpoints: Seq[EndPoint]): Unit = synchronized {

val sendBufferSize = config.socketSendBufferBytes
val recvBufferSize = config.socketReceiveBufferBytes
val brokerId = config.brokerId
//遍历server.properties配置的listeners属性,Kafka单机支持多协议、多端口
endpoints.foreach { endpoint =>
val listenerName = endpoint.listenerName
val securityProtocol = endpoint.securityProtocol
//创建Acceptor线程,配置socket buffer,并开启nioSelector,启动端口监听客户端
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)
//添加连接处理器Processor
addProcessors(acceptor, endpoint, processorsPerListener)
KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
acceptor.awaitStartup()
acceptors.put(endpoint, acceptor)
}
}

在Acceptor线程内部,不断循环,监听OP_ACCEPT事件,再将请求交给Processor去处理I/O。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* Accept loop that checks for new connection attempts
*/
def run() {
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
startupComplete()
try {
var currentProcessor = 0
while (isRunning) {
try {
val ready = nioSelector.select(500)
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
try {
val key = iter.next
iter.remove()
if (key.isAcceptable) {
val processor = synchronized {
currentProcessor = currentProcessor % processors.size
processors(currentProcessor)
}
                 //获取连接
                 accept(key, processor)
} else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")

               // 轮询到下一个Processor
               currentProcessor = currentProcessor + 1
} catch {
case e: Throwable => error("Error while accepting connection", e)
}
}
}
}
catch {
// We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
// to a select operation on a specific channel or a bad request. We don't want
// the broker to stop responding to requests from other clients in these scenarios.
case e: ControlThrowable => throw e
case e: Throwable => error("Error occurred", e)
}
}
} finally {
debug("Closing server socket and selector.")
CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
shutdownComplete()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/*
* Accept a new connection
*/
def accept(key: SelectionKey, processor: Processor) {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
//监听新连接
val socketChannel = serverSocketChannel.accept()
try {
   //增加连接数
connectionQuotas.inc(socketChannel.socket().getInetAddress)
socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
socketChannel.socket().setKeepAlive(true)
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socketChannel.socket().setSendBufferSize(sendBufferSize)
debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
.format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress, processor.id,
socketChannel.socket.getSendBufferSize, sendBufferSize,
socketChannel.socket.getReceiveBufferSize, recvBufferSize))
//Processor处理I/O事件
     processor.accept(socketChannel)
} catch {
case e: TooManyConnectionsException =>
info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
close(socketChannel)
}
}

Processor的accept将socketChannel存放在ConcurrentLinkedQueue中。

1
2
3
4
5
6
7
/**
* Queue up a new connection for reading
*/
def accept(socketChannel: SocketChannel) {
newConnections.add(socketChannel)
wakeup()
}

然后由Processor线程从队列中获取连接并交给RequestChannel处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
override def run() {
startupComplete()//CountDownLatch
try {
while (isRunning) {
try {
         //从队列中取出连接
         configureNewConnections()
         //处理responseQueue
         processNewResponses()
         //selector.poll
         poll()
         //处理requestQueue
         processCompletedReceives()
         //移除inflightResponses
         processCompletedSends()
         //移除连接
         processDisconnected()
} catch {
// We catch all the throwables here to prevent the processor thread from exiting. We do this because
// letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
// reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
// be either associated with a specific socket channel or a bad request. These exceptions are caught and
// processed by the individual methods above which close the failing channel and continue processing other
// channels. So this catch block should only ever see ControlThrowables.
case e: Throwable => processException("Processor got uncaught exception.", e)
}
}
} finally {
debug("Closing selector - processor " + id)
CoreUtils.swallow(closeAll(), this, Level.ERROR)
shutdownComplete()
}
}

最后,由KafkaRequestHandlerPool实现的简单线程池启动的KafkaRequestHandler线程,不断从RequestChannel中的requestQueue获取请求,然后调用KafkaApis处理业务逻辑,再返回给RequestChannelresponseQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
time: Time,
numThreads: Int) extends Logging with KafkaMetricsGroup {
//线程池大小
private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
/* a meter to track the average free capacity of the request handlers */
private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
for (i <- 0 until numThreads) {
createHandler(i)
}

 //启动KafkaRequestHandler线程
 def createHandler(id: Int): Unit = synchronized {
runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()
}

def resizeThreadPool(newSize: Int): Unit = synchronized {
val currentSize = threadPoolSize.get
info(s"Resizing request handler thread pool size from $currentSize to $newSize")
if (newSize > currentSize) {
for (i <- currentSize until newSize) {
createHandler(i)
}
} else if (newSize < currentSize) {
for (i <- 1 to (currentSize - newSize)) {
runnables.remove(currentSize - i).stop()
}
}
threadPoolSize.set(newSize)
}

def shutdown(): Unit = synchronized {
info("shutting down")
for (handler <- runnables)
handler.initiateShutdown()
for (handler <- runnables)
handler.awaitShutdown()
info("shut down completely")
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* A thread that answers kafka requests.
*/
class KafkaRequestHandler(id: Int,
brokerId: Int,
val aggregateIdleMeter: Meter,
val totalHandlerThreads: AtomicInteger,
val requestChannel: RequestChannel,
apis: KafkaApis,
time: Time) extends Runnable with Logging {
this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
private val shutdownComplete = new CountDownLatch(1)
@volatile private var stopped = false

def run() {
while (!stopped) {
// We use a single meter for aggregate idle percentage for the thread pool.
// Since meter is calculated as total_recorded_value / time_window and
// time_window is independent of the number of threads, each recorded idle
// time should be discounted by # threads.
val startSelectTime = time.nanoseconds
     //获取请求
     val req = requestChannel.receiveRequest(300)
val endTime = time.nanoseconds
val idleTime = endTime - startSelectTime
aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)

req match {
case RequestChannel.ShutdownRequest =>
debug(s"Kafka request handler $id on broker $brokerId received shut down command")
shutdownComplete.countDown()
return

case request: RequestChannel.Request =>
try {
request.requestDequeueTimeNanos = endTime
trace(s"Kafka request handler $id on broker $brokerId handling request $request")
           //由KafkaApis来处理业务逻辑
           apis.handle(request)
} catch {
case e: FatalExitError =>
shutdownComplete.countDown()
Exit.exit(e.statusCode)
case e: Throwable => error("Exception when handling request", e)
} finally {
request.releaseBuffer()
}

case null => // continue
}
}
shutdownComplete.countDown()
}

def stop(): Unit = {
stopped = true
}

def initiateShutdown(): Unit = requestChannel.sendShutdownRequest()

def awaitShutdown(): Unit = shutdownComplete.await()

}

KafkaApishandle方法逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* Top-level method that handles all requests and multiplexes to the right api
*/
def handle(request: RequestChannel.Request) {
try {
trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
request.header.apiKey match {
case ApiKeys.PRODUCE => handleProduceRequest(request)
case ApiKeys.FETCH => handleFetchRequest(request)
case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
case ApiKeys.METADATA => handleTopicMetadataRequest(request)
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
case ApiKeys.END_TXN => handleEndTxnRequest(request)
case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)
case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
}
} catch {
case e: FatalExitError => throw e
case e: Throwable => handleError(request, e)
} finally {
request.apiLocalCompleteTimeNanos = time.nanoseconds
}
}

每个业务处理完结果都存入RequestChannel中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private def sendResponse(request: RequestChannel.Request, responseOpt: Option[AbstractResponse]): Unit = {
// Update error metrics for each error code in the response including Errors.NONE
responseOpt.foreach(response => requestChannel.updateErrorMetrics(request.header.apiKey, response.errorCounts.asScala))

responseOpt match {
case Some(response) =>
val responseSend = request.context.buildResponse(response)
val responseString =
if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
else None
requestChannel.sendResponse(new RequestChannel.Response(request, Some(responseSend), SendAction, responseString))
case None =>
requestChannel.sendResponse(new RequestChannel.Response(request, None, NoOpAction, None))
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/** Send a response back to the socket server to be sent over the network */
def sendResponse(response: RequestChannel.Response) {
if (isTraceEnabled) {
val requestHeader = response.request.header
val message = response.responseAction match {
case SendAction =>
s"Sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} of ${response.responseSend.get.size} bytes."
case NoOpAction =>
s"Not sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} as it's not required."
case CloseConnectionAction =>
s"Closing connection for client ${requestHeader.clientId} due to error during ${requestHeader.apiKey}."
}
trace(message)
}

val processor = processors.get(response.processor)
// The processor may be null if it was shutdown. In this case, the connections
// are closed, so the response is dropped.
if (processor != null) {
processor.enqueueResponse(response)
}
}
1
2
3
4
private[network] def enqueueResponse(response: RequestChannel.Response): Unit = {
responseQueue.put(response)
wakeup()
}

Kafka消费端采用pull模式,为消费端提供了更多的控制权。

如果broker采用push模式,能更快的将消息推送给消费端,但无法适应消费端的消费能力,使消费端崩溃。

采用pull模式,可自主控制消费速率及消费的方式(批量、单条),并且可以表述不同的传输语义。

Kafka中包含了三种传输语义:

  • At Most Once 消息可能会丢,但不会重复。

kafka-amo

在这种模式下,先Commit Offset,再去处理消息。
如果Commit成功了,此时消费端宕机了,那么下次恢复的时候,消息不会再下发,也就丢了

  • At Least Once 消息不会丢,但可能会重复(默认模式)。

kafka-alo

在这种模式下,先处理消息,再Commit Offset。
如果Commit失败了,那么这条消息还会继续下发,直到Offset Commit成功,也就重了

  • Exactly Once 消息不会丢且不会重复。

kafka-eo

At Least Once 为基础,让下游保证幂等,并且保存消息处理状态、Offset提交状态,间接实现Exactly Once。而要真正实现Exactly Once,需要引入两阶段事务处理,对于消息乱序、消息重复,采用类似TCP三次握手的ACK机制,对于单Session的,可以这么简单处理,对于多Session的,需要基于事务来实现类似分布式锁、分布式Session,记录所有事务状态、事务进度、判断是否合法,要么全部成功,要么全部失败。