/*
 * Decompiled with CFR 0.152.
 */
package com.alicp.jetcache.redis;

import com.alicp.jetcache.CacheConfigException;
import com.alicp.jetcache.CacheManager;
import com.alicp.jetcache.CacheResult;
import com.alicp.jetcache.redis.RedisCache;
import com.alicp.jetcache.redis.RedisCacheConfig;
import com.alicp.jetcache.support.BroadcastManager;
import com.alicp.jetcache.support.CacheMessage;
import com.alicp.jetcache.support.SquashedLogger;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.BinaryJedisPubSub;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.UnifiedJedis;

public class RedisBroadcastManager
extends BroadcastManager {
    private static final Logger logger = LoggerFactory.getLogger(RedisBroadcastManager.class);
    private final byte[] channel;
    private final String channelStr;
    private final RedisCacheConfig<Object, Object> config;
    private volatile CacheMessagePubSub cacheMessagePubSub;
    private volatile boolean closed;
    private volatile boolean subscribe;
    private boolean subscribeThreadStart;
    private final ReentrantLock reentrantLock = new ReentrantLock();

    public RedisBroadcastManager(CacheManager cacheManager, RedisCacheConfig<Object, Object> config) {
        super(cacheManager);
        this.channelStr = config.getBroadcastChannel();
        this.channel = this.channelStr.getBytes(StandardCharsets.UTF_8);
        this.config = config;
        this.checkConfig(config);
        if (config.getJedis() == null && config.getJedisPool() == null) {
            throw new CacheConfigException("no jedis");
        }
        if (config.getJedis() != null && config.getJedisPool() != null) {
            throw new CacheConfigException("'jedis' and 'jedisPool' can't set simultaneously");
        }
    }

    public void startSubscribe() {
        this.reentrantLock.lock();
        try {
            if (this.subscribeThreadStart) {
                throw new IllegalStateException("subscribe thread is started");
            }
            this.cacheMessagePubSub = new CacheMessagePubSub();
            Thread subThread = new Thread(this::runSubThread, "Sub_" + this.channelStr);
            subThread.setDaemon(true);
            subThread.start();
            this.subscribeThreadStart = true;
        }
        finally {
            this.reentrantLock.unlock();
        }
    }

    private void runSubThread() {
        while (!this.closed) {
            this.runSubThread0();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void runSubThread0() {
        Object jedisObj = null;
        try {
            jedisObj = this.writeCommands();
            if (jedisObj instanceof Jedis) {
                this.subscribe = true;
                ((Jedis)jedisObj).subscribe((BinaryJedisPubSub)this.cacheMessagePubSub, (byte[][])new byte[][]{this.channel});
            } else if (jedisObj instanceof UnifiedJedis) {
                this.subscribe = true;
                ((UnifiedJedis)jedisObj).subscribe((BinaryJedisPubSub)this.cacheMessagePubSub, (byte[][])new byte[][]{this.channel});
            }
        }
        catch (Throwable e) {
            SquashedLogger.getLogger((Logger)logger).error((CharSequence)"run jedis subscribe thread error: {}", e);
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
        finally {
            this.subscribe = false;
            RedisCache.closeJedis(jedisObj);
        }
    }

    Object writeCommands() {
        return this.config.getJedis() != null ? this.config.getJedis() : this.config.getJedisPool().getResource();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CacheResult publish(CacheMessage message) {
        Object jedisObj = null;
        try {
            jedisObj = this.writeCommands();
            byte[] value = (byte[])this.config.getValueEncoder().apply(message);
            if (jedisObj instanceof Jedis) {
                ((Jedis)jedisObj).publish(this.channel, value);
            } else {
                ((UnifiedJedis)jedisObj).publish(this.channel, value);
            }
            CacheResult cacheResult = CacheResult.SUCCESS_WITHOUT_MSG;
            return cacheResult;
        }
        catch (Exception ex) {
            SquashedLogger.getLogger((Logger)logger).error((CharSequence)"jetcache publish error", (Throwable)ex);
            CacheResult cacheResult = new CacheResult((Throwable)ex);
            return cacheResult;
        }
        finally {
            RedisCache.closeJedis(jedisObj);
        }
    }

    public void close() {
        this.reentrantLock.lock();
        try {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (this.subscribe) {
                try {
                    this.cacheMessagePubSub.unsubscribe((Object[])new byte[][]{this.channel});
                }
                catch (Exception e) {
                    logger.warn("unsubscribe {} fail", (Object)this.channelStr, (Object)e);
                }
            }
        }
        finally {
            this.reentrantLock.unlock();
        }
    }

    class CacheMessagePubSub
    extends BinaryJedisPubSub {
        CacheMessagePubSub() {
        }

        public void onMessage(byte[] channel, byte[] message) {
            RedisBroadcastManager.this.processNotification(message, RedisBroadcastManager.this.config.getValueDecoder());
        }
    }
}

