package org.apache.kafka.clients.producer;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.clients.producer.internals.KafkaProducerMetrics;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
import org.apache.kafka.clients.producer.internals.ProducerMetrics;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.Sender;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.springframework.web.servlet.tags.BindErrorsTag;

/* loaded from: input_file:WEB-INF/lib/kafka-clients-3.1.2.jar:org/apache/kafka/clients/producer/KafkaProducer.class */
public class KafkaProducer<K, V> implements Producer<K, V> {
    /** Logger obtained from the {@link LogContext} built in the constructor. */
    private final Logger log;
    /** Prefix passed to the metrics context and to the app-info registration in the main constructor. */
    private static final String JMX_PREFIX = "kafka.producer";
    public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";
    public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics";
    /** Value of the {@code client.id} configuration. */
    private final String clientId;
    /** Metrics registry; exposed read-only through {@link #metrics()}. */
    final Metrics metrics;
    /** Records the latency of flush and transactional operations. */
    private final KafkaProducerMetrics producerMetrics;
    /** Chooses the partition for a record and is notified on new-batch creation in {@code doSend}. */
    private final Partitioner partitioner;
    /** Upper bound on a single serialized record, checked in {@code ensureValidRecordSize}. */
    private final int maxRequestSize;
    /** Total buffer memory; a record larger than this is rejected in {@code ensureValidRecordSize}. */
    private final long totalMemorySize;
    /** Topic metadata source used by {@code waitOnMetadata}. */
    private final ProducerMetadata metadata;
    /** Buffer that records are appended to in {@code doSend} and drained on {@code flush}. */
    private final RecordAccumulator accumulator;
    /** Runnable executed by {@link #ioThread}; woken up whenever there is new work for it. */
    private final Sender sender;
    private final Thread ioThread;
    private final CompressionType compressionType;
    /** Sensor recorded whenever a send fails. */
    private final Sensor errors;
    private final Time time;
    private final Serializer<K> keySerializer;
    private final Serializer<V> valueSerializer;
    private final ProducerConfig producerConfig;
    /** Maximum time, in milliseconds, that blocking operations wait (metadata, transactional requests). */
    private final long maxBlockTimeMs;
    private final ProducerInterceptors<K, V> interceptors;
    private final ApiVersions apiVersions;
    /** {@code null} unless idempotence is enabled (see {@code configureTransactionState}). */
    private final TransactionManager transactionManager;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/kafka-clients-3.1.2.jar:org/apache/kafka/clients/producer/KafkaProducer$ClusterAndWaitTime.class */
    /**
     * Immutable pair returned by {@code waitOnMetadata}: the cluster snapshot that was
     * fetched and the number of milliseconds spent waiting for it.
     */
    public static class ClusterAndWaitTime {
        /** Cluster snapshot obtained from the metadata. */
        final Cluster cluster;
        /** Milliseconds the caller waited before the snapshot became usable. */
        final long waitedOnMetadataMs;

        /**
         * @param cluster the cluster snapshot
         * @param j       time waited on metadata, in milliseconds
         */
        ClusterAndWaitTime(Cluster cluster, long j) {
            this.waitedOnMetadataMs = j;
            this.cluster = cluster;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/kafka-clients-3.1.2.jar:org/apache/kafka/clients/producer/KafkaProducer$FutureFailure.class */
    /**
     * A {@link Future} that is already complete with a failure. Both {@code get} variants
     * throw an {@link ExecutionException} wrapping the exception given at construction.
     */
    public static class FutureFailure implements Future<RecordMetadata> {
        /** Pre-built exception thrown by every {@code get} call. */
        private final ExecutionException exception;

        /**
         * @param exc the cause to wrap in an {@link ExecutionException}
         */
        public FutureFailure(Exception exc) {
            this.exception = new ExecutionException(exc);
        }

        /** Always completed. */
        @Override // java.util.concurrent.Future
        public boolean isDone() {
            return true;
        }

        /** Never cancelled. */
        @Override // java.util.concurrent.Future
        public boolean isCancelled() {
            return false;
        }

        /** Cancellation is not possible; always returns {@code false}. */
        @Override // java.util.concurrent.Future
        public boolean cancel(boolean z) {
            return false;
        }

        /**
         * @throws ExecutionException always, wrapping the original failure
         */
        @Override // java.util.concurrent.Future
        public RecordMetadata get() throws ExecutionException {
            throw this.exception;
        }

        /**
         * The timeout is ignored because the outcome is already known.
         *
         * @throws ExecutionException always, wrapping the original failure
         */
        @Override // java.util.concurrent.Future
        public RecordMetadata get(long j, TimeUnit timeUnit) throws ExecutionException {
            throw this.exception;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/kafka-clients-3.1.2.jar:org/apache/kafka/clients/producer/KafkaProducer$InterceptorCallback.class */
    /**
     * Callback that first notifies the producer interceptors and then, if present,
     * the user-supplied callback.
     */
    public static class InterceptorCallback<K, V> implements Callback {
        private final Callback userCallback;
        private final ProducerInterceptors<K, V> interceptors;
        private final TopicPartition tp;

        private InterceptorCallback(Callback callback, ProducerInterceptors<K, V> producerInterceptors, TopicPartition topicPartition) {
            this.tp = topicPartition;
            this.interceptors = producerInterceptors;
            this.userCallback = callback;
        }

        /**
         * Forwards the completion to the interceptors and the user callback. When
         * {@code recordMetadata} is {@code null}, a placeholder carrying only the
         * topic-partition (every other constructor argument is -1) is passed instead.
         *
         * @param recordMetadata metadata of the acknowledged record, or {@code null}
         * @param exc            the failure, or {@code null}
         */
        @Override // org.apache.kafka.clients.producer.Callback
        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            RecordMetadata effectiveMetadata = recordMetadata;
            if (effectiveMetadata == null) {
                effectiveMetadata = new RecordMetadata(this.tp, -1L, -1, -1L, -1, -1);
            }
            this.interceptors.onAcknowledgement(effectiveMetadata, exc);
            if (this.userCallback == null) {
                return;
            }
            this.userCallback.onCompletion(effectiveMetadata, exc);
        }
    }

    /**
     * Creates a producer from a configuration map, with no explicit serializers
     * passed (both serializer arguments are {@code null}).
     *
     * @param map the producer configuration
     */
    public KafkaProducer(Map<String, Object> map) {
        this(map, null, null);
    }

    /**
     * Creates a producer from a configuration map and optional serializer instances.
     * The serializers are appended to the configuration before it is parsed.
     *
     * @param map         the producer configuration
     * @param serializer  key serializer, or {@code null}
     * @param serializer2 value serializer, or {@code null}
     */
    public KafkaProducer(Map<String, Object> map, Serializer<K> serializer, Serializer<V> serializer2) {
        this(
                new ProducerConfig(ProducerConfig.appendSerializerToConfig(map, serializer, serializer2)),
                serializer,
                serializer2,
                null,
                null,
                null,
                Time.SYSTEM);
    }

    /**
     * Creates a producer from {@link Properties}, with no explicit serializers
     * passed (both serializer arguments are {@code null}).
     *
     * @param properties the producer configuration
     */
    public KafkaProducer(Properties properties) {
        this(properties, null, null);
    }

    /**
     * Creates a producer from {@link Properties} and optional serializer instances.
     * The properties are converted to a map and handed to the map-based constructor.
     *
     * @param properties  the producer configuration
     * @param serializer  key serializer, or {@code null}
     * @param serializer2 value serializer, or {@code null}
     */
    public KafkaProducer(Properties properties, Serializer<K> serializer, Serializer<V> serializer2) {
        this(
                Utils.propsToMap(properties),
                serializer,
                serializer2);
    }

    KafkaProducer(ProducerConfig producerConfig, Serializer<K> serializer, Serializer<V> serializer2, ProducerMetadata producerMetadata, KafkaClient kafkaClient, ProducerInterceptors<K, V> producerInterceptors, Time time) {
        try {
            this.producerConfig = producerConfig;
            this.time = time;
            String string = producerConfig.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
            this.clientId = producerConfig.getString("client.id");
            LogContext logContext = string == null ? new LogContext(String.format("[Producer clientId=%s] ", this.clientId)) : new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", this.clientId, string));
            this.log = logContext.logger(KafkaProducer.class);
            this.log.trace("Starting the Kafka producer");
            MetricConfig tags = new MetricConfig().samples(producerConfig.getInt("metrics.num.samples").intValue()).timeWindow(producerConfig.getLong("metrics.sample.window.ms").longValue(), TimeUnit.MILLISECONDS).recordLevel(Sensor.RecordingLevel.forName(producerConfig.getString("metrics.recording.level"))).tags(Collections.singletonMap(ClientQuotaEntity.CLIENT_ID, this.clientId));
            List<?> configuredInstances = producerConfig.getConfiguredInstances("metric.reporters", MetricsReporter.class, Collections.singletonMap("client.id", this.clientId));
            JmxReporter jmxReporter = new JmxReporter();
            jmxReporter.configure(producerConfig.originals(Collections.singletonMap("client.id", this.clientId)));
            configuredInstances.add(jmxReporter);
            this.metrics = new Metrics(tags, (List<MetricsReporter>) configuredInstances, time, (MetricsContext) new KafkaMetricsContext(JMX_PREFIX, producerConfig.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)));
            this.producerMetrics = new KafkaProducerMetrics(this.metrics);
            this.partitioner = (Partitioner) producerConfig.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class, Collections.singletonMap("client.id", this.clientId));
            long longValue = producerConfig.getLong("retry.backoff.ms").longValue();
            if (serializer == null) {
                this.keySerializer = (Serializer) producerConfig.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
                this.keySerializer.configure(producerConfig.originals(Collections.singletonMap("client.id", this.clientId)), true);
            } else {
                producerConfig.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                this.keySerializer = serializer;
            }
            if (serializer2 == null) {
                this.valueSerializer = (Serializer) producerConfig.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
                this.valueSerializer.configure(producerConfig.originals(Collections.singletonMap("client.id", this.clientId)), false);
            } else {
                producerConfig.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                this.valueSerializer = serializer2;
            }
            List<?> configuredInstances2 = producerConfig.getConfiguredInstances("interceptor.classes", ProducerInterceptor.class, Collections.singletonMap("client.id", this.clientId));
            if (producerInterceptors != null) {
                this.interceptors = producerInterceptors;
            } else {
                this.interceptors = new ProducerInterceptors<>(configuredInstances2);
            }
            ClusterResourceListeners configureClusterResourceListeners = configureClusterResourceListeners(serializer, serializer2, configuredInstances2, configuredInstances);
            this.maxRequestSize = producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG).intValue();
            this.totalMemorySize = producerConfig.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG).longValue();
            this.compressionType = CompressionType.forName(producerConfig.getString("compression.type"));
            this.maxBlockTimeMs = producerConfig.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG).longValue();
            int configureDeliveryTimeout = configureDeliveryTimeout(producerConfig, this.log);
            this.apiVersions = new ApiVersions();
            this.transactionManager = configureTransactionState(producerConfig, logContext);
            this.accumulator = new RecordAccumulator(logContext, producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG).intValue(), this.compressionType, lingerMs(producerConfig), longValue, configureDeliveryTimeout, this.metrics, "producer-metrics", time, this.apiVersions, this.transactionManager, new BufferPool(this.totalMemorySize, producerConfig.getInt(ProducerConfig.BATCH_SIZE_CONFIG).intValue(), this.metrics, time, "producer-metrics"));
            List<InetSocketAddress> parseAndValidateAddresses = ClientUtils.parseAndValidateAddresses(producerConfig.getList("bootstrap.servers"), producerConfig.getString("client.dns.lookup"));
            if (producerMetadata != null) {
                this.metadata = producerMetadata;
            } else {
                this.metadata = new ProducerMetadata(longValue, producerConfig.getLong("metadata.max.age.ms").longValue(), producerConfig.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG).longValue(), logContext, configureClusterResourceListeners, Time.SYSTEM);
                this.metadata.bootstrap(parseAndValidateAddresses);
            }
            this.errors = this.metrics.sensor(BindErrorsTag.ERRORS_VARIABLE_NAME);
            this.sender = newSender(logContext, kafkaClient, this.metadata);
            this.ioThread = new KafkaThread("kafka-producer-network-thread | " + this.clientId, this.sender, true);
            this.ioThread.start();
            producerConfig.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, this.clientId, this.metrics, time.milliseconds());
            this.log.debug("Kafka producer started");
        } catch (Throwable th) {
            close(Duration.ofMillis(0L), true);
            throw new KafkaException("Failed to construct kafka producer", th);
        }
    }

    /**
     * Package-private constructor that wires in already-constructed collaborators
     * instead of building them from the configuration. Only scalar settings
     * (sizes, compression, max block time) are read from {@code producerConfig}.
     * Unlike the main constructor it does not start the supplied thread.
     */
    KafkaProducer(ProducerConfig producerConfig, LogContext logContext, Metrics metrics, Serializer<K> serializer, Serializer<V> serializer2, ProducerMetadata producerMetadata, RecordAccumulator recordAccumulator, TransactionManager transactionManager, Sender sender, ProducerInterceptors<K, V> producerInterceptors, Partitioner partitioner, Time time, KafkaThread kafkaThread) {
        this.producerConfig = producerConfig;
        this.time = time;
        this.clientId = producerConfig.getString("client.id");
        this.log = logContext.logger(KafkaProducer.class);
        this.metrics = metrics;
        this.producerMetrics = new KafkaProducerMetrics(metrics);
        this.partitioner = partitioner;
        this.keySerializer = serializer;
        this.valueSerializer = serializer2;
        this.interceptors = producerInterceptors;
        this.maxRequestSize = producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG).intValue();
        this.totalMemorySize = producerConfig.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG).longValue();
        this.compressionType = CompressionType.forName(producerConfig.getString("compression.type"));
        this.maxBlockTimeMs = producerConfig.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG).longValue();
        this.apiVersions = new ApiVersions();
        this.transactionManager = transactionManager;
        this.accumulator = recordAccumulator;
        // Plain literal: the sensor name must not depend on an unrelated web-framework constant.
        this.errors = this.metrics.sensor("errors");
        this.metadata = producerMetadata;
        this.sender = sender;
        this.ioThread = kafkaThread;
    }

    /**
     * Builds the {@link Sender} that the I/O thread runs.
     *
     * @param logContext       log context shared with the created components
     * @param kafkaClient      client to use; when {@code null} a {@link NetworkClient} is
     *                         created from the producer configuration
     * @param producerMetadata metadata handed to the client and the sender
     * @return the configured sender
     */
    Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata producerMetadata) {
        final int maxInflightRequests = this.producerConfig.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION).intValue();
        final int requestTimeoutMs = this.producerConfig.getInt("request.timeout.ms").intValue();
        final ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(this.producerConfig, this.time, logContext);
        final ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);

        final KafkaClient client;
        if (kafkaClient != null) {
            client = kafkaClient;
        } else {
            // The selector and the throttle-time sensor are only created when no client was supplied.
            Selector selector = new Selector(
                    this.producerConfig.getLong("connections.max.idle.ms").longValue(),
                    this.metrics,
                    this.time,
                    "producer",
                    channelBuilder,
                    logContext);
            client = new NetworkClient(
                    selector,
                    producerMetadata,
                    this.clientId,
                    maxInflightRequests,
                    this.producerConfig.getLong("reconnect.backoff.ms").longValue(),
                    this.producerConfig.getLong("reconnect.backoff.max.ms").longValue(),
                    this.producerConfig.getInt("send.buffer.bytes").intValue(),
                    this.producerConfig.getInt("receive.buffer.bytes").intValue(),
                    requestTimeoutMs,
                    this.producerConfig.getLong("socket.connection.setup.timeout.ms").longValue(),
                    this.producerConfig.getLong("socket.connection.setup.timeout.max.ms").longValue(),
                    this.time,
                    true,
                    this.apiVersions,
                    Sender.throttleTimeSensor(metricsRegistry.senderMetrics),
                    logContext);
        }

        // A single in-flight request is passed to the sender as a boolean flag.
        final boolean singleInflight = maxInflightRequests == 1;
        final int maxRequestBytes = this.producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG).intValue();
        final short acks = Short.parseShort(this.producerConfig.getString(ProducerConfig.ACKS_CONFIG));
        final int retries = this.producerConfig.getInt("retries").intValue();
        final long retryBackoffMs = this.producerConfig.getLong("retry.backoff.ms").longValue();

        return new Sender(
                logContext,
                client,
                producerMetadata,
                this.accumulator,
                singleInflight,
                maxRequestBytes,
                acks,
                retries,
                metricsRegistry.senderMetrics,
                this.time,
                requestTimeoutMs,
                retryBackoffMs,
                this.transactionManager,
                this.apiVersions);
    }

    /**
     * Reads {@code linger.ms} (a long) and clamps it to {@link Integer#MAX_VALUE}.
     *
     * @param producerConfig the producer configuration
     * @return the linger time in milliseconds as an int
     */
    private static int lingerMs(ProducerConfig producerConfig) {
        long configuredLingerMs = producerConfig.getLong(ProducerConfig.LINGER_MS_CONFIG).longValue();
        long clamped = Math.min(configuredLingerMs, (long) Integer.MAX_VALUE);
        return (int) clamped;
    }

    /**
     * Resolves the effective delivery timeout.
     *
     * <p>The timeout must be at least {@code linger.ms + request.timeout.ms} (clamped to
     * {@link Integer#MAX_VALUE}). If it is smaller and was set explicitly by the user, a
     * {@link ConfigException} is thrown; if it is smaller only because of the default, it is
     * raised to the minimum and a warning is logged.
     *
     * @param producerConfig the producer configuration
     * @param logger         logger used for the adjustment warning
     * @return the delivery timeout in milliseconds
     * @throws ConfigException if an explicitly configured value is too small
     */
    private static int configureDeliveryTimeout(ProducerConfig producerConfig, Logger logger) {
        int deliveryTimeoutMs = producerConfig.getInt(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG).intValue();
        int lingerMs = lingerMs(producerConfig);
        int requestTimeoutMs = producerConfig.getInt("request.timeout.ms").intValue();
        // Widen to long BEFORE adding: both operands may be close to Integer.MAX_VALUE and an
        // int addition would wrap to a negative value, silently disabling the check below.
        int lingerAndRequestTimeoutMs = (int) Math.min((long) lingerMs + requestTimeoutMs, (long) Integer.MAX_VALUE);
        if (deliveryTimeoutMs < lingerAndRequestTimeoutMs) {
            if (producerConfig.originals().containsKey(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)) {
                throw new ConfigException("delivery.timeout.ms should be equal to or larger than linger.ms + request.timeout.ms");
            }
            // Default value is too small for the configured linger/request timeouts: raise it.
            deliveryTimeoutMs = lingerAndRequestTimeoutMs;
            logger.warn("{} should be equal to or larger than {} + {}. Setting it to {}.", ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, ProducerConfig.LINGER_MS_CONFIG, "request.timeout.ms", Integer.valueOf(deliveryTimeoutMs));
        }
        return deliveryTimeoutMs;
    }

    /**
     * Creates the {@link TransactionManager} when idempotence is enabled.
     *
     * @param producerConfig the producer configuration
     * @param logContext     log context for the manager
     * @return the transaction manager, or {@code null} when idempotence is disabled
     */
    private TransactionManager configureTransactionState(ProducerConfig producerConfig, LogContext logContext) {
        boolean idempotenceEnabled = producerConfig.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG).booleanValue();
        if (!idempotenceEnabled) {
            return null;
        }
        TransactionManager manager = new TransactionManager(
                logContext,
                producerConfig.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG),
                producerConfig.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG).intValue(),
                producerConfig.getLong("retry.backoff.ms").longValue(),
                this.apiVersions);
        String announcement = manager.isTransactional()
                ? "Instantiated a transactional producer."
                : "Instantiated an idempotent producer.";
        this.log.info(announcement);
        return manager;
    }

    /**
     * Initializes transactions: enqueues the request on the transaction manager, wakes
     * the sender and waits up to {@code maxBlockTimeMs} milliseconds for the result.
     * The elapsed time is recorded in the producer metrics.
     */
    @Override // org.apache.kafka.clients.producer.Producer
    public void initTransactions() {
        throwIfNoTransactionManager();
        throwIfProducerClosed();
        final long startNs = this.time.nanoseconds();
        final TransactionalRequestResult result = this.transactionManager.initializeTransactions();
        this.sender.wakeup();
        result.await(this.maxBlockTimeMs, TimeUnit.MILLISECONDS);
        final long elapsedNs = this.time.nanoseconds() - startNs;
        this.producerMetrics.recordInit(elapsedNs);
    }

    /**
     * Marks the beginning of a transaction on the transaction manager and records
     * how long that took.
     *
     * @throws ProducerFencedException declared by the {@code Producer} contract
     */
    @Override // org.apache.kafka.clients.producer.Producer
    public void beginTransaction() throws ProducerFencedException {
        throwIfNoTransactionManager();
        throwIfProducerClosed();
        final long startNs = this.time.nanoseconds();
        this.transactionManager.beginTransaction();
        final long elapsedNs = this.time.nanoseconds() - startNs;
        this.producerMetrics.recordBeginTxn(elapsedNs);
    }

    /**
     * Deprecated variant taking a consumer group id; it wraps the id in a
     * {@link ConsumerGroupMetadata} and delegates to the metadata-based overload.
     *
     * @param map offsets to commit, keyed by partition
     * @param str the consumer group id
     */
    @Override // org.apache.kafka.clients.producer.Producer
    @Deprecated
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, String str) throws ProducerFencedException {
        ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata(str);
        sendOffsetsToTransaction(map, groupMetadata);
    }

    /**
     * Sends consumed offsets as part of the current transaction. Nothing is sent when
     * {@code map} is empty. Otherwise the request is enqueued, the sender is woken up
     * and the call waits up to {@code maxBlockTimeMs} milliseconds for the result.
     *
     * @param map                   offsets to commit, keyed by partition
     * @param consumerGroupMetadata metadata of the consuming group
     */
    @Override // org.apache.kafka.clients.producer.Producer
    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, ConsumerGroupMetadata consumerGroupMetadata) throws ProducerFencedException {
        throwIfInvalidGroupMetadata(consumerGroupMetadata);
        throwIfNoTransactionManager();
        throwIfProducerClosed();
        if (!map.isEmpty()) {
            final long startNs = this.time.nanoseconds();
            final TransactionalRequestResult result = this.transactionManager.sendOffsetsToTransaction(map, consumerGroupMetadata);
            this.sender.wakeup();
            result.await(this.maxBlockTimeMs, TimeUnit.MILLISECONDS);
            final long elapsedNs = this.time.nanoseconds() - startNs;
            this.producerMetrics.recordSendOffsets(elapsedNs);
        }
    }

    /**
     * Commits the ongoing transaction: begins the commit on the transaction manager,
     * wakes the sender and waits up to {@code maxBlockTimeMs} milliseconds for the result.
     */
    @Override // org.apache.kafka.clients.producer.Producer
    public void commitTransaction() throws ProducerFencedException {
        throwIfNoTransactionManager();
        throwIfProducerClosed();
        final long startNs = this.time.nanoseconds();
        final TransactionalRequestResult result = this.transactionManager.beginCommit();
        this.sender.wakeup();
        result.await(this.maxBlockTimeMs, TimeUnit.MILLISECONDS);
        final long elapsedNs = this.time.nanoseconds() - startNs;
        this.producerMetrics.recordCommitTxn(elapsedNs);
    }

    /**
     * Aborts the ongoing transaction: begins the abort on the transaction manager,
     * wakes the sender and waits up to {@code maxBlockTimeMs} milliseconds for the result.
     */
    @Override // org.apache.kafka.clients.producer.Producer
    public void abortTransaction() throws ProducerFencedException {
        throwIfNoTransactionManager();
        throwIfProducerClosed();
        this.log.info("Aborting incomplete transaction");
        final long startNs = this.time.nanoseconds();
        final TransactionalRequestResult result = this.transactionManager.beginAbort();
        this.sender.wakeup();
        result.await(this.maxBlockTimeMs, TimeUnit.MILLISECONDS);
        final long elapsedNs = this.time.nanoseconds() - startNs;
        this.producerMetrics.recordAbortTxn(elapsedNs);
    }

    /**
     * Sends a record without a completion callback.
     *
     * @param producerRecord the record to send
     * @return a future for the record's metadata
     */
    @Override // org.apache.kafka.clients.producer.Producer
    public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
        final Callback noCallback = null;
        return send(producerRecord, noCallback);
    }

    /**
     * Sends a record, passing it through the interceptors' {@code onSend} first.
     *
     * @param producerRecord the record to send
     * @param callback       invoked on completion; may be {@code null}
     * @return a future for the record's metadata
     */
    @Override // org.apache.kafka.clients.producer.Producer
    public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord, Callback callback) {
        // The interceptors may hand back a different record than the one passed in.
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(producerRecord);
        return doSend(interceptedRecord, callback);
    }

    /**
     * Verifies that the sender exists and is still running.
     *
     * @throws IllegalStateException if the sender is missing or no longer running
     */
    private void throwIfProducerClosed() {
        boolean senderRunning = this.sender != null && this.sender.isRunning();
        if (senderRunning) {
            return;
        }
        throw new IllegalStateException("Cannot perform operation after producer has been closed");
    }

    /**
     * Implementation of asynchronously sending a record: waits for topic metadata,
     * serializes key and value, picks a partition, validates the record size and appends
     * the record to the accumulator.
     *
     * <p>{@link ApiException}s are not thrown; they are reported through the callback and
     * interceptors and returned as a failed future. Other exceptions are reported to the
     * interceptors and rethrown.
     *
     * @param producerRecord the (already intercepted) record to send
     * @param callback       user callback; may be {@code null}
     * @return a future for the record's metadata
     * @throws InterruptException     if the thread is interrupted while waiting for metadata
     * @throws SerializationException if the key or value cannot be serialized by the configured serializer
     * @throws KafkaException         for other Kafka failures, including the producer being closed mid-send
     */
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> producerRecord, Callback callback) {
        // Tracked outside the try so that the error paths can report the partition once it is known.
        TopicPartition topicPartition = null;
        try {
            throwIfProducerClosed();
            long nowMs = this.time.milliseconds();

            // Only a failure of the metadata wait is re-labelled as "producer closed".
            ClusterAndWaitTime clusterAndWaitTime;
            try {
                clusterAndWaitTime = waitOnMetadata(producerRecord.topic(), producerRecord.partition(), nowMs, this.maxBlockTimeMs);
            } catch (KafkaException e) {
                if (this.metadata.isClosed()) {
                    throw new KafkaException("Producer closed while send in progress", e);
                }
                throw e;
            }
            nowMs += clusterAndWaitTime.waitedOnMetadataMs;
            long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;

            // Each ClassCastException handler covers only its own serialize call, so a cast
            // failure elsewhere is not misreported as a serializer mismatch.
            byte[] serializedKey;
            try {
                serializedKey = this.keySerializer.serialize(producerRecord.topic(), producerRecord.headers(), producerRecord.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + producerRecord.key().getClass().getName() + " to class " + this.producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer", cce);
            }
            byte[] serializedValue;
            try {
                serializedValue = this.valueSerializer.serialize(producerRecord.topic(), producerRecord.headers(), producerRecord.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + producerRecord.value().getClass().getName() + " to class " + this.producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer", cce);
            }

            int partition = partition(producerRecord, serializedKey, serializedValue, cluster);
            topicPartition = new TopicPartition(producerRecord.topic(), partition);

            setReadOnly(producerRecord.headers());
            Header[] headers = producerRecord.headers().toArray();

            ensureValidRecordSize(AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers));
            // Records without an explicit timestamp get the (post-metadata-wait) current time.
            long timestamp = producerRecord.timestamp() == null ? nowMs : producerRecord.timestamp().longValue();
            if (this.log.isTraceEnabled()) {
                this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", producerRecord, callback, producerRecord.topic(), partition);
            }

            RecordAccumulator.RecordAppendResult result = this.accumulator.append(topicPartition, timestamp, serializedKey, serializedValue, headers, new InterceptorCallback<>(callback, this.interceptors, topicPartition), remainingWaitMs, true, nowMs);
            if (result.abortForNewBatch) {
                // The accumulator asked for a new batch: let the partitioner pick again and retry once.
                int prevPartition = partition;
                this.partitioner.onNewBatch(producerRecord.topic(), cluster, prevPartition);
                partition = partition(producerRecord, serializedKey, serializedValue, cluster);
                topicPartition = new TopicPartition(producerRecord.topic(), partition);
                if (this.log.isTraceEnabled()) {
                    this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", producerRecord.topic(), partition, prevPartition);
                }
                result = this.accumulator.append(topicPartition, timestamp, serializedKey, serializedValue, headers, new InterceptorCallback<>(callback, this.interceptors, topicPartition), remainingWaitMs, false, nowMs);
            }

            if (this.transactionManager != null) {
                this.transactionManager.maybeAddPartition(topicPartition);
            }

            if (result.batchIsFull || result.newBatchCreated) {
                this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", producerRecord.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
        } catch (InterruptedException e) {
            this.errors.record();
            this.interceptors.onSendError(producerRecord, topicPartition, e);
            throw new InterruptException(e);
        } catch (ApiException e) {
            this.log.debug("Exception occurred during message send:", e);
            // The failure may have happened before a partition was chosen; derive one from the record then.
            if (topicPartition == null) {
                topicPartition = ProducerInterceptors.extractTopicPartition(producerRecord);
            }
            // The callback wrapper notifies both the interceptors and the user callback.
            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, topicPartition);
            interceptCallback.onCompletion(null, e);
            this.errors.record();
            this.interceptors.onSendError(producerRecord, topicPartition, e);
            return new FutureFailure(e);
        } catch (KafkaException e) {
            this.errors.record();
            this.interceptors.onSendError(producerRecord, topicPartition, e);
            throw e;
        } catch (Exception e) {
            // Interceptors are told about every failure because onSend ran before this method.
            this.interceptors.onSendError(producerRecord, topicPartition, e);
            throw e;
        }
    }

    /**
     * Marks the headers read-only when they are a {@link RecordHeaders} instance;
     * other implementations are left untouched.
     *
     * @param headers the record headers
     */
    private void setReadOnly(Headers headers) {
        if (!(headers instanceof RecordHeaders)) {
            return;
        }
        RecordHeaders recordHeaders = (RecordHeaders) headers;
        recordHeaders.setReadOnly();
    }

    /**
     * Waits until the cluster metadata contains the given topic (and, when a partition is
     * given, until that partition index is within the topic's partition count).
     *
     * @param str the topic
     * @param num the requested partition, or {@code null} if any partition will do
     * @param j   the current time in milliseconds
     * @param j2  the maximum time to wait, in milliseconds
     * @return the cluster snapshot together with the time spent waiting
     * @throws InvalidTopicException if the cached metadata lists the topic as invalid
     * @throws TimeoutException      if the metadata is not available within {@code j2} ms
     * @throws InterruptedException  if interrupted while awaiting a metadata update
     */
    private ClusterAndWaitTime waitOnMetadata(String str, Integer num, long j, long j2) throws InterruptedException {
        Cluster cluster = this.metadata.fetch();
        if (cluster.invalidTopics().contains(str)) {
            throw new InvalidTopicException(str);
        }
        this.metadata.add(str, j);

        Integer partitionsCount = cluster.partitionCountForTopic(str);
        // Fast path: cached metadata already covers the topic and the requested partition.
        if (partitionsCount != null && (num == null || num.intValue() < partitionsCount.intValue())) {
            return new ClusterAndWaitTime(cluster, 0L);
        }

        long remainingWaitMs = j2;
        long elapsedMs = 0;
        // Keep requesting updates until the topic/partition shows up or the time budget is spent.
        do {
            if (num != null) {
                this.log.trace("Requesting metadata update for partition {} of topic {}.", num, str);
            } else {
                this.log.trace("Requesting metadata update for topic {}.", str);
            }
            this.metadata.add(str, j + elapsedMs);
            int version = this.metadata.requestUpdateForTopic(str);
            this.sender.wakeup();
            try {
                this.metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException e) {
                // Rethrow with the full wait budget rather than the remaining slice in the message.
                throw new TimeoutException(String.format("Topic %s not present in metadata after %d ms.", str, Long.valueOf(j2)));
            }
            cluster = this.metadata.fetch();
            elapsedMs = this.time.milliseconds() - j;
            if (elapsedMs >= j2) {
                // Thrown outside the try above so the partition-specific message is preserved.
                throw new TimeoutException(partitionsCount == null
                        ? String.format("Topic %s not present in metadata after %d ms.", str, Long.valueOf(j2))
                        : String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.", num, str, partitionsCount, Long.valueOf(j2)));
            }
            this.metadata.maybeThrowExceptionForTopic(str);
            remainingWaitMs = j2 - elapsedMs;
            partitionsCount = cluster.partitionCountForTopic(str);
        } while (partitionsCount == null || (num != null && num.intValue() >= partitionsCount.intValue()));

        return new ClusterAndWaitTime(cluster, elapsedMs);
    }

    /**
     * Validates that a record of the given serialized size fits both the maximum request
     * size and the total buffer memory.
     *
     * @param i the estimated serialized size in bytes
     * @throws RecordTooLargeException if either limit is exceeded
     */
    private void ensureValidRecordSize(int i) {
        if (this.maxRequestSize < i) {
            String message = "The message is " + i + " bytes when serialized which is larger than " + this.maxRequestSize + ", which is the value of the " + ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration.";
            throw new RecordTooLargeException(message);
        }
        if (this.totalMemorySize < i) {
            String message = "The message is " + i + " bytes when serialized which is larger than the total memory buffer you have configured with the " + ProducerConfig.BUFFER_MEMORY_CONFIG + " configuration.";
            throw new RecordTooLargeException(message);
        }
    }

    /**
     * Starts a flush on the accumulator, wakes the sender, and waits on
     * {@code accumulator.awaitFlushCompletion()}.
     *
     * <p>The elapsed time is reported to {@code producerMetrics.recordFlush} exactly once,
     * whether the wait completes normally or ends with an exception.
     *
     * <p>Specified by {@code org.apache.kafka.clients.producer.Producer}.
     *
     * @throws InterruptException if the calling thread is interrupted while waiting
     */
    @Override
    public void flush() {
        this.log.trace("Flushing accumulated records in producer.");
        final long startNs = this.time.nanoseconds();
        this.accumulator.beginFlush();
        this.sender.wakeup();
        try {
            this.accumulator.awaitFlushCompletion();
        } catch (InterruptedException e) {
            throw new InterruptException("Flush interrupted.", e);
        } finally {
            // A single recording point covers both the success and the failure path.
            this.producerMetrics.recordFlush(this.time.nanoseconds() - startNs);
        }
    }

    /**
     * Returns the partition information for a topic, as reported by the cluster obtained
     * from {@code waitOnMetadata}.
     *
     * <p>The wait is bounded by {@code maxBlockTimeMs} and starts from the current value of
     * {@code time.milliseconds()}. No specific partition is requested ({@code null}).
     *
     * <p>Specified by {@code org.apache.kafka.clients.producer.Producer}.
     *
     * @param str the topic name; must not be {@code null}
     * @return the result of {@code partitionsForTopic(str)} on the fetched cluster
     * @throws NullPointerException if {@code str} is {@code null}
     * @throws InterruptException if the metadata wait is interrupted
     */
    @Override
    public List<PartitionInfo> partitionsFor(String str) {
        final String topic = Objects.requireNonNull(str, "topic cannot be null");
        try {
            final long nowMs = this.time.milliseconds();
            return waitOnMetadata(topic, null, nowMs, this.maxBlockTimeMs).cluster.partitionsForTopic(topic);
        } catch (InterruptedException interrupted) {
            throw new InterruptException(interrupted);
        }
    }

    /**
     * Returns an unmodifiable view of the metrics held by this producer's metrics registry.
     *
     * <p>Specified by {@code org.apache.kafka.clients.producer.Producer}.
     *
     * @return a read-only map wrapping {@code this.metrics.metrics()}
     */
    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        final Map<MetricName, ? extends Metric> registered = this.metrics.metrics();
        return Collections.unmodifiableMap(registered);
    }

    /**
     * Closes the producer with the largest representable timeout
     * ({@code Long.MAX_VALUE} milliseconds) by delegating to {@link #close(Duration)}.
     *
     * <p>Specified by {@code org.apache.kafka.clients.producer.Producer},
     * {@code java.io.Closeable} and {@code java.lang.AutoCloseable}.
     */
    @Override
    public void close() {
        final Duration unbounded = Duration.ofMillis(Long.MAX_VALUE);
        close(unbounded);
    }

    /**
     * Closes the producer, waiting up to the given timeout, and rethrows any failure
     * captured while closing (the internal "suppress" flag is passed as {@code false}).
     *
     * <p>Specified by {@code org.apache.kafka.clients.producer.Producer}.
     *
     * @param duration the maximum time to wait; must not be negative
     */
    @Override
    public void close(Duration duration) {
        final boolean suppressFailure = false;
        close(duration, suppressFailure);
    }

    /**
     * Shuts the producer down in up to three phases and then releases its components.
     *
     * <ol>
     *   <li>If the timeout is positive and the caller is not the I/O thread, ask the sender
     *       to close and join the I/O thread for at most the timeout.</li>
     *   <li>If the I/O thread is still alive afterwards, force-close the sender and (unless
     *       the caller is the I/O thread itself) join it without a time limit.</li>
     *   <li>Close interceptors, metrics, serializers and the partitioner quietly, then
     *       unregister the application info.</li>
     * </ol>
     *
     * <p>The first failure recorded along the way is held in a reference and rethrown at
     * the end unless {@code z} is {@code true}.
     *
     * @param duration the maximum time to wait for the first phase; must not be negative
     * @param z when {@code true}, a failure captured during close is not rethrown
     * @throws IllegalArgumentException if {@code duration} is negative
     * @throws InterruptException if the captured failure is an {@code InterruptException}
     *         and {@code z} is {@code false}
     * @throws KafkaException wrapping any other captured failure when {@code z} is {@code false}
     */
    private void close(Duration duration, boolean z) {
        final long timeoutMs = duration.toMillis();
        if (timeoutMs < 0) {
            throw new IllegalArgumentException("The timeout cannot be negative.");
        }
        this.log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);

        // Holds only the first failure; later ones lose the compareAndSet race against null.
        final AtomicReference<Throwable> firstException = new AtomicReference<>();
        final boolean invokedFromIoThread = Thread.currentThread() == this.ioThread;

        if (timeoutMs > 0) {
            if (invokedFromIoThread) {
                // Joining the current thread would only block for the whole timeout.
                this.log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.", timeoutMs);
            } else {
                if (this.sender != null) {
                    this.sender.initiateClose();
                }
                if (this.ioThread != null) {
                    try {
                        this.ioThread.join(timeoutMs);
                    } catch (InterruptedException e) {
                        firstException.compareAndSet(null, new InterruptException(e));
                        this.log.error("Interrupted while joining ioThread", e);
                    }
                }
            }
        }

        if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
            this.log.info("Proceeding to force close the producer since pending requests could not be completed within timeout {} ms.", timeoutMs);
            this.sender.forceClose();
            // Only wait for the I/O thread when we are not running on it.
            if (!invokedFromIoThread) {
                try {
                    this.ioThread.join();
                } catch (InterruptedException e) {
                    firstException.compareAndSet(null, new InterruptException(e));
                }
            }
        }

        Utils.closeQuietly(this.interceptors, "producer interceptors", firstException);
        Utils.closeQuietly(this.producerMetrics, "producer metrics wrapper", firstException);
        Utils.closeQuietly(this.metrics, "producer metrics", firstException);
        Utils.closeQuietly(this.keySerializer, "producer keySerializer", firstException);
        Utils.closeQuietly(this.valueSerializer, "producer valueSerializer", firstException);
        Utils.closeQuietly(this.partitioner, "producer partitioner", firstException);
        AppInfoParser.unregisterAppInfo(JMX_PREFIX, this.clientId, this.metrics);

        final Throwable failure = firstException.get();
        if (failure != null && !z) {
            // Interruptions propagate as-is; everything else is wrapped.
            if (failure instanceof InterruptException) {
                throw (InterruptException) failure;
            }
            throw new KafkaException("Failed to close kafka producer", failure);
        }
        this.log.debug("Kafka producer has been closed");
    }

    /**
     * Builds a {@code ClusterResourceListeners} registry from the supplied candidates.
     *
     * <p>Each list is offered via {@code maybeAddAll}, in argument order, followed by the
     * two serializers via {@code maybeAdd}.
     *
     * @param serializer the first serializer offered to the registry
     * @param serializer2 the second serializer offered to the registry
     * @param listArr zero or more lists of candidate listeners
     * @return the populated registry
     */
    private ClusterResourceListeners configureClusterResourceListeners(Serializer<K> serializer, Serializer<V> serializer2, List<?>... listArr) {
        final ClusterResourceListeners listeners = new ClusterResourceListeners();
        for (int idx = 0; idx < listArr.length; idx++) {
            listeners.maybeAddAll(listArr[idx]);
        }
        listeners.maybeAdd(serializer);
        listeners.maybeAdd(serializer2);
        return listeners;
    }

    /**
     * Resolves the partition for a record.
     *
     * <p>A partition set on the record itself takes precedence; otherwise the configured
     * partitioner is consulted.
     *
     * @param producerRecord the record being sent
     * @param bArr byte array passed to the partitioner alongside the record key
     *        (presumably the serialized key; confirm against the caller)
     * @param bArr2 byte array passed to the partitioner alongside the record value
     *        (presumably the serialized value; confirm against the caller)
     * @param cluster the cluster metadata handed to the partitioner
     * @return the record's explicit partition, or the partitioner's choice
     */
    private int partition(ProducerRecord<K, V> producerRecord, byte[] bArr, byte[] bArr2, Cluster cluster) {
        final Integer explicitPartition = producerRecord.partition();
        if (explicitPartition != null) {
            return explicitPartition.intValue();
        }
        return this.partitioner.partition(producerRecord.topic(), producerRecord.key(), bArr, producerRecord.value(), bArr2, cluster);
    }

    /**
     * Validates consumer group metadata supplied by the caller.
     *
     * @param consumerGroupMetadata the metadata to validate
     * @throws IllegalArgumentException if the metadata is {@code null}, or if it has a
     *         positive generation id together with an empty member id
     */
    private void throwIfInvalidGroupMetadata(ConsumerGroupMetadata consumerGroupMetadata) {
        if (consumerGroupMetadata == null) {
            throw new IllegalArgumentException("Consumer group metadata could not be null");
        }
        // A positive generation id with an empty member id is rejected as inconsistent.
        if (consumerGroupMetadata.generationId() > 0 && "".equals(consumerGroupMetadata.memberId())) {
            throw new IllegalArgumentException("Passed in group metadata " + consumerGroupMetadata + " has generationId > 0 but member.id is empty");
        }
    }

    /**
     * Guards transactional operations: fails when no transaction manager is present.
     *
     * @throws IllegalStateException if {@code transactionManager} is {@code null}
     */
    private void throwIfNoTransactionManager() {
        if (this.transactionManager != null) {
            return;
        }
        throw new IllegalStateException("Cannot use transactional methods without enabling transactions by setting the transactional.id configuration property");
    }

    /**
     * Package-private accessor for this producer's client id.
     *
     * @return the value of the {@code clientId} field
     */
    String getClientId() {
        return clientId;
    }
}
