/*
 * Decompiled with CFR 0.152.
 */
package com.primeton.pmq.broker.scheduler;

import com.primeton.pmq.broker.Broker;
import com.primeton.pmq.broker.BrokerFilter;
import com.primeton.pmq.broker.BrokerService;
import com.primeton.pmq.broker.Connection;
import com.primeton.pmq.broker.ConnectionContext;
import com.primeton.pmq.broker.Connector;
import com.primeton.pmq.broker.ProducerBrokerExchange;
import com.primeton.pmq.broker.region.ConnectionStatistics;
import com.primeton.pmq.broker.scheduler.Job;
import com.primeton.pmq.broker.scheduler.JobListener;
import com.primeton.pmq.broker.scheduler.JobScheduler;
import com.primeton.pmq.broker.scheduler.JobSchedulerFacade;
import com.primeton.pmq.broker.scheduler.JobSchedulerStore;
import com.primeton.pmq.command.Command;
import com.primeton.pmq.command.ConnectionControl;
import com.primeton.pmq.command.ExceptionResponse;
import com.primeton.pmq.command.Message;
import com.primeton.pmq.command.MessageId;
import com.primeton.pmq.command.PMQDestination;
import com.primeton.pmq.command.ProducerId;
import com.primeton.pmq.command.ProducerInfo;
import com.primeton.pmq.command.Response;
import com.primeton.pmq.openwire.OpenWireFormat;
import com.primeton.pmq.security.SecurityContext;
import com.primeton.pmq.state.ProducerState;
import com.primeton.pmq.transaction.Synchronization;
import com.primeton.pmq.usage.JobSchedulerUsage;
import com.primeton.pmq.usage.SystemUsage;
import com.primeton.pmq.util.ByteSequence;
import com.primeton.pmq.util.IdGenerator;
import com.primeton.pmq.util.LongSequenceGenerator;
import com.primeton.pmq.util.TypeConversionSupport;
import com.primeton.pmq.wireformat.WireFormat;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SchedulerBroker
extends BrokerFilter
implements JobListener {
    private static final Logger LOG = LoggerFactory.getLogger(SchedulerBroker.class);
    private static final IdGenerator ID_GENERATOR = new IdGenerator();
    private static final LongSequenceGenerator longGenerator = new LongSequenceGenerator();
    private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
    private final AtomicBoolean started = new AtomicBoolean();
    private final WireFormat wireFormat = new OpenWireFormat();
    private final ConnectionContext context = new ConnectionContext();
    private final ProducerId producerId = new ProducerId();
    private final SystemUsage systemUsage;
    private final JobSchedulerStore store;
    private JobScheduler scheduler;

    public SchedulerBroker(BrokerService brokerService, Broker next, JobSchedulerStore store2) throws Exception {
        super(next);
        this.store = store2;
        this.producerId.setConnectionId(ID_GENERATOR.generateId());
        this.context.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT);
        this.context.setConnection(new Connection(){

            @Override
            public Connector getConnector() {
                return null;
            }

            @Override
            public void dispatchSync(Command message) {
                if (message instanceof ExceptionResponse) {
                    LOG.warn("Unexpected response: " + message);
                }
            }

            @Override
            public void dispatchAsync(Command command) {
                if (command instanceof ExceptionResponse) {
                    LOG.warn("Unexpected response: " + command);
                }
            }

            @Override
            public Response service(Command command) {
                return null;
            }

            @Override
            public void serviceException(Throwable error) {
                LOG.warn("Unexpected exception: " + error, error);
            }

            @Override
            public boolean isSlow() {
                return false;
            }

            @Override
            public boolean isBlocked() {
                return false;
            }

            @Override
            public boolean isConnected() {
                return false;
            }

            @Override
            public boolean isActive() {
                return false;
            }

            @Override
            public int getDispatchQueueSize() {
                return 0;
            }

            @Override
            public ConnectionStatistics getStatistics() {
                return null;
            }

            @Override
            public boolean isManageable() {
                return false;
            }

            @Override
            public String getRemoteAddress() {
                return null;
            }

            @Override
            public void serviceExceptionAsync(IOException e) {
                LOG.warn("Unexpected async ioexception: " + e, (Throwable)e);
            }

            @Override
            public String getConnectionId() {
                return null;
            }

            @Override
            public boolean isNetworkConnection() {
                return false;
            }

            @Override
            public boolean isFaultTolerantConnection() {
                return false;
            }

            @Override
            public void updateClient(ConnectionControl control) {
            }

            @Override
            public int getActiveTransactionCount() {
                return 0;
            }

            @Override
            public Long getOldestActiveTransactionDuration() {
                return null;
            }

            @Override
            public void start() throws Exception {
            }

            @Override
            public void stop() throws Exception {
            }
        });
        this.context.setBroker(next);
        this.systemUsage = brokerService.getSystemUsage();
        this.wireFormat.setVersion(brokerService.getStoreOpenWireVersion());
    }

    public synchronized JobScheduler getJobScheduler() throws Exception {
        return new JobSchedulerFacade(this);
    }

    @Override
    public void start() throws Exception {
        this.started.set(true);
        this.getInternalScheduler();
        super.start();
    }

    @Override
    public void stop() throws Exception {
        if (this.started.compareAndSet(true, false)) {
            if (this.store != null) {
                this.store.stop();
            }
            if (this.scheduler != null) {
                this.scheduler.removeListener(this);
                this.scheduler = null;
            }
        }
        super.stop();
    }

    @Override
    public void send(ProducerBrokerExchange producerExchange, final Message messageSend) throws Exception {
        ConnectionContext context = producerExchange.getConnectionContext();
        String jobId = (String)messageSend.getProperty("scheduledJobId");
        final Object cronValue = messageSend.getProperty("PMQ_SCHEDULED_CRON");
        final Object periodValue = messageSend.getProperty("PMQ_SCHEDULED_PERIOD");
        final Object delayValue = messageSend.getProperty("PMQ_SCHEDULED_DELAY");
        String physicalName = messageSend.getDestination().getPhysicalName();
        boolean schedularManage = physicalName.regionMatches(true, 0, "PMQ.Scheduler.Management", 0, "PMQ.Scheduler.Management".length());
        if (schedularManage) {
            JobScheduler scheduler = this.getInternalScheduler();
            PMQDestination replyTo = messageSend.getReplyTo();
            String action = (String)messageSend.getProperty("PMQ_SCHEDULER_ACTION");
            if (action != null) {
                long finish;
                long start2;
                Object startTime = messageSend.getProperty("ACTION_START_TIME");
                Object endTime = messageSend.getProperty("ACTION_END_TIME");
                if (replyTo != null && action.equals("BROWSE")) {
                    if (startTime != null && endTime != null) {
                        start2 = (Long)TypeConversionSupport.convert(startTime, Long.class);
                        finish = (Long)TypeConversionSupport.convert(endTime, Long.class);
                        for (Job job : scheduler.getAllJobs(start2, finish)) {
                            this.sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
                        }
                    } else {
                        for (Job job : scheduler.getAllJobs()) {
                            this.sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
                        }
                    }
                }
                if (jobId != null && action.equals("REMOVE")) {
                    scheduler.remove(jobId);
                } else if (action.equals("REMOVEALL")) {
                    if (startTime != null && endTime != null) {
                        start2 = (Long)TypeConversionSupport.convert(startTime, Long.class);
                        finish = (Long)TypeConversionSupport.convert(endTime, Long.class);
                        scheduler.removeAllJobs(start2, finish);
                    } else {
                        scheduler.removeAllJobs();
                    }
                }
            }
        } else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) {
            JobSchedulerUsage usage;
            if (this.systemUsage.getJobSchedulerUsage() != null && (usage = this.systemUsage.getJobSchedulerUsage()).isFull()) {
                long start3;
                String logMessage = "Job Scheduler Store is Full (" + usage.getPercentUsage() + "% of " + usage.getLimit() + "). Stopping producer (" + messageSend.getProducerId() + ") to prevent flooding of the job scheduler store. See http://pmq.primeton.com/producer-flow-control.html for more info";
                long nextWarn = start3 = System.currentTimeMillis();
                while (!usage.waitForSpace(1000L)) {
                    if (context.getStopping().get()) {
                        throw new IOException("Connection closed, send aborted.");
                    }
                    long now = System.currentTimeMillis();
                    if (now < nextWarn) continue;
                    LOG.info("" + usage + ": " + logMessage + " (blocking for: " + (now - start3) / 1000L + "s)");
                    nextWarn = now + 30000L;
                }
            }
            if (context.isInTransaction()) {
                context.getTransaction().addSynchronization(new Synchronization(){

                    @Override
                    public void afterCommit() throws Exception {
                        SchedulerBroker.this.doSchedule(messageSend, cronValue, periodValue, delayValue);
                    }
                });
            } else {
                this.doSchedule(messageSend, cronValue, periodValue, delayValue);
            }
        } else {
            super.send(producerExchange, messageSend);
        }
    }

    private void doSchedule(Message messageSend, Object cronValue, Object periodValue, Object delayValue) throws Exception {
        Object repeatValue;
        long delay = 0L;
        long period = 0L;
        int repeat = 0;
        String cronEntry = "";
        Message msg = messageSend.copy();
        msg.setTransactionId(null);
        ByteSequence packet = this.wireFormat.marshal(msg);
        if (cronValue != null) {
            cronEntry = cronValue.toString();
        }
        if (periodValue != null) {
            period = (Long)TypeConversionSupport.convert(periodValue, Long.class);
        }
        if (delayValue != null) {
            delay = (Long)TypeConversionSupport.convert(delayValue, Long.class);
        }
        if ((repeatValue = msg.getProperty("PMQ_SCHEDULED_REPEAT")) != null) {
            repeat = (Integer)TypeConversionSupport.convert(repeatValue, Integer.class);
        }
        MessageId jobId = new MessageId(messageSend.getMessageId().getProducerId(), longGenerator.getNextSequenceId());
        this.getInternalScheduler().schedule(jobId.toString(), new ByteSequence(packet.data, packet.offset, packet.length), cronEntry, delay, period, repeat);
    }

    @Override
    public void scheduledJob(String id, ByteSequence job) {
        ByteSequence packet = new ByteSequence(job.getData(), job.getOffset(), job.getLength());
        try {
            Message messageSend = (Message)this.wireFormat.unmarshal(packet);
            messageSend.setOriginalTransactionId(null);
            Object repeatValue = messageSend.getProperty("PMQ_SCHEDULED_REPEAT");
            Object cronValue = messageSend.getProperty("PMQ_SCHEDULED_CRON");
            String cronStr = cronValue != null ? cronValue.toString() : null;
            int repeat = 0;
            if (repeatValue != null) {
                repeat = (Integer)TypeConversionSupport.convert(repeatValue, Integer.class);
            }
            if (repeat != 0 || cronStr != null && cronStr.length() > 0) {
                messageSend.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
            }
            messageSend.setProperty("scheduledJobId", id);
            messageSend.removeProperty("PMQ_SCHEDULED_PERIOD");
            messageSend.removeProperty("PMQ_SCHEDULED_DELAY");
            messageSend.removeProperty("PMQ_SCHEDULED_REPEAT");
            messageSend.removeProperty("PMQ_SCHEDULED_CRON");
            if (messageSend.getTimestamp() > 0L && messageSend.getExpiration() > 0L) {
                long expiration;
                long oldExpiration = messageSend.getExpiration();
                long newTimeStamp = System.currentTimeMillis();
                long timeToLive = 0L;
                long oldTimestamp = messageSend.getTimestamp();
                if (oldExpiration > 0L) {
                    timeToLive = oldExpiration - oldTimestamp;
                }
                if ((expiration = timeToLive + newTimeStamp) > oldExpiration) {
                    if (timeToLive > 0L && expiration > 0L) {
                        messageSend.setExpiration(expiration);
                    }
                    messageSend.setTimestamp(newTimeStamp);
                    LOG.debug("Set message {} timestamp from {} to {}", new Object[]{messageSend.getMessageId(), oldTimestamp, newTimeStamp});
                }
            }
            messageSend.beforeMarshall(this.wireFormat);
            ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
            producerExchange.setConnectionContext(this.context);
            producerExchange.setMutable(true);
            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
            super.send(producerExchange, messageSend);
        }
        catch (Exception e) {
            LOG.error("Failed to send scheduled message {}", (Object)id, (Object)e);
        }
    }

    protected synchronized JobScheduler getInternalScheduler() throws Exception {
        if (this.started.get()) {
            if (this.scheduler == null && this.store != null) {
                this.scheduler = this.store.getJobScheduler("JMS");
                this.scheduler.addListener(this);
                this.scheduler.startDispatching();
            }
            return this.scheduler;
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void sendScheduledJob(ConnectionContext context, Job job, PMQDestination replyTo) throws Exception {
        ByteSequence packet = new ByteSequence(job.getPayload());
        try {
            Message msg = (Message)this.wireFormat.unmarshal(packet);
            msg.setOriginalTransactionId(null);
            msg.setPersistent(false);
            msg.setType("Advisory");
            msg.setMessageId(new MessageId(this.producerId, this.messageIdGenerator.getNextSequenceId()));
            msg.setOriginalDestination(msg.getDestination());
            msg.setDestination(replyTo);
            msg.setResponseRequired(false);
            msg.setProducerId(this.producerId);
            msg.setProperty("scheduledJobId", job.getJobId());
            boolean originalFlowControl = context.isProducerFlowControl();
            ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
            producerExchange.setConnectionContext(context);
            producerExchange.setMutable(true);
            producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
            try {
                context.setProducerFlowControl(false);
                this.next.send(producerExchange, msg);
            }
            finally {
                context.setProducerFlowControl(originalFlowControl);
            }
        }
        catch (Exception e) {
            LOG.error("Failed to send scheduled message {}", (Object)job.getJobId(), (Object)e);
        }
    }
}

