/*
 * Decompiled with CFR 0.152.
 */
package com.jugg.agile.middleware.rocketmq.consumer.push;

import com.jugg.agile.framework.core.config.JaProperty;
import com.jugg.agile.framework.core.config.JaSpringProperty;
import com.jugg.agile.framework.core.dapper.JaDapper;
import com.jugg.agile.framework.core.dapper.log.JaLog;
import com.jugg.agile.framework.core.dapper.meta.NodeKind;
import com.jugg.agile.framework.core.dapper.meta.NodeSpan;
import com.jugg.agile.framework.core.dapper.meta.RespSpan;
import com.jugg.agile.framework.core.meta.JaFunctionP;
import com.jugg.agile.framework.core.util.JaShutdownHookUtil;
import com.jugg.agile.framework.core.util.JaStringUtil;
import com.jugg.agile.framework.core.util.JaValidateUtil;
import com.jugg.agile.framework.core.util.datastructure.JaCollectionUtil;
import com.jugg.agile.framework.core.util.io.serialize.json.JaJson;
import com.jugg.agile.framework.core.util.reflect.clazz.JaClassUtil;
import com.jugg.agile.middleware.rocketmq.consumer.JaRocketMqConsumerType;
import com.jugg.agile.middleware.rocketmq.consumer.push.JaRocketMqPushConsumerConfig;
import com.jugg.agile.middleware.rocketmq.consumer.push.JaRocketMqPushConsumerPreConfig;
import com.jugg.agile.middleware.rocketmq.consumer.push.JaRocketMqPushListener;
import com.jugg.agile.middleware.rocketmq.consumer.push.JaRocketMqPushListenerInfo;
import com.jugg.agile.middleware.rocketmq.context.RocketMqContextChain;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

/**
 * Static factory that turns {@link JaRocketMqPushListener} definitions into started
 * RocketMQ {@link DefaultMQPushConsumer} instances.
 *
 * <p>Listeners whose tag is {@code "*"} each get a dedicated consumer. All other listeners are
 * grouped by their computed consumer-group name; one consumer is started per group and each
 * incoming message is dispatched to the listener registered for the message's tag.
 */
public class JaRocketMqPushConsumer {
    /** Default name-server key; also used as the application name when none is configured. */
    public static final String DefaultFlag = "jugg";
    /** Tag expression meaning "every tag of the topic". */
    private static final String allTag = "*";
    /** Handler that ignores the span; used as default and as replacement for {@code null}. */
    private static final JaFunctionP<RespSpan> NOOP_THROWABLE_HANDLER = respSpan -> {};
    /** Handler forwarded to {@code JaDapper.dapperT} for every consumed message. */
    private static JaFunctionP<RespSpan> throwableHandler = NOOP_THROWABLE_HANDLER;

    /**
     * Replaces the handler forwarded to {@code JaDapper.dapperT} on every consume call.
     *
     * @param throwableHandler the new handler; {@code null} restores the no-op default so that a
     *                         null reference is never handed to the tracing layer
     */
    public static void setThrowableHandler(JaFunctionP<RespSpan> throwableHandler) {
        JaRocketMqPushConsumer.throwableHandler = null == throwableHandler ? NOOP_THROWABLE_HANDLER : throwableHandler;
    }

    /**
     * Resolves the message body type of every listener and starts the matching consumers.
     *
     * @param listenerList listeners to register; a null or empty list is a no-op
     */
    public static void createByListener(List<JaRocketMqPushListener> listenerList) {
        if (JaCollectionUtil.isEmpty(listenerList)) {
            return;
        }
        JaRocketMqPushConsumer.createByListenerInfo(listenerList.stream().map(JaRocketMqPushConsumer::getJaRocketMqPushListenerInfo).collect(Collectors.toList()));
    }

