Smart's Blog

Kafka offset manager source code analysis (1): offset commit

Preface

       When a Kafka consumer pulls messages from a partition's log, it uses an offset to determine the starting position, within the log file, of the data it needs to fetch. Since the offset decides exactly which data the consumer pulls, keeping it accurate is critical, and Kafka ships a fairly complete offset management mechanism covering offset commit, offset reset, offset storage and so on. Below we analyze offset commit through the relevant parts of the Kafka source code.

offset commit

       A Kafka consumer periodically commits its current consumption offset, so that after a restart or rebalance it resumes from the committed position instead of consuming data it has already processed; it does this by sending an OffsetCommitRequest. On the broker side, the request is received by the handleOffsetCommitRequest method, which processes it and finally invokes sendResponseCallback so that the result can be handled by the consumer's callback. Next we analyze the offset commit flow and its mechanics in detail from both the Java client side and the server side of the source code.
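
       As a reference for the client side, here is a minimal sketch of how a consumer triggers this request, using the standard KafkaConsumer API with manual commits. The broker address, group id and topic name are made-up values for illustration only.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")    // hypothetical broker address
props.put("group.id", "demo-group")                 // hypothetical consumer group
props.put("enable.auto.commit", "false")            // commit offsets manually
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("demo-topic"))

val records = consumer.poll(1000)
// ... process the records ...

// synchronously sends an OffsetCommitRequest for the offsets of the records just polled
consumer.commitSync()
consumer.close()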

Server-side source code

       The diagram below shows the complete flow of an offset commit request on the Kafka server: the broker receives the request, dispatches it to the corresponding handler and, if the offset storage strategy is Kafka's internal topic, appends the offset information carried by the request to the log file of that internal offsets topic; finally a response callback is returned to the client to report the result. Let's follow the diagram step by step.

[Figure: flow of handling an offset commit request on the Kafka server]

(1)KafkaApis.handleOffsetCommitRequest

       This is the first step. The KafkaApis class is where the broker receives all kinds of requests and dispatches them to the corresponding handler methods; when an offset commit request arrives, handleOffsetCommitRequest is invoked. The method extracts the partition and group information from the client request and passes it on as parameters to handleCommitOffsets, the second step.

       Note: Kafka branches on the apiVersion field in request.header. apiVersion = 0 means the offsets are stored in ZooKeeper; any other version means they are stored by the offset manager (Kafka's internal topic). Here we analyze the second case, the internal topic.

val currentTimestamp = SystemTime.milliseconds
val defaultExpireTimestamp = offsetRetention + currentTimestamp
val partitionData = authorizedRequestInfo.mapValues { partitionData =>
  val metadata =
    if (partitionData.metadata == null)
      OffsetMetadata.NoMetadata
    else
      partitionData.metadata
  new OffsetAndMetadata(
    offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
    commitTimestamp = currentTimestamp,
    expireTimestamp = {
      if (partitionData.timestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP)
        defaultExpireTimestamp
      else
        offsetRetention + partitionData.timestamp
    }
  )
}

// call coordinator to handle commit offset
coordinator.handleCommitOffsets(
  offsetCommitRequest.groupId,
  offsetCommitRequest.memberId,
  offsetCommitRequest.generationId,
  partitionData,
  sendResponseCallback)
}

(2)GroupCoordinator.handleCommitOffsets

       Next, let's look at the handleCommitOffsets method of the GroupCoordinator class. It first validates the coordinator's state and the group information carried in the request (a simplified sketch of these checks follows the code below); once validation passes, it enters the following else block.

else {
  val member = group.get(memberId)
  completeAndScheduleNextHeartbeatExpiration(group, member)
  delayedOffsetStore = Some(groupManager.prepareStoreOffsets(groupId, memberId, generationId,
    offsetMetadata, responseCallback))
}
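
       The sketch below summarizes, in a simplified and hypothetical form, the kinds of checks performed before this else branch is reached. The names (CommitError, validateCommit) and the exact set of cases are illustrative only and are not lifted from the Kafka source, which reports errors through the response callback instead.

// Hypothetical, simplified sketch of the validation preceding the else branch above.
object CommitError extends Enumeration {
  val None, NotCoordinatorForGroup, UnknownMember, IllegalGeneration = Value
}

def validateCommit(isCoordinatorForGroup: Boolean,
                   memberKnown: Boolean,
                   requestGenerationId: Int,
                   groupGenerationId: Int): CommitError.Value = {
  if (!isCoordinatorForGroup) CommitError.NotCoordinatorForGroup // this broker does not coordinate the group
  else if (!memberKnown) CommitError.UnknownMember               // the member id is not part of the group
  else if (requestGenerationId != groupGenerationId)
    CommitError.IllegalGeneration                                // the consumer is on a stale generation
  else CommitError.None                                          // validation passed: enter the else branch
}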

       Here groupManager.prepareStoreOffsets is the key method for persisting the offsets. As the name suggests, prepareStoreOffsets performs the preparation work before the offset information is stored; to see what exactly that involves, let's look at the code inside the method:

       The method first filters offsetMetadata, keeping only entries whose metadata size does not exceed OffsetConfig.DefaultMaxMetadataSize (4096 bytes by default). It then maps each remaining entry to a Message instance built from the corresponding key, value and timestamp, collecting the results into a sequence referenced by messages. Next it instantiates a TopicPartition object for the internal offsets topic. Finally, it pairs that TopicPartition with a ByteBufferMessageSet built from the messages sequence, producing a Map named offsetsAndMetadataMessageSet.

// first filter out partitions with offset metadata size exceeding limit
val filteredOffsetMetadata = offsetMetadata.filter { case (topicPartition, offsetAndMetadata) =>
  validateOffsetMetadataLength(offsetAndMetadata.metadata)
}

// construct the message set to append
val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
  val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(groupId))
  new Message(
    key = GroupMetadataManager.offsetCommitKey(groupId, topicAndPartition.topic, topicAndPartition.partition),
    bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata),
    timestamp = timestamp,
    magicValue = magicValue
  )
}.toSeq

val offsetTopicPartition = new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionFor(groupId))

val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
  new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages: _*))

       After the work above, the method also defines a callback named putCacheCallback. Its job is to write the offsets into the cache once they have been appended to the log file; the cache write is done by GroupMetadataManager's putOffset method, shown below:

/**
 * Put the (already committed) offset for the given group/topic/partition into the cache.
 *
 * @param key The group-topic-partition
 * @param offsetAndMetadata The offset/metadata to be stored
 */
private def putOffset(key: GroupTopicPartition, offsetAndMetadata: OffsetAndMetadata) {
  offsetsCache.put(key, offsetAndMetadata)
}
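
       For orientation, the sketch below shows roughly what the successful path of putCacheCallback amounts to: once the append to the offsets topic has succeeded, every filtered offset is published into the cache under its group/topic/partition key. The types here are simplified placeholders for illustration, not the actual Kafka classes.

import scala.collection.concurrent.TrieMap

// Simplified placeholders standing in for Kafka's GroupTopicPartition / OffsetAndMetadata.
case class GroupTopicPartitionKey(group: String, topic: String, partition: Int)
case class CachedOffset(offset: Long, metadata: String, commitTimestamp: Long)

object OffsetCacheSketch {
  // plays the role of offsetsCache in GroupMetadataManager
  private val offsetsCache = TrieMap.empty[GroupTopicPartitionKey, CachedOffset]

  // roughly the success path of putCacheCallback: the append succeeded,
  // so each filtered offset is made visible to readers through the cache
  def onAppendSuccess(group: String, offsets: Map[(String, Int), CachedOffset]): Unit =
    offsets.foreach { case ((topic, partition), cached) =>
      offsetsCache.put(GroupTopicPartitionKey(group, topic, partition), cached)
    }
}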

       Finally, prepareStoreOffsets returns a DelayedStore object whose parameters are the offsetsAndMetadataMessageSet map generated earlier and the putCacheCallback callback. At the very end of GroupCoordinator.handleCommitOffsets, this DelayedStore object is passed into GroupMetadataManager.store, as follows:

delayedOffsetStore.foreach(groupManager.store)
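
       Conceptually, DelayedStore is just a pairing of the message set to append with the callback to invoke once the append has finished. The following sketch shows that shape with simplified, hypothetical types; the real Kafka definition uses its own TopicPartition, MessageSet and response types instead.

// Hypothetical simplified placeholders, not the actual Kafka types.
case class SimpleTopicPartition(topic: String, partition: Int)
case class SimpleResponse(errorCode: Short)

// the data to append, together with the callback (putCacheCallback) to run afterwards
case class DelayedStoreSketch(
  messageSet: Map[SimpleTopicPartition, Array[Byte]],
  callback: Map[SimpleTopicPartition, SimpleResponse] => Unit)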

(3)GroupMetadataManager.store

       This method is straightforward: it takes the DelayedStore object produced by prepareStoreOffsets and calls appendMessages with its contents:

def store(delayedAppend: DelayedStore) {
  // call replica manager to append the group message
  replicaManager.appendMessages(
    config.offsetCommitTimeoutMs.toLong,
    config.offsetCommitRequiredAcks,
    true, // allow appending to internal offset topic
    delayedAppend.messageSet,
    delayedAppend.callback)
}

(4)ReplicaManager.appendMessages

       This method appends the offset messages to the leader replica of the target partition and waits for them to be replicated to the other replicas; the callback passed in (delayedAppend.callback) is invoked either when the request times out or when the required number of acks has been received (a sketch of that decision follows the snippet).

val sTime = SystemTime.milliseconds
val localProduceResults = appendToLocalLog(internalTopicsAllowed, messagesPerPartition, requiredAcks)
debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))

val produceStatus = localProduceResults.map { case (topicPartition, result) =>
  topicPartition ->
    ProducePartitionStatus(
      result.info.lastOffset + 1, // required offset
      new PartitionResponse(result.errorCode, result.info.firstOffset, result.info.timestamp))
}
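
       The snippet above only covers the local append; the remainder of appendMessages decides whether the callback can be invoked right away or must wait for replication. The sketch below illustrates that decision in a simplified, hypothetical form; the names (AppendResult, respondOrDelay, scheduleDelayed) are made up and do not appear in the Kafka source.

// Hypothetical sketch of the "respond now or wait for acks/timeout" decision.
case class AppendResult(partition: String, errorCode: Short)

def respondOrDelay(requiredAcks: Short,
                   localResults: Map[String, AppendResult],
                   responseCallback: Map[String, AppendResult] => Unit,
                   scheduleDelayed: (Map[String, AppendResult] => Unit) => Unit): Unit = {
  // acks = -1 (all replicas) and at least one partition appended locally without error:
  // the response has to wait for the followers to catch up, or for the timeout
  val mustWaitForReplicas =
    requiredAcks == -1 && localResults.values.exists(_.errorCode == 0)

  if (mustWaitForReplicas)
    scheduleDelayed(responseCallback)  // completed later by replication progress or by the timeout
  else
    responseCallback(localResults)     // acks = 0/1 or a local error: respond immediately
}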

(5)ReplicaManager.appendToLocalLog

       appendToLocalLog calls partition.appendMessagesToLeader to append the offset messages to the local log file of the leader replica of the internal offsets topic.

try {
  val partitionOpt = getPartition(topicPartition.topic, topicPartition.partition)
  val info = partitionOpt match {
    case Some(partition) =>
      partition.appendMessagesToLeader(messages.asInstanceOf[ByteBufferMessageSet], requiredAcks)
    case None => throw new UnknownTopicOrPartitionException(
      "Partition %s doesn't exist on %d".format(topicPartition, localBrokerId))
  }

  val numAppendedMessages =
    if (info.firstOffset == -1L || info.lastOffset == -1L)
      0
    else
      info.lastOffset - info.firstOffset + 1

  // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
  BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).bytesInRate.mark(messages.sizeInBytes)
  BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
  BrokerTopicStats.getBrokerTopicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
  BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numAppendedMessages)

  trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
    .format(messages.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOffset, info.lastOffset))

  (topicPartition, LogAppendResult(info))
}

(6)Partition.appendMessagesToLeader

       appendMessagesToLeader makes sure that the leader replica is local and that, when acks = -1, the ISR is not smaller than min.insync.replicas, and then calls Log.append to perform the actual append.

val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
  val leaderReplicaOpt = leaderReplicaIfLocal()
  leaderReplicaOpt match {
    case Some(leaderReplica) =>
      val log = leaderReplica.log.get
      val minIsr = log.config.minInSyncReplicas
      val inSyncSize = inSyncReplicas.size

      // Avoid writing to leader if there are not enough insync replicas to make it safe
      if (inSyncSize < minIsr && requiredAcks == -1) {
        throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"
          .format(topic, partitionId, inSyncSize, minIsr))
      }

      val info = log.append(messages, assignOffsets = true)
      // probably unblock some follower fetch requests since log end offset has been updated
      replicaManager.tryCompleteDelayedFetch(new TopicPartitionOperationKey(this.topic, this.partitionId))
      // we may need to increment high watermark since ISR could be down to 1
      (info, maybeIncrementLeaderHW(leaderReplica))

    case None =>
      throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
        .format(topic, partitionId, localBrokerId))
  }
}

(7)Log.append

       Log.append essentially comes down to the two calls below: it first calls LogSegment.append to append the messages to the log, and then advances the log end offset.

// now append to the log
segment.append(appendInfo.firstOffset, validMessages)
// increment the log end offset
updateLogEndOffset(appendInfo.lastOffset + 1)

(8)The final file-write steps

LogSegment.append:

@nonthreadsafe
def append(offset: Long, messages: ByteBufferMessageSet) {
  if (messages.sizeInBytes > 0) {
    trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
    // append an entry to the index (if needed)
    if (bytesSinceLastIndexEntry > indexIntervalBytes) {
      index.append(offset, log.sizeInBytes())
      this.bytesSinceLastIndexEntry = 0
    }
    // append the messages
    log.append(messages)
    this.bytesSinceLastIndexEntry += messages.sizeInBytes
  }
}

FileMessageSet.append:

def append(messages: ByteBufferMessageSet) {
  val written = messages.writeFullyTo(channel)
  _size.getAndAdd(written)
}

ByteBufferMessageSet.writeFullyTo:

def writeFullyTo(channel: GatheringByteChannel): Int = {
  buffer.mark()
  var written = 0
  while (written < sizeInBytes)
    written += channel.write(buffer)
  buffer.reset()
  written
}

       Finally, the write method of Java NIO's FileChannel (a GatheringByteChannel) is used to drain the offset data from the buffer into the channel, and thereby into the log file on disk.
