/*
 * Decompiled with CFR 0.152.
 */
package com.primeton.pmq.store.jdbc;

import com.primeton.pmq.PMQMessageAudit;
import com.primeton.pmq.broker.ConnectionContext;
import com.primeton.pmq.command.Message;
import com.primeton.pmq.command.MessageAck;
import com.primeton.pmq.command.MessageId;
import com.primeton.pmq.command.PMQDestination;
import com.primeton.pmq.command.XATransactionId;
import com.primeton.pmq.store.AbstractMessageStore;
import com.primeton.pmq.store.IndexListener;
import com.primeton.pmq.store.MessageRecoveryListener;
import com.primeton.pmq.store.jdbc.JDBCAdapter;
import com.primeton.pmq.store.jdbc.JDBCMessageRecoveryListener;
import com.primeton.pmq.store.jdbc.JDBCPersistenceAdapter;
import com.primeton.pmq.store.jdbc.TransactionContext;
import com.primeton.pmq.util.ByteSequence;
import com.primeton.pmq.util.ByteSequenceData;
import com.primeton.pmq.util.IOExceptionSupport;
import com.primeton.pmq.wireformat.WireFormat;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.LinkedList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Message store for a single destination that persists messages through a
 * {@link JDBCAdapter}.
 *
 * <p>Sequence ids of non-XA additions whose index callback has not yet run are
 * kept in {@link #pendingAdditions}. Batched recovery hands the first pending id
 * (or, when none is pending, the id following the last generated one) to the
 * adapter. Every access this class makes to that list, except the size read in
 * {@link #toString()}, is performed while synchronized on the list itself.
 */
public class JDBCMessageStore extends AbstractMessageStore {
    private static final Logger LOG = LoggerFactory.getLogger(JDBCMessageStore.class);

    protected final WireFormat wireFormat;
    protected final JDBCAdapter adapter;
    protected final JDBCPersistenceAdapter persistenceAdapter;
    protected PMQMessageAudit audit;

    /** Sequence ids added outside an XA transaction and not yet released; guarded by itself. */
    protected final LinkedList<Long> pendingAdditions = new LinkedList<>();

    /**
     * Last recovered sequence id, one slot per priority value; only slot 0 is
     * written by {@link #trackLastRecovered} when the store is not prioritized.
     */
    final long[] perPriorityLastRecovered = new long[10];

    /**
     * Creates the store and resets its recovery batch position.
     *
     * <p>When the destination is a queue and the broker service reports that it
     * should be recorded as a virtual destination, its creation is recorded
     * through the adapter.
     *
     * @param persistenceAdapter source of transaction contexts and sequence ids
     * @param adapter            JDBC adapter executing the actual statements
     * @param wireFormat         format used to marshal and unmarshal messages
     * @param destination        destination served by this store
     * @param audit              duplicate detector, may be {@code null}
     * @throws IOException if recording the destination fails
     */
    public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, PMQDestination destination, PMQMessageAudit audit) throws IOException {
        super(destination);
        this.persistenceAdapter = persistenceAdapter;
        this.adapter = adapter;
        this.wireFormat = wireFormat;
        this.audit = audit;
        if (destination.isQueue() && persistenceAdapter.getBrokerService().shouldRecordVirtualDestination(destination)) {
            this.recordDestinationCreation(destination);
        }
        this.resetBatching();
    }

    /**
     * Records the destination through the adapter unless the adapter already
     * reports a non-negative last-acked id for the destination's qualified name.
     */
    private void recordDestinationCreation(PMQDestination destination) throws IOException {
        try (TransactionContext c = this.persistenceAdapter.getTransactionContext()) {
            String qualifiedName = destination.getQualifiedName();
            long lastAcked = this.adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, qualifiedName, destination.getQualifiedName());
            if (lastAcked < 0L) {
                this.adapter.doRecordDestination(c, destination);
            }
        }
    }

    /**
     * Marshals the message and inserts it through the adapter under a freshly
     * allocated sequence id.
     *
     * <p>Messages flagged as duplicates by the audit are silently dropped. For a
     * non-XA add the sequence id is tracked in {@link #pendingAdditions} until the
     * index listener's callback runs (or immediately released when there is no
     * index listener), and a completion callback on the transaction context
     * publishes the sequence id on the message id.
     *
     * @param context connection context, may be {@code null}
     * @param message message to store
     * @throws IOException if marshalling or the JDBC insert fails
     */
    @Override
    public void addMessage(ConnectionContext context, final Message message) throws IOException {
        MessageId messageId = message.getMessageId();
        if (this.audit != null && this.audit.isDuplicate(message)) {
            LOG.debug("{} ignoring duplicated (add) message, already stored: {}", this.destination.getPhysicalName(), messageId);
            return;
        }
        XATransactionId xaXid = context != null ? context.getXid() : null;

        byte[] data;
        try {
            data = ByteSequenceData.toByteArray(this.wireFormat.marshal(message));
        }
        catch (IOException e) {
            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
        }

        TransactionContext c = this.persistenceAdapter.getTransactionContext(context);
        final long sequenceId;
        // Id allocation and registration as pending happen under one lock so the
        // pending list is appended in allocation order.
        synchronized (this.pendingAdditions) {
            sequenceId = this.persistenceAdapter.getNextSequenceId();
            message.getMessageId().setEntryLocator(sequenceId);
            if (xaXid == null) {
                this.pendingAdditions.add(sequenceId);
                c.onCompletion(new Runnable() {
                    @Override
                    public void run() {
                        message.getMessageId().setFutureOrSequenceLong(sequenceId);
                    }
                });
                if (this.indexListener != null) {
                    this.indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() {
                        @Override
                        public void run() {
                            synchronized (JDBCMessageStore.this.pendingAdditions) {
                                // Long.valueOf: remove the element, not the index.
                                JDBCMessageStore.this.pendingAdditions.remove(Long.valueOf(sequenceId));
                            }
                        }
                    }));
                } else {
                    // Long.valueOf: remove the element, not the index.
                    this.pendingAdditions.remove(Long.valueOf(sequenceId));
                }
            }
        }

        try {
            this.adapter.doAddMessage(c, sequenceId, messageId, this.destination, data, message.getExpiration(), this.isPrioritizedMessages() ? message.getPriority() : (byte)0, xaXid);
        }
        catch (SQLException e) {
            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
        }
        finally {
            c.close();
        }
        if (xaXid == null) {
            this.onAdd(message, sequenceId, message.getPriority());
        }
    }

    /**
     * Returns the first pending sequence id, or the id following the last one
     * produced by the sequence generator when nothing is pending.
     */
    private long minPendingSequenceId() {
        synchronized (this.pendingAdditions) {
            if (!this.pendingAdditions.isEmpty()) {
                return this.pendingAdditions.getFirst();
            }
            return this.persistenceAdapter.sequenceGenerator.getLastSequenceId() + 1L;
        }
    }

    /**
     * Re-marshals the message and updates its stored bytes through the adapter.
     *
     * @throws IOException if marshalling or the update fails
     */
    @Override
    public void updateMessage(Message message) throws IOException {
        try (TransactionContext c = this.persistenceAdapter.getTransactionContext()) {
            byte[] data = ByteSequenceData.toByteArray(this.wireFormat.marshal(message));
            this.adapter.doUpdateMessage(c, this.destination, message.getMessageId(), data);
        }
    }

    /**
     * Hook invoked at the end of {@link #addMessage} for non-XA additions; this
     * implementation does nothing.
     */
    protected void onAdd(Message message, long sequenceId, byte priority) {
    }

    /**
     * Stores a message reference under a freshly allocated sequence id.
     *
     * @throws IOException if the insert fails
     */
    public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
        try (TransactionContext c = this.persistenceAdapter.getTransactionContext(context)) {
            this.adapter.doAddMessageReference(c, this.persistenceAdapter.getNextSequenceId(), messageId, this.destination, expirationTime, messageRef);
        }
    }

    /**
     * Loads and unmarshals the message with the given id.
     *
     * @return the message, or {@code null} when the adapter returns no data
     * @throws IOException if the lookup or unmarshalling fails
     */
    @Override
    public Message getMessage(MessageId messageId) throws IOException {
        try (TransactionContext c = this.persistenceAdapter.getTransactionContext()) {
            byte[] data = this.adapter.doGetMessage(c, messageId);
            if (data == null) {
                return null;
            }
            return (Message)this.wireFormat.unmarshal(new ByteSequence(data));
        }
    }

    /**
     * Looks up the message reference stored for the id's broker sequence id.
     *
     * @throws IOException if the lookup fails
     */
    public String getMessageReference(MessageId messageId) throws IOException {
        long id = messageId.getBrokerSequenceId();
        try (TransactionContext c = this.persistenceAdapter.getTransactionContext()) {
            return this.adapter.doGetMessageReference(c, id);
        }
    }

    /**
     * Removes the message identified by the ack's last message id.
     *
     * <p>The sequence id carried on the message id is used when it is present and
     * non-zero; otherwise the id is resolved through the persistence adapter.
     *
     * @param context connection context, may be {@code null}
     * @throws IOException if the lookup or removal fails
     */
    @Override
    public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
        MessageId lastMessageId = ack.getLastMessageId();
        // Read once: the value is published from a completion callback, so
        // repeated reads could observe different values.
        Object tracked = lastMessageId.getFutureOrSequenceLong();
        long seq;
        if (tracked != null && (Long)tracked != 0L) {
            seq = (Long)tracked;
        } else {
            seq = this.persistenceAdapter.getStoreSequenceIdForMessageId(context, lastMessageId, this.destination)[0];
        }
        try (TransactionContext c = this.persistenceAdapter.getTransactionContext(context)) {
            this.adapter.doRemoveMessage(c, seq, context != null ? context.getXid() : null);
        }
    }

    /**
     * Replays stored messages and references to the listener for as long as the
     * listener reports free space.
     */
    @Override
    public void recover(final MessageRecoveryListener listener) throws Exception {
        try (TransactionContext c = this.persistenceAdapter.getTransactionContext()) {
            this.adapter.doRecover(c, this.destination, new JDBCMessageRecoveryListener() {
                @Override
                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
                    if (!listener.hasSpace()) {
                        LOG.trace("Message recovery limit reached for MessageRecoveryListener");
                        return false;
                    }
                    Message msg = (Message)JDBCMessageStore.this.wireFormat.unmarshal(new ByteSequence(data));
                    msg.getMessageId().setBrokerSequenceId(sequenceId);
                    return listener.recoverMessage(msg);
                }

                @Override
                public boolean recoverMessageReference(String reference) throws Exception {
                    if (!listener.hasSpace()) {
                        LOG.trace("Message recovery limit reached for MessageRecoveryListener");
                        return false;
                    }
                    return listener.recoverMessageReference(new MessageId(reference));
                }
            });
        }
    }

    /**
     * Removes every message of this destination through the adapter.
     *
     * @throws IOException if the removal fails
     */
    @Override
    public void removeAllMessages(ConnectionContext context) throws IOException {
        try (TransactionContext c = this.persistenceAdapter.getTransactionContext(context)) {
            this.adapter.doRemoveAllMessages(c, this.destination);
        }
    }

    /**
     * Returns the message count the adapter reports for this destination.
     *
     * @throws IOException if the count query fails
     */
    @Override
    public int getMessageCount() throws IOException {
        try (TransactionContext c = this.persistenceAdapter.getTransactionContext()) {
            return this.adapter.doGetMessageCount(c, this.destination);
        }
    }

    /**
     * Recovers the next batch of at most {@code maxReturned} entries after the
     * per-priority last-recovered positions, advancing those positions for every
     * message delivered to the listener.
     */
    @Override
    public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
        try (TransactionContext c = this.persistenceAdapter.getTransactionContext()) {
            // Snapshot once so the traced value is the one handed to the adapter.
            long minPending = this.minPendingSequenceId();
            if (LOG.isTraceEnabled()) {
                LOG.trace("{} recoverNext lastRecovered:{}, minPending:{}", this, Arrays.toString(this.perPriorityLastRecovered), minPending);
            }
            this.adapter.doRecoverNextMessages(c, this.destination, this.perPriorityLastRecovered, minPending, maxReturned, this.isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
                @Override
                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
                    if (!listener.hasSpace()) {
                        return false;
                    }
                    Message msg = (Message)JDBCMessageStore.this.wireFormat.unmarshal(new ByteSequence(data));
                    msg.getMessageId().setBrokerSequenceId(sequenceId);
                    msg.getMessageId().setFutureOrSequenceLong(sequenceId);
                    listener.recoverMessage(msg);
                    JDBCMessageStore.this.trackLastRecovered(sequenceId, msg.getPriority());
                    return true;
                }

                @Override
                public boolean recoverMessageReference(String reference) throws Exception {
                    if (!listener.hasSpace()) {
                        return false;
                    }
                    listener.recoverMessageReference(new MessageId(reference));
                    return true;
                }
            });
        }
    }

    /** Stores the sequence id in the slot of the given priority (slot 0 when not prioritized). */
    private void trackLastRecovered(long sequenceId, int priority) {
        int slot = this.isPrioritizedMessages() ? priority : 0;
        this.perPriorityLastRecovered[slot] = sequenceId;
    }

    /** Resets every per-priority last-recovered position to {@code -1}. */
    @Override
    public void resetBatching() {
        if (LOG.isTraceEnabled()) {
            LOG.trace("{} resetBatching. last recovered: {}", this, Arrays.toString(this.perPriorityLastRecovered));
        }
        this.setLastRecovered(-1L);
    }

    /** Sets every per-priority last-recovered position to {@code val}. */
    private void setLastRecovered(long val) {
        Arrays.fill(this.perPriorityLastRecovered, val);
    }

    /**
     * Positions the recovery batch at the stored sequence id of the given message;
     * falls back to {@link #resetBatching()} when the id cannot be resolved.
     */
    @Override
    public void setBatch(MessageId messageId) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("{} setBatch: last recovered: {}", this, Arrays.toString(this.perPriorityLastRecovered));
        }
        try {
            long[] storedValues = this.persistenceAdapter.getStoreSequenceIdForMessageId(null, messageId, this.destination);
            this.setLastRecovered(storedValues[0]);
        }
        catch (IOException ignoredAsAlreadyLogged) {
            this.resetBatching();
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("{} setBatch: new last recovered: {}", this, Arrays.toString(this.perPriorityLastRecovered));
        }
    }

    /** Delegates unchanged to the superclass. */
    @Override
    public void setPrioritizedMessages(boolean prioritizedMessages) {
        super.setPrioritizedMessages(prioritizedMessages);
    }

    /** Returns the destination's physical name followed by the current pending-list size. */
    @Override
    public String toString() {
        return this.destination.getPhysicalName() + ",pendingSize:" + this.pendingAdditions.size();
    }

    /**
     * Wall-clock timer that prints to {@code System.err} when the measured span
     * exceeds {@link #LIMIT} milliseconds. Timing starts at construction.
     */
    class Duration {
        /** Threshold in milliseconds above which a span is reported. */
        static final int LIMIT = 100;
        final long start = System.currentTimeMillis();
        final String name;

        Duration(String name) {
            this.name = name;
        }

        /** Ends the measurement with no extra detail object. */
        void end() {
            this.end(null);
        }

        /**
         * Ends the measurement and reports it when it took longer than the limit.
         *
         * @param o extra detail appended to the report, may be {@code null}
         */
        void end(Object o) {
            long duration = System.currentTimeMillis() - this.start;
            if (duration > LIMIT) {
                System.err.println(this.name + " took a long time: " + duration + "ms " + o);
            }
        }
    }
}

