/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.util.state;

import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.state.AccumulatorCombiningState;
import com.google.cloud.dataflow.sdk.util.state.BagState;
import com.google.cloud.dataflow.sdk.util.state.CombiningState;
import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor;
import com.google.cloud.dataflow.sdk.util.state.ReadableState;
import com.google.cloud.dataflow.sdk.util.state.State;
import com.google.cloud.dataflow.sdk.util.state.StateTag;
import com.google.cloud.dataflow.sdk.util.state.WatermarkHoldState;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

/**
 * Static helpers for prefetching, merging and clearing the state stored under a single
 * {@link StateTag} across the windows exposed by a {@link MergingStateAccessor}.
 *
 * <p>Each {@code merge*} operation has two overloads: one that looks the source and result
 * states up through a {@link MergingStateAccessor}, and one that works directly on an explicit
 * collection of source states plus a result state.
 */
public class StateMerging {
    /**
     * Clears the state stored at {@code address} in every window returned by
     * {@code context.accessInEachMergingWindow(address)}.
     *
     * @param context accessor used to look up the per-window states
     * @param address tag identifying the state to clear
     */
    public static <K, StateT extends State, W extends BoundedWindow> void clear(MergingStateAccessor<K, W> context, StateTag<? super K, StateT> address) {
        for (StateT state : context.accessInEachMergingWindow(address).values()) {
            state.clear();
        }
    }

    /**
     * Calls {@code readLater()} on every source bag at {@code address} that is not equal to the
     * result bag. Does nothing (and does not look up the result bag) when there are no source
     * windows.
     *
     * @param context accessor used to look up the source bags and the result bag
     * @param address tag identifying the bag state
     */
    public static <K, T, W extends BoundedWindow> void prefetchBags(MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> address) {
        Map<W, BagState<T>> sourcesByWindow = context.accessInEachMergingWindow(address);
        if (sourcesByWindow.isEmpty()) {
            return;
        }
        BagState<T> result = context.access(address);
        for (BagState<T> source : sourcesByWindow.values()) {
            if (source.equals(result)) {
                // The result bag is never read by mergeBags, so there is nothing to prefetch.
                continue;
            }
            source.readLater();
        }
    }

    /**
     * Merges the bags stored at {@code address} in each merging window into the bag returned by
     * {@code context.access(address)}.
     *
     * @param context accessor used to look up the source bags and the result bag
     * @param address tag identifying the bag state
     * @see #mergeBags(Collection, BagState)
     */
    public static <K, T, W extends BoundedWindow> void mergeBags(MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> address) {
        StateMerging.mergeBags(context.accessInEachMergingWindow(address).values(), context.access(address));
    }

    /**
     * Appends the contents of every source bag to {@code result} and then clears those sources.
     *
     * <p>A source that is equal to {@code result} is skipped entirely: it is neither read nor
     * cleared. If no other sources remain, {@code result} is left untouched.
     *
     * @param sources bags to drain; may contain {@code result} itself
     * @param result bag that receives the elements of the other sources
     */
    public static <T, W extends BoundedWindow> void mergeBags(Collection<BagState<T>> sources, BagState<T> result) {
        if (sources.isEmpty()) {
            return;
        }
        // Request all reads up front, before the first read() below.
        ArrayList<BagState<T>> toDrain = new ArrayList<BagState<T>>(sources.size());
        for (BagState<T> source : sources) {
            if (source.equals(result)) {
                continue;
            }
            source.readLater();
            toDrain.add(source);
        }
        if (toDrain.isEmpty()) {
            return;
        }
        for (BagState<T> source : toDrain) {
            for (T element : source.read()) {
                result.add(element);
            }
        }
        // Only clear once every source has been copied into the result.
        for (BagState<T> source : toDrain) {
            source.clear();
        }
    }

    /**
     * Calls {@code readLater()} on the combining state stored at {@code address} in every merging
     * window.
     *
     * @param context accessor used to look up the per-window states
     * @param address tag identifying the combining state
     */
    public static <K, StateT extends CombiningState<?, ?>, W extends BoundedWindow> void prefetchCombiningValues(MergingStateAccessor<K, W> context, StateTag<? super K, StateT> address) {
        for (StateT state : context.accessInEachMergingWindow(address).values()) {
            state.readLater();
        }
    }

    /**
     * Merges the combining states stored at {@code address} in each merging window into the state
     * returned by {@code context.access(address)}.
     *
     * @param context accessor used to look up the source states and the result state
     * @param address tag identifying the combining state
     * @see #mergeCombiningValues(Collection, AccumulatorCombiningState)
     */
    public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues(MergingStateAccessor<K, W> context, StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address) {
        StateMerging.mergeCombiningValues(context.accessInEachMergingWindow(address).values(), context.access(address));
    }

    /**
     * Merges the accumulators of all {@code sources} into {@code result}.
     *
     * <p>Does nothing when {@code sources} is empty or when its only element is {@code result}.
     * Otherwise the accumulator of every source (including {@code result}, if it is among the
     * sources) is read and combined with {@code result.mergeAccumulators}, every source is
     * cleared, and the merged accumulator is added to {@code result}.
     *
     * @param sources states whose accumulators are merged and which are cleared afterwards
     * @param result state that receives the merged accumulator
     */
    public static <InputT, AccumT, OutputT, W extends BoundedWindow> void mergeCombiningValues(Collection<AccumulatorCombiningState<InputT, AccumT, OutputT>> sources, AccumulatorCombiningState<InputT, AccumT, OutputT> result) {
        if (sources.isEmpty()) {
            return;
        }
        if (sources.size() == 1 && sources.contains(result)) {
            return;
        }
        // Request all reads up front, before the first getAccum() below.
        for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
            source.readLater();
        }
        ArrayList<AccumT> accumulators = new ArrayList<AccumT>(sources.size());
        for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
            accumulators.add(source.getAccum());
        }
        AccumT merged = result.mergeAccumulators(accumulators);
        // Clear before adding: if result is one of the sources, its previous accumulator is
        // already part of 'merged' and must not be counted a second time.
        for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
            source.clear();
        }
        result.addAccum(merged);
    }

    /**
     * Calls {@code readLater()} on the watermark hold stored at {@code address} in every merging
     * window, unless the merge will not need to read them.
     *
     * <p>No prefetch is issued when there are no source windows, when the only source is the
     * result itself and the output time function depends only on the earliest input timestamp,
     * or when the output time function depends only on the window. These are the cases in which
     * {@link #mergeWatermarks(Collection, WatermarkHoldState, BoundedWindow)} does not read the
     * sources.
     *
     * @param context accessor used to look up the source holds and the result hold
     * @param address tag identifying the watermark hold state
     */
    public static <K, W extends BoundedWindow> void prefetchWatermarks(MergingStateAccessor<K, W> context, StateTag<? super K, WatermarkHoldState<W>> address) {
        Map<W, WatermarkHoldState<W>> sourcesByWindow = context.accessInEachMergingWindow(address);
        WatermarkHoldState<W> result = context.access(address);
        if (sourcesByWindow.isEmpty()) {
            return;
        }
        if (sourcesByWindow.size() == 1 && sourcesByWindow.values().contains(result) && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) {
            return;
        }
        if (result.getOutputTimeFn().dependsOnlyOnWindow()) {
            return;
        }
        for (WatermarkHoldState<W> source : sourcesByWindow.values()) {
            source.readLater();
        }
    }

    /**
     * Merges the watermark holds stored at {@code address} in each merging window into the hold
     * returned by {@code context.access(address)}.
     *
     * @param context accessor used to look up the source holds and the result hold
     * @param address tag identifying the watermark hold state
     * @param mergeResult the window the holds are being merged into
     * @see #mergeWatermarks(Collection, WatermarkHoldState, BoundedWindow)
     */
    public static <K, W extends BoundedWindow> void mergeWatermarks(MergingStateAccessor<K, W> context, StateTag<? super K, WatermarkHoldState<W>> address, W mergeResult) {
        StateMerging.mergeWatermarks(context.accessInEachMergingWindow(address).values(), context.access(address), mergeResult);
    }

    /**
     * Merges the watermark holds in {@code sources} into {@code result}.
     *
     * <ul>
     *   <li>Does nothing when {@code sources} is empty, or when its only element is
     *       {@code result} and the output time function depends only on the earliest input
     *       timestamp.
     *   <li>When the output time function depends only on the window, all sources are cleared
     *       without being read and a hold computed from {@code resultWindow} is added to
     *       {@code result}.
     *   <li>Otherwise every source is read, all sources are cleared, and, if at least one source
     *       held a non-null value, the output time function's merge of those values is added to
     *       {@code result}.
     * </ul>
     *
     * @param sources holds to merge; all of them are cleared unless the method returns early
     * @param result hold that receives the merged value
     * @param resultWindow the window the holds are being merged into
     * @throws IllegalStateException presumably, via {@code Preconditions.checkState}, if the
     *     window-derived hold is not after {@code BoundedWindow.TIMESTAMP_MIN_VALUE}
     */
    public static <W extends BoundedWindow> void mergeWatermarks(Collection<WatermarkHoldState<W>> sources, WatermarkHoldState<W> result, W resultWindow) {
        if (sources.isEmpty()) {
            return;
        }
        if (sources.size() == 1 && sources.contains(result) && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) {
            return;
        }
        if (result.getOutputTimeFn().dependsOnlyOnWindow()) {
            // The hold is fully determined by the result window, so the sources need not be read.
            for (WatermarkHoldState<W> source : sources) {
                source.clear();
            }
            Instant hold = result.getOutputTimeFn().assignOutputTime(BoundedWindow.TIMESTAMP_MIN_VALUE, resultWindow);
            Preconditions.checkState(hold.isAfter(BoundedWindow.TIMESTAMP_MIN_VALUE));
            result.add(hold);
        } else {
            ArrayList<Instant> outputTimesToMerge = new ArrayList<Instant>(sources.size());
            for (WatermarkHoldState<W> source : sources) {
                Instant sourceOutputTime = source.read();
                if (sourceOutputTime == null) {
                    // A null read contributes nothing to the merge.
                    continue;
                }
                outputTimesToMerge.add(sourceOutputTime);
            }
            // Clear only after every source has been read; result may be one of the sources.
            for (WatermarkHoldState<W> source : sources) {
                source.clear();
            }
            if (!outputTimesToMerge.isEmpty()) {
                result.add(result.getOutputTimeFn().merge(resultWindow, outputTimesToMerge));
            }
        }
    }
}

