/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.stream.impl;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.Spliterator;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import org.infinispan.AdvancedCache;
import org.infinispan.BaseCacheStream;
import org.infinispan.Cache;
import org.infinispan.CacheStream;
import org.infinispan.LockedStream;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.marshall.Externalizer;
import org.infinispan.commons.marshall.SerializeWith;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.util.EntryWrapper;
import org.infinispan.util.concurrent.TimeoutException;
import org.infinispan.util.concurrent.locks.KeyAwareLockPromise;
import org.infinispan.util.concurrent.locks.LockManager;
import org.infinispan.util.function.SerializablePredicate;

public class LockedStreamImpl<K, V>
implements LockedStream<K, V> {
    final CacheStream<CacheEntry<K, V>> realStream;
    final Predicate<? super CacheEntry<K, V>> predicate;
    final long time;
    final TimeUnit unit;

    public LockedStreamImpl(CacheStream<CacheEntry<K, V>> realStream, long time, TimeUnit unit) {
        this.realStream = Objects.requireNonNull(realStream);
        this.predicate = null;
        if (time <= 0L) {
            throw new IllegalArgumentException("time must be greater than 0");
        }
        this.time = time;
        this.unit = Objects.requireNonNull(unit);
    }

    LockedStreamImpl(CacheStream<CacheEntry<K, V>> realStream, Predicate<? super CacheEntry<K, V>> predicate, long time, TimeUnit unit) {
        this.realStream = realStream;
        this.predicate = predicate;
        this.time = time;
        this.unit = unit;
    }

    private LockedStream<K, V> newOrReuse(CacheStream<CacheEntry<K, V>> resultingStream) {
        if (resultingStream == this.realStream) {
            return this;
        }
        return this.newStream(resultingStream, this.predicate, this.time, this.unit);
    }

    LockedStreamImpl<K, V> newStream(CacheStream<CacheEntry<K, V>> realStream, Predicate<? super CacheEntry<K, V>> predicate, long time, TimeUnit unit) {
        return new LockedStreamImpl<K, V>(realStream, predicate, time, unit);
    }

    @Override
    public LockedStream<K, V> filter(Predicate<? super CacheEntry<K, V>> predicate) {
        Objects.nonNull(predicate);
        SerializablePredicate usedPredicate = this.predicate != null ? e -> this.predicate.test((CacheEntry<K, V>)e) && predicate.test(e) : predicate;
        return this.newStream(this.realStream, usedPredicate, this.time, this.unit);
    }

    @Override
    public void forEach(BiConsumer<Cache<K, V>, ? super CacheEntry<K, V>> biConsumer) {
        this.realStream.forEach(new CacheEntryConsumer(biConsumer, this.predicate));
    }

    @Override
    public BaseCacheStream sequentialDistribution() {
        return this.newOrReuse((CacheStream<CacheEntry<K, V>>)this.realStream.sequentialDistribution());
    }

    @Override
    public BaseCacheStream parallelDistribution() {
        return this.newOrReuse((CacheStream<CacheEntry<K, V>>)this.realStream.parallelDistribution());
    }

    @Override
    public BaseCacheStream filterKeySegments(Set<Integer> segments) {
        return this.newOrReuse((CacheStream<CacheEntry<K, V>>)this.realStream.filterKeySegments((Set)segments));
    }

    @Override
    public BaseCacheStream filterKeys(Set<?> keys) {
        return this.newOrReuse((CacheStream<CacheEntry<K, V>>)this.realStream.filterKeys((Set)keys));
    }

    @Override
    public BaseCacheStream distributedBatchSize(int batchSize) {
        return this.newOrReuse((CacheStream<CacheEntry<K, V>>)this.realStream.distributedBatchSize(batchSize));
    }

    @Override
    public LockedStream segmentCompletionListener(BaseCacheStream.SegmentCompletionListener listener) {
        throw new UnsupportedOperationException("LockedStream doesn't support completion listener");
    }

    @Override
    public BaseCacheStream disableRehashAware() {
        return this.newOrReuse((CacheStream<CacheEntry<K, V>>)this.realStream.disableRehashAware());
    }

    @Override
    public LockedStream timeout(long timeout, TimeUnit unit) {
        return this.newOrReuse((CacheStream<CacheEntry<K, V>>)this.realStream.timeout(timeout, unit));
    }

    @Override
    public Iterator<CacheEntry<K, V>> iterator() {
        throw new UnsupportedOperationException("LockedStream doesn't support iterator");
    }

    @Override
    public Spliterator<CacheEntry<K, V>> spliterator() {
        throw new UnsupportedOperationException("LockedStream doesn't support spliterator");
    }

    @Override
    public boolean isParallel() {
        return this.realStream.isParallel();
    }

    @Override
    public LockedStream<K, V> sequential() {
        return this.newOrReuse((CacheStream<CacheEntry<K, V>>)this.realStream.sequential());
    }

    @Override
    public LockedStream<K, V> parallel() {
        return this.newOrReuse((CacheStream<CacheEntry<K, V>>)this.realStream.parallel());
    }

    @Override
    public LockedStream<K, V> unordered() {
        return this;
    }

    @Override
    public LockedStream<K, V> onClose(Runnable closeHandler) {
        return this.newOrReuse((CacheStream<CacheEntry<K, V>>)this.realStream.onClose(closeHandler));
    }

    @Override
    public void close() {
        this.realStream.close();
    }

    @SerializeWith(value=CacheEntryConsumerExternalizer.class)
    private static class CacheEntryConsumer<K, V>
    implements BiConsumer<Cache<K, V>, CacheEntry<K, V>>,
    Serializable {
        private final BiConsumer<Cache<K, V>, ? super CacheEntry<K, V>> realConsumer;
        private final Predicate<? super CacheEntry<K, V>> predicate;
        private transient LockManager lockManager;

        private CacheEntryConsumer(BiConsumer<Cache<K, V>, ? super CacheEntry<K, V>> realConsumer, Predicate<? super CacheEntry<K, V>> predicate) {
            this.realConsumer = realConsumer;
            this.predicate = predicate;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void accept(Cache<K, V> cache, CacheEntry<K, V> entry) {
            K key = entry.getKey();
            this.lock(key);
            try {
                CacheEntry<K, V> rereadEntry = cache.getAdvancedCache().getCacheEntry(key);
                if (rereadEntry != null && (this.predicate == null || this.predicate.test(rereadEntry))) {
                    AdvancedCache<K, V> cacheToUse = cache.getAdvancedCache().lockAs(key);
                    this.realConsumer.accept(cacheToUse, new EntryWrapper<K, V>(cacheToUse, entry));
                }
            }
            finally {
                this.lockManager.unlock(key, key);
            }
        }

        private void lock(K key) {
            final KeyAwareLockPromise kalp = this.lockManager.lock(key, key, 10L, TimeUnit.SECONDS);
            if (!kalp.isAvailable()) {
                try {
                    ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker(){

                        @Override
                        public boolean block() throws InterruptedException {
                            kalp.lock();
                            return true;
                        }

                        @Override
                        public boolean isReleasable() {
                            return kalp.isAvailable();
                        }
                    });
                }
                catch (InterruptedException e) {
                    throw new CacheException((Throwable)e);
                }
                catch (TimeoutException e) {
                    throw new CacheException("Could not acquire lock for key: " + key + " in 10 seconds");
                }
            }
        }

        @Inject
        public void inject(LockManager manager) {
            this.lockManager = manager;
        }

        public static final class CacheEntryConsumerExternalizer
        implements Externalizer<CacheEntryConsumer> {
            public void writeObject(ObjectOutput output, CacheEntryConsumer object) throws IOException {
                output.writeObject(object.realConsumer);
                output.writeObject(object.predicate);
            }

            public CacheEntryConsumer readObject(ObjectInput input) throws IOException, ClassNotFoundException {
                return new CacheEntryConsumer((BiConsumer)input.readObject(), (Predicate)input.readObject());
            }
        }
    }
}

