package io.seata.rm.datasource;

import com.google.common.collect.Lists;
import io.seata.common.ConfigurationKeys;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.common.util.IOUtil;
import io.seata.common.util.StringUtils;
import io.seata.config.ConfigurationFactory;
import io.seata.core.model.BranchStatus;
import io.seata.rm.datasource.undo.UndoLogManager;
import io.seata.rm.datasource.undo.UndoLogManagerFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/seata-all-2.0.0.jar:io/seata/rm/datasource/AsyncWorker.class */
/**
 * Buffers branch-commit requests in a bounded queue and deletes the matching undo-log rows in
 * batches on a background scheduled executor.
 *
 * <p>{@link #branchCommit(String, long, String)} only enqueues the request and immediately
 * reports {@link BranchStatus#PhaseTwo_Committed}; the undo-log deletion itself is performed
 * later by {@link #doBranchCommitSafely()}, which the constructor schedules at a fixed rate.
 */
public class AsyncWorker {
    /** Initial capacity of the map used to group queued contexts by resource id. */
    private static final int DEFAULT_RESOURCE_SIZE = 16;
    /** Maximum number of contexts handed to a single batch undo-log delete. */
    private static final int UNDOLOG_DELETE_LIMIT_SIZE = 1000;
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncWorker.class);
    /** Capacity of the commit queue; read from configuration, falling back to 10000. */
    private static final int ASYNC_COMMIT_BUFFER_LIMIT = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_ASYNC_COMMIT_BUFFER_LIMIT, 10000);

    private final DataSourceManager dataSourceManager;
    private final BlockingQueue<Phase2Context> commitQueue;
    private final ScheduledExecutorService scheduledExecutor;

    /**
     * Identifies one branch whose undo log is waiting to be deleted.
     *
     * <p>NOTE(review): the decompiler reported that this class's access modifier was changed from
     * package-private; it is kept {@code public} here so existing references keep compiling.
     */
    public static class Phase2Context {
        String xid;
        long branchId;
        String resourceId;

        /**
         * @param xid        global transaction id
         * @param branchId   branch id
         * @param resourceId id used to look up the data source in the {@link DataSourceManager}
         */
        public Phase2Context(String xid, long branchId, String resourceId) {
            this.xid = xid;
            this.branchId = branchId;
            this.resourceId = resourceId;
        }

        @Override
        public String toString() {
            return "Phase2Context{xid='" + this.xid + "', branchId=" + this.branchId + ", resourceId='" + this.resourceId + "'}";
        }
    }

    /**
     * Creates the worker, its bounded commit queue and a two-thread scheduled executor, and
     * schedules {@link #doBranchCommitSafely()} every 1000 ms after an initial 10 ms delay.
     *
     * @param dataSourceManager source of the {@link DataSourceProxy} for each resource id
     */
    public AsyncWorker(DataSourceManager dataSourceManager) {
        this.dataSourceManager = dataSourceManager;
        LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
        this.commitQueue = new LinkedBlockingQueue<>(ASYNC_COMMIT_BUFFER_LIMIT);
        this.scheduledExecutor = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("AsyncWorker", 2, true));
        this.scheduledExecutor.scheduleAtFixedRate(this::doBranchCommitSafely, 10L, 1000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Queues the branch for asynchronous undo-log deletion.
     *
     * @param xid        global transaction id
     * @param branchId   branch id
     * @param resourceId resource id of the data source holding the undo log
     * @return always {@link BranchStatus#PhaseTwo_Committed}; the deletion has not necessarily
     *         happened yet when this method returns
     */
    public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
        addToCommitQueue(new Phase2Context(xid, branchId, resourceId));
        return BranchStatus.PhaseTwo_Committed;
    }

    /**
     * Offers the context to the commit queue without blocking. When the offer is rejected, a
     * drain is submitted to the scheduled executor and the offer is retried once it completes.
     */
    private void addToCommitQueue(Phase2Context context) {
        if (this.commitQueue.offer(context)) {
            return;
        }
        CompletableFuture.runAsync(this::doBranchCommitSafely, this.scheduledExecutor)
                .thenRun(() -> addToCommitQueue(context));
    }

    /** Queues every context of the list, in list order. */
    private void addAllToCommitQueue(List<Phase2Context> contexts) {
        for (Phase2Context context : contexts) {
            addToCommitQueue(context);
        }
    }

    /**
     * Runs {@link #doBranchCommit()} and logs anything it throws instead of propagating it, so
     * the periodic task always completes normally.
     */
    void doBranchCommitSafely() {
        try {
            doBranchCommit();
        } catch (Throwable th) {
            LOGGER.error("Exception occur when doing branch commit", th);
        }
    }

    /** Drains the whole commit queue and processes the drained contexts per resource id. */
    private void doBranchCommit() {
        if (this.commitQueue.isEmpty()) {
            return;
        }
        List<Phase2Context> drained = new LinkedList<>();
        this.commitQueue.drainTo(drained);
        groupedByResourceId(drained).forEach(this::dealWithGroupedContexts);
    }

    /**
     * Groups contexts by their resource id, preserving list order inside each group.
     * Contexts with a blank resource id are logged and left out of the result.
     *
     * @param contexts contexts to group
     * @return map from resource id to the contexts carrying it
     */
    Map<String, List<Phase2Context>> groupedByResourceId(List<Phase2Context> contexts) {
        Map<String, List<Phase2Context>> grouped = new HashMap<>(DEFAULT_RESOURCE_SIZE);
        contexts.forEach(context -> {
            if (StringUtils.isBlank(context.resourceId)) {
                LOGGER.warn("resourceId is empty, resource:{}", context);
            } else {
                grouped.computeIfAbsent(context.resourceId, key -> new LinkedList<>()).add(context);
            }
        });
        return grouped;
    }

    /**
     * Deletes the undo logs of all contexts of one resource, in batches of at most
     * {@link #UNDOLOG_DELETE_LIMIT_SIZE}. The contexts are requeued when the resource is not
     * registered or when obtaining the connection fails with an {@link SQLException}.
     */
    private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) {
        if (StringUtils.isBlank(resourceId)) {
            LOGGER.warn("resourceId is empty and will skip.");
            return;
        }
        DataSourceProxy dataSourceProxy = this.dataSourceManager.get(resourceId);
        if (dataSourceProxy == null) {
            LOGGER.warn("failed to find resource for {} and requeue", resourceId);
            addAllToCommitQueue(contexts);
            return;
        }
        Connection connection = null;
        try {
            connection = dataSourceProxy.getPlainConnection();
            UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());
            for (List<Phase2Context> batch : Lists.partition(contexts, UNDOLOG_DELETE_LIMIT_SIZE)) {
                deleteUndoLog(connection, undoLogManager, batch);
            }
        } catch (SQLException e) {
            addAllToCommitQueue(contexts);
            LOGGER.error("failed to get connection for async committing on {} and requeue", resourceId, e);
        } finally {
            // Single close point for the success, SQLException and unexpected-throwable paths.
            IOUtil.close(connection);
        }
    }

    /**
     * Deletes the undo logs of one batch and commits when the connection is not in auto-commit
     * mode. On {@link SQLException} a rollback is attempted and the batch is requeued.
     */
    private void deleteUndoLog(Connection connection, UndoLogManager undoLogManager, List<Phase2Context> contexts) {
        LinkedHashSet<String> xids = new LinkedHashSet<>(contexts.size());
        LinkedHashSet<Long> branchIds = new LinkedHashSet<>(contexts.size());
        contexts.forEach(context -> {
            xids.add(context.xid);
            branchIds.add(context.branchId);
        });
        try {
            undoLogManager.batchDeleteUndoLog(xids, branchIds, connection);
            if (!connection.getAutoCommit()) {
                connection.commit();
            }
        } catch (SQLException e) {
            LOGGER.error("Failed to batch delete undo log", e);
            try {
                connection.rollback();
            } catch (SQLException rollbackEx) {
                LOGGER.error("Failed to rollback JDBC resource after deleting undo log failed", rollbackEx);
            }
            // Requeue even if the rollback itself failed (rollback() throws on an auto-commit
            // or broken connection); otherwise the batch would be dropped and never retried.
            addAllToCommitQueue(contexts);
        }
    }
}
