/*
 * Decompiled with CFR 0.152.
 */
package com.digiwin.athena.cdme.core.config;

import com.digiwin.app.iot.mqtt.ClientType;
import com.digiwin.app.iot.mqtt.DWMqttClientV2Factory;
import com.digiwin.app.iot.mqtt.config.DWMqttConfig;
import com.digiwin.app.iot.mqtt.config.IMqttConfig;
import com.digiwin.athena.cdme.core.constant.ConfigConstant;
import com.digiwin.athena.cdme.core.handler.MqttClientSingle;
import com.digiwin.athena.cdme.core.util.MqttUtil;
import com.digiwin.athena.cdme.mq.consumer.mqtt.MqttConsumerCallback;
import com.digiwin.athena.cdme.repository.model.MonitorRuleCdcModel;
import com.digiwin.athena.cdme.repository.model.MqttServerConfigModel;
import com.digiwin.athena.cdme.service.facade.detection.IMonitorFacadeService;
import com.digiwin.athena.cdme.service.srp.db.IMonitorRuleCdcService;
import com.digiwin.athena.cdme.service.srp.db.impl.MqttServerConfigService;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

@Component
public class CdmeMqttClientListener
implements ApplicationListener<ContextRefreshedEvent> {
    private static final Logger LOGGER = LoggerFactory.getLogger(CdmeMqttClientListener.class);
    private final AtomicBoolean isInit = new AtomicBoolean(false);
    @Autowired
    private MqttServerConfigService mqttServerConfigService;
    @Autowired
    private IMonitorRuleCdcService monitorRuleCdcService;
    @Autowired
    private IMonitorFacadeService monitorFacadeService;
    @Autowired
    @Qualifier(value="cdmeMsgExecutor")
    private ThreadPoolTaskExecutor cdmeMsgExecutor;

    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        if (!ConfigConstant.MQTT_OPEN) {
            LOGGER.info("MQTT\u5f00\u5173\u5173\u95ed\uff0c\u4e0d\u542f\u52a8==================================");
            return;
        }
        try {
            if (!this.isInit.compareAndSet(false, true)) {
                return;
            }
            List<MqttServerConfigModel> mqttServerEntityList = this.mqttServerConfigService.getServerList();
            if (CollectionUtils.isNotEmpty(mqttServerEntityList)) {
                Map<String, List<MqttServerConfigModel>> collect = mqttServerEntityList.stream().collect(Collectors.groupingBy(res -> res.getBusinessSources()));
                collect.forEach((groupKey, items) -> {
                    try {
                        this.initMqttClient((List<MqttServerConfigModel>)items);
                    }
                    catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
            }
        }
        catch (Exception e) {
            LOGGER.error("MQTT \u542f\u52a8\u5931\u8d25\uff0c\u5931\u8d25\u539f\u56e0\uff1a{}==================================", (Throwable)e);
        }
    }

    private void initMqttClient(List<MqttServerConfigModel> mqttServerEntityList) throws Exception {
        for (MqttServerConfigModel mqttServerConfigModel : mqttServerEntityList) {
            DWMqttConfig dwMqttConfig = DWMqttConfig.getDefaultMqttConfig().clone();
            dwMqttConfig.setServerURI(mqttServerConfigModel.getServerHost());
            dwMqttConfig.setClientId("MQME_SUB_" + UUID.randomUUID().toString().replaceAll("-", ""));
            MqttConnectOptions connectionOptions = dwMqttConfig.getConnectOptions();
            if (StringUtils.isNotBlank((CharSequence)mqttServerConfigModel.getUsername())) {
                connectionOptions.setUserName(mqttServerConfigModel.getUsername());
            }
            if (StringUtils.isNotBlank((CharSequence)mqttServerConfigModel.getPwd())) {
                connectionOptions.setPassword(mqttServerConfigModel.getPwd().toCharArray());
            }
            dwMqttConfig.setConnectOptions(connectionOptions);
            DWMqttClientV2Factory clientFactory = new DWMqttClientV2Factory((IMqttConfig)dwMqttConfig);
            IMqttAsyncClient mqttClient = (IMqttAsyncClient)clientFactory.createAndConnectClient(dwMqttConfig.getClientId(), ClientType.ASYNC, (MqttCallback)new MqttConsumerCallback(this.monitorFacadeService, this.monitorRuleCdcService, this.cdmeMsgExecutor, this.mqttServerConfigService));
            LOGGER.info("\u5ba2\u6237\u7aef\uff1a{}MQTT \u542f\u52a8\u6210\u529f,ops:{}", (Object)dwMqttConfig.getClientId(), (Object)mqttServerConfigModel.getOps());
            LOGGER.info("MQTT \u5f00\u59cb\u8ba2\u9605\u4e3b\u9898\u6765\u6e90\u4e3a\uff1a{} \uff0c\u5ba2\u6237\u7aef\u7aef\u4e3a\uff1a{},\u4e0b\u7684topic", (Object)mqttServerConfigModel.getBusinessSources(), (Object)mqttServerConfigModel.getClientId());
            this.subscribeTopics(dwMqttConfig, mqttClient, mqttServerConfigModel, mqttServerConfigModel.getOps());
            MqttClientSingle.getInstance().push(mqttServerConfigModel.getBusinessSources(), mqttClient);
            MqttClientSingle.getInstance().pushConfig(mqttServerConfigModel.getBusinessSources(), mqttServerConfigModel);
        }
    }

    private void subscribeTopics(DWMqttConfig mqttConfig, IMqttAsyncClient mqttClient, MqttServerConfigModel mqttServerConfigModel, int ops) {
        List<MonitorRuleCdcModel> ruleList = this.monitorRuleCdcService.getByCategory("MQTT", mqttServerConfigModel.getBusinessSources());
        if (CollectionUtils.isEmpty(ruleList)) {
            return;
        }
        StringBuffer errorMsg = new StringBuffer();
        if (CollectionUtils.isNotEmpty(ruleList)) {
            ruleList.forEach(rule -> {
                try {
                    mqttClient.subscribe(MqttUtil.getTopicByDbname(rule.getTenantSid(), rule.getTableName(), mqttServerConfigModel.getBusinessSources(), mqttServerConfigModel.getZone()), ops).waitForCompletion(mqttConfig.getWaitForCompletion());
                    LOGGER.info("\u8ba2\u9605topic, tenantId={}\uff0c topic={} \u6210\u529f", (Object)rule.getTenantId(), (Object)rule.getTableName());
                }
                catch (MqttException e) {
                    LOGGER.warn("\u8ba2\u9605topic\u5931\u8d25, tenantId={}\uff0c topic={}\uff0c\u9519\u8bef\uff1a{}", new Object[]{rule.getTenantId(), rule.getTableName(), e});
                    errorMsg.append(rule.getTenantId()).append(",").append(rule.getTableName()).append(";");
                }
            });
        }
        if (StringUtils.isNotBlank((CharSequence)errorMsg.toString())) {
            LOGGER.warn(String.format("\u8ba2\u9605topic\u5931\u8d25\uff0c\u5931\u8d25\u7684\u79df\u6237\u548ctopic\uff1a{}", errorMsg));
        }
    }
}