    /**
     * Builds one consumer configuration per {@code "*"} listener and one per consumer group for
     * the tagged listeners, validates all of them, and only then starts the consumers.
     *
     * <p>Nothing is started until the whole list has been validated, so a configuration error
     * cannot leave a partially started set of consumers behind. The {@code "*"} consumers are
     * started first, in list order, followed by the grouped consumers.
     *
     * @param listenerInfoList listener descriptors; a null or empty list is a no-op
     * @throws RuntimeException if two listeners of the same consumer group declare the same tag
     */
    public static void createByListenerInfo(List<JaRocketMqPushListenerInfo> listenerInfoList) {
        if (JaCollectionUtil.isEmpty(listenerInfoList)) {
            return;
        }
        // "*" consumers: the list keeps the start order, the map detects duplicated groups.
        List<JaRocketMqPushConsumerConfig> allTagConsumers = new ArrayList<JaRocketMqPushConsumerConfig>();
        Map<String, JaRocketMqPushConsumerConfig> allTagConsumerMap = new HashMap<String, JaRocketMqPushConsumerConfig>();
        // Tagged consumers, keyed by consumer-group name.
        Map<String, JaRocketMqPushConsumerConfig> consumerMap = new HashMap<String, JaRocketMqPushConsumerConfig>();
        for (JaRocketMqPushListenerInfo info : listenerInfoList) {
            JaRocketMqPushListener listener = info.getListener();
            String groupName = JaRocketMqPushConsumer.getGroupName(listener);
            String tag = listener.tag();
            if (allTag.equals(tag)) {
                if (allTagConsumerMap.containsKey(groupName)) {
                    throw new RuntimeException(groupName + " duplicate listener tag:" + tag);
                }
                JaRocketMqPushConsumerConfig allTagConfig = JaRocketMqPushConsumer.newConfig(info, allTag);
                allTagConsumerMap.put(groupName, allTagConfig);
                allTagConsumers.add(allTagConfig);
                continue;
            }
            JaRocketMqPushConsumerConfig groupConfig = consumerMap.get(groupName);
            if (null == groupConfig) {
                consumerMap.put(groupName, JaRocketMqPushConsumer.newConfig(info, tag));
                continue;
            }
            if (groupConfig.getListenerMap().containsKey(tag)) {
                throw new RuntimeException(groupName + " duplicate listener tag:" + tag);
            }
            groupConfig.getListenerMap().put(tag, info);
        }
        allTagConsumers.forEach(JaRocketMqPushConsumer::start);
        consumerMap.values().forEach(JaRocketMqPushConsumer::start);
    }

    /** Creates the configuration for {@code info} with a fresh listener map holding {@code tag -> info}. */
    private static JaRocketMqPushConsumerConfig newConfig(JaRocketMqPushListenerInfo info, String tag) {
        JaRocketMqPushConsumerConfig config = JaRocketMqPushConsumer.getConfig(info);
        Map<String, JaRocketMqPushListenerInfo> listenerMap = new HashMap<String, JaRocketMqPushListenerInfo>();
        listenerMap.put(tag, info);
        config.setListenerMap(listenerMap);
        return config;
    }

    /**
     * Creates, configures and starts a push consumer for {@code config}, then registers a
     * shutdown hook that shuts the consumer down.
     *
     * <p>Side effects on {@code config}: {@code tags} is set to the listener-map keys joined
     * with {@code " || "} and {@code info} to a descriptive label used in logs and errors.
     *
     * @param config consumer configuration; its listener map must be non-null and non-empty
     * @throws RuntimeException if the listener map is empty, the consumer type is
     *                          {@code PushBroadcasting} or unknown, or a pre-config value is invalid
     */
    public static void start(JaRocketMqPushConsumerConfig config) {
        JaRocketMqPushConsumerPreConfig preConfig = config.getPreConfig();
        String consumerGroup = config.getConsumerGroup();
        Map<String, JaRocketMqPushListenerInfo> listenerMap = config.getListenerMap();
        JaValidateUtil.notNull(listenerMap, () -> "[%s] create consumer error:listenerMap isEmpty", () -> Collections.singletonList(consumerGroup));
        // An empty map would yield an empty tag expression and a consumer without any listener.
        if (listenerMap.isEmpty()) {
            throw new RuntimeException(String.format("[%s] create consumer error:listenerMap isEmpty", consumerGroup));
        }
        String tags = listenerMap.keySet().stream().map(Object::toString).collect(Collectors.joining(" || "));
        config.setTags(tags);
        String info = String.format("[consumerGroup:%s] [tags:%s]", consumerGroup, tags);
        config.setInfo(info);

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(config.getConsumerGroup());
        consumer.setNamesrvAddr(config.getNamesrvAddr());
        consumer.setConsumeThreadMin(preConfig.getConsumeThreadMin());
        consumer.setConsumeThreadMax(preConfig.getConsumeThreadMax());
        consumer.setMaxReconsumeTimes(preConfig.getMaxReconsumeTimes());
        JaRocketMqPushConsumer.setConsumeFromWhere(config, consumer);
        JaRocketMqPushConsumer.setConsumeMessageBatchMaxSize(config, consumer);
        consumer.subscribe(config.getTopic(), tags);

        JaRocketMqConsumerType type = preConfig.getType();
        consumer.setMessageModel(JaRocketMqPushConsumer.getMessageModel(type));
        // NOTE(review): these implicitly typed lambdas target an overloaded method; the original
        // source presumably spelled out the listener type — confirm this resolves under javac.
        switch (type) {
            case PushClusteringOrderly: {
                consumer.registerMessageListener((messages, context) -> JaRocketMqPushConsumer.consumeMessage(config, messages, new Object[]{messages, context}) ? ConsumeOrderlyStatus.SUCCESS : ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT);
                break;
            }
            case PushClusteringConcurrently: {
                consumer.registerMessageListener((messages, context) -> JaRocketMqPushConsumer.consumeMessage(config, messages, new Object[]{messages, context}) ? ConsumeConcurrentlyStatus.CONSUME_SUCCESS : ConsumeConcurrentlyStatus.RECONSUME_LATER);
                break;
            }
            case PushBroadcasting: {
                throw new RuntimeException("create PushBroadcasting PushConsumer error");
            }
            default: {
                throw new RuntimeException("create unknown PushConsumer error");
            }
        }
        consumer.start();
        JaShutdownHookUtil.add(info, () -> consumer.shutdown());
        JaLog.info("rocketmq start success : {}", new Object[]{info});
    }

