/*
 * Decompiled with CFR 0.152.
 */
package com.primeton.pmq.store.jdbc;

import com.primeton.pmq.broker.ConnectionContext;
import com.primeton.pmq.command.Message;
import com.primeton.pmq.command.MessageAck;
import com.primeton.pmq.command.MessageId;
import com.primeton.pmq.command.PMQDestination;
import com.primeton.pmq.command.TransactionId;
import com.primeton.pmq.command.XATransactionId;
import com.primeton.pmq.store.IndexListener;
import com.primeton.pmq.store.MessageStore;
import com.primeton.pmq.store.ProxyMessageStore;
import com.primeton.pmq.store.ProxyTopicMessageStore;
import com.primeton.pmq.store.TopicMessageStore;
import com.primeton.pmq.store.TransactionRecoveryListener;
import com.primeton.pmq.store.jdbc.JDBCMessageStore;
import com.primeton.pmq.store.jdbc.JDBCPersistenceAdapter;
import com.primeton.pmq.store.jdbc.JDBCTopicMessageStore;
import com.primeton.pmq.store.jdbc.TransactionContext;
import com.primeton.pmq.store.memory.MemoryTransactionStore;
import com.primeton.pmq.util.ByteSequence;
import com.primeton.pmq.util.DataByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;

/**
 * A {@link MemoryTransactionStore} that pushes the work of a prepared transaction through the
 * persistence adapter it was constructed with.
 *
 * <p>On {@link #prepare(TransactionId)} the queued add and ack commands are executed inside an
 * adapter transaction whose context carries the XA id, and every add command is replaced by a
 * {@link CommitAddOutcome}. On {@link #rollback(TransactionId)} of a prepared transaction that
 * work is reversed through the adapter. The {@code recover*} methods rebuild prepared
 * transactions from data handed in by the adapter during {@link #recover(TransactionRecoveryListener)}.
 */
public class JdbcMemoryTransactionStore
extends MemoryTransactionStore {
    /** Delegates of the proxied topic stores, keyed by destination; filled by {@link #onProxyTopicStore}. */
    private final HashMap<PMQDestination, MessageStore> topicStores = new HashMap<PMQDestination, MessageStore>();
    /** Delegates of the proxied queue stores, keyed by destination; filled by {@link #onProxyQueueStore}. */
    private final HashMap<PMQDestination, MessageStore> queueStores = new HashMap<PMQDestination, MessageStore>();

    /**
     * Creates the store on top of the given adapter.
     *
     * @param jdbcPersistenceAdapter adapter passed to the superclass; the methods of this class
     *        cast {@code persistenceAdapter} back to {@link JDBCPersistenceAdapter}
     */
    public JdbcMemoryTransactionStore(JDBCPersistenceAdapter jdbcPersistenceAdapter) {
        super(jdbcPersistenceAdapter);
    }

    /**
     * Moves an in-flight transaction to the prepared state.
     *
     * <p>Does nothing when {@code txid} is not in-flight. Otherwise all add commands and then all
     * ack commands are run in one adapter transaction; if any of them throws an
     * {@link IOException} that adapter transaction is rolled back and the exception is rethrown
     * (the transaction has already been removed from the in-flight map at that point).
     *
     * @param txid id of the transaction to prepare; cast to {@link XATransactionId}
     * @throws IOException if a command or the adapter fails
     */
    @Override
    public void prepare(TransactionId txid) throws IOException {
        MemoryTransactionStore.Tx tx = (MemoryTransactionStore.Tx)this.inflightTransactions.remove(txid);
        if (tx == null) {
            return;
        }
        ConnectionContext ctx = new ConnectionContext();
        // NOTE(review): presumably the adapter treats adds/removes executed under an xid as
        // pending the XA outcome - confirm in JDBCPersistenceAdapter.
        ctx.setXid((XATransactionId)txid);
        this.persistenceAdapter.beginTransaction(ctx);
        try {
            for (MemoryTransactionStore.AddMessageCommand addCommand : tx.messages) {
                addCommand.run(ctx);
            }
            for (MemoryTransactionStore.RemoveMessageCommand ackCommand : tx.acks) {
                ackCommand.run(ctx);
            }
        }
        catch (IOException e) {
            this.persistenceAdapter.rollbackTransaction(ctx);
            throw e;
        }
        this.persistenceAdapter.commitTransaction(ctx);
        ctx.setXid(null);

        // Replace each add command by the command to run when the prepared transaction commits.
        // The list is typed by the command interface so it can be assigned to tx.messages.
        ArrayList<MemoryTransactionStore.AddMessageCommand> commitOutcomes = new ArrayList<MemoryTransactionStore.AddMessageCommand>();
        for (MemoryTransactionStore.AddMessageCommand addCommand : tx.messages) {
            commitOutcomes.add(new CommitAddOutcome(addCommand));
        }
        tx.messages = commitOutcomes;
        this.preparedTransactions.put(txid, tx);
    }

    /**
     * Rolls back a transaction.
     *
     * <p>An in-flight transaction is simply dropped. A prepared transaction is removed and its
     * prepare work is reversed in one adapter transaction: {@code commitRemove} is called for
     * every prepared message, {@link LastAckCommand#rollback(ConnectionContext)} for every
     * last-ack command, and {@code commitAdd} for every other ack. Unknown ids are ignored.
     *
     * @param txid id of the transaction to roll back
     * @throws IOException if the adapter fails; the adapter transaction is rolled back first
     */
    @Override
    public void rollback(TransactionId txid) throws IOException {
        if (this.inflightTransactions.remove(txid) != null) {
            // Was still in-flight: nothing of it went through prepare().
            return;
        }
        MemoryTransactionStore.Tx tx = (MemoryTransactionStore.Tx)this.preparedTransactions.remove(txid);
        if (tx == null) {
            return;
        }
        JDBCPersistenceAdapter jdbcAdapter = (JDBCPersistenceAdapter)this.persistenceAdapter;
        ConnectionContext ctx = new ConnectionContext();
        this.persistenceAdapter.beginTransaction(ctx);
        try {
            for (MemoryTransactionStore.AddMessageCommand addCommand : tx.messages) {
                Message message = addCommand.getMessage();
                // NOTE(review): 2 looks like the standard ack type constant - confirm in MessageAck.
                jdbcAdapter.commitRemove(ctx, new MessageAck(message, 2, 1));
            }
            for (MemoryTransactionStore.RemoveMessageCommand ackCommand : tx.acks) {
                if (ackCommand instanceof LastAckCommand) {
                    ((LastAckCommand)ackCommand).rollback(ctx);
                    continue;
                }
                MessageId messageId = ackCommand.getMessageAck().getLastMessageId();
                jdbcAdapter.commitAdd(ctx, messageId, (Long)messageId.getEntryLocator());
            }
        }
        catch (IOException e) {
            this.persistenceAdapter.rollbackTransaction(ctx);
            throw e;
        }
        this.persistenceAdapter.commitTransaction(ctx);
    }

    /**
     * Asks the adapter to recover into this store and then delegates to the superclass.
     *
     * @param listener recovery listener forwarded to {@code super.recover}
     * @throws IOException if the adapter or the superclass recovery fails
     */
    @Override
    public void recover(TransactionRecoveryListener listener) throws IOException {
        ((JDBCPersistenceAdapter)this.persistenceAdapter).recover(this);
        super.recover(listener);
    }

    /**
     * Registers a recovered message add with the prepared transaction named by the message.
     *
     * <p>The message is unmarshalled with the adapter's wire format, {@code id} is stored on its
     * message id both as sequence and as entry locator, and a {@link CommitAddOutcome} without a
     * message store is added to the transaction; {@link #onRecovered} sets the store later.
     *
     * @param id value stored as sequence and entry locator of the message id
     * @param messageBytes marshalled message
     * @throws IOException if unmarshalling fails
     */
    public void recoverAdd(long id, byte[] messageBytes) throws IOException {
        Message message = (Message)((JDBCPersistenceAdapter)this.persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(messageBytes));
        MessageId messageId = message.getMessageId();
        messageId.setFutureOrSequenceLong(id);
        messageId.setEntryLocator(id);
        MemoryTransactionStore.Tx tx = this.getPreparedTx(message.getTransactionId());
        tx.add(new CommitAddOutcome(null, message));
    }

    /**
     * Registers a recovered message ack with the prepared transaction identified by {@code xid}.
     *
     * <p>The added command acknowledges the unmarshalled message through the adapter's
     * {@code commitRemove} when run, and reports no message store.
     *
     * @param id value stored as sequence and entry locator of the message id
     * @param xid encoded XA transaction id
     * @param message marshalled message that was acknowledged
     * @throws IOException if unmarshalling fails
     */
    public void recoverAck(long id, byte[] xid, byte[] message) throws IOException {
        Message msg = (Message)((JDBCPersistenceAdapter)this.persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(message));
        MessageId messageId = msg.getMessageId();
        messageId.setFutureOrSequenceLong(id);
        messageId.setEntryLocator(id);
        MemoryTransactionStore.Tx tx = this.getPreparedTx(new XATransactionId(xid));
        // NOTE(review): 2 looks like the standard ack type constant - confirm in MessageAck.
        final MessageAck ack = new MessageAck(msg, 2, 1);
        tx.add(new MemoryTransactionStore.RemoveMessageCommand(){

            @Override
            public MessageAck getMessageAck() {
                return ack;
            }

            @Override
            public void run(ConnectionContext context) throws IOException {
                ((JDBCPersistenceAdapter)JdbcMemoryTransactionStore.this.persistenceAdapter).commitRemove(context, ack);
            }

            @Override
            public MessageStore getMessageStore() {
                return null;
            }
        });
    }

    /**
     * Registers a recovered durable-subscription last-ack with the prepared transaction
     * identified by {@code encodedXid}.
     *
     * <p>Besides identifying the transaction, {@code encodedXid} is read as a data stream: one
     * byte is skipped, then a long (the last-ack sequence) and a byte (the priority) are read.
     * The topic store of the added command is unset until
     * {@link LastAckCommand#setMessageStore(JDBCTopicMessageStore)} is called, which
     * {@link #onRecovered} does.
     *
     * @param encodedXid encoded XA transaction id carrying sequence and priority as described
     * @param destination destination set on the synthetic ack of the command
     * @param subName subscription name
     * @param clientId client id
     * @throws IOException if reading from {@code encodedXid} fails
     */
    public void recoverLastAck(byte[] encodedXid, final PMQDestination destination, final String subName, final String clientId) throws IOException {
        MemoryTransactionStore.Tx tx = this.getPreparedTx(new XATransactionId(encodedXid));
        DataByteArrayInputStream inputStream = new DataByteArrayInputStream(encodedXid);
        inputStream.skipBytes(1);
        final long lastAck = inputStream.readLong();
        final byte priority = inputStream.readByte();
        final MessageAck ack = new MessageAck();
        ack.setDestination(destination);
        tx.add(new LastAckCommand(){
            JDBCTopicMessageStore jdbcTopicMessageStore;

            @Override
            public MessageAck getMessageAck() {
                return ack;
            }

            @Override
            public MessageStore getMessageStore() {
                return this.jdbcTopicMessageStore;
            }

            @Override
            public void run(ConnectionContext context) throws IOException {
                ((JDBCPersistenceAdapter)JdbcMemoryTransactionStore.this.persistenceAdapter).commitLastAck(context, lastAck, priority, destination, subName, clientId);
                this.jdbcTopicMessageStore.complete(clientId, subName);
            }

            @Override
            public void rollback(ConnectionContext context) throws IOException {
                ((JDBCPersistenceAdapter)JdbcMemoryTransactionStore.this.persistenceAdapter).rollbackLastAck(context, priority, this.jdbcTopicMessageStore.getDestination(), subName, clientId);
                this.jdbcTopicMessageStore.complete(clientId, subName);
            }

            @Override
            public String getClientId() {
                return clientId;
            }

            @Override
            public String getSubName() {
                return subName;
            }

            @Override
            public long getSequence() {
                return lastAck;
            }

            @Override
            public byte getPriority() {
                return priority;
            }

            @Override
            public void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore) {
                this.jdbcTopicMessageStore = jdbcTopicMessageStore;
            }
        });
    }

    /** Remembers the delegate of a proxied topic store under its destination. */
    @Override
    protected void onProxyTopicStore(ProxyTopicMessageStore proxyTopicMessageStore) {
        this.topicStores.put(proxyTopicMessageStore.getDestination(), proxyTopicMessageStore.getDelegate());
    }

    /** Remembers the delegate of a proxied queue store under its destination. */
    @Override
    protected void onProxyQueueStore(ProxyMessageStore proxyQueueMessageStore) {
        this.queueStores.put(proxyQueueMessageStore.getDestination(), proxyQueueMessageStore.getDelegate());
    }

    /**
     * Wires a recovered transaction to the registered message stores.
     *
     * <p>Each last-ack command gets the topic store registered for its destination, after that
     * store was told about the pending completion. For every other ack the message counter of
     * the destination's statistics is incremented. Each add command gets the queue or topic
     * store registered for its message's destination.
     *
     * <p>NOTE(review): a destination without a registered store (or, for plain acks, without an
     * entry in the region broker's destination map) leads to a {@link NullPointerException}
     * here - confirm that callers guarantee registration before recovery.
     *
     * @param tx the recovered transaction
     */
    @Override
    protected void onRecovered(MemoryTransactionStore.Tx tx) {
        for (MemoryTransactionStore.RemoveMessageCommand ackCommand : tx.acks) {
            if (ackCommand instanceof LastAckCommand) {
                LastAckCommand lastAckCommand = (LastAckCommand)ackCommand;
                JDBCTopicMessageStore topicStore = (JDBCTopicMessageStore)this.topicStores.get(lastAckCommand.getMessageAck().getDestination());
                topicStore.pendingCompletion(lastAckCommand.getClientId(), lastAckCommand.getSubName(), lastAckCommand.getSequence(), lastAckCommand.getPriority());
                lastAckCommand.setMessageStore(topicStore);
                continue;
            }
            ((JDBCPersistenceAdapter)this.persistenceAdapter).getBrokerService().getRegionBroker().getDestinationMap().get(ackCommand.getMessageAck().getDestination()).getDestinationStatistics().getMessages().increment();
        }
        for (MemoryTransactionStore.AddMessageCommand addCommand : tx.messages) {
            PMQDestination destination = addCommand.getMessage().getDestination();
            MessageStore store = destination.isQueue() ? this.queueStores.get(destination) : this.topicStores.get(destination);
            addCommand.setMessageStore(store);
        }
    }

    /**
     * Acknowledges a message for a durable subscription.
     *
     * <p>Outside a transaction the ack goes straight to {@code topicMessageStore} with a
     * {@code null} context. Inside a transaction a {@link LastAckCommand} is queued on the
     * transaction instead; it performs the same acknowledge when run and undoes it through the
     * adapter's {@code rollbackLastAck} when rolled back.
     *
     * @param topicMessageStore store to acknowledge against; the queued command's rollback casts
     *        it to {@link JDBCTopicMessageStore}
     * @param clientId client id of the subscription
     * @param subscriptionName name of the subscription
     * @param messageId id of the acknowledged message
     * @param ack the acknowledgement
     * @throws IOException if the direct acknowledge fails
     */
    @Override
    public void acknowledge(final TopicMessageStore topicMessageStore, final String clientId, final String subscriptionName, final MessageId messageId, final MessageAck ack) throws IOException {
        if (!ack.isInTransaction()) {
            topicMessageStore.acknowledge(null, clientId, subscriptionName, messageId, ack);
            return;
        }
        MemoryTransactionStore.Tx tx = this.getTx(ack.getTransactionId());
        tx.add(new LastAckCommand(){

            @Override
            public MessageAck getMessageAck() {
                return ack;
            }

            @Override
            public void run(ConnectionContext ctx) throws IOException {
                topicMessageStore.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
            }

            @Override
            public MessageStore getMessageStore() {
                return topicMessageStore;
            }

            @Override
            public void rollback(ConnectionContext context) throws IOException {
                JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore)topicMessageStore;
                ((JDBCPersistenceAdapter)JdbcMemoryTransactionStore.this.persistenceAdapter).rollbackLastAck(context, jdbcTopicMessageStore, ack, subscriptionName, clientId);
                jdbcTopicMessageStore.complete(clientId, subscriptionName);
            }

            @Override
            public String getClientId() {
                return clientId;
            }

            @Override
            public String getSubName() {
                return subscriptionName;
            }

            @Override
            public long getSequence() {
                throw new IllegalStateException("Sequence id must be inferred from ack");
            }

            @Override
            public byte getPriority() {
                throw new IllegalStateException("Priority must be inferred from ack or row");
            }

            @Override
            public void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore) {
                throw new IllegalStateException("message store already known!");
            }
        });
    }

    /**
     * A remove command for a durable-subscription last-ack. Unlike a plain
     * {@link MemoryTransactionStore.RemoveMessageCommand} it knows how to roll itself back and
     * exposes the subscription details used by {@link JdbcMemoryTransactionStore#onRecovered}.
     */
    static interface LastAckCommand
    extends MemoryTransactionStore.RemoveMessageCommand {
        /** Undoes the last-ack within the given context. */
        public void rollback(ConnectionContext context) throws IOException;

        /** Returns the client id of the subscription. */
        public String getClientId();

        /** Returns the subscription name. */
        public String getSubName();

        /** Returns the acknowledged sequence; implementations may throw if it is not known up front. */
        public long getSequence();

        /** Returns the priority; implementations may throw if it is not known up front. */
        public byte getPriority();

        /** Sets the topic store; implementations may throw if the store is already known. */
        public void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore);
    }

    /**
     * Add command that replaces the original add once a transaction is prepared (or recovered);
     * running it commits the prepared add through the adapter.
     */
    class CommitAddOutcome
    implements MemoryTransactionStore.AddMessageCommand {
        final Message message;
        JDBCMessageStore jdbcMessageStore;

        /**
         * @param jdbcMessageStore store of the message; may be {@code null} and supplied later via
         *        {@link #setMessageStore(MessageStore)}
         * @param message the prepared message
         */
        public CommitAddOutcome(JDBCMessageStore jdbcMessageStore, Message message) {
            this.jdbcMessageStore = jdbcMessageStore;
            this.message = message;
        }

        /**
         * Copies store and message from an existing add command.
         *
         * @param addMessageCommand command whose store is cast to {@link JDBCMessageStore}
         */
        public CommitAddOutcome(MemoryTransactionStore.AddMessageCommand addMessageCommand) {
            this((JDBCMessageStore)addMessageCommand.getMessageStore(), addMessageCommand.getMessage());
        }

        @Override
        public Message getMessage() {
            return this.message;
        }

        @Override
        public MessageStore getMessageStore() {
            return this.jdbcMessageStore;
        }

        /**
         * Commits the prepared add.
         *
         * <p>While holding the monitor of the store's {@code pendingAdditions}, the message gets
         * the adapter's next sequence id as entry locator, a completion callback is registered
         * on the transaction context that copies that locator into the message id's sequence,
         * and the store's index listener (if any) is notified. Afterwards the adapter's
         * {@code commitAdd} is called with the entry locator the message had before, and the
         * store's {@code onAdd} with the new one.
         *
         * @param context connection context of the committing transaction
         * @throws IOException if the adapter fails
         */
        @Override
        public void run(ConnectionContext context) throws IOException {
            JDBCPersistenceAdapter jdbcPersistenceAdapter = (JDBCPersistenceAdapter)JdbcMemoryTransactionStore.this.persistenceAdapter;
            // Remember the locator assigned at prepare time before it is overwritten below.
            Long preparedEntrySequence = (Long)this.message.getMessageId().getEntryLocator();
            TransactionContext c = jdbcPersistenceAdapter.getTransactionContext(context);
            synchronized (this.jdbcMessageStore.pendingAdditions) {
                this.message.getMessageId().setEntryLocator(jdbcPersistenceAdapter.getNextSequenceId());
                c.onCompletion(new Runnable(){

                    @Override
                    public void run() {
                        MessageId committedId = CommitAddOutcome.this.message.getMessageId();
                        committedId.setFutureOrSequenceLong(committedId.getEntryLocator());
                    }
                });
                if (this.jdbcMessageStore.getIndexListener() != null) {
                    this.jdbcMessageStore.getIndexListener().onAdd(new IndexListener.MessageContext(context, this.message, null));
                }
            }
            jdbcPersistenceAdapter.commitAdd(context, this.message.getMessageId(), preparedEntrySequence);
            this.jdbcMessageStore.onAdd(this.message, (Long)this.message.getMessageId().getEntryLocator(), this.message.getPriority());
        }

        @Override
        public void setMessageStore(MessageStore messageStore) {
            this.jdbcMessageStore = (JDBCMessageStore)messageStore;
        }
    }
}

