/**
 * This callback is invoked by the ZooKeeper leader elector when the current broker is elected as the new controller.
 * On the become-controller state change it does the following, in order:
 *  1. Reads the current controller epoch from ZooKeeper and increments it.
 *  2. Registers the ZooKeeper child-change handlers (brokers, topics, topic deletion, log-dir event
 *     notifications, ISR-change notifications) and node-change handlers (preferred replica election,
 *     partition reassignment) before reading any state, so no change is missed.
 *  3. Deletes pending log-dir-event and ISR-change notifications.
 *  4. Initializes the controller context, which holds cached state for current topics, live brokers and
 *     partition leaders. (The original contract also mentions starting the channel manager; that is not
 *     visible in this method and presumably happens inside initializeControllerContext() - confirm.)
 *  5. Initializes the topic deletion manager with the topics pending deletion / ineligible for deletion.
 *  6. Sends an UpdateMetadataRequest to all live or shutting-down brokers.
 *  7. Starts the replica state machine, then the partition state machine.
 *  8. Resumes in-progress partition reassignments, retries pending topic deletions and runs pending
 *     preferred replica elections.
 *  9. Starts the controller scheduler (plus the auto leader rebalance task when enabled) and, when
 *     delegation-token auth is enabled, the expired-token cleanup scheduler.
 *
 * This method does not catch exceptions itself. The intended contract is that an unexpected error while
 * becoming controller makes the broker resign, so another election is triggered and there is always an
 * active controller. NOTE(review): that resignation is presumably implemented by the caller - confirm.
 */
private def onControllerFailover(): Unit = {
  // Read the current value of the /controller_epoch znode, then bump it by one for this controller.
  info("Reading controller epoch from ZooKeeper")
  readControllerEpochFromZooKeeper()
  info("Incrementing controller epoch in ZooKeeper")
  incrementControllerEpoch()

  info("Registering handlers")
  // Before reading the source of truth from ZooKeeper, register the listeners so broker/topic/ISR
  // changes that happen during initialization still produce callbacks.
  val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler,
    logDirEventNotificationHandler, isrChangeNotificationHandler)
  childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
  val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
  nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)

  // Clear the pending log-dir-event and ISR-change notification znodes (the handlers for them were
  // registered above). NOTE(review): presumably safe because the full state is re-read by
  // initializeControllerContext() below - confirm.
  info("Deleting log dir event notifications")
  zkClient.deleteLogDirEventNotifications()
  info("Deleting isr change notifications")
  zkClient.deleteIsrChangeNotifications()

  // Build the in-memory controller context (topics, live brokers, partition leaders).
  info("Initializing controller context")
  initializeControllerContext()

  // Seed the topic deletion manager with deletions that were in progress under the previous controller.
  info("Fetching topic deletions in progress")
  val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
  info("Initializing topic deletion manager")
  topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)

  // We need to send UpdateMetadataRequest after the controller context is initialized and before the state machines
  // are started. This is because brokers need to receive the list of live brokers from UpdateMetadataRequest before
  // they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and
  // partitionStateMachine.startup().
  info("Sending update metadata request")
  sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)

  // Start the state machines now that brokers know the live broker set. Replicas are started before
  // partitions, per the order given in the contract above.
  replicaStateMachine.startup()
  partitionStateMachine.startup()

  info(s"Ready to serve as the new controller with epoch $epoch")
  // Resume any partition reassignments that were in progress.
  maybeTriggerPartitionReassignment(controllerContext.partitionsBeingReassigned.keySet)
  // Retry deletion of topics queued for deletion.
  topicDeletionManager.tryTopicDeletion()
  // Run any preferred replica elections that were pending in ZooKeeper.
  val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
  onPreferredReplicaElection(pendingPreferredReplicaElections)

  // Start the controller's scheduler; the auto leader rebalance task runs on it when enabled.
  info("Starting the controller scheduler")
  kafkaScheduler.startup()
  if (config.autoLeaderRebalanceEnable) {
    scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
  }

  // With delegation-token auth enabled, periodically expire tokens on a dedicated scheduler.
  if (config.tokenAuthEnabled) {
    info("starting the token expiry check scheduler")
    tokenCleanScheduler.startup()
    tokenCleanScheduler.schedule(name = "delete-expired-tokens",
      fun = tokenManager.expireTokens,
      period = config.delegationTokenExpiryCheckIntervalMs,
      unit = TimeUnit.MILLISECONDS)
  }
}