/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.io.Write;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.runners.inprocess.PTransformOverrideFactory;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Flatten;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Values;
import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.dataflow.sdk.values.POutput;
import java.util.concurrent.ThreadLocalRandom;
import org.joda.time.Duration;

/**
 * A {@link PTransformOverrideFactory} that replaces a {@link Write.Bound} reporting
 * {@code getNumShards() == 0} with a transform that first regroups the input into a number of
 * shards derived from the input's element count, and then applies the original write to the
 * regrouped collection. Every other transform is returned unchanged.
 */
class WriteWithShardingFactory
implements PTransformOverrideFactory {
    /** Exclusive upper bound for the random number of extra shards picked per applied write. */
    static final int MAX_RANDOM_EXTRA_SHARDS = 3;

    WriteWithShardingFactory() {
    }

    /**
     * Returns a resharding replacement for {@code transform} when it is a {@link Write.Bound}
     * whose {@code getNumShards()} is {@code 0}; otherwise returns {@code transform} itself.
     *
     * <p>NOTE(review): a shard count of 0 presumably means "no explicit sharding requested" —
     * confirm against {@code Write.Bound}.
     *
     * @param transform the transform being considered for replacement
     * @return the replacement transform, or {@code transform} when no override applies
     */
    @Override
    @SuppressWarnings("unchecked") // The instanceof check guards the casts; type arguments are erased at runtime.
    public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(PTransform<InputT, OutputT> transform) {
        if (transform instanceof Write.Bound) {
            Write.Bound<InputT> that = (Write.Bound<InputT>) transform;
            if (that.getNumShards() == 0) {
                return (PTransform<InputT, OutputT>) new DynamicallyReshardedWrite<InputT>(that);
            }
        }
        return transform;
    }

    /**
     * A {@link DoFn} that pairs each input element with an integer shard key. The number of
     * distinct keys is computed once per instance, on the first processed element, from the
     * total record count supplied as a side input; keys are then handed out round-robin.
     */
    @VisibleForTesting
    static class KeyBasedOnCountFn<T>
    extends DoFn<T, KV<Integer, T>> {
        /** Minimum base shard count once the input is large enough for the logarithmic rule. */
        @VisibleForTesting
        static final int MIN_SHARDS_FOR_LOG = 3;
        /** Side input holding the total number of records in the input. */
        private final PCollectionView<Long> numRecords;
        /** Extra shards added on top of the computed base shard count. */
        private final int randomExtraShards;
        /** Key that will be assigned to the next processed element. */
        private int currentShard;
        /** Number of distinct keys; stays 0 until the first element has been processed. */
        private int maxShards;

        /**
         * @param numRecords side-input view of the total record count
         * @param extraShards number of shards to add to the computed base shard count
         */
        KeyBasedOnCountFn(PCollectionView<Long> numRecords, int extraShards) {
            this.numRecords = numRecords;
            this.randomExtraShards = extraShards;
        }

        /**
         * Outputs the current element keyed by the current shard index and advances the index,
         * wrapping around at {@code maxShards}.
         */
        @Override
        public void processElement(ProcessContext c) throws Exception {
            if (this.maxShards == 0) {
                // Lazily initialised: the side input is only readable while processing an element.
                this.maxShards = this.calculateShards(c.sideInput(this.numRecords));
                // Start at a random offset within [0, maxShards).
                this.currentShard = ThreadLocalRandom.current().nextInt(this.maxShards);
            }
            int shard = this.currentShard;
            this.currentShard = (this.currentShard + 1) % this.maxShards;
            c.output(KV.of(shard, c.element()));
        }

        /**
         * Computes the number of shard keys for an input of {@code totalRecords} elements.
         *
         * <p>Inputs smaller than {@code MIN_SHARDS_FOR_LOG + randomExtraShards} get one key per
         * record. Larger inputs get {@code max(floor(log10(totalRecords)), MIN_SHARDS_FOR_LOG)}
         * plus {@code randomExtraShards} keys.
         *
         * @param totalRecords total number of records; must be positive
         * @return the number of shard keys to use
         * @throws IllegalArgumentException if {@code totalRecords} is not positive
         */
        private int calculateShards(long totalRecords) {
            Preconditions.checkArgument(totalRecords > 0L, "KeyBasedOnCountFn cannot be invoked on an element if there are no elements");
            if (totalRecords < (long) (MIN_SHARDS_FOR_LOG + this.randomExtraShards)) {
                // The narrowing cast is safe: the value is below a small int bound.
                return (int) totalRecords;
            }
            // The int cast truncates toward zero, which equals floor for the non-negative logarithm here.
            int floorLogRecs = (int) Math.log10(totalRecords);
            return Math.max(floorLogRecs, MIN_SHARDS_FOR_LOG) + this.randomExtraShards;
        }
    }

    /**
     * Replacement for a {@link Write.Bound}: re-windows the input into the global window, keys
     * each element with {@link KeyBasedOnCountFn}, groups by that key, flattens the groups back
     * into a single collection and applies the original write to the result.
     */
    private static class DynamicallyReshardedWrite<T>
    extends PTransform<PCollection<T>, PDone> {
        /** The write being replaced; declared transient, so it is not Java-serialized with this transform. */
        private final transient Write.Bound<T> original;

        private DynamicallyReshardedWrite(Write.Bound<T> original) {
            this.original = original;
        }

        /**
         * Expands this transform over {@code input}.
         *
         * @param input the collection to be written
         * @return the {@link PDone} produced by the original write
         */
        @Override
        public PDone apply(PCollection<T> input) {
            PCollection<T> records = input.apply("RewindowInputs", Window.<T>into(new GlobalWindows()).triggering(DefaultTrigger.of()).withAllowedLateness(Duration.ZERO).discardingFiredPanes());
            PCollectionView<Long> numRecords = records.apply(Count.<T>globally().asSingletonView());
            // Chosen once per application of this transform, in [0, MAX_RANDOM_EXTRA_SHARDS).
            int extraShards = ThreadLocalRandom.current().nextInt(MAX_RANDOM_EXTRA_SHARDS);
            PCollection<T> resharded = records
                .apply("ApplySharding", ParDo.withSideInputs(numRecords).of(new KeyBasedOnCountFn<T>(numRecords, extraShards)))
                .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
                .apply("DropShardingKeys", Values.<Iterable<T>>create())
                .apply("FlattenShardIterables", Flatten.<T>iterables());
            // NOTE(review): the original write is invoked directly instead of via
            // resharded.apply(original) — presumably so the runner does not try to override the
            // same Write again; confirm against the runner's override handling.
            return this.original.apply(resharded);
        }
    }
}

