/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.runners.inprocess.EvaluatorKey;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessTransformResult;
import com.google.cloud.dataflow.sdk.runners.inprocess.StepTransformResult;
import com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator;
import com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluatorFactory;
import com.google.cloud.dataflow.sdk.runners.inprocess.UnboundedReadDeduplicator;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.PCollection;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

class UnboundedReadEvaluatorFactory
implements TransformEvaluatorFactory {
    @VisibleForTesting
    static final int MAX_READER_REUSE_COUNT = 20;
    private final ConcurrentMap<EvaluatorKey, ConcurrentLinkedQueue<? extends UnboundedReadEvaluator<?, ?>>> sourceEvaluators = new ConcurrentHashMap();

    UnboundedReadEvaluatorFactory() {
    }

    @Override
    @Nullable
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application, @Nullable InProcessPipelineRunner.CommittedBundle<?> inputBundle, InProcessEvaluationContext evaluationContext) {
        return this.getTransformEvaluator(application, evaluationContext);
    }

    private <OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> TransformEvaluator<?> getTransformEvaluator(AppliedPTransform<?, PCollection<OutputT>, Read.Unbounded<OutputT>> transform, InProcessEvaluationContext evaluationContext) {
        EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
        ConcurrentLinkedQueue evaluatorQueue = (ConcurrentLinkedQueue)this.sourceEvaluators.get(key);
        if (evaluatorQueue == null) {
            evaluatorQueue = new ConcurrentLinkedQueue();
            if (this.sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
                UnboundedSource<OutputT, ?> source = transform.getTransform().getSource();
                UnboundedReadDeduplicator deduplicator = source.requiresDeduping() ? UnboundedReadDeduplicator.CachedIdDeduplicator.create() : UnboundedReadDeduplicator.NeverDeduplicator.create();
                UnboundedReadEvaluator evaluator = new UnboundedReadEvaluator(transform, evaluationContext, source, deduplicator, evaluatorQueue);
                evaluatorQueue.offer(evaluator);
            } else {
                evaluatorQueue = (ConcurrentLinkedQueue)this.sourceEvaluators.get(key);
            }
        }
        return (TransformEvaluator)evaluatorQueue.poll();
    }

    private static class UnboundedReadEvaluator<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
    implements TransformEvaluator<Object> {
        private static final int ARBITRARY_MAX_ELEMENTS = 10;
        private final AppliedPTransform<?, PCollection<OutputT>, Read.Unbounded<OutputT>> transform;
        private final InProcessEvaluationContext evaluationContext;
        private final ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> evaluatorQueue;
        private final UnboundedSource<OutputT, CheckpointMarkT> source;
        private final UnboundedReadDeduplicator deduplicator;
        private UnboundedSource.UnboundedReader<OutputT> currentReader;
        private CheckpointMarkT checkpointMark;
        private int outputBundles = 0;

        public UnboundedReadEvaluator(AppliedPTransform<?, PCollection<OutputT>, Read.Unbounded<OutputT>> transform, InProcessEvaluationContext evaluationContext, UnboundedSource<OutputT, CheckpointMarkT> source, UnboundedReadDeduplicator deduplicator, ConcurrentLinkedQueue<UnboundedReadEvaluator<OutputT, CheckpointMarkT>> evaluatorQueue) {
            this.transform = transform;
            this.evaluationContext = evaluationContext;
            this.evaluatorQueue = evaluatorQueue;
            this.source = source;
            this.currentReader = null;
            this.deduplicator = deduplicator;
            this.checkpointMark = null;
        }

        @Override
        public void processElement(WindowedValue<Object> element) {
        }

        @Override
        public InProcessTransformResult finishBundle() throws IOException {
            InProcessPipelineRunner.UncommittedBundle<OutputT> output = this.evaluationContext.createRootBundle(this.transform.getOutput());
            try {
                boolean elementAvailable = this.startReader();
                Instant watermark = this.currentReader.getWatermark();
                if (elementAvailable) {
                    int numElements = 0;
                    do {
                        if (!this.deduplicator.shouldOutput(this.currentReader.getCurrentRecordId())) continue;
                        output.add(WindowedValue.timestampedValueInGlobalWindow(this.currentReader.getCurrent(), this.currentReader.getCurrentTimestamp()));
                    } while (++numElements < 10 && this.currentReader.advance());
                    watermark = this.currentReader.getWatermark();
                    this.finishRead();
                }
                StepTransformResult result = StepTransformResult.withHold(this.transform, watermark).addOutput(output, new InProcessPipelineRunner.UncommittedBundle[0]).build();
                this.evaluatorQueue.offer(this);
                return result;
            }
            catch (IOException e) {
                this.closeReader();
                throw e;
            }
        }

        private boolean startReader() throws IOException {
            if (this.currentReader == null) {
                if (this.checkpointMark != null) {
                    this.checkpointMark.finalizeCheckpoint();
                }
                this.currentReader = this.source.createReader(this.evaluationContext.getPipelineOptions(), this.checkpointMark);
                this.checkpointMark = null;
                return this.currentReader.start();
            }
            return this.currentReader.advance();
        }

        private void finishRead() throws IOException {
            CheckpointMarkT oldMark = this.checkpointMark;
            final UnboundedSource.CheckpointMark mark = this.currentReader.getCheckpointMark();
            this.checkpointMark = mark;
            if (oldMark != null) {
                oldMark.finalizeCheckpoint();
            }
            if (!this.currentReader.getWatermark().isBefore((ReadableInstant)BoundedWindow.TIMESTAMP_MAX_VALUE)) {
                this.evaluationContext.scheduleAfterOutputWouldBeProduced(this.transform.getOutput(), GlobalWindow.INSTANCE, this.transform.getOutput().getWindowingStrategy(), new Runnable(){

                    @Override
                    public void run() {
                        try {
                            mark.finalizeCheckpoint();
                        }
                        catch (IOException e) {
                            throw new RuntimeException("Couldn't finalize checkpoint after the end of the Global Window", e);
                        }
                    }
                });
            }
            if (this.outputBundles >= 20) {
                this.closeReader();
                this.outputBundles = 0;
            } else {
                ++this.outputBundles;
            }
        }

        private void closeReader() throws IOException {
            if (this.currentReader != null) {
                this.currentReader.close();
                this.currentReader = null;
            }
        }
    }
}

