0%

Kafka消费端采用pull模式,为消费端提供了更多的控制权。

如果broker采用push模式,能更快的将消息推送给消费端,但无法适应消费端的消费能力,使消费端崩溃。

采用pull模式,可自主控制消费速率及消费的方式(批量、单条),并且可以表述不同的传输语义。

Kafka中包含了三种传输语义:

  • At Most Once 消息可能会丢,但不会重复。

kafka-amo

在这种模式下,先Commit Offset,再去处理消息。
如果Commit成功了,此时消费端宕机了,那么下次恢复的时候,消息不会再下发,也就丢了

  • At Least Once 消息不会丢,但可能会重复(默认模式)。

kafka-alo

在这种模式下,先处理消息,再Commit Offset。
如果Commit失败了,那么这条消息还会继续下发,直到Offset Commit成功,也就重了

  • Exactly Once 消息不会丢且不会重复。

kafka-eo

At Least Once 为基础,让下游保证幂等,并且保存消息处理状态、Offset提交状态,间接实现Exactly Once。而要真正实现Exactly Once,需要引入两阶段事务处理,对于消息乱序、消息重复,采用类似TCP三次握手的ACK机制,对于单Session的,可以这么简单处理,对于多Session的,需要基于事务来实现类似分布式锁、分布式Session,记录所有事务状态、事务进度、判断是否合法,要么全部成功,要么全部失败。

Topic是Kafka的基本组织单位。

Kafka的消息是以Topic作为基本单位来组织的,而多个Topic之间又是相互独立的。

形象的说,可以将Kafka当成一个队列集合,每一个Topic相当于一个队列

每个Topic又分为多个Partition,也就是分区,分别存储了一部分的消息,也起到了负载均衡的作用。

创建Topic的时候,可以指定partition的个数,最终以文件形式存储在磁盘上。

1
./kafka-topics.sh --create --partitions 3 --topic test_p_3 --zookeeper localhost:2181 --replication-factor 1

kafka-tps.png

每个partition以Topic名称-Partition序号来命名,底下又分为多个Segment,也就是分段

生产消息的时候,需要指定Topic,而Topic根据Partition路由算法,路由到具体Partition。

kafka-p-j

同一个Topic,每一个partition各自处理各自的消息。

ukafka-partition.png

每一个segment包含:日志(用来存储消息数据)和索引(用来记录offset相关映射信息)

通过分段,可以减少文件的大小以及通过稀疏索引进一步实现数据的快速查找。

可以通过配置log.dirs的值来修改数据存储的位置,默认是在/tmp/kafka-logs目录下。

kafka-logs

以相同基准offset命名的日志、索引文件,称为段。

如果一个partition只有一个数据文件的话:

  • 对于写入
    由于是顺序追加写入,时间复杂度为O(1)

  • 对于查找
    要查找某个offset,采用的是顺序查找,时间复杂度达到了O(n)

通过对数据进行分段、加索引,解决了痛点。

在Kafka中,索引文件分为:

nametypesuffix
偏移量索引OffsetIndex.index
时间戳索引TimeIndex.timeindex
事务索引TransactionIndex.txnindex

日志文件分为:

namesuffixdesc
数据文件.log用于存储消息
交换文件.swap用于segment的恢复
延迟待删文件.deleted用于标识要删除的文件
快照文件.snapshot用于记录producer的事务信息
清理临时文件.cleaned用于标识正在删除的文件

分段

假设有100条消息,offset取值范围在:0 - 99,如果分为4个段,那么每一个段各存储25条消息,取值范围分别为:0 - 24、25 - 49、 50 - 74、75 - 99,而每一段的文件以20位起始offset来命名,不足20位,用0补齐,要查找某个offset只需要找到对应范围内的文件,然后再通过二分查找算法去文件查找即可。

1
2
# 通过kafka.tools.DumpLogSegments可以查看log文件的内容
./kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test_p_3-0/00000000000000000000.log

kafka-sg

索引

通过分段可以减少数据文件大小,并且加快查询速度,但仍然存在顺序查找的最差情况。

为了进一步提高数据查找的速度,Kafka通过建立稀疏索引的方式,每隔一定间隔建立一条索引。

1
2
# 通过kafka.tools.DumpLogSegments可以查看index文件的内容
./kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test_p_3-0/00000000000000000000.index

kafka-index

偏移量索引,记录着 < offset , position > 偏移量 - 物理地址

时间戳索引,记录着 < timestamp , offset >时间戳 - 偏移量

