package org.springframework.data.mongodb.gridfs;

import com.mongodb.reactivestreams.client.Success;
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
import io.github.resilience4j.circuitbreaker.utils.MetricNames;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

/* loaded from: input_file:WEB-INF/lib/spring-data-mongodb-2.2.5.RELEASE.jar:org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.class */
class DataBufferPublisherAdapter {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/spring-data-mongodb-2.2.5.RELEASE.jar:org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter$AsyncInputStreamHandler.class */
    /**
     * Translates demand signalled on a {@link FluxSink} into reads against an {@link AsyncInputStream}.
     *
     * <p>Four pieces of state are kept in volatile fields and manipulated through atomic field updaters:
     * the outstanding {@code demand}, the open/closed {@code state}, a {@code drain} guard used when the
     * drain loop is re-entered from a read completion, and a {@code read} flag that allows at most one
     * read to be in progress at a time.
     */
    public static class AsyncInputStreamHandler {
        // The string passed to each updater must be the exact name of the volatile field it updates.
        private static final AtomicLongFieldUpdater<AsyncInputStreamHandler> DEMAND = AtomicLongFieldUpdater.newUpdater(AsyncInputStreamHandler.class, "demand");
        // Literal field name: the lookup is reflective and must not depend on an unrelated library constant.
        private static final AtomicIntegerFieldUpdater<AsyncInputStreamHandler> STATE = AtomicIntegerFieldUpdater.newUpdater(AsyncInputStreamHandler.class, "state");
        private static final AtomicIntegerFieldUpdater<AsyncInputStreamHandler> DRAIN = AtomicIntegerFieldUpdater.newUpdater(AsyncInputStreamHandler.class, "drain");
        private static final AtomicIntegerFieldUpdater<AsyncInputStreamHandler> READ = AtomicIntegerFieldUpdater.newUpdater(AsyncInputStreamHandler.class, "read");
        private static final int STATE_OPEN = 0;
        private static final int STATE_CLOSED = 1;
        private static final int DRAIN_NONE = 0;
        private static final int DRAIN_COMPLETION = 1;
        private static final int READ_NONE = 0;
        private static final int READ_IN_PROGRESS = 1;
        final AsyncInputStream inputStream;
        final DataBufferFactory dataBufferFactory;
        final int bufferSize;
        volatile long demand;
        volatile int state = STATE_OPEN;
        volatile int drain = DRAIN_NONE;
        volatile int read = READ_NONE;

        /**
         * Subscriber for the publisher returned by a single {@code inputStream.read(transport)} call.
         * Copies the bytes placed in {@code transport} into a {@link DataBuffer}, forwards it to the sink
         * and, once the read publisher completes, lets the enclosing handler start the next read.
         */
        public class BufferCoreSubscriber implements CoreSubscriber<Integer> {
            private final FluxSink<DataBuffer> sink;
            private final DataBufferFactory factory;
            private final ByteBuffer transport;
            private volatile Subscription subscription;

            /**
             * @param fluxSink sink that receives the buffers, completion and errors
             * @param dataBufferFactory factory used to allocate the emitted buffers
             * @param byteBuffer the buffer that was handed to the read call this subscriber observes
             */
            BufferCoreSubscriber(FluxSink<DataBuffer> fluxSink, DataBufferFactory dataBufferFactory, ByteBuffer byteBuffer) {
                this.sink = fluxSink;
                this.factory = dataBufferFactory;
                this.transport = byteBuffer;
            }

            /** Returns the context of the downstream sink. */
            @Override
            public Context currentContext() {
                return this.sink.currentContext();
            }

            /** Stores the subscription and requests the first read result. */
            @Override
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                subscription.request(1L);
            }

            /**
             * Handles the value emitted by the read publisher.
             *
             * @param num a positive value causes one buffer to be emitted and demand to be decremented;
             *            {@code -1} completes the sink; any other value just requests again
             */
            @Override
            public void onNext(Integer num) {
                if (AsyncInputStreamHandler.this.isClosed()) {
                    return;
                }
                int bytesRead = num.intValue();
                if (bytesRead > 0) {
                    this.sink.next(readNextChunk());
                    AsyncInputStreamHandler.this.decrementDemand();
                }
                if (bytesRead == -1) {
                    this.sink.complete();
                } else {
                    this.subscription.request(1L);
                }
            }

