/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessTransformResult;
import com.google.cloud.dataflow.sdk.runners.inprocess.StepTransformResult;
import com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.util.DoFnRunner;
import com.google.cloud.dataflow.sdk.util.DoFnRunners;
import com.google.cloud.dataflow.sdk.util.PushbackSideInputDoFnRunner;
import com.google.cloud.dataflow.sdk.util.ReadyCheckingSideInputReader;
import com.google.cloud.dataflow.sdk.util.SerializableUtils;
import com.google.cloud.dataflow.sdk.util.UserCodeException;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.state.CopyOnAccessInMemoryStateInternals;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ParDoInProcessEvaluator<T>
implements TransformEvaluator<T> {
    private final PushbackSideInputDoFnRunner<T, ?> fnRunner;
    private final AppliedPTransform<PCollection<T>, ?, ?> transform;
    private final CounterSet counters;
    private final Collection<InProcessPipelineRunner.UncommittedBundle<?>> outputBundles;
    private final InProcessExecutionContext.InProcessStepContext stepContext;
    private final ImmutableList.Builder<WindowedValue<T>> unprocessedElements;

    public static <InputT, OutputT> ParDoInProcessEvaluator<InputT> create(InProcessEvaluationContext evaluationContext, InProcessPipelineRunner.CommittedBundle<InputT> inputBundle, AppliedPTransform<PCollection<InputT>, ?, ?> application, DoFn<InputT, OutputT> fn, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> sideOutputTags, Map<TupleTag<?>, PCollection<?>> outputs) {
        InProcessExecutionContext executionContext = evaluationContext.getExecutionContext(application, inputBundle.getKey());
        String stepName = evaluationContext.getStepName(application);
        InProcessExecutionContext.InProcessStepContext stepContext = (InProcessExecutionContext.InProcessStepContext)executionContext.getOrCreateStepContext(stepName, stepName, null);
        CounterSet counters = evaluationContext.createCounterSet();
        HashMap outputBundles = new HashMap();
        for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) {
            outputBundles.put(outputEntry.getKey(), evaluationContext.createBundle(inputBundle, outputEntry.getValue()));
        }
        ReadyCheckingSideInputReader sideInputReader = evaluationContext.createSideInputReader(sideInputs);
        DoFnRunner<InputT, OutputT> underlying = DoFnRunners.createDefault(evaluationContext.getPipelineOptions(), SerializableUtils.clone(fn), sideInputReader, BundleOutputManager.create(outputBundles), mainOutputTag, sideOutputTags, stepContext, counters.getAddCounterMutator(), application.getInput().getWindowingStrategy());
        PushbackSideInputDoFnRunner<InputT, OutputT> runner = PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
        try {
            runner.startBundle();
        }
        catch (Exception e) {
            throw UserCodeException.wrap(e);
        }
        return new ParDoInProcessEvaluator<InputT>(runner, application, counters, outputBundles.values(), stepContext);
    }

    private ParDoInProcessEvaluator(PushbackSideInputDoFnRunner<T, ?> fnRunner, AppliedPTransform<PCollection<T>, ?, ?> transform, CounterSet counters, Collection<InProcessPipelineRunner.UncommittedBundle<?>> outputBundles, InProcessExecutionContext.InProcessStepContext stepContext) {
        this.fnRunner = fnRunner;
        this.transform = transform;
        this.counters = counters;
        this.outputBundles = outputBundles;
        this.stepContext = stepContext;
        this.unprocessedElements = ImmutableList.builder();
    }

    @Override
    public void processElement(WindowedValue<T> element) {
        try {
            Iterable<WindowedValue<T>> unprocessed = this.fnRunner.processElementInReadyWindows(element);
            this.unprocessedElements.addAll((Iterable)unprocessed);
        }
        catch (Exception e) {
            throw UserCodeException.wrap(e);
        }
    }

    @Override
    public InProcessTransformResult finishBundle() {
        try {
            this.fnRunner.finishBundle();
        }
        catch (Exception e) {
            throw UserCodeException.wrap(e);
        }
        CopyOnAccessInMemoryStateInternals<?> state = this.stepContext.commitState();
        StepTransformResult.Builder resultBuilder = state != null ? StepTransformResult.withHold(this.transform, state.getEarliestWatermarkHold()).withState(state) : StepTransformResult.withoutHold(this.transform);
        return resultBuilder.addOutput(this.outputBundles).withTimerUpdate(this.stepContext.getTimerUpdate()).withCounters(this.counters).addUnprocessedElements(this.unprocessedElements.build()).build();
    }

    static class BundleOutputManager
    implements DoFnRunners.OutputManager {
        private final Map<TupleTag<?>, InProcessPipelineRunner.UncommittedBundle<?>> bundles;
        private final Map<TupleTag<?>, List<?>> undeclaredOutputs;

        public static BundleOutputManager create(Map<TupleTag<?>, InProcessPipelineRunner.UncommittedBundle<?>> outputBundles) {
            return new BundleOutputManager(outputBundles);
        }

        private BundleOutputManager(Map<TupleTag<?>, InProcessPipelineRunner.UncommittedBundle<?>> bundles) {
            this.bundles = bundles;
            this.undeclaredOutputs = new HashMap();
        }

        @Override
        public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
            InProcessPipelineRunner.UncommittedBundle<?> bundle = this.bundles.get(tag);
            if (bundle == null) {
                List<?> undeclaredContents = this.undeclaredOutputs.get(tag);
                if (undeclaredContents == null) {
                    undeclaredContents = new ArrayList();
                    this.undeclaredOutputs.put(tag, undeclaredContents);
                }
                undeclaredContents.add(output);
            } else {
                bundle.add(output);
            }
        }
    }
}

