/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.raft.internals;

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.raft.errors.BufferAllocationException;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.raft.internals.BatchBuilder;
import org.apache.kafka.server.common.serialization.RecordSerde;

public class BatchAccumulator<T>
implements Closeable {
    private final int epoch;
    private final Time time;
    private final SimpleTimer lingerTimer;
    private final int lingerMs;
    private final int maxBatchSize;
    private final CompressionType compressionType;
    private final MemoryPool memoryPool;
    private final ReentrantLock appendLock;
    private final RecordSerde<T> serde;
    private final ConcurrentLinkedQueue<CompletedBatch<T>> completed;
    private volatile DrainStatus drainStatus;
    private long nextOffset;
    private BatchBuilder<T> currentBatch;

    public BatchAccumulator(int epoch, long baseOffset, int lingerMs, int maxBatchSize, MemoryPool memoryPool, Time time, CompressionType compressionType, RecordSerde<T> serde) {
        this.epoch = epoch;
        this.lingerMs = lingerMs;
        this.maxBatchSize = maxBatchSize;
        this.memoryPool = memoryPool;
        this.time = time;
        this.lingerTimer = new SimpleTimer();
        this.compressionType = compressionType;
        this.serde = serde;
        this.nextOffset = baseOffset;
        this.drainStatus = DrainStatus.NONE;
        this.completed = new ConcurrentLinkedQueue();
        this.appendLock = new ReentrantLock();
    }

    public long append(int epoch, List<T> records) {
        return this.append(epoch, records, false);
    }

    public long appendAtomic(int epoch, List<T> records) {
        return this.append(epoch, records, true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private long append(int epoch, List<T> records, boolean isAtomic) {
        if (epoch < this.epoch) {
            throw new NotLeaderException("Append failed because the epoch doesn't match");
        }
        if (epoch > this.epoch) {
            throw new IllegalArgumentException("Attempt to append from epoch " + epoch + " which is larger than the current epoch " + this.epoch);
        }
        ObjectSerializationCache serializationCache = new ObjectSerializationCache();
        this.appendLock.lock();
        try {
            this.maybeCompleteDrain();
            BatchBuilder<T> batch = null;
            if (isAtomic) {
                batch = this.maybeAllocateBatch(records, serializationCache);
            }
            for (T record : records) {
                if (!isAtomic) {
                    batch = this.maybeAllocateBatch(Collections.singleton(record), serializationCache);
                }
                if (batch == null) {
                    throw new BufferAllocationException("Append failed because we failed to allocate memory to write the batch");
                }
                batch.appendRecord(record, serializationCache);
                ++this.nextOffset;
            }
            this.maybeResetLinger();
            long l = this.nextOffset - 1L;
            return l;
        }
        finally {
            this.appendLock.unlock();
        }
    }

    private void maybeResetLinger() {
        if (!this.lingerTimer.isRunning()) {
            this.lingerTimer.reset(this.time.milliseconds() + (long)this.lingerMs);
        }
    }

    private BatchBuilder<T> maybeAllocateBatch(Collection<T> records, ObjectSerializationCache serializationCache) {
        if (this.currentBatch == null) {
            this.startNewBatch();
        }
        if (this.currentBatch != null) {
            OptionalInt bytesNeeded = this.currentBatch.bytesNeeded(records, serializationCache);
            if (bytesNeeded.isPresent() && bytesNeeded.getAsInt() > this.maxBatchSize) {
                throw new RecordBatchTooLargeException(String.format("The total record(s) size of %s exceeds the maximum allowed batch size of %s", bytesNeeded.getAsInt(), this.maxBatchSize));
            }
            if (bytesNeeded.isPresent()) {
                this.completeCurrentBatch();
                this.startNewBatch();
            }
        }
        return this.currentBatch;
    }

    private void completeCurrentBatch() {
        MemoryRecords data = this.currentBatch.build();
        this.completed.add(new CompletedBatch(this.currentBatch.baseOffset(), this.currentBatch.records(), data, this.memoryPool, this.currentBatch.initialBuffer()));
        this.currentBatch = null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void appendControlMessage(Function<ByteBuffer, MemoryRecords> valueCreator) {
        block6: {
            this.appendLock.lock();
            try {
                ByteBuffer buffer = this.memoryPool.tryAllocate(256);
                if (buffer != null) {
                    try {
                        this.forceDrain();
                        this.completed.add(new CompletedBatch(this.nextOffset, 1, valueCreator.apply(buffer), this.memoryPool, buffer));
                        ++this.nextOffset;
                        break block6;
                    }
                    catch (Exception e) {
                        this.memoryPool.release(buffer);
                        throw e;
                    }
                }
                throw new IllegalStateException("Could not allocate buffer for the control record");
            }
            finally {
                this.appendLock.unlock();
            }
        }
    }

    public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, long currentTimeMs) {
        this.appendControlMessage(buffer -> MemoryRecords.withLeaderChangeMessage((long)this.nextOffset, (long)currentTimeMs, (int)this.epoch, (ByteBuffer)buffer, (LeaderChangeMessage)leaderChangeMessage));
    }

    public void appendSnapshotHeaderMessage(SnapshotHeaderRecord snapshotHeaderRecord, long currentTimeMs) {
        this.appendControlMessage(buffer -> MemoryRecords.withSnapshotHeaderRecord((long)this.nextOffset, (long)currentTimeMs, (int)this.epoch, (ByteBuffer)buffer, (SnapshotHeaderRecord)snapshotHeaderRecord));
    }

    public void appendSnapshotFooterMessage(SnapshotFooterRecord snapshotFooterRecord, long currentTimeMs) {
        this.appendControlMessage(buffer -> MemoryRecords.withSnapshotFooterRecord((long)this.nextOffset, (long)currentTimeMs, (int)this.epoch, (ByteBuffer)buffer, (SnapshotFooterRecord)snapshotFooterRecord));
    }

    public void forceDrain() {
        this.appendLock.lock();
        try {
            this.drainStatus = DrainStatus.STARTED;
            this.maybeCompleteDrain();
        }
        finally {
            this.appendLock.unlock();
        }
    }

    private void maybeCompleteDrain() {
        if (this.drainStatus == DrainStatus.STARTED) {
            if (this.currentBatch != null && this.currentBatch.nonEmpty()) {
                this.completeCurrentBatch();
            }
            this.lingerTimer.reset(Long.MAX_VALUE);
            this.drainStatus = DrainStatus.FINISHED;
        }
    }

    private void startNewBatch() {
        ByteBuffer buffer = this.memoryPool.tryAllocate(this.maxBatchSize);
        if (buffer != null) {
            this.currentBatch = new BatchBuilder<T>(buffer, this.serde, this.compressionType, this.nextOffset, this.time.milliseconds(), false, this.epoch, this.maxBatchSize);
        }
    }

    public boolean needsDrain(long currentTimeMs) {
        return this.timeUntilDrain(currentTimeMs) <= 0L;
    }

    public long timeUntilDrain(long currentTimeMs) {
        if (this.drainStatus == DrainStatus.FINISHED) {
            return 0L;
        }
        return this.lingerTimer.remainingMs(currentTimeMs);
    }

    public int epoch() {
        return this.epoch;
    }

    public List<CompletedBatch<T>> drain() {
        if (this.drainStatus == DrainStatus.NONE) {
            this.drainStatus = DrainStatus.STARTED;
        }
        if (this.appendLock.tryLock()) {
            try {
                this.maybeCompleteDrain();
            }
            finally {
                this.appendLock.unlock();
            }
        }
        if (this.drainStatus == DrainStatus.FINISHED) {
            this.drainStatus = DrainStatus.NONE;
            return this.drainCompleted();
        }
        return Collections.emptyList();
    }

    private List<CompletedBatch<T>> drainCompleted() {
        ArrayList<CompletedBatch<T>> res = new ArrayList<CompletedBatch<T>>(this.completed.size());
        CompletedBatch<T> batch;
        while ((batch = this.completed.poll()) != null) {
            res.add(batch);
        }
        return res;
    }

    public boolean isEmpty() {
        return !this.lingerTimer.isRunning();
    }

    public int numCompletedBatches() {
        return this.completed.size();
    }

    @Override
    public void close() {
        List<CompletedBatch<CompletedBatch>> unwritten = this.drain();
        unwritten.forEach(CompletedBatch::release);
    }

    private static class SimpleTimer {
        private final AtomicLong deadlineMs = new AtomicLong(Long.MAX_VALUE);

        private SimpleTimer() {
        }

        boolean isRunning() {
            return this.deadlineMs.get() != Long.MAX_VALUE;
        }

        void reset(long deadlineMs) {
            this.deadlineMs.set(deadlineMs);
        }

        long remainingMs(long currentTimeMs) {
            return Math.max(0L, this.deadlineMs.get() - currentTimeMs);
        }
    }

    public static class CompletedBatch<T> {
        public final long baseOffset;
        public final int numRecords;
        public final Optional<List<T>> records;
        public final MemoryRecords data;
        private final MemoryPool pool;
        private final ByteBuffer initialBuffer;

        private CompletedBatch(long baseOffset, List<T> records, MemoryRecords data, MemoryPool pool, ByteBuffer initialBuffer) {
            Objects.requireNonNull(data.firstBatch(), "Expected memory records to contain one batch");
            this.baseOffset = baseOffset;
            this.records = Optional.of(records);
            this.numRecords = records.size();
            this.data = data;
            this.pool = pool;
            this.initialBuffer = initialBuffer;
        }

        private CompletedBatch(long baseOffset, int numRecords, MemoryRecords data, MemoryPool pool, ByteBuffer initialBuffer) {
            Objects.requireNonNull(data.firstBatch(), "Expected memory records to contain one batch");
            this.baseOffset = baseOffset;
            this.records = Optional.empty();
            this.numRecords = numRecords;
            this.data = data;
            this.pool = pool;
            this.initialBuffer = initialBuffer;
        }

        public int sizeInBytes() {
            return this.data.sizeInBytes();
        }

        public void release() {
            this.pool.release(this.initialBuffer);
        }

        public long appendTimestamp() {
            return this.data.firstBatch().maxTimestamp();
        }
    }

    private static enum DrainStatus {
        STARTED,
        FINISHED,
        NONE;

    }
}

