package com.jugg.agile.middleware.rocketmq.consumer.push;

import com.jugg.agile.framework.core.config.JaProperty;
import com.jugg.agile.framework.core.config.JaSpringProperty;
import com.jugg.agile.framework.core.dapper.JaDapper;
import com.jugg.agile.framework.core.dapper.log.JaLog;
import com.jugg.agile.framework.core.dapper.meta.NodeKind;
import com.jugg.agile.framework.core.dapper.meta.NodeSpan;
import com.jugg.agile.framework.core.dapper.meta.RespSpan;
import com.jugg.agile.framework.core.meta.JaFunctionP;
import com.jugg.agile.framework.core.util.JaShutdownHookUtil;
import com.jugg.agile.framework.core.util.JaStringUtil;
import com.jugg.agile.framework.core.util.JaValidateUtil;
import com.jugg.agile.framework.core.util.datastructure.JaCollectionUtil;
import com.jugg.agile.framework.core.util.io.serialize.json.JaJson;
import com.jugg.agile.framework.core.util.reflect.clazz.JaClassUtil;
import com.jugg.agile.middleware.rocketmq.consumer.JaRocketMqConsumerType;
import com.jugg.agile.middleware.rocketmq.context.RocketMqContextChain;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

/* loaded from: input_file:com/jugg/agile/middleware/rocketmq/consumer/push/JaRocketMqPushConsumer.class */
public class JaRocketMqPushConsumer {
    public static final String DefaultFlag = "jugg";
    private static final String allTag = "*";
    private static JaFunctionP<RespSpan> throwableHandler = respSpan -> {
    };

    public static void setThrowableHandler(JaFunctionP<RespSpan> jaFunctionP) {
        throwableHandler = jaFunctionP;
    }

    public static void createByListener(List<JaRocketMqPushListener> list) {
        if (JaCollectionUtil.isEmpty(list)) {
            return;
        }
        createByListenerInfo((List) list.stream().map(JaRocketMqPushConsumer::getJaRocketMqPushListenerInfo).collect(Collectors.toList()));
    }

    public static void createByListenerInfo(List<JaRocketMqPushListenerInfo> list) {
        if (JaCollectionUtil.isEmpty(list)) {
            return;
        }
        HashMap hashMap = new HashMap();
        for (JaRocketMqPushListenerInfo jaRocketMqPushListenerInfo : list) {
            JaRocketMqPushListener listener = jaRocketMqPushListenerInfo.getListener();
            String groupName = getGroupName(listener);
            String tag = listener.tag();
            if (allTag.equals(tag)) {
                JaRocketMqPushConsumerConfig config = getConfig(jaRocketMqPushListenerInfo);
                HashMap hashMap2 = new HashMap();
                hashMap2.put(allTag, jaRocketMqPushListenerInfo);
                config.setListenerMap(hashMap2);
                start(config);
            } else {
                JaRocketMqPushConsumerConfig jaRocketMqPushConsumerConfig = (JaRocketMqPushConsumerConfig) hashMap.get(groupName);
                if (null == jaRocketMqPushConsumerConfig) {
                    JaRocketMqPushConsumerConfig config2 = getConfig(jaRocketMqPushListenerInfo);
                    HashMap hashMap3 = new HashMap();
                    hashMap3.put(tag, jaRocketMqPushListenerInfo);
                    config2.setListenerMap(hashMap3);
                    hashMap.put(groupName, config2);
                } else {
                    if (jaRocketMqPushConsumerConfig.getListenerMap().containsKey(tag)) {
                        throw new RuntimeException(groupName + " duplicate listener tag:" + tag);
                    }
                    jaRocketMqPushConsumerConfig.getListenerMap().put(tag, jaRocketMqPushListenerInfo);
                }
            }
        }
        hashMap.values().forEach(JaRocketMqPushConsumer::start);
    }

