/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.base.source.reader;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.flink.cdc.common.annotation.Experimental;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.meta.split.ChangeEventRecords;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderContext;
import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.Fetcher;
import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher;
import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
public class IncrementalSourceSplitReader<C extends SourceConfig>
implements SplitReader<SourceRecords, SourceSplitBase> {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalSourceSplitReader.class);
    private final ArrayDeque<SnapshotSplit> snapshotSplits;
    private final ArrayDeque<StreamSplit> streamSplits;
    private final int subtaskId;
    @Nullable
    private Fetcher<SourceRecords, SourceSplitBase> currentFetcher;
    @Nullable
    private IncrementalSourceScanFetcher reusedScanFetcher;
    @Nullable
    private IncrementalSourceStreamFetcher reusedStreamFetcher;
    @Nullable
    private String currentSplitId;
    private final DataSourceDialect<C> dataSourceDialect;
    private final C sourceConfig;
    private final IncrementalSourceReaderContext context;
    private final SnapshotPhaseHooks snapshotHooks;

    public IncrementalSourceSplitReader(int subtaskId, DataSourceDialect<C> dataSourceDialect, C sourceConfig, IncrementalSourceReaderContext context, SnapshotPhaseHooks snapshotHooks) {
        this.subtaskId = subtaskId;
        this.snapshotSplits = new ArrayDeque();
        this.streamSplits = new ArrayDeque(1);
        this.dataSourceDialect = dataSourceDialect;
        this.sourceConfig = sourceConfig;
        this.context = context;
        this.snapshotHooks = snapshotHooks;
    }

    public RecordsWithSplitIds<SourceRecords> fetch() throws IOException {
        try {
            this.suspendStreamReaderIfNeed();
            return this.pollSplitRecords();
        }
        catch (Exception e) {
            LOG.warn("fetch data failed.", (Throwable)e);
            throw new IOException(e);
        }
    }

    private void suspendStreamReaderIfNeed() throws Exception {
        if (this.currentFetcher != null && this.currentFetcher instanceof IncrementalSourceStreamFetcher && this.context.isStreamSplitReaderSuspended() && !this.currentFetcher.isFinished()) {
            ((IncrementalSourceStreamFetcher)this.currentFetcher).stopReadTask();
            LOG.info("Suspend stream reader to wait the stream split update.");
        }
    }

    public void handleSplitsChanges(SplitsChange<SourceSplitBase> splitsChanges) {
        if (!(splitsChanges instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChanges.getClass()));
        }
        LOG.debug("Handling split change {}", splitsChanges);
        for (SourceSplitBase split : splitsChanges.splits()) {
            if (split.isSnapshotSplit()) {
                this.snapshotSplits.add(split.asSnapshotSplit());
                continue;
            }
            this.streamSplits.add(split.asStreamSplit());
        }
    }

    public void wakeUp() {
    }

    public void close() throws Exception {
        this.closeScanFetcher();
        this.closeStreamFetcher();
    }

    private ChangeEventRecords pollSplitRecords() throws InterruptedException {
        Iterator<SourceRecords> dataIt = null;
        if (this.currentFetcher == null) {
            if (this.streamSplits.size() > 0) {
                StreamSplit nextSplit = this.streamSplits.poll();
                this.submitStreamSplit(nextSplit);
            } else if (this.snapshotSplits.size() > 0) {
                this.submitSnapshotSplit(this.snapshotSplits.poll());
            } else {
                LOG.info("No available split to read.");
            }
            if (this.currentFetcher != null) {
                dataIt = this.currentFetcher.pollSplitRecords();
            } else {
                this.currentSplitId = null;
            }
            return dataIt == null ? this.finishedSplit() : this.forRecords(dataIt);
        }
        if (this.currentFetcher instanceof IncrementalSourceScanFetcher) {
            dataIt = this.currentFetcher.pollSplitRecords();
            if (dataIt != null) {
                ChangeEventRecords records;
                if (this.context.isHasAssignedStreamSplit()) {
                    records = this.forNewAddedTableFinishedSplit(this.currentSplitId, dataIt);
                    this.closeScanFetcher();
                    this.closeStreamFetcher();
                } else {
                    records = this.forRecords(dataIt);
                    SnapshotSplit nextSplit = this.snapshotSplits.poll();
                    if (nextSplit != null) {
                        Preconditions.checkState((this.reusedScanFetcher != null ? 1 : 0) != 0);
                        this.submitSnapshotSplit(nextSplit);
                    } else {
                        this.closeScanFetcher();
                    }
                }
                return records;
            }
            return this.finishedSplit();
        }
        if (this.currentFetcher instanceof IncrementalSourceStreamFetcher) {
            dataIt = this.currentFetcher.pollSplitRecords();
            if (dataIt != null) {
                SnapshotSplit nextSplit = this.snapshotSplits.poll();
                if (nextSplit != null) {
                    this.closeStreamFetcher();
                    LOG.info("It's turn to switch next fetch reader to snapshot split reader");
                    this.submitSnapshotSplit(nextSplit);
                }
                return ChangeEventRecords.forRecords("stream-split", dataIt);
            }
            this.closeStreamFetcher();
            return this.finishedSplit();
        }
        throw new IllegalStateException("Unsupported reader type.");
    }

    @VisibleForTesting
    public boolean canAssignNextSplit() {
        return this.currentFetcher == null || this.currentFetcher.isFinished();
    }

    private ChangeEventRecords finishedSplit() {
        ChangeEventRecords finishedRecords = ChangeEventRecords.forFinishedSplit(this.currentSplitId);
        this.currentSplitId = null;
        return finishedRecords;
    }

    private ChangeEventRecords forNewAddedTableFinishedSplit(String splitId, Iterator<SourceRecords> recordsForSplit) {
        HashSet<String> finishedSplits = new HashSet<String>();
        finishedSplits.add(splitId);
        finishedSplits.add("stream-split");
        this.currentSplitId = null;
        return new ChangeEventRecords(splitId, recordsForSplit, finishedSplits);
    }

    private ChangeEventRecords forRecords(Iterator<SourceRecords> dataIt) {
        if (this.currentFetcher instanceof IncrementalSourceScanFetcher) {
            ChangeEventRecords finishedRecords = ChangeEventRecords.forSnapshotRecords(this.currentSplitId, dataIt);
            this.closeScanFetcher();
            return finishedRecords;
        }
        return ChangeEventRecords.forRecords(this.currentSplitId, dataIt);
    }

    private void submitSnapshotSplit(SnapshotSplit snapshotSplit) {
        this.currentSplitId = snapshotSplit.splitId();
        this.currentFetcher = this.getScanFetcher();
        FetchTask<SourceSplitBase> fetchTask = this.dataSourceDialect.createFetchTask(snapshotSplit);
        ((AbstractScanFetchTask)fetchTask).setSnapshotPhaseHooks(this.snapshotHooks);
        this.currentFetcher.submitTask(fetchTask);
    }

    private void submitStreamSplit(StreamSplit streamSplit) {
        this.currentSplitId = streamSplit.splitId();
        this.currentFetcher = this.getStreamFetcher();
        FetchTask<SourceSplitBase> fetchTask = this.dataSourceDialect.createFetchTask(streamSplit);
        this.currentFetcher.submitTask(fetchTask);
    }

    private IncrementalSourceScanFetcher getScanFetcher() {
        if (this.reusedScanFetcher == null) {
            this.reusedScanFetcher = new IncrementalSourceScanFetcher(this.dataSourceDialect.createFetchTaskContext(this.sourceConfig), this.subtaskId);
        }
        return this.reusedScanFetcher;
    }

    private IncrementalSourceStreamFetcher getStreamFetcher() {
        if (this.reusedStreamFetcher == null) {
            this.reusedStreamFetcher = new IncrementalSourceStreamFetcher(this.dataSourceDialect.createFetchTaskContext(this.sourceConfig), this.subtaskId);
        }
        return this.reusedStreamFetcher;
    }

    private void closeScanFetcher() {
        if (this.reusedScanFetcher != null) {
            LOG.debug("Close snapshot reader {}", (Object)this.reusedScanFetcher.getClass().getCanonicalName());
            this.reusedScanFetcher.close();
            if (this.currentFetcher == this.reusedScanFetcher) {
                this.currentFetcher = null;
            }
            this.reusedScanFetcher = null;
        }
    }

    private void closeStreamFetcher() {
        if (this.reusedStreamFetcher != null) {
            LOG.debug("Close stream reader {}", (Object)this.reusedStreamFetcher.getClass().getCanonicalName());
            this.reusedStreamFetcher.close();
            if (this.currentFetcher == this.reusedStreamFetcher) {
                this.currentFetcher = null;
            }
            this.reusedStreamFetcher = null;
        }
    }
}

