0%

Kafka - 启动流程分析

通常,应用程序的启动都是基于main函数的,Kafka也不例外。

启动Kafka的时候,通过执行kafka-server-start.sh文件,并且指定配置文件即可。

1
./kafka-server-start.sh ../config/server.properties

kafka-server-start.sh中核心的内容在于启动kafka.Kafka类中的main函数。

1
2
# ... (earlier part of the script omitted)
# Replace this shell with kafka-run-class.sh, asking it to run the kafka.Kafka
# class and forwarding $EXTRA_ARGS plus every argument given to this script.
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

主要流程:解析配置文件、创建KafkaServerStartable实例、启动服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Broker entry point: builds a KafkaServerStartable from the command-line
 * arguments, starts it and blocks until it has shut down.
 *
 * Calls `Exit.exit(0)` once the try block completes, or logs and calls
 * `Exit.exit(1)` when anything thrown inside it reaches this method.
 *
 * @param args command-line arguments, passed unchanged to getPropsFromArgs
 */
def main(args: Array[String]): Unit = {
  try {
    // Parse the configuration, then build the startable wrapper from it.
    val startable = KafkaServerStartable.fromProps(getPropsFromArgs(args))
    //...
    // Graceful exit: shut the server down from a JVM shutdown hook.
    val shutdownHook = new Thread("kafka-shutdown-hook") {
      override def run(): Unit = startable.shutdown()
    }
    Runtime.getRuntime().addShutdownHook(shutdownHook)

    startable.startup()       // start the server
    startable.awaitShutdown() // block until it is shut down
  } catch {
    case e: Throwable =>
      fatal("Exiting Kafka due to fatal exception", e)
      Exit.exit(1)
  }
  Exit.exit(0)
}

1. 解析配置文件
读取指定的server.properties文件,进行一系列的解析验证,支持通过命令行传入参数,覆盖配置文件中配置的属性,最后返回Properties实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Builds the server properties from the command line.
 *
 * The first argument names the properties file to load. Any further arguments
 * are parsed as `--override` options whose key/value pairs are written on top
 * of the loaded properties.
 *
 * `CommandLineUtils.printUsageAndDie` is invoked when no argument is given, or
 * when the arguments after the file name contain a non-option argument.
 *
 * @param args raw command-line arguments
 * @return the loaded properties with any overrides applied
 */
def getPropsFromArgs(args: Array[String]): Properties = {
  val parser = new OptionParser(false)
  val overrideOpt = parser
    .accepts("override", "Optional property that should override values set in server.properties file")
    .withRequiredArg()
    .ofType(classOf[String])

  if (args.isEmpty) {
    val usage = "USAGE: java [options] %s server.properties [--override property=value]*".format(classOf[KafkaServer].getSimpleName())
    CommandLineUtils.printUsageAndDie(parser, usage)
  }

  // Load the properties file named by the first argument.
  val props = Utils.loadProps(args(0))

  val remainingArgs = args.drop(1)
  if (remainingArgs.nonEmpty) {
    val options = parser.parse(remainingArgs: _*)
    // Anything that is not an option (e.g. a stray positional argument) is rejected.
    val strayArgs = options.nonOptionArguments()
    if (!strayArgs.isEmpty) {
      CommandLineUtils.printUsageAndDie(parser, "Found non argument parameters: " + strayArgs.toArray.mkString(","))
    }
    // Write the command-line overrides into the loaded properties.
    props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
  }
  props
}

2. 创建KafkaServerStartable实例
KafkaServerStartable主要的作用是启动Kafka的Metrics监控以及维护Kafka服务的生命周期。
包括:启动、关闭、闭锁阻塞等待关闭、设置Kafka服务状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
object KafkaServerStartable {
  /**
   * Creates a KafkaServerStartable from raw server properties.
   *
   * The metrics reporters are started first; the KafkaConfig is built
   * afterwards and both are handed to the new instance.
   *
   * @param serverProps the server properties
   */
  def fromProps(serverProps: Properties) = {
    // Start the metrics reporters.
    val verifiableProps = new VerifiableProperties(serverProps)
    val startedReporters = KafkaMetricsReporter.startReporters(verifiableProps)
    val serverConfig = KafkaConfig.fromProps(serverProps, false)
    new KafkaServerStartable(serverConfig, startedReporters)
  }
}
/**
 * Lifecycle wrapper around a KafkaServer: start, shut down, set the broker
 * state and wait for shutdown. Failures in startup/shutdown are turned into
 * `Exit.exit(1)` / `Exit.halt(1)` respectively.
 *
 * @param staticServerConfig configuration passed to the wrapped KafkaServer
 * @param reporters          metrics reporters passed to the wrapped KafkaServer
 */
class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter]) extends Logging {
  // The actual Kafka server object; every method below delegates to it.
  private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters)

  /** Auxiliary constructor for callers that have no metrics reporters. */
  def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)

  /** Starts the wrapped server; on any Throwable logs and calls `Exit.exit(1)`. */
  def startup(): Unit = {
    try server.startup()
    catch {
      case _: Throwable =>
        // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
        fatal("Exiting Kafka.")
        Exit.exit(1)
    }
  }

  /** Shuts the wrapped server down; on any Throwable logs it and calls `Exit.halt(1)`. */
  def shutdown(): Unit = {
    try server.shutdown()
    catch {
      case e: Throwable =>
        // Pass the cause along so the reason for the forced halt is in the log.
        fatal("Halting Kafka.", e)
        // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
        Exit.halt(1)
    }
  }

  /**
   * Sets the broker state. Per the original notes the states include NotRunning,
   * Starting, RecoveringFromUncleanShutdown, RunningAsBroker,
   * PendingControlledShutdown and BrokerShuttingDown.
   *
   * @param newState the state value forwarded to `server.brokerState.newState`
   */
  def setServerState(newState: Byte): Unit = {
    server.brokerState.newState(newState)
  }

  /**
   * Delegates to `server.awaitShutdown()`, keeping the caller blocked so the
   * process does not exit (per the original note, backed by a CountDownLatch).
   */
  def awaitShutdown(): Unit = server.awaitShutdown()
}