事务索引,记录着 < offset , AbortedTxn > 偏移量 - 被中断事务

消息查找:offset、timestamp

在Kafka中,segment通过一个跳跃表来维护。

1
2
/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

每一次,只有一个activeSegment可以进行写入消息,其它的segment是只读的。

1
2
3
4
/**
* The active segment that is currently taking appends
*/
def activeSegment = segments.lastEntry.getValue

基于偏移量查找

OffsetIndex的数据格式为:

1
2
3
offset: 33 position 4158
offset: 66 position 8316
offset: 99 position 12474

在Kafka的IndexEntry.scala中定义为:

1
2
3
4
5
6
7
8
9
/**
* The mapping between a logical log offset and the physical position
* in some log file of the beginning of the message set entry with the
* given offset.
*/
case class OffsetPosition(offset: Long, position: Int) extends IndexEntry {
override def indexKey = offset
override def indexValue = position.toLong
}

如果要查找offset=35的消息,那么就先从segments跳跃表上找到对应应访问的segment文件。

然后通过二分查找在偏移量索引文件找到不大于35的offset,也就是offset: 33 position 4158

解析offset对应的position,从position处顺序查找log文件,找到offset=35的位置。

基于时间戳查找

TimeIndex的数据格式为:

1
2
3
timestamp: 1579167998000 offset 33
timestamp: 1579168197621 offset 66
timestamp: 1579168397242 offset 99

在Kafka的IndexEntry.scala中定义为:

1
2
3
4
5
6
7
8
9
10
/**
* The mapping between a timestamp to a message offset. The entry means that any message whose timestamp is greater
* than that timestamp must be at or after that offset.
* @param timestamp The max timestamp before the given offset.
* @param offset The message offset.
*/
case class TimestampOffset(timestamp: Long, offset: Long) extends IndexEntry {
override def indexKey = timestamp
override def indexValue = offset
}

如果要查找timestamp=15791679981234的消息,那么就先比较每个segment文件的largestTimestamp,找到对应的segment文件。

然后通过二分查找在时间戳索引文件找到不大于15791679981234timestamp,也就是timestamp: 1579167998000 offset 33

找到offset直接去偏移量索引文件中查找position。

从position处顺序查找log文件,找到timestamp=15791679981234的位置。

通常,应用程序的启动都是基于main函数的,Kafka也不例外。

启动Kafka的时候,通过执行kafka-server-start.sh文件,并且指定配置文件即可。

1
./kafka-server-start.sh ../config/server.properties

kafka-server-start.sh中核心的内容在于启动kafka.Kafka类中的main函数。

1
2
#...省略
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

主要流程:解析配置文件、创建KafkaServerStartable实例、启动服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def main(args: Array[String]): Unit = {
try {
     //解析配置文件
val serverProps = getPropsFromArgs(args)
     //从配置文件创建实例
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
//...
//优雅退出
Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {
override def run(): Unit = kafkaServerStartable.shutdown()
})
kafkaServerStartable.startup()//启动应用
kafkaServerStartable.awaitShutdown()//阻塞等待关闭
}
catch {
case e: Throwable =>
fatal("Exiting Kafka due to fatal exception", e)
Exit.exit(1)
}
Exit.exit(0)
}

1. 解析配置文件
读取指定的server.properties文件,进行一系列的解析验证,支持通过命令行传入参数,覆盖配置文件中配置的属性,最后返回Properties实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def getPropsFromArgs(args: Array[String]): Properties = {
val optionParser = new OptionParser(false)
val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
.withRequiredArg()
.ofType(classOf[String])

if (args.length == 0) {
CommandLineUtils.printUsageAndDie(optionParser, "USAGE: java [options] %s server.properties [--override property=value]*".format(classOf[KafkaServer].getSimpleName()))
}

val props = Utils.loadProps(args(0)) //加载server.properties

if (args.length > 1) {
val options = optionParser.parse(args.slice(1, args.length): _*)
     //命令行参数个数校验
if (options.nonOptionArguments().size() > 0) {
CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
}
//将命令行配置的属性存入props
props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
}
props
}

2. 创建KafkaServerStartable实例
KafkaServerStartable主要的作用是启动Kafka的Metrics监控以及维护Kafka服务的生命周期。
包括:启动、关闭、闭锁阻塞等待关闭、设置Kafka服务状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
object KafkaServerStartable {
def fromProps(serverProps: Properties) = {
 //启动监控
 val reporters = KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
new KafkaServerStartable(KafkaConfig.fromProps(serverProps, false), reporters)
}
}
class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter]) extends Logging {
 //初始化真正的Kafka服务对象
 private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters)
