                1. Kafka Producer Source Code


                  2.1 Initialization


                  2.1.1 Program Entry Point

                  Start reading from the main method written by the user.

                  package com.atguigu.kafka.producer;
                  
                  import org.apache.kafka.clients.producer.KafkaProducer;
                  import org.apache.kafka.clients.producer.ProducerConfig;
                  import org.apache.kafka.clients.producer.ProducerRecord;
                  import org.apache.kafka.common.serialization.StringSerializer;
                  
                  import java.util.Properties;
                  import java.util.stream.IntStream;
                  
                  public class CustomProducer {
                      public static void main(String[] args) {
                          // 1. Create the Kafka producer configuration object
                          Properties properties = new Properties();
                          // 2. Add connection configuration to the config object: bootstrap.servers
                          properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
                          // Key/value serialization (required): key.serializer, value.serializer
                          properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                          properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                          // 3. Create the Kafka producer object
                          try (KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties)) {
                              // 4. Call the send method to send messages
                              IntStream.range(0, 7)
                                      .mapToObj(i -> new ProducerRecord<>("first", i, "test", "atguigu"))
                                      .forEach(kafkaProducer::send);
                          }
                      }
                  }
                  

                  2.1.2 Producer main Thread Initialization

                  Click KafkaProducer() inside the main() method to jump to the KafkaProducer constructor.

                  org.apache.kafka.clients.producer.KafkaProducer#KafkaProducer(org.apache.kafka.clients.producer.ProducerConfig, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.clients.producer.internals.ProducerMetadata, org.apache.kafka.clients.KafkaClient, org.apache.kafka.clients.producer.internals.ProducerInterceptors<K,V>, org.apache.kafka.common.utils.Time)

                  // visible for testing
                  @SuppressWarnings("unchecked")
                  KafkaProducer(ProducerConfig config,
                                Serializer<K> keySerializer,
                                Serializer<V> valueSerializer,
                                ProducerMetadata metadata,
                                KafkaClient kafkaClient,
                                ProducerInterceptors<K, V> interceptors,
                                Time time) {
                      try {
                          this.producerConfig = config;
                          this.time = time;
                  
                          // Get the transactional ID
                          String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);

                          // Get the client ID
                          this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
                  
                          LogContext logContext;
                          if (transactionalId == null)
                              logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
                          else
                              logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
                          log = logContext.logger(KafkaProducer.class);
                          log.trace("Starting the Kafka producer");
                  
                          Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
                          MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                                  .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                                  .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                                  .tags(metricTags);
                          List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                                  MetricsReporter.class,
                                  Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
                          // JMX reporter for monitoring the Kafka producer
                          JmxReporter jmxReporter = new JmxReporter();
                          jmxReporter.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)));
                          reporters.add(jmxReporter);
                          MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
                                  config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
                          this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
                          this.producerMetrics = new KafkaProducerMetrics(metrics);
                          // Get the partitioner
                          this.partitioner = config.getConfiguredInstance(
                                  ProducerConfig.PARTITIONER_CLASS_CONFIG,
                                  Partitioner.class,
                                  Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
                          long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
                          // Key and value serializers
                          if (keySerializer == null) {
                              this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                                                                                                       Serializer.class);
                              this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
                          } else {
                              config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                              this.keySerializer = keySerializer;
                          }
                          if (valueSerializer == null) {
                              this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                                                                                                         Serializer.class);
                              this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
                          } else {
                              config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                              this.valueSerializer = valueSerializer;
                          }
                  
                          // Interceptor handling (there can be multiple interceptors)
                          List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
                                  ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                                  ProducerInterceptor.class,
                                  Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
                          if (interceptors != null)
                              this.interceptors = interceptors;
                          else
                              this.interceptors = new ProducerInterceptors<>(interceptorList);
                          ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer,
                                  valueSerializer, interceptorList, reporters);
                          // Maximum size of a single request (and thus a single record), default 1 MB
                          this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
                          // RecordAccumulator buffer size (buffer.memory), default 32 MB
                          this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
                          // Compression type, default none
                          this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
                  
                          this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
                          int deliveryTimeoutMs = configureDeliveryTimeout(config, log);
                  
                          this.apiVersions = new ApiVersions();
                          this.transactionManager = configureTransactionState(config, logContext);
                          // Create the buffer (RecordAccumulator); total size defaults to 32 MB
                          // batch.size: maximum size of one batch in the buffer, default 16 KB
                          // Compression type, default none
                          // linger.ms, default 0
                          // Retry backoff interval, default 100 ms
                          // delivery.timeout.ms, default 2 minutes
                          // request.timeout.ms, default 30 s
                          this.accumulator = new RecordAccumulator(logContext,
                                  config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                                  this.compressionType,
                                  lingerMs(config),
                                  retryBackoffMs,
                                  deliveryTimeoutMs,
                                  metrics,
                                  PRODUCER_METRIC_GROUP_NAME,
                                  time,
                                  apiVersions,
                                  transactionManager,
                                  new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
                  
                          // Parse and validate the Kafka cluster bootstrap addresses
                          List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
                                  config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
                                  config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
                          // Get the metadata
                          if (metadata != null) {
                              this.metadata = metadata;
                          } else {
                              // metadata.max.age.ms, default 5 minutes: how often the producer refreshes its metadata
                              // metadata.max.idle.ms, default 5 minutes: how long metadata for an idle topic is cached before being evicted
                              this.metadata = new ProducerMetadata(retryBackoffMs,
                                      config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                                      config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
                                      logContext,
                                      clusterResourceListeners,
                                      Time.SYSTEM);
                              this.metadata.bootstrap(addresses);
                          }
                          this.errors = this.metrics.sensor("errors");
                          // Initialize the sender (the task run by the I/O thread)
                          this.sender = newSender(logContext, kafkaClient, this.metadata);
                          String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
                          // Put the sender into a background daemon thread
                          this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
                          // Start the sender thread
                          this.ioThread.start();
                          config.logUnused();
                          AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
                          log.debug("Kafka producer started");
                      } catch (Throwable t) {
                          // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
                          close(Duration.ofMillis(0), true);
                          // now propagate the exception
                          throw new KafkaException("Failed to construct kafka producer", t);
                      }
                  }
                  
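
                  The constructor is driven almost entirely by configuration. As a minimal sketch (not part of the source walkthrough), the defaults it reads can be overridden through the Properties passed to KafkaProducer; the keys below are real ProducerConfig constants, the values are purely illustrative, and the imports are the same as in CustomProducer from 2.1.1:

                  Properties props = new Properties();
                  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
                  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
                  // RecordAccumulator total size, default 32 MB
                  props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);
                  // Maximum size of one batch, default 16 KB
                  props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
                  // How long a batch may wait for more records, default 0
                  props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
                  // Compression, default "none"
                  props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
                  // Maximum size of a single request, default 1 MB
                  props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);
                  try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                      // send records as in 2.1.1
                  }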

                  2.1.3 Producer sender Thread Initialization

                  Click the newSender() method to see how the send thread is initialized.

                  org.apache.kafka.clients.producer.KafkaProducer#newSender

                  // visible for testing
                  Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
                      // Maximum number of in-flight (cached) requests per connection, default 5
                      int maxInflightRequests = configureInflightRequests(producerConfig);
                      // Request timeout, default 30 s
                      int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
                      ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time, logContext);
                      ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
                      Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
                      // Create a network client object
                      // clientId: the client ID
                      // maxInflightRequests: maximum number of in-flight requests, default 5
                      // RECONNECT_BACKOFF_MS_CONFIG, default 50 ms: base reconnect backoff interval
                      // RECONNECT_BACKOFF_MAX_MS_CONFIG, default 1000 ms: on each consecutive connection failure the backoff grows exponentially, up to this maximum
                      // SEND_BUFFER_CONFIG, default 128 KB: socket send buffer size
                      // RECEIVE_BUFFER_CONFIG, default 32 KB: socket receive buffer size
                      // requestTimeoutMs, default 30 s
                      // SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, default 10 s: time allowed for the producer to establish a connection to the broker; if no connection is built before the timeout, it is closed
                      // SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, default 30 s: on each consecutive connection failure the setup timeout grows exponentially, up to this maximum
                      KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
                              new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                                      this.metrics, time, "producer", channelBuilder, logContext),
                              metadata,
                              clientId,
                              maxInflightRequests,
                              producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                              producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                              producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                              producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                              requestTimeoutMs,
                              producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
                              producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
                              time,
                              true,
                              apiVersions,
                              throttleTimeSensor,
                              logContext);
                      // acks: 0 = no acknowledgement needed; 1 = acknowledge once the leader has the record; -1 (all) = acknowledge once the leader and every replica in the ISR have received it
                      short acks = configureAcks(producerConfig, log);
                      // Create the Sender (run by the I/O thread)
                      return new Sender(logContext,
                              client,
                              metadata,
                              this.accumulator,
                              maxInflightRequests == 1,
                              producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                              acks,
                              producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),
                              metricsRegistry.senderMetrics,
                              time,
                              requestTimeoutMs,
                              producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                              this.transactionManager,
                              apiVersions);
                  }
                  
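
                  The network settings read here can be tuned the same way. A hedged fragment (real ProducerConfig keys, illustrative values, reusing the props object from the sketch in 2.1.2):

                  // At most 5 unacknowledged requests per connection (the default)
                  props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
                  // Request timeout, default 30 s
                  props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
                  // acks: "0", "1", or "all" (equivalent to -1)
                  props.put(ProducerConfig.ACKS_CONFIG, "all");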

                  The Sender object is started inside a thread, so we need to click Sender in the newSender() method and find the run() method of the Sender object.

                  org.apache.kafka.clients.producer.internals.Sender#run

                  /**
                   * The main run loop for the sender thread
                   */
                  @Override
                  public void run() {
                      log.debug("Starting Kafka producer I/O thread.");
                  
                      // main loop, runs until close is called
                      while (running) {
                          try {
                              // The sender thread tries to pull data from the buffer; right after startup there is nothing to pull yet
                  
                              runOnce();
                          } catch (Exception e) {
                              log.error("Uncaught error in kafka producer I/O thread: ", e);
                          }
                      }
                      ....
                  }
                  

                  2.2 Sending Data to the Buffer


                  2.2.1 Overall Send Flow

                  Click the send() method in the CustomProducer.java you wrote.

                  IntStream.range(0, 7)
                          .mapToObj(i -> new ProducerRecord<>("first", i, "test", "atguigu"))
                          .forEach(kafkaProducer::send);
                  
                  @Override
                  public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
                      return send(record, null);
                  }
                  
                  @Override
                  public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
                      // intercept the record, which can be potentially modified; this method does not throw exceptions
                      // The interceptors process the record being sent
                      ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
                      return doSend(interceptedRecord, callback);
                  }
                  
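
                  send() is asynchronous and returns a Future<RecordMetadata>. A minimal sketch (not from the walkthrough) of the two common calling styles, assuming the kafkaProducer from 2.1.1:

                  // Asynchronous with a callback, invoked once the send is acknowledged or fails
                  kafkaProducer.send(new ProducerRecord<>("first", "atguigu"),
                          (metadata, exception) -> {
                              if (exception == null)
                                  System.out.println("partition=" + metadata.partition() + " offset=" + metadata.offset());
                              else
                                  exception.printStackTrace();
                          });

                  // Synchronous: get() blocks until the send completes
                  // (get() throws InterruptedException / ExecutionException)
                  kafkaProducer.send(new ProducerRecord<>("first", "atguigu")).get();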

                  Click the onSend() method to see the interceptor handling.

                  org.apache.kafka.clients.producer.internals.ProducerInterceptors#onSend

                  public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
                      ProducerRecord<K, V> interceptRecord = record;
                      for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
                          try {
                              // Interceptor processing
                              interceptRecord = interceptor.onSend(interceptRecord);
                          } catch (Exception e) {
                              // do not propagate interceptor exception, log and continue calling other interceptors
                              // be careful not to throw exception from here
                              if (record != null)
                                  log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
                              else
                                  log.warn("Error executing interceptor onSend callback", e);
                          }
                      }
                      return interceptRecord;
                  }
                  
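
                  onSend() simply chains every configured interceptor and swallows their exceptions. A minimal custom interceptor sketch (the class name and prefixing rule are hypothetical; the ProducerInterceptor API is real) that prefixes each value with a timestamp:

                  import java.util.Map;

                  import org.apache.kafka.clients.producer.ProducerInterceptor;
                  import org.apache.kafka.clients.producer.ProducerRecord;
                  import org.apache.kafka.clients.producer.RecordMetadata;

                  public class TimestampInterceptor implements ProducerInterceptor<String, String> {
                      @Override
                      public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
                          // Returning a new (possibly modified) record is allowed here
                          return new ProducerRecord<>(record.topic(), record.partition(), record.key(),
                                  System.currentTimeMillis() + "," + record.value(), record.headers());
                      }

                      @Override
                      public void onAcknowledgement(RecordMetadata metadata, Exception exception) { }

                      @Override
                      public void close() { }

                      @Override
                      public void configure(Map<String, ?> configs) { }
                  }

                  It would be registered through the interceptor.classes setting, e.g. properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TimestampInterceptor.class.getName()).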

                  Returning from the interceptor handling, click the doSend() method.

                  org.apache.kafka.clients.producer.KafkaProducer#doSend

                  /**
                   * Implementation of asynchronously send a record to a topic.
                   */
                  private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
                      TopicPartition tp = null;
                      try {
                          throwIfProducerClosed();
                          // first make sure the metadata for the topic is available
                          long nowMs = time.milliseconds();
                          ClusterAndWaitTime clusterAndWaitTime;
                          try {
                              // Fetch the metadata
                              clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
                          } catch (KafkaException e) {
                              if (metadata.isClosed())
                                  throw new KafkaException("Producer closed while send in progress", e);
                              throw e;
                          }
                          nowMs += clusterAndWaitTime.waitedOnMetadataMs;
                          long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
                          Cluster cluster = clusterAndWaitTime.cluster;
                          // Serialization
                          byte[] serializedKey;
                          try {
                              serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
                          } catch (ClassCastException cce) {
                              throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                                      " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                                      " specified in key.serializer", cce);
                          }
                          byte[] serializedValue;
                          try {
                              serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
                          } catch (ClassCastException cce) {
                              throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                                      " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                                      " specified in value.serializer", cce);
                          }
                          // Partitioning
                          int partition = partition(record, serializedKey, serializedValue, cluster);
                          tp = new TopicPartition(record.topic(), partition);
                  
                          setReadOnly(record.headers());
                          Header[] headers = record.headers().toArray();
                  
                          int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                                  compressionType, serializedKey, serializedValue, headers);
                          // Ensure the record size (after serialization and compression) can be transmitted
                          ensureValidRecordSize(serializedSize);
                          long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
                          if (log.isTraceEnabled()) {
                              log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
                          }
                          // producer callback will make sure to call both 'callback' and interceptor callback
                          Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
                  
                          if (transactionManager != null && transactionManager.isTransactional()) {
                              transactionManager.failIfNotReadyForSend();
                          }
                          // Append the record to the accumulator cache; result indicates whether the append succeeded
                          RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                                  serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
                  
                          if (result.abortForNewBatch) {
                              int prevPartition = partition;
                              partitioner.onNewBatch(record.topic(), cluster, prevPartition);
                              partition = partition(record, serializedKey, serializedValue, cluster);
                              tp = new TopicPartition(record.topic(), partition);
                              if (log.isTraceEnabled()) {
                                  log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
                              }
                              // producer callback will make sure to call both 'callback' and interceptor callback
                              interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
                  
                              result = accumulator.append(tp, timestamp, serializedKey,
                                  serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
                          }
                  
                          if (transactionManager != null && transactionManager.isTransactional())
                              transactionManager.maybeAddPartitionToTransaction(tp);
                  
                          // The batch is full, or a new batch was created
                          if (result.batchIsFull || result.newBatchCreated) {
                              log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                              // Wake up the sender thread
                              this.sender.wakeup();
                          }
                          return result.future;
                          // handling exceptions and record the errors;
                          // for API exceptions return them in the future,
                          // for other exceptions throw directly
                      } catch (ApiException e) {
                          log.debug("Exception occurred during message send:", e);
                          if (callback != null)
                              callback.onCompletion(null, e);
                          this.errors.record();
                          this.interceptors.onSendError(record, tp, e);
                          return new FutureFailure(e);
                      } catch (InterruptedException e) {
                          this.errors.record();
                          this.interceptors.onSendError(record, tp, e);
                          throw new InterruptException(e);
                      } catch (KafkaException e) {
                          this.errors.record();
                          this.interceptors.onSendError(record, tp, e);
                          throw e;
                      } catch (Exception e) {
                          // we notify interceptor about all exceptions, since onSend is called before anything else in this method
                          this.interceptors.onSendError(record, tp, e);
                          throw e;
                      }
                  }
                  

                  2.2.2 Partition Selection

                  A detailed look at the default partitioning rules.

                  org.apache.kafka.clients.producer.KafkaProducer#partition

                  /**
                   * computes partition for given record.
                   * if the record has partition returns the value otherwise
                   * calls configured partitioner class to compute the partition.
                   */
                  private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
                      Integer partition = record.partition();
                      // If the record specifies a partition, use that partition directly
                      return partition != null ?
                              partition :
                              // Otherwise let the partitioner choose the partition
                              partitioner.partition(
                                      record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
                  }
                  
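
                  Because of the short-circuit at the top, an explicitly specified partition always wins and the partitioner is never consulted. For example (sketch, using the ProducerRecord constructor that takes a partition, with the kafkaProducer from 2.1.1):

                  // Goes straight to partition 1 of "first"
                  kafkaProducer.send(new ProducerRecord<>("first", 1, "test", "atguigu"));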

                  Click partition to jump to the Partitioner interface. Select partition and press Ctrl+H to find the implementations of the interface.

                  /**
                   * Compute the partition for the given record.
                   *
                   * @param topic The topic name
                   * @param key The key to partition on (or null if no key)
                   * @param keyBytes The serialized key to partition on( or null if no key)
                   * @param value The value to partition on or null
                   * @param valueBytes The serialized value to partition on or null
                   * @param cluster The current cluster metadata
                   */
                  int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
                  

                  Choose the default partitioner, DefaultPartitioner.

                  org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition(java.lang.String, java.lang.Object, byte[], java.lang.Object, byte[], org.apache.kafka.common.Cluster, int)

                  /**
                   * Compute the partition for the given record.
                   *
                   * @param topic The topic name
                   * @param numPartitions The number of partitions of the given {@code topic}
                   * @param key The key to partition on (or null if no key)
                   * @param keyBytes serialized key to partition on (or null if no key)
                   * @param value The value to partition on or null
                   * @param valueBytes serialized value to partition on or null
                   * @param cluster The current cluster metadata
                   */
                  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                                       int numPartitions) {
                      // No key was specified
                      if (keyBytes == null) {
                          // Use the sticky partitioning strategy
                          return stickyPartitionCache.partition(topic, cluster);
                      }
                      // If a key is specified, take the key's hash modulo the number of partitions
                      // hash the keyBytes to choose a partition
                      return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
                  }
                  
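
                  When neither an explicit partition nor key hashing fits, a custom Partitioner can replace DefaultPartitioner. A minimal sketch (the class name and routing rule are hypothetical; the Partitioner API is real) that routes values containing "atguigu" to partition 0 and everything else to partition 1:

                  import java.util.Map;

                  import org.apache.kafka.clients.producer.Partitioner;
                  import org.apache.kafka.common.Cluster;

                  public class MyPartitioner implements Partitioner {
                      @Override
                      public int partition(String topic, Object key, byte[] keyBytes,
                                           Object value, byte[] valueBytes, Cluster cluster) {
                          // Route by message content
                          return value != null && value.toString().contains("atguigu") ? 0 : 1;
                      }

                      @Override
                      public void close() { }

                      @Override
                      public void configure(Map<String, ?> configs) { }
                  }

                  It would be registered with properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName()).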

                  2.2.3 Message Size Validation

                  A detailed look at the size limits.

                  org.apache.kafka.clients.producer.KafkaProducer#ensureValidRecordSize

                  /**
                   * Validate that the record size isn't too large
                   */
                  private void ensureValidRecordSize(int size) {
                      // Maximum size of a single message: maxRequestSize, default 1 MB
                      if (size > maxRequestSize)
                          throw new RecordTooLargeException("The message is " + size +
                                  " bytes when serialized which is larger than " + maxRequestSize + ", which is the value of the " +
                                  ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration.");
                      // totalMemorySize: total buffer size, default 32 MB
                      if (size > totalMemorySize)
                          throw new RecordTooLargeException("The message is " + size +
                                  " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                                  ProducerConfig.BUFFER_MEMORY_CONFIG +
                                  " configuration.");
                  }
                  
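
                  Note that a RecordTooLargeException raised here is an ApiException, so doSend() delivers it through the callback and the returned Future rather than throwing it (see the catch blocks in 2.2.1). A sketch of tripping the first check, assuming the kafkaProducer from 2.1.1 and the default max.request.size of 1 MB:

                  // A 2 MB value exceeds the 1 MB default
                  byte[] big = new byte[2 * 1024 * 1024];
                  kafkaProducer.send(new ProducerRecord<>("first", new String(big)),
                          (metadata, exception) -> {
                              // exception is a RecordTooLargeException, delivered via the callback
                              if (exception != null)
                                  System.out.println(exception.getMessage());
                          });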

                  2.2.4 Memory Pool

                  RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                          serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
                  
                  /**
                   * Add a record to the accumulator, return the append result
                   * <p>
                   * The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
                   * <p>
                   *
                   * @param tp The topic/partition to which this record is being sent
                   * @param timestamp The timestamp of the record
                   * @param key The key for the record
                   * @param value The value for the record
                   * @param headers the Headers for the record
                   * @param callback The user-supplied callback to execute when the request is complete
                   * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
                   * @param abortOnNewBatch A boolean that indicates returning before a new batch is created and
                   *                        running the partitioner's onNewBatch method before trying to append again
                   * @param nowMs The current time, in milliseconds
                   */
                  public RecordAppendResult append(TopicPartition tp,
                                                   long timestamp,
                                                   byte[] key,
                                                   byte[] value,
                                                   Header[] headers,
                                                   Callback callback,
                                                   long maxTimeToBlock,
                                                   boolean abortOnNewBatch,
                                                   long nowMs) throws InterruptedException {
                      // We keep track of the number of appending thread to make sure we do not miss batches in
                      // abortIncompleteBatches().
                      appendsInProgress.incrementAndGet();
                      ByteBuffer buffer = null;
                      if (headers == null) headers = Record.EMPTY_HEADERS;
                      try {
                          // check if we have an in-progress batch
                          // Get or create a deque (one per topic partition)
                          Deque<ProducerBatch> dq = getOrCreateDeque(tp);
                          synchronized (dq) {
                              if (closed)
                                  throw new KafkaException("Producer closed while send in progress");
                              // Try to append to the in-progress batch (the first time through there is no batch yet, so this fails)
                              RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                              if (appendResult != null)
                                  return appendResult;
                          }
                  
                          // we don't have an in-progress record batch try to allocate a new batch
                          if (abortOnNewBatch) {
                              // Return a result that will cause another call to append.
                              return new RecordAppendResult(null, false, false, true);
                          }
                  
                          byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                          // Take the larger of the batch size (default 16 KB) and the message's estimated size (upper bound, default 1 MB). This is designed this way because a single message may be larger than the batch size.
                          int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                          log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
                          // Allocate memory from the memory pool (BufferPool)
                          buffer = free.allocate(size, maxTimeToBlock);
                  
                          // Update the current time in case the buffer allocation blocked above.
                          nowMs = time.milliseconds();
                          synchronized (dq) {
                              // Need to check if producer is closed again after grabbing the dequeue lock.
                              if (closed)
                                  throw new KafkaException("Producer closed while send in progress");
                  
                              RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
                              if (appendResult != null) {
                                  // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                                  return appendResult;
                              }
                  
                              // Wrap the memory buffer in a MemoryRecordsBuilder
                              MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                              // Wrap again into a ProducerBatch (the actual batch)
                              ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
                              FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                                      callback, nowMs));
                  
                              // Add the batch to the end of the deque
                              dq.addLast(batch);
                              incomplete.add(batch);
                  
                              // Don't deallocate this buffer in the finally block as it's being used in the record batch
                              buffer = null;
                              return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
                          }
                      } finally {
                          // Release the memory
                          if (buffer != null)
                              free.deallocate(buffer);
                          appendsInProgress.decrementAndGet();
                      }
                  }
                  
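
                  The shape of append(), try to append under the deque lock, allocate a buffer outside the lock (allocation may block), then re-check under the lock before creating a new batch, is a double-checked pattern that keeps the potentially blocking BufferPool call out of the critical section. A simplified standalone sketch of just that pattern (illustrative, not Kafka code):

                  import java.util.ArrayDeque;
                  import java.util.Deque;

                  public class AppendPattern {
                      private static final int BATCH_SIZE = 16;
                      private final Deque<StringBuilder> dq = new ArrayDeque<>();

                      public void append(String record) {
                          synchronized (dq) {
                              StringBuilder last = dq.peekLast();
                              if (last != null && last.length() + record.length() <= BATCH_SIZE) {
                                  last.append(record); // fast path: the current batch has room
                                  return;
                              }
                          }
                          // Slow path: "allocate" a new batch outside the lock
                          StringBuilder batch = new StringBuilder(BATCH_SIZE);
                          synchronized (dq) {
                              StringBuilder last = dq.peekLast();
                              if (last != null && last.length() + record.length() <= BATCH_SIZE) {
                                  last.append(record); // another thread created a batch meanwhile
                                  return;              // (Kafka would return the allocation to the pool here)
                              }
                              batch.append(record);
                              dq.addLast(batch);
                          }
                      }
                  }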

                  2.3 The sender Thread Sends Data


                  A detailed look at the send thread.

                  if (result.batchIsFull || result.newBatchCreated) {
                      log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                      this.sender.wakeup();
                  }
                  

                  Enter the run() method of the sender thread.

                  // main loop, runs until close is called
                  while (running) {
                      try {
                          runOnce();
                      } catch (Exception e) {
                          log.error("Uncaught error in kafka producer I/O thread: ", e);
                      }
                  }
                  
                  /**
                   * Run a single iteration of sending
                   *
                   */
                  void runOnce() {
                      if (transactionManager != null) {
                          try {
                              // Transaction-related handling
                              transactionManager.maybeResolveSequences();
                  
                              // do not continue sending if the transaction manager is in a failed state
                              if (transactionManager.hasFatalError()) {
                                  RuntimeException lastError = transactionManager.lastError();
                                  if (lastError != null)
                                      maybeAbortBatches(lastError);
                                  client.poll(retryBackoffMs, time.milliseconds());
                                  return;
                              }
                  
                              // Check whether we need a new producerId. If so, we will enqueue an InitProducerId
                              // request which will be sent below
                              transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
                  
                              if (maybeSendAndPollTransactionalRequest()) {
                                  return;
                              }
                          } catch (AuthenticationException e) {
                              // This is already logged as error, but propagated here to perform any clean ups.
                              log.trace("Authentication exception while processing transactional request", e);
                              transactionManager.authenticationFailed(e);
                          }
                      }
                  
                      long currentTimeMs = time.milliseconds();
                      // Send the data
                      long pollTimeout = sendProducerData(currentTimeMs);
                      // Poll for the send results
                      client.poll(pollTimeout, currentTimeMs);
                  }
                  

                  org.apache.kafka.clients.producer.internals.Sender#sendProducerData

                  private long sendProducerData(long now) {
                      // Fetch the metadata
                      Cluster cluster = metadata.fetch();
                      // Check which partitions in the buffer (32 MB by default) have data ready to send
                      // get the list of partitions with data ready to send
                      RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
                  
                      // if there are any partitions whose leaders are not known yet, force metadata update
                      if (!result.unknownLeaderTopics.isEmpty()) {
                          // The set of topics with unknown leader contains topics with leader election pending as well as
                          // topics which may have expired. Add the topic again to metadata to ensure it is included
                          // and request metadata update, since there are messages to send to the topic.
                          for (String topic : result.unknownLeaderTopics)
                              this.metadata.add(topic, now);
                  
                          log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                              result.unknownLeaderTopics);
                          this.metadata.requestUpdate();
                      }
                  
                      // Remove any nodes we are not yet ready to send to
                      // remove any nodes we aren't ready to send to
                      Iterator<Node> iter = result.readyNodes.iterator();
                      long notReadyTimeout = Long.MAX_VALUE;
                      while (iter.hasNext()) {
                          Node node = iter.next();
                          if (!this.client.ready(node, now)) {
                              iter.remove();
                              notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
                          }
                      }
                  
                      // Package the data to be sent, grouped per node
                      // create produce requests
                      Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
                      addToInflightBatches(batches);
                      if (guaranteeMessageOrder) {
                          // Mute all the partitions drained
                          for (List<ProducerBatch> batchList : batches.values()) {
                              for (ProducerBatch batch : batchList)
                                  this.accumulator.mutePartition(batch.topicPartition);
                          }
                      }
                  
                      accumulator.resetNextBatchExpiryTime();
                      List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
                      List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
                      expiredBatches.addAll(expiredInflightBatches);
                  
                      // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
                      // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
                      // we need to reset the producer id here.
                      if (!expiredBatches.isEmpty())
                          log.trace("Expired {} batches in accumulator", expiredBatches.size());
                      for (ProducerBatch expiredBatch : expiredBatches) {
                          String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                              + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
                          failBatch(expiredBatch, new TimeoutException(errorMessage), false);
                          if (transactionManager != null && expiredBatch.inRetry()) {
                              // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                              transactionManager.markSequenceUnresolved(expiredBatch);
                          }
                      }
                      sensors.updateProduceRequestMetrics(batches);
                  
                      // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
                      // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
                      // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
                      // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
                      // that aren't ready to send since they would cause busy looping.
                      long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
                      pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
                      pollTimeout = Math.max(pollTimeout, 0);
                      if (!result.readyNodes.isEmpty()) {
                          log.trace("Nodes with data ready to send: {}", result.readyNodes);
                          // if some partitions are already ready to be sent, the select time would be 0;
                          // otherwise if some partition already has some data accumulated but not ready yet,
                          // the select time will be the time difference between now and its linger expiry time;
                          // otherwise the select time will be the time difference between now and the metadata expiry time;
                          pollTimeout = 0;
                      }
                      // Send the requests
                      sendProduceRequests(batches, now);
                      return pollTimeout;
                  }
                  

                  org.apache.kafka.clients.producer.internals.RecordAccumulator#ready

                  /**
                   * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
                   * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
                   * partition batches.
                   * <p>
                   * A destination node is ready to send data if:
                   * <ol>
                   * <li>There is at least one partition that is not backing off its send
                   * <li><b>and</b> those partitions are not muted (to prevent reordering if
                   *   {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
                   *   is set to one)</li>
                   * <li><b>and <i>any</i></b> of the following are true</li>
                   * <ul>
                   *     <li>The record set is full</li>
                   *     <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
                   *     <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
                   *     are immediately considered ready).</li>
                   *     <li>The accumulator has been closed</li>
                   * </ul>
                   * </ol>
                   */
                  public ReadyCheckResult ready(Cluster cluster, long nowMs) {
                      Set<Node> readyNodes = new HashSet<>();
                      long nextReadyCheckDelayMs = Long.MAX_VALUE;
                      Set<String> unknownLeaderTopics = new HashSet<>();
                  
                      boolean exhausted = this.free.queued() > 0;
                      for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
                          Deque<ProducerBatch> deque = entry.getValue();
                          synchronized (deque) {
                              // When producing to a large number of partitions, this path is hot and deques are often empty.
                              // We check whether a batch exists first to avoid the more expensive checks whenever possible.
                              ProducerBatch batch = deque.peekFirst();
                              if (batch != null) {
                                  TopicPartition part = entry.getKey();
                                  Node leader = cluster.leaderFor(part);
                                  if (leader == null) {
                                      // This is a partition for which leader is not known, but messages are available to send.
                                      // Note that entries are currently not removed from batches when deque is empty.
                                      unknownLeaderTopics.add(part.topic());
                                  } else if (!readyNodes.contains(leader) && !isMuted(part)) {
                                      long waitedTimeMs = batch.waitedTimeMs(nowMs);
                                      // If this batch has been tried before and its wait time is still within the retry backoff, backingOff = true
                                      boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                                      // If backingOff = true, wait out the retry backoff; otherwise wait for lingerMs
                                      long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                                      // The batch is full enough to meet the send condition
                                      boolean full = deque.size() > 1 || batch.isFull();
                                      // If it has waited long enough (timed out), it must also be sent
                                      boolean expired = waitedTimeMs >= timeToWaitMs;
                                      boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
                                      boolean sendable = full
                                          || expired
                                          || exhausted
                                          || closed
                                          || flushInProgress()
                                          || transactionCompleting;
                                      if (sendable && !backingOff) {
                                          readyNodes.add(leader);
                                      } else {
                                          long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                                          // Note that this results in a conservative estimate since an un-sendable partition may have
                                          // a leader that will later be found to have sendable data. However, this is good enough
                                          // since we'll just wake up and then sleep again for the remaining time.
                                          nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                                      }
                                  }
                              }
                          }
                      }
                      return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
                  }
                  
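
                  Note the flushInProgress() clause in the sendable condition: it is what lets an explicit KafkaProducer.flush() make every non-empty batch sendable immediately, regardless of linger.ms or batch size. A usage sketch, assuming the kafkaProducer from 2.1.1:

                  kafkaProducer.send(new ProducerRecord<>("first", "atguigu"));
                  // Wakes the sender and blocks until all buffered records have completed
                  kafkaProducer.flush();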

                  org.apache.kafka.clients.producer.internals.RecordAccumulator#drain

                  /**
                   * Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified
                   * size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.
                   *
                   * @param cluster The current cluster metadata
                   * @param nodes The list of node to drain
                   * @param maxSize The maximum number of bytes to drain
                   * @param now The current unix time in milliseconds
                   * @return A list of {@link ProducerBatch} for each node specified with total size less than the requested maxSize.
                   */
                  public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
                      if (nodes.isEmpty())
                          return Collections.emptyMap();
                  
                      Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
                      // Data destined for the same broker node is packed into one request batch.
                      for (Node node : nodes) {
                          List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
                          batches.put(node.id(), ready);
                      }
                      return batches;
                  }
                  

                  org.apache.kafka.clients.producer.internals.Sender#sendProduceRequests

                  /**
                   * Transfer the record batches into a list of produce requests on a per-node basis
                   */
                  private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
                      for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
                          sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
                  }
                  

                  org.apache.kafka.clients.producer.internals.Sender#sendProduceRequest

                  /**
                   * Create a produce request from the given record batches
                   */
                  private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
                      if (batches.isEmpty())
                          return;
                  
                      final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
                  
                      // find the minimum magic version used when creating the record sets
                      byte minUsedMagic = apiVersions.maxUsableProduceMagic();
                      for (ProducerBatch batch : batches) {
                          if (batch.magic() < minUsedMagic)
                              minUsedMagic = batch.magic();
                      }
                      ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
                      for (ProducerBatch batch : batches) {
                          TopicPartition tp = batch.topicPartition;
                          MemoryRecords records = batch.records();
                  
                          // down convert if necessary to the minimum magic used. In general, there can be a delay between the time
                          // that the producer starts building the batch and the time that we send the request, and we may have
                          // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
                          // the new message format, but found that the broker didn't support it, so we need to down-convert on the
                          // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
                          // not all support the same message format version. For example, if a partition migrates from a broker
                          // which is supporting the new magic version to one which doesn't, then we will need to convert.
                          if (!records.hasMatchingMagic(minUsedMagic))
                              records = batch.records().downConvert(minUsedMagic, 0, time).records();
                          ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
                          if (tpData == null) {
                              tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
                              tpd.add(tpData);
                          }
                          tpData.partitionData().add(new ProduceRequestData.PartitionProduceData()
                                  .setIndex(tp.partition())
                                  .setRecords(records));
                          recordsByPartition.put(tp, batch);
                      }
                  
                      String transactionalId = null;
                      if (transactionManager != null && transactionManager.isTransactional()) {
                          transactionalId = transactionManager.transactionalId();
                      }
                  
                      ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,
                              new ProduceRequestData()
                                      .setAcks(acks)
                                      .setTimeoutMs(timeout)
                                      .setTransactionalId(transactionalId)
                                      .setTopicData(tpd));
                      RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());
                  
                      String nodeId = Integer.toString(destination);
    // Build the client request for this broker node
                      ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
                              requestTimeoutMs, callback);
    // Queue the request for sending on the network client
                      client.send(clientRequest, now);
                      log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
                  }
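
Note the `acks != 0` argument to newClientRequest: with acks=0 the client does not expect any broker response, which is why a fire-and-forget producer never sees per-record broker errors. The setting comes straight from producer configuration (the value below is illustrative):

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class AcksExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // "0"   -> no response expected (expectResponse == false above)
        // "1"   -> acknowledged by the partition leader only
        // "all" -> acknowledged by the full in-sync replica set
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        System.out.println(props);
    }
}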
                  

                  org.apache.kafka.clients.NetworkClient#doSend(org.apache.kafka.clients.ClientRequest, boolean, long)

                  private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
                      ensureActive();
                      String nodeId = clientRequest.destination();
                      if (!isInternalRequest) {
                          // If this request came from outside the NetworkClient, validate
                          // that we can send data.  If the request is internal, we trust
                          // that internal code has done this validation.  Validation
                          // will be slightly different for some internal requests (for
                          // example, ApiVersionsRequests can be sent prior to being in
                          // READY state.)
                          if (!canSendRequest(nodeId, now))
                              throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
                      }
                      AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
                      try {
                          NodeApiVersions versionInfo = apiVersions.get(nodeId);
                          short version;
                          // Note: if versionInfo is null, we have no server version information. This would be
                          // the case when sending the initial ApiVersionRequest which fetches the version
                          // information itself.  It is also the case when discoverBrokerVersions is set to false.
                          if (versionInfo == null) {
                              version = builder.latestAllowedVersion();
                              if (discoverBrokerVersions && log.isTraceEnabled())
                                  log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                                          "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
                          } else {
                              version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
                                      builder.latestAllowedVersion());
                          }
                          // The call to build may also throw UnsupportedVersionException, if there are essential
                          // fields that cannot be represented in the chosen version.
            // Send the request using the negotiated API version
                          doSend(clientRequest, isInternalRequest, now, builder.build(version));
                      } catch (UnsupportedVersionException unsupportedVersionException) {
                          // If the version is not supported, skip sending the request over the wire.
                          // Instead, simply add it to the local queue of aborted requests.
                          log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder,
                                  clientRequest.correlationId(), clientRequest.destination(), unsupportedVersionException);
                          ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
                                  clientRequest.callback(), clientRequest.destination(), now, now,
                                  false, unsupportedVersionException, null, null);
                  
                          if (!isInternalRequest)
                              abortedSends.add(clientResponse);
                          else if (clientRequest.apiKey() == ApiKeys.METADATA)
                              metadataUpdater.handleFailedRequest(now, Optional.of(unsupportedVersionException));
                      }
                  }
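
The version negotiation above boils down to intersecting the client's supported version range for the API with the range the broker advertised via ApiVersions, then taking the newest version in the overlap. A standalone sketch of that idea (hypothetical names; the real logic lives in NodeApiVersions#latestUsableVersion):

// Hypothetical illustration only; not the actual NodeApiVersions code.
public class VersionNegotiation {
    static short latestUsableVersion(short clientMin, short clientMax,
                                     short brokerMin, short brokerMax) {
        short min = (short) Math.max(clientMin, brokerMin);
        short max = (short) Math.min(clientMax, brokerMax);
        if (max < min)
            throw new IllegalStateException("No overlapping version range");
        // Prefer the newest version both sides understand.
        return max;
    }

    public static void main(String[] args) {
        // Client supports v3..v9, broker advertises v0..v8 -> negotiate v8.
        System.out.println(latestUsableVersion((short) 3, (short) 9, (short) 0, (short) 8));
    }
}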
                  

                  org.apache.kafka.clients.NetworkClient#doSend(org.apache.kafka.clients.ClientRequest, boolean, long, org.apache.kafka.common.requests.AbstractRequest)

                  private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
                      String destination = clientRequest.destination();
                      RequestHeader header = clientRequest.makeHeader(request.version());
                      if (log.isDebugEnabled()) {
                          log.debug("Sending {} request with header {} and timeout {} to node {}: {}",
                              clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request);
                      }
                      Send send = request.toSend(header);
                      InFlightRequest inFlightRequest = new InFlightRequest(
                              clientRequest,
                              header,
                              isInternalRequest,
                              request,
                              send,
                              now);
        // Track the request in inFlightRequests until its response (or a timeout) arrives
                      this.inFlightRequests.add(inFlightRequest);
        // Hand the serialized request to the Selector for network I/O
                      selector.send(new NetworkSend(clientRequest.destination(), send));
                  }
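
inFlightRequests holds requests that have been written to the socket but not yet answered. Its per-connection depth is capped by max.in.flight.requests.per.connection (default 5); setting it to 1 disables pipelining, which is the classic way to keep batches in order under retries when idempotence is not enabled. Illustrative configuration:

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class InFlightExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // At most one unacknowledged request per connection (default is 5),
        // so retries cannot reorder batches.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        System.out.println(props);
    }
}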
                  

Receiving the server's response. Back in the Sender's run loop, NetworkClient#poll performs the actual reads and writes on the sockets:

                  client.poll(pollTimeout, currentTimeMs);
                  
                  /**
                   * Do actual reads and writes to sockets.
                   *
                   * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately,
                   *                must be non-negative. The actual timeout will be the minimum of timeout, request timeout and
                   *                metadata timeout
                   * @param now The current time in milliseconds
                   * @return The list of responses received
                   */
                  @Override
                  public List<ClientResponse> poll(long timeout, long now) {
                      ensureActive();
                  
                      if (!abortedSends.isEmpty()) {
                          // If there are aborted sends because of unsupported version exceptions or disconnects,
                          // handle them immediately without waiting for Selector#poll.
                          List<ClientResponse> responses = new ArrayList<>();
                          handleAbortedSends(responses);
                          completeResponses(responses);
                          return responses;
                      }
                  
                      long metadataTimeout = metadataUpdater.maybeUpdate(now);
                      try {
                          this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
                      } catch (IOException e) {
                          log.error("Unexpected error during I/O", e);
                      }
                  
        // Collect the responses for completed sends and receives
                      // process completed actions
                      long updatedNow = this.time.milliseconds();
                      List<ClientResponse> responses = new ArrayList<>();
                      handleCompletedSends(responses, updatedNow);
                      handleCompletedReceives(responses, updatedNow);
                      handleDisconnections(responses, updatedNow);
                      handleConnections();
                      handleInitiateApiVersionRequests(updatedNow);
                      handleTimedOutConnections(responses, updatedNow);
                      handleTimedOutRequests(responses, updatedNow);
                      completeResponses(responses);
                  
                      return responses;
                  }
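
completeResponses is what eventually fires the RequestCompletionHandler registered in sendProduceRequest (handleProduceResponse), which in turn completes each batch and invokes any user Callback passed to send. From the application's point of view, the whole chain above surfaces like this (reusing the kafkaProducer from the example at the start of the article):

// The callback runs once the broker's response (or an error) has come back
// through NetworkClient#poll and completeResponses.
kafkaProducer.send(new ProducerRecord<>("first", "atguigu"),
        (metadata, exception) -> {
            if (exception != null)
                exception.printStackTrace();
            else
                System.out.println("partition=" + metadata.partition()
                        + ", offset=" + metadata.offset());
        });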
                  