package org.springframework.amqp.rabbit.connection;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Command;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.ConsumerShutdownSignalCallback;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.LongString;
import com.rabbitmq.client.Method;
import com.rabbitmq.client.ReturnCallback;
import com.rabbitmq.client.ReturnListener;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.impl.recovery.AutorecoveringChannel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.connection.PublisherCallbackChannel;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:WEB-INF/lib/spring-rabbit-2.2.7.RELEASE.jar:org/springframework/amqp/rabbit/connection/PublisherCallbackChannelImpl.class */
public class PublisherCallbackChannelImpl implements PublisherCallbackChannel, ConfirmListener, ReturnListener, ShutdownListener {
    private static final MessagePropertiesConverter CONVERTER = new DefaultMessagePropertiesConverter();
    private static final long RETURN_CALLBACK_TIMEOUT = 60;
    private final Channel delegate;
    private final ExecutorService executor;
    private volatile Consumer<Channel> afterAckCallback;
    private boolean hasReturned;
    private final Log logger = LogFactory.getLog(getClass());
    private final ConcurrentMap<String, PublisherCallbackChannel.Listener> listeners = new ConcurrentHashMap();
    private final Map<PublisherCallbackChannel.Listener, SortedMap<Long, PendingConfirm>> pendingConfirms = new ConcurrentHashMap();
    private final Map<String, PendingConfirm> pendingReturns = new ConcurrentHashMap();
    private final SortedMap<Long, PublisherCallbackChannel.Listener> listenerForSeq = new ConcurrentSkipListMap();
    private final CountDownLatch returnLatch = new CountDownLatch(1);

    public PublisherCallbackChannelImpl(Channel channel, ExecutorService executorService) {
        Assert.notNull(executorService, "'executor' must not be null");
        this.delegate = channel;
        this.executor = executorService;
        channel.addShutdownListener(this);
    }

    @Override // org.springframework.amqp.rabbit.connection.PublisherCallbackChannel
    public synchronized void setAfterAckCallback(Consumer<Channel> consumer) {
        if (getPendingConfirmsCount() != 0 || consumer == null) {
            this.afterAckCallback = consumer;
        } else {
            consumer.accept(this);
        }
    }

    @Override // com.rabbitmq.client.ShutdownNotifier
    public void addShutdownListener(ShutdownListener shutdownListener) {
        this.delegate.addShutdownListener(shutdownListener);
    }

    @Override // com.rabbitmq.client.ShutdownNotifier
    public void removeShutdownListener(ShutdownListener shutdownListener) {
        this.delegate.removeShutdownListener(shutdownListener);
    }

    @Override // com.rabbitmq.client.ShutdownNotifier
    public ShutdownSignalException getCloseReason() {
        return this.delegate.getCloseReason();
    }

    @Override // com.rabbitmq.client.ShutdownNotifier
    public void notifyListeners() {
        this.delegate.notifyListeners();
    }

    @Override // com.rabbitmq.client.ShutdownNotifier
    public boolean isOpen() {
        return this.delegate.isOpen();
    }

    @Override // com.rabbitmq.client.Channel
    public int getChannelNumber() {
        return this.delegate.getChannelNumber();
    }

    @Override // com.rabbitmq.client.Channel
    public com.rabbitmq.client.Connection getConnection() {
        return this.delegate.getConnection();
    }

    @Override // com.rabbitmq.client.Channel
    public void close(int i, String str) throws IOException, TimeoutException {
        this.delegate.close(i, str);
        if (this.delegate instanceof AutorecoveringChannel) {
            ClosingRecoveryListener.removeChannel((AutorecoveringChannel) this.delegate);
        }
    }

    @Override // com.rabbitmq.client.Channel
    public void abort() throws IOException {
        this.delegate.abort();
    }

    @Override // com.rabbitmq.client.Channel
    public void abort(int i, String str) throws IOException {
        this.delegate.abort(i, str);
    }

    @Override // com.rabbitmq.client.Channel
    public com.rabbitmq.client.Consumer getDefaultConsumer() {
        return this.delegate.getDefaultConsumer();
    }

