/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.util.ReduceFn;
import com.google.cloud.dataflow.sdk.util.TimerInternals;
import com.google.cloud.dataflow.sdk.util.WindowTracing;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor;
import com.google.cloud.dataflow.sdk.util.state.ReadableState;
import com.google.cloud.dataflow.sdk.util.state.StateMerging;
import com.google.cloud.dataflow.sdk.util.state.StateTag;
import com.google.cloud.dataflow.sdk.util.state.StateTags;
import com.google.cloud.dataflow.sdk.util.state.WatermarkHoldState;
import java.io.Serializable;
import javax.annotation.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

class WatermarkHold<W extends BoundedWindow>
implements Serializable {
    @VisibleForTesting
    public static final StateTag<Object, WatermarkHoldState<BoundedWindow>> EXTRA_HOLD_TAG = StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal("extra", OutputTimeFns.outputAtEarliestInputTimestamp()));
    private final TimerInternals timerInternals;
    private final WindowingStrategy<?, W> windowingStrategy;
    private final StateTag<Object, WatermarkHoldState<W>> elementHoldTag;

    public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState<W>> watermarkHoldTagForOutputTimeFn(OutputTimeFn<? super W> outputTimeFn) {
        return StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal("hold", outputTimeFn));
    }

    public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) {
        this.timerInternals = timerInternals;
        this.windowingStrategy = windowingStrategy;
        this.elementHoldTag = WatermarkHold.watermarkHoldTagForOutputTimeFn(windowingStrategy.getOutputTimeFn());
    }

    @Nullable
    public Instant addHolds(ReduceFn.ProcessValueContext context) {
        Instant hold = this.addElementHold(context);
        if (hold == null) {
            hold = this.addEndOfWindowOrGarbageCollectionHolds(context, false);
        }
        return hold;
    }

    private Instant shift(Instant timestamp, W window) {
        Instant shifted = this.windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window);
        Preconditions.checkState(!shifted.isBefore((ReadableInstant)timestamp), "OutputTimeFn moved element from %s to earlier time %s for window %s", timestamp, shifted, window);
        Preconditions.checkState(timestamp.isAfter((ReadableInstant)((BoundedWindow)window).maxTimestamp()) || !shifted.isAfter((ReadableInstant)((BoundedWindow)window).maxTimestamp()), "OutputTimeFn moved element from %s to %s which is beyond end of window %s", timestamp, shifted, window);
        return shifted;
    }

    @Nullable
    private Instant addElementHold(ReduceFn.ProcessValueContext context) {
        boolean tooLate;
        String which;
        Instant elementHold = this.shift(context.timestamp(), context.window());
        Instant outputWM = this.timerInternals.currentOutputWatermarkTime();
        Instant inputWM = this.timerInternals.currentInputWatermarkTime();
        if (outputWM != null && elementHold.isBefore((ReadableInstant)outputWM)) {
            which = "too late to effect output watermark";
            tooLate = true;
        } else if (((BoundedWindow)context.window()).maxTimestamp().isBefore((ReadableInstant)inputWM)) {
            which = "too late for end-of-window timer";
            tooLate = true;
        } else {
            which = "on time";
            tooLate = false;
            Preconditions.checkState(!elementHold.isAfter((ReadableInstant)BoundedWindow.TIMESTAMP_MAX_VALUE), "Element hold %s is beyond end-of-time", elementHold);
            context.state().access(this.elementHoldTag).add(elementHold);
        }
        WindowTracing.trace("WatermarkHold.addHolds: element hold at {} is {} for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", elementHold, which, context.key(), context.window(), inputWM, outputWM);
        return tooLate ? null : elementHold;
    }

    @Nullable
    private Instant addEndOfWindowOrGarbageCollectionHolds(ReduceFn.Context context, boolean paneIsEmpty) {
        Instant hold = this.addEndOfWindowHold(context, paneIsEmpty);
        if (hold == null) {
            hold = this.addGarbageCollectionHold(context, paneIsEmpty);
        }
        return hold;
    }

    @Nullable
    private Instant addEndOfWindowHold(ReduceFn.Context context, boolean paneIsEmpty) {
        Instant outputWM = this.timerInternals.currentOutputWatermarkTime();
        Instant inputWM = this.timerInternals.currentInputWatermarkTime();
        Instant eowHold = ((BoundedWindow)context.window()).maxTimestamp();
        if (eowHold.isBefore((ReadableInstant)inputWM)) {
            WindowTracing.trace("WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is too late for end-of-window timer for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", eowHold, context.key(), context.window(), inputWM, outputWM);
            return null;
        }
        Preconditions.checkState(outputWM == null || !eowHold.isBefore((ReadableInstant)outputWM), "End-of-window hold %s cannot be before output watermark %s", eowHold, outputWM);
        Preconditions.checkState(!eowHold.isAfter((ReadableInstant)BoundedWindow.TIMESTAMP_MAX_VALUE), "End-of-window hold %s is beyond end-of-time", eowHold);
        context.state().access(EXTRA_HOLD_TAG).add(eowHold);
        WindowTracing.trace("WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is on time for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", eowHold, context.key(), context.window(), inputWM, outputWM);
        return eowHold;
    }

    @Nullable
    private Instant addGarbageCollectionHold(ReduceFn.Context context, boolean paneIsEmpty) {
        Instant outputWM = this.timerInternals.currentOutputWatermarkTime();
        Instant inputWM = this.timerInternals.currentInputWatermarkTime();
        Instant eow = ((BoundedWindow)context.window()).maxTimestamp();
        Instant gcHold = eow.plus((ReadableDuration)this.windowingStrategy.getAllowedLateness());
        if (!this.windowingStrategy.getAllowedLateness().isLongerThan((ReadableDuration)Duration.ZERO)) {
            WindowTracing.trace("WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary since no allowed lateness for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", gcHold, context.key(), context.window(), inputWM, outputWM);
            return null;
        }
        if (paneIsEmpty && context.windowingStrategy().getClosingBehavior() == Window.ClosingBehavior.FIRE_IF_NON_EMPTY) {
            WindowTracing.trace("WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary since empty pane and FIRE_IF_NON_EMPTY for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", gcHold, context.key(), context.window(), inputWM, outputWM);
            return null;
        }
        Preconditions.checkState(!gcHold.isBefore((ReadableInstant)inputWM), "Garbage collection hold %s cannot be before input watermark %s", gcHold, inputWM);
        Preconditions.checkState(!gcHold.isAfter((ReadableInstant)BoundedWindow.TIMESTAMP_MAX_VALUE), "Garbage collection hold %s is beyond end-of-time", gcHold);
        context.state().access(EXTRA_HOLD_TAG).add(gcHold);
        WindowTracing.trace("WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is on time for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", gcHold, context.key(), context.window(), inputWM, outputWM);
        return gcHold;
    }

    public void prefetchOnMerge(MergingStateAccessor<?, W> state) {
        StateMerging.prefetchWatermarks(state, this.elementHoldTag);
    }

    public void onMerge(ReduceFn.OnMergeContext context) {
        WindowTracing.debug("WatermarkHold.onMerge: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", context.key(), context.window(), this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime());
        StateMerging.mergeWatermarks(context.state(), this.elementHoldTag, context.window());
        StateMerging.clear(context.state(), EXTRA_HOLD_TAG);
        this.addEndOfWindowOrGarbageCollectionHolds(context, false);
    }

    public ReadableState<OldAndNewHolds> extractAndRelease(final ReduceFn.Context context, final boolean isFinished) {
        WindowTracing.debug("WatermarkHold.extractAndRelease: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", context.key(), context.window(), this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime());
        final WatermarkHoldState<W> elementHoldState = context.state().access(this.elementHoldTag);
        final WatermarkHoldState<BoundedWindow> extraHoldState = context.state().access(EXTRA_HOLD_TAG);
        return new ReadableState<OldAndNewHolds>(){

            @Override
            public ReadableState<OldAndNewHolds> readLater() {
                elementHoldState.readLater();
                extraHoldState.readLater();
                return this;
            }

            @Override
            public OldAndNewHolds read() {
                Instant elementHold = (Instant)elementHoldState.read();
                Instant extraHold = (Instant)extraHoldState.read();
                Instant oldHold = elementHold == null ? extraHold : (extraHold == null ? elementHold : (elementHold.isBefore((ReadableInstant)extraHold) ? elementHold : extraHold));
                if (oldHold == null || oldHold.isAfter((ReadableInstant)((BoundedWindow)context.window()).maxTimestamp())) {
                    WindowTracing.debug("WatermarkHold.extractAndRelease.read: clipping from {} to end of window for key:{}; window:{}", oldHold, context.key(), context.window());
                    oldHold = ((BoundedWindow)context.window()).maxTimestamp();
                }
                WindowTracing.debug("WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}", context.key(), context.window());
                elementHoldState.clear();
                extraHoldState.clear();
                Instant newHold = null;
                if (!isFinished) {
                    newHold = WatermarkHold.this.addEndOfWindowOrGarbageCollectionHolds(context, true);
                }
                return new OldAndNewHolds(oldHold, newHold);
            }
        };
    }

    public void clearHolds(ReduceFn.Context context) {
        WindowTracing.debug("WatermarkHold.clearHolds: For key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", context.key(), context.window(), this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime());
        context.state().access(this.elementHoldTag).clear();
        context.state().access(EXTRA_HOLD_TAG).clear();
    }

    @Nullable
    public Instant getDataCurrent(ReduceFn.Context context) {
        return (Instant)context.state().access(this.elementHoldTag).read();
    }

    public static class OldAndNewHolds {
        public final Instant oldHold;
        @Nullable
        public final Instant newHold;

        public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) {
            this.oldHold = oldHold;
            this.newHold = newHold;
        }
    }
}