    /**
     * Dispatches the first message of {@code messages} to the listener registered for its tag,
     * wrapped in a {@code JaDapper} span.
     *
     * <p>Only the first message is processed (the batch size is forced to 1 when the consumer is
     * configured; a larger batch is logged as an error). The body is decoded as UTF-8 and, unless
     * the listener asked for the raw message type, converted with {@code JaJson.toObject}. The
     * listener receives {@code {messages, context}} as its extra arguments.
     *
     * @param config   configuration of the consumer that received the batch
     * @param messages delivered batch
     * @param context  opaque value supplied by the caller; passed on to the listener and logged on failure
     * @return the listener's result, or {@code false} if anything was thrown while handling the message
     */
    private static boolean consumeMessage(JaRocketMqPushConsumerConfig config, List<MessageExt> messages, Object context) {
        try {
            MessageExt messageExt = messages.get(0);
            RocketMqContextChain.getInstance().inherit(messageExt);
            String topic = messageExt.getTopic();
            String tag = messageExt.getTags();
            String apiName = String.format("%s - %s", topic, tag);
            if (messages.size() > 1) {
                JaLog.get().error("[{}] consumeMessage messages size > 1", (Object)config.getInfo());
            }
            JaRocketMqPushListenerInfo listenerInfo = JaRocketMqPushConsumer.getJaRocketMqPushListenerInfo(config, tag);
            Type messageBodyType = listenerInfo.getMessageBodyType();
            String messageBodyStr = new String(messageExt.getBody(), StandardCharsets.UTF_8);
            Object[] reqSpanArgs = new Object[]{messageExt.getMsgId(), messageBodyStr};
            NodeSpan nodeSpan = (NodeSpan)NodeSpan.builder()
                    .dapperAnnotation(config.getPreConfig().getDapperPolicy())
                    .id(apiName)
                    .nodeKind(NodeKind.Constant.RocketMQConsume)
                    .build();
            return (Boolean)JaDapper.dapperT(
                    nodeSpan,
                    reqSpanArgs,
                    () -> listenerInfo.getListener().consumeMessage(
                            messageExt.getKeys(),
                            JaRocketMqPushConsumer.isMessageExt(messageBodyType) ? messageExt : JaJson.toObject(messageBodyStr, messageBodyType),
                            new Object[]{messages, context}),
                    throwableHandler);
        }
        catch (Throwable e) {
            // Every failure is reported as "not consumed" so the caller maps it to a retry status.
            JaLog.get().error("{} mq consume error message:{}, context:{}", new Object[]{config.getInfo(), JaJson.toString(messages), JaJson.toString(context), e});
            return false;
        }
        finally {
            RocketMqContextChain.getInstance().remove();
        }
    }

    /** Returns whether the listener wants the raw RocketMQ message rather than a JSON-decoded body. */
    private static boolean isMessageExt(Type messageBodyType) {
        return Objects.equals(messageBodyType, MessageExt.class) || Objects.equals(messageBodyType, Message.class);
    }

