/*
 * Decompiled with CFR 0.152.
 */
package com.digiwin.athena.cdme.mq.consumer.mqtt;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.digiwin.app.iot.mqtt.callback.DWMqttCallback;
import com.digiwin.athena.cdme.core.thread.MqttMsgThread;
import com.digiwin.athena.cdme.core.util.ActionParamsUtil;
import com.digiwin.athena.cdme.core.util.MonitorHelper;
import com.digiwin.athena.cdme.core.util.StringUtil;
import com.digiwin.athena.cdme.repository.model.MonitorRuleCdcModel;
import com.digiwin.athena.cdme.repository.model.MqttServerConfigModel;
import com.digiwin.athena.cdme.service.facade.detection.IMonitorFacadeService;
import com.digiwin.athena.cdme.service.srp.db.IMonitorRuleCdcService;
import com.digiwin.athena.cdme.service.srp.db.IMqttServerConfigService;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

public class MqttConsumerCallback
extends DWMqttCallback {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqttConsumerCallback.class);
    private final IMonitorFacadeService monitorFacadeService;
    private final IMonitorRuleCdcService monitorRuleCdcService;
    private final ThreadPoolTaskExecutor cdmeMsgExecutor;
    private final IMqttServerConfigService mqttServerConfigService;

    public MqttConsumerCallback(IMonitorFacadeService monitorFacadeService, IMonitorRuleCdcService monitorRuleCdcService, ThreadPoolTaskExecutor cdmeMsgExecutor, IMqttServerConfigService mqttServerConfigService) {
        this.monitorFacadeService = monitorFacadeService;
        this.monitorRuleCdcService = monitorRuleCdcService;
        this.cdmeMsgExecutor = cdmeMsgExecutor;
        this.mqttServerConfigService = mqttServerConfigService;
    }

    public void messageArrived(String topic, MqttMessage message) {
        LOGGER.info("\u63a5\u6536\u6d88\u606f\u4e3b\u9898 : {}, \u63a5\u6536\u6d88\u606f\u5185\u5bb9 : {}", (Object)topic, (Object)new String(message.getPayload()));
        if (StringUtil.isEmpty(topic)) {
            LOGGER.warn("\u4e3b\u9898\u4e3a\u7a7a\uff0c\u6d88\u606f\u4e0d\u5904\u7406");
            return;
        }
        String[] topicArray = topic.split("/");
        if (topicArray.length < 6) {
            LOGGER.warn("\u4e3b\u9898\u683c\u5f0f\u4e0d\u7b26\u5408\u89c4\u8303\uff0c\u6d88\u606f\u4e0d\u5904\u7406\uff0ctopic={}", (Object)topic);
            return;
        }
        StringBuffer topicName = new StringBuffer();
        topicName.append(topicArray[5]);
        for (int i = 6; i < topicArray.length; ++i) {
            topicName.append("/").append(topicArray[i]);
        }
        JSONObject msgJsonObjec = JSON.parseObject((String)new String(message.getPayload()));
        String tenantId = msgJsonObjec.getString("TenantID");
        LOGGER.info("\u63a5\u6536\u6d88\u606f\u7684\u4e1a\u52a1\u4e3b\u9898\uff1a{}\uff0c\u79df\u6237\uff1a{}", (Object)topicName, (Object)tenantId);
        if (StringUtils.isBlank((CharSequence)tenantId)) {
            LOGGER.warn("\u6d88\u606f\u62a5\u6587\u4e2d\u7f3a\u5c11\u79df\u6237\u4fe1\u606f\uff0ctenantId={}", (Object)tenantId);
            return;
        }
        List<JSONObject> uniJsonObjectList = this.converter(topicArray[0], tenantId, topicName.toString(), msgJsonObjec);
        if (CollectionUtils.isEmpty(uniJsonObjectList)) {
            LOGGER.info("\u6d88\u606f\u6ca1\u6709\u5339\u914d\u7684\u4fa6\u6d4b\u89c4\u5219\uff0c\u4e0d\u505a\u540e\u7eed\u5904\u7406");
            return;
        }
        LOGGER.info("\u8f6c\u5316\u540e\u7684\u6d88\u606f\u6761\u6570\uff1a{}", (Object)uniJsonObjectList.size());
        uniJsonObjectList.forEach(uniJsonObject -> this.cdmeMsgExecutor.execute((Runnable)new MqttMsgThread(this.monitorFacadeService, (JSONObject)uniJsonObject, tenantId, topicName.toString())));
    }

    private List<JSONObject> converter(String topicArea, String tenantId, String topicName, JSONObject msgJsonObjec) {
        ArrayList uniJsonObjectList = Lists.newArrayList();
        List<MonitorRuleCdcModel> byTenantIdAndTableAndChangeType = this.monitorRuleCdcService.getByTenantIdAndTableAndChangeType(tenantId, topicName, MonitorHelper.getChangeType("c"));
        if (CollectionUtils.isNotEmpty(byTenantIdAndTableAndChangeType)) {
            List collect = byTenantIdAndTableAndChangeType.stream().map(res -> res.getDbName()).collect(Collectors.toList());
            List<MqttServerConfigModel> dbNameList = this.mqttServerConfigService.getServerList();
            if (CollectionUtils.isEmpty(dbNameList)) {
                return uniJsonObjectList;
            }
            List<MqttServerConfigModel> collect1 = dbNameList.stream().filter(res -> collect.contains(res.getBusinessSources()) && StringUtils.equals((CharSequence)res.getZone(), (CharSequence)topicArea)).collect(Collectors.toList());
            collect1.forEach(mqttServerConfigModel -> {
                ArrayList<MonitorRuleCdcModel> monitorRuleCdcModelList = new ArrayList<MonitorRuleCdcModel>();
                this.monitorRuleCdcService.queryCdcRuleByTenantIdAndTableAndOp(monitorRuleCdcModelList, tenantId, mqttServerConfigModel.getBusinessSources(), topicName, MonitorHelper.getChangeType("c"));
                if (CollectionUtils.isNotEmpty(monitorRuleCdcModelList)) {
                    monitorRuleCdcModelList.forEach(cdcRule -> {
                        if ("DISABLED".equals(cdcRule.getValid())) {
                            LOGGER.info("\u4fa6\u6d4b:{} \u4fa6\u6d4b\u89c4\u5219\u7981\u7528\uff0c\u65e0\u6cd5\u53d1\u8d77\u4fa6\u6d4b\uff0ctenantId={}\uff0ctopicName={}", new Object[]{cdcRule.getRuleId(), tenantId, topicName});
                            return;
                        }
                        if (StringUtils.isEmpty((CharSequence)cdcRule.getActionParams())) {
                            LOGGER.error("\u4fa6\u6d4b:{} \u4fa6\u6d4b\u89c4\u5219\u6ca1\u6709\u914d\u7f6e\u6570\u636e\u6620\u5c04\uff0c\u65e0\u6cd5\u53d1\u8d77\u4fa6\u6d4b\uff0ctenantId={}\uff0ctopicName={}", new Object[]{cdcRule.getRuleId(), tenantId, topicName});
                            return;
                        }
                        JSONArray actionParams = JSON.parseArray((String)cdcRule.getActionParams());
                        ActionParamsUtil.parse(actionParams, msgJsonObjec).forEach(res -> {
                            JSONObject uniJsonObject = new JSONObject();
                            uniJsonObject.put("op", (Object)"c");
                            ((JSONObject)res).put("tenant_id", (Object)tenantId);
                            ((JSONObject)res).put("ruleId", (Object)cdcRule.getRuleId());
                            uniJsonObject.put("after", res);
                            JSONObject source = new JSONObject();
                            source.put("db", (Object)mqttServerConfigModel.getBusinessSources());
                            source.put("table", (Object)topicName);
                            uniJsonObject.put("source", (Object)source);
                            uniJsonObject.put("ts_ms", (Object)System.currentTimeMillis());
                            uniJsonObjectList.add(uniJsonObject);
                            LOGGER.info("\u4fa6\u6d4b{}:\u7edf\u4e00\u683c\u5f0f\u6d88\u606f\uff1a{}", (Object)cdcRule.getRuleId(), (Object)uniJsonObject.toJSONString());
                        });
                    });
                }
            });
        }
        return uniJsonObjectList;
    }
}

