package com.digiwin.athena.ania.service.impl;

import com.alibaba.druid.sql.ast.SQLDataType;
import com.alibaba.fastjson.JSONObject;
import com.digiwin.athena.ania.common.AgoraApiConstant;
import com.digiwin.athena.ania.common.AgoraConstant;
import com.digiwin.athena.ania.dto.agora.AgoraResponseDTO;
import com.digiwin.athena.ania.dto.agora.ImportMessageRequestDTO;
import com.digiwin.athena.ania.entity.ImMessageLog;
import com.digiwin.athena.ania.entity.MigrateMessageMapping;
import com.digiwin.athena.ania.env.EnvProperties;
import com.digiwin.athena.ania.helper.AgoraImHelper;
import com.digiwin.athena.ania.mapper.mongo.MessageMongoMapper;
import com.digiwin.athena.ania.mapper.mongo.MigrateMessageMapper;
import com.digiwin.athena.ania.mongo.repository.AsaKnowledgeMessageDao;
import com.digiwin.athena.ania.service.IMigrateMessageService;
import com.digiwin.athena.ania.util.RedisRateLimiter;
import com.digiwin.athena.ania.util.RedisUtils;
import java.util.Map;
import java.util.Objects;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Service
/* loaded from: input_file:WEB-INF/classes/com/digiwin/athena/ania/service/impl/MigrateMessageServiceImpl.class */
public class MigrateMessageServiceImpl implements IMigrateMessageService {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) MigrateMessageServiceImpl.class);

    @Autowired
    private AgoraImHelper agoraImHelper;

    @Autowired
    private RedisRateLimiter redisRateLimiter;

    @Autowired
    private MigrateMessageMapper migrateMessageMapper;

    @Autowired
    private MessageMongoMapper messageMongoMapper;

    @Autowired
    private AsaKnowledgeMessageDao asaKnowledgeMessageDao;

    @Autowired
    private EnvProperties envProperties;

    @Autowired
    private RestTemplate restTemplate;

    @Override // com.digiwin.athena.ania.service.IMigrateMessageService
    public void asyncMigrateMessage(ImMessageLog imMessageLog) throws InterruptedException {
        JSONObject jSONObject = new JSONObject();
        jSONObject.put("from", (Object) imMessageLog.getFromAccount());
        jSONObject.put("to", (Object) imMessageLog.getToAccount());
        jSONObject.put("type", (Object) imMessageLog.getMsgType());
        jSONObject.put("body", (Object) imMessageLog.getMsgBody());
        jSONObject.put("sync_device", (Object) false);
        jSONObject.put("ext", (Object) imMessageLog.getMsgExt());
        jSONObject.put("ext.em_ignore_notification", (Object) false);
        ImportMessageRequestDTO build = ImportMessageRequestDTO.builder().from(imMessageLog.getFromAccount()).target(imMessageLog.getToAccount()).type(imMessageLog.getMsgType()).body(jSONObject).isAckRead(true).msgTimestamp(imMessageLog.getMsgTimestamp().longValue()).needDownload(false).build();
        while (!this.redisRateLimiter.isAllowed("migrateMessage", 100, 1)) {
            Thread.sleep(10L);
            log.info("被限流，等待中........");
        }
        log.info("云信消息导入agora成功,result:{}", JSONObject.toJSONString(this.agoraImHelper.buildPostRequest(AgoraApiConstant.AGORA_SEND_P2P_MESSAGE_IMPORT, JSONObject.parseObject(JSONObject.toJSONString(build)), "")));
    }

    @Override // com.digiwin.athena.ania.service.IMigrateMessageService
    public void syncMigrateMessage(ImMessageLog imMessageLog) {
        JSONObject jSONObject = new JSONObject();
        String str = AgoraConstant.AgoraMessageTypeConstant.AGORA_MASSAGE_TYPE_TXT;
        if (SQLDataType.Constants.TEXT.equals(imMessageLog.getMsgType())) {
            jSONObject.put("msg", (Object) imMessageLog.getMsgBody().getString("text"));
        }
        if ("CUSTOM".equals(imMessageLog.getMsgType())) {
            str = "custom";
            jSONObject.put("customEvent", (Object) "custom_event");
            jSONObject.put("customExts", (Object) imMessageLog.getMsgBody());
        }
        ImportMessageRequestDTO build = ImportMessageRequestDTO.builder().from(imMessageLog.getFromAccount()).target(imMessageLog.getToAccount()).type(str).body(jSONObject).ext(imMessageLog.getMsgExt()).isAckRead(true).msgTimestamp(imMessageLog.getMsgTimestamp().longValue()).needDownload(false).build();
        while (true) {
            try {
                if (RedisUtils.isAllowed("ania:migrateMessage", 100)) {
                    try {
                        log.info("导入消息开始，消息的msgIdServer：{}", imMessageLog.getMsgidServer());
                        AgoraResponseDTO buildPostRequest = this.agoraImHelper.buildPostRequest(AgoraApiConstant.AGORA_SEND_P2P_MESSAGE_IMPORT, JSONObject.parseObject(JSONObject.toJSONString(build)), "");
                        log.info("导入消息完成，消息的msgIdServer：{}", imMessageLog.getMsgidServer());
                        if (!Objects.nonNull(buildPostRequest.getData())) {
                            break;
                        }
                        String string = MapUtils.getString((Map) buildPostRequest.getData(), "msg_id");
                        this.migrateMessageMapper.insert(MigrateMessageMapping.builder().fromAccount(imMessageLog.getFromAccount()).originalMsgidServer(imMessageLog.getMsgidServer()).newMsgidServer(string).createTime(Long.valueOf(System.currentTimeMillis())).build());
                        log.info("保存MigrateMessageMapping完成，agora消息的msgId：{}", string);
                        log.info("云信消息导入agora成功, result:{}", buildPostRequest);
                        break;
                    } catch (Exception e) {
                        if (!e.getMessage().contains("too many queries")) {
                            break;
                        }
                        log.error("导入消息失败，因为被限流........", (Throwable) e);
                        Thread.sleep(10L);
                    }
                } else {
                    log.info("被限流，等待中........");
                    Thread.sleep(10L);
                }
            } catch (InterruptedException e2) {
                throw new RuntimeException(e2);
            }
        }
    }

    @Override // com.digiwin.athena.ania.service.IMigrateMessageService
    public void handleMigrateMessage(MigrateMessageMapping migrateMessageMapping) {
        log.info("开始处理ania_message表,原来的msgIdServer：{}", migrateMessageMapping.getOriginalMsgidServer());
        long currentTimeMillis = System.currentTimeMillis();
        ImMessageLog queryMessageById = this.messageMongoMapper.queryMessageById(migrateMessageMapping.getOriginalMsgidServer());
        if (Objects.nonNull(queryMessageById)) {
            this.messageMongoMapper.updateMessageServerId(queryMessageById, migrateMessageMapping.getNewMsgidServer());
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        log.info("刷新ania_message表耗时：{}ms", Long.valueOf(currentTimeMillis2 - currentTimeMillis));
        HttpHeaders httpHeaders = new HttpHeaders();
        httpHeaders.set("t", String.valueOf(System.currentTimeMillis()));
        HttpEntity<?> httpEntity = new HttpEntity<>(migrateMessageMapping, httpHeaders);
        log.info("开始处理workitemInfoData表,原来的msgIdServer：{}", migrateMessageMapping.getOriginalMsgidServer());
        log.debug("刷新 workitemInfoData.lastBnaMsgId 入参：{}", JSONObject.toJSONString(migrateMessageMapping));
        log.debug("刷新 workitemInfoData.lastBnaMsgId 出参：{}", JSONObject.toJSONString(this.restTemplate.exchange(this.envProperties.getAsaUri() + "/asa/test/migrate/message/lastBnaMsgId/refresh", HttpMethod.POST, httpEntity, new ParameterizedTypeReference<JSONObject>() { // from class: com.digiwin.athena.ania.service.impl.MigrateMessageServiceImpl.1
        }, migrateMessageMapping)));
        log.info("刷新workitemInfoData.lastBnaMsgId表耗时：{}ms", Long.valueOf(System.currentTimeMillis() - currentTimeMillis2));
    }
}
