package org.springframework.data.redis.listener;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.data.redis.serializer.RedisElementReader;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:WEB-INF/lib/spring-data-redis-2.5.1.jar:org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainer.class */
public class ReactiveRedisMessageListenerContainer implements DisposableBean {
    private final RedisSerializationContext.SerializationPair<String> stringSerializationPair = RedisSerializationContext.SerializationPair.fromSerializer(RedisSerializer.string());
    private final Map<ReactiveSubscription, Subscribers> subscriptions = new ConcurrentHashMap();

    @Nullable
    private volatile ReactiveRedisConnection connection;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/spring-data-redis-2.5.1.jar:org/springframework/data/redis/listener/ReactiveRedisMessageListenerContainer$Subscribers.class */
    public static class Subscribers {
        private static final AtomicLongFieldUpdater<Subscribers> SUBSCRIBERS = AtomicLongFieldUpdater.newUpdater(Subscribers.class, "subscribers");
        private volatile long subscribers;

        Subscribers() {
        }

        void registered() {
            SUBSCRIBERS.incrementAndGet(this);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean hasRegistration() {
            return SUBSCRIBERS.get(this) > 0;
        }

        boolean unregister() {
            long j = SUBSCRIBERS.get(this);
            return j > 0 && SUBSCRIBERS.compareAndSet(this, j, j - 1) && j == 1;
        }
    }

    public ReactiveRedisMessageListenerContainer(ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
        Assert.notNull(reactiveRedisConnectionFactory, "ReactiveRedisConnectionFactory must not be null!");
        this.connection = reactiveRedisConnectionFactory.getReactiveConnection();
    }

    @Override // org.springframework.beans.factory.DisposableBean
    public void destroy() {
        destroyLater().block();
    }

    public Mono<Void> destroyLater() {
        return Mono.defer(this::doDestroy);
    }

    private Mono<Void> doDestroy() {
        Flux flux;
        if (this.connection == null) {
            return Mono.empty();
        }
        ReactiveRedisConnection reactiveRedisConnection = this.connection;
        Flux flux2 = null;
        while (true) {
            flux = flux2;
            if (this.subscriptions.isEmpty()) {
                break;
            }
            Stream stream = new HashMap(this.subscriptions).keySet().stream();
            Map<ReactiveSubscription, Subscribers> map = this.subscriptions;
            map.getClass();
            List list = (List) stream.peek((v1) -> {
                r1.remove(v1);
            }).map((v0) -> {
                return v0.cancel();
            }).collect(Collectors.toList());
            flux2 = flux == null ? Flux.concat(list) : flux.mergeWith(Flux.concat(list));
        }
        this.connection = null;
        return flux != null ? flux.then(reactiveRedisConnection.closeLater()) : reactiveRedisConnection.closeLater();
    }

    public Collection<ReactiveSubscription> getActiveSubscriptions() {
        return (Collection) this.subscriptions.entrySet().stream().filter(entry -> {
            return ((Subscribers) entry.getValue()).hasRegistration();
        }).map(entry2 -> {
            return (ReactiveSubscription) entry2.getKey();
        }).collect(Collectors.toList());
    }

    public Flux<ReactiveSubscription.Message<String, String>> receive(ChannelTopic... channelTopicArr) {
        Assert.notNull(channelTopicArr, "ChannelTopics must not be null!");
        Assert.noNullElements(channelTopicArr, "ChannelTopics must not contain null elements!");
        return receive(Arrays.asList(channelTopicArr), this.stringSerializationPair, this.stringSerializationPair);
    }

    public Flux<ReactiveSubscription.PatternMessage<String, String, String>> receive(PatternTopic... patternTopicArr) {
        Assert.notNull(patternTopicArr, "PatternTopic must not be null!");
        Assert.noNullElements(patternTopicArr, "PatternTopic must not contain null elements!");
        return receive(Arrays.asList(patternTopicArr), this.stringSerializationPair, this.stringSerializationPair).map(message -> {
            return (ReactiveSubscription.PatternMessage) message;
        });
    }

    public <C, B> Flux<ReactiveSubscription.Message<C, B>> receive(Iterable<? extends Topic> iterable, RedisSerializationContext.SerializationPair<C> serializationPair, RedisSerializationContext.SerializationPair<B> serializationPair2) {
        Assert.notNull(iterable, "Topics must not be null!");
        verifyConnection();
        ByteBuffer[] targets = getTargets(iterable, PatternTopic.class);
        ByteBuffer[] targets2 = getTargets(iterable, ChannelTopic.class);
        if (ObjectUtils.isEmpty((Object[]) targets) && ObjectUtils.isEmpty((Object[]) targets2)) {
            throw new InvalidDataAccessApiUsageException("No channels or patterns to subscribe to.");
        }
        return doReceive(serializationPair, serializationPair2, this.connection.pubSubCommands().createSubscription(), targets, targets2);
    }

