package org.springframework.data.mongodb.gridfs;

import com.mongodb.reactivestreams.client.Success;
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:WEB-INF/lib/spring-data-mongodb-2.2.5.RELEASE.jar:org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.class */
/**
 * Adapts a {@link Publisher} of {@link DataBuffer}s to the MongoDB driver's {@link AsyncInputStream}.
 *
 * <p>Every {@link #read(ByteBuffer)} call registers a {@link ReadRequest}. Once the returned publisher
 * signals demand, the adapter subscribes to the source (once) and pulls data buffers one at a time,
 * copying their content into the destination of the request at the head of the request queue.
 * A read emits the number of bytes copied, or {@code -1} if it was terminated before any byte was
 * copied.
 */
public class AsyncInputStreamAdapter implements AsyncInputStream {

    private static final AtomicLongFieldUpdater<AsyncInputStreamAdapter> DEMAND =
            AtomicLongFieldUpdater.newUpdater(AsyncInputStreamAdapter.class, "demand");

    private static final AtomicIntegerFieldUpdater<AsyncInputStreamAdapter> SUBSCRIBED =
            AtomicIntegerFieldUpdater.newUpdater(AsyncInputStreamAdapter.class, "subscribed");

    private static final int SUBSCRIPTION_NOT_SUBSCRIBED = 0;
    private static final int SUBSCRIPTION_SUBSCRIBED = 1;

    /** Source of the data buffers; subscribed to at most once (guarded by {@link #SUBSCRIBED}). */
    private final Publisher<? extends DataBuffer> buffers;

    /** Context reported by the internal subscriber and used for dropped-signal hooks. */
    private final Context subscriberContext;

    /** Reads waiting for data, oldest first. */
    private final Queue<ReadRequest> readRequests = Queues.<ReadRequest>small().get();

    /** Buffers received from the source that still contain unread bytes. */
    private final Queue<DataBuffer> bufferQueue = Queues.<DataBuffer>small().get();

    private volatile Subscription subscription;
    private volatile boolean cancelled;
    private volatile boolean allDataBuffersReceived;
    private volatile Throwable error;

    // Accessed through DEMAND.
    volatile long demand;

    // Accessed through SUBSCRIBED.
    volatile int subscribed = SUBSCRIPTION_NOT_SUBSCRIBED;

    /**
     * Creates a new adapter.
     *
     * @param publisher source of the data buffers to expose as a stream.
     * @param context context handed to the internal subscriber.
     */
    public AsyncInputStreamAdapter(Publisher<? extends DataBuffer> publisher, Context context) {
        this.buffers = publisher;
        this.subscriberContext = context;
    }

    /**
     * Registers a read into {@code byteBuffer}. Nothing happens until the returned publisher is
     * subscribed to and demand is signalled.
     *
     * @param byteBuffer destination the data is copied into.
     * @return publisher emitting the value reported for this read, then completing.
     */
    public Publisher<Integer> read(ByteBuffer byteBuffer) {
        return Flux.create(sink -> {
            this.readRequests.offer(new ReadRequest(sink, byteBuffer));
            sink.onCancel(this::terminatePendingReads);
            sink.onDispose(this::terminatePendingReads);
            sink.onRequest(this::request);
        });
    }

    /**
     * Removes the head read request and fails the given sink.
     *
     * @param fluxSink sink of the request being failed.
     * @param th error to propagate.
     */
    void onError(FluxSink<Integer> fluxSink, Throwable th) {
        this.readRequests.poll();
        fluxSink.error(th);
    }

    /**
     * Removes the head read request, decrements the outstanding demand and completes the given sink
     * after emitting {@code i}.
     *
     * @param fluxSink sink of the request being completed.
     * @param i value to emit (bytes written, or {@code -1} if none were).
     */
    void onComplete(FluxSink<Integer> fluxSink, int i) {
        this.readRequests.poll();
        DEMAND.decrementAndGet(this);
        fluxSink.next(Integer.valueOf(i));
        fluxSink.complete();
    }

    /**
     * Not supported.
     *
     * @param j number of bytes to skip (ignored).
     * @throws UnsupportedOperationException always.
     */
    public Publisher<Long> skip(long j) {
        throw new UnsupportedOperationException("Skip is currently not implemented");
    }

