/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network.server;

import io.netty.channel.Channel;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.server.StreamManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark-project.guava.base.Preconditions;

public class OneForOneStreamManager
extends StreamManager {
    private final Logger logger = LoggerFactory.getLogger(OneForOneStreamManager.class);
    private final AtomicLong nextStreamId = new AtomicLong((long)new Random().nextInt(Integer.MAX_VALUE) * 1000L);
    private final ConcurrentHashMap<Long, StreamState> streams = new ConcurrentHashMap();

    @Override
    public void registerChannel(Channel channel, long streamId) {
        if (this.streams.containsKey(streamId)) {
            this.streams.get((Object)Long.valueOf((long)streamId)).associatedChannel = channel;
        }
    }

    @Override
    public ManagedBuffer getChunk(long streamId, int chunkIndex) {
        StreamState state = this.streams.get(streamId);
        if (chunkIndex != state.curChunk) {
            throw new IllegalStateException(String.format("Received out-of-order chunk index %s (expected %s)", chunkIndex, state.curChunk));
        }
        if (!state.buffers.hasNext()) {
            throw new IllegalStateException(String.format("Requested chunk index beyond end %s", chunkIndex));
        }
        ++state.curChunk;
        ManagedBuffer nextChunk = state.buffers.next();
        if (!state.buffers.hasNext()) {
            this.logger.trace("Removing stream id {}", (Object)streamId);
            this.streams.remove(streamId);
        }
        return nextChunk;
    }

    @Override
    public void connectionTerminated(Channel channel) {
        for (Map.Entry<Long, StreamState> entry : this.streams.entrySet()) {
            StreamState state = entry.getValue();
            if (state.associatedChannel != channel) continue;
            this.streams.remove(entry.getKey());
            while (state.buffers.hasNext()) {
                state.buffers.next().release();
            }
        }
    }

    public long registerStream(Iterator<ManagedBuffer> buffers) {
        long myStreamId = this.nextStreamId.getAndIncrement();
        this.streams.put(myStreamId, new StreamState(buffers));
        return myStreamId;
    }

    private static class StreamState {
        final Iterator<ManagedBuffer> buffers;
        Channel associatedChannel = null;
        int curChunk = 0;

        StreamState(Iterator<ManagedBuffer> buffers) {
            this.buffers = Preconditions.checkNotNull(buffers);
        }
    }
}

