/*
 * Decompiled with CFR 0.152.
 */
package com.primeton.pmq.transport.amqp.protocol;

import com.primeton.pmq.command.ConsumerId;
import com.primeton.pmq.command.ConsumerInfo;
import com.primeton.pmq.command.ExceptionResponse;
import com.primeton.pmq.command.LocalTransactionId;
import com.primeton.pmq.command.PMQDestination;
import com.primeton.pmq.command.PMQTempDestination;
import com.primeton.pmq.command.ProducerId;
import com.primeton.pmq.command.ProducerInfo;
import com.primeton.pmq.command.RemoveInfo;
import com.primeton.pmq.command.Response;
import com.primeton.pmq.command.SessionId;
import com.primeton.pmq.command.SessionInfo;
import com.primeton.pmq.command.TransactionId;
import com.primeton.pmq.selector.SelectorParser;
import com.primeton.pmq.transport.amqp.AmqpProtocolConverter;
import com.primeton.pmq.transport.amqp.AmqpProtocolException;
import com.primeton.pmq.transport.amqp.AmqpSupport;
import com.primeton.pmq.transport.amqp.ResponseHandler;
import com.primeton.pmq.transport.amqp.protocol.AmqpConnection;
import com.primeton.pmq.transport.amqp.protocol.AmqpJmsSelectorFilter;
import com.primeton.pmq.transport.amqp.protocol.AmqpNoLocalFilter;
import com.primeton.pmq.transport.amqp.protocol.AmqpReceiver;
import com.primeton.pmq.transport.amqp.protocol.AmqpResource;
import com.primeton.pmq.transport.amqp.protocol.AmqpSender;
import com.primeton.pmq.transport.amqp.protocol.AmqpTransactionCoordinator;
import com.primeton.pmq.util.IntrospectionSupport;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.jms.InvalidSelectorException;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.DeleteOnClose;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AmqpSession
implements AmqpResource {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpSession.class);
    private final Map<ConsumerId, AmqpSender> consumers = new HashMap<ConsumerId, AmqpSender>();
    private final AmqpConnection connection;
    private final Session protonSession;
    private final SessionId sessionId;
    private boolean enlisted;
    private long nextProducerId = 0L;
    private long nextConsumerId = 0L;

    public AmqpSession(AmqpConnection connection, SessionId sessionId, Session session) {
        this.connection = connection;
        this.sessionId = sessionId;
        this.protonSession = session;
    }

    @Override
    public void open() {
        LOG.debug("Session {} opened", (Object)this.getSessionId());
        this.getEndpoint().setContext((Object)this);
        this.getEndpoint().setIncomingCapacity(Integer.MAX_VALUE);
        this.getEndpoint().open();
        this.connection.sendToPMQ(new SessionInfo(this.getSessionId()));
    }

    @Override
    public void close() {
        LOG.debug("Session {} closed", (Object)this.getSessionId());
        this.connection.sendToPMQ(new RemoveInfo(this.getSessionId()), new ResponseHandler(){

            @Override
            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
                AmqpSession.this.getEndpoint().setContext(null);
                AmqpSession.this.getEndpoint().close();
                AmqpSession.this.getEndpoint().free();
            }
        });
    }

    public void commit(LocalTransactionId txId) throws Exception {
        for (AmqpSender consumer : this.consumers.values()) {
            consumer.commit(txId);
        }
        this.enlisted = false;
    }

    public void rollback(LocalTransactionId txId) throws Exception {
        for (AmqpSender consumer : this.consumers.values()) {
            consumer.rollback(txId);
        }
        this.enlisted = false;
    }

    public void flushPendingMessages() throws Exception {
        for (AmqpSender consumer : this.consumers.values()) {
            consumer.pumpOutbound();
        }
    }

    public void createCoordinator(Receiver protonReceiver) throws Exception {
        AmqpTransactionCoordinator txCoordinator = new AmqpTransactionCoordinator(this, protonReceiver);
        txCoordinator.flow(this.connection.getConfiguredReceiverCredit());
        txCoordinator.open();
    }

    public void createReceiver(Receiver protonReceiver) throws Exception {
        org.apache.qpid.proton.amqp.transport.Target remoteTarget = protonReceiver.getRemoteTarget();
        ProducerInfo producerInfo = new ProducerInfo(this.getNextProducerId());
        final AmqpReceiver receiver = new AmqpReceiver(this, protonReceiver, producerInfo);
        LOG.debug("opening new receiver {} on link: {}", (Object)producerInfo.getProducerId(), (Object)protonReceiver.getName());
        try {
            List<Symbol> list;
            String connectionId;
            Target target = (Target)remoteTarget;
            PMQDestination destination = null;
            String targetNodeName = target.getAddress();
            if (target.getDynamic()) {
                destination = this.connection.createTemporaryDestination((Link)protonReceiver, target.getCapabilities());
                HashMap<Symbol, DeleteOnClose> dynamicNodeProperties = new HashMap<Symbol, DeleteOnClose>();
                dynamicNodeProperties.put(AmqpSupport.LIFETIME_POLICY, DeleteOnClose.getInstance());
                Target actualTarget = new Target();
                actualTarget.setAddress(destination.getQualifiedName());
                actualTarget.setCapabilities(new Symbol[]{AmqpSupport.getDestinationTypeSymbol(destination)});
                actualTarget.setDynamic(true);
                actualTarget.setDynamicNodeProperties(dynamicNodeProperties);
                protonReceiver.setTarget((org.apache.qpid.proton.amqp.transport.Target)actualTarget);
                receiver.addCloseAction(new Runnable(){

                    @Override
                    public void run() {
                        AmqpSession.this.connection.deleteTemporaryDestination((PMQTempDestination)receiver.getDestination());
                    }
                });
            } else if (targetNodeName != null && !targetNodeName.isEmpty() && (destination = AmqpSupport.createDestination(remoteTarget)).isTemporary() && (connectionId = ((PMQTempDestination)destination).getConnectionId()) == null) {
                throw new AmqpProtocolException(AmqpError.PRECONDITION_FAILED.toString(), "Not a broker created temp destination");
            }
            Symbol[] remoteDesiredCapabilities = protonReceiver.getRemoteDesiredCapabilities();
            if (remoteDesiredCapabilities != null && (list = Arrays.asList(remoteDesiredCapabilities)).contains(AmqpSupport.DELAYED_DELIVERY)) {
                protonReceiver.setOfferedCapabilities(new Symbol[]{AmqpSupport.DELAYED_DELIVERY});
            }
            receiver.setDestination(destination);
            this.connection.sendToPMQ(producerInfo, new ResponseHandler(){

                @Override
                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
                    if (response.isException()) {
                        ErrorCondition error = null;
                        Throwable exception = ((ExceptionResponse)response).getException();
                        error = exception instanceof SecurityException ? new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()) : new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage());
                        receiver.close(error);
                    } else {
                        receiver.flow(AmqpSession.this.connection.getConfiguredReceiverCredit());
                        receiver.open();
                    }
                    AmqpSession.this.pumpProtonToSocket();
                }
            });
        }
        catch (AmqpProtocolException exception) {
            receiver.close(new ErrorCondition(Symbol.getSymbol((String)exception.getSymbolicName()), exception.getMessage()));
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void createSender(Sender protonSender) throws Exception {
        Source source = (Source)protonSender.getRemoteSource();
        ConsumerInfo consumerInfo = new ConsumerInfo(this.getNextConsumerId());
        final AmqpSender sender = new AmqpSender(this, protonSender, consumerInfo);
        LOG.debug("opening new sender {} on link: {}", (Object)consumerInfo.getConsumerId(), (Object)protonSender.getName());
        try {
            PMQDestination destination;
            HashMap<Symbol, DescribedType> supportedFilters = new HashMap<Symbol, DescribedType>();
            protonSender.setContext((Object)sender);
            boolean noLocal = false;
            String selector = null;
            if (source != null) {
                Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.JMS_SELECTOR_FILTER_IDS);
                if (filter != null) {
                    selector = filter.getValue().getDescribed().toString();
                    try {
                        SelectorParser.parse(selector);
                    }
                    catch (InvalidSelectorException e) {
                        sender.close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
                        return;
                    }
                    supportedFilters.put(filter.getKey(), filter.getValue());
                }
                if ((filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS)) != null) {
                    noLocal = true;
                    supportedFilters.put(filter.getKey(), filter.getValue());
                }
            }
            if (source == null) {
                ConsumerInfo storedInfo = this.connection.lookupSubscription(protonSender.getName());
                if (storedInfo == null) {
                    sender.close(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription link: " + protonSender.getName()));
                    return;
                }
                destination = storedInfo.getDestination();
                source = new Source();
                source.setAddress(destination.getQualifiedName());
                source.setDurable(TerminusDurability.UNSETTLED_STATE);
                source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
                source.setDistributionMode(AmqpSupport.COPY);
                if (storedInfo.isNoLocal()) {
                    supportedFilters.put(AmqpSupport.NO_LOCAL_NAME, AmqpNoLocalFilter.NO_LOCAL);
                }
                if (storedInfo.getSelector() != null && !storedInfo.getSelector().trim().equals("")) {
                    supportedFilters.put(AmqpSupport.JMS_SELECTOR_NAME, new AmqpJmsSelectorFilter(storedInfo.getSelector()));
                }
            } else if (source.getDynamic()) {
                destination = this.connection.createTemporaryDestination((Link)protonSender, source.getCapabilities());
                HashMap<Symbol, DeleteOnClose> dynamicNodeProperties = new HashMap<Symbol, DeleteOnClose>();
                dynamicNodeProperties.put(AmqpSupport.LIFETIME_POLICY, DeleteOnClose.getInstance());
                source = new Source();
                source.setAddress(destination.getQualifiedName());
                source.setCapabilities(new Symbol[]{AmqpSupport.getDestinationTypeSymbol(destination)});
                source.setDynamic(true);
                source.setDynamicNodeProperties(dynamicNodeProperties);
                sender.addCloseAction(new Runnable(){

                    @Override
                    public void run() {
                        AmqpSession.this.connection.deleteTemporaryDestination((PMQTempDestination)sender.getDestination());
                    }
                });
            } else {
                String connectionId;
                destination = AmqpSupport.createDestination(source);
                if (destination.isTemporary() && (connectionId = ((PMQTempDestination)destination).getConnectionId()) == null) {
                    throw new AmqpProtocolException(AmqpError.INVALID_FIELD.toString(), "Not a broker created temp destination");
                }
            }
            source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
            protonSender.setSource((org.apache.qpid.proton.amqp.transport.Source)source);
            int senderCredit = protonSender.getRemoteCredit();
            if (destination.getOptions() != null) {
                Map<String, Object> options = IntrospectionSupport.extractProperties(new HashMap<String, String>(destination.getOptions()), "consumer.");
                IntrospectionSupport.setProperties(consumerInfo, options);
                if (options.size() > 0) {
                    String msg = "There are " + options.size() + " consumer options that couldn't be set on the consumer. Check the options are spelled correctly. Unknown parameters=[" + options + "]. This consumer cannot be started.";
                    LOG.warn(msg);
                    throw new AmqpProtocolException(AmqpError.INVALID_FIELD.toString(), msg);
                }
            }
            consumerInfo.setSelector(selector);
            consumerInfo.setNoRangeAcks(true);
            consumerInfo.setDestination(destination);
            consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0);
            consumerInfo.setDispatchAsync(true);
            consumerInfo.setNoLocal(noLocal);
            if (source.getDistributionMode() == AmqpSupport.COPY && destination.isQueue()) {
                consumerInfo.setBrowser(true);
            }
            if ((TerminusDurability.UNSETTLED_STATE.equals((Object)source.getDurable()) || TerminusDurability.CONFIGURATION.equals((Object)source.getDurable())) && destination.isTopic()) {
                consumerInfo.setSubscriptionName(protonSender.getName());
            }
            this.connection.sendToPMQ(consumerInfo, new ResponseHandler(){

                @Override
                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
                    if (response.isException()) {
                        ErrorCondition error = null;
                        Throwable exception = ((ExceptionResponse)response).getException();
                        error = exception instanceof SecurityException ? new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()) : (exception instanceof InvalidSelectorException ? new ErrorCondition(AmqpError.INVALID_FIELD, exception.getMessage()) : new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
                        sender.close(error);
                    } else {
                        sender.open();
                    }
                    AmqpSession.this.pumpProtonToSocket();
                }
            });
            return;
        }
        catch (AmqpProtocolException e) {
            sender.close(new ErrorCondition(Symbol.getSymbol((String)e.getSymbolicName()), e.getMessage()));
        }
    }

    public void pumpProtonToSocket() {
        this.connection.pumpProtonToSocket();
    }

    public void registerSender(ConsumerId consumerId, AmqpSender sender) {
        this.consumers.put(consumerId, sender);
        this.connection.registerSender(consumerId, sender);
    }

    public void unregisterSender(ConsumerId consumerId) {
        this.consumers.remove(consumerId);
        this.connection.unregisterSender(consumerId);
    }

    public void enlist(TransactionId txId) {
        if (!this.enlisted) {
            this.connection.getTxCoordinator(txId).enlist(this);
            this.enlisted = true;
        }
    }

    public AmqpConnection getConnection() {
        return this.connection;
    }

    public SessionId getSessionId() {
        return this.sessionId;
    }

    public Session getEndpoint() {
        return this.protonSession;
    }

    public long getMaxFrameSize() {
        return this.connection.getMaxFrameSize();
    }

    private ConsumerId getNextConsumerId() {
        return new ConsumerId(this.sessionId, this.nextConsumerId++);
    }

    private ProducerId getNextProducerId() {
        return new ProducerId(this.sessionId, this.nextProducerId++);
    }
}