    /**
     * Marks the adapter as cancelled and terminates all pending reads.
     *
     * @return publisher that fails with the recorded source error, if any, and otherwise emits
     *         {@link Success#SUCCESS}.
     */
    public Publisher<Success> close() {
        return Mono.create(sink -> {
            this.cancelled = true;
            if (this.error != null) {
                terminatePendingReads();
                sink.error(this.error);
                return;
            }
            terminatePendingReads();
            sink.success(Success.SUCCESS);
        });
    }

    /**
     * Handles demand signalled by a read's subscriber: records it, subscribes to the source on the
     * first call and drains already buffered data on later calls.
     *
     * @param j number of requested elements.
     */
    protected void request(long j) {
        if (this.allDataBuffersReceived && this.bufferQueue.isEmpty()) {
            // Source is exhausted and nothing is buffered: there is nothing left to read.
            terminatePendingReads();
            return;
        }

        Operators.addCap(DEMAND, this, j);

        if (SUBSCRIBED.get(this) == SUBSCRIPTION_NOT_SUBSCRIBED) {
            // The compare-and-set makes sure only one caller subscribes to the source.
            if (SUBSCRIBED.compareAndSet(this, SUBSCRIPTION_NOT_SUBSCRIBED, SUBSCRIPTION_SUBSCRIBED)) {
                this.buffers.subscribe(new DataBufferCoreSubscriber());
            }
            return;
        }

        Subscription current = this.subscription;
        if (current != null) {
            requestFromSubscription(current);
        }
    }

    /**
     * Cancels the subscription if the adapter was cancelled, then runs the drain loop.
     *
     * @param subscription the upstream subscription.
     */
    void requestFromSubscription(Subscription subscription) {
        if (this.cancelled) {
            subscription.cancel();
        }
        drainLoop();
    }

    /**
     * Copies buffered data into pending read requests while there is demand. When the buffer queue
     * is empty afterwards, either terminates pending reads (source finished) or requests one more
     * buffer from the source (demand outstanding).
     */
    void drainLoop() {
        while (DEMAND.get(this) > 0) {
            DataBuffer head = this.bufferQueue.peek();
            if (head == null) {
                break;
            }
            if (head.readableByteCount() == 0) {
                // Fully consumed buffer: drop it and look at the next one.
                this.bufferQueue.poll();
                continue;
            }
            ReadRequest consumer = this.readRequests.peek();
            if (consumer == null) {
                break;
            }
            consumer.transferBytes(head, head.readableByteCount());
        }

        if (!this.bufferQueue.isEmpty()) {
            return;
        }
        if (this.allDataBuffersReceived) {
            terminatePendingReads();
        } else if (this.demand > 0) {
            this.subscription.request(1L);
        }
    }

    /**
     * Completes (or fails, if an error was recorded) every pending read request.
     */
    void terminatePendingReads() {
        ReadRequest pending;
        // Peek instead of poll: ReadRequest.onComplete() ends up in onComplete/onError above, which
        // remove the head themselves. Polling here as well would silently discard the following
        // request without ever signalling its sink.
        while ((pending = this.readRequests.peek()) != null) {
            pending.onComplete();
        }
    }

    /**
     * Subscriber that receives the source buffers and feeds them into the drain loop.
     */
    private class DataBufferCoreSubscriber implements CoreSubscriber<DataBuffer> {

        @Override // reactor.core.CoreSubscriber
        public Context currentContext() {
            return AsyncInputStreamAdapter.this.subscriberContext;
        }

        @Override // reactor.core.CoreSubscriber, org.reactivestreams.Subscriber
        public void onSubscribe(Subscription subscription) {
            AsyncInputStreamAdapter.this.subscription = subscription;
            subscription.request(1L);
        }

