package com.digiwin.chatbi.reasoning.pipeline;

import com.alibaba.fastjson.JSONObject;
import com.digiwin.chatbi.beans.pojos.Question;
import com.digiwin.chatbi.common.constant.Constants;
import com.digiwin.chatbi.common.exception.PipelineAssert;
import com.digiwin.chatbi.common.util.MdcUtil;
import com.digiwin.chatbi.config.ThreadPoolManager;
import com.digiwin.chatbi.reasoning.executor.Executor;
import com.digiwin.chatbi.reasoning.pipeline.condition.Condition;
import com.digiwin.chatbi.reasoning.pipeline.result.Output;
import io.micrometer.core.instrument.binder.BaseUnits;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:WEB-INF/classes/com/digiwin/chatbi/reasoning/pipeline/PipelineExecutor.class */
/**
 * Runs pipeline {@link Executor} steps against one shared JSON context.
 *
 * <p>Every call to {@link #execute(Executor...)} submits the applicable executors to the
 * pool returned by {@link ThreadPoolManager#getPipelineExecutorPool()}, waits for all of
 * them, and merges each step's output context back into the shared context. Once a step
 * returns an endpoint output, later {@code execute} calls become no-ops.
 *
 * <p>Thread-safety: writes to the shared context, {@code lastOutput} and {@code finish}
 * made by this class are serialized on this instance's monitor. Executors themselves
 * receive the shared context object directly, so reads/writes they perform inside
 * {@code doProcess} while sibling steps are running are NOT guarded by this class.
 */
public class PipelineExecutor {
    private static final Logger log = LoggerFactory.getLogger(PipelineExecutor.class);

    /** Shared state handed to every executor; step outputs are merged into it. */
    private final JSONObject context = new JSONObject();

    /** Set once a step reports an endpoint output; volatile because pool threads write it. */
    private volatile boolean finish = false;

    /** Output of the step that completed most recently; volatile because pool threads write it. */
    private volatile Output lastOutput;