    public static void start(JaRocketMqPushConsumerConfig jaRocketMqPushConsumerConfig) {
        JaRocketMqPushConsumerPreConfig preConfig = jaRocketMqPushConsumerConfig.getPreConfig();
        String consumerGroup = jaRocketMqPushConsumerConfig.getConsumerGroup();
        Map<String, JaRocketMqPushListenerInfo> listenerMap = jaRocketMqPushConsumerConfig.getListenerMap();
        JaValidateUtil.notNull(listenerMap, () -> {
            return "[%s] create consumer error:listenerMap isEmpty";
        }, () -> {
            return Collections.singletonList(consumerGroup);
        });
        String str = (String) listenerMap.keySet().stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.joining(" || "));
        jaRocketMqPushConsumerConfig.setTags(str);
        String format = String.format("[consumerGroup:%s] [tags:%s]", consumerGroup, str);
        jaRocketMqPushConsumerConfig.setInfo(format);
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(jaRocketMqPushConsumerConfig.getConsumerGroup());
        defaultMQPushConsumer.setNamesrvAddr(jaRocketMqPushConsumerConfig.getNamesrvAddr());
        defaultMQPushConsumer.setConsumeThreadMin(preConfig.getConsumeThreadMin());
        defaultMQPushConsumer.setConsumeThreadMax(preConfig.getConsumeThreadMax());
        defaultMQPushConsumer.setMaxReconsumeTimes(preConfig.getMaxReconsumeTimes());
        setConsumeFromWhere(jaRocketMqPushConsumerConfig, defaultMQPushConsumer);
        setConsumeMessageBatchMaxSize(jaRocketMqPushConsumerConfig, defaultMQPushConsumer);
        defaultMQPushConsumer.subscribe(jaRocketMqPushConsumerConfig.getTopic(), str);
        JaRocketMqConsumerType type = preConfig.getType();
        defaultMQPushConsumer.setMessageModel(getMessageModel(type));
        switch (type) {
            case PushClusteringOrderly:
                defaultMQPushConsumer.registerMessageListener((list, consumeOrderlyContext) -> {
                    return consumeMessage(jaRocketMqPushConsumerConfig, list, new Object[]{list, consumeOrderlyContext}) ? ConsumeOrderlyStatus.SUCCESS : ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                });
                break;
            case PushClusteringConcurrently:
                defaultMQPushConsumer.registerMessageListener((list2, consumeConcurrentlyContext) -> {
                    return consumeMessage(jaRocketMqPushConsumerConfig, list2, new Object[]{list2, consumeConcurrentlyContext}) ? ConsumeConcurrentlyStatus.CONSUME_SUCCESS : ConsumeConcurrentlyStatus.RECONSUME_LATER;
                });
                break;
            case PushBroadcasting:
                throw new RuntimeException("create PushBroadcasting PushConsumer error");
            default:
                throw new RuntimeException("create unknown PushConsumer error");
        }
        defaultMQPushConsumer.start();
        defaultMQPushConsumer.getClass();
        JaShutdownHookUtil.add(format, defaultMQPushConsumer::shutdown);
        JaLog.info("rocketmq start success : {}", new Object[]{format});
    }

    private static boolean consumeMessage(JaRocketMqPushConsumerConfig jaRocketMqPushConsumerConfig, List<MessageExt> list, Object obj) {
        try {
            try {
                MessageExt messageExt = list.get(0);
                RocketMqContextChain.getInstance().inherit(messageExt);
                String topic = messageExt.getTopic();
                String tags = messageExt.getTags();
                String format = String.format("%s - %s", topic, tags);
                if (list.size() > 1) {
                    JaLog.get().error("[{}] consumeMessage messages size > 1", jaRocketMqPushConsumerConfig.getInfo());
                }
                JaRocketMqPushListenerInfo jaRocketMqPushListenerInfo = getJaRocketMqPushListenerInfo(jaRocketMqPushConsumerConfig, tags);
                Type messageBodyType = jaRocketMqPushListenerInfo.getMessageBodyType();
                String str = new String(messageExt.getBody(), StandardCharsets.UTF_8);
                boolean booleanValue = ((Boolean) JaDapper.dapperT(NodeSpan.builder().dapperPolicy(jaRocketMqPushConsumerConfig.getPreConfig().getDapperPolicy()).id(format).nodeKind(NodeKind.Constant.RocketMQConsume).build(), new Object[]{messageExt.getMsgId(), str}, () -> {
                    return Boolean.valueOf(jaRocketMqPushListenerInfo.getListener().consumeMessage(messageExt.getKeys(), isMessageExt(messageBodyType) ? messageExt : JaJson.toObject(str, messageBodyType), new Object[]{list, obj}));
                }, throwableHandler)).booleanValue();
                RocketMqContextChain.getInstance().remove();
                return booleanValue;
            } catch (Throwable th) {
                JaLog.get().error("{} mq consume error message:{}, context:{}", new Object[]{jaRocketMqPushConsumerConfig.getInfo(), JaJson.toString(list), JaJson.toString(obj), th});
                RocketMqContextChain.getInstance().remove();
                return false;
            }
        } catch (Throwable th2) {
            RocketMqContextChain.getInstance().remove();
            throw th2;
        }
    }

    private static boolean isMessageExt(Type type) {
        return Objects.equals(type, MessageExt.class) || Objects.equals(type, Message.class);
    }

    private static JaRocketMqPushListenerInfo getJaRocketMqPushListenerInfo(JaRocketMqPushConsumerConfig jaRocketMqPushConsumerConfig, String str) {
        if (allTag.equals(jaRocketMqPushConsumerConfig.getTags())) {
            return jaRocketMqPushConsumerConfig.getListenerMap().get(allTag);
        }
        JaRocketMqPushListenerInfo jaRocketMqPushListenerInfo = jaRocketMqPushConsumerConfig.getListenerMap().get(str);
        if (null == jaRocketMqPushListenerInfo) {
            throw new RuntimeException(String.format("[%s] create consumer error:listenerMap isEmpty", jaRocketMqPushConsumerConfig.getInfo()));
        }
        return jaRocketMqPushListenerInfo;
    }

    private static JaRocketMqPushConsumerConfig getConfig(JaRocketMqPushListenerInfo jaRocketMqPushListenerInfo) {
        JaRocketMqPushListener listener = jaRocketMqPushListenerInfo.getListener();
        JaRocketMqPushConsumerPreConfig preConfig = listener.preConfig();
        JaRocketMqPushConsumerConfig jaRocketMqPushConsumerConfig = new JaRocketMqPushConsumerConfig();
        jaRocketMqPushConsumerConfig.setNamesrvAddr(getNamesrvAddr(preConfig.getNamesrvAddrKey()));
        jaRocketMqPushConsumerConfig.setConsumerGroup(getGroupName(listener));
        jaRocketMqPushConsumerConfig.setPreConfig(preConfig);
        jaRocketMqPushConsumerConfig.setTopic(listener.topic());
        return jaRocketMqPushConsumerConfig;
    }

    public static String getNamesrvAddr(String str) {
        return JaProperty.get("ja.rocketmq." + str + ".namesrvAddr");
    }

    private static String getGroupName(JaRocketMqPushListener jaRocketMqPushListener) {
        JaRocketMqPushConsumerPreConfig preConfig = jaRocketMqPushListener.preConfig();
        String applicationName = JaSpringProperty.getApplicationName();
        if (JaStringUtil.isEmpty(applicationName)) {
            applicationName = DefaultFlag;
        }
        String format = String.format("%s-%s-%s", applicationName, jaRocketMqPushListener.topic(), Integer.valueOf(preConfig.getType().getType()));
        String groupNamePrefix = preConfig.getGroupNamePrefix();
        if (JaStringUtil.isNotEmpty(groupNamePrefix)) {
            format = groupNamePrefix + "-" + format;
        }
        String namesrvAddrKey = preConfig.getNamesrvAddrKey();
        if (!DefaultFlag.equals(namesrvAddrKey)) {
            format = namesrvAddrKey + "-" + format;
        }
        if (allTag.equals(jaRocketMqPushListener.tag())) {
            format = format + "-all";
        }
        return format;
    }

    private static JaRocketMqPushListenerInfo getJaRocketMqPushListenerInfo(JaRocketMqPushListener jaRocketMqPushListener) {
        return JaRocketMqPushListenerInfo.builder().listener(jaRocketMqPushListener).messageBodyType(((ParameterizedType) JaClassUtil.getGenericInterface(jaRocketMqPushListener.getClass(), JaRocketMqPushListener.class)).getActualTypeArguments()[0]).build();
    }

    private static void setConsumeMessageBatchMaxSize(JaRocketMqPushConsumerConfig jaRocketMqPushConsumerConfig, DefaultMQPushConsumer defaultMQPushConsumer) {
        int consumeMessageBatchMaxSize = jaRocketMqPushConsumerConfig.getPreConfig().getConsumeMessageBatchMaxSize();
        if (consumeMessageBatchMaxSize != 1) {
            throw new RuntimeException(String.format("[%s] create consumer error:consumeMessageBatchMaxSize != 1", jaRocketMqPushConsumerConfig.getInfo()));
        }
        defaultMQPushConsumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
    }

    private static void setConsumeFromWhere(JaRocketMqPushConsumerConfig jaRocketMqPushConsumerConfig, DefaultMQPushConsumer defaultMQPushConsumer) {
        JaRocketMqPushConsumerPreConfig preConfig = jaRocketMqPushConsumerConfig.getPreConfig();
        switch (preConfig.getConsumeFromWhere()) {
            case 0:
                defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
                return;
            case 1:
                defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                return;
            case 2:
                if (JaStringUtil.isEmpty(preConfig.getConsumeTimestamp())) {
                    throw new RuntimeException(String.format("[%s] create consumer error:consumeTimestamp is null", jaRocketMqPushConsumerConfig.getInfo()));
                }
                defaultMQPushConsumer.setConsumeTimestamp(preConfig.getConsumeTimestamp());
                defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
                return;
            default:
                throw new RuntimeException(String.format("[%s] create consumer error:consumeFromWhere [0-2]", jaRocketMqPushConsumerConfig.getInfo()));
        }
    }

    private static MessageModel getMessageModel(JaRocketMqConsumerType jaRocketMqConsumerType) {
        if (JaRocketMqConsumerType.PushBroadcasting == jaRocketMqConsumerType) {
            return MessageModel.BROADCASTING;
        }
        if (JaRocketMqConsumerType.PushClusteringConcurrently == jaRocketMqConsumerType || JaRocketMqConsumerType.PushClusteringOrderly == jaRocketMqConsumerType) {
            return MessageModel.CLUSTERING;
        }
        throw new RuntimeException("create PushConsumer getMessageModel error:" + jaRocketMqConsumerType.toString());
    }

    public static void main(String[] strArr) {
        new JaRocketMqPushConsumerPreConfig();
        JaProperty.getPropertyMap().put("ja.rocketmq.jugg.namesrvAddr", "gcss-rocketmq-dev.rocketmq.oppo.dev:9876");
        ArrayList arrayList = new ArrayList();
        arrayList.add(new JaRocketMqPushListener<MessageExt>() { // from class: com.jugg.agile.middleware.rocketmq.consumer.push.JaRocketMqPushConsumer.1
            @Override // com.jugg.agile.middleware.rocketmq.consumer.push.JaRocketMqPushListener
            public String topic() {
                return "Test-Dev";
            }

            @Override // com.jugg.agile.middleware.rocketmq.consumer.push.JaRocketMqPushListener
            public String tag() {
                return "tag";
            }

            @Override // com.jugg.agile.middleware.rocketmq.consumer.push.JaRocketMqPushListener
            public boolean consumeMessage(String str, MessageExt messageExt, Object[] objArr) {
                return true;
            }
        });
        arrayList.add(new JaRocketMqPushListener<MessageExt>() { // from class: com.jugg.agile.middleware.rocketmq.consumer.push.JaRocketMqPushConsumer.2
            @Override // com.jugg.agile.middleware.rocketmq.consumer.push.JaRocketMqPushListener
            public String topic() {
                return "Test-Dev";
            }

            @Override // com.jugg.agile.middleware.rocketmq.consumer.push.JaRocketMqPushListener
            public String tag() {
                return JaRocketMqPushConsumer.allTag;
            }

            @Override // com.jugg.agile.middleware.rocketmq.consumer.push.JaRocketMqPushListener
            public boolean consumeMessage(String str, MessageExt messageExt, Object[] objArr) {
                return true;
            }
        });
        createByListener(arrayList);
    }
}
