package com.digiwin.athena.ania.knowledge.client.sse;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.digiwin.athena.ania.knowledge.context.SseContext;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import okio.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

/* loaded from: input_file:WEB-INF/classes/com/digiwin/athena/ania/knowledge/client/sse/SSEListener.class */
public class SSEListener extends EventSourceListener {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) SSEListener.class);
    private CountDownLatch countDownLatch = new CountDownLatch(1);
    private SSEMsgHandler msgHandler;

    public SSEListener(SSEMsgHandler sSEMsgHandler) {
        this.msgHandler = sSEMsgHandler;
    }

    @Override // okhttp3.sse.EventSourceListener
    public void onOpen(EventSource eventSource, Response response) {
        log.info("建立sse连接...");
    }

    @Override // okhttp3.sse.EventSourceListener
    public void onEvent(EventSource eventSource, String str, String str2, String str3) {
        try {
            if ("finish".equals(str2)) {
                log.info("请求结束");
            }
            if ("error".equals(str2)) {
                log.error("消息端异常 {} ", str3);
            }
            this.msgHandler.handleMsg(str3, str2);
        } catch (Exception e) {
            log.error("消息错误id:{},data:{},type:{}", str, str3, str2, e);
            this.countDownLatch.countDown();
            throw new RuntimeException(e);
        }
    }

    @Override // okhttp3.sse.EventSourceListener
    public void onClosed(EventSource eventSource) {
        log.info("sse连接关闭");
        clostSseServerEmitter(eventSource);
        this.countDownLatch.countDown();
    }

    @Override // okhttp3.sse.EventSourceListener
    public void onFailure(EventSource eventSource, Throwable th, Response response) {
        log.error("使用事件源时出现异常... [异常信息]...:{},返回信息：{}", JSONObject.toJSONString(th), JSONObject.toJSONString(response));
        clostSseServerEmitter(eventSource);
        this.countDownLatch.countDown();
    }

    public CountDownLatch getCountDownLatch() {
        return this.countDownLatch;
    }

    private void clostSseServerEmitter(EventSource eventSource) {
        RequestBody body = eventSource.request().body();
        Buffer buffer = new Buffer();
        try {
            try {
                body.writeTo(buffer);
                SseEmitter sseEmitter = SseContext.getSseEmitter(JSON.parseObject(buffer.readString(StandardCharsets.UTF_8)).getString("messageId"));
                if (sseEmitter != null) {
                    sseEmitter.complete();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        } finally {
            buffer.close();
        }
    }
}