            /**
             * Copies the content of the transport buffer into a newly allocated {@link DataBuffer} and
             * clears the transport buffer afterwards.
             */
            private DataBuffer readNextChunk() {
                // flip() switches the transport buffer to reading what the read call put into it.
                this.transport.flip();
                DataBuffer allocateBuffer = this.factory.allocateBuffer(this.transport.remaining());
                allocateBuffer.write(this.transport);
                this.transport.clear();
                return allocateBuffer;
            }

            /**
             * Closes the handler and forwards the error to the sink. If the handler is already closed the
             * error is handed to {@link Operators#onErrorDropped} instead.
             */
            @Override
            public void onError(Throwable th) {
                if (AsyncInputStreamHandler.this.isClosed()) {
                    Operators.onErrorDropped(th, this.sink.currentContext());
                } else {
                    AsyncInputStreamHandler.this.close();
                    this.sink.error(th);
                }
            }

            /**
             * Clears the read-in-progress flag and, unless the handler is closed or another completion
             * already holds the drain guard, runs the drain loop to start further reads.
             */
            @Override
            public void onComplete() {
                AsyncInputStreamHandler.this.onReadDone();
                if (AsyncInputStreamHandler.this.isClosed() || !AsyncInputStreamHandler.this.enterDrainLoop()) {
                    return;
                }
                try {
                    AsyncInputStreamHandler.this.drainLoop(this.sink);
                } finally {
                    AsyncInputStreamHandler.this.leaveDrainLoop();
                }
            }
        }

        /**
         * Adds {@code j} to the outstanding demand (capped by {@link Operators#addCap}) and runs the drain loop.
         *
         * @param fluxSink sink that receives the data read
         * @param j number of additionally requested elements
         */
        public void request(FluxSink<DataBuffer> fluxSink, long j) {
            Operators.addCap(DEMAND, this, j);
            drainLoop(fluxSink);
        }

        /** Starts reads for as long as {@link #onShouldRead()} grants one. */
        void drainLoop(FluxSink<DataBuffer> fluxSink) {
            while (onShouldRead()) {
                emitNext(fluxSink);
            }
        }

        /**
         * Returns {@code true} when the handler is open, demand is positive and the read flag could be
         * acquired. A {@code true} result means the read flag is now held by the caller.
         */
        boolean onShouldRead() {
            return !isClosed() && getDemand() > 0 && onWantRead();
        }

        /** Tries to move the read flag from "none" to "in progress"; returns whether that succeeded. */
        boolean onWantRead() {
            return READ.compareAndSet(this, READ_NONE, READ_IN_PROGRESS);
        }

