/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.redis.state;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.container.JedisCommandsContainerBuilder;
import org.apache.storm.redis.common.container.JedisCommandsInstanceContainer;
import org.apache.storm.redis.state.RedisKeyValueStateIterator;
import org.apache.storm.redis.utils.RedisEncoder;
import org.apache.storm.state.DefaultStateSerializer;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.state.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCommands;

public class RedisKeyValueState<K, V>
implements KeyValueState<K, V> {
    public static final int ITERATOR_CHUNK_SIZE = 100;
    private static final Logger LOG = LoggerFactory.getLogger(RedisKeyValueState.class);
    private static final String COMMIT_TXID_KEY = "commit";
    private static final String PREPARE_TXID_KEY = "prepare";
    private final String namespace;
    private final String prepareNamespace;
    private final String txidNamespace;
    private final RedisEncoder<K, V> encoder;
    private final JedisCommandsInstanceContainer jedisContainer;
    private Map<String, String> pendingPrepare;
    private Map<String, String> pendingCommit;
    private Map<String, String> txIds;

    public RedisKeyValueState(String namespace) {
        this(namespace, new JedisPoolConfig.Builder().build());
    }

    public RedisKeyValueState(String namespace, JedisPoolConfig poolConfig) {
        this(namespace, poolConfig, (Serializer<K>)new DefaultStateSerializer(), (Serializer<V>)new DefaultStateSerializer());
    }

    public RedisKeyValueState(String namespace, JedisPoolConfig poolConfig, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this(namespace, JedisCommandsContainerBuilder.build(poolConfig), keySerializer, valueSerializer);
    }

    public RedisKeyValueState(String namespace, JedisCommandsInstanceContainer jedisContainer, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this.namespace = namespace;
        this.prepareNamespace = namespace + "$prepare";
        this.txidNamespace = namespace + "$txid";
        this.encoder = new RedisEncoder<K, V>(keySerializer, valueSerializer);
        this.jedisContainer = jedisContainer;
        this.pendingPrepare = new ConcurrentHashMap<String, String>();
        this.initTxids();
        this.initPendingCommit();
    }

    private void initTxids() {
        JedisCommands commands = null;
        try {
            commands = this.jedisContainer.getInstance();
            this.txIds = commands.exists(this.txidNamespace) != false ? commands.hgetAll(this.txidNamespace) : new HashMap<String, String>();
            LOG.debug("initTxids, txIds {}", this.txIds);
        }
        finally {
            this.jedisContainer.returnInstance(commands);
        }
    }

    private void initPendingCommit() {
        JedisCommands commands = null;
        try {
            commands = this.jedisContainer.getInstance();
            if (commands.exists(this.prepareNamespace).booleanValue()) {
                LOG.debug("Loading previously prepared commit from {}", (Object)this.prepareNamespace);
                this.pendingCommit = Collections.unmodifiableMap(commands.hgetAll(this.prepareNamespace));
            } else {
                LOG.debug("No previously prepared commits.");
                this.pendingCommit = Collections.emptyMap();
            }
        }
        finally {
            this.jedisContainer.returnInstance(commands);
        }
    }

    public void put(K key, V value) {
        LOG.debug("put key '{}', value '{}'", key, value);
        String redisKey = this.encoder.encodeKey(key);
        this.pendingPrepare.put(redisKey, this.encoder.encodeValue(value));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public V get(K key) {
        LOG.debug("get key '{}'", key);
        String redisKey = this.encoder.encodeKey(key);
        String redisValue = null;
        if (this.pendingPrepare.containsKey(redisKey)) {
            redisValue = this.pendingPrepare.get(redisKey);
        } else if (this.pendingCommit.containsKey(redisKey)) {
            redisValue = this.pendingCommit.get(redisKey);
        } else {
            JedisCommands commands = null;
            try {
                commands = this.jedisContainer.getInstance();
                redisValue = commands.hget(this.namespace, redisKey);
            }
            finally {
                this.jedisContainer.returnInstance(commands);
            }
        }
        V value = null;
        if (redisValue != null) {
            value = this.encoder.decodeValue(redisValue);
        }
        LOG.debug("Value for key '{}' is '{}'", key, value);
        return value;
    }

    public V get(K key, V defaultValue) {
        V val = this.get(key);
        return val != null ? val : defaultValue;
    }

    public V delete(K key) {
        LOG.debug("delete key '{}'", key);
        String redisKey = this.encoder.encodeKey(key);
        V curr = this.get(key);
        this.pendingPrepare.put(redisKey, RedisEncoder.TOMBSTONE);
        return curr;
    }

    public Iterator<Map.Entry<K, V>> iterator() {
        return new RedisKeyValueStateIterator<K, V>(this.namespace, this.jedisContainer, this.pendingPrepare.entrySet().iterator(), this.pendingCommit.entrySet().iterator(), 100, this.encoder.getKeySerializer(), this.encoder.getValueSerializer());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void prepareCommit(long txid) {
        LOG.debug("prepareCommit txid {}", (Object)txid);
        this.validatePrepareTxid(txid);
        JedisCommands commands = null;
        try {
            Map<String, String> currentPending = this.pendingPrepare;
            this.pendingPrepare = new ConcurrentHashMap<String, String>();
            commands = this.jedisContainer.getInstance();
            if (commands.exists(this.prepareNamespace).booleanValue()) {
                LOG.debug("Prepared txn already exists, will merge", (Object)txid);
                for (Map.Entry<String, String> e : this.pendingCommit.entrySet()) {
                    if (currentPending.containsKey(e.getKey())) continue;
                    currentPending.put(e.getKey(), e.getValue());
                }
            }
            if (!currentPending.isEmpty()) {
                commands.hmset(this.prepareNamespace, currentPending);
            } else {
                LOG.debug("Nothing to save for prepareCommit, txid {}.", (Object)txid);
            }
            this.txIds.put(PREPARE_TXID_KEY, String.valueOf(txid));
            commands.hmset(this.txidNamespace, this.txIds);
            this.pendingCommit = Collections.unmodifiableMap(currentPending);
            this.jedisContainer.returnInstance(commands);
        }
        catch (Throwable throwable) {
            this.jedisContainer.returnInstance(commands);
            throw throwable;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void commit(long txid) {
        LOG.debug("commit txid {}", (Object)txid);
        this.validateCommitTxid(txid);
        JedisCommands commands = null;
        try {
            commands = this.jedisContainer.getInstance();
            if (!this.pendingCommit.isEmpty()) {
                ArrayList<String> keysToDelete = new ArrayList<String>();
                HashMap<String, String> keysToAdd = new HashMap<String, String>();
                for (Map.Entry<String, String> entry : this.pendingCommit.entrySet()) {
                    if (RedisEncoder.TOMBSTONE.equals(entry.getValue())) {
                        keysToDelete.add(entry.getKey());
                        continue;
                    }
                    keysToAdd.put(entry.getKey(), entry.getValue());
                }
                if (!keysToAdd.isEmpty()) {
                    commands.hmset(this.namespace, keysToAdd);
                }
                if (!keysToDelete.isEmpty()) {
                    commands.hdel(this.namespace, keysToDelete.toArray(new String[0]));
                }
            } else {
                LOG.debug("Nothing to save for commit, txid {}.", (Object)txid);
            }
            this.txIds.put(COMMIT_TXID_KEY, String.valueOf(txid));
            commands.hmset(this.txidNamespace, this.txIds);
            commands.del(this.prepareNamespace);
            this.pendingCommit = Collections.emptyMap();
        }
        finally {
            this.jedisContainer.returnInstance(commands);
        }
    }

    public void commit() {
        JedisCommands commands = null;
        try {
            commands = this.jedisContainer.getInstance();
            if (!this.pendingPrepare.isEmpty()) {
                commands.hmset(this.namespace, this.pendingPrepare);
            } else {
                LOG.debug("Nothing to save for commit");
            }
            this.pendingPrepare = new ConcurrentHashMap<String, String>();
        }
        finally {
            this.jedisContainer.returnInstance(commands);
        }
    }

    public void rollback() {
        LOG.debug("rollback");
        JedisCommands commands = null;
        try {
            commands = this.jedisContainer.getInstance();
            if (commands.exists(this.prepareNamespace).booleanValue()) {
                commands.del(this.prepareNamespace);
            } else {
                LOG.debug("Nothing to rollback, prepared data is empty");
            }
            Long lastCommittedId = this.lastCommittedTxid();
            if (lastCommittedId != null) {
                this.txIds.put(PREPARE_TXID_KEY, String.valueOf(lastCommittedId));
            } else {
                this.txIds.remove(PREPARE_TXID_KEY);
            }
            if (!this.txIds.isEmpty()) {
                LOG.debug("hmset txidNamespace {}, txIds {}", (Object)this.txidNamespace, this.txIds);
                commands.hmset(this.txidNamespace, this.txIds);
            }
            this.pendingCommit = Collections.emptyMap();
            this.pendingPrepare = new ConcurrentHashMap<String, String>();
        }
        finally {
            this.jedisContainer.returnInstance(commands);
        }
    }

    private void validatePrepareTxid(long txid) {
        Long committedTxid = this.lastCommittedTxid();
        if (committedTxid != null && txid <= committedTxid) {
            throw new RuntimeException("Invalid txid '" + txid + "' for prepare. Txid '" + committedTxid + "' is already committed");
        }
    }

    private void validateCommitTxid(long txid) {
        Long committedTxid = this.lastCommittedTxid();
        if (committedTxid != null && txid < committedTxid) {
            throw new RuntimeException("Invalid txid '" + txid + "' txid '" + committedTxid + "' is already committed");
        }
        Long preparedTxid = this.lastPreparedTxid();
        if (preparedTxid != null && txid != preparedTxid) {
            throw new RuntimeException("Invalid txid '" + txid + "' not same as prepared txid '" + preparedTxid + "'");
        }
    }

    private Long lastCommittedTxid() {
        return this.lastId(COMMIT_TXID_KEY);
    }

    private Long lastPreparedTxid() {
        return this.lastId(PREPARE_TXID_KEY);
    }

    private Long lastId(String key) {
        Long lastId = null;
        String str = this.txIds.get(key);
        if (str != null) {
            lastId = Long.valueOf(str);
        }
        return lastId;
    }
}