    /**
     * Finds the listener responsible for a message tag.
     *
     * @param config      consumer configuration holding the listener map
     * @param consumerTag tag of the received message
     * @return the {@code "*"} listener when the consumer subscribes to all tags, otherwise the
     *         listener registered for {@code consumerTag}
     * @throws RuntimeException if no listener is registered for {@code consumerTag}
     */
    private static JaRocketMqPushListenerInfo getJaRocketMqPushListenerInfo(JaRocketMqPushConsumerConfig config, String consumerTag) {
        if (allTag.equals(config.getTags())) {
            return config.getListenerMap().get(allTag);
        }
        JaRocketMqPushListenerInfo listenerInfo = config.getListenerMap().get(consumerTag);
        if (null == listenerInfo) {
            throw new RuntimeException(String.format("[%s] consume error:no listener registered for tag [%s]", config.getInfo(), consumerTag));
        }
        return listenerInfo;
    }

    /** Builds a configuration (name server, group, pre-config, topic) from the listener; the listener map is left unset. */
    private static JaRocketMqPushConsumerConfig getConfig(JaRocketMqPushListenerInfo info) {
        JaRocketMqPushListener listener = info.getListener();
        JaRocketMqPushConsumerPreConfig preConfig = listener.preConfig();
        JaRocketMqPushConsumerConfig jaRocketMqPushConsumerConfig = new JaRocketMqPushConsumerConfig();
        jaRocketMqPushConsumerConfig.setNamesrvAddr(JaRocketMqPushConsumer.getNamesrvAddr(preConfig.getNamesrvAddrKey()));
        jaRocketMqPushConsumerConfig.setConsumerGroup(JaRocketMqPushConsumer.getGroupName(listener));
        jaRocketMqPushConsumerConfig.setPreConfig(preConfig);
        jaRocketMqPushConsumerConfig.setTopic(listener.topic());
        return jaRocketMqPushConsumerConfig;
    }

    /**
     * Looks up the name-server address for a key.
     *
     * @param namesrvAddrKey key inserted into the property name {@code ja.rocketmq.<key>.namesrvAddr}
     * @return whatever {@code JaProperty.get} returns for that property
     */
    public static String getNamesrvAddr(String namesrvAddrKey) {
        return JaProperty.get("ja.rocketmq." + namesrvAddrKey + ".namesrvAddr");
    }

    /**
     * Computes the consumer-group name:
     * {@code [namesrvAddrKey-][groupNamePrefix-]<application>-<topic>-<type>[-all]}.
     *
     * <p>The application name falls back to {@link #DefaultFlag} when empty, the name-server key
     * is only prepended when it differs from {@link #DefaultFlag}, and {@code -all} is appended
     * for {@code "*"} listeners.
     */
    private static String getGroupName(JaRocketMqPushListener listener) {
        JaRocketMqPushConsumerPreConfig preConfig = listener.preConfig();
        String applicationName = JaSpringProperty.getApplicationName();
        if (JaStringUtil.isEmpty(applicationName)) {
            applicationName = DefaultFlag;
        }
        String groupName = String.format("%s-%s-%s", applicationName, listener.topic(), preConfig.getType().getType());
        String groupNamePrefix = preConfig.getGroupNamePrefix();
        if (JaStringUtil.isNotEmpty(groupNamePrefix)) {
            groupName = groupNamePrefix + "-" + groupName;
        }
        String namesrvAddrKey = preConfig.getNamesrvAddrKey();
        if (!DefaultFlag.equals(namesrvAddrKey)) {
            groupName = namesrvAddrKey + "-" + groupName;
        }
        if (allTag.equals(listener.tag())) {
            groupName = groupName + "-all";
        }
        return groupName;
    }

    /**
     * Wraps a listener together with the message body type taken from the first type argument
     * of its {@link JaRocketMqPushListener} interface declaration.
     *
     * @throws RuntimeException if the listener class does not expose a parameterized
     *                          {@link JaRocketMqPushListener} interface
     */
    private static JaRocketMqPushListenerInfo getJaRocketMqPushListenerInfo(JaRocketMqPushListener listener) {
        Class<?> targetClass = listener.getClass();
        Type genericInterface = JaClassUtil.getGenericInterface(targetClass, JaRocketMqPushListener.class);
        if (!(genericInterface instanceof ParameterizedType)) {
            // Fail with a readable message instead of a bare ClassCastException/NullPointerException.
            throw new RuntimeException(String.format("[%s] create consumer error:listener does not declare a message body type", targetClass.getName()));
        }
        Type[] actualTypeArguments = ((ParameterizedType)genericInterface).getActualTypeArguments();
        return JaRocketMqPushListenerInfo.builder().listener(listener).messageBodyType(actualTypeArguments[0]).build();
    }

