
Kafka - High Availability Design

Kafka's high availability is built around two mechanisms: Data Replication and Leader Election.

Data Replication is the familiar redundancy approach: if one copy of the data is not safe enough, keep several copies and spread them evenly across multiple hosts, so that losing some of the hosts does not affect the system as a whole.

This approach works well, but it introduces a data consistency problem.

(Figure: kafka-ha)

Because of network latency and similar issues, replicas can fall out of sync with the source data.

In Kafka, the source is called the Leader and the copies are called Replicas (or Followers). All reads and writes go through the Leader, which keeps the data ordered; Followers only pull data from the Leader, which reduces the number of synchronization paths and keeps the replica design simple. Of course, data consistency issues still exist under this model.

To address this, Kafka defines the ISR (In-Sync Replica) set: every replica that is fully caught up, or not lagging too far behind the Leader, is recorded in the ISR. When the Leader fails, the new Leader is elected only from the ISR.
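
As a side note on how the ISR interacts with clients (not part of the original text): with acks=all the Leader only acknowledges a write after every replica currently in the ISR has replicated it, and the topic/broker setting min.insync.replicas puts a floor on how far the ISR may shrink before writes are rejected. A minimal Scala sketch, with a placeholder broker address and topic name:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object IsrAwareProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // broker address and topic name are placeholders for this sketch
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    // acks=all: the Leader acknowledges only after every replica in the current ISR has the record
    props.put(ProducerConfig.ACKS_CONFIG, "all")

    val producer = new KafkaProducer[String, String](props)
    try {
      producer.send(new ProducerRecord("demo-topic", "key", "value")).get()
    } finally {
      producer.close()
    }
  }
}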

Of course, in the extreme case the ISR may contain no replicas at all, and then a choice has to be made between consistency and availability:

If you choose consistency, you wait for a replica in the ISR to come back, and the partition stays unavailable until then.
If you choose availability, you promote the first replica that comes back, even if it is not in the ISR, and some data may be lost.
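
Kafka exposes this trade-off through the unclean.leader.election.enable setting: false means only ISR members may become Leader (consistency), true allows any replica that comes back to be elected (availability, with possible data loss). Below is a hedged Scala sketch that creates a topic with these durability-related settings via the AdminClient; the topic name, replica counts and broker address are placeholders:

import java.util.{Collections, Properties}
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateHaTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val admin = AdminClient.create(props)
    try {
      val configs = Map(
        "min.insync.replicas" -> "2",                // with acks=all, reject writes once fewer than 2 replicas are in sync
        "unclean.leader.election.enable" -> "false"  // consistency: never elect a Leader from outside the ISR
      ).asJava
      // 3 partitions, replication factor 3 (made-up numbers)
      val topic = new NewTopic("demo-topic", 3, 3.toShort).configs(configs)
      admin.createTopics(Collections.singleton(topic)).all().get()
    } finally {
      admin.close()
    }
  }
}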

To achieve better availability, a Topic's Partitions should be load-balanced across multiple Brokers, and the number of Partitions should be at least the number of Brokers, so that the failure of a single Broker does not hurt Partition availability.

The Partition assignment algorithm can be customized; the default algorithm is:

  • Sort all Brokers (assume there are n of them) and all Partitions.
  • Assign the i-th Partition to the (i mod n)-th Broker.
  • Assign the j-th Replica of the i-th Partition to the ((i + j) mod n)-th Broker.

(Figure: kafka-repl)
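
The rule above is easy to sketch in Scala. This is a simplification for illustration (Kafka's real assignment logic also adds randomized offsets so leaders spread more evenly), with made-up broker ids and counts:

object ReplicaAssignment {
  // Assign numPartitions partitions, each with replicationFactor replicas,
  // across the given (already sorted) broker ids using the rule above:
  // replica j of partition i lands on broker ((i + j) mod n); j = 0 is the leader.
  def assign(brokers: IndexedSeq[Int], numPartitions: Int, replicationFactor: Int): Map[Int, Seq[Int]] = {
    val n = brokers.size
    (0 until numPartitions).map { i =>
      i -> (0 until replicationFactor).map(j => brokers((i + j) % n))
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // 3 brokers, 3 partitions, 2 replicas each (made-up numbers)
    assign(Vector(0, 1, 2), numPartitions = 3, replicationFactor = 2)
      .toSeq.sortBy(_._1)
      .foreach { case (p, rs) => println(s"partition $p -> brokers ${rs.mkString(", ")}") }
  }
}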

Leader Election itself could be done with Zookeeper alone: the broker that creates the ZNode first becomes the Leader.

However, this naive approach can suffer from split-brain and herd-effect problems, producing multiple leaders and putting too much pressure on ZK.

There is also the popular majority-vote approach, but it requires a majority of replicas to stay alive (tolerating f failures takes 2f + 1 replicas), which is not very practical here.

When Kafka starts up, each broker launches a KafkaController, and Leader Election is delegated to this controller.

Only the broker that is the first to create the /controller node in Zookeeper becomes the controller.
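
A much-simplified Scala sketch of this "first to create the znode wins" pattern, using the raw ZooKeeper client instead of Kafka's internal KafkaZkClient; the connection string, broker id and node payload are placeholders:

import org.apache.zookeeper.{CreateMode, KeeperException, WatchedEvent, Watcher, ZooDefs, ZooKeeper}

object ControllerElectionSketch {
  def main(args: Array[String]): Unit = {
    val zk = new ZooKeeper("localhost:2181", 30000, new Watcher {
      override def process(event: WatchedEvent): Unit = () // session events ignored in this sketch
    })
    val brokerId = 1 // placeholder broker id
    try {
      // Ephemeral node: it disappears when this session dies,
      // which is what triggers the next controller election.
      zk.create("/controller", s"""{"brokerid":$brokerId}""".getBytes("UTF-8"),
        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
      println(s"broker $brokerId is now the controller")
      // keep the session open for as long as this broker wants to remain controller
    } catch {
      case _: KeeperException.NodeExistsException =>
        println("another broker is already the controller")
    }
  }
}

After winning the election, the real controller runs onControllerFailover() in KafkaController to take over its duties, as shown in the code below.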

/**
 * This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
 * It does the following things on the become-controller state change -
 * 1. Registers controller epoch changed listener
 * 2. Increments the controller epoch
 * 3. Initializes the controller's context object that holds cache objects for current topics, live brokers and
 *    leaders for all existing partitions.
 * 4. Starts the controller's channel manager
 * 5. Starts the replica state machine
 * 6. Starts the partition state machine
 * If it encounters any unexpected exception/error while becoming controller, it resigns as the current controller.
 * This ensures another controller election will be triggered and there will always be an actively serving controller
 */
private def onControllerFailover() {
  info("Reading controller epoch from ZooKeeper")
  // read the current value of /controller_epoch from zk
  readControllerEpochFromZooKeeper()
  info("Incrementing controller epoch in ZooKeeper")
  // bump the value of /controller_epoch by 1
  incrementControllerEpoch()
  info("Registering handlers")

  // register listeners for Topic, Broker, ISR and other changes
  // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
  val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler,
    logDirEventNotificationHandler, isrChangeNotificationHandler)
  childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
  val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
  nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)

  // clean up stale notifications; new ISR-change or log-dir-change events will trigger the listeners again
  info("Deleting log dir event notifications")
  zkClient.deleteLogDirEventNotifications()
  info("Deleting isr change notifications")
  zkClient.deleteIsrChangeNotifications()
  info("Initializing controller context")
  // initialize the controller context
  initializeControllerContext()
  info("Fetching topic deletions in progress")
  val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
  info("Initializing topic deletion manager")
  // initialize the topic deletion manager
  topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)

  // We need to send UpdateMetadataRequest after the controller context is initialized and before the state machines
  // are started. This is because brokers need to receive the list of live brokers from UpdateMetadataRequest before
  // they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and
  // partitionStateMachine.startup().
  info("Sending update metadata request")
  sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)

  replicaStateMachine.startup()   // start the replica state machine
  partitionStateMachine.startup() // start the partition state machine

  info(s"Ready to serve as the new controller with epoch $epoch")
  // trigger pending partition reassignments
  maybeTriggerPartitionReassignment(controllerContext.partitionsBeingReassigned.keySet)
  // try to resume deletion of topics marked for deletion
  topicDeletionManager.tryTopicDeletion()
  val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
  // run any pending preferred replica elections
  onPreferredReplicaElection(pendingPreferredReplicaElections)
  info("Starting the controller scheduler")
  // start the controller's scheduler, used e.g. for the auto leader rebalance task
  kafkaScheduler.startup()
  if (config.autoLeaderRebalanceEnable) {
    scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
  }
  // start the delegation-token expiry check task
  if (config.tokenAuthEnabled) {
    info("starting the token expiry check scheduler")
    tokenCleanScheduler.startup()
    tokenCleanScheduler.schedule(name = "delete-expired-tokens",
      fun = tokenManager.expireTokens,
      period = config.delegationTokenExpiryCheckIntervalMs,
      unit = TimeUnit.MILLISECONDS)
  }
}