package org.redisson.reactive;

import java.util.concurrent.atomic.AtomicLong;
import org.redisson.api.RTopic;
import org.redisson.api.listener.MessageListener;
import reactor.core.publisher.Flux;

/* loaded from: input_file:WEB-INF/lib/redisson-3.13.6.jar:org/redisson/reactive/RedissonTopicReactive.class */
public class RedissonTopicReactive {
    private final RTopic topic;

    public RedissonTopicReactive(RTopic rTopic) {
        this.topic = rTopic;
    }

    public <M> Flux<M> getMessages(Class<M> cls) {
        return Flux.create(fluxSink -> {
            fluxSink.onRequest(j -> {
                final AtomicLong atomicLong = new AtomicLong(j);
                this.topic.addListenerAsync(cls, new MessageListener<M>() { // from class: org.redisson.reactive.RedissonTopicReactive.1
                    @Override // org.redisson.api.listener.MessageListener
                    public void onMessage(CharSequence charSequence, M m) {
                        fluxSink.next(m);
                        if (atomicLong.decrementAndGet() == 0) {
                            RedissonTopicReactive.this.topic.removeListenerAsync(this);
                            fluxSink.complete();
                        }
                    }
                }).onComplete((num, th) -> {
                    if (th != null) {
                        fluxSink.error(th);
                    } else {
                        fluxSink.onDispose(() -> {
                            this.topic.removeListenerAsync(num);
                        });
                    }
                });
            });
        });
    }
}
