/*
 * Decompiled with CFR 0.152.
 */
package com.digiwin.athena.cdme.provider;

import com.digiwin.app.common.DWApplicationConfigUtils;
import com.digiwin.app.iot.mqtt.ClientType;
import com.digiwin.app.iot.mqtt.DWMqttClientV2Factory;
import com.digiwin.app.iot.mqtt.config.DWMqttConfig;
import com.digiwin.app.iot.mqtt.config.IMqttConfig;
import com.digiwin.athena.cdme.core.enums.ErrorCodeEnum;
import com.digiwin.athena.cdme.core.exception.ArgumentValidException;
import com.digiwin.athena.cdme.core.handler.MqttClientSingle;
import com.digiwin.athena.cdme.core.util.MqttUtil;
import com.digiwin.athena.cdme.core.util.StringUtil;
import com.digiwin.athena.cdme.mq.consumer.mqtt.MqttConsumerCallback;
import com.digiwin.athena.cdme.pojo.request.MqttRequest;
import com.digiwin.athena.cdme.provider.IMqttOpsService;
import com.digiwin.athena.cdme.repository.model.MonitorRuleCdcModel;
import com.digiwin.athena.cdme.repository.model.MqttServerConfigModel;
import com.digiwin.athena.cdme.service.client.IIamClient;
import com.digiwin.athena.cdme.service.facade.detection.IMonitorFacadeService;
import com.digiwin.athena.cdme.service.srp.db.IMonitorRuleCdcService;
import com.digiwin.athena.cdme.service.srp.db.impl.MqttServerConfigService;
import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;

/**
 * Operational entry points for the MQTT clients registered in {@link MqttClientSingle}:
 * manual connect / disconnect, subscribe / unsubscribe of a single topic, and
 * (re)initialisation of clients from the stored server configuration.
 *
 * <p>The {@code connect}, {@code disconnect}, {@code sub} and {@code unsub} operations report
 * their outcome as a human-readable message string instead of throwing on MQTT failures.
 */
