/*
 * Decompiled with CFR 0.152.
 */
package com.primeton.pmq.store.kahadb.scheduler;

import com.primeton.pmq.broker.scheduler.CronParser;
import com.primeton.pmq.broker.scheduler.Job;
import com.primeton.pmq.broker.scheduler.JobListener;
import com.primeton.pmq.broker.scheduler.JobScheduler;
import com.primeton.pmq.protobuf.Buffer;
import com.primeton.pmq.store.kahadb.data.KahaAddScheduledJobCommand;
import com.primeton.pmq.store.kahadb.data.KahaRemoveScheduledJobCommand;
import com.primeton.pmq.store.kahadb.data.KahaRemoveScheduledJobsCommand;
import com.primeton.pmq.store.kahadb.data.KahaRescheduleJobCommand;
import com.primeton.pmq.store.kahadb.disk.index.BTreeIndex;
import com.primeton.pmq.store.kahadb.disk.journal.Location;
import com.primeton.pmq.store.kahadb.disk.page.Transaction;
import com.primeton.pmq.store.kahadb.disk.util.LongMarshaller;
import com.primeton.pmq.store.kahadb.scheduler.JobImpl;
import com.primeton.pmq.store.kahadb.scheduler.JobLocation;
import com.primeton.pmq.store.kahadb.scheduler.JobLocationsMarshaller;
import com.primeton.pmq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import com.primeton.pmq.util.ByteSequence;
import com.primeton.pmq.util.IdGenerator;
import com.primeton.pmq.util.ServiceStopper;
import com.primeton.pmq.util.ServiceSupport;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.MessageFormatException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobSchedulerImpl
extends ServiceSupport
implements Runnable,
JobScheduler {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerImpl.class);
    // Backing store: provides the index read lock, the page file, payload lookup and
    // journal reference counting, and receives every scheduler command via store(...).
    private final JobSchedulerStoreImpl store;
    // Set to true in doStart() and false in doStop(); the dispatch loop runs while it is true.
    private final AtomicBoolean running = new AtomicBoolean();
    // Scheduler name; written into every command and persisted by write()/read().
    private String name;
    // Index keyed by next execution time; each value is the list of jobs due at that time.
    private BTreeIndex<Long, List<JobLocation>> index;
    // Dispatch thread created by startDispatching() and cleared by stopDispatching().
    private Thread thread;
    // Guards startDispatching()/stopDispatching() so each takes effect only once per cycle.
    private final AtomicBoolean started = new AtomicBoolean(false);
    // Listeners notified by fireJob(); copy-on-write so iteration needs no extra locking.
    private final List<JobListener> jobListeners = new CopyOnWriteArrayList<JobListener>();
    // Generates ids for the follow-up jobs that mainLoop() schedules for repeating cron entries.
    private static final IdGenerator ID_GENERATOR = new IdGenerator();
    // Holds the dispatch loop's current wait interval and its wait/notify monitor.
    private final ScheduleTime scheduleTime = new ScheduleTime();

    /**
     * Creates a scheduler bound to the given store.
     *
     * @param store2 the store this scheduler sends commands to and reads payloads from
     */
    JobSchedulerImpl(JobSchedulerStoreImpl store2) {
        store = store2;
    }

    /**
     * Sets the scheduler name that is written into every command and persisted by {@code write}.
     *
     * @param name the scheduler name
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return the name of this scheduler, or {@code null} if none has been set or read yet
     */
    @Override
    public String getName() {
        return name;
    }

    /**
     * Registers a listener that {@code fireJob} will notify.
     *
     * @param l the listener to add
     */
    @Override
    public void addListener(JobListener l) {
        jobListeners.add(l);
    }

    /**
     * Unregisters a previously added listener; does nothing if it was never registered.
     *
     * @param l the listener to remove
     */
    @Override
    public void removeListener(JobListener l) {
        jobListeners.remove(l);
    }

    /**
     * Schedules a one-shot job with no cron entry and a repeat count of zero.
     *
     * <p>Note that {@code delay} is handed to {@code doSchedule} in the <em>period</em>
     * position (the delay argument is {@code 0}); with a zero delay {@code doSchedule}
     * adds the period to the start time, so the job is still due {@code delay} ms out.
     *
     * @param jobId   id of the job
     * @param payload job payload
     * @param delay   milliseconds until the job is due
     * @throws IOException if the command cannot be stored
     */
    @Override
    public void schedule(String jobId, ByteSequence payload, long delay) throws IOException {
        final String noCron = "";
        doSchedule(jobId, payload, noCron, 0L, delay, 0);
    }

    /**
     * Schedules a job driven only by a cron entry (zero delay, zero period, zero repeat).
     *
     * @param jobId     id of the job
     * @param payload   job payload
     * @param cronEntry cron expression handed to {@code doSchedule}
     * @throws Exception if scheduling fails
     */
    @Override
    public void schedule(String jobId, ByteSequence payload, String cronEntry) throws Exception {
        final long noDelay = 0L;
        final long noPeriod = 0L;
        doSchedule(jobId, payload, cronEntry, noDelay, noPeriod, 0);
    }

    /**
     * Schedules a job with every option supplied by the caller; all arguments are passed
     * through to {@code doSchedule} unchanged.
     *
     * @param jobId     id of the job
     * @param payload   job payload
     * @param cronEntry cron expression, may be empty
     * @param delay     initial delay in milliseconds
     * @param period    period in milliseconds
     * @param repeat    repeat count
     * @throws IOException if the command cannot be stored
     */
    @Override
    public void schedule(String jobId, ByteSequence payload, String cronEntry, long delay, long period, int repeat) throws IOException {
        doSchedule(jobId, payload, cronEntry, delay, period, repeat);
    }

    /**
     * Removes every job scheduled at exactly {@code time} by issuing a range removal
     * whose start and end are both {@code time}.
     *
     * @param time the execution time whose jobs are removed
     * @throws IOException if the command cannot be stored
     */
    @Override
    public void remove(long time) throws IOException {
        doRemoveRange(time, time);
    }

    /**
     * Removes the job with the given id. The execution time is sent as {@code -1}, which
     * makes the command processor search the index for the job.
     *
     * @param jobId id of the job to remove
     * @throws IOException if the command cannot be stored
     */
    @Override
    public void remove(String jobId) throws IOException {
        final long unknownExecutionTime = -1L;
        doRemove(unknownExecutionTime, jobId);
    }

    /**
     * Removes all jobs by issuing a range removal from {@code 0} to {@code Long.MAX_VALUE}.
     *
     * @throws IOException if the command cannot be stored
     */
    @Override
    public void removeAllJobs() throws IOException {
        doRemoveRange(0L, Long.MAX_VALUE);
    }

    /**
     * Removes all jobs whose execution time lies in the given range.
     *
     * @param start2 first execution time of the range
     * @param finish last execution time of the range
     * @throws IOException if the command cannot be stored
     */
    @Override
    public void removeAllJobs(long start2, long finish) throws IOException {
        doRemoveRange(start2, finish);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the key of the first entry in the index while holding the store's index read lock.
     *
     * @return the first execution time in the index, or {@code -1} when the index is empty
     * @throws IOException if the index lookup fails
     */
    @Override
    public long getNextScheduleTime() throws IOException {
        store.readLockIndex();
        try {
            Map.Entry<Long, List<JobLocation>> head = index.getFirst(store.getPageFile().tx());
            if (head == null) {
                return -1L;
            }
            return head.getKey();
        } finally {
            store.readUnlockIndex();
        }
    }

    /**
     * Returns the jobs stored under the first entry of the index, each paired with its payload.
     * The lookup runs inside a page-file transaction while the index read lock is held.
     *
     * @return the jobs of the first index entry; an empty list when the index is empty
     * @throws IOException if the index or a payload cannot be read
     */
    @Override
    public List<Job> getNextScheduleJobs() throws IOException {
        final List<Job> result = new ArrayList<Job>();
        this.store.readLockIndex();
        try {
            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {

                @Override
                public void execute(Transaction tx) throws IOException {
                    // Fully typed entry: the raw types left by the decompiler do not type-check.
                    Map.Entry<Long, List<JobLocation>> first = JobSchedulerImpl.this.index.getFirst(tx);
                    if (first != null) {
                        for (JobLocation jl : first.getValue()) {
                            ByteSequence bs = JobSchedulerImpl.this.getPayload(jl.getLocation());
                            result.add(new JobImpl(jl, bs));
                        }
                    }
                }
            });
        } finally {
            this.store.readUnlockIndex();
        }
        return result;
    }

    /**
     * Fetches the first index entry for the dispatch loop, under the index read lock.
     *
     * @return the first entry of the index, or {@code null} when the store is stopped or
     *         stopping (the index is not consulted then) or when the index is empty
     * @throws IOException if the index lookup fails
     */
    private Map.Entry<Long, List<JobLocation>> getNextToSchedule() throws IOException {
        store.readLockIndex();
        try {
            if (store.isStopped() || store.isStopping()) {
                return null;
            }
            return index.getFirst(store.getPageFile().tx());
        } finally {
            store.readUnlockIndex();
        }
    }

    /**
     * Returns every job in the index, in index iteration order, each paired with its payload.
     * The scan runs inside a page-file transaction while the index read lock is held.
     *
     * @return all scheduled jobs; an empty list when the index is empty
     * @throws IOException if the index or a payload cannot be read
     */
    @Override
    public List<Job> getAllJobs() throws IOException {
        final List<Job> result = new ArrayList<Job>();
        this.store.readLockIndex();
        try {
            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {

                @Override
                public void execute(Transaction tx) throws IOException {
                    // Iterate with the transaction handed to this closure (as the ranged
                    // overload does) instead of opening a second, unrelated transaction.
                    Iterator<Map.Entry<Long, List<JobLocation>>> iter = JobSchedulerImpl.this.index.iterator(tx);
                    while (iter.hasNext()) {
                        Map.Entry<Long, List<JobLocation>> next = iter.next();
                        if (next == null) {
                            break;
                        }
                        for (JobLocation jl : next.getValue()) {
                            ByteSequence bs = JobSchedulerImpl.this.getPayload(jl.getLocation());
                            result.add(new JobImpl(jl, bs));
                        }
                    }
                }
            });
        } finally {
            this.store.readUnlockIndex();
        }
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the jobs whose execution time lies between {@code start2} and {@code finish}
     * (upper bound inclusive), each paired with its payload. Iteration starts at
     * {@code start2} and stops at the first key greater than {@code finish}.
     *
     * @param start2 key at which the index iteration starts
     * @param finish last execution time to include
     * @return the matching jobs; an empty list when none match
     * @throws IOException if the index or a payload cannot be read
     */
    @Override
    public List<Job> getAllJobs(final long start2, final long finish) throws IOException {
        final List<Job> result = new ArrayList<Job>();
        this.store.readLockIndex();
        try {
            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {

                @Override
                public void execute(Transaction tx) throws IOException {
                    Iterator<Map.Entry<Long, List<JobLocation>>> iter = JobSchedulerImpl.this.index.iterator(tx, start2);
                    while (iter.hasNext()) {
                        // Typed entry so the key can be compared numerically; the raw
                        // Map.Entry produced by the decompiler does not type-check.
                        Map.Entry<Long, List<JobLocation>> next = iter.next();
                        if (next == null || next.getKey() > finish) {
                            break;
                        }
                        for (JobLocation jl : next.getValue()) {
                            ByteSequence bs = JobSchedulerImpl.this.getPayload(jl.getLocation());
                            result.add(new JobImpl(jl, bs));
                        }
                    }
                }
            });
        } finally {
            this.store.readUnlockIndex();
        }
        return result;
    }

    /**
     * Builds a {@link KahaAddScheduledJobCommand} for the job and hands it to the store.
     *
     * <p>The start time is the current time advanced to the next multiple of 500 ms. The
     * first execution time is the next cron time when a cron entry is given (otherwise the
     * start time), plus {@code delay} when it is positive or {@code period} otherwise.
     *
     * @param jobId     id of the job
     * @param payload   job payload; its data/offset/length are copied into the command
     * @param cronEntry cron expression, or {@code null}/empty for none
     * @param delay     initial delay in milliseconds
     * @param period    period in milliseconds
     * @param repeat    repeat count
     * @throws IOException if the cron entry cannot be parsed (the parser's exception is
     *                     attached as the cause) or the command cannot be stored
     */
    private void doSchedule(String jobId, ByteSequence payload, String cronEntry, long delay, long period, int repeat) throws IOException {
        long startTime = System.currentTimeMillis();
        // Advance to the next 500 ms boundary.
        startTime = (startTime + 500L) / 500L * 500L;
        long time = 0L;
        if (cronEntry != null && cronEntry.length() > 0) {
            try {
                time = CronParser.getNextScheduledTime(cronEntry, startTime);
            } catch (MessageFormatException e) {
                // Keep the original exception as the cause so the parse failure stays traceable.
                throw new IOException(e.getMessage(), e);
            }
        }
        if (time == 0L) {
            time = startTime;
        }
        time += delay > 0L ? delay : period;
        KahaAddScheduledJobCommand newJob = new KahaAddScheduledJobCommand();
        newJob.setScheduler(this.name);
        newJob.setJobId(jobId);
        newJob.setStartTime(startTime);
        newJob.setCronEntry(cronEntry);
        newJob.setDelay(delay);
        newJob.setPeriod(period);
        newJob.setRepeat(repeat);
        newJob.setNextExecutionTime(time);
        newJob.setPayload(new Buffer(payload.getData(), payload.getOffset(), payload.getLength()));
        this.store.store(newJob);
    }

    /**
     * Stores a {@link KahaRescheduleJobCommand} that moves a job from one execution time to another.
     *
     * @param jobId             id of the job
     * @param executionTime     execution time the job is currently filed under
     * @param nextExecutionTime execution time the job should move to
     * @param rescheduledCount  rescheduled count to record on the job
     * @throws IOException if the command cannot be stored
     */
    private void doReschedule(String jobId, long executionTime, long nextExecutionTime, int rescheduledCount) throws IOException {
        KahaRescheduleJobCommand command = new KahaRescheduleJobCommand();
        command.setScheduler(name);
        command.setJobId(jobId);
        command.setExecutionTime(executionTime);
        command.setNextExecutionTime(nextExecutionTime);
        command.setRescheduledCount(rescheduledCount);
        store.store(command);
    }

    /**
     * Issues one remove command per job in {@code jobs}, all for the same execution time.
     *
     * @param executionTime execution time the jobs are filed under
     * @param jobs          jobs to remove
     * @throws IOException if a command cannot be stored
     */
    private void doRemove(long executionTime, List<JobLocation> jobs) throws IOException {
        Iterator<JobLocation> each = jobs.iterator();
        while (each.hasNext()) {
            doRemove(executionTime, each.next().getJobId());
        }
    }

    /**
     * Stores a {@link KahaRemoveScheduledJobCommand} for a single job.
     *
     * @param executionTime execution time the job is filed under, or {@code -1} to have the
     *                      command processor search the index for it
     * @param jobId         id of the job to remove
     * @throws IOException if the command cannot be stored
     */
    private void doRemove(long executionTime, String jobId) throws IOException {
        KahaRemoveScheduledJobCommand command = new KahaRemoveScheduledJobCommand();
        command.setScheduler(name);
        command.setJobId(jobId);
        command.setNextExecutionTime(executionTime);
        store.store(command);
    }

    /**
     * Stores a {@link KahaRemoveScheduledJobsCommand} covering the given time range.
     *
     * @param start2 start time of the range
     * @param end    end time of the range
     * @throws IOException if the command cannot be stored
     */
    private void doRemoveRange(long start2, long end) throws IOException {
        KahaRemoveScheduledJobsCommand command = new KahaRemoveScheduledJobsCommand();
        command.setScheduler(name);
        command.setStartTime(start2);
        command.setEndTime(end);
        store.store(command);
    }

    /**
     * Applies an add-job command to the index.
     *
     * <p>A {@link JobLocation} is built from the command and filed under the command's next
     * execution time. If an equal job is already in that slot the slot is written back
     * unchanged; otherwise the job is appended, the journal reference count for
     * {@code location} is incremented and the dispatch loop is notified of the new job.
     *
     * @param tx       transaction to run the index operations in
     * @param command  the add command being applied
     * @param location journal location of the command
     * @throws IOException if an index or store operation fails
     */
    protected void process(Transaction tx, KahaAddScheduledJobCommand command, Location location) throws IOException {
        final long nextExecutionTime = command.getNextExecutionTime();

        JobLocation jobLocation = new JobLocation(location);
        jobLocation.setJobId(command.getJobId());
        jobLocation.setStartTime(command.getStartTime());
        jobLocation.setCronEntry(command.getCronEntry());
        jobLocation.setDelay(command.getDelay());
        jobLocation.setPeriod(command.getPeriod());
        jobLocation.setRepeat(command.getRepeat());
        jobLocation.setNextTime(nextExecutionTime);

        // Pull the existing slot (if any) out of the index; it is written back below.
        List<JobLocation> slot = index.containsKey(tx, nextExecutionTime) ? index.remove(tx, nextExecutionTime) : null;
        if (slot == null) {
            slot = new ArrayList<JobLocation>();
        }

        if (slot.contains(jobLocation)) {
            index.put(tx, nextExecutionTime, slot);
            LOG.trace("Job {} already in scheduler at this time {}", jobLocation.getJobId(), jobLocation.getNextTime());
            return;
        }

        slot.add(jobLocation);
        store.incrementJournalCount(tx, location);
        index.put(tx, nextExecutionTime, slot);
        scheduleTime.newJob();
    }

    /**
     * Applies a reschedule command: moves the job from the slot at the command's execution
     * time to the slot at its next execution time.
     *
     * <p>The source slot is taken out of the index, the job is removed from it, and the
     * remainder is written back whenever it is non-empty. This also holds when the job is
     * not found in the slot, so a reschedule for an unknown job id no longer discards the
     * other jobs filed at that time. For a moved job the journal reference is switched from
     * its previous update location (if any) to {@code location}, and a non-cron job with a
     * positive repeat count has that count decremented.
     *
     * @param tx       transaction to run the index operations in
     * @param command  the reschedule command being applied
     * @param location journal location of the command
     * @throws IOException if an index or store operation fails
     */
    protected void process(Transaction tx, KahaRescheduleJobCommand command, Location location) throws IOException {
        JobLocation result = null;
        List<JobLocation> current = this.index.remove(tx, command.getExecutionTime());
        if (current != null) {
            Iterator<JobLocation> candidates = current.iterator();
            while (candidates.hasNext()) {
                JobLocation jl = candidates.next();
                if (jl.getJobId().equals(command.getJobId())) {
                    candidates.remove();
                    result = jl;
                    break;
                }
            }
            // Restore whatever is left of the slot, whether or not the job was found.
            if (!current.isEmpty()) {
                this.index.put(tx, command.getExecutionTime(), current);
            }
        } else {
            LOG.debug("Process reschedule command for job {} non-existent executime time {}.", command.getJobId(), command.getExecutionTime());
        }
        if (result != null) {
            Location previousUpdate = result.getLastUpdate();
            List<JobLocation> target = null;
            result.setNextTime(command.getNextExecutionTime());
            result.setLastUpdate(location);
            result.setRescheduledCount(command.getRescheduledCount());
            if (!result.isCron() && result.getRepeat() > 0) {
                result.setRepeat(result.getRepeat() - 1);
            }
            if (this.index.containsKey(tx, command.getNextExecutionTime())) {
                target = this.index.remove(tx, command.getNextExecutionTime());
            }
            if (target == null) {
                target = new ArrayList<JobLocation>();
            }
            target.add(result);
            // The job now references this command's journal location instead of the previous update.
            this.store.incrementJournalCount(tx, location);
            if (previousUpdate != null) {
                this.store.decrementJournalCount(tx, previousUpdate);
            }
            this.index.put(tx, command.getNextExecutionTime(), target);
            this.scheduleTime.newJob();
        } else {
            LOG.debug("Process reschedule command for non-scheduled job {} at executime time {}.", command.getJobId(), command.getExecutionTime());
        }
    }

    /**
     * Applies a remove-job command to the index.
     *
     * <p>When the command's execution time is {@code -1} the index is scanned for the first
     * slot containing the job id; otherwise the slot at the given time is used directly. The
     * scan stops at the first match: continuing after a slot has been taken out of the index
     * would iterate a modified index and, on a second match, lose the first removed slot.
     * The job is deleted from the slot, the rest of the slot is written back if non-empty,
     * and the job's journal references are released.
     *
     * @param tx       transaction to run the index operations in
     * @param command  the remove command being applied
     * @param location journal location of the command
     * @throws IOException if an index or store operation fails
     */
    void process(Transaction tx, KahaRemoveScheduledJobCommand command, Location location) throws IOException {
        long executionTime = command.getNextExecutionTime();
        List<JobLocation> values = null;
        if (executionTime == -1L) {
            Iterator<Map.Entry<Long, List<JobLocation>>> i = this.index.iterator(tx);
            scan:
            while (i.hasNext()) {
                Map.Entry<Long, List<JobLocation>> entry = i.next();
                List<JobLocation> candidates = entry.getValue();
                if (candidates == null) {
                    continue;
                }
                for (JobLocation jl : candidates) {
                    if (jl.getJobId().equals(command.getJobId())) {
                        LOG.trace("Entry {} contains the remove target: {}", entry.getKey(), command.getJobId());
                        executionTime = entry.getKey();
                        values = this.index.remove(tx, executionTime);
                        // Stop here: the index was just modified and only one slot can be restored below.
                        break scan;
                    }
                }
            }
        } else {
            values = this.index.remove(tx, executionTime);
        }
        JobLocation removed = null;
        if (values != null) {
            Iterator<JobLocation> jobs = values.iterator();
            while (jobs.hasNext()) {
                JobLocation job = jobs.next();
                if (job.getJobId().equals(command.getJobId())) {
                    jobs.remove();
                    removed = job;
                    break;
                }
            }
            if (!values.isEmpty()) {
                this.index.put(tx, executionTime, values);
            }
        }
        if (removed != null) {
            LOG.trace("{} removed from scheduler {}", removed, this);
            // Release the references held on the add command and on the last reschedule, if any.
            this.store.decrementJournalCount(tx, removed.getLocation());
            if (removed.getLastUpdate() != null) {
                this.store.decrementJournalCount(tx, removed.getLastUpdate());
            }
            // Only recorded when the job was added in a different data file than this remove command.
            if (removed.getLocation().getDataFileId() != location.getDataFileId()) {
                this.store.referenceRemovedLocation(tx, location, removed);
            }
        }
    }

    /**
     * Applies a range-removal command by delegating to {@code removeInRange} with the
     * command's start and end times.
     *
     * @param tx       transaction to run the index operations in
     * @param command  the range-removal command being applied
     * @param location journal location of the command
     * @throws IOException if an index or store operation fails
     */
    protected void process(Transaction tx, KahaRemoveScheduledJobsCommand command, Location location) throws IOException {
        final long from = command.getStartTime();
        final long to = command.getEndTime();
        removeInRange(tx, from, to, location);
    }

    /**
     * Removes every index entry from {@code 0} to {@code Long.MAX_VALUE}. No journal
     * location is supplied, so {@code removeInRange} skips all journal bookkeeping.
     *
     * @param tx transaction to run the index operations in
     * @throws IOException if an index operation fails
     */
    protected void removeAll(Transaction tx) throws IOException {
        final Location noLocation = null;
        removeInRange(tx, 0L, Long.MAX_VALUE, noLocation);
    }

    /**
     * Removes every index slot whose key lies between {@code start2} and {@code finish}
     * (upper bound inclusive).
     *
     * <p>The keys are collected first and removed afterwards. When {@code location} is
     * non-null, the number of journal references to release is tallied per data file id
     * (one for each job's add location and one for its last update, if any), and the data
     * file ids of jobs added in a different file than {@code location} are collected; both
     * are handed to the store at the end. When {@code location} is {@code null} the slots
     * are only removed.
     *
     * @param tx       transaction to run the index operations in
     * @param start2   key at which the index iteration starts
     * @param finish   last key to remove
     * @param location journal location of the triggering command, or {@code null}
     * @throws IOException if an index or store operation fails
     */
    protected void removeInRange(Transaction tx, long start2, long finish, Location location) throws IOException {
        // Pass 1: collect the keys so the index is not modified while it is being iterated.
        ArrayList<Long> keys = new ArrayList<Long>();
        Iterator<Map.Entry<Long, List<JobLocation>>> cursor = index.iterator(tx, start2);
        while (cursor.hasNext()) {
            Map.Entry<Long, List<JobLocation>> entry = cursor.next();
            if (entry.getKey() > finish) {
                break;
            }
            keys.add(entry.getKey());
        }

        // Pass 2: remove each slot and tally the journal bookkeeping.
        ArrayList<Integer> removedJobFileIds = new ArrayList<Integer>();
        HashMap<Integer, Integer> decrementJournalCount = new HashMap<Integer, Integer>();
        for (Long executionTime : keys) {
            List<JobLocation> slot = index.remove(tx, executionTime);
            if (location != null) {
                for (JobLocation job : slot) {
                    LOG.trace("Removing {} scheduled at: {}", job, executionTime);
                    decrementJournalCount.merge(job.getLocation().getDataFileId(), 1, Integer::sum);
                    if (job.getLastUpdate() != null) {
                        decrementJournalCount.merge(job.getLastUpdate().getDataFileId(), 1, Integer::sum);
                    }
                    if (job.getLocation().getDataFileId() != location.getDataFileId()) {
                        removedJobFileIds.add(job.getLocation().getDataFileId());
                    }
                }
            }
        }

        if (!removedJobFileIds.isEmpty()) {
            store.referenceRemovedLocation(tx, location, removedJobFileIds);
        }
        if (!decrementJournalCount.isEmpty()) {
            store.decrementJournalCount(tx, decrementJournalCount);
        }
    }

    /**
     * Removes the job with the given id from the slot at {@code executionTime} and releases
     * its journal references.
     *
     * <p>If no slot exists at that time nothing is changed and {@code false} is returned
     * (previously this dereferenced {@code null}). The remaining jobs are written back only
     * when the slot is non-empty, matching the other command processors in this class, so
     * that no empty slot is left sitting at the head of the index.
     *
     * @param tx            transaction to run the index operations in
     * @param jobId         id of the job to remove
     * @param executionTime execution time the job is filed under
     * @return {@code true} if a job with that id was found and removed
     * @throws IOException if an index or store operation fails
     */
    protected boolean removeJobAtTime(Transaction tx, String jobId, long executionTime) throws IOException {
        List<JobLocation> jobs = this.index.remove(tx, executionTime);
        if (jobs == null) {
            return false;
        }
        boolean result = false;
        Iterator<JobLocation> jobsIter = jobs.iterator();
        while (jobsIter.hasNext()) {
            JobLocation job = jobsIter.next();
            if (!job.getJobId().equals(jobId)) {
                continue;
            }
            jobsIter.remove();
            // Release the references held on the add command and on the last reschedule, if any.
            this.store.decrementJournalCount(tx, job.getLocation());
            if (job.getLastUpdate() != null) {
                this.store.decrementJournalCount(tx, job.getLastUpdate());
            }
            result = true;
            break;
        }
        if (!jobs.isEmpty()) {
            this.index.put(tx, executionTime, jobs);
        }
        return result;
    }

    /**
     * Returns an iterator over every {@link JobLocation} in the index, flattening the
     * per-time lists in index iteration order. Each list is copied before it is iterated.
     *
     * <p>{@code next()} advances on its own and throws {@code NoSuchElementException} at
     * the end, so it no longer depends on a preceding {@code hasNext()} call.
     *
     * @param tx transaction used to iterate the index
     * @return an iterator over all scheduled job locations
     * @throws IOException if the index iterator cannot be created
     */
    protected Iterator<JobLocation> getAllScheduledJobs(final Transaction tx) throws IOException {
        return new Iterator<JobLocation>() {
            final Iterator<Map.Entry<Long, List<JobLocation>>> mapIterator;
            Iterator<JobLocation> iterator;
            {
                this.mapIterator = JobSchedulerImpl.this.index.iterator(tx);
            }

            @Override
            public boolean hasNext() {
                // Skip over exhausted (or empty) slots until one with a remaining job is found.
                while ((this.iterator == null || !this.iterator.hasNext()) && this.mapIterator.hasNext()) {
                    this.iterator = new ArrayList<JobLocation>(this.mapIterator.next().getValue()).iterator();
                }
                return this.iterator != null && this.iterator.hasNext();
            }

            @Override
            public JobLocation next() {
                if (!hasNext()) {
                    throw new java.util.NoSuchElementException();
                }
                return this.iterator.next();
            }
        };
    }

    /**
     * Thread entry point: runs {@code mainLoop} and, if the scheduler is still marked
     * running when the loop exits, stops the scheduler.
     *
     * <p>An exception escaping the loop is logged only while the scheduler is running and
     * started. A failure to stop is logged together with its exception.
     */
    @Override
    public void run() {
        try {
            this.mainLoop();
        } catch (Throwable e) {
            if (this.running.get() && this.isStarted()) {
                LOG.error("{} Caught exception in mainloop", this, e);
            }
        } finally {
            if (this.running.get()) {
                try {
                    this.stop();
                } catch (Exception stopFailure) {
                    // Pass the exception as the last argument so its stack trace is logged.
                    LOG.error("Failed to stop {}", this, stopFailure);
                }
            }
        }
    }

    /**
     * @return {@code "JobScheduler: "} followed by the scheduler name
     */
    @Override
    public String toString() {
        return "JobScheduler: " + name;
    }

    /**
     * Dispatch loop; runs until {@code running} is cleared.
     *
     * <p>Each pass takes the first index entry. If its execution time has not been reached,
     * the wait time is set to the remaining interval. Otherwise every job in the slot is handled:
     * <ul>
     *   <li>non-cron job: fired, then rescheduled when its repeat count is non-zero, else
     *       queued for removal;</li>
     *   <li>cron job: fired only when its repeat count is zero; if the next cron time is
     *       in the future it is rescheduled, and when its repeat count is non-zero a
     *       separate non-cron job with a generated id and the same payload, delay, period
     *       and repeat is scheduled; otherwise it is queued for removal.</li>
     * </ul>
     * Queued jobs are then removed, the wait time is shortened if the next slot is due
     * sooner, and the loop pauses. Any exception is logged and the store is stopped.
     */
    protected void mainLoop() {
        while (this.running.get()) {
            this.scheduleTime.clearNewJob();
            try {
                long currentTime = System.currentTimeMillis();
                Map.Entry<Long, List<JobLocation>> first = this.getNextToSchedule();
                if (first != null) {
                    // Iterate a typed copy of the slot rather than the list returned by the index.
                    List<JobLocation> list = new ArrayList<JobLocation>(first.getValue());
                    List<JobLocation> toRemove = new ArrayList<JobLocation>(list.size());
                    long executionTime = first.getKey();
                    if (executionTime <= currentTime) {
                        for (JobLocation job : list) {
                            if (!this.running.get()) {
                                break;
                            }
                            int repeat = job.getRepeat();
                            long nextExecutionTime = this.calculateNextExecutionTime(job, currentTime, repeat);
                            long waitTime = nextExecutionTime - currentTime;
                            this.scheduleTime.setWaitTime(waitTime);
                            if (!job.isCron()) {
                                this.fireJob(job);
                                if (repeat != 0) {
                                    this.doReschedule(job.getJobId(), executionTime, nextExecutionTime, job.getRescheduledCount() + 1);
                                } else {
                                    toRemove.add(job);
                                }
                            } else {
                                if (repeat == 0) {
                                    this.fireJob(job);
                                }
                                if (nextExecutionTime > currentTime) {
                                    this.doReschedule(job.getJobId(), executionTime, nextExecutionTime, job.getRescheduledCount() + 1);
                                    if (repeat != 0) {
                                        // The cron job itself was not fired above; instead it spawns
                                        // a separate non-cron job carrying the same payload.
                                        String jobId = ID_GENERATOR.generateId();
                                        ByteSequence payload = this.getPayload(job.getLocation());
                                        this.schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat());
                                        waitTime = job.getDelay() != 0L ? job.getDelay() : job.getPeriod();
                                        this.scheduleTime.setWaitTime(waitTime);
                                    }
                                } else {
                                    toRemove.add(job);
                                }
                            }
                        }
                        this.doRemove(executionTime, toRemove);
                        // Do not sleep past the next slot if it is due before the current wait elapses.
                        Map.Entry<Long, List<JobLocation>> nextUp = this.getNextToSchedule();
                        if (nextUp != null) {
                            long timeUntilNextScheduled = nextUp.getKey() - currentTime;
                            if (timeUntilNextScheduled < this.scheduleTime.getWaitTime()) {
                                this.scheduleTime.setWaitTime(timeUntilNextScheduled);
                            }
                        }
                    } else {
                        this.scheduleTime.setWaitTime(executionTime - currentTime);
                    }
                }
                this.scheduleTime.pause();
            } catch (Exception ioe) {
                LOG.error("{} Failed to schedule job", this.name, ioe);
                try {
                    this.store.stop();
                } catch (Exception e) {
                    LOG.error("{} Failed to shutdown JobSchedulerStore", this.name, e);
                }
            }
        }
    }

    /**
     * Loads the job's payload from the store and passes it, with the job id, to every
     * registered listener.
     *
     * @param job the job to fire
     * @throws IllegalStateException propagated from the payload lookup
     * @throws IOException           if the payload cannot be read
     */
    void fireJob(JobLocation job) throws IllegalStateException, IOException {
        LOG.debug("Firing: {}", job);
        ByteSequence payload = store.getPayload(job.getLocation());
        for (JobListener listener : jobListeners) {
            listener.scheduledJob(job.getJobId(), payload);
        }
    }

    /**
     * Starts the daemon dispatch thread, named {@code "JobScheduler:" + name}. Does nothing
     * when the scheduler is not running or dispatching has already been started.
     *
     * @throws Exception declared by the interface; not thrown by the visible code
     */
    @Override
    public void startDispatching() throws Exception {
        if (running.get() && started.compareAndSet(false, true)) {
            thread = new Thread(this, "JobScheduler:" + name);
            thread.setDaemon(true);
            thread.start();
        }
    }

    /**
     * Stops dispatching if it was started: wakes the dispatch loop, clears the thread
     * reference and waits up to three seconds for the thread to finish.
     *
     * @throws Exception if the wait for the dispatch thread is interrupted
     */
    @Override
    public void stopDispatching() throws Exception {
        if (!started.compareAndSet(true, false)) {
            return;
        }
        scheduleTime.wakeup();
        Thread dispatcher = thread;
        thread = null;
        if (dispatcher != null) {
            dispatcher.join(3000L);
        }
    }

    /**
     * Marks the scheduler as running; the dispatch thread itself is started separately
     * by {@code startDispatching}.
     *
     * @throws Exception declared by the superclass; not thrown by the visible code
     */
    @Override
    protected void doStart() throws Exception {
        running.set(true);
    }

    /**
     * Clears the running flag and then stops the dispatch thread.
     *
     * @param stopper not used by this implementation
     * @throws Exception if stopping the dispatch thread fails
     */
    @Override
    protected void doStop(ServiceStopper stopper) throws Exception {
        running.set(false);
        stopDispatching();
    }

    /**
     * Reads the payload stored at the given journal location by delegating to the store.
     *
     * @param location journal location of the job
     * @return the payload returned by the store
     * @throws IllegalStateException propagated from the store
     * @throws IOException           if the payload cannot be read
     */
    private ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
        return store.getPayload(location);
    }

    /**
     * Computes when the job should run next, relative to {@code currentTime}.
     *
     * @param job         the job being evaluated
     * @param currentTime reference time in milliseconds
     * @param repeat      not read here; the repeat count is taken from {@code job} itself
     * @return the next cron time after {@code currentTime} when the job has a non-empty cron
     *         entry; otherwise {@code currentTime + period} when the job's repeat count is
     *         non-zero; otherwise {@code currentTime}
     * @throws MessageFormatException if the cron entry cannot be parsed
     */
    long calculateNextExecutionTime(JobLocation job, long currentTime, int repeat) throws MessageFormatException {
        String cron = job.getCronEntry();
        if (cron != null && !cron.isEmpty()) {
            return CronParser.getNextScheduledTime(cron, currentTime);
        }
        if (job.getRepeat() != 0) {
            return currentTime + job.getPeriod();
        }
        return currentTime;
    }

    /**
     * Creates the job index on a page newly allocated from the given transaction.
     *
     * @param tx transaction used to allocate the index page
     * @throws IOException if the page cannot be allocated
     */
    void createIndexes(Transaction tx) throws IOException {
        index = new BTreeIndex<Long, List<JobLocation>>(store.getPageFile(), tx.allocate().getPageId());
    }

    /**
     * Configures the index marshallers (long keys, job-location-list values) and loads the index.
     *
     * @param tx transaction used to load the index
     * @throws IOException if the index cannot be loaded
     */
    void load(Transaction tx) throws IOException {
        index.setKeyMarshaller(LongMarshaller.INSTANCE);
        index.setValueMarshaller(JobLocationsMarshaller.INSTANCE);
        index.load(tx);
    }

    /**
     * Restores this scheduler from its serialized form: the name as a UTF string followed
     * by the index page id as a long. The index is created on that page with its
     * marshallers configured; it is not loaded here.
     *
     * @param in source of the serialized scheduler
     * @throws IOException if reading fails
     */
    void read(DataInput in) throws IOException {
        name = in.readUTF();
        index = new BTreeIndex<Long, List<JobLocation>>(store.getPageFile(), in.readLong());
        index.setKeyMarshaller(LongMarshaller.INSTANCE);
        index.setValueMarshaller(JobLocationsMarshaller.INSTANCE);
    }

    /**
     * Serializes this scheduler as its name (UTF string) followed by the index page id
     * (long), the same layout {@code read} consumes.
     *
     * @param out destination for the serialized scheduler
     * @throws IOException if writing fails
     */
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeLong(index.getPageId());
    }

    /**
     * Holds the dispatch loop's wait interval and the monitor it sleeps on.
     *
     * <p>NOTE(review): {@code newJob} and {@code waitTime} are plain fields that appear to be
     * touched both by the dispatch thread and by whichever thread processes commands —
     * confirm whether that access is otherwise serialized.
     */
    static class ScheduleTime {
        /** Wait in milliseconds used initially and whenever a non-positive wait is requested. */
        private static final int DEFAULT_WAIT = 500;
        /** Wait in milliseconds applied as soon as a new job is announced. */
        private static final int DEFAULT_NEW_JOB_WAIT = 100;
        private boolean newJob;
        private long waitTime = DEFAULT_WAIT;
        private final Object mutex = new Object();

        ScheduleTime() {
        }

        /** @return the current wait interval in milliseconds */
        long getWaitTime() {
            return waitTime;
        }

        /**
         * Sets the wait interval unless a new job has been announced since the last
         * {@code clearNewJob()}; non-positive values fall back to the default wait.
         */
        void setWaitTime(long waitTime) {
            if (newJob) {
                return;
            }
            if (waitTime > 0L) {
                this.waitTime = waitTime;
            } else {
                this.waitTime = DEFAULT_WAIT;
            }
        }

        /**
         * Blocks on the monitor for the current wait interval or until {@code wakeup()} is
         * called. An interrupt ends the wait early and is otherwise ignored; the caller's
         * loop decides whether to continue.
         */
        void pause() {
            synchronized (mutex) {
                try {
                    mutex.wait(waitTime);
                } catch (InterruptedException ignored) {
                    // deliberately ignored: treated like an early wake-up
                }
            }
        }

        /** Announces a new job: pins the wait to the short new-job interval and wakes the sleeper. */
        void newJob() {
            newJob = true;
            waitTime = DEFAULT_NEW_JOB_WAIT;
            wakeup();
        }

        /** Clears the new-job flag so {@code setWaitTime} takes effect again. */
        void clearNewJob() {
            newJob = false;
        }

        /** Wakes every thread currently blocked in {@code pause()}. */
        void wakeup() {
            synchronized (mutex) {
                mutex.notifyAll();
            }
        }
    }
}

