package org.redisson.reactive;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RFuture;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/* loaded from: input_file:WEB-INF/lib/redisson-3.18.0.jar:org/redisson/reactive/ElementsStream.class */
/**
 * Static helpers that expose a repeatable asynchronous "take one element" operation
 * (a {@link Callable} producing an {@link RFuture}) as a Reactor {@link Flux}.
 */
public class ElementsStream {
    /**
     * Invokes {@code callable} once, publishes the returned future in {@code atomicReference},
     * and when that future completes either signals the error to {@code fluxSink} or emits the
     * value and, unless {@code atomicLong} has been decremented to zero, repeats.
     *
     * <p>No new request is issued when the sink is already cancelled, and a future obtained
     * while the sink was being cancelled is cancelled immediately, so that a pending operation
     * is not left running for a subscriber that has gone away.
     *
     * @param callable        factory invoked once per element to start the asynchronous operation
     * @param fluxSink        sink receiving the elements, or the first error
     * @param atomicLong      number of elements still to be emitted by this chain of calls
     * @param atomicReference holder updated with the most recently started future
     * @param <V>             element type
     */
    public static <V> void take(Callable<RFuture<V>> callable, FluxSink<V> fluxSink, AtomicLong atomicLong, AtomicReference<RFuture<V>> atomicReference) {
        // Nothing to fetch for a subscriber that has already cancelled.
        if (fluxSink.isCancelled()) {
            return;
        }
        try {
            RFuture<V> pending = callable.call();
            atomicReference.set(pending);
            // The sink may have been cancelled while the operation was being started; in that
            // case the dispose callback may have run before the reference was updated, so the
            // freshly started future has to be cancelled here.
            if (fluxSink.isCancelled()) {
                pending.cancel(true);
                return;
            }
            pending.whenComplete((value, failure) -> {
                if (failure != null) {
                    fluxSink.error(failure);
                    return;
                }
                fluxSink.next(value);
                // Stop once the demand tracked by this counter has been satisfied.
                if (atomicLong.decrementAndGet() == 0) {
                    return;
                }
                take(callable, fluxSink, atomicLong, atomicReference);
            });
        } catch (Exception e) {
            // callable.call() may throw any checked exception; report it through the sink.
            fluxSink.error(e);
        }
    }

    /**
     * Creates a {@link Flux} whose elements are obtained by repeatedly invoking {@code callable}.
     *
     * <p>Every request signal starts its own chain of {@link #take} calls with a fresh counter
     * initialised to the requested amount. When the sink is disposed, the most recently started
     * future (if any) is cancelled.
     *
     * @param callable factory invoked once per element to start the asynchronous operation
     * @param <V>      element type
     * @return a flux emitting the values produced by the futures returned from {@code callable}
     */
    public static <V> Flux<V> takeElements(Callable<RFuture<V>> callable) {
        return Flux.create(fluxSink -> {
            AtomicReference<RFuture<V>> latestFuture = new AtomicReference<>();
            fluxSink.onRequest(requested -> {
                take(callable, fluxSink, new AtomicLong(requested), latestFuture);
            });
            fluxSink.onDispose(() -> {
                // The reference is still empty when disposal happens before any operation was
                // started (no demand yet, or the very first callable.call() threw).
                RFuture<V> pending = latestFuture.get();
                if (pending != null) {
                    pending.cancel(true);
                }
            });
        });
    }
}