        @Override // org.reactivestreams.Subscriber
        public void onNext(DataBuffer dataBuffer) {
            AsyncInputStreamAdapter adapter = AsyncInputStreamAdapter.this;

            if (adapter.cancelled || adapter.allDataBuffersReceived) {
                DataBufferUtils.release(dataBuffer);
                Operators.onNextDropped(dataBuffer, adapter.subscriberContext);
                return;
            }

            if (adapter.readRequests.peek() != null) {
                adapter.bufferQueue.offer(dataBuffer);
                adapter.drainLoop();
                return;
            }

            // No read is waiting for this buffer: drop it and stop the source.
            DataBufferUtils.release(dataBuffer);
            Operators.onNextDropped(dataBuffer, adapter.subscriberContext);
            adapter.subscription.cancel();
        }

        @Override // org.reactivestreams.Subscriber
        public void onError(Throwable th) {
            AsyncInputStreamAdapter adapter = AsyncInputStreamAdapter.this;

            if (adapter.cancelled || adapter.allDataBuffersReceived) {
                Operators.onErrorDropped(th, adapter.subscriberContext);
                return;
            }

            adapter.error = th;
            adapter.allDataBuffersReceived = true;
            adapter.terminatePendingReads();
        }

        @Override // org.reactivestreams.Subscriber
        public void onComplete() {
            AsyncInputStreamAdapter.this.allDataBuffersReceived = true;
            // Buffered data is still handed out by later drains; only terminate when none is left.
            if (AsyncInputStreamAdapter.this.bufferQueue.isEmpty()) {
                AsyncInputStreamAdapter.this.terminatePendingReads();
            }
        }
    }

    /**
     * A single pending {@link AsyncInputStreamAdapter#read(ByteBuffer)} call: the destination buffer,
     * the sink to signal and the number of bytes copied so far ({@code -1} until the first copy).
     */
    public class ReadRequest {

        private final FluxSink<Integer> sink;
        private final ByteBuffer dst;
        private int writtenBytes = -1;

        ReadRequest(FluxSink<Integer> fluxSink, ByteBuffer byteBuffer) {
            this.sink = fluxSink;
            this.dst = byteBuffer;
        }

        /**
         * Terminates this request: fails the sink if the adapter recorded an error, otherwise
         * emits the bytes written so far and completes.
         */
        public void onComplete() {
            Throwable failure = AsyncInputStreamAdapter.this.error;
            if (failure != null) {
                AsyncInputStreamAdapter.this.onError(this.sink, failure);
            } else {
                AsyncInputStreamAdapter.this.onComplete(this.sink, this.writtenBytes);
            }
        }

        /**
         * Copies as many readable bytes as fit from {@code dataBuffer} into the destination and
         * advances the buffer's read position accordingly. If nothing can be copied, the request
         * is completed with the bytes written so far. The data buffer is released once it has no
         * readable bytes left.
         *
         * @param dataBuffer buffer to read from.
         * @param i readable byte count as seen by the caller; retained for signature compatibility.
         *          The reported total is based on the number of bytes actually copied, which is
         *          smaller than {@code i} when the destination has less room.
         */
        public void transferBytes(DataBuffer dataBuffer, int i) {
            try {
                Throwable failure = AsyncInputStreamAdapter.this.error;
                if (failure != null) {
                    AsyncInputStreamAdapter.this.onError(this.sink, failure);
                    return;
                }

                ByteBuffer source = dataBuffer.asByteBuffer();
                int start = source.position();
                int transferable = Math.min(this.dst.remaining(), source.remaining());
                int end = Math.min(start + transferable, source.capacity());
                int toWrite = end - start;

                if (toWrite == 0) {
                    AsyncInputStreamAdapter.this.onComplete(this.sink, this.writtenBytes);
                    return;
                }

                // The limit is an absolute index, so it must be offset by the start position.
                source.limit(end);
                this.dst.put(source);
                source.limit(source.capacity());
                source.position(start);
                dataBuffer.readPosition(dataBuffer.readPosition() + toWrite);

                // Count what was copied, not what the buffer held: the destination may be smaller.
                this.writtenBytes = (this.writtenBytes == -1) ? toWrite : this.writtenBytes + toWrite;
            } catch (Exception e) {
                AsyncInputStreamAdapter.this.onError(this.sink, e);
            } finally {
                if (dataBuffer.readableByteCount() == 0) {
                    DataBufferUtils.release(dataBuffer);
                }
            }
        }
    }
}
