/*
 * Decompiled with CFR 0.152.
 */
package com.digiwin.app.queue.rabbitmq;

import com.digiwin.app.queue.DWQueueConsumer;
import com.digiwin.app.queue.DWQueueReceiver;
import com.digiwin.app.queue.DWQueueTopicManager;
import com.digiwin.app.queue.rabbitmq.DWRabbitmqChannel;
import com.digiwin.app.queue.rabbitmq.RabbitmqTopicSupporter;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;

public class DWRabbitmqConsumer
implements DWQueueConsumer {
    private Map<String, Consumer> consumerHolder = new HashMap<String, Consumer>();
    @Autowired
    private DWQueueTopicManager topicManager;
    @Autowired
    private DWRabbitmqChannel rabbitmqChannel;

    @Override
    public void onCreate(String wildcardTopic) throws Exception {
        RabbitmqTopicSupporter topicSupporter = new RabbitmqTopicSupporter(wildcardTopic);
        Channel channel = this.rabbitmqChannel.getChannel();
        String queueName = topicSupporter.getQueueName();
        String exchangeName = topicSupporter.getExchangeName();
        channel.queueDeclare(queueName, true, false, false, null);
        channel.exchangeDeclare(exchangeName, "topic", true);
        channel.queueBind(queueName, exchangeName, wildcardTopic);
        Consumer consumer = new Consumer(channel);
        channel.basicConsume(queueName, false, (com.rabbitmq.client.Consumer)consumer);
        this.consumerHolder.put(wildcardTopic, consumer);
    }

    @Override
    public void onDestroy(String wildcardTopic) throws Exception {
        Consumer consumer = this.consumerHolder.get(wildcardTopic);
        String consumerTag = consumer.getConsumerTag();
        Channel channel = consumer.getChannel();
        channel.basicCancel(consumerTag);
        this.consumerHolder.remove(wildcardTopic);
    }

    @Override
    public Set<String> getWildcardTopics() throws Exception {
        Set<String> wildcardTopics = this.consumerHolder.keySet();
        return wildcardTopics;
    }

    private class Consumer
    extends DefaultConsumer {
        public Consumer(Channel channel) {
            super(channel);
        }

        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            try {
                final String message = new String(body, "UTF-8");
                String topic = envelope.getRoutingKey();
                List<DWQueueReceiver> receivers = DWRabbitmqConsumer.this.topicManager.getQueueReceivers(topic);
                if (receivers.size() == 0) {
                    super.getChannel().basicRecover();
                    return;
                }
                ArrayBlockingQueue<JSONObject> response = new ArrayBlockingQueue<JSONObject>(receivers.size());
                for (final DWQueueReceiver receiver : receivers) {
                    receiver.setResponse(response);
                    Executors.newSingleThreadExecutor().submit(new Callable<Void>(){

                        @Override
                        public Void call() throws Exception {
                            receiver.execute(message);
                            return null;
                        }
                    });
                }
                while (response.remainingCapacity() != 0) {
                }
                boolean result = true;
                JSONArray detailJa = new JSONArray();
                for (JSONObject jo : response) {
                    detailJa.put((Object)jo);
                    if (jo.getBoolean("result")) continue;
                    result = false;
                }
                JSONObject replyJo = new JSONObject();
                replyJo.put("topic", (Object)topic);
                replyJo.put("result", result);
                replyJo.put("message", (Object)message);
                replyJo.put("detail", (Object)detailJa);
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
                super.getChannel().basicPublish("", properties.getReplyTo(), replyProps, replyJo.toString().getBytes("UTF-8"));
                super.getChannel().basicAck(envelope.getDeliveryTag(), false);
                return;
            }
            catch (Exception e) {
                e.printStackTrace();
                return;
            }
        }
    }
}

