
Kafka's high performance comes from clever design combined with features of the underlying operating system.

Both producing and consuming Kafka messages require specifying a Topic.

First, consider the high-level design.

A Topic can be split into multiple Partitions, and these Partitions can be spread evenly across multiple Brokers, making full use of the cluster and letting machines work in parallel. Each Partition is further divided into multiple Segments; at any time only one (the active) Segment accepts sequential log appends, while the other Segments can still be read by offset.

For log storage, Kafka supports multiple disk paths: log.dirs takes a comma-separated list of directories, so several disks can be used in parallel, as in the snippet below.
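A hypothetical server.properties fragment (the paths are made up for illustration):

# spread partition logs across three physical disks
log.dirs=/mnt/disk1/kafka-logs,/mnt/disk2/kafka-logs,/mnt/disk3/kafka-logs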

(Figure: kafka-hp)

Next, consider the lower-level implementation.

Kafka writes its logs sequentially; in some scenarios, sequential disk writes can even be faster than random writes to memory.

In addition, Segments make deleting old data simple: Kafka just removes whole old Segment files instead of editing a file to delete records, which also avoids random writes.

Kafka makes heavy use of the OS Page Cache. If the read rate roughly keeps up with the write rate, reads and writes are served from the Page Cache without touching the disk; the I/O scheduler periodically assembles the dirty pages into large blocks and flushes them to disk.

By relying on the Page Cache, Kafka keeps message data out of the JVM heap, which also reduces GC pressure.

Data arrives over the network and is persisted to disk, and is also read from disk and sent back over the network. For this, Kafka uses the NIO zero-copy mechanism, which cuts down the data copies between kernel space and user space as well as the number of context switches.
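The NIO primitive behind this is FileChannel.transferTo, which maps to sendfile on Linux and lets the kernel move file bytes directly to the socket. A minimal sketch, not taken from the Kafka code base:

import java.nio.channels.{FileChannel, SocketChannel}
import java.nio.file.{Paths, StandardOpenOption}

object ZeroCopyDemo {
  // Copy a file to a socket without pulling the bytes into user space.
  def sendFileZeroCopy(path: String, dest: SocketChannel): Long = {
    val fileChannel = FileChannel.open(Paths.get(path), StandardOpenOption.READ)
    try {
      val size = fileChannel.size()
      var position = 0L
      while (position < size)
        position += fileChannel.transferTo(position, size - position, dest)
      position
    } finally fileChannel.close()
  }
}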

Data sent from a Producer to a Broker is not sent record by record; it is buffered first, and once enough records have accumulated or enough time has passed, the records are merged, compressed, serialized and sent to the Broker as a batch, lowering network load and improving transfer efficiency.
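On the client side this batching is controlled mainly by the producer settings batch.size, linger.ms and compression.type. A minimal sketch (broker address, topic name and values are made up for illustration):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object BatchingProducerDemo extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // hypothetical broker address
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("batch.size", "65536")     // send once a batch reaches ~64 KB ...
  props.put("linger.ms", "10")         // ... or after waiting up to 10 ms
  props.put("compression.type", "lz4") // compress each batch before sending

  val producer = new KafkaProducer[String, String](props)
  producer.send(new ProducerRecord[String, String]("demo-topic", "key", "value"))
  producer.close()
}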

Kafka's high availability is built mainly around two mechanisms: Data Replication and Leader Election.

Data Replication is a common form of redundancy: if one copy of the data is not safe enough, keep several, and spread them evenly across multiple hosts so that losing some hosts does not bring down the whole system.

This works well, but it introduces data consistency problems.

(Figure: kafka-ha)

Because of network latency and similar factors, replicas can fall out of sync with the source data.

In Kafka the source is called the Leader and the copies are called Replicas or Followers. All reads and writes go through the Leader, which keeps the data ordered; Followers only pull data from the Leader, which reduces the number of synchronization paths and keeps the replication design simple. Even so, consistency issues remain.

To address this, Kafka defines the ISR (In-Sync Replica) set: every replica that is fully caught up, or not lagging too far behind the Leader, is recorded in the ISR. When the Leader goes down, the new Leader is elected only from the ISR.
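How far a Follower may lag before it is dropped from the ISR is bounded by a broker setting; a hypothetical fragment (the value is only an example, not a recommendation):

# a follower that has not caught up within this window is removed from the ISR
replica.lag.time.max.ms=30000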

Of course, in extreme cases the ISR may end up empty, and then a choice has to be made between consistency and availability.

  • If you choose consistency, you wait for a replica in the ISR to come back alive; the partition stays unavailable in the meantime.
  • If you choose availability, you take the first replica that comes back alive, possibly at the cost of data loss (see the config sketch below this list).
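Kafka exposes this trade-off as a broker/topic setting; a hypothetical config fragment:

# prefer availability: allow a replica outside the ISR to become Leader, risking data loss
unclean.leader.election.enable=true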

For better availability, a Topic's Partitions should be balanced across multiple Brokers, and the number of Partitions should be at least the number of Brokers, so that the loss of a single Broker does not compromise Partition availability.

The Partition assignment algorithm can be customized; the default behaves roughly as follows (a small sketch follows the list):

  • Sort all the Brokers (assume there are n of them) and all the Partitions.
  • Assign the i-th Partition to the (i mod n)-th Broker.
  • Assign the j-th Replica of the i-th Partition to the ((i + j) mod n)-th Broker.
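A minimal Scala sketch of this simplified rule (illustration only; the real Kafka assignment also randomizes the starting broker and the replica shift):

// assign `replicationFactor` replicas of each of `numPartitions` partitions across `numBrokers` brokers
def assignReplicas(numPartitions: Int, numBrokers: Int, replicationFactor: Int): Map[Int, Seq[Int]] =
  (0 until numPartitions).map { i =>
    // replica j of partition i lands on broker (i + j) mod n; j = 0 is the preferred leader
    i -> (0 until replicationFactor).map(j => (i + j) % numBrokers)
  }.toMap

// assignReplicas(6, 3, 2)(0) == Vector(0, 1)
// assignReplicas(6, 3, 2)(2) == Vector(2, 0)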

(Figure: kafka-repl)

Leader Election could be done naively through ZooKeeper: whoever creates the ZNode first becomes the Leader.

However, this approach can suffer from split-brain and herd-effect problems, leading to multiple leaders and excessive load on ZooKeeper.

There is also the popular majority-vote approach, but it requires at least half of the nodes to stay alive, which is not a great fit here.

When Kafka initializes, it starts a KafkaController and delegates Leader Election to this controller.

Only the broker that first creates the /controller node in ZooKeeper becomes the controller.

 /**
* This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
* It does the following things on the become-controller state change -
* 1. Registers controller epoch changed listener
* 2. Increments the controller epoch
* 3. Initializes the controller's context object that holds cache objects for current topics, live brokers and
* leaders for all existing partitions.
* 4. Starts the controller's channel manager
* 5. Starts the replica state machine
* 6. Starts the partition state machine
* If it encounters any unexpected exception/error while becoming controller, it resigns as the current controller.
* This ensures another controller election will be triggered and there will always be an actively serving controller
*/
private def onControllerFailover() {
info("Reading controller epoch from ZooKeeper")
//read the current value of /controller_epoch from ZooKeeper
   readControllerEpochFromZooKeeper()
   info("Incrementing controller epoch in ZooKeeper")
//increment the value of /controller_epoch by 1
   incrementControllerEpoch()
   info("Registering handlers")

//register handlers that watch for Topic, Broker, ISR and other changes
// before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
val childChangeHandlers = Seq(brokerChangeHandler,topicChangeHandler, topicDeletionHandler,logDirEventNotificationHandler,isrChangeNotificationHandler)
childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)
   //clean up leftover notifications; ISR or log-dir changes raise such notifications to trigger the handlers above
   info("Deleting log dir event notifications")
zkClient.deleteLogDirEventNotifications()
info("Deleting isr change notifications")
zkClient.deleteIsrChangeNotifications()
info("Initializing controller context")
//initialize the controller context
   initializeControllerContext()
   info("Fetching topic deletions in progress")
val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
   info("Initializing topic deletion manager")
//initialize the topic deletion manager
   topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)

// We need to send UpdateMetadataRequest after the controller context is initialized and before the state machines
// are started. The is because brokers need to receive the list of live brokers from UpdateMetadataRequest before
// they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and
// partitionStateMachine.startup().
info("Sending update metadata request")
   sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)

   replicaStateMachine.startup()   //start the replica state machine
   partitionStateMachine.startup() //start the partition state machine

info(s"Ready to serve as the new controller with epoch $epoch")
   //trigger any pending partition reassignment
   maybeTriggerPartitionReassignment(controllerContext.partitionsBeingReassigned.keySet)
   //try to resume pending topic deletions
   topicDeletionManager.tryTopicDeletion()
val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
   //run pending preferred-replica elections
   onPreferredReplicaElection(pendingPreferredReplicaElections)
info("Starting the controller scheduler")
   //start the controller scheduler (used below for auto leader rebalancing)
   kafkaScheduler.startup()
if (config.autoLeaderRebalanceEnable) {
scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
}
   //start the delegation-token expiry check scheduler
   if (config.tokenAuthEnabled) {
info("starting the token expiry check scheduler")
tokenCleanScheduler.startup()
tokenCleanScheduler.schedule(name = "delete-expired-tokens",
fun = tokenManager.expireTokens,
period = config.delegationTokenExpiryCheckIntervalMs,
unit = TimeUnit.MILLISECONDS)
}
}

Kafka's network communication model is a multi-threaded Reactor model built on Java NIO.

Kafka's SocketServer.scala contains a comment describing the network model.

/**
* An NIO socket server. The threading model is
* 1 Acceptor thread that handles new connections
* Acceptor has N Processor threads that each have their own selector and read requests from sockets
* M Handler threads that handle requests and produce responses back to the processor threads for writing.
*/

Kafka has one Acceptor thread that accepts new connections, N Processor threads that handle socket I/O, and M Handler threads that handle the business logic.
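N and M map to broker settings: num.network.threads (config.numNetworkThreads in the code below) and num.io.threads (config.numIoThreads). The values here are only examples:

# N Processor threads per listener
num.network.threads=3
# M KafkaRequestHandler threads
num.io.threads=8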

First, compare a few NIO threading models.

  • Plain NIO
    (Figure: common-nio)

  • High-concurrency NIO
    (Figure: advance-nio)

  • Kafka NIO
    (Figure: kafka-nio)

Next, look at it from the source code.

When a Broker starts, it creates the Acceptor and Processors and initializes the KafkaApis request handler pool.

// Create and start the socket server acceptor threads so that the bound port is known.
// Delay starting processors until the end of the initialization sequence to ensure
// that credentials have been loaded before processing authentications.
//start the Acceptor and bind the port
socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup(startupProcessors = false)
/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time, config.numIoThreads)

The startup method creates the connection quota manager and starts the Acceptor and Processor threads.

def startup(startupProcessors: Boolean = true) {
this.synchronized {
   //track connections per IP to prevent resource overload
   connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
if (startupProcessors) {
startProcessors()
}
}
}
private def createAcceptorAndProcessors(processorsPerListener: Int,
endpoints: Seq[EndPoint]): Unit = synchronized {

val sendBufferSize = config.socketSendBufferBytes
val recvBufferSize = config.socketReceiveBufferBytes
val brokerId = config.brokerId
//iterate over the listeners configured in server.properties; one broker can serve multiple protocols and ports
endpoints.foreach { endpoint =>
val listenerName = endpoint.listenerName
val securityProtocol = endpoint.securityProtocol
//create the Acceptor thread: configure socket buffers, open the nioSelector and start listening on the port
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)
//add the connection Processors
addProcessors(acceptor, endpoint, processorsPerListener)
KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
acceptor.awaitStartup()
acceptors.put(endpoint, acceptor)
}
}

Inside the Acceptor thread there is an endless loop that listens for OP_ACCEPT events and hands new connections to a Processor for I/O handling.

/**
* Accept loop that checks for new connection attempts
*/
def run() {
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
startupComplete()
try {
var currentProcessor = 0
while (isRunning) {
try {
val ready = nioSelector.select(500)
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
try {
val key = iter.next
iter.remove()
if (key.isAcceptable) {
val processor = synchronized {
currentProcessor = currentProcessor % processors.size
processors(currentProcessor)
}
                 //accept the connection
                 accept(key, processor)
} else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")

               // round-robin to the next Processor
               currentProcessor = currentProcessor + 1
} catch {
case e: Throwable => error("Error while accepting connection", e)
}
}
}
}
catch {
// We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
// to a select operation on a specific channel or a bad request. We don't want
// the broker to stop responding to requests from other clients in these scenarios.
case e: ControlThrowable => throw e
case e: Throwable => error("Error occurred", e)
}
}
} finally {
debug("Closing server socket and selector.")
CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
shutdownComplete()
}
}
/*
* Accept a new connection
*/
def accept(key: SelectionKey, processor: Processor) {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
//accept the new connection
val socketChannel = serverSocketChannel.accept()
try {
   //count the new connection against its IP's quota
connectionQuotas.inc(socketChannel.socket().getInetAddress)
socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
socketChannel.socket().setKeepAlive(true)
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socketChannel.socket().setSendBufferSize(sendBufferSize)
debug("Accepted connection from %s on %s and assigned it to processor %d, sendBufferSize [actual|requested]: [%d|%d] recvBufferSize [actual|requested]: [%d|%d]"
.format(socketChannel.socket.getRemoteSocketAddress, socketChannel.socket.getLocalSocketAddress, processor.id,
socketChannel.socket.getSendBufferSize, sendBufferSize,
socketChannel.socket.getReceiveBufferSize, recvBufferSize))
//hand the channel to the Processor for I/O
     processor.accept(socketChannel)
} catch {
case e: TooManyConnectionsException =>
info("Rejected connection from %s, address already has the configured maximum of %d connections.".format(e.ip, e.count))
close(socketChannel)
}
}

Processor.accept puts the socketChannel into a ConcurrentLinkedQueue.

/**
* Queue up a new connection for reading
*/
def accept(socketChannel: SocketChannel) {
newConnections.add(socketChannel)
wakeup()
}

The Processor thread then takes connections from the queue and hands the resulting requests to the RequestChannel.

override def run() {
startupComplete() //CountDownLatch
try {
while (isRunning) {
try {
         //take new connections off the queue
         configureNewConnections()
         //drain the responseQueue
         processNewResponses()
         //selector.poll
         poll()
         //feed completed receives into the requestQueue
         processCompletedReceives()
         //remove completed sends from inflightResponses
         processCompletedSends()
         //clean up disconnected connections
         processDisconnected()
} catch {
// We catch all the throwables here to prevent the processor thread from exiting. We do this because
// letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
// reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
// be either associated with a specific socket channel or a bad request. These exceptions are caught and
// processed by the individual methods above which close the failing channel and continue processing other
// channels. So this catch block should only ever see ControlThrowables.
case e: Throwable => processException("Processor got uncaught exception.", e)
}
}
} finally {
debug("Closing selector - processor " + id)
CoreUtils.swallow(closeAll(), this, Level.ERROR)
shutdownComplete()
}
}

Finally, the KafkaRequestHandler threads started by KafkaRequestHandlerPool (a simple thread pool) keep pulling requests from the RequestChannel's requestQueue, invoke KafkaApis to handle the business logic, and put the results back onto the RequestChannel's responseQueue.

class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
time: Time,
numThreads: Int) extends Logging with KafkaMetricsGroup {
//thread pool size
private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
/* a meter to track the average free capacity of the request handlers */
private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
for (i <- 0 until numThreads) {
createHandler(i)
}

 //create and start a KafkaRequestHandler thread
 def createHandler(id: Int): Unit = synchronized {
runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()
}

def resizeThreadPool(newSize: Int): Unit = synchronized {
val currentSize = threadPoolSize.get
info(s"Resizing request handler thread pool size from $currentSize to $newSize")
if (newSize > currentSize) {
for (i <- currentSize until newSize) {
createHandler(i)
}
} else if (newSize < currentSize) {
for (i <- 1 to (currentSize - newSize)) {
runnables.remove(currentSize - i).stop()
}
}
threadPoolSize.set(newSize)
}

def shutdown(): Unit = synchronized {
info("shutting down")
for (handler <- runnables)
handler.initiateShutdown()
for (handler <- runnables)
handler.awaitShutdown()
info("shut down completely")
}
}
/**
* A thread that answers kafka requests.
*/
class KafkaRequestHandler(id: Int,
brokerId: Int,
val aggregateIdleMeter: Meter,
val totalHandlerThreads: AtomicInteger,
val requestChannel: RequestChannel,
apis: KafkaApis,
time: Time) extends Runnable with Logging {
this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
private val shutdownComplete = new CountDownLatch(1)
@volatile private var stopped = false

def run() {
while (!stopped) {
// We use a single meter for aggregate idle percentage for the thread pool.
// Since meter is calculated as total_recorded_value / time_window and
// time_window is independent of the number of threads, each recorded idle
// time should be discounted by # threads.
val startSelectTime = time.nanoseconds
     //fetch a request (waits up to 300 ms)
     val req = requestChannel.receiveRequest(300)
val endTime = time.nanoseconds
val idleTime = endTime - startSelectTime
aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)

req match {
case RequestChannel.ShutdownRequest =>
debug(s"Kafka request handler $id on broker $brokerId received shut down command")
shutdownComplete.countDown()
return

case request: RequestChannel.Request =>
try {
request.requestDequeueTimeNanos = endTime
trace(s"Kafka request handler $id on broker $brokerId handling request $request")
           //let KafkaApis handle the business logic
           apis.handle(request)
} catch {
case e: FatalExitError =>
shutdownComplete.countDown()
Exit.exit(e.statusCode)
case e: Throwable => error("Exception when handling request", e)
} finally {
request.releaseBuffer()
}

case null => // continue
}
}
shutdownComplete.countDown()
}

def stop(): Unit = {
stopped = true
}

def initiateShutdown(): Unit = requestChannel.sendShutdownRequest()

def awaitShutdown(): Unit = shutdownComplete.await()

}

The logic of the KafkaApis.handle method:

/**
* Top-level method that handles all requests and multiplexes to the right api
*/
def handle(request: RequestChannel.Request) {
try {
trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
request.header.apiKey match {
case ApiKeys.PRODUCE => handleProduceRequest(request)
case ApiKeys.FETCH => handleFetchRequest(request)
case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
case ApiKeys.METADATA => handleTopicMetadataRequest(request)
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
case ApiKeys.END_TXN => handleEndTxnRequest(request)
case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)
case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
}
} catch {
case e: FatalExitError => throw e
case e: Throwable => handleError(request, e)
} finally {
request.apiLocalCompleteTimeNanos = time.nanoseconds
}
}

After each request has been handled, the result is put back into the RequestChannel.

private def sendResponse(request: RequestChannel.Request, responseOpt: Option[AbstractResponse]): Unit = {
// Update error metrics for each error code in the response including Errors.NONE
responseOpt.foreach(response => requestChannel.updateErrorMetrics(request.header.apiKey, response.errorCounts.asScala))

responseOpt match {
case Some(response) =>
val responseSend = request.context.buildResponse(response)
val responseString =
if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
else None
requestChannel.sendResponse(new RequestChannel.Response(request, Some(responseSend), SendAction, responseString))
case None =>
requestChannel.sendResponse(new RequestChannel.Response(request, None, NoOpAction, None))
}
}
/** Send a response back to the socket server to be sent over the network */
def sendResponse(response: RequestChannel.Response) {
if (isTraceEnabled) {
val requestHeader = response.request.header
val message = response.responseAction match {
case SendAction =>
s"Sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} of ${response.responseSend.get.size} bytes."
case NoOpAction =>
s"Not sending ${requestHeader.apiKey} response to client ${requestHeader.clientId} as it's not required."
case CloseConnectionAction =>
s"Closing connection for client ${requestHeader.clientId} due to error during ${requestHeader.apiKey}."
}
trace(message)
}

val processor = processors.get(response.processor)
// The processor may be null if it was shutdown. In this case, the connections
// are closed, so the response is dropped.
if (processor != null) {
processor.enqueueResponse(response)
}
}
private[network] def enqueueResponse(response: RequestChannel.Response): Unit = {
responseQueue.put(response)
wakeup()
}