/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.runners.inprocess.EvaluatorKey;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessTransformResult;
import com.google.cloud.dataflow.sdk.runners.inprocess.StepTransformResult;
import com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator;
import com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluatorFactory;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.PCollection;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;

/**
 * A {@link TransformEvaluatorFactory} that hands out evaluators for {@link Read.Bounded}
 * applications.
 *
 * <p>Evaluators are kept in a queue per {@link EvaluatorKey} (the applied transform together with
 * the evaluation context). Exactly one evaluator is created for each key, and it is removed from
 * the queue when it is handed out, so later requests for the same key receive {@code null}.
 */
final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    /** Pending evaluators, keyed by (applied transform, evaluation context). */
    private final ConcurrentMap<EvaluatorKey, Queue<? extends BoundedReadEvaluator<?>>> sourceEvaluators =
            new ConcurrentHashMap<>();

    BoundedReadEvaluatorFactory() {
    }

    /**
     * Returns the evaluator for the given application, or {@code null} if the single evaluator
     * for this (application, context) pair has already been handed out.
     *
     * @param application the applied transform; treated as an application of {@link Read.Bounded}
     * @param inputBundle not used by this factory
     * @param evaluationContext the context the evaluation takes place in
     */
    @Override
    @Nullable
    // The interface erases the transform's type parameters; the raw cast restores the shape that
    // getTransformEvaluator requires. NOTE(review): presumably the caller only routes
    // Read.Bounded applications to this factory - confirm against the dispatching code.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application, @Nullable InProcessPipelineRunner.CommittedBundle<?> inputBundle, InProcessEvaluationContext evaluationContext) throws IOException {
        return (TransformEvaluator<InputT>) this.getTransformEvaluator((AppliedPTransform) application, evaluationContext);
    }

    /**
     * Looks up (creating on first use) the evaluator queue for the given application and context,
     * and removes and returns its head.
     *
     * <p>The queue is populated with its evaluator <em>before</em> it is published into
     * {@link #sourceEvaluators}. Publishing an empty queue first would let a concurrent caller
     * observe it and poll {@code null} while the evaluator was still about to be added.
     *
     * @return the evaluator, or {@code null} if it has already been taken
     */
    @Nullable
    // Every queue stored under a key was created by this method for that same transform, so the
    // element type recovered by the casts below matches what was put in.
    @SuppressWarnings("unchecked")
    private <OutputT> BoundedReadEvaluator<OutputT> getTransformEvaluator(AppliedPTransform<?, PCollection<OutputT>, Read.Bounded<OutputT>> transform, InProcessEvaluationContext evaluationContext) {
        EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
        Queue<BoundedReadEvaluator<OutputT>> evaluatorQueue =
                (Queue<BoundedReadEvaluator<OutputT>>) this.sourceEvaluators.get(key);
        if (evaluatorQueue == null) {
            // Build a fully initialised queue locally; it is discarded if another thread wins.
            Queue<BoundedReadEvaluator<OutputT>> candidate = new ConcurrentLinkedQueue<>();
            BoundedSource<OutputT> source = transform.getTransform().getSource();
            candidate.offer(new BoundedReadEvaluator<>(transform, evaluationContext, source));

            Queue<BoundedReadEvaluator<OutputT>> existing =
                    (Queue<BoundedReadEvaluator<OutputT>>) this.sourceEvaluators.putIfAbsent(key, candidate);
            // putIfAbsent returns null when our queue was installed, otherwise the earlier one.
            evaluatorQueue = existing == null ? candidate : existing;
        }
        return evaluatorQueue.poll();
    }

    /**
     * Evaluator that reads a {@link BoundedSource} from start to end when the bundle is finished.
     */
    private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object> {
        private final AppliedPTransform<?, PCollection<OutputT>, Read.Bounded<OutputT>> transform;
        private final InProcessEvaluationContext evaluationContext;
        private final BoundedSource<OutputT> source;

        public BoundedReadEvaluator(AppliedPTransform<?, PCollection<OutputT>, Read.Bounded<OutputT>> transform, InProcessEvaluationContext evaluationContext, BoundedSource<OutputT> source) {
            this.transform = transform;
            this.evaluationContext = evaluationContext;
            this.source = source;
        }

        /** Does nothing; all reading happens in {@link #finishBundle()}. */
        @Override
        public void processElement(WindowedValue<Object> element) {
        }

        /**
         * Creates a reader over the source, copies every record into a new root bundle for the
         * transform's output (in the global window, stamped with the reader's current timestamp),
         * and returns a result carrying that bundle. The reader is closed on exit.
         *
         * @throws IOException if the reader fails to start, advance or close
         */
        @Override
        public InProcessTransformResult finishBundle() throws IOException {
            try (BoundedSource.BoundedReader<OutputT> reader = this.source.createReader(this.evaluationContext.getPipelineOptions())) {
                boolean contentsRemaining = reader.start();
                InProcessPipelineRunner.UncommittedBundle<OutputT> output = this.evaluationContext.createRootBundle(this.transform.getOutput());
                while (contentsRemaining) {
                    output.add(WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp()));
                    contentsRemaining = reader.advance();
                }
                // The result is built with a hold at BoundedWindow.TIMESTAMP_MAX_VALUE.
                return StepTransformResult.withHold(this.transform, BoundedWindow.TIMESTAMP_MAX_VALUE).addOutput(output, new InProcessPipelineRunner.UncommittedBundle[0]).build();
            }
        }
    }
}

