/*
 * Decompiled with CFR 0.152.
 */
package com.digiwin.app.queue.rabbitmq;

import com.digiwin.app.queue.DWQueueApplicationContextProvider;
import com.digiwin.app.queue.DWQueueProducer;
import com.digiwin.app.queue.rabbitmq.DWRabbitmqChannel;
import com.digiwin.app.queue.rabbitmq.RabbitmqTopicSupporter;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.json.JSONArray;
import org.json.JSONObject;
import org.springframework.context.ApplicationContext;

public abstract class DWRabbitmqProducer
implements DWQueueProducer {
    @Override
    public void onSend(String completeTopic, String message) throws Exception {
        this.onSend(completeTopic, message, null, null, null, null);
    }

    @Override
    public void onSend(String completeTopic, String message, DWQueueProducer.CompleteCallback completeCallback) throws Exception {
        this.onSend(completeTopic, message, completeCallback, null, null, null);
    }

    @Override
    public void onSend(String completeTopic, String message, DWQueueProducer.TimeoutCallback timeoutCallback, Integer timeout, TimeUnit timeoutUnit) throws Exception {
        this.onSend(completeTopic, message, null, timeoutCallback, timeout, timeoutUnit);
    }

    @Override
    public void onSend(final String completeTopic, final String message, final DWQueueProducer.CompleteCallback completeCallback, final DWQueueProducer.TimeoutCallback timeoutCallback, final Integer timeout, final TimeUnit timeoutUnit) throws Exception {
        final ExecuteWrapper executeWrapper = this.executeSend(completeTopic, message, completeCallback);
        Executors.newSingleThreadExecutor().submit(new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                BlockingQueue<Boolean> response = executeWrapper.getResponse();
                Channel channel = executeWrapper.getChannel();
                String consumerTag = executeWrapper.getConsumerTag();
                if (timeoutCallback != null) {
                    Boolean result = response.poll(timeout.longValue(), timeoutUnit);
                    channel.basicCancel(consumerTag);
                    if (result == null) {
                        JSONObject jo = new JSONObject();
                        jo.put("topic", (Object)completeTopic);
                        jo.put("result", false);
                        jo.put("message", (Object)message);
                        jo.put("detail", (Object)new JSONArray());
                        timeoutCallback.onCallback(jo.toString());
                    }
                } else {
                    Boolean result;
                    if (completeCallback != null && (result = response.poll(2L, TimeUnit.HOURS)) == null) {
                        JSONObject jo = new JSONObject();
                        jo.put("topic", (Object)completeTopic);
                        jo.put("result", false);
                        jo.put("message", (Object)message);
                        jo.put("detail", (Object)new JSONArray());
                        completeCallback.onCallback(jo.toString());
                    }
                    channel.basicCancel(consumerTag);
                }
                channel.close();
                return null;
            }
        });
    }

    @Override
    public void onFileSend(String completeTopic, String localPath, String fileName, String extension) throws Exception {
        this.onFileSend(completeTopic, localPath, fileName, extension, null, null, null, null);
    }

    @Override
    public void onFileSend(String completeTopic, String localPath, String fileName, String extension, DWQueueProducer.CompleteCallback completeCallback) throws Exception {
        this.onFileSend(completeTopic, localPath, fileName, extension, completeCallback, null, null, null);
    }

    @Override
    public void onFileSend(String completeTopic, String localPath, String fileName, String extension, DWQueueProducer.TimeoutCallback timeoutCallback, Integer timeout, TimeUnit timeoutUnit) throws Exception {
        this.onFileSend(completeTopic, localPath, fileName, extension, null, timeoutCallback, timeout, timeoutUnit);
    }

    @Override
    public void onFileSend(String completeTopic, String localPath, String fileName, String extension, DWQueueProducer.CompleteCallback completeCallback, DWQueueProducer.TimeoutCallback timeoutCallback, Integer timeout, TimeUnit timeoutUnit) throws Exception {
    }

    private ExecuteWrapper executeSend(String completeTopic, String message, final DWQueueProducer.CompleteCallback completeCallback) throws Exception {
        ApplicationContext context = DWQueueApplicationContextProvider.getApplicationContext();
        DWRabbitmqChannel rabbitmqChannel = (DWRabbitmqChannel)context.getBean(DWRabbitmqChannel.class);
        Channel channel = rabbitmqChannel.getChannel();
        final String correlationId = UUID.randomUUID().toString();
        String replyQueueName = channel.queueDeclare().getQueue();
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(correlationId).replyTo(replyQueueName).build();
        RabbitmqTopicSupporter topicSupporter = new RabbitmqTopicSupporter(completeTopic);
        String exchangeName = topicSupporter.getExchangeName();
        final ArrayBlockingQueue<Boolean> response = new ArrayBlockingQueue<Boolean>(1);
        String consumerTag = channel.basicConsume(replyQueueName, true, (Consumer)new DefaultConsumer(channel){

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                if (properties.getCorrelationId().equals(correlationId)) {
                    String message = new String(body, "UTF-8");
                    if (completeCallback != null) {
                        completeCallback.onCallback(message);
                    }
                    response.offer(true);
                }
            }
        });
        channel.basicPublish(exchangeName, completeTopic, props, message.getBytes("UTF-8"));
        ExecuteWrapper executeWrapper = new ExecuteWrapper(response, channel, consumerTag);
        return executeWrapper;
    }

    private class ExecuteWrapper {
        private BlockingQueue<Boolean> response;
        private Channel channel;
        private String consumerTag;

        public ExecuteWrapper(BlockingQueue<Boolean> response, Channel channel, String consumerTag) {
            this.response = response;
            this.channel = channel;
            this.consumerTag = consumerTag;
        }

        public BlockingQueue<Boolean> getResponse() {
            return this.response;
        }

        public Channel getChannel() {
            return this.channel;
        }

        public String getConsumerTag() {
            return this.consumerTag;
        }
    }
}