3. 启动服务
启动服务是通过KafkaServerStartable间接调用KafkaServer的startup()方法来实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/**
 * Starts this Kafka server by bringing up its components in a fixed order.
 *
 * Throws IllegalStateException if a shutdown is still in progress, returns
 * immediately if startup has already completed, and does nothing if another
 * startup is already under way (the isStartingUp compare-and-set fails).
 * Any Throwable raised during startup is logged, shutdown() is invoked and
 * the Throwable is rethrown to the caller.
 */
def startup() {
try {
info("starting")
     // refuse to start while a shutdown is still in progress
     if (isShuttingDown.get)
throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
     // already started: nothing to do
     if (startupComplete.get)
return
     // claim the startup; this fails if a startup is already in progress
     val canStartup = isStartingUp.compareAndSet(false, true)
     if (canStartup) {// set the broker state to Starting
       brokerState.newState(Starting)
       // initialise the ZooKeeper client (per the original note: connects to ZK and creates the root node)
       initZkClient(time)
       // get or generate the cluster id via ZK (per the original note: the UUID's mostSigBits and
       // leastSigBits combined and base64-encoded - not visible here, confirm)
       _clusterId = getOrGenerateClusterId(zkClient)
info(s"Cluster ID = $clusterId")
       // obtain the broker id and the offline log dirs; per the original note the id is either generated
       // via ZK or taken from broker.id in server.properties
       // NOTE(review): original note gives the generated-id rule as the version of /brokers/seqid +
       // maxReservedBrokerId (default 1000), which keeps it unique - confirm against getBrokerIdAndOfflineDirs
       val (brokerId, initialOfflineDirs) = getBrokerIdAndOfflineDirs
config.brokerId = brokerId
logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
       // use the log context's prefix as this server's log identifier
       this.logIdent = logContext.logPrefix
       // initialise dynamic configuration with the ZK client (per the original note: enables changing configs at runtime)
config.dynamicConfig.initialize(zkClient)
// create and start the background task scheduler
       kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
kafkaScheduler.startup()

       // create and configure metrics with a JMX reporter (the original note also mentions Yammer Metrics)
       val reporters = new util.ArrayList[MetricsReporter]
reporters.add(new JmxReporter(jmxPrefix))
val metricConfig = KafkaServer.metricConfig(config)
metrics = new Metrics(metricConfig, reporters, time, true)
_brokerTopicStats = new BrokerTopicStats
       // initialise the quota managers
       quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala)
       // channel sized by the number of configured log dirs
       // NOTE(review): original note says it guarantees the kafka-log data dirs exist; the name suggests it
       // reports log-dir failures instead - confirm
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)

       // create the log manager; Kafka stores messages as logs
       logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
       // start it (per the original note: starts the periodic log cleanup, flush, check and recovery threads)
       logManager.startup()

metadataCache = new MetadataCache(config.brokerId)
       // delegation token cache and credential provider, both built from the SCRAM mechanism names
       tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

       // start the socket server without its processors; they are started at the end of this method
       // (the original note mentions listening on port 9092, a value not shown in this code)
socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup(startupProcessors = false)

       // start the replica manager (high-availability related)
       replicaManager = createReplicaManager(isShuttingDown)
replicaManager.startup()

       // register this broker's info in ZK
       val brokerInfo = createBrokerInfo
zkClient.registerBrokerInZk(brokerInfo)

       // checkpoint the broker id (original note: "validate broker info" - confirm what checkpointBrokerId does)
       checkpointBrokerId(config.brokerId)

       // start the delegation token manager
       tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
tokenManager.startup()

       // start the Kafka controller
       // NOTE(review): original note claims only the leader connects to ZK; zkClient is passed on every
       // broker here, so presumably only the elected controller acts on cluster state - confirm
       kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, tokenManager, threadNamePrefix)
kafkaController.startup()
       
// admin manager
       adminManager = new AdminManager(config, metrics, metadataCache, zkClient)

       // start the group coordinator
       groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM)
groupCoordinator.startup()

       // start the transaction coordinator, which gets its own single-threaded scheduler
       transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM)
transactionCoordinator.startup()

       // ACL: instantiate and configure an authorizer only when a non-empty class name is configured
       authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
authZ.configure(config.originals())
authZ
}
       // create the fetch manager together with its fetch session cache
       val fetchManager = new FetchManager(Time.SYSTEM,
new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))

       // initialise KafkaApis (per the original note: handles the core request logic)
       apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager)
       // request handler pool, sized by config.numIoThreads
       requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
config.numIoThreads)

Mx4jLoader.maybeLoad()

config.dynamicConfig.addReconfigurables(this)

       // build one dynamic config handler per config type, then start the dynamic config manager
       dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
dynamicConfigManager.startup()

       // start the socket server processors that were held back above, then mark the broker as running
       socketServer.startProcessors()
brokerState.newState(RunningAsBroker)
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString, metrics)
info("started")
}
}
catch {
case e: Throwable =>
// log the failure, clear the starting flag, shut down and rethrow to the caller
fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
isStartingUp.set(false)
shutdown()
throw e
}
}