/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.rabbit.stream.producer;

import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.MessageBuilder;
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.ProducerBuilder;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.rabbit.stream.producer.ProducerCustomizer;
import org.springframework.rabbit.stream.producer.RabbitStreamOperations;
import org.springframework.rabbit.stream.producer.StreamSendException;
import org.springframework.rabbit.stream.support.StreamMessageProperties;
import org.springframework.rabbit.stream.support.converter.DefaultStreamMessageConverter;
import org.springframework.rabbit.stream.support.converter.StreamMessageConverter;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;

public class RabbitStreamTemplate
implements RabbitStreamOperations,
BeanNameAware {
    protected final LogAccessor logger = new LogAccessor(this.getClass());
    private final Environment environment;
    private final String streamName;
    private MessageConverter messageConverter = new SimpleMessageConverter();
    private StreamMessageConverter streamConverter = new DefaultStreamMessageConverter();
    private boolean streamConverterSet;
    private Producer producer;
    private String beanName;
    private ProducerCustomizer producerCustomizer = (name, builder) -> {};

    public RabbitStreamTemplate(Environment environment, String streamName) {
        Assert.notNull((Object)environment, (String)"'environment' cannot be null");
        Assert.notNull((Object)streamName, (String)"'streamName' cannot be null");
        this.environment = environment;
        this.streamName = streamName;
    }

    private synchronized Producer createOrGetProducer() {
        if (this.producer == null) {
            ProducerBuilder builder = this.environment.producerBuilder();
            builder.stream(this.streamName);
            this.producerCustomizer.accept(this.beanName, builder);
            this.producer = builder.build();
            if (!this.streamConverterSet) {
                ((DefaultStreamMessageConverter)this.streamConverter).setBuilderSupplier(() -> this.producer.messageBuilder());
            }
        }
        return this.producer;
    }

    public synchronized void setBeanName(String name) {
        this.beanName = name;
    }

    public void setMessageConverter(MessageConverter messageConverter) {
        Assert.notNull((Object)messageConverter, (String)"'messageConverter' cannot be null");
        this.messageConverter = messageConverter;
    }

    public synchronized void setStreamConverter(StreamMessageConverter streamConverter) {
        Assert.notNull((Object)streamConverter, (String)"'streamConverter' cannot be null");
        this.streamConverter = streamConverter;
        this.streamConverterSet = true;
    }

    public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
        Assert.notNull((Object)producerCustomizer, (String)"'producerCustomizer' cannot be null");
        this.producerCustomizer = producerCustomizer;
    }

    @Override
    public MessageConverter messageConverter() {
        return this.messageConverter;
    }

    @Override
    public StreamMessageConverter streamMessageConverter() {
        return this.streamConverter;
    }

    @Override
    public ListenableFuture<Boolean> send(org.springframework.amqp.core.Message message) {
        SettableListenableFuture future = new SettableListenableFuture();
        this.createOrGetProducer().send(this.streamConverter.fromMessage(message), this.handleConfirm((SettableListenableFuture<Boolean>)future));
        return future;
    }

    @Override
    public ListenableFuture<Boolean> convertAndSend(Object message) {
        return this.convertAndSend(message, null);
    }

    @Override
    public ListenableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp) {
        org.springframework.amqp.core.Message message2 = this.messageConverter.toMessage(message, (MessageProperties)new StreamMessageProperties());
        Assert.notNull((Object)message2, (String)"The message converter returned null");
        if (mpp != null && (message2 = mpp.postProcessMessage(message2)) == null) {
            this.logger.debug((CharSequence)"Message Post Processor returned null, message not sent");
            SettableListenableFuture future = new SettableListenableFuture();
            future.set((Object)false);
            return future;
        }
        return this.send(message2);
    }

    @Override
    public ListenableFuture<Boolean> send(Message message) {
        SettableListenableFuture future = new SettableListenableFuture();
        this.createOrGetProducer().send(message, this.handleConfirm((SettableListenableFuture<Boolean>)future));
        return future;
    }

    @Override
    public MessageBuilder messageBuilder() {
        return this.createOrGetProducer().messageBuilder();
    }

    private ConfirmationHandler handleConfirm(SettableListenableFuture<Boolean> future) {
        return confStatus -> {
            if (confStatus.isConfirmed()) {
                future.set((Object)true);
            } else {
                String errorMessage;
                short code = confStatus.getCode();
                switch (code) {
                    case 10001: {
                        errorMessage = "Message Enqueueing Failed";
                        break;
                    }
                    case 10003: {
                        errorMessage = "Producer Closed";
                        break;
                    }
                    case 10002: {
                        errorMessage = "Producer Not Available";
                        break;
                    }
                    case 10004: {
                        errorMessage = "Publish Confirm Timeout";
                        break;
                    }
                    default: {
                        errorMessage = "Unknown code: " + code;
                    }
                }
                future.setException((Throwable)((Object)new StreamSendException(errorMessage, code)));
            }
        };
    }

    @Override
    public synchronized void close() {
        if (this.producer != null) {
            this.producer.close();
            this.producer = null;
        }
    }
}