    private <C, B> Flux<ReactiveSubscription.Message<C, B>> doReceive(RedisSerializationContext.SerializationPair<C> serializationPair, RedisSerializationContext.SerializationPair<B> serializationPair2, Mono<ReactiveSubscription> mono, ByteBuffer[] byteBufferArr, ByteBuffer[] byteBufferArr2) {
        return mono.flatMapMany(reactiveSubscription -> {
            Mono<Void> subscribe = subscribe(byteBufferArr, byteBufferArr2, reactiveSubscription);
            MonoProcessor create = MonoProcessor.create();
            return reactiveSubscription.receive().mergeWith(subscribe.then(Mono.defer(() -> {
                getSubscribers(reactiveSubscription).registered();
                return Mono.empty();
            }))).doOnCancel(() -> {
                if (getSubscribers(reactiveSubscription).unregister()) {
                    this.subscriptions.remove(reactiveSubscription);
                    Mono<Void> unsubscribe = reactiveSubscription.unsubscribe();
                    Consumer consumer = r3 -> {
                        create.onComplete();
                    };
                    create.getClass();
                    unsubscribe.subscribe(consumer, create::onError);
                }
            }).mergeWith(create);
        }).map(message -> {
            return readMessage(serializationPair.getReader(), serializationPair2.getReader(), message);
        });
    }

    private static Mono<Void> subscribe(ByteBuffer[] byteBufferArr, ByteBuffer[] byteBufferArr2, ReactiveSubscription reactiveSubscription) {
        Assert.isTrue((ObjectUtils.isEmpty((Object[]) byteBufferArr2) && ObjectUtils.isEmpty((Object[]) byteBufferArr)) ? false : true, "Must provide either channels or patterns!");
        Mono<Void> mono = null;
        if (!ObjectUtils.isEmpty((Object[]) byteBufferArr)) {
            mono = reactiveSubscription.pSubscribe(byteBufferArr);
        }
        if (!ObjectUtils.isEmpty((Object[]) byteBufferArr2)) {
            Mono<Void> subscribe = reactiveSubscription.subscribe(byteBufferArr2);
            mono = mono == null ? subscribe : mono.and(subscribe);
        }
        return mono;
    }

    private boolean isActive() {
        return this.connection != null;
    }

    private void verifyConnection() {
        if (!isActive()) {
            throw new IllegalStateException("ReactiveRedisMessageListenerContainer is already disposed!");
        }
    }

    private Subscribers getSubscribers(ReactiveSubscription reactiveSubscription) {
        return this.subscriptions.computeIfAbsent(reactiveSubscription, reactiveSubscription2 -> {
            return new Subscribers();
        });
    }

    private ByteBuffer[] getTargets(Iterable<? extends Topic> iterable, Class<?> cls) {
        Stream stream = StreamSupport.stream(iterable.spliterator(), false);
        cls.getClass();
        Stream map = stream.filter((v1) -> {
            return r1.isInstance(v1);
        }).map((v0) -> {
            return v0.getTopic();
        });
        RedisSerializationContext.SerializationPair<String> serializationPair = this.stringSerializationPair;
        serializationPair.getClass();
        return (ByteBuffer[]) map.map((v1) -> {
            return r1.write(v1);
        }).toArray(i -> {
            return new ByteBuffer[i];
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <C, B> ReactiveSubscription.Message<C, B> readMessage(RedisElementReader<C> redisElementReader, RedisElementReader<B> redisElementReader2, ReactiveSubscription.Message<ByteBuffer, ByteBuffer> message) {
        if (!(message instanceof ReactiveSubscription.PatternMessage)) {
            return new ReactiveSubscription.ChannelMessage(read(redisElementReader, message.getChannel()), read(redisElementReader2, message.getMessage()));
        }
        ReactiveSubscription.PatternMessage patternMessage = (ReactiveSubscription.PatternMessage) message;
        return new ReactiveSubscription.PatternMessage((String) read(this.stringSerializationPair.getReader(), (ByteBuffer) patternMessage.getPattern()), read(redisElementReader, (ByteBuffer) patternMessage.getChannel()), read(redisElementReader2, (ByteBuffer) patternMessage.getMessage()));
    }

    private static <C> C read(RedisElementReader<C> redisElementReader, ByteBuffer byteBuffer) {
        try {
            byteBuffer.mark();
            return redisElementReader.read(byteBuffer);
        } finally {
            byteBuffer.reset();
        }
    }
}