    /**
     * Applies the configured consume batch size, which must be exactly 1 because
     * {@code consumeMessage} handles only the first message of a batch.
     *
     * @throws RuntimeException if the configured batch size is not 1
     */
    private static void setConsumeMessageBatchMaxSize(JaRocketMqPushConsumerConfig config, DefaultMQPushConsumer consumer) {
        JaRocketMqPushConsumerPreConfig preConfig = config.getPreConfig();
        int consumeMessageBatchMaxSize = preConfig.getConsumeMessageBatchMaxSize();
        if (consumeMessageBatchMaxSize != 1) {
            throw new RuntimeException(String.format("[%s] create consumer error:consumeMessageBatchMaxSize != 1", config.getInfo()));
        }
        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
    }

    /**
     * Maps the numeric {@code consumeFromWhere} setting onto the consumer:
     * 0 = last offset, 1 = first offset, 2 = timestamp (requires a non-empty consume timestamp).
     *
     * @throws RuntimeException if the value is outside 0-2 or the timestamp is missing for 2
     */
    private static void setConsumeFromWhere(JaRocketMqPushConsumerConfig config, DefaultMQPushConsumer consumer) {
        JaRocketMqPushConsumerPreConfig preConfig = config.getPreConfig();
        switch (preConfig.getConsumeFromWhere()) {
            case 0: {
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
                break;
            }
            case 1: {
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                break;
            }
            case 2: {
                if (JaStringUtil.isEmpty(preConfig.getConsumeTimestamp())) {
                    throw new RuntimeException(String.format("[%s] create consumer error:consumeTimestamp is null", config.getInfo()));
                }
                consumer.setConsumeTimestamp(preConfig.getConsumeTimestamp());
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
                break;
            }
            default: {
                throw new RuntimeException(String.format("[%s] create consumer error:consumeFromWhere [0-2]", config.getInfo()));
            }
        }
    }

    /**
     * Translates the framework consumer type into the RocketMQ message model.
     *
     * @throws RuntimeException if {@code type} is null or not one of the push types
     */
    private static MessageModel getMessageModel(JaRocketMqConsumerType type) {
        if (JaRocketMqConsumerType.PushBroadcasting == type) {
            return MessageModel.BROADCASTING;
        }
        if (JaRocketMqConsumerType.PushClusteringConcurrently == type || JaRocketMqConsumerType.PushClusteringOrderly == type) {
            return MessageModel.CLUSTERING;
        }
        // String concatenation is null-safe, unlike type.toString().
        throw new RuntimeException("create PushConsumer getMessageModel error:" + type);
    }

    /**
     * Manual smoke test: registers one tagged and one {@code "*"} listener on topic
     * {@code Test-Dev} against a hard-coded name-server address.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        JaRocketMqPushConsumerPreConfig preConfig = new JaRocketMqPushConsumerPreConfig();
        JaProperty.getPropertyMap().put("ja.rocketmq.jugg.namesrvAddr", "gcss-rocketmq-dev.rocketmq.oppo.dev:9876");
        List<JaRocketMqPushListener> jaRocketMqPushListeners = new ArrayList<JaRocketMqPushListener>();
        jaRocketMqPushListeners.add(new JaRocketMqPushListener<MessageExt>(){

            @Override
            public String topic() {
                return "Test-Dev";
            }

            @Override
            public String tag() {
                return "tag";
            }

            @Override
            public boolean consumeMessage(String key, MessageExt body, Object[] args) {
                return true;
            }
        });
        jaRocketMqPushListeners.add(new JaRocketMqPushListener<MessageExt>(){

            @Override
            public String topic() {
                return "Test-Dev";
            }

            @Override
            public String tag() {
                return JaRocketMqPushConsumer.allTag;
            }

            @Override
            public boolean consumeMessage(String key, MessageExt body, Object[] args) {
                return true;
            }
        });
        JaRocketMqPushConsumer.createByListener(jaRocketMqPushListeners);
    }
}

