/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.IterableCoder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.runners.inprocess.ForwardingPTransform;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessTransformResult;
import com.google.cloud.dataflow.sdk.runners.inprocess.PTransformOverrideFactory;
import com.google.cloud.dataflow.sdk.runners.inprocess.StepTransformResult;
import com.google.cloud.dataflow.sdk.runners.inprocess.StructuralKey;
import com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator;
import com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluatorFactory;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowViaWindowSetDoFn;
import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
import com.google.cloud.dataflow.sdk.util.KeyedWorkItemCoder;
import com.google.cloud.dataflow.sdk.util.KeyedWorkItems;
import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.dataflow.sdk.values.POutput;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GroupByKeyEvaluatorFactory
implements TransformEvaluatorFactory {
    /** Creates a factory; the factory itself keeps no fields. */
    GroupByKeyEvaluatorFactory() {}

    /**
     * Creates the evaluator that groups the elements of {@code inputBundle} by key for the
     * given transform application.
     *
     * @param application the applied transform being evaluated; it is handed to
     *     {@code createEvaluator} as an application of {@link InProcessGroupByKeyOnly}
     * @param inputBundle the committed bundle whose elements will be processed
     * @param evaluationContext the context used by the evaluator to create output bundles
     * @return a new evaluator for the application
     */
    @Override
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application, InProcessPipelineRunner.CommittedBundle<?> inputBundle, InProcessEvaluationContext evaluationContext) {
        // The wildcard-typed arguments cannot be matched against createEvaluator's <K, V>
        // parameterization, so they are passed as raw types. The unchecked conversion is
        // confined to this single declaration.
        @SuppressWarnings({"unchecked", "rawtypes"})
        TransformEvaluator<InputT> evaluator = this.createEvaluator((AppliedPTransform)application, (InProcessPipelineRunner.CommittedBundle)inputBundle, evaluationContext);
        return evaluator;
    }

    /**
     * Wraps the given application, input bundle and evaluation context in a new
     * {@link GroupByKeyEvaluator}.
     */
    private <K, V> TransformEvaluator<KV<K, WindowedValue<V>>> createEvaluator(AppliedPTransform<PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>, InProcessGroupByKeyOnly<K, V>> application, InProcessPipelineRunner.CommittedBundle<KV<K, V>> inputBundle, InProcessEvaluationContext evaluationContext) {
        GroupByKeyEvaluator<K, V> groupingEvaluator = new GroupByKeyEvaluator<K, V>(evaluationContext, inputBundle, application);
        return groupingEvaluator;
    }

    /**
     * Primitive transform from keyed, windowed values to {@link KeyedWorkItem}s.
     *
     * <p>{@link #apply} performs no grouping itself; it only declares the output collection.
     * Within this file the elements are produced by {@code GroupByKeyEvaluator}.
     */
    public static final class InProcessGroupByKeyOnly<K, V>
    extends PTransform<PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>> {
        @VisibleForTesting
        InProcessGroupByKeyOnly() {
        }

        /**
         * Declares the primitive output collection, reusing the input's pipeline, windowing
         * strategy and boundedness.
         */
        @Override
        public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, WindowedValue<V>>> input) {
            PCollection<KeyedWorkItem<K, V>> keyedWorkItems = PCollection.createPrimitiveOutputInternal(input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
            return keyedWorkItems;
        }
    }

    /**
     * Replacement for a {@link GroupByKey} that expands into {@link InProcessGroupByKeyOnly}
     * followed by a "GroupAlsoByWindow" {@link ParDo}.
     *
     * <p>Everything other than {@link #apply} is forwarded to the wrapped {@link GroupByKey}
     * via {@link #delegate()}.
     */
    private static final class InProcessGroupByKey<K, V>
    extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
        private final GroupByKey<K, V> original;

        private InProcessGroupByKey(GroupByKey<K, V> from) {
            this.original = from;
        }

        /** Returns the wrapped {@link GroupByKey}. */
        @Override
        public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> delegate() {
            return this.original;
        }

        /**
         * Expands the group-by-key into lower-level steps.
         *
         * @param input the keyed input; its coder must be a {@link KvCoder}, otherwise the cast
         *     below fails with a {@link ClassCastException}
         * @return the grouped output, with the windowing strategy updated by the wrapped
         *     {@link GroupByKey} and a {@code KvCoder} of key coder and iterable-of-value coder
         */
        @Override
        public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
            KvCoder<K, V> inputCoder = (KvCoder<K, V>)input.getCoder();
            Coder<K> keyCoder = inputCoder.getKeyCoder();
            Coder<V> valueCoder = inputCoder.getValueCoder();
            WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
            DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow = this.groupAlsoByWindow(windowingStrategy, valueCoder);
            return input
                .apply(new GroupByKey.ReifyTimestampsAndWindows<K, V>())
                // Collect the values of each key into keyed work items.
                .apply(new InProcessGroupByKeyOnly<K, V>())
                .setCoder(KeyedWorkItemCoder.of(keyCoder, valueCoder, input.getWindowingStrategy().getWindowFn().windowCoder()))
                // Turn each keyed work item into KV<K, Iterable<V>> output.
                .apply("GroupAlsoByWindow", ParDo.of(groupAlsoByWindow))
                .setWindowingStrategyInternal(this.original.updateWindowingStrategy(windowingStrategy))
                .setCoder(KvCoder.of(keyCoder, IterableCoder.of(valueCoder)));
        }

        /**
         * Builds the {@link DoFn} for the "GroupAlsoByWindow" step from the windowing strategy
         * and a buffering {@link SystemReduceFn} over the value coder.
         */
        private <W extends BoundedWindow> DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow(WindowingStrategy<?, W> windowingStrategy, Coder<V> inputCoder) {
            return GroupAlsoByWindowViaWindowSetDoFn.create(windowingStrategy, SystemReduceFn.<K, V, W>buffering(inputCoder));
        }
    }

    /**
     * Override factory that swaps a {@link GroupByKey} for its in-process replacement and
     * leaves every other transform untouched.
     */
    public static final class InProcessGroupByKeyOverrideFactory
    implements PTransformOverrideFactory {
        /**
         * @param transform the transform being considered for replacement
         * @return an {@code InProcessGroupByKey} wrapping {@code transform} when it is a
         *     {@link GroupByKey}; otherwise {@code transform} itself
         */
        @Override
        public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(PTransform<InputT, OutputT> transform) {
            if (!(transform instanceof GroupByKey)) {
                return transform;
            }
            // The instanceof check above is the only evidence tying InputT/OutputT to the
            // GroupByKey signature, so the conversion is necessarily raw and unchecked.
            @SuppressWarnings({"unchecked", "rawtypes"})
            PTransform<InputT, OutputT> replacement = new InProcessGroupByKey((GroupByKey)transform);
            return replacement;
        }
    }

    /**
     * Evaluator that buffers the values of one input bundle per key and, when the bundle is
     * finished, emits one keyed output bundle per distinct key.
     *
     * <p>Keys are compared by their encoded bytes (see {@link GroupingKey}), not by
     * {@code equals} on the key objects.
     */
    private static class GroupByKeyEvaluator<K, V>
    implements TransformEvaluator<KV<K, WindowedValue<V>>> {
        private final InProcessEvaluationContext evaluationContext;
        private final InProcessPipelineRunner.CommittedBundle<KV<K, V>> inputBundle;
        private final AppliedPTransform<PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>, InProcessGroupByKeyOnly<K, V>> application;
        private final Coder<K> keyCoder;
        // Values seen so far in this bundle, bucketed by encoded key.
        private final Map<GroupingKey<K>, List<WindowedValue<V>>> groupingMap;

        /**
         * @param evaluationContext context used to create the keyed output bundles
         * @param inputBundle the bundle being processed; passed along when creating outputs
         * @param application the transform application; its input coder supplies the key coder
         * @throws IllegalStateException if the input coder is not a {@link KvCoder}
         */
        public GroupByKeyEvaluator(InProcessEvaluationContext evaluationContext, InProcessPipelineRunner.CommittedBundle<KV<K, V>> inputBundle, AppliedPTransform<PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>, InProcessGroupByKeyOnly<K, V>> application) {
            this.evaluationContext = evaluationContext;
            this.inputBundle = inputBundle;
            this.application = application;
            PCollection<KV<K, WindowedValue<V>>> input = application.getInput();
            this.keyCoder = this.getKeyCoder(input.getCoder());
            this.groupingMap = new HashMap<GroupingKey<K>, List<WindowedValue<V>>>();
        }

        /**
         * Extracts the key coder from the input coder.
         *
         * @throws IllegalStateException if {@code coder} is not a {@link KvCoder}
         */
        private Coder<K> getKeyCoder(Coder<KV<K, WindowedValue<V>>> coder) {
            if (!(coder instanceof KvCoder)) {
                throw new IllegalStateException(String.format("expected the input coder to be a %s but was %s", KvCoder.class.getSimpleName(), coder));
            }
            return ((KvCoder<K, WindowedValue<V>>)coder).getKeyCoder();
        }

        /**
         * Encodes the element's key and appends the element's value to that key's bucket.
         *
         * @throws IllegalArgumentException if the key cannot be encoded with the key coder
         */
        @Override
        public void processElement(WindowedValue<KV<K, WindowedValue<V>>> element) {
            byte[] encodedKey;
            KV<K, WindowedValue<V>> kv = element.getValue();
            K key = kv.getKey();
            try {
                encodedKey = CoderUtils.encodeToByteArray(this.keyCoder, key);
            }
            catch (CoderException exn) {
                throw new IllegalArgumentException(String.format("unable to encode key %s of input to %s using %s", key, this, this.keyCoder), exn);
            }
            GroupingKey<K> groupingKey = new GroupingKey<K>(key, encodedKey);
            List<WindowedValue<V>> values = this.groupingMap.get(groupingKey);
            if (values == null) {
                values = new ArrayList<WindowedValue<V>>();
                this.groupingMap.put(groupingKey, values);
            }
            values.add(kv.getValue());
        }

        /**
         * Emits, for every buffered key, a keyed bundle holding a single {@link KeyedWorkItem}
         * with that key's values, placed in the global window.
         *
         * @return a result without a hold that lists every created bundle as output
         */
        @Override
        public InProcessTransformResult finishBundle() {
            StepTransformResult.Builder resultBuilder = StepTransformResult.withoutHold(this.application);
            for (Map.Entry<GroupingKey<K>, List<WindowedValue<V>>> groupedEntry : this.groupingMap.entrySet()) {
                // Keep the key typed as K so it lines up with the Coder<K> passed to StructuralKey.
                K key = groupedEntry.getKey().key;
                KeyedWorkItem<K, V> groupedKv = KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue());
                InProcessPipelineRunner.UncommittedBundle<KeyedWorkItem<K, V>> bundle = this.evaluationContext.createKeyedBundle(this.inputBundle, StructuralKey.of(key, this.keyCoder), this.application.getOutput());
                bundle.add(WindowedValue.valueInGlobalWindow(groupedKv));
                resultBuilder.addOutput(bundle, new InProcessPipelineRunner.UncommittedBundle<?>[0]);
            }
            return resultBuilder.build();
        }

        /**
         * Map key pairing a user key with its encoded form. Equality and hashing use only the
         * encoded bytes; the decoded key is carried along so it can be emitted later.
         */
        private static class GroupingKey<K> {
            private final K key;
            private final byte[] encodedKey;

            public GroupingKey(K key, byte[] encodedKey) {
                this.key = key;
                this.encodedKey = encodedKey;
            }

            @Override
            public boolean equals(Object o) {
                if (o instanceof GroupingKey) {
                    GroupingKey<?> that = (GroupingKey<?>)o;
                    return Arrays.equals(this.encodedKey, that.encodedKey);
                }
                return false;
            }

            @Override
            public int hashCode() {
                return Arrays.hashCode(this.encodedKey);
            }
        }
    }
}

