kafka 源码分析 4 : broker 处理生产请求

Kafka broker上对于produce生产者生产消息的处理

Kafka Server处理生成者请求

入口在KafkaApis.scala, 通过request.header.apikey判断消息类型

def handle(request: RequestChannel.Request) {
try {
  trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
    format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
  ApiKeys.forId(request.header.apiKey) match {
    case ApiKeys.PRODUCE => handleProduceRequest(request)


// call the replica manager to append messages to the replicas
    timeout = produceRequest.timeout.toLong,
    requiredAcks = produceRequest.acks,
    internalTopicsAllowed = internalTopicsAllowed,
    isFromClient = true,
    entriesPerPartition = authorizedRequestInfo,
    responseCallback = sendResponseCallback)
  // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
  // hence we clear its data here inorder to let GC re-claim its memory since it is already appended to log

appendRecords 先写消息到partition的leader上,如果requireAcks==-1说明需要所有isr都写入成功才返回response,而isr同样作为leader的消费者来拉取的

  * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
  * the callback function will be triggered either when timeout or the required acks are satisfied;
  * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
 def appendRecords(timeout: Long,
                   requiredAcks: Short,
                   internalTopicsAllowed: Boolean,
                   isFromClient: Boolean,
                   entriesPerPartition: Map[TopicPartition, MemoryRecords],
                   responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                   delayedProduceLock: Option[Object] = None) {
   if (isValidRequiredAcks(requiredAcks)) {
     val sTime = time.milliseconds
     val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
       isFromClient = isFromClient, entriesPerPartition, requiredAcks)
     debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
     val produceStatus = localProduceResults.map { case (topicPartition, result) =>
       topicPartition ->
                 result.info.lastOffset + 1, // required offset
                 new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime)) // response status
     // 1. required acks = -1
     // 2. there is data to append
     // 3. at least one partition append was successful (fewer errors than partitions)
     if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
       // create delayed produce operation
       val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
       val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
       // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
       val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
       // try to complete the request immediately, otherwise put it into the purgatory
       // this is because while the delayed produce operation is being created, new
       // requests may arrive and hence make this operation completable.
       delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
     } else {
       // we can respond immediately
       val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
   } else {
     // If required.acks is outside accepted range, something is wrong with the client
     // Just return an error and don't handle the request at all
     val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
       topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
         LogAppendInfo.UnknownLogAppendInfo.firstOffset, RecordBatch.NO_TIMESTAMP)


  * Append the messages to the local replica logs
 private def appendToLocalLog(internalTopicsAllowed: Boolean,
                              isFromClient: Boolean,
                              entriesPerPartition: Map[TopicPartition, MemoryRecords],
                              requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
   trace("Append [%s] to local log ".format(entriesPerPartition))
   entriesPerPartition.map { case (topicPartition, records) =>
     // reject appending to internal topics if it is not allowed
     if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
       (topicPartition, LogAppendResult(
         Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
     } else {
       try {
         val partitionOpt = getPartition(topicPartition)
         val info = partitionOpt match {
           case Some(partition) =>
             if (partition eq ReplicaManager.OfflinePartition)
               throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId")
             partition.appendRecordsToLeader(records, isFromClient, requiredAcks)
           case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d"
             .format(topicPartition, localBrokerId))
         val numAppendedMessages =
           if (info.firstOffset == -1L || info.lastOffset == -1L)
             info.lastOffset - info.firstOffset + 1
         // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
         trace("%d bytes written to log %s-%d beginning at offset %d and ending at offset %d"
           .format(records.sizeInBytes, topicPartition.topic, topicPartition.partition, info.firstOffset, info.lastOffset))
         (topicPartition, LogAppendResult(info))
       } catch {
         // NOTE: Failed produce requests metric is not incremented for known exceptions
         // it is supposed to indicate un-expected failures of a broker in handling a produce request
         case e@ (_: UnknownTopicOrPartitionException |
                  _: NotLeaderForPartitionException |
                  _: RecordTooLargeException |
                  _: RecordBatchTooLargeException |
                  _: CorruptRecordException |
                  _: KafkaStorageException |
                  _: InvalidTimestampException) =>
           (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
         case t: Throwable =>
           error("Error processing append operation on partition %s".format(topicPartition), t)
           (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(t)))


def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0) = {
    val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
      leaderReplicaIfLocal match {
        case Some(leaderReplica) =>
          val log = leaderReplica.log.get
          val minIsr = log.config.minInSyncReplicas
          val inSyncSize = inSyncReplicas.size
          // Avoid writing to leader if there are not enough insync replicas to make it safe
          if (inSyncSize < minIsr && requiredAcks == -1) {
            throw new NotEnoughReplicasException("Number of insync replicas for partition %s is [%d], below required minimum [%d]"
              .format(topicPartition, inSyncSize, minIsr))
          val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)
          // probably unblock some follower fetch requests since log end offset has been updated
          replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
          // we may need to increment high watermark since ISR could be down to 1
          (info, maybeIncrementLeaderHW(leaderReplica))
        case None =>
          throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
            .format(topicPartition, localBrokerId))
    // some delayed operations may be unblocked after HW changed
    if (leaderHWIncremented)


private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
   maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
     val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)
     // return if we have no valid messages or if this is a duplicate of the last appended entry
     if (appendInfo.shallowCount == 0)
       return appendInfo
     // trim any invalid bytes or partial messages before appending it to the on-disk log
     var validRecords = trimInvalidBytes(records, appendInfo)
     // they are valid, insert them in the log
     lock synchronized {
       if (assignOffsets) {
         // assign offsets to the message set
         val offset = new LongRef(nextOffsetMetadata.messageOffset)
         appendInfo.firstOffset = offset.value
         val now = time.milliseconds
         val validateAndOffsetAssignResult = try {
         } catch {
           case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
         validRecords = validateAndOffsetAssignResult.validatedRecords
         appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
         appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
         appendInfo.lastOffset = offset.value - 1
         if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
           appendInfo.logAppendTime = now
         // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
         // format conversion)
         if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
           for (batch <- validRecords.batches.asScala) {
             if (batch.sizeInBytes > config.maxMessageSize) {
               // we record the original message set size instead of the trimmed size
               // to be consistent with pre-compression bytesRejectedRate recording
               throw new RecordTooLargeException("Message batch size is %d bytes which exceeds the maximum configured size of %d."
                 .format(batch.sizeInBytes, config.maxMessageSize))
       } else {
         // we are taking the offsets we are given
         if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
           throw new IllegalArgumentException("Out of order offsets found in " + records.records.asScala.map(_.offset))
       // update the epoch cache with the epoch stamped onto the message by the leader
       validRecords.batches.asScala.foreach { batch =>
         if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
           leaderEpochCache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
       // check messages set size may be exceed config.segmentSize
       if (validRecords.sizeInBytes > config.segmentSize) {
         throw new RecordBatchTooLargeException("Message batch size is %d bytes which exceeds the maximum configured segment size of %d."
           .format(validRecords.sizeInBytes, config.segmentSize))
       // now that we have valid records, offsets assigned, and timestamps updated, we need to
       // validate the idempotent/transactional state of the producers and collect some metadata
       val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(validRecords, isFromClient)
       maybeDuplicate.foreach { duplicate =>
         appendInfo.firstOffset = duplicate.firstOffset
         appendInfo.lastOffset = duplicate.lastOffset
         appendInfo.logAppendTime = duplicate.timestamp
         return appendInfo
       // 如果segment满了则换一个新的segment
       // maybe roll the log if this segment is full
       val segment = maybeRoll(messagesSize = validRecords.sizeInBytes,
         maxTimestampInMessages = appendInfo.maxTimestamp,
         maxOffsetInMessages = appendInfo.lastOffset)
       val logOffsetMetadata = LogOffsetMetadata(
         messageOffset = appendInfo.firstOffset,
         segmentBaseOffset = segment.baseOffset,
         relativePositionInSegment = segment.size)
       // 由segment写入
       segment.append(firstOffset = appendInfo.firstOffset,
         largestOffset = appendInfo.lastOffset,
         largestTimestamp = appendInfo.maxTimestamp,
         shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
         records = validRecords)
       // update the producer state
       for ((producerId, producerAppendInfo) <- updatedProducers) {
       // update the transaction index with the true last stable offset. The last offset visible
       // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
       for (completedTxn <- completedTxns) {
         val lastStableOffset = producerStateManager.completeTxn(completedTxn)
         segment.updateTxnIndex(completedTxn, lastStableOffset)
       // always update the last producer id map offset so that the snapshot reflects the current offset
       // even if there isn't any idempotent data being written
       producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
       // increment the log end offset
       updateLogEndOffset(appendInfo.lastOffset + 1)
       // update the first unstable offset (which is used to compute LSO)
       trace("Appended message set to log %s with first offset: %d, next offset: %d, and messages: %s"
         .format(this.name, appendInfo.firstOffset, nextOffsetMetadata.messageOffset, validRecords))
       // 如果超过了刷新间隔,则调用一次fsync
       if (unflushedMessages >= config.flushInterval)


def append(firstOffset: Long,
         largestOffset: Long,
         largestTimestamp: Long,
         shallowOffsetOfMaxTimestamp: Long,
         records: MemoryRecords): Unit = {
if (records.sizeInBytes > 0) {
  trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at shallow offset %d"
      .format(records.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, shallowOffsetOfMaxTimestamp))
  val physicalPosition = log.sizeInBytes()
  if (physicalPosition == 0)
    rollingBasedTimestamp = Some(largestTimestamp)
  // append the messages
  require(canConvertToRelativeOffset(largestOffset), "largest offset in message set can not be safely converted to relative offset.")
  val appendedBytes = log.append(records)
  trace(s"Appended $appendedBytes to ${log.file()} at offset $firstOffset")
  // Update the in memory max timestamp and corresponding offset.
  if (largestTimestamp > maxTimestampSoFar) {
    maxTimestampSoFar = largestTimestamp
    offsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp
  // append an entry to the index (if needed)
  if(bytesSinceLastIndexEntry > indexIntervalBytes) {
    index.append(firstOffset, physicalPosition)
    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
    bytesSinceLastIndexEntry = 0
  bytesSinceLastIndexEntry += records.sizeInBytes


public int append(MemoryRecords records) throws IOException {
    int written = records.writeFullyTo(channel);
    return written;

通过FileChannel write到磁盘

 * Write all records to the given channel (including partial records).
 * @param channel The channel to write to
 * @return The number of bytes written
 * @throws IOException For any IO errors writing to the channel
public int writeFullyTo(GatheringByteChannel channel) throws IOException {
    int written = 0;
    while (written < sizeInBytes())
        written += channel.write(buffer);
    return written;




Comment form

(*) 表示必填项