package com.digiwin.dap.middleware.lmc.service.messaging.impl;

import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.RandomUtil;
import com.digiwin.dap.middleware.lmc.config.ThreadPoolConfig;
import com.digiwin.dap.middleware.lmc.domain.esplog.CreateEspLogDTO;
import com.digiwin.dap.middleware.lmc.entity.BaseEntity;
import com.digiwin.dap.middleware.lmc.entity.devlog.DevLog;
import com.digiwin.dap.middleware.lmc.entity.esplog.EspLog;
import com.digiwin.dap.middleware.lmc.entity.event.EventLog;
import com.digiwin.dap.middleware.lmc.entity.oplog.OpLog;
import com.digiwin.dap.middleware.lmc.repository.base.EntityRepository;
import com.digiwin.dap.middleware.lmc.repository.impl.DevLogRepositoryImpl;
import com.digiwin.dap.middleware.lmc.repository.impl.EventLogRepositoryImpl;
import com.digiwin.dap.middleware.lmc.repository.impl.OpLogRepositoryImpl;
import com.digiwin.dap.middleware.lmc.service.esp.log.IEspLogService;
import com.digiwin.dap.middleware.lmc.service.messaging.KafkaMessageListener;
import com.digiwin.dap.middleware.lmc.service.messaging.PacketSizeRejectStrategy;
import com.digiwin.dap.middleware.lmc.support.RateLimiterHelper;
import com.digiwin.dap.middleware.lmc.support.concurrent.DelegateThreadPoolTaskExecutor;
import com.digiwin.dap.middleware.lmc.support.concurrent.FutureDecorator;
import com.digiwin.dap.middleware.lmc.support.elasticsearch.service.IElasticsearchService;
import com.digiwin.dap.middleware.lmc.support.exception.ESBusinessException;
import com.digiwin.dap.middleware.lmc.support.exception.ListenerExecuteStateHolder;
import com.digiwin.dap.middleware.lmc.support.helper.AlertHelper;
import com.digiwin.dap.middleware.lmc.util.BaseEntityUtils;
import com.digiwin.dap.middleware.util.JsonUtils;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