def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)
 //启动
 def startup() {
try server.startup()
catch {
case _: Throwable =>
// KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
fatal("Exiting Kafka.")
Exit.exit(1)
}
}
 //关闭
 def shutdown() {
try server.shutdown()
catch {
case _: Throwable =>
fatal("Halting Kafka.")
// Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
Exit.halt(1)
}
}
 //设置状态,包含:NotRunning、Starting、RecoveringFromUncleanShutdown、RunningAsBroker、PendingControlledShutdown、BrokerShuttingDown
 def setServerState(newState: Byte) {
server.brokerState.newState(newState)
}
 //阻塞进程避免退出,通过CountDownLatch实现
 def awaitShutdown(): Unit = server.awaitShutdown()
}

3. 启动服务
启动服务是通过间接调用KafkaServerstartup()方法来实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def startup() {
try {
info("starting")
     //是否已关闭
     if (isShuttingDown.get)
throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
     //是否已启动
     if (startupComplete.get)
return
     //是否可以启动
     val canStartup = isStartingUp.compareAndSet(false, true)
     if (canStartup) {//设置broker状态为Starting
       brokerState.newState(Starting)
       //连接ZK,并创建根节点
       initZkClient(time)
       //从ZK获取或创建集群id,规则:UUID的mostSigBits、leastSigBits组合转base64
       _clusterId = getOrGenerateClusterId(zkClient)
info(s"Cluster ID = $clusterId")
       //获取brokerId及log存储路径,brokerId通过zk生成或者server.properties配置broker.id
       //规则:/brokers/seqid的version值 + maxReservedBrokerId(默认1000),保证唯一性
       val (brokerId, initialOfflineDirs) = getBrokerIdAndOfflineDirs
config.brokerId = brokerId
logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
       //配置logger
       this.logIdent = logContext.logPrefix
       //初始化AdminZkClient,支持动态修改配置
config.dynamicConfig.initialize(zkClient)
//初始化定时任务调度器
       kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
kafkaScheduler.startup()

       //创建及配置监控,默认使用JMX及Yammer Metrics
       val reporters = new util.ArrayList[MetricsReporter]
reporters.add(new JmxReporter(jmxPrefix))
val metricConfig = KafkaServer.metricConfig(config)
metrics = new Metrics(metricConfig, reporters, time, true)
_brokerTopicStats = new BrokerTopicStats
       //初始化配额管理器
       quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala)
       //用于保证kafka-log数据目录的存在
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)

       //启动日志管理器,kafka的消息以日志形式存储
       logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
       //启动日志清理、刷新、校验、恢复等的定时线程
       logManager.startup()

metadataCache = new MetadataCache(config.brokerId)
       // SCRAM认证方式的token缓存
       tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

       //启动socket,监听9092端口,等待接收客户端请求
socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup(startupProcessors = false)

       //启动副本管理器,高可用相关
       replicaManager = createReplicaManager(isShuttingDown)
replicaManager.startup()

       //将broker信息注册到ZK上
       val brokerInfo = createBrokerInfo
zkClient.registerBrokerInZk(brokerInfo)

       //校验broker信息
       checkpointBrokerId(config.brokerId)

       //启动token管理器
       tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
tokenManager.startup()

       //启动Kafka控制器,只有leader会与ZK建连
       kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, tokenManager, threadNamePrefix)
kafkaController.startup()
       
//admin管理器
       adminManager = new AdminManager(config, metrics, metadataCache, zkClient)

       //启动集群群组协调器
       groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM)
groupCoordinator.startup()

       //启动事务协调器
       transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM)
transactionCoordinator.startup()

       //ACL
       authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
authZ.configure(config.originals())
authZ
}
       //创建拉取管理器
       val fetchManager = new FetchManager(Time.SYSTEM,
new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))

       //初始化KafkaApis,负责核心的请求逻辑处理
       apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager)
       //请求处理池
       requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
config.numIoThreads)

Mx4jLoader.maybeLoad()

config.dynamicConfig.addReconfigurables(this)

       //启动动态配置处理器
       dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
dynamicConfigManager.startup()

       //启动请求处理线程
       socketServer.startProcessors()
brokerState.newState(RunningAsBroker)
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString, metrics)
info("started")
}
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
isStartingUp.set(false)
shutdown()
throw e
}
}