package com.digiwin.athena.ania.service.conversation.impl;

import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import com.digiwin.athena.ania.common.CacheConstants;
import com.digiwin.athena.ania.common.ResultBean;
import com.digiwin.athena.ania.common.ServiceException;
import com.digiwin.athena.ania.common.enums.AssistantType;
import com.digiwin.athena.ania.configuration.RedisLock;
import com.digiwin.athena.ania.dto.ConversationReceiveDto;
import com.digiwin.athena.ania.dto.conversation.ChatMessageQueryDto;
import com.digiwin.athena.ania.dto.conversation.ConversationMsgListVo;
import com.digiwin.athena.ania.dto.conversation.ConversationMsgQueryDto;
import com.digiwin.athena.ania.dto.conversation.MessageQueryDto;
import com.digiwin.athena.ania.dto.dialogue.FusionAssistantInfoDto;
import com.digiwin.athena.ania.dto.dialogue.FusionAssistantVo;
import com.digiwin.athena.ania.eventbus.message.ConversationMessageEvent;
import com.digiwin.athena.ania.mongo.domain.Conversation;
import com.digiwin.athena.ania.mongo.domain.ConversationMessage;
import com.digiwin.athena.ania.mongo.repository.ConversationDao;
import com.digiwin.athena.ania.mongo.repository.ConversationMessageDao;
import com.digiwin.athena.ania.service.assistant.FusionAssistantService;
import com.digiwin.athena.ania.service.conversation.ConversationIndexService;
import com.digiwin.athena.ania.service.conversation.ConversationMessageService;
import com.digiwin.athena.ania.service.conversation.ConversationService;
import com.digiwin.athena.ania.util.RedisUtils;
import com.digiwin.athena.ania.vo.ChatMessageListVo;
import com.digiwin.athena.appcore.auth.AppAuthContextHolder;
import com.digiwin.athena.appcore.auth.domain.AuthoredUser;
import com.digiwin.athena.appcore.exception.BusinessException;
import com.google.common.eventbus.AsyncEventBus;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import javax.annotation.Resource;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;

/**
 * Default {@link ConversationMessageService} implementation.
 *
 * <p>Stores conversation messages through {@link ConversationMessageDao}, accepts messages
 * described by a {@link ConversationReceiveDto}, and answers message list/query requests.
 */
@Service
public class ConversationMessageServiceImpl implements ConversationMessageService {

    /** Prefix of the lock key that guards message inserts for one conversation. */
    private static final String SEND_LOCK_KEY_PREFIX = "ANIA_";

    /** Number of lock attempts {@link #sendMessage(ConversationMessage)} allows per insert. */
    private static final int DEFAULT_SEND_ATTEMPTS = 3;

    @Resource
    private RedisLock redisLock;

    @Resource
    private ConversationDao conversationDao;

    @Resource
    private AsyncEventBus asyncEventBus;

    @Resource
    private ConversationMessageDao conversationMessageDao;

    @Resource
    private ConversationIndexService conversationIndexService;

    @Resource
    private ConversationService conversationService;

    @Resource
    private FusionAssistantService fusionAssistantService;

    /**
     * Fills in missing bookkeeping fields, inserts the message and posts a
     * {@link ConversationMessageEvent} on the async event bus.
     *
     * <p>The create time and the index are only assigned when they are still {@code null};
     * the update time is always overwritten with the current time.
     *
     * @param conversationMessage message to store; it is modified in place
     */
    @Override
    public void sendMessage(ConversationMessage conversationMessage) {
        conversationMessage.initData();
        if (conversationMessage.getCreateTime() == null) {
            conversationMessage.setCreateTime(System.currentTimeMillis());
        }
        conversationMessage.setUpdateTime(System.currentTimeMillis());
        if (conversationMessage.getIndex() == null) {
            conversationMessage.setIndex(
                    conversationIndexService.incrementIndex(
                            conversationMessage.getUserId(), conversationMessage.getConversationId()));
        }
        sendMessageRetry(conversationMessage, DEFAULT_SEND_ATTEMPTS);
        // The event is only posted once the insert above has succeeded.
        asyncEventBus.post(new ConversationMessageEvent(conversationMessage, 0));
    }

    /**
     * Inserts the message while holding the per-conversation lock, retrying the lock acquisition.
     *
     * <p>At least one attempt is always made, even when {@code i} is zero or negative.
     *
     * @param conversationMessage message to insert
     * @param i maximum number of lock attempts
     * @return the value returned by {@link ConversationMessageDao#insert}
     * @throws BusinessException if the lock could not be obtained within the allowed attempts
     */
    private ConversationMessage sendMessageRetry(ConversationMessage conversationMessage, int i) {
        String lockKey = SEND_LOCK_KEY_PREFIX + conversationMessage.getConversationId();
        int attemptsLeft = i;
        do {
            // NOTE(review): the meaning of 5000L (wait time vs. lock expiry) is defined by RedisLock - confirm.
            String lockToken = redisLock.tryLock(lockKey, 5000L);
            if (lockToken != null) {
                try {
                    return conversationMessageDao.insert(conversationMessage);
                } finally {
                    // Release even when the insert throws, so the key is not left locked.
                    redisLock.unlock(lockKey, lockToken);
                }
            }
            attemptsLeft--;
        } while (attemptsLeft > 0);
        throw BusinessException.create("send Message fail");
    }

    /**
     * Saves an already persisted message.
     *
     * @param conversationMessage message to save; its id must not be blank
     * @throws IllegalArgumentException if the message id is blank
     */
    @Override
    public void updateMessage(ConversationMessage conversationMessage) {
        if (StringUtils.isBlank(conversationMessage.getId())) {
            throw new IllegalArgumentException("id to update must not be null!");
        }
        conversationMessage.initData();
        conversationMessageDao.save(conversationMessage);
    }

    /**
     * Stores a message described by {@code conversationReceiveDto} in the matching conversation.
     *
     * <p>When the DTO names a conversation it must exist. Otherwise the latest conversation of
     * the tenant/user/assistant is used, and a new one is created if none is found. The
     * lookup, creation and send all happen while a lock obtained from {@link RedisLock} is held.
     *
     * @param conversationReceiveDto incoming message; a blank tenant id is replaced with the
     *     tenant id of the current authenticated user
     * @return the stored message
     * @throws ServiceException if the named conversation or the assistant does not exist, or the
     *     lock cannot be acquired
     */
    @Override
    public ConversationMessage conversationReceive(ConversationReceiveDto conversationReceiveDto) {
        AuthoredUser authoredUser = AppAuthContextHolder.getContext().getAuthoredUser();
        if (StrUtil.isBlank(conversationReceiveDto.getTenantId())) {
            conversationReceiveDto.setTenantId(authoredUser.getTenantId());
        }

        Conversation conversation = null;
        String lockKey;
        if (StrUtil.isNotBlank(conversationReceiveDto.getConversationId())) {
            conversation = conversationService.getConversation(
                    conversationReceiveDto.getConversationId(),
                    null,
                    conversationReceiveDto.getTenantId(),
                    conversationReceiveDto.getUserId());
            if (conversation == null) {
                throw new ServiceException("Conversation Is Not Exist");
            }
            lockKey = CacheConstants.ASSISTANT_ASYNC_SEND_KEY + conversationReceiveDto.getConversationId();
        } else {
            // No conversation id yet: lock on tenant/user/assistant instead.
            lockKey = "ANIA:ASYNC:SEND:MESSAGE::" + conversationReceiveDto.getTenantId()
                    + ":" + conversationReceiveDto.getUserId()
                    + ":" + conversationReceiveDto.getAssistantId();
        }

        if (!redisLock.acquireLock(lockKey, UUID.randomUUID().toString(), 10000L, 10000L)) {
            throw new ServiceException("系统繁忙，请稍后重试");
        }
        try {
            if (conversation == null) {
                conversation = findOrCreateConversation(conversationReceiveDto, authoredUser);
            }
            return externalSendMessage(conversationReceiveDto, conversation);
        } finally {
            // Single release point for both the normal and the exceptional path.
            redisLock.releaseLock(lockKey);
        }
    }

    /**
     * Returns the latest conversation for the DTO's tenant/user/assistant, creating one when
     * none exists.
     *
     * @throws ServiceException if a conversation has to be created but the assistant is unknown
     */
    private Conversation findOrCreateConversation(ConversationReceiveDto conversationReceiveDto, AuthoredUser authoredUser) {
        Conversation latest = conversationDao.queryLatestConversationByAgentId(
                conversationReceiveDto.getTenantId(),
                conversationReceiveDto.getUserId(),
                conversationReceiveDto.getAssistantId());
        if (latest != null) {
            return latest;
        }
        FusionAssistantVo assistant = fusionAssistantService.assistantBaseInfo(
                new FusionAssistantInfoDto(null, conversationReceiveDto.getAssistantId(), null, false), authoredUser);
        if (assistant == null) {
            throw new ServiceException("Assistant Is Not Exist");
        }
        return conversationService.creatConversation(
                assistant.getAssistantName(),
                assistant,
                conversationReceiveDto.getUserId(),
                conversationReceiveDto.getTenantId());
    }

    /**
     * Builds the message for an incoming DTO and sends it through
     * {@link #sendMessage(ConversationMessage)}.
     *
     * <p>If a value is cached under the conversation's "chat-ing" key, it is first published via
     * {@code RedisUtils.convertAndSend}: to {@code SSE_DISCONNECT} for history assistants, and to
     * {@code AGENT_SSE_STOP} for every other agent type.
     *
     * @throws ServiceException if {@code RedisUtils.noKey} returns {@code false} for the chat key
     */
    private ConversationMessage externalSendMessage(ConversationReceiveDto conversationReceiveDto, Conversation conversation) {
        ConversationMessage message = ConversationMessage.conversationReceive(conversationReceiveDto, conversation);
        String chatKey = CacheConstants.ASSISTANT_CHAT_ING_KEY + message.getConversationId();

        String cachedValue = RedisUtils.getCacheStrData(chatKey);
        if (StringUtils.isNotBlank(cachedValue)) {
            boolean historyAssistant =
                    Objects.equals(conversation.getAgentType(), AssistantType.HISTORY_ASSISTANT.getType());
            String channel = historyAssistant ? CacheConstants.SSE_DISCONNECT : CacheConstants.AGENT_SSE_STOP;
            RedisUtils.convertAndSend(channel, cachedValue);
        }

        // NOTE(review): presumably waits up to 5000 ms for the key to disappear - confirm in RedisUtils.
        if (!RedisUtils.noKey(chatKey, 5000L)) {
            throw new ServiceException("正在对话中，发送频率过高，请稍后再试");
        }
        sendMessage(message);
        return message;
    }

    /**
     * Lists messages of a conversation for the current authenticated user.
     *
     * <p>When the query has no conversation id, the latest conversation of the given assistant
     * is resolved and written back into the query. {@code afterId} and {@code beforeId} are
     * translated into the query's end and start index using the referenced messages.
     *
     * @param conversationMsgQueryDto query; it is modified in place
     * @return the mapped message list
     * @throws ServiceException if the conversation cannot be determined, or a referenced
     *     {@code afterId}/{@code beforeId} message does not exist
     */
    @Override
    public ConversationMsgListVo list(ConversationMsgQueryDto conversationMsgQueryDto) {
        AuthoredUser authoredUser = AppAuthContextHolder.getContext().getAuthoredUser();
        String conversationId = conversationMsgQueryDto.getConversationId();
        if (StringUtils.isBlank(conversationId)) {
            String assistantId = conversationMsgQueryDto.getAssistantId();
            if (StringUtils.isBlank(assistantId)) {
                throw new ServiceException("conversation Not exist");
            }
            Conversation latest = conversationDao.queryLatestConversationByAgentId(
                    authoredUser.getTenantId(), authoredUser.getUserId(), assistantId);
            Assert.notNull(latest, () -> new ServiceException("conversation Not exist"));
            // Keep the local id in sync so the afterId/beforeId lookups below use the resolved conversation.
            conversationId = latest.getConversationId();
            conversationMsgQueryDto.setConversationId(conversationId);
        }

        if (StrUtil.isNotBlank(conversationMsgQueryDto.getAfterId())) {
            ConversationMessage after =
                    requireMessage(conversationId, conversationMsgQueryDto.getAfterId(), "afterId不存在请核对数据");
            conversationMsgQueryDto.setEndIndex(after.getIndex());
        }
        if (StrUtil.isNotBlank(conversationMsgQueryDto.getBeforeId())) {
            ConversationMessage before =
                    requireMessage(conversationId, conversationMsgQueryDto.getBeforeId(), "beforeId不存在请核对数据");
            conversationMsgQueryDto.setStartIndex(before.getIndex());
        }

        List<ConversationMessage> messages =
                conversationMessageDao.findLatelyConversationMessage(conversationMsgQueryDto, authoredUser);
        return ConversationMsgListVo.map(conversationMsgQueryDto, messages);
    }

    /**
     * Loads a message of a conversation by message id.
     *
     * @throws ServiceException carrying {@code errorMessage} if no such message is found
     */
    private ConversationMessage requireMessage(String conversationId, String messageId, String errorMessage) {
        ConversationMessage found = conversationMessageDao.findByMessageId(conversationId, messageId);
        Assert.notNull(found, () -> new ServiceException(errorMessage));
        return found;
    }

    /**
     * Queries one page of messages and reports whether more are available.
     *
     * <p>One row more than the page size is requested; when it comes back, it is removed from
     * the result and {@code hasMore} is set.
     *
     * @param chatMessageQueryDto query including the page size
     * @param authoredUser user the query is executed for
     * @return a success result wrapping the page
     */
    @Override
    public ResultBean<ChatMessageListVo> queryMessage(ChatMessageQueryDto chatMessageQueryDto, AuthoredUser authoredUser) {
        // Read once into an int so List.remove below is the index overload.
        int pageSize = chatMessageQueryDto.getPageSize();
        List<ConversationMessage> messages =
                conversationMessageDao.queryMessage(chatMessageQueryDto, authoredUser, pageSize + 1);

        ChatMessageListVo page = new ChatMessageListVo();
        if (CollectionUtils.isNotEmpty(messages) && messages.size() > pageSize) {
            // Drop the look-ahead row that was only fetched to detect a following page.
            messages.remove(pageSize);
            page.setHasMore(true);
        }
        page.setMessages(messages);
        return ResultBean.success(0, page);
    }

    /**
     * Loads messages by id for the given user and tenant.
     *
     * @param messageQueryDto carries the message ids to load
     * @param authoredUser supplies the user id and tenant id passed to the DAO
     * @return the messages returned by the DAO
     */
    @Override
    public List<ConversationMessage> queryMessage(MessageQueryDto messageQueryDto, AuthoredUser authoredUser) {
        String userId = authoredUser.getUserId();
        String tenantId = authoredUser.getTenantId();
        return conversationMessageDao.queryMessage(messageQueryDto.getMessageIds(), userId, tenantId);
    }
}