    /**
     * Runs the given executors concurrently and blocks until all of them have completed.
     *
     * <p>Executors whose {@code onCondition(context)} is false, and {@code null} entries,
     * are skipped. When several executors run in the same call, {@link #getResult()}
     * reflects whichever of them finished last.
     *
     * <p>Failures of individual tasks (for example a missing required parameter) are
     * logged and not rethrown; if the waiting thread is interrupted its interrupt flag is
     * restored and the method returns without waiting further.
     *
     * @param executorArr the steps to run; {@code null} or empty is a no-op
     * @return this instance, for chaining
     */
    // Raw HashMap is kept on purpose: the parameter types of Pipeline.addTimeTrace /
    // Pipeline.addOutputTrace are not visible from this file, and the raw type is what
    // the existing call sites are known to compile against.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public PipelineExecutor execute(Executor... executorArr) {
        if (this.finish || executorArr == null || executorArr.length == 0) {
            return this;
        }
        final HashMap timeTrace = new HashMap();
        final HashMap outputTrace = new HashMap();
        final String ptxId = resolvePtxId();
        ExecutorService pool = ThreadPoolManager.getPipelineExecutorPool();
        ArrayList<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Executor executor : executorArr) {
            // A null entry previously caused a NullPointerException on the caller thread.
            if (executor == null || !executor.onCondition(this.context)) {
                continue;
            }
            futures.add(CompletableFuture.runAsync(
                    () -> runStep(executor, ptxId, timeTrace, outputTrace), pool));
        }
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            log.error("执行任务时发生异常", e);
        }
        // After an interrupt some tasks may still be running, so publish snapshots taken
        // under the same locks the tasks use instead of the live maps.
        HashMap timeSnapshot;
        synchronized (timeTrace) {
            timeSnapshot = new HashMap(timeTrace);
        }
        if (!timeSnapshot.isEmpty()) {
            Pipeline.addTimeTrace(timeSnapshot);
        }
        HashMap outputSnapshot;
        synchronized (outputTrace) {
            outputSnapshot = new HashMap(outputTrace);
        }
        if (!outputSnapshot.isEmpty()) {
            Pipeline.addOutputTrace(outputSnapshot);
        }
        return this;
    }

    /**
     * Runs the executor(s) selected by {@code condition} for the current context.
     *
     * @param condition chooser evaluated against the shared context
     */
    public void executeWhen(Condition condition) {
        execute(condition.choose(this.context));
    }

    /**
     * Runs {@code executor} synchronously on the calling thread, regardless of the
     * finished flag and without checking {@code onCondition} or required parameters, then
     * merges its output into the shared context.
     *
     * @param executor the step to run
     * @return this instance, for chaining
     */
    public PipelineExecutor finallyExecute(Executor executor) {
        Output output = (Output) PipelineAssert.PIPELINE_EXECUTE_FAIL_ASSERT.tryCatch(() -> {
            return executor.doProcess(this.context);
        }, exc -> {
            return failureOutput(exc);
        });
        synchronized (this) {
            this.lastOutput = output;
            this.context.putAll(output.getOutputContext());
        }
        return this;
    }

    /**
     * @return the output of the most recently completed step, or {@code null} if no step
     *         has produced an output yet
     */
    public Output getResult() {
        return this.lastOutput;
    }

    /** @return the shared context serialized as a JSON string */
    @Override
    public String toString() {
        return this.context.toJSONString();
    }

    /** Reads the trace id from the {@link Question} stored in the context, or null if absent. */
    private String resolvePtxId() {
        Object question = this.context.get(Constants.QUESTION);
        if (question instanceof Question) {
            return ((Question) question).getPtxId();
        }
        return null;
    }

    /**
     * Body of one asynchronous step: binds the trace id to the worker thread, validates
     * required parameters, runs the executor and merges its output into the shared state.
     */
    @SuppressWarnings("rawtypes")
    private void runStep(Executor executor, String ptxId, HashMap timeTrace, HashMap outputTrace) {
        boolean traced = StringUtils.isNotEmpty(ptxId);
        try {
            // Bind the trace id before the first log statement so that line carries it too.
            if (traced) {
                MdcUtil.setTraceId(ptxId);
            }
            log.info("PipelineExecutor_execute_ptxId:{}", ptxId);
            Arrays.stream(executor.requiredParams()).forEach(param -> {
                PipelineAssert.PIPE_LINE_PARAM_MISSING_ASSERT.isTrue(this.context.containsKey(param), new Object[0]);
            });
            Output output = (Output) PipelineAssert.PIPELINE_EXECUTE_FAIL_ASSERT.tryCatch(() -> {
                StopWatch watch = StopWatch.createStarted();
                Output result = executor.doProcess(this.context);
                watch.stop();
                recordTrace(executor.pipelineName(), watch, result, timeTrace, outputTrace);
                return result;
            }, exc -> {
                return failureOutput(exc);
            });
            // One critical section so concurrent steps never interleave their merges.
            synchronized (this) {
                this.context.putAll(output.getOutputContext());
                this.lastOutput = output;
                if (output.isEndpoint()) {
                    this.finish = true;
                }
            }
        } finally {
            if (traced) {
                MDC.clear();
            }
        }
    }

    /**
     * Logs the elapsed time of a step and stores its timing and output in the per-call
     * trace maps. Timing entries are only recorded when the elapsed time is above 0 ms.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static void recordTrace(String pipelineName, StopWatch watch, Output result,
                                    HashMap timeTrace, HashMap outputTrace) {
        long elapsedMillis = watch.getTime();
        log.info("Pipeline: {} execution time: {}{}", pipelineName, elapsedMillis, BaseUnits.MILLISECONDS);
        if (elapsedMillis > 0) {
            // SimpleDateFormat is not thread-safe, hence a fresh instance per call.
            String startedAt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
                    .format(new Date(watch.getStartTime()));
            synchronized (timeTrace) {
                timeTrace.put(pipelineName, String.valueOf(elapsedMillis));
                timeTrace.put(pipelineName + " startTime ", startedAt);
            }
        }
        synchronized (outputTrace) {
            outputTrace.put(pipelineName, result);
        }
    }

    /** Logs a step failure and converts it into a finishing output carrying the error. */
    private static Output failureOutput(Throwable cause) {
        // Pass the message for the placeholder and the throwable last so SLF4J prints the stack trace.
        log.error("Pipeline异常：{}", cause.getMessage(), cause);
        JSONObject errorContext = new JSONObject();
        errorContext.put("error", cause);
        return Output.finish(errorContext);
    }
}