        /** Moves the read flag from "in progress" back to "none", if it was set. */
        void onReadDone() {
            READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE);
        }

        /** Returns the current outstanding demand. */
        long getDemand() {
            return DEMAND.get(this);
        }

        /** Decrements the outstanding demand by one. */
        void decrementDemand() {
            DEMAND.decrementAndGet(this);
        }

        /** Marks the handler as closed; has no effect if it is already closed. */
        void close() {
            STATE.compareAndSet(this, STATE_OPEN, STATE_CLOSED);
        }

        /** Tries to acquire the drain guard; returns whether that succeeded. */
        boolean enterDrainLoop() {
            return DRAIN.compareAndSet(this, DRAIN_NONE, DRAIN_COMPLETION);
        }

        /** Releases the drain guard unconditionally. */
        void leaveDrainLoop() {
            DRAIN.set(this, DRAIN_NONE);
        }

        /** Returns whether {@link #close()} has been called. */
        boolean isClosed() {
            return STATE.get(this) == STATE_CLOSED;
        }

        /**
         * Allocates a transport buffer of {@code bufferSize} bytes, issues one read against the input
         * stream and subscribes a {@link BufferCoreSubscriber} to its result. Anything thrown while
         * issuing the read or subscribing is reported to the sink as an error.
         */
        private void emitNext(FluxSink<DataBuffer> fluxSink) {
            ByteBuffer allocate = ByteBuffer.allocate(this.bufferSize);
            try {
                this.inputStream.read(allocate).subscribe(new BufferCoreSubscriber(fluxSink, this.dataBufferFactory, allocate));
            } catch (Throwable th) {
                fluxSink.error(th);
            }
        }

        /**
         * @param asyncInputStream stream to read from
         * @param dataBufferFactory factory used to allocate the emitted buffers
         * @param i size in bytes of the transport buffer allocated for each read
         */
        public AsyncInputStreamHandler(AsyncInputStream asyncInputStream, DataBufferFactory dataBufferFactory, int i) {
            this.inputStream = asyncInputStream;
            this.dataBufferFactory = dataBufferFactory;
            this.bufferSize = i;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/spring-data-mongodb-2.2.5.RELEASE.jar:org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter$DelegatingAsyncInputStream.class */
    /**
     * {@link AsyncInputStream} that forwards every call to a wrapped stream and additionally carries
     * the {@link DataBufferFactory} and buffer size that the enclosing class reads when it sets up
     * the read pipeline.
     */
    public static class DelegatingAsyncInputStream implements AsyncInputStream {
        private final AsyncInputStream delegate;
        private final DataBufferFactory dataBufferFactory;
        private final int bufferSize;

        /**
         * @param asyncInputStream the stream all calls are forwarded to
         * @param dataBufferFactory factory carried along for the reader
         * @param i buffer size carried along for the reader
         */
        DelegatingAsyncInputStream(AsyncInputStream asyncInputStream, DataBufferFactory dataBufferFactory, int i) {
            this.delegate = asyncInputStream;
            this.dataBufferFactory = dataBufferFactory;
            this.bufferSize = i;
        }

        /** Forwards to the wrapped stream's {@code read}. */
        public Publisher<Integer> read(ByteBuffer byteBuffer) {
            Publisher<Integer> result = this.delegate.read(byteBuffer);
            return result;
        }

        /** Forwards to the wrapped stream's {@code skip}. */
        public Publisher<Long> skip(long j) {
            Publisher<Long> result = this.delegate.skip(j);
            return result;
        }

        /** Forwards to the wrapped stream's {@code close}. */
        public Publisher<Success> close() {
            Publisher<Success> result = this.delegate.close();
            return result;
        }
    }

    /** Package-private no-argument constructor; performs no initialization. */
    DataBufferPublisherAdapter() {
        // nothing to set up
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a {@link Flux} of {@link DataBuffer}s backed by the given {@link AsyncInputStream}.
     *
     * <p>The stream is wrapped in a {@link DelegatingAsyncInputStream} and used as the resource of
     * {@code Flux.usingWhen}; the same {@code close()} call is registered for each of the three
     * cleanup callbacks passed to it.
     *
     * @param asyncInputStream stream to read from
     * @param dataBufferFactory factory used to allocate the emitted buffers
     * @param i buffer size handed to the reader
     * @return the resulting flux of buffers
     */
    public static Flux<DataBuffer> createBinaryStream(AsyncInputStream asyncInputStream, DataBufferFactory dataBufferFactory, int i) {
        Mono<DelegatingAsyncInputStream> resource = Mono.just(new DelegatingAsyncInputStream(asyncInputStream, dataBufferFactory, i));
        return Flux.usingWhen(
                resource,
                DataBufferPublisherAdapter::doRead,
                DelegatingAsyncInputStream::close,
                (stream, failure) -> stream.close(),
                DelegatingAsyncInputStream::close);
    }

    /**
     * Builds the flux that reads from the given stream.
     *
     * <p>A new {@link AsyncInputStreamHandler} is created from the stream and the buffer factory and
     * buffer size it carries. The handler is closed when the sink is disposed or cancelled, and every
     * request signal on the sink is passed to {@link AsyncInputStreamHandler#request}.
     *
     * @param delegatingAsyncInputStream stream wrapper carrying the buffer factory and buffer size
     * @return a flux emitting the buffers produced by the handler
     */
    private static Flux<DataBuffer> doRead(DelegatingAsyncInputStream delegatingAsyncInputStream) {
        AsyncInputStreamHandler handler = new AsyncInputStreamHandler(delegatingAsyncInputStream, delegatingAsyncInputStream.dataBufferFactory, delegatingAsyncInputStream.bufferSize);
        return Flux.create(sink -> {
            // A bound method reference null-checks its receiver itself, so no explicit getClass() probe is needed.
            sink.onDispose(handler::close);
            sink.onCancel(handler::close);
            sink.onRequest(n -> handler.request(sink, n));
        });
    }
}