@Configuration
@ConditionalOnExpression("#{T(org.springframework.util.StringUtils).hasLength(environment.getProperty('spring.kafka.server'))&&T(org.springframework.util.StringUtils).hasLength(environment.getProperty('spring.elasticsearch.uris'))}")
/* loaded from: input_file:WEB-INF/classes/com/digiwin/dap/middleware/lmc/service/messaging/impl/KafkaMessageListenerHospice.class */
public class KafkaMessageListenerHospice {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) KafkaMessageListenerHospice.class);
    private static final int TIMEOUT = 20;
    private static final int SAMPLE_RANGE = 10;

    @Autowired
    RateLimiterHelper rateLimiterHelper;

    @Autowired
    private KafkaMessagePublisher messagePublisher;

    @Autowired
    private DevLogRepositoryImpl devLogRepositoryImpl;

    @Autowired
    private EventLogRepositoryImpl eventLogRepositoryImpl;

    @Autowired
    private OpLogRepositoryImpl opLogRepositoryImpl;

    @Autowired
    private IEspLogService espLogService;

    @Autowired
    private PacketSizeRejectStrategy rejectStrategy;

    @Autowired
    private ByteArrayPacketSizeCalculator packetSizeCalculator;

    @Autowired
    @Qualifier(ThreadPoolConfig.ES_TASK_EXECUTOR)
    private DelegateThreadPoolTaskExecutor daasTaskExecutor;

    @Autowired
    private IElasticsearchService elasticsearchService;

    @Autowired
    AlertHelper alertHelper;

    @Value("${spring.kafka.max.request.size}")
    private Long maxRequestSize;

    @Value("${spring.kafka.logs.probability:10000}")
    public int samplingProbability;

    @Value("${spring.kafka.topic.esp-log}")
    public String espTopic;
    private TpSentOffsetRecorder offsetRecorder;

    /* loaded from: input_file:WEB-INF/classes/com/digiwin/dap/middleware/lmc/service/messaging/impl/KafkaMessageListenerHospice$AbstractMessageListener.class */
    public abstract class AbstractMessageListener<R extends BaseEntity> implements KafkaMessageListener {
        public AbstractMessageListener() {
        }

        public void getMessageInternal(List<ConsumerRecord<byte[], byte[]>> list, Acknowledgment acknowledgment, EntityRepository<R> entityRepository, Class<R> cls) {
            long startTime = KafkaMessageListenerHospice.this.getStartTime();
            LinkedList linkedList = new LinkedList();
            PacketAggregator packetAggregator = new PacketAggregator(list, KafkaMessageListenerHospice.this.offsetRecorder, entityRepository.getEsSuffixIndex());
            while (packetAggregator.hasMore()) {
                try {
                    try {
                        List aggregate = packetAggregator.aggregate(str -> {
                            return (BaseEntity) JsonUtils.jsonToObj(str, cls);
                        });
                        if (!aggregate.isEmpty()) {
                            linkedList.add(KafkaMessageListenerHospice.this.daasTaskExecutor.submit(() -> {
                                KafkaMessageListenerHospice.this.elasticsearchService.bulkIndex(aggregate);
                            }, aggregate));
                        }
                    } catch (Exception e) {
                        KafkaMessageListenerHospice.LOGGER.error("【发送es前后置出错】", (Throwable) e);
                        list.forEach(consumerRecord -> {
                            KafkaMessageListenerHospice.this.messagePublisher.publish(new String((byte[]) consumerRecord.value()), entityRepository.getTopicName(), entityRepository.getEsSuffixIndex());
                        });
                        KafkaMessageListenerHospice.this.alertHelper.doAlert("发送es前后置告警", e, null);
                        try {
                            acknowledgment.acknowledge();
                            KafkaMessageListenerHospice.this.samplePrintLogs(startTime, list.size(), entityRepository.getTopicName());
                            return;
                        } finally {
                        }
                    }
                } catch (Throwable th) {
                    try {
                        acknowledgment.acknowledge();
                        KafkaMessageListenerHospice.this.samplePrintLogs(startTime, list.size(), entityRepository.getTopicName());
                        throw th;
                    } finally {
                    }
                }
            }
            ListenerExecuteStateHolder.setTasksAdded();
            Iterator it = linkedList.iterator();
            while (it.hasNext()) {
                execute(entityRepository, (FutureDecorator) it.next());
            }
            ListenerExecuteStateHolder.setPayloadsSent();
            if (!CollectionUtils.isEmpty(linkedList)) {
                KafkaMessageListenerHospice.this.offsetRecorder.recordOffset(list);
            }
            try {
                acknowledgment.acknowledge();
                KafkaMessageListenerHospice.this.samplePrintLogs(startTime, list.size(), entityRepository.getTopicName());
            } finally {
            }
        }

        private void execute(EntityRepository<R> entityRepository, FutureDecorator futureDecorator) {
            try {
                futureDecorator.get(20L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                KafkaMessageListenerHospice.LOGGER.warn("Interrupted!", (Throwable) e);
                Thread.currentThread().interrupt();
            } catch (Exception e2) {
                if (isEsException(e2)) {
                    KafkaMessageListenerHospice.LOGGER.error("【发送es出错-es】:{}", e2.getMessage());
                } else {
                    KafkaMessageListenerHospice.LOGGER.error("【发送es出错-non-es】", (Throwable) e2);
                    futureDecorator.getParam();
                }
            }
        }

        private boolean isEsException(Exception exc) {
            if (exc instanceof ExecutionException) {
                return exc.getCause() instanceof ESBusinessException;
            }
            return false;
        }
    }

    /**
     * Listener for the dev-log topic; every polled batch is handed to
     * {@code consumeMessages} of the enclosing class.
     */
    @Component
    public class DevLogListener extends AbstractMessageListener<DevLog> {

        /**
         * Forwards the batch together with the dev-log topic name.
         *
         * @param list           records polled from the dev-log topic
         * @param acknowledgment manual acknowledgment handle for the batch
         */
        @Override
        @KafkaListener(
                groupId = "${spring.kafka.group.dev-log}",
                topics = "${spring.kafka.topic.dev-log}",
                containerFactory = "commonKafkaListenerContainerFactory")
        public void getMessage(List<ConsumerRecord<byte[], byte[]>> list, Acknowledgment acknowledgment) {
            String topicName = KafkaMessageListenerHospice.this.devLogRepositoryImpl.getTopicName();
            KafkaMessageListenerHospice.this.consumeMessages(list, acknowledgment, topicName);
        }
    }

    @Component
    /* loaded from: input_file:WEB-INF/classes/com/digiwin/dap/middleware/lmc/service/messaging/impl/KafkaMessageListenerHospice$EspLogListener.class */
    public class EspLogListener extends AbstractMessageListener<EspLog> {
        public EspLogListener() {
            super();
        }

        @Override // com.digiwin.dap.middleware.lmc.service.messaging.KafkaMessageListener
        @KafkaListener(topics = {"${spring.kafka.topic.esp-log}"}, containerFactory = "commonKafkaListenerContainerFactory", groupId = "${spring.kafka.group.esp-log}")
        public void getMessage(List<ConsumerRecord<byte[], byte[]>> list, Acknowledgment acknowledgment) {
            long startTime = KafkaMessageListenerHospice.this.getStartTime();
            try {
                try {
                    ArrayList arrayList = new ArrayList();
                    list.forEach(consumerRecord -> {
                        arrayList.add(JsonUtils.jsonToObj(new String((byte[]) consumerRecord.value()), CreateEspLogDTO.class));
                    });
                    KafkaMessageListenerHospice.this.espLogService.batchSaveEspLog(arrayList);
                    acknowledgment.acknowledge();
                    KafkaMessageListenerHospice.this.samplePrintLogs(startTime, list.size(), KafkaMessageListenerHospice.this.espTopic);
                } catch (Exception e) {
                    KafkaMessageListenerHospice.LOGGER.error("KafkaMessageListenerHospice.EspLogListener.getMessage error,message={}", new String(list.get(0).value()), e);
                    acknowledgment.acknowledge();
                    KafkaMessageListenerHospice.this.samplePrintLogs(startTime, list.size(), KafkaMessageListenerHospice.this.espTopic);
                }
            } catch (Throwable th) {
                acknowledgment.acknowledge();
                KafkaMessageListenerHospice.this.samplePrintLogs(startTime, list.size(), KafkaMessageListenerHospice.this.espTopic);
                throw th;
            }
        }
    }

    /**
     * Listener for the event-log topic; delegates each polled batch to
     * {@code getMessageInternal} with the event-log repository and entity class.
     */
    @Component
    public class EventLogListener extends AbstractMessageListener<EventLog> {

        /**
         * Forwards the batch for parsing into {@link EventLog} entities.
         *
         * @param list           records polled from the event-log topic
         * @param acknowledgment manual acknowledgment handle for the batch
         */
        @Override
        @KafkaListener(
                groupId = "${spring.kafka.group.event-log}",
                topics = "${spring.kafka.topic.event-log}",
                containerFactory = "commonKafkaListenerContainerFactory")
        public void getMessage(List<ConsumerRecord<byte[], byte[]>> list, Acknowledgment acknowledgment) {
            EventLogRepositoryImpl repository = KafkaMessageListenerHospice.this.eventLogRepositoryImpl;
            getMessageInternal(list, acknowledgment, repository, EventLog.class);
        }
    }

    /**
     * Listener for the op-log topic; delegates each polled batch to
     * {@code getMessageInternal} with the op-log repository and entity class.
     */
    @Component
    public class OpLogListener extends AbstractMessageListener<OpLog> {

        /**
         * Forwards the batch for parsing into {@link OpLog} entities.
         *
         * @param list           records polled from the op-log topic
         * @param acknowledgment manual acknowledgment handle for the batch
         */
        @Override
        @KafkaListener(
                groupId = "${spring.kafka.group.op-log}",
                topics = "${spring.kafka.topic.op-log}",
                containerFactory = "commonKafkaListenerContainerFactory")
        public void getMessage(List<ConsumerRecord<byte[], byte[]>> list, Acknowledgment acknowledgment) {
            OpLogRepositoryImpl repository = KafkaMessageListenerHospice.this.opLogRepositoryImpl;
            getMessageInternal(list, acknowledgment, repository, OpLog.class);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/classes/com/digiwin/dap/middleware/lmc/service/messaging/impl/KafkaMessageListenerHospice$PacketAggregator.class */
    public class PacketAggregator<R> {
        private final List<ConsumerRecord<byte[], byte[]>> records;
        private Long index = 0L;
        private final Integer size;
        private final TpSentOffsetRecorder offsetRecorder;
        private final String esSuffixIndex;

        public PacketAggregator(List<ConsumerRecord<byte[], byte[]>> list, TpSentOffsetRecorder tpSentOffsetRecorder, String str) {
            this.records = list;
            this.size = Integer.valueOf(list.size());
            this.offsetRecorder = tpSentOffsetRecorder;
            this.esSuffixIndex = str;
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r2v2 */
        public List<R> aggregate(Function<String, R> function) {
            String str;
            LinkedList linkedList = new LinkedList();
            long longValue = this.index.longValue();
            while (longValue < this.size.intValue()) {
                ConsumerRecord<byte[], byte[]> consumerRecord = this.records.get((int) longValue);
                if (consumerRecord != null && !ArrayUtil.isEmpty(consumerRecord.value()) && !this.offsetRecorder.filter(consumerRecord)) {
                    String str2 = new String(consumerRecord.value());
                    try {
                        linkedList.add(function.apply(str2));
                    } catch (Exception e) {
                        KafkaMessageListenerHospice.LOGGER.error("【json解析出错】,json={}", str2, e);
                        str = str2;
                        KafkaMessageListenerHospice.this.alertHelper.doAlert("json解析出错", e, str);
                    }
                }
                long j = longValue + 1;
                longValue = str;
                this.index = Long.valueOf(j);
            }
            return linkedList;
        }

        public boolean hasMore() {
            return this.index.longValue() < ((long) this.size.intValue());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/classes/com/digiwin/dap/middleware/lmc/service/messaging/impl/KafkaMessageListenerHospice$TpSentOffsetRecorder.class */
    public class TpSentOffsetRecorder {
        Map<String, Long> recordMap = null;

        protected TpSentOffsetRecorder() {
        }

        public void recordOffset(List<ConsumerRecord<byte[], byte[]>> list) {
            try {
                if (this.recordMap == null) {
                    this.recordMap = (Map) list.stream().collect(Collectors.toMap(consumerRecord -> {
                        return consumerRecord.topic() + consumerRecord.partition();
                    }, (v0) -> {
                        return v0.offset();
                    }, (v0, v1) -> {
                        return Math.max(v0, v1);
                    }));
                } else {
                    list.forEach(consumerRecord2 -> {
                        String str = consumerRecord2.topic() + consumerRecord2.partition();
                        Long l = this.recordMap.get(str);
                        if (l == null || consumerRecord2.offset() > l.longValue()) {
                            this.recordMap.put(str, Long.valueOf(consumerRecord2.offset()));
                        }
                    });
                }
            } catch (Exception e) {
                KafkaMessageListenerHospice.LOGGER.error("【TPSentOffsetRecorder】记录已经发送es的offset出错", (Throwable) e);
                KafkaMessageListenerHospice.this.alertHelper.doAlert("【TPSentOffsetRecorder】", e, null);
            }
        }

        public boolean filter(ConsumerRecord<byte[], byte[]> consumerRecord) {
            Long l;
            return (this.recordMap == null || (l = this.recordMap.get(new StringBuilder().append(consumerRecord.topic()).append(consumerRecord.partition()).toString())) == null || l.longValue() < consumerRecord.offset()) ? false : true;
        }
    }

    /**
     * Post-construction hook: initializes the injected task executor, then creates the
     * offset recorder used by the listeners.
     */
    @PostConstruct
    public void init() {
        daasTaskExecutor.initializeExecutor();
        offsetRecorder = new TpSentOffsetRecorder();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Deserializes a batch of records into {@link DevLog} entities, fills in a missing id and
     * the creation fields, and bulk-indexes them.
     *
     * <p>Failures are logged (with the first payload of the batch as a sample) and not
     * rethrown; the batch is acknowledged exactly once on every path.
     *
     * @param list           records polled from Kafka
     * @param acknowledgment manual acknowledgment handle for the batch
     * @param str            topic name, used only for logging
     */
    public void consumeMessages(List<ConsumerRecord<byte[], byte[]>> list, Acknowledgment acknowledgment, String str) {
        long startTime = getStartTime();
        try {
            List<DevLog> devLogs = list.stream().map(consumerRecord -> {
                // Assumes payloads are UTF-8 encoded JSON (Kafka's string serializers default to UTF-8).
                String json = new String(consumerRecord.value(), StandardCharsets.UTF_8);
                DevLog devLog = (DevLog) JsonUtils.jsonToObj(json, DevLog.class);
                if (devLog.getId() == null) {
                    devLog.setId(UUID.randomUUID());
                }
                BaseEntityUtils.setCreateFields(devLog);
                return devLog;
            }).collect(Collectors.toList());
            this.elasticsearchService.bulkIndex(devLogs);
        } catch (Exception e) {
            // Guarded so that an empty batch or a null-valued first record cannot make the
            // error handler itself throw and hide the original failure.
            String sample = null;
            if (!list.isEmpty() && list.get(0).value() != null) {
                sample = new String(list.get(0).value(), StandardCharsets.UTF_8);
            }
            LOGGER.error("KafkaMessageListenerHospice 消费kafka消息异常, 主题topic={},  消息={}", str, sample, e);
        } finally {
            acknowledgment.acknowledge();
            samplePrintLogs(startTime, list.size(), str);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Decides whether the current batch is sampled for timing.
     *
     * <p>A batch is sampled with probability {@code 1 / samplingProbability}. A
     * non-positive setting disables sampling instead of making the random call throw, and the
     * draw no longer multiplies the setting by 10, which overflowed {@code int} for large
     * values.
     *
     * @return the current time in milliseconds when the batch is sampled, otherwise
     *         {@code 0L}, which {@code samplePrintLogs} treats as "not sampled"
     */
    public long getStartTime() {
        int oneIn = this.samplingProbability;
        if (oneIn <= 0) {
            return 0L;
        }
        // randomInt(n) draws from [0, n); hitting 0 has the same 1/n chance as the
        // former "randomInt(n * 10) < 10" check.
        if (RandomUtil.randomInt(oneIn) == 0) {
            return System.currentTimeMillis();
        }
        return 0L;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Writes the sampling log line for a batch, unless the batch was not sampled.
     *
     * @param j   start time in milliseconds as returned by {@code getStartTime()}; {@code 0}
     *            means the batch was not sampled and nothing is logged
     * @param i   number of records in the batch
     * @param str topic name
     */
    public void samplePrintLogs(long j, int i, String str) {
        if (j != 0) {
            long elapsedMillis = System.currentTimeMillis() - j;
            LOGGER.info("抽样率：1/{}, kafka Consumer records size = {}, topic = {}, 耗时 = {}ms", this.samplingProbability, i, str, elapsedMillis);
        }
    }
}