    @Override // com.rabbitmq.client.Channel
    public void setDefaultConsumer(com.rabbitmq.client.Consumer consumer) {
        this.delegate.setDefaultConsumer(consumer);
    }

    @Override // com.rabbitmq.client.Channel
    public void basicQos(int i, int i2, boolean z) throws IOException {
        this.delegate.basicQos(i, i2, z);
    }

    @Override // com.rabbitmq.client.Channel
    public void basicQos(int i, boolean z) throws IOException {
        this.delegate.basicQos(i, z);
    }

    @Override // com.rabbitmq.client.Channel
    public void basicQos(int i) throws IOException {
        this.delegate.basicQos(i);
    }

    @Override // com.rabbitmq.client.Channel
    public void basicPublish(String str, String str2, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
        this.delegate.basicPublish(str, str2, basicProperties, bArr);
    }

    @Override // com.rabbitmq.client.Channel
    public void basicPublish(String str, String str2, boolean z, boolean z2, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
        this.delegate.basicPublish(str, str2, z, basicProperties, bArr);
    }

    @Override // com.rabbitmq.client.Channel
    public void basicPublish(String str, String str2, boolean z, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
        this.delegate.basicPublish(str, str2, z, basicProperties, bArr);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Exchange.DeclareOk exchangeDeclare(String str, String str2) throws IOException {
        return this.delegate.exchangeDeclare(str, str2);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Exchange.DeclareOk exchangeDeclare(String str, BuiltinExchangeType builtinExchangeType) throws IOException {
        return this.delegate.exchangeDeclare(str, builtinExchangeType);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Exchange.DeclareOk exchangeDeclare(String str, String str2, boolean z) throws IOException {
        return this.delegate.exchangeDeclare(str, str2, z);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Exchange.DeclareOk exchangeDeclare(String str, BuiltinExchangeType builtinExchangeType, boolean z) throws IOException {
        return this.delegate.exchangeDeclare(str, builtinExchangeType, z);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Exchange.DeclareOk exchangeDeclare(String str, String str2, boolean z, boolean z2, Map<String, Object> map) throws IOException {
        return this.delegate.exchangeDeclare(str, str2, z, z2, map);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Exchange.DeclareOk exchangeDeclare(String str, BuiltinExchangeType builtinExchangeType, boolean z, boolean z2, Map<String, Object> map) throws IOException {
        return this.delegate.exchangeDeclare(str, builtinExchangeType, z, z2, map);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Exchange.DeclareOk exchangeDeclare(String str, String str2, boolean z, boolean z2, boolean z3, Map<String, Object> map) throws IOException {
        return this.delegate.exchangeDeclare(str, str2, z, z2, z3, map);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Exchange.DeclareOk exchangeDeclare(String str, BuiltinExchangeType builtinExchangeType, boolean z, boolean z2, boolean z3, Map<String, Object> map) throws IOException {
        return this.delegate.exchangeDeclare(str, builtinExchangeType, z, z2, z3, map);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Exchange.DeclareOk exchangeDeclarePassive(String str) throws IOException {
        return this.delegate.exchangeDeclarePassive(str);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Exchange.DeleteOk exchangeDelete(String str, boolean z) throws IOException {
        return this.delegate.exchangeDelete(str, z);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Exchange.DeleteOk exchangeDelete(String str) throws IOException {
        return this.delegate.exchangeDelete(str);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Exchange.BindOk exchangeBind(String str, String str2, String str3) throws IOException {
        return this.delegate.exchangeBind(str, str2, str3);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Exchange.BindOk exchangeBind(String str, String str2, String str3, Map<String, Object> map) throws IOException {
        return this.delegate.exchangeBind(str, str2, str3, map);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Exchange.UnbindOk exchangeUnbind(String str, String str2, String str3) throws IOException {
        return this.delegate.exchangeUnbind(str, str2, str3);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Exchange.UnbindOk exchangeUnbind(String str, String str2, String str3, Map<String, Object> map) throws IOException {
        return this.delegate.exchangeUnbind(str, str2, str3, map);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Queue.DeclareOk queueDeclare() throws IOException {
        return this.delegate.queueDeclare();
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Queue.DeclareOk queueDeclare(String str, boolean z, boolean z2, boolean z3, Map<String, Object> map) throws IOException {
        return this.delegate.queueDeclare(str, z, z2, z3, map);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Queue.DeclareOk queueDeclarePassive(String str) throws IOException {
        return this.delegate.queueDeclarePassive(str);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Queue.DeleteOk queueDelete(String str) throws IOException {
        return this.delegate.queueDelete(str);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Queue.DeleteOk queueDelete(String str, boolean z, boolean z2) throws IOException {
        return this.delegate.queueDelete(str, z, z2);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Queue.BindOk queueBind(String str, String str2, String str3) throws IOException {
        return this.delegate.queueBind(str, str2, str3);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Queue.BindOk queueBind(String str, String str2, String str3, Map<String, Object> map) throws IOException {
        return this.delegate.queueBind(str, str2, str3, map);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Queue.UnbindOk queueUnbind(String str, String str2, String str3) throws IOException {
        return this.delegate.queueUnbind(str, str2, str3);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Queue.UnbindOk queueUnbind(String str, String str2, String str3, Map<String, Object> map) throws IOException {
        return this.delegate.queueUnbind(str, str2, str3, map);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Queue.PurgeOk queuePurge(String str) throws IOException {
        return this.delegate.queuePurge(str);
    }

    @Override // com.rabbitmq.client.Channel
    public GetResponse basicGet(String str, boolean z) throws IOException {
        return this.delegate.basicGet(str, z);
    }

    @Override // com.rabbitmq.client.Channel
    public void basicAck(long j, boolean z) throws IOException {
        this.delegate.basicAck(j, z);
    }

    @Override // com.rabbitmq.client.Channel
    public void basicNack(long j, boolean z, boolean z2) throws IOException {
        this.delegate.basicNack(j, z, z2);
    }

    @Override // com.rabbitmq.client.Channel
    public void basicReject(long j, boolean z) throws IOException {
        this.delegate.basicReject(j, z);
    }

    @Override // com.rabbitmq.client.Channel
    public String basicConsume(String str, com.rabbitmq.client.Consumer consumer) throws IOException {
        return this.delegate.basicConsume(str, consumer);
    }

    @Override // com.rabbitmq.client.Channel
    public String basicConsume(String str, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException {
        return this.delegate.basicConsume(str, deliverCallback, cancelCallback);
    }

    @Override // com.rabbitmq.client.Channel
    public String basicConsume(String str, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback consumerShutdownSignalCallback) throws IOException {
        return this.delegate.basicConsume(str, deliverCallback, consumerShutdownSignalCallback);
    }

    @Override // com.rabbitmq.client.Channel
    public String basicConsume(String str, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback consumerShutdownSignalCallback) throws IOException {
        return this.delegate.basicConsume(str, deliverCallback, cancelCallback, consumerShutdownSignalCallback);
    }

    @Override // com.rabbitmq.client.Channel
    public String basicConsume(String str, boolean z, com.rabbitmq.client.Consumer consumer) throws IOException {
        return this.delegate.basicConsume(str, z, consumer);
    }

    @Override // com.rabbitmq.client.Channel
    public String basicConsume(String str, boolean z, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException {
        return this.delegate.basicConsume(str, z, deliverCallback, cancelCallback);
    }

    @Override // com.rabbitmq.client.Channel
    public String basicConsume(String str, boolean z, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback consumerShutdownSignalCallback) throws IOException {
        return this.delegate.basicConsume(str, z, deliverCallback, consumerShutdownSignalCallback);
    }

    @Override // com.rabbitmq.client.Channel
    public String basicConsume(String str, boolean z, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback consumerShutdownSignalCallback) throws IOException {
        return this.delegate.basicConsume(str, z, deliverCallback, cancelCallback, consumerShutdownSignalCallback);
    }

    @Override // com.rabbitmq.client.Channel
    public String basicConsume(String str, boolean z, String str2, com.rabbitmq.client.Consumer consumer) throws IOException {
        return this.delegate.basicConsume(str, z, str2, consumer);
    }

    @Override // com.rabbitmq.client.Channel
    public String basicConsume(String str, boolean z, String str2, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException {
        return this.delegate.basicConsume(str, z, str2, deliverCallback, cancelCallback);
    }

    @Override // com.rabbitmq.client.Channel
    public String basicConsume(String str, boolean z, String str2, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback consumerShutdownSignalCallback) throws IOException {
        return this.delegate.basicConsume(str, z, str2, deliverCallback, consumerShutdownSignalCallback);
    }

    @Override // com.rabbitmq.client.Channel
    public String basicConsume(String str, boolean z, String str2, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback consumerShutdownSignalCallback) throws IOException {
        return this.delegate.basicConsume(str, z, str2, deliverCallback, cancelCallback, consumerShutdownSignalCallback);
    }

    @Override // com.rabbitmq.client.Channel
    public String basicConsume(String str, boolean z, Map<String, Object> map, com.rabbitmq.client.Consumer consumer) throws IOException {
        return this.delegate.basicConsume(str, z, map, consumer);
    }

    @Override // com.rabbitmq.client.Channel
    public String basicConsume(String str, boolean z, Map<String, Object> map, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException {
        return this.delegate.basicConsume(str, z, map, deliverCallback, cancelCallback);
    }

    @Override // com.rabbitmq.client.Channel
    public String basicConsume(String str, boolean z, Map<String, Object> map, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback consumerShutdownSignalCallback) throws IOException {
        return this.delegate.basicConsume(str, z, map, deliverCallback, consumerShutdownSignalCallback);
    }

    @Override // com.rabbitmq.client.Channel
    public String basicConsume(String str, boolean z, Map<String, Object> map, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback consumerShutdownSignalCallback) throws IOException {
        return this.delegate.basicConsume(str, z, map, deliverCallback, cancelCallback, consumerShutdownSignalCallback);
    }

    @Override // com.rabbitmq.client.Channel
    public String basicConsume(String str, boolean z, String str2, boolean z2, boolean z3, Map<String, Object> map, com.rabbitmq.client.Consumer consumer) throws IOException {
        return this.delegate.basicConsume(str, z, str2, z2, z3, map, consumer);
    }

    @Override // com.rabbitmq.client.Channel
    public String basicConsume(String str, boolean z, String str2, boolean z2, boolean z3, Map<String, Object> map, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException {
        return this.delegate.basicConsume(str, z, str2, z2, z3, map, deliverCallback, cancelCallback);
    }

    @Override // com.rabbitmq.client.Channel
    public String basicConsume(String str, boolean z, String str2, boolean z2, boolean z3, Map<String, Object> map, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback consumerShutdownSignalCallback) throws IOException {
        return this.delegate.basicConsume(str, z, str2, z2, z3, map, deliverCallback, consumerShutdownSignalCallback);
    }

    @Override // com.rabbitmq.client.Channel
    public String basicConsume(String str, boolean z, String str2, boolean z2, boolean z3, Map<String, Object> map, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback consumerShutdownSignalCallback) throws IOException {
        return this.delegate.basicConsume(str, z, str2, z2, z3, map, deliverCallback, cancelCallback, consumerShutdownSignalCallback);
    }

    @Override // com.rabbitmq.client.Channel
    public void basicCancel(String str) throws IOException {
        this.delegate.basicCancel(str);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Basic.RecoverOk basicRecover() throws IOException {
        return this.delegate.basicRecover();
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Basic.RecoverOk basicRecover(boolean z) throws IOException {
        return this.delegate.basicRecover(z);
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Tx.SelectOk txSelect() throws IOException {
        return this.delegate.txSelect();
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Tx.CommitOk txCommit() throws IOException {
        return this.delegate.txCommit();
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Tx.RollbackOk txRollback() throws IOException {
        return this.delegate.txRollback();
    }

    @Override // com.rabbitmq.client.Channel
    public AMQP.Confirm.SelectOk confirmSelect() throws IOException {
        return this.delegate.confirmSelect();
    }

    @Override // com.rabbitmq.client.Channel
    public long getNextPublishSeqNo() {
        return this.delegate.getNextPublishSeqNo();
    }

    @Override // com.rabbitmq.client.Channel
    public boolean waitForConfirms() throws InterruptedException {
        return this.delegate.waitForConfirms();
    }

    @Override // com.rabbitmq.client.Channel
    public boolean waitForConfirms(long j) throws InterruptedException, TimeoutException {
        return this.delegate.waitForConfirms(j);
    }

    @Override // com.rabbitmq.client.Channel
    public void waitForConfirmsOrDie() throws IOException, InterruptedException {
        this.delegate.waitForConfirmsOrDie();
    }

    @Override // com.rabbitmq.client.Channel
    public void waitForConfirmsOrDie(long j) throws IOException, InterruptedException, TimeoutException {
        this.delegate.waitForConfirmsOrDie(j);
    }

    @Override // com.rabbitmq.client.Channel
    public void asyncRpc(Method method) throws IOException {
        this.delegate.asyncRpc(method);
    }

    @Override // com.rabbitmq.client.Channel
    public Command rpc(Method method) throws IOException {
        return this.delegate.rpc(method);
    }

    @Override // com.rabbitmq.client.Channel
    public void addConfirmListener(ConfirmListener confirmListener) {
        this.delegate.addConfirmListener(confirmListener);
    }

    @Override // com.rabbitmq.client.Channel
    public ConfirmListener addConfirmListener(ConfirmCallback confirmCallback, ConfirmCallback confirmCallback2) {
        return this.delegate.addConfirmListener(confirmCallback, confirmCallback2);
    }

    @Override // com.rabbitmq.client.Channel
    public boolean removeConfirmListener(ConfirmListener confirmListener) {
        return this.delegate.removeConfirmListener(confirmListener);
    }

    @Override // com.rabbitmq.client.Channel
    public void clearConfirmListeners() {
        this.delegate.clearConfirmListeners();
    }

    @Override // com.rabbitmq.client.Channel
    public void addReturnListener(ReturnListener returnListener) {
        this.delegate.addReturnListener(returnListener);
    }

    @Override // com.rabbitmq.client.Channel
    public ReturnListener addReturnListener(ReturnCallback returnCallback) {
        return this.delegate.addReturnListener(returnCallback);
    }

    @Override // com.rabbitmq.client.Channel
    public boolean removeReturnListener(ReturnListener returnListener) {
        return this.delegate.removeReturnListener(returnListener);
    }

    @Override // com.rabbitmq.client.Channel
    public synchronized void clearReturnListeners() {
        this.delegate.clearReturnListeners();
    }

    @Override // com.rabbitmq.client.Channel
    public void exchangeBindNoWait(String str, String str2, String str3, Map<String, Object> map) throws IOException {
        this.delegate.exchangeBind(str, str2, str3, map);
    }

    @Override // com.rabbitmq.client.Channel
    public void exchangeDeclareNoWait(String str, String str2, boolean z, boolean z2, boolean z3, Map<String, Object> map) throws IOException {
        this.delegate.exchangeDeclareNoWait(str, str2, z, z2, z3, map);
    }

    @Override // com.rabbitmq.client.Channel
    public void exchangeDeclareNoWait(String str, BuiltinExchangeType builtinExchangeType, boolean z, boolean z2, boolean z3, Map<String, Object> map) throws IOException {
        this.delegate.exchangeDeclareNoWait(str, builtinExchangeType, z, z2, z3, map);
    }

    @Override // com.rabbitmq.client.Channel
    public void exchangeDeleteNoWait(String str, boolean z) throws IOException {
        this.delegate.exchangeDeleteNoWait(str, z);
    }

    @Override // com.rabbitmq.client.Channel
    public void exchangeUnbindNoWait(String str, String str2, String str3, Map<String, Object> map) throws IOException {
        this.delegate.exchangeUnbindNoWait(str, str2, str3, map);
    }

    @Override // com.rabbitmq.client.Channel
    public void queueBindNoWait(String str, String str2, String str3, Map<String, Object> map) throws IOException {
        this.delegate.queueBindNoWait(str, str2, str3, map);
    }

    @Override // com.rabbitmq.client.Channel
    public void queueDeclareNoWait(String str, boolean z, boolean z2, boolean z3, Map<String, Object> map) throws IOException {
        this.delegate.queueDeclareNoWait(str, z, z2, z3, map);
    }

    @Override // com.rabbitmq.client.Channel
    public void queueDeleteNoWait(String str, boolean z, boolean z2) throws IOException {
        this.delegate.queueDeleteNoWait(str, z, z2);
    }

    @Override // com.rabbitmq.client.Channel
    public long consumerCount(String str) throws IOException {
        return this.delegate.consumerCount(str);
    }

    @Override // com.rabbitmq.client.Channel
    public CompletableFuture<Command> asyncCompletableRpc(Method method) throws IOException {
        return this.delegate.asyncCompletableRpc(method);
    }

    @Override // com.rabbitmq.client.Channel
    public long messageCount(String str) throws IOException {
        return this.delegate.messageCount(str);
    }

    @Override // org.springframework.amqp.rabbit.connection.PublisherCallbackChannel
    public Channel getDelegate() {
        return this.delegate;
    }

    @Override // com.rabbitmq.client.Channel, java.lang.AutoCloseable
    public void close() throws IOException, TimeoutException {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Closing " + this.delegate);
        }
        try {
            this.delegate.close();
        } catch (AlreadyClosedException e) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace(this.delegate + " is already closed");
            }
        }
        shutdownCompleted("Channel closed by application");
    }

    private void shutdownCompleted(String str) {
        this.executor.execute(() -> {
            generateNacksForPendingAcks(str);
        });
    }

    private synchronized void generateNacksForPendingAcks(String str) {
        for (Map.Entry<PublisherCallbackChannel.Listener, SortedMap<Long, PendingConfirm>> entry : this.pendingConfirms.entrySet()) {
            PublisherCallbackChannel.Listener key = entry.getKey();
            for (Map.Entry<Long, PendingConfirm> entry2 : entry.getValue().entrySet()) {
                entry2.getValue().setCause(str);
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(toString() + " PC:Nack:(close):" + entry2.getKey());
                }
                processAck(entry2.getKey().longValue(), false, false, false);
            }
            key.revoke(this);
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("PendingConfirms cleared");
        }
        this.pendingConfirms.clear();
        this.listenerForSeq.clear();
        this.listeners.clear();
    }

    @Override // org.springframework.amqp.rabbit.connection.PublisherCallbackChannel
    public synchronized int getPendingConfirmsCount(PublisherCallbackChannel.Listener listener) {
        SortedMap<Long, PendingConfirm> sortedMap = this.pendingConfirms.get(listener);
        if (sortedMap == null) {
            return 0;
        }
        return sortedMap.entrySet().size();
    }

    @Override // org.springframework.amqp.rabbit.connection.PublisherCallbackChannel
    public synchronized int getPendingConfirmsCount() {
        return this.pendingConfirms.values().stream().mapToInt((v0) -> {
            return v0.size();
        }).sum();
    }

    @Override // org.springframework.amqp.rabbit.connection.PublisherCallbackChannel
    public void addListener(PublisherCallbackChannel.Listener listener) {
        Assert.notNull(listener, "Listener cannot be null");
        if (this.listeners.size() == 0) {
            this.delegate.addConfirmListener(this);
            this.delegate.addReturnListener(this);
        }
        if (this.listeners.putIfAbsent(listener.getUUID(), listener) == null) {
            this.pendingConfirms.put(listener, new ConcurrentSkipListMap());
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Added listener " + listener);
            }
        }
    }

    @Override // org.springframework.amqp.rabbit.connection.PublisherCallbackChannel
    public synchronized Collection<PendingConfirm> expire(PublisherCallbackChannel.Listener listener, long j) {
        SortedMap<Long, PendingConfirm> sortedMap = this.pendingConfirms.get(listener);
        if (sortedMap == null) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList();
        Iterator<Map.Entry<Long, PendingConfirm>> it = sortedMap.entrySet().iterator();
        while (it.hasNext()) {
            PendingConfirm value = it.next().getValue();
            if (value.getTimestamp() >= j) {
                break;
            }
            arrayList.add(value);
            it.remove();
            CorrelationData correlationData = value.getCorrelationData();
            if (correlationData != null && StringUtils.hasText(correlationData.getId())) {
                this.pendingReturns.remove(correlationData.getId());
            }
        }
        return arrayList;
    }

    @Override // com.rabbitmq.client.ConfirmListener
    public void handleAck(long j, boolean z) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(toString() + " PC:Ack:" + j + ":" + z);
        }
        processAck(j, true, z, true);
    }

    @Override // com.rabbitmq.client.ConfirmListener
    public void handleNack(long j, boolean z) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(toString() + " PC:Nack:" + j + ":" + z);
        }
        processAck(j, false, z, true);
    }

    private synchronized void processAck(long j, boolean z, boolean z2, boolean z3) {
        try {
            doProcessAck(j, z, z2, z3);
        } catch (Exception e) {
            this.logger.error("Failed to process publisher confirm", e);
        }
    }

    private void doProcessAck(long j, boolean z, boolean z2, boolean z3) {
        if (z2) {
            processMultipleAck(j, z);
            return;
        }
        PublisherCallbackChannel.Listener remove = this.listenerForSeq.remove(Long.valueOf(j));
        if (remove == null) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug(this.delegate.toString() + " No listener for seq:" + j);
                return;
            }
            return;
        }
        SortedMap<Long, PendingConfirm> sortedMap = this.pendingConfirms.get(remove);
        PendingConfirm pendingConfirm = null;
        if (sortedMap != null) {
            pendingConfirm = z3 ? sortedMap.remove(Long.valueOf(j)) : sortedMap.get(Long.valueOf(j));
        }
        if (pendingConfirm != null) {
            CorrelationData correlationData = pendingConfirm.getCorrelationData();
            if (correlationData != null) {
                correlationData.getFuture().set(new CorrelationData.Confirm(z, pendingConfirm.getCause()));
                if (StringUtils.hasText(correlationData.getId())) {
                    this.pendingReturns.remove(correlationData.getId());
                }
            }
            doHandleConfirm(z, remove, pendingConfirm);
        }
    }

    private void processMultipleAck(long j, boolean z) {
        SortedMap<Long, PublisherCallbackChannel.Listener> headMap = this.listenerForSeq.headMap(Long.valueOf(j + 1));
        for (PublisherCallbackChannel.Listener listener : new HashSet(headMap.values())) {
            SortedMap<Long, PendingConfirm> sortedMap = this.pendingConfirms.get(listener);
            if (sortedMap != null) {
                Iterator<Map.Entry<Long, PendingConfirm>> it = sortedMap.headMap(Long.valueOf(j + 1)).entrySet().iterator();
                while (it.hasNext()) {
                    PendingConfirm value = it.next().getValue();
                    CorrelationData correlationData = value.getCorrelationData();
                    if (correlationData != null) {
                        correlationData.getFuture().set(new CorrelationData.Confirm(z, value.getCause()));
                        if (StringUtils.hasText(correlationData.getId())) {
                            this.pendingReturns.remove(correlationData.getId());
                        }
                    }
                    it.remove();
                    doHandleConfirm(z, listener, value);
                }
            }
        }
        Iterator it2 = new ArrayList(headMap.keySet()).iterator();
        while (it2.hasNext()) {
            this.listenerForSeq.remove((Long) it2.next());
        }
    }

    private void doHandleConfirm(boolean z, PublisherCallbackChannel.Listener listener, PendingConfirm pendingConfirm) {
        this.executor.execute(() -> {
            try {
                try {
                    if (listener.isConfirmListener()) {
                        if (this.hasReturned && !this.returnLatch.await(60L, TimeUnit.SECONDS)) {
                            this.logger.error("Return callback failed to execute in 60 seconds");
                        }
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Sending confirm " + pendingConfirm);
                        }
                        listener.handleConfirm(pendingConfirm, z);
                    }
                    try {
                        if (this.afterAckCallback != null && getPendingConfirmsCount() == 0) {
                            this.afterAckCallback.accept(this);
                            this.afterAckCallback = null;
                        }
                    } catch (Exception e) {
                        this.logger.error("Failed to invoke afterAckCallback", e);
                    }
                } catch (Exception e2) {
                    this.logger.error("Exception delivering confirm", e2);
                    try {
                        if (this.afterAckCallback != null && getPendingConfirmsCount() == 0) {
                            this.afterAckCallback.accept(this);
                            this.afterAckCallback = null;
                        }
                    } catch (Exception e3) {
                        this.logger.error("Failed to invoke afterAckCallback", e3);
                    }
                }
            } catch (Throwable th) {
                try {
                    if (this.afterAckCallback != null && getPendingConfirmsCount() == 0) {
                        this.afterAckCallback.accept(this);
                        this.afterAckCallback = null;
                    }
                } catch (Exception e4) {
                    this.logger.error("Failed to invoke afterAckCallback", e4);
                }
                throw th;
            }
        });
    }

    @Override // org.springframework.amqp.rabbit.connection.PublisherCallbackChannel
    public synchronized void addPendingConfirm(PublisherCallbackChannel.Listener listener, long j, PendingConfirm pendingConfirm) {
        SortedMap<Long, PendingConfirm> sortedMap = this.pendingConfirms.get(listener);
        Assert.notNull(sortedMap, "Listener not registered: " + listener + " " + this.pendingConfirms.keySet());
        sortedMap.put(Long.valueOf(j), pendingConfirm);
        this.listenerForSeq.put(Long.valueOf(j), listener);
        if (pendingConfirm.getCorrelationData() != null) {
            String id = pendingConfirm.getCorrelationData().getId();
            if (StringUtils.hasText(id)) {
                this.pendingReturns.put(id, pendingConfirm);
            }
        }
    }

    @Override // com.rabbitmq.client.ReturnListener
    public void handleReturn(int i, String str, String str2, String str3, AMQP.BasicProperties basicProperties, byte[] bArr) {
        PendingConfirm remove;
        LongString longString = (LongString) basicProperties.getHeaders().get(PublisherCallbackChannel.RETURNED_MESSAGE_CORRELATION_KEY);
        if (longString != null && (remove = this.pendingReturns.remove(longString.toString())) != null) {
            MessageProperties messageProperties = CONVERTER.toMessageProperties(basicProperties, new Envelope(0L, false, str2, str3), StandardCharsets.UTF_8.name());
            if (remove.getCorrelationData() != null) {
                remove.getCorrelationData().setReturnedMessage(new Message(bArr, messageProperties));
            }
        }
        Object obj = basicProperties.getHeaders().get(PublisherCallbackChannel.RETURN_LISTENER_CORRELATION_KEY);
        String str4 = null;
        if (obj != null) {
            str4 = obj.toString();
        }
        PublisherCallbackChannel.Listener listener = null;
        if (str4 != null) {
            listener = this.listeners.get(str4);
        } else {
            this.logger.error("No 'spring_listener_return_correlation' header in returned message");
        }
        if (listener == null || !listener.isReturnListener()) {
            if (this.logger.isWarnEnabled()) {
                this.logger.warn("No Listener for returned message");
            }
        } else {
            this.hasReturned = true;
            PublisherCallbackChannel.Listener listener2 = listener;
            this.executor.execute(() -> {
                try {
                    try {
                        listener2.handleReturn(i, str, str2, str3, basicProperties, bArr);
                        this.returnLatch.countDown();
                    } catch (Exception e) {
                        this.logger.error("Exception delivering returned message ", e);
                        this.returnLatch.countDown();
                    }
                } catch (Throwable th) {
                    this.returnLatch.countDown();
                    throw th;
                }
            });
        }
    }

    @Override // com.rabbitmq.client.ShutdownListener
    public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
        shutdownCompleted(shutdownSignalException.getMessage());
    }

    public int hashCode() {
        return this.delegate.hashCode();
    }

    public boolean equals(Object obj) {
        return obj == this || this.delegate.equals(obj);
    }

    public String toString() {
        return "PublisherCallbackChannelImpl: " + this.delegate.toString();
    }

    public static PublisherCallbackChannelFactory factory() {
        return (channel, executorService) -> {
            return new PublisherCallbackChannelImpl(channel, executorService);
        };
    }
}