@Service(value="cdmeMqttOpsService")
public class MqttOpsService implements IMqttOpsService {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqttOpsService.class);

    /** Rule category used when looking up the monitor rules that drive topic subscriptions. */
    private static final String MQTT_CATEGORY = "MQTT";
    /** Application property that switches MQTT initialisation on or off. */
    private static final String MQTT_SWITCH_PROPERTY = "cdme.mqttOpen";
    /** Timeout, in milliseconds, used when waiting for a manual subscribe to complete. */
    private static final long SUBSCRIBE_WAIT_MILLIS = 1000L;
    /** Pause, in milliseconds, between issuing the asynchronous connect and re-subscribing. */
    private static final long CONNECT_SETTLE_MILLIS = 1000L;

    private static final String MSG_SOURCE_BLANK = "MQTT\u6765\u6e90\u4e0d\u53ef\u4e3a\u7a7a";
    private static final String MSG_CLIENT_MISSING = "MQTT\u5ba2\u6237\u7aef\u4e0d\u5b58\u5728";
    private static final String MSG_TOPIC_BLANK = "\u4e3b\u9898\u4e0d\u80fd\u4e3a\u7a7a";
    private static final String MSG_NOT_CONNECTED = "MQTT\u5ba2\u6237\u7aef\u5df2\u65ad\u5f00\uff0c\u8bf7\u5148\u8fde\u63a5MQTT\u5ba2\u6237\u7aef";
    private static final String MSG_CONFIG_MISSING = "MQTT\u914d\u7f6e\u4e0d\u5b58\u5728";
    private static final String LOG_SUBSCRIBE_FAILED = "\u8ba2\u9605topic\u5931\u8d25, tenantId={}\uff0c topic={}\uff0c\u9519\u8bef\uff1a{}";
    private static final String LOG_MQTT_SWITCH_OFF = "MQTT\u5f00\u5173\u5173\u95ed\uff0c\u4e0d\u542f\u52a8==================================";

    @Autowired
    private IMonitorRuleCdcService monitorRuleCdcService;
    @Autowired
    private MqttServerConfigService mqttServerConfigService;
    @Autowired
    private IMonitorFacadeService monitorFacadeService;
    @Autowired
    @Qualifier(value="cdmeMsgExecutor")
    private ThreadPoolTaskExecutor cdmeMsgExecutor;
    private final IIamClient iamClient;

    /**
     * @param iamClient client used by {@link #checkTenant(String)} to match a tenant id
     *                  against the current token
     */
    public MqttOpsService(IIamClient iamClient) {
        this.iamClient = iamClient;
    }

    /**
     * Connects the registered client for {@code request.getSource()} and then re-subscribes
     * the topics derived from all monitor rules of category {@code "MQTT"}.
     *
     * @param request carries the tenant id (validated first) and the source whose client is connected
     * @return a message describing the outcome; MQTT connect failures are reported in the message
     * @throws ArgumentValidException if the tenant id is blank or does not match the token
     */
    @Override
    public String connect(MqttRequest request) {
        this.checkTenant(request.getTenantId());
        if (StringUtils.isBlank(request.getSource())) {
            return MSG_SOURCE_BLANK;
        }
        IMqttAsyncClient mqttAsyncClient = MqttOpsService.getiMqttAsyncClient(request);
        if (mqttAsyncClient == null) {
            return MSG_CLIENT_MISSING;
        }
        if (mqttAsyncClient.isConnected()) {
            return "MQTT\u5ba2\u6237\u7aef\u5df2\u8fde\u63a5\uff0c\u8bf7\u52ff\u91cd\u590d\u8fde\u63a5";
        }
        try {
            // NOTE(review): connect() is called without connect options and its token is not
            // awaited; confirm the client keeps the credentials it was created with.
            mqttAsyncClient.connect();
        }
        catch (MqttException e) {
            return "MQTT\u8fde\u63a5\u5931\u8d25\uff1a" + e.toString();
        }
        // The connect above is asynchronous; pause briefly before subscribing.
        boolean interrupted = false;
        try {
            Thread.sleep(CONNECT_SETTLE_MILLIS);
        }
        catch (InterruptedException e) {
            interrupted = true;
            LOGGER.info(e.getMessage());
        }
        try {
            this.resubscribeRuleTopics(mqttAsyncClient, request.getSource());
        }
        finally {
            // Restore the interrupt status only after the best-effort subscriptions, so the
            // waits inside them are not cut short but the caller can still observe the interrupt.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        return "MQTT\u8fde\u63a5\u6210\u529f";
    }

    /**
     * Best-effort subscription of the topics derived from every monitor rule of category
     * {@code "MQTT"}, using the zone and ops of the configuration stored for {@code source}.
     * A failure for one rule is logged and does not stop the remaining rules.
     */
    private void resubscribeRuleTopics(IMqttAsyncClient mqttAsyncClient, String source) {
        List<MonitorRuleCdcModel> ruleList = this.monitorRuleCdcService.getListByCategory(MQTT_CATEGORY);
        MqttServerConfigModel serverConfig = MqttClientSingle.getInstance().pullConfig(source);
        if (CollectionUtils.isEmpty(ruleList)) {
            return;
        }
        if (serverConfig == null) {
            // Without the stored configuration neither zone nor ops is known; report it once
            // instead of failing on every rule.
            LOGGER.warn("MQTT server config not found for source={}, skip topic subscription", source);
            return;
        }
        for (MonitorRuleCdcModel rule : ruleList) {
            try {
                String topic = MqttUtil.getTopicByDbname(rule.getTenantSid(), rule.getTableName(), rule.getDbName(), serverConfig.getZone());
                mqttAsyncClient.subscribe(topic, serverConfig.getOps()).waitForCompletion(SUBSCRIBE_WAIT_MILLIS);
            }
            catch (Exception e) {
                // Error text fills the third placeholder; the trailing exception carries the stack trace.
                LOGGER.warn(LOG_SUBSCRIBE_FAILED, rule.getTenantId(), rule.getTableName(), e.toString(), e);
            }
        }
    }

    /** Looks up the client registered in {@link MqttClientSingle} under {@code request.getSource()}. */
    private static IMqttAsyncClient getiMqttAsyncClient(MqttRequest request) {
        return MqttClientSingle.getInstance().pull(request.getSource());
    }

    /**
     * Disconnects the registered client for {@code request.getSource()}.
     *
     * @param request carries the tenant id (validated first) and the source whose client is disconnected
     * @return a message describing the outcome; MQTT disconnect failures are reported in the message
     * @throws ArgumentValidException if the tenant id is blank or does not match the token
     */
    @Override
    public String disconnect(MqttRequest request) {
        this.checkTenant(request.getTenantId());
        if (StringUtils.isBlank(request.getSource())) {
            return MSG_SOURCE_BLANK;
        }
        IMqttAsyncClient mqttAsyncClient = MqttOpsService.getiMqttAsyncClient(request);
        if (mqttAsyncClient == null) {
            return MSG_CLIENT_MISSING;
        }
        if (!mqttAsyncClient.isConnected()) {
            return "MQTT\u5ba2\u6237\u7aef\u5df2\u65ad\u5f00\uff0c\u8bf7\u52ff\u91cd\u590d\u65ad\u5f00";
        }
        try {
            mqttAsyncClient.disconnect();
        }
        catch (MqttException e) {
            return "MQTT\u65ad\u5f00\u5931\u8d25\uff1a" + e;
        }
        return "MQTT\u65ad\u5f00\u6210\u529f";
    }

    /**
     * Subscribes the client registered for {@code request.getSource()} to {@code request.getTopic()},
     * using the ops value of the configuration stored for that source.
     *
     * <p>NOTE(review): unlike {@link #connect} and {@link #disconnect}, this method does not call
     * {@code checkTenant}; confirm whether that is intentional.
     *
     * @param request carries the topic and the source
     * @return a message describing the outcome
     */
    @Override
    public String sub(MqttRequest request) {
        if (StringUtil.isEmpty(request.getTopic())) {
            return MSG_TOPIC_BLANK;
        }
        // Validate the source before using it as a lookup key.
        if (StringUtils.isBlank(request.getSource())) {
            return MSG_SOURCE_BLANK;
        }
        IMqttAsyncClient mqttAsyncClient = MqttOpsService.getiMqttAsyncClient(request);
        if (mqttAsyncClient == null) {
            return MSG_CLIENT_MISSING;
        }
        if (!mqttAsyncClient.isConnected()) {
            return MSG_NOT_CONNECTED;
        }
        MqttServerConfigModel serverConfig = MqttClientSingle.getInstance().pullConfig(request.getSource());
        if (serverConfig == null) {
            // The ops value comes from the stored configuration; without it we cannot subscribe.
            return MSG_CONFIG_MISSING;
        }
        try {
            mqttAsyncClient.subscribe(request.getTopic(), serverConfig.getOps()).waitForCompletion(SUBSCRIBE_WAIT_MILLIS);
        }
        catch (MqttException e) {
            return "MQTT\u8ba2\u9605\u5931\u8d25\uff1a" + e.toString();
        }
        return "\u8ba2\u9605\u6210\u529f";
    }

    /**
     * Unsubscribes the client registered for {@code request.getSource()} from {@code request.getTopic()}.
     * The unsubscribe request is issued without waiting for its completion.
     *
     * <p>NOTE(review): unlike {@link #connect} and {@link #disconnect}, this method does not call
     * {@code checkTenant}; confirm whether that is intentional.
     *
     * @param request carries the topic and the source
     * @return a message describing the outcome
     */
    @Override
    public String unsub(MqttRequest request) {
        if (StringUtil.isEmpty(request.getTopic())) {
            return MSG_TOPIC_BLANK;
        }
        // Validate the source before using it as a lookup key.
        if (StringUtils.isBlank(request.getSource())) {
            return MSG_SOURCE_BLANK;
        }
        IMqttAsyncClient mqttAsyncClient = MqttOpsService.getiMqttAsyncClient(request);
        if (mqttAsyncClient == null) {
            return MSG_CLIENT_MISSING;
        }
        if (!mqttAsyncClient.isConnected()) {
            return MSG_NOT_CONNECTED;
        }
        try {
            mqttAsyncClient.unsubscribe(request.getTopic());
        }
        catch (MqttException e) {
            return "MQTT\u89e3\u9664\u8ba2\u9605\u5931\u8d25\uff1a" + e;
        }
        return "\u89e3\u9664\u8ba2\u9605\u6210\u529f";
    }

    /**
     * Re-initialises MQTT clients from the stored server list, optionally restricted to the
     * business source given in the request. Does nothing when the MQTT switch property is off.
     * Any failure is logged, not propagated; the first failing group stops the remaining groups.
     *
     * @param request optional filter: when {@code getBusinessSources()} is not blank, only
     *                servers with exactly that business source are initialised
     */
    @Override
    public void reset(MqttRequest request) {
        if (!MqttOpsService.isMqttEnabled()) {
            LOGGER.info(LOG_MQTT_SWITCH_OFF);
            return;
        }
        try {
            List<MqttServerConfigModel> serverList = this.mqttServerConfigService.getServerList();
            if (CollectionUtils.isEmpty(serverList)) {
                return;
            }
            String wantedSource = request.getBusinessSources();
            if (StringUtils.isNotBlank(wantedSource)) {
                serverList = serverList.stream()
                        .filter(server -> StringUtils.equals(server.getBusinessSources(), wantedSource))
                        .collect(Collectors.toList());
            }
            if (CollectionUtils.isEmpty(serverList)) {
                return;
            }
            // Initialise the servers one business-source / cluster-group at a time.
            Map<String, List<MqttServerConfigModel>> serversByGroup = serverList.stream()
                    .collect(Collectors.groupingBy(server -> server.getBusinessSources() + "_" + server.getClusterGroup()));
            for (List<MqttServerConfigModel> groupServers : serversByGroup.values()) {
                this.initMqttClient(groupServers);
            }
        }
        catch (Exception e) {
            // Message text fills the placeholder; the trailing exception carries the stack trace.
            LOGGER.error("MQTT \u542f\u52a8\u5931\u8d25\uff0c\u5931\u8d25\u539f\u56e0\uff1a{}==================================", e.getMessage(), e);
        }
    }

    /**
     * Initialises the MQTT client for the server configuration stored under {@code businessSource}.
     *
     * @param businessSource business source whose server configuration is looked up
     * @throws IllegalArgumentException if no server configuration is returned for {@code businessSource}
     * @throws Exception if creating or connecting the client fails
     */
    @Override
    public void initMqttByBusinessSource(String businessSource) throws Exception {
        MqttServerConfigModel serverConfig = this.mqttServerConfigService.getServerListByBusinessSources(businessSource);
        if (serverConfig == null) {
            // Fail with a clear message instead of a NullPointerException deep inside initMqttClient.
            throw new IllegalArgumentException("No MQTT server config found for businessSource=" + businessSource);
        }
        this.initMqttClient(Lists.newArrayList(serverConfig));
    }

    /**
     * Creates and connects one asynchronous client per server configuration, subscribes its
     * rule topics, and registers the client and its configuration in {@link MqttClientSingle}
     * under the configuration's business source. Does nothing when the MQTT switch property
     * is off or the list is null/empty.
     *
     * <p>NOTE(review): a client already registered under the same business source is replaced
     * without being disconnected here; confirm whether {@code MqttClientSingle.push} handles that.
     *
     * @param mqttServerEntityList server configurations to initialise
     * @throws Exception if creating or connecting a client fails
     */
    public void initMqttClient(List<MqttServerConfigModel> mqttServerEntityList) throws Exception {
        if (!MqttOpsService.isMqttEnabled()) {
            LOGGER.info(LOG_MQTT_SWITCH_OFF);
            return;
        }
        if (CollectionUtils.isEmpty(mqttServerEntityList)) {
            return;
        }
        for (MqttServerConfigModel mqttServerConfigModel : mqttServerEntityList) {
            DWMqttConfig dwMqttConfig = DWMqttConfig.getDefaultMqttConfig().clone();
            dwMqttConfig.setServerURI(mqttServerConfigModel.getServerHost());
            // A fresh random client id is generated for every initialisation.
            dwMqttConfig.setClientId("MQME_SUB_" + UUID.randomUUID().toString().replace("-", ""));
            MqttConnectOptions connectionOptions = dwMqttConfig.getConnectOptions();
            if (StringUtils.isNotBlank(mqttServerConfigModel.getUsername())) {
                connectionOptions.setUserName(mqttServerConfigModel.getUsername());
            }
            if (StringUtils.isNotBlank(mqttServerConfigModel.getPwd())) {
                connectionOptions.setPassword(mqttServerConfigModel.getPwd().toCharArray());
            }
            dwMqttConfig.setConnectOptions(connectionOptions);
            DWMqttClientV2Factory clientFactory = new DWMqttClientV2Factory((IMqttConfig)dwMqttConfig);
            MqttConsumerCallback consumerCallback = new MqttConsumerCallback(this.monitorFacadeService, this.monitorRuleCdcService, this.cdmeMsgExecutor, this.mqttServerConfigService);
            IMqttAsyncClient mqttClient = (IMqttAsyncClient)clientFactory.createAndConnectClient(dwMqttConfig.getClientId(), ClientType.ASYNC, (MqttCallback)consumerCallback);
            LOGGER.info("\u5ba2\u6237\u7aef\uff1a{}MQTT \u542f\u52a8\u6210\u529f,ops:{}", dwMqttConfig.getClientId(), mqttServerConfigModel.getOps());
            LOGGER.info("MQTT \u5f00\u59cb\u8ba2\u9605\u4e3b\u9898\u6765\u6e90\u4e3a\uff1a{} \uff0c\u5ba2\u6237\u7aef\u7aef\u4e3a\uff1a{},\u4e0b\u7684topic", mqttServerConfigModel.getBusinessSources(), mqttServerConfigModel.getClientId());
            this.subscribeTopics(dwMqttConfig, mqttClient, mqttServerConfigModel, mqttServerConfigModel.getOps());
            MqttClientSingle.getInstance().push(mqttServerConfigModel.getBusinessSources(), mqttClient);
            MqttClientSingle.getInstance().pushConfig(mqttServerConfigModel.getBusinessSources(), mqttServerConfigModel);
        }
    }

    /**
     * Subscribes {@code mqttClient} to the topic of every monitor rule of category {@code "MQTT"}
     * for the configuration's business source. MQTT failures are logged per rule and summarised
     * in one final warning; they do not stop the remaining rules.
     */
    private void subscribeTopics(DWMqttConfig mqttConfig, IMqttAsyncClient mqttClient, MqttServerConfigModel mqttServerConfigModel, int ops) {
        List<MonitorRuleCdcModel> ruleList = this.monitorRuleCdcService.getByCategory(MQTT_CATEGORY, mqttServerConfigModel.getBusinessSources());
        if (CollectionUtils.isEmpty(ruleList)) {
            return;
        }
        StringBuilder failedTopics = new StringBuilder();
        for (MonitorRuleCdcModel rule : ruleList) {
            try {
                String topic = MqttUtil.getTopicByDbname(rule.getTenantSid(), rule.getTableName(), mqttServerConfigModel.getBusinessSources(), mqttServerConfigModel.getZone());
                mqttClient.subscribe(topic, ops).waitForCompletion(mqttConfig.getWaitForCompletion());
                LOGGER.info("\u8ba2\u9605topic, tenantId={}\uff0c topic={} \u6210\u529f", rule.getTenantId(), rule.getTableName());
            }
            catch (MqttException e) {
                // Error text fills the third placeholder; the trailing exception carries the stack trace.
                LOGGER.warn(LOG_SUBSCRIBE_FAILED, rule.getTenantId(), rule.getTableName(), e.toString(), e);
                failedTopics.append(rule.getTenantId()).append(",").append(rule.getTableName()).append(";");
            }
        }
        if (failedTopics.length() > 0) {
            // Parameterized logging: "{}" is an SLF4J placeholder and is not understood by String.format.
            LOGGER.warn("\u8ba2\u9605topic\u5931\u8d25\uff0c\u5931\u8d25\u7684\u79df\u6237\u548ctopic\uff1a{}", failedTopics);
        }
    }

    /** Reads the MQTT switch property; anything other than {@code "true"} (case-insensitive) counts as off. */
    private static boolean isMqttEnabled() {
        return Boolean.parseBoolean(DWApplicationConfigUtils.getProperty(MQTT_SWITCH_PROPERTY));
    }

    /**
     * Validates the tenant id of a request.
     *
     * @throws ArgumentValidException if {@code tenantId} is blank or does not match the current token
     */
    private void checkTenant(String tenantId) {
        if (StringUtil.isBlank(tenantId)) {
            throw new ArgumentValidException(ErrorCodeEnum.PARAM_EMPTY_ERR);
        }
        if (!this.iamClient.isTenantIdMatchByToken(tenantId)) {
            throw new ArgumentValidException(ErrorCodeEnum.TOKEN_NOT_MATCH_TENANT_ID);
        }
    }
}

