package org.springframework.integration.redis.channel;

import com.digiwin.loadbalance.util.DWInstanceUtils;
import java.util.concurrent.Executor;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.integration.MessageDispatchingException;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.BroadcastCapableChannel;
import org.springframework.integration.channel.ChannelUtils;
import org.springframework.integration.context.IntegrationProperties;
import org.springframework.integration.dispatcher.BroadcastingDispatcher;
import org.springframework.integration.support.converter.SimpleMessageConverter;
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:WEB-INF/lib/spring-integration-redis-5.3.1.RELEASE.jar:org/springframework/integration/redis/channel/SubscribableRedisChannel.class */
public class SubscribableRedisChannel extends AbstractMessageChannel implements BroadcastCapableChannel, SmartLifecycle {
    private final RedisConnectionFactory connectionFactory;
    private final RedisTemplate redisTemplate;
    private final String topicName;
    private volatile Integer maxSubscribers;
    private volatile boolean initialized;
    private final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    private final BroadcastingDispatcher dispatcher = new BroadcastingDispatcher(true);
    private Executor taskExecutor = new SimpleAsyncTaskExecutor();
    private RedisSerializer<?> serializer = new StringRedisSerializer();
    private MessageConverter messageConverter = new SimpleMessageConverter();

    /* loaded from: input_file:WEB-INF/lib/spring-integration-redis-5.3.1.RELEASE.jar:org/springframework/integration/redis/channel/SubscribableRedisChannel$MessageListenerDelegate.class */
    private class MessageListenerDelegate {
        MessageListenerDelegate() {
        }

        public void handleMessage(Object obj) {
            Message<?> message = SubscribableRedisChannel.this.messageConverter.toMessage(obj, null);
            try {
                SubscribableRedisChannel.this.dispatcher.dispatch(message);
            } catch (MessageDispatchingException e) {
                String message2 = e.getMessage();
                throw new MessageDeliveryException(message, (message2 == null ? e.getClass().getSimpleName() : message2) + " for redis-channel '" + (StringUtils.hasText(SubscribableRedisChannel.this.topicName) ? SubscribableRedisChannel.this.topicName : DWInstanceUtils.LOADBALANCE_VERSION_UNKNOWN) + "' (" + SubscribableRedisChannel.this.getFullChannelName() + ").", e);
            }
        }
    }

    public SubscribableRedisChannel(RedisConnectionFactory redisConnectionFactory, String str) {
        Assert.notNull(redisConnectionFactory, "'connectionFactory' must not be null");
        Assert.hasText(str, "'topicName' must not be empty");
        this.connectionFactory = redisConnectionFactory;
        this.redisTemplate = new StringRedisTemplate(redisConnectionFactory);
        this.topicName = str;
    }

    public void setTaskExecutor(Executor executor) {
        Assert.notNull(executor, "'taskExecutor' must not be null");
        this.taskExecutor = executor;
    }

    @Override // org.springframework.integration.channel.AbstractMessageChannel
    public void setMessageConverter(MessageConverter messageConverter) {
        Assert.notNull(messageConverter, "'messageConverter' must not be null");
        this.messageConverter = messageConverter;
    }

    public void setSerializer(RedisSerializer<?> redisSerializer) {
        Assert.notNull(redisSerializer, "'serializer' must not be null");
        this.serializer = redisSerializer;
    }

    public void setMaxSubscribers(int i) {
        this.maxSubscribers = Integer.valueOf(i);
        this.dispatcher.setMaxSubscribers(i);
    }

    @Override // org.springframework.messaging.SubscribableChannel
    public boolean subscribe(MessageHandler messageHandler) {
        return this.dispatcher.addHandler(messageHandler);
    }

    @Override // org.springframework.messaging.SubscribableChannel
    public boolean unsubscribe(MessageHandler messageHandler) {
        return this.dispatcher.removeHandler(messageHandler);
    }

    @Override // org.springframework.integration.channel.AbstractMessageChannel
    protected boolean doSend(Message<?> message, long j) {
        this.redisTemplate.convertAndSend(this.topicName, this.messageConverter.fromMessage(message, Object.class));
        return true;
    }

    @Override // org.springframework.integration.channel.AbstractMessageChannel, org.springframework.integration.context.IntegrationObjectSupport
    public void onInit() {
        if (this.initialized) {
            return;
        }
        super.onInit();
        if (this.maxSubscribers == null) {
            setMaxSubscribers(((Integer) getIntegrationProperty(IntegrationProperties.CHANNELS_MAX_BROADCAST_SUBSCRIBERS, Integer.class)).intValue());
        }
        if (this.messageConverter == null) {
            this.messageConverter = new SimpleMessageConverter();
        }
        BeanFactory beanFactory = getBeanFactory();
        if (this.messageConverter instanceof BeanFactoryAware) {
            ((BeanFactoryAware) this.messageConverter).setBeanFactory(beanFactory);
        }
        this.container.setConnectionFactory(this.connectionFactory);
        if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor)) {
            this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, ChannelUtils.getErrorHandler(beanFactory));
        }
        this.container.setTaskExecutor(this.taskExecutor);
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageListenerDelegate());
        messageListenerAdapter.setSerializer(this.serializer);
        messageListenerAdapter.afterPropertiesSet();
        this.container.addMessageListener(messageListenerAdapter, new ChannelTopic(this.topicName));
        this.container.afterPropertiesSet();
        this.dispatcher.setBeanFactory(beanFactory);
        this.initialized = true;
    }

    @Override // org.springframework.context.SmartLifecycle
    public boolean isAutoStartup() {
        return this.container.isAutoStartup();
    }

    @Override // org.springframework.context.SmartLifecycle, org.springframework.context.Phased
    public int getPhase() {
        return this.container.getPhase();
    }

    @Override // org.springframework.context.Lifecycle
    public boolean isRunning() {
        return this.container.isRunning();
    }

    @Override // org.springframework.context.Lifecycle
    public void start() {
        this.container.start();
    }

    @Override // org.springframework.context.Lifecycle
    public void stop() {
        this.container.stop();
    }

    @Override // org.springframework.context.SmartLifecycle
    public void stop(Runnable runnable) {
        this.container.stop(runnable);
    }

    @Override // org.springframework.integration.channel.AbstractMessageChannel, org.springframework.integration.support.management.IntegrationManagement, org.springframework.beans.factory.DisposableBean
    public void destroy() {
        try {
            this.container.destroy();
        } catch (Exception e) {
            throw new IllegalStateException("Cannot destroy " + this.container, e);
        }
    }
}
