/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableMap;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.util.ActiveWindowSet;
import com.google.cloud.dataflow.sdk.util.MergingActiveWindowSet;
import com.google.cloud.dataflow.sdk.util.NonEmptyPanes;
import com.google.cloud.dataflow.sdk.util.NonMergingActiveWindowSet;
import com.google.cloud.dataflow.sdk.util.PaneInfoTracker;
import com.google.cloud.dataflow.sdk.util.ReduceFn;
import com.google.cloud.dataflow.sdk.util.ReduceFnContextFactory;
import com.google.cloud.dataflow.sdk.util.TimeDomain;
import com.google.cloud.dataflow.sdk.util.TimerInternals;
import com.google.cloud.dataflow.sdk.util.TriggerContextFactory;
import com.google.cloud.dataflow.sdk.util.TriggerRunner;
import com.google.cloud.dataflow.sdk.util.WatermarkHold;
import com.google.cloud.dataflow.sdk.util.WindowTracing;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingInternals;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.util.state.ReadableState;
import com.google.cloud.dataflow.sdk.util.state.StateInternals;
import com.google.cloud.dataflow.sdk.util.state.StateNamespaces;
import com.google.cloud.dataflow.sdk.values.KV;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
    private final WindowingStrategy<Object, W> windowingStrategy;
    private final OutputWindowedValue<KV<K, OutputT>> outputter;
    private final StateInternals<K> stateInternals;
    private final Aggregator<Long, Long> droppedDueToClosedWindow;
    private final K key;
    private final ActiveWindowSet<W> activeWindows;
    private final ReduceFn<K, InputT, OutputT, W> reduceFn;
    private final TimerInternals timerInternals;
    private final TriggerRunner<W> triggerRunner;
    private final WatermarkHold<W> watermarkHold;
    private final ReduceFnContextFactory<K, InputT, OutputT, W> contextFactory;
    private final PaneInfoTracker paneInfoTracker;
    private final NonEmptyPanes<K, W> nonEmptyPanes;

    public ReduceFnRunner(K key, WindowingStrategy<?, W> windowingStrategy, StateInternals<K> stateInternals, TimerInternals timerInternals, WindowingInternals<?, KV<K, OutputT>> windowingInternals, Aggregator<Long, Long> droppedDueToClosedWindow, ReduceFn<K, InputT, OutputT, W> reduceFn, PipelineOptions options) {
        this.key = key;
        this.timerInternals = timerInternals;
        this.paneInfoTracker = new PaneInfoTracker(timerInternals);
        this.stateInternals = stateInternals;
        this.outputter = new OutputViaWindowingInternals<KV<K, KV<K, OutputT>>>(windowingInternals);
        this.droppedDueToClosedWindow = droppedDueToClosedWindow;
        this.reduceFn = reduceFn;
        WindowingStrategy<?, W> objectWindowingStrategy = windowingStrategy;
        this.windowingStrategy = objectWindowingStrategy;
        this.nonEmptyPanes = NonEmptyPanes.create(this.windowingStrategy, this.reduceFn);
        this.activeWindows = this.createActiveWindowSet();
        this.contextFactory = new ReduceFnContextFactory<K, InputT, OutputT, W>(key, reduceFn, this.windowingStrategy, stateInternals, this.activeWindows, timerInternals, windowingInternals, options);
        this.watermarkHold = new WatermarkHold<W>(timerInternals, windowingStrategy);
        this.triggerRunner = new TriggerRunner<W>(windowingStrategy.getTrigger(), new TriggerContextFactory<W>(windowingStrategy, stateInternals, this.activeWindows));
    }

    private ActiveWindowSet<W> createActiveWindowSet() {
        return this.windowingStrategy.getWindowFn().isNonMerging() ? new NonMergingActiveWindowSet() : new MergingActiveWindowSet<W>(this.windowingStrategy.getWindowFn(), this.stateInternals);
    }

    @VisibleForTesting
    boolean isFinished(W window) {
        return this.triggerRunner.isClosed(this.contextFactory.base(window, ReduceFnContextFactory.StateStyle.DIRECT).state());
    }

    @VisibleForTesting
    boolean hasNoActiveWindows() {
        return this.activeWindows.getActiveAndNewWindows().isEmpty();
    }

    public void processElements(Iterable<WindowedValue<InputT>> values) throws Exception {
        Map<W, W> windowToMergeResult = this.collectAndMergeWindows(values);
        HashSet<W> windowsToConsider = new HashSet<W>();
        for (WindowedValue<InputT> value : values) {
            windowsToConsider.addAll(this.processElement(windowToMergeResult, value));
        }
        for (BoundedWindow mergedWindow : windowsToConsider) {
            ReduceFn.Context directContext = this.contextFactory.base(mergedWindow, ReduceFnContextFactory.StateStyle.DIRECT);
            ReduceFn.Context renamedContext = this.contextFactory.base(mergedWindow, ReduceFnContextFactory.StateStyle.RENAMED);
            this.triggerRunner.prefetchShouldFire(mergedWindow, directContext.state());
            this.emitIfAppropriate(directContext, renamedContext);
        }
        this.activeWindows.cleanupTemporaryWindows();
    }

    public void persist() {
        this.activeWindows.persist();
    }

    private Map<W, W> collectAndMergeWindows(Iterable<WindowedValue<InputT>> values) throws Exception {
        if (this.windowingStrategy.getWindowFn().isNonMerging()) {
            return ImmutableMap.of();
        }
        for (WindowedValue<InputT> value : values) {
            for (BoundedWindow untypedWindow : value.getWindows()) {
                Set<BoundedWindow> stateAddressWindows;
                BoundedWindow window = untypedWindow;
                if (this.activeWindows.isActive(window) && (stateAddressWindows = this.activeWindows.readStateAddresses(window)).size() > 1) {
                    ReduceFn.OnMergeContext premergeContext = this.contextFactory.forPremerge(window);
                    this.reduceFn.onMerge(premergeContext);
                    this.watermarkHold.onMerge(premergeContext);
                    this.activeWindows.merged(window);
                }
                this.activeWindows.ensureWindowExists(window);
            }
        }
        HashMap windowToMergeResult = new HashMap();
        this.activeWindows.merge(new OnMergeCallback(windowToMergeResult));
        return windowToMergeResult;
    }

    private Collection<W> processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT> value) throws Exception {
        ArrayList<BoundedWindow> windows = new ArrayList<BoundedWindow>();
        for (BoundedWindow untypedWindow : value.getWindows()) {
            BoundedWindow window = untypedWindow;
            BoundedWindow mergeResult = (BoundedWindow)windowToMergeResult.get(window);
            if (mergeResult == null) {
                mergeResult = window;
            }
            windows.add(mergeResult);
        }
        for (BoundedWindow window : windows) {
            ReduceFn.ProcessValueContext directContext = this.contextFactory.forValue(window, value.getValue(), value.getTimestamp(), ReduceFnContextFactory.StateStyle.DIRECT);
            this.triggerRunner.prefetchForValue(window, directContext.state());
        }
        ArrayList<BoundedWindow> triggerableWindows = new ArrayList<BoundedWindow>(windows.size());
        for (BoundedWindow window : windows) {
            ReduceFn.ProcessValueContext directContext = this.contextFactory.forValue(window, value.getValue(), value.getTimestamp(), ReduceFnContextFactory.StateStyle.DIRECT);
            if (this.triggerRunner.isClosed(directContext.state())) {
                this.droppedDueToClosedWindow.addValue(1L);
                WindowTracing.debug("ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} since window is no longer active at inputWatermark:{}; outputWatermark:{}", value.getTimestamp(), this.key, window, this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime());
                continue;
            }
            triggerableWindows.add(window);
            this.activeWindows.ensureWindowIsActive(window);
            ReduceFn.ProcessValueContext renamedContext = this.contextFactory.forValue(window, value.getValue(), value.getTimestamp(), ReduceFnContextFactory.StateStyle.RENAMED);
            this.nonEmptyPanes.recordContent(renamedContext.state());
            Instant timer = this.scheduleEndOfWindowOrGarbageCollectionTimer(directContext);
            Instant hold = this.watermarkHold.addHolds(renamedContext);
            if (hold != null) {
                boolean holdInWindow = !hold.isAfter((ReadableInstant)window.maxTimestamp());
                boolean timerInWindow = !timer.isAfter((ReadableInstant)window.maxTimestamp());
                Preconditions.checkState(holdInWindow == timerInWindow, "set a hold at %s, a timer at %s, which disagree as to whether they are in window %s", hold, timer, directContext.window());
            }
            this.reduceFn.processValue(renamedContext);
            this.triggerRunner.processValue(directContext.window(), directContext.timestamp(), directContext.timers(), directContext.state());
        }
        return triggerableWindows;
    }

    public void onTimer(TimerInternals.TimerData timer) throws Exception {
        boolean isGarbageCollection;
        boolean windowIsActiveAndOpen;
        Preconditions.checkArgument(timer.getNamespace() instanceof StateNamespaces.WindowNamespace, "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace());
        StateNamespaces.WindowNamespace windowNamespace = (StateNamespaces.WindowNamespace)timer.getNamespace();
        Object window = windowNamespace.getWindow();
        ReduceFn.Context directContext = this.contextFactory.base(window, ReduceFnContextFactory.StateStyle.DIRECT);
        ReduceFn.Context renamedContext = this.contextFactory.base(window, ReduceFnContextFactory.StateStyle.RENAMED);
        boolean bl = windowIsActiveAndOpen = this.activeWindows.isActive(window) && !this.triggerRunner.isClosed(directContext.state());
        if (!windowIsActiveAndOpen) {
            WindowTracing.debug("ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window);
        }
        boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() && timer.getTimestamp().equals((Object)((BoundedWindow)window).maxTimestamp());
        Instant cleanupTime = this.garbageCollectionTime(window);
        boolean bl2 = isGarbageCollection = TimeDomain.EVENT_TIME == timer.getDomain() && !timer.getTimestamp().isBefore((ReadableInstant)cleanupTime);
        if (isGarbageCollection) {
            WindowTracing.debug("ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with inputWatermark:{}; outputWatermark:{}", this.key, window, timer.getTimestamp(), this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime());
            if (windowIsActiveAndOpen) {
                Instant newHold = this.onTrigger(directContext, renamedContext, true, isEndOfWindow);
                Preconditions.checkState(newHold == null, "Hold placed at %s despite isFinished being true.", newHold);
            }
            this.clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
        } else {
            WindowTracing.debug("ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with inputWatermark:{}; outputWatermark:{}", this.key, window, timer.getTimestamp(), this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime());
            if (windowIsActiveAndOpen) {
                this.emitIfAppropriate(directContext, renamedContext);
            }
            if (isEndOfWindow) {
                Preconditions.checkState(this.windowingStrategy.getAllowedLateness().isLongerThan((ReadableDuration)Duration.ZERO), "Unexpected zero getAllowedLateness");
                WindowTracing.debug("ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with inputWatermark:{}; outputWatermark:{}", this.key, directContext.window(), cleanupTime, this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime());
                Preconditions.checkState(!cleanupTime.isAfter((ReadableInstant)BoundedWindow.TIMESTAMP_MAX_VALUE), "Cleanup time %s is beyond end-of-time", cleanupTime);
                directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
            }
        }
    }

    private void clearAllState(ReduceFn.Context directContext, ReduceFn.Context renamedContext, boolean windowIsActiveAndOpen) throws Exception {
        if (windowIsActiveAndOpen) {
            this.reduceFn.clearState(renamedContext);
            this.watermarkHold.clearHolds(renamedContext);
            this.nonEmptyPanes.clearPane(renamedContext.state());
            this.triggerRunner.clearState(directContext.window(), directContext.timers(), directContext.state());
            this.paneInfoTracker.clear(directContext.state());
        } else if (this.windowingStrategy.getMode() == WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES && !this.windowingStrategy.getWindowFn().isNonMerging()) {
            this.watermarkHold.clearHolds(directContext);
        }
        this.activeWindows.remove(directContext.window());
        this.triggerRunner.clearFinished(directContext.state());
    }

    private boolean shouldDiscardAfterFiring(boolean isFinished) {
        if (isFinished) {
            return true;
        }
        return this.windowingStrategy.getMode() == WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES;
    }

    private void emitIfAppropriate(ReduceFn.Context directContext, ReduceFn.Context renamedContext) throws Exception {
        if (!this.triggerRunner.shouldFire(directContext.window(), directContext.timers(), directContext.state())) {
            return;
        }
        this.triggerRunner.onFire(directContext.window(), directContext.timers(), directContext.state());
        boolean isFinished = this.triggerRunner.isClosed(directContext.state());
        boolean shouldDiscard = this.shouldDiscardAfterFiring(isFinished);
        this.onTrigger(directContext, renamedContext, isFinished, false);
        this.nonEmptyPanes.clearPane(renamedContext.state());
        if (shouldDiscard) {
            this.reduceFn.clearState(renamedContext);
        }
        if (isFinished) {
            this.triggerRunner.clearState(directContext.window(), directContext.timers(), directContext.state());
            this.paneInfoTracker.clear(directContext.state());
            this.activeWindows.remove(directContext.window());
        }
    }

    private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) {
        if (!isEmpty) {
            return true;
        }
        if (timing == PaneInfo.Timing.ON_TIME) {
            return true;
        }
        return isFinished && this.windowingStrategy.getClosingBehavior() == Window.ClosingBehavior.FIRE_ALWAYS;
    }

    @Nullable
    private Instant onTrigger(final ReduceFn.Context directContext, ReduceFn.Context renamedContext, boolean isFinished, boolean isEndOfWindow) throws Exception {
        Instant inputWM = this.timerInternals.currentInputWatermarkTime();
        ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture = this.watermarkHold.extractAndRelease(renamedContext, isFinished).readLater();
        ReadableState<PaneInfo> paneFuture = this.paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
        ReadableState<Boolean> isEmptyFuture = this.nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
        this.reduceFn.prefetchOnTrigger(directContext.state());
        this.triggerRunner.prefetchOnFire(directContext.window(), directContext.state());
        final PaneInfo pane = paneFuture.read();
        WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read();
        final Instant outputTimestamp = pair.oldHold;
        Instant newHold = pair.newHold;
        if (newHold != null) {
            Preconditions.checkState(!isFinished, "new hold at %s but finished %s", newHold, directContext.window());
            Preconditions.checkState(!newHold.isBefore((ReadableInstant)inputWM), "new hold %s is before input watermark %s", newHold, inputWM);
            if (newHold.isAfter((ReadableInstant)((BoundedWindow)directContext.window()).maxTimestamp())) {
                Preconditions.checkState(newHold.isEqual((ReadableInstant)this.garbageCollectionTime(directContext.window())), "new hold %s should be at garbage collection for window %s plus %s", newHold, directContext.window(), this.windowingStrategy.getAllowedLateness());
            } else {
                Preconditions.checkState(newHold.isEqual((ReadableInstant)((BoundedWindow)directContext.window()).maxTimestamp()), "new hold %s should be at end of window %s", newHold, directContext.window());
                Preconditions.checkState(!isEndOfWindow, "new hold at %s for %s but this is the watermark trigger", newHold, directContext.window());
            }
        }
        if (this.needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {
            final List windows = Collections.singletonList(directContext.window());
            ReduceFn.OnTriggerContext renamedTriggerContext = this.contextFactory.forTrigger(directContext.window(), paneFuture, ReduceFnContextFactory.StateStyle.RENAMED, new ReduceFnContextFactory.OnTriggerCallbacks<OutputT>(){

                @Override
                public void output(OutputT toOutput) {
                    ReduceFnRunner.this.paneInfoTracker.storeCurrentPaneInfo(directContext, pane);
                    ReduceFnRunner.this.outputter.outputWindowedValue(KV.of(ReduceFnRunner.this.key, toOutput), outputTimestamp, windows, pane);
                }
            });
            this.reduceFn.onTrigger(renamedTriggerContext);
        }
        return newHold;
    }

    private Instant scheduleEndOfWindowOrGarbageCollectionTimer(ReduceFn.Context directContext) {
        String which;
        Instant timer;
        Instant inputWM = this.timerInternals.currentInputWatermarkTime();
        Instant endOfWindow = ((BoundedWindow)directContext.window()).maxTimestamp();
        if (endOfWindow.isBefore((ReadableInstant)inputWM)) {
            timer = this.garbageCollectionTime(directContext.window());
            which = "garbage collection";
        } else {
            timer = endOfWindow;
            which = "end-of-window";
        }
        WindowTracing.trace("ReduceFnRunner.scheduleEndOfWindowOrGarbageCollectionTimer: Scheduling {} timer at {} for key:{}; window:{} where inputWatermark:{}; outputWatermark:{}", which, timer, this.key, directContext.window(), inputWM, this.timerInternals.currentOutputWatermarkTime());
        Preconditions.checkState(!timer.isAfter((ReadableInstant)BoundedWindow.TIMESTAMP_MAX_VALUE), "Timer %s is beyond end-of-time", timer);
        directContext.timers().setTimer(timer, TimeDomain.EVENT_TIME);
        return timer;
    }

    private void cancelEndOfWindowAndGarbageCollectionTimers(ReduceFn.Context directContext) {
        WindowTracing.debug("ReduceFnRunner.cancelEndOfWindowAndGarbageCollectionTimers: Deleting timers for key:{}; window:{} where inputWatermark:{}; outputWatermark:{}", this.key, directContext.window(), this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime());
        Instant eow = ((BoundedWindow)directContext.window()).maxTimestamp();
        directContext.timers().deleteTimer(eow, TimeDomain.EVENT_TIME);
        Instant gc = this.garbageCollectionTime(directContext.window());
        if (gc.isAfter((ReadableInstant)eow)) {
            directContext.timers().deleteTimer(eow, TimeDomain.EVENT_TIME);
        }
    }

    private Instant garbageCollectionTime(W window) {
        if (GlobalWindow.INSTANCE.maxTimestamp().minus((ReadableDuration)this.windowingStrategy.getAllowedLateness()).isBefore((ReadableInstant)((BoundedWindow)window).maxTimestamp())) {
            return GlobalWindow.INSTANCE.maxTimestamp();
        }
        return ((BoundedWindow)window).maxTimestamp().plus((ReadableDuration)this.windowingStrategy.getAllowedLateness());
    }

    private static class OutputViaWindowingInternals<OutputT>
    implements OutputWindowedValue<OutputT> {
        private final WindowingInternals<?, OutputT> windowingInternals;

        public OutputViaWindowingInternals(WindowingInternals<?, OutputT> windowingInternals) {
            this.windowingInternals = windowingInternals;
        }

        @Override
        public void outputWindowedValue(OutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
            this.windowingInternals.outputWindowedValue(output, timestamp, windows, pane);
        }
    }

    private static interface OutputWindowedValue<OutputT> {
        public void outputWindowedValue(OutputT var1, Instant var2, Collection<? extends BoundedWindow> var3, PaneInfo var4);
    }

    private class OnMergeCallback
    implements ActiveWindowSet.MergeCallback<W> {
        private final Map<W, W> windowToMergeResult;

        OnMergeCallback(Map<W, W> windowToMergeResult) {
            this.windowToMergeResult = windowToMergeResult;
        }

        private List<W> activeWindows(Iterable<W> windows) {
            ArrayList<BoundedWindow> active = new ArrayList<BoundedWindow>();
            for (BoundedWindow window : windows) {
                if (!ReduceFnRunner.this.activeWindows.isActive(window)) continue;
                active.add(window);
            }
            return active;
        }

        @Override
        public void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {
            List activeToBeMerged = this.activeWindows(toBeMerged);
            ReduceFn.OnMergeContext directMergeContext = ReduceFnRunner.this.contextFactory.forMerge(activeToBeMerged, mergeResult, ReduceFnContextFactory.StateStyle.DIRECT);
            ReduceFn.OnMergeContext renamedMergeContext = ReduceFnRunner.this.contextFactory.forMerge(activeToBeMerged, mergeResult, ReduceFnContextFactory.StateStyle.RENAMED);
            ReduceFnRunner.this.triggerRunner.prefetchForMerge(mergeResult, activeToBeMerged, directMergeContext.state());
            ReduceFnRunner.this.reduceFn.prefetchOnMerge(renamedMergeContext.state());
            ReduceFnRunner.this.watermarkHold.prefetchOnMerge(renamedMergeContext.state());
            ReduceFnRunner.this.nonEmptyPanes.prefetchOnMerge(renamedMergeContext.state());
        }

        @Override
        public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {
            for (BoundedWindow window : toBeMerged) {
                this.windowToMergeResult.put(window, mergeResult);
            }
            List activeToBeMerged = this.activeWindows(toBeMerged);
            ReduceFn.OnMergeContext directMergeContext = ReduceFnRunner.this.contextFactory.forMerge(activeToBeMerged, mergeResult, ReduceFnContextFactory.StateStyle.DIRECT);
            ReduceFn.OnMergeContext renamedMergeContext = ReduceFnRunner.this.contextFactory.forMerge(activeToBeMerged, mergeResult, ReduceFnContextFactory.StateStyle.RENAMED);
            ReduceFnRunner.this.reduceFn.onMerge(renamedMergeContext);
            ReduceFnRunner.this.watermarkHold.onMerge(renamedMergeContext);
            ReduceFnRunner.this.nonEmptyPanes.onMerge(renamedMergeContext.state());
            ReduceFnRunner.this.triggerRunner.onMerge(directMergeContext.window(), directMergeContext.timers(), directMergeContext.state());
            for (BoundedWindow active : activeToBeMerged) {
                if (active.equals(mergeResult)) continue;
                WindowTracing.debug("ReduceFnRunner.onMerge: Merging {} into {}", active, mergeResult);
                ReduceFn.Context directClearContext = ReduceFnRunner.this.contextFactory.base(active, ReduceFnContextFactory.StateStyle.DIRECT);
                ReduceFnRunner.this.cancelEndOfWindowAndGarbageCollectionTimers(directClearContext);
                ReduceFnRunner.this.paneInfoTracker.clear(directClearContext.state());
            }
        }
    }
}

