/*
 * Decompiled with CFR 0.152.
 */
package com.digiwin.dap.middleware.lmc.service.messaging.impl;

import cn.hutool.core.util.RandomUtil;
import com.digiwin.dap.middleware.lmc.entity.BaseEntity;
import com.digiwin.dap.middleware.lmc.entity.devlog.DevLog;
import com.digiwin.dap.middleware.lmc.repository.impl.DevLogRepositoryImpl;
import com.digiwin.dap.middleware.lmc.repository.impl.EventLogRepositoryImpl;
import com.digiwin.dap.middleware.lmc.repository.impl.OpLogRepositoryImpl;
import com.digiwin.dap.middleware.lmc.service.esp.log.IEspLogService;
import com.digiwin.dap.middleware.lmc.service.messaging.PacketSizeRejectStrategy;
import com.digiwin.dap.middleware.lmc.service.messaging.impl.ByteArrayPacketSizeCalculator;
import com.digiwin.dap.middleware.lmc.service.messaging.impl.KafkaMessageListenerHospice;
import com.digiwin.dap.middleware.lmc.service.messaging.impl.KafkaMessagePublisher;
import com.digiwin.dap.middleware.lmc.support.RateLimiterHelper;
import com.digiwin.dap.middleware.lmc.support.concurrent.DelegateThreadPoolTaskExecutor;
import com.digiwin.dap.middleware.lmc.support.elasticsearch.service.IElasticsearchService;
import com.digiwin.dap.middleware.lmc.support.helper.AlertHelper;
import com.digiwin.dap.middleware.lmc.util.BaseEntityUtils;
import com.digiwin.dap.middleware.util.JsonUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import javax.annotation.PostConstruct;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.Acknowledgment;

@Configuration
@ConditionalOnExpression(value="#{T(org.springframework.util.StringUtils).hasLength(environment.getProperty('spring.kafka.server'))&&T(org.springframework.util.StringUtils).hasLength(environment.getProperty('spring.elasticsearch.uris'))}")
public class KafkaMessageListenerHospice {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListenerHospice.class);
    private static final int TIMEOUT = 20;
    private static final int SAMPLE_RANGE = 10;
    @Autowired
    RateLimiterHelper rateLimiterHelper;
    @Autowired
    private KafkaMessagePublisher messagePublisher;
    @Autowired
    private DevLogRepositoryImpl devLogRepositoryImpl;
    @Autowired
    private EventLogRepositoryImpl eventLogRepositoryImpl;
    @Autowired
    private OpLogRepositoryImpl opLogRepositoryImpl;
    @Autowired
    private IEspLogService espLogService;
    @Autowired
    private PacketSizeRejectStrategy rejectStrategy;
    @Autowired
    private ByteArrayPacketSizeCalculator packetSizeCalculator;
    @Autowired
    @Qualifier(value="esTaskExecutor")
    private DelegateThreadPoolTaskExecutor daasTaskExecutor;
    @Autowired
    private IElasticsearchService elasticsearchService;
    @Autowired
    AlertHelper alertHelper;
    @Value(value="${spring.kafka.max.request.size}")
    private Long maxRequestSize;
    @Value(value="${spring.kafka.logs.probability:10000}")
    public int samplingProbability;
    @Value(value="${spring.kafka.topic.esp-log}")
    public String espTopic;
    private TpSentOffsetRecorder offsetRecorder;

    @PostConstruct
    public void init() {
        this.daasTaskExecutor.initializeExecutor();
        this.offsetRecorder = new TpSentOffsetRecorder(this);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void consumeMessages(List<ConsumerRecord<byte[], byte[]>> records, Acknowledgment ack, String topic) {
        long startTime = this.getStartTime();
        ArrayList<DevLog> dtoList = new ArrayList<DevLog>();
        for (int i = 0; i < records.size(); ++i) {
            DevLog devLog;
            ConsumerRecord<byte[], byte[]> r = records.get(i);
            try {
                devLog = (DevLog)JsonUtils.jsonToObj((String)new String((byte[])r.value()), DevLog.class);
            }
            catch (Exception e) {
                LOGGER.error("\u6d88\u8d39kafka\u6d88\u606f\u5f02\u5e38\uff1aJSON\u8f6c\u5316\u5f02\u5e38, \u4f4d\u7f6e\uff1a{}/{}, \u4e3b\u9898topic={}, \u6d88\u606f={}", new Object[]{i, records.size(), topic, new String((byte[])records.get(0).value()), e});
                continue;
            }
            if (devLog.getId() == null) {
                devLog.setId(UUID.randomUUID());
            }
            BaseEntityUtils.setCreateFields((BaseEntity)devLog);
            dtoList.add(devLog);
        }
        try {
            this.elasticsearchService.bulkIndex(dtoList);
        }
        catch (Exception e) {
            LOGGER.error("\u6d88\u8d39kafka\u6d88\u606f\u5f02\u5e38\uff1a\u4fdd\u5b58ES\u9519\u8bef, \u4e3b\u9898topic={},  \u6d88\u606f={}", new Object[]{topic, new String((byte[])records.get(0).value()), e});
        }
        finally {
            ack.acknowledge();
            this.samplePrintLogs(startTime, records.size(), topic);
        }
    }

    private long getStartTime() {
        if (RandomUtil.randomInt((int)(this.samplingProbability * 10)) < 10) {
            return System.currentTimeMillis();
        }
        return 0L;
    }

    private void samplePrintLogs(long startTimeMillis, int size, String topic) {
        if (startTimeMillis == 0L) {
            return;
        }
        LOGGER.info("\u62bd\u6837\u7387\uff1a1/{}, kafka Consumer records size = {}, topic = {}, \u8017\u65f6 = {}ms", new Object[]{this.samplingProbability, size, topic, System.currentTimeMillis() - startTimeMillis});
    }

    static /* synthetic */ DevLogRepositoryImpl access$000(KafkaMessageListenerHospice x0) {
        return x0.devLogRepositoryImpl;
    }

    static /* synthetic */ void access$100(KafkaMessageListenerHospice x0, List x1, Acknowledgment x2, String x3) {
        x0.consumeMessages(x1, x2, x3);
    }

    static /* synthetic */ OpLogRepositoryImpl access$200(KafkaMessageListenerHospice x0) {
        return x0.opLogRepositoryImpl;
    }

    static /* synthetic */ EventLogRepositoryImpl access$300(KafkaMessageListenerHospice x0) {
        return x0.eventLogRepositoryImpl;
    }

    static /* synthetic */ long access$400(KafkaMessageListenerHospice x0) {
        return x0.getStartTime();
    }

    static /* synthetic */ IEspLogService access$500(KafkaMessageListenerHospice x0) {
        return x0.espLogService;
    }

    static /* synthetic */ Logger access$600() {
        return LOGGER;
    }

    static /* synthetic */ void access$700(KafkaMessageListenerHospice x0, long x1, int x2, String x3) {
        x0.samplePrintLogs(x1, x2, x3);
    }

    static /* synthetic */ TpSentOffsetRecorder access$800(KafkaMessageListenerHospice x0) {
        return x0.offsetRecorder;
    }

    static /* synthetic */ DelegateThreadPoolTaskExecutor access$900(KafkaMessageListenerHospice x0) {
        return x0.daasTaskExecutor;
    }

    static /* synthetic */ KafkaMessagePublisher access$1000(KafkaMessageListenerHospice x0) {
        return x0.messagePublisher;
    }

    static /* synthetic */ IElasticsearchService access$1100(KafkaMessageListenerHospice x0) {
        return x0.elasticsearchService;
    }
}

