/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Function;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Predicate;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterables;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.DoFnRunner;
import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
import com.google.cloud.dataflow.sdk.util.KeyedWorkItems;
import com.google.cloud.dataflow.sdk.util.TimerInternals;
import com.google.cloud.dataflow.sdk.util.WindowTracing;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.values.KV;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWindow>
implements DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> {
    private final DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner;
    private final LateDataFilter lateDataFilter;

    public LateDataDroppingDoFnRunner(DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner, WindowingStrategy<?, ?> windowingStrategy, TimerInternals timerInternals, Aggregator<Long, Long> droppedDueToLateness) {
        this.doFnRunner = doFnRunner;
        this.lateDataFilter = new LateDataFilter(windowingStrategy, timerInternals, droppedDueToLateness);
    }

    @Override
    public void startBundle() {
        this.doFnRunner.startBundle();
    }

    @Override
    public void processElement(WindowedValue<KeyedWorkItem<K, InputT>> elem) {
        Iterable nonLateElements = this.lateDataFilter.filter(elem.getValue().key(), elem.getValue().elementsIterable());
        KeyedWorkItem keyedWorkItem = KeyedWorkItems.workItem(elem.getValue().key(), elem.getValue().timersIterable(), nonLateElements);
        this.doFnRunner.processElement(elem.withValue(keyedWorkItem));
    }

    @Override
    public void finishBundle() {
        this.doFnRunner.finishBundle();
    }

    @VisibleForTesting
    static class LateDataFilter {
        private final WindowingStrategy<?, ?> windowingStrategy;
        private final TimerInternals timerInternals;
        private final Aggregator<Long, Long> droppedDueToLateness;

        public LateDataFilter(WindowingStrategy<?, ?> windowingStrategy, TimerInternals timerInternals, Aggregator<Long, Long> droppedDueToLateness) {
            this.windowingStrategy = windowingStrategy;
            this.timerInternals = timerInternals;
            this.droppedDueToLateness = droppedDueToLateness;
        }

        public <K, InputT> Iterable<WindowedValue<InputT>> filter(final K key, Iterable<WindowedValue<InputT>> elements) {
            Iterable windowsExpandedElements = Iterables.transform(elements, new Function<WindowedValue<InputT>, Iterable<WindowedValue<InputT>>>(){

                @Override
                public Iterable<WindowedValue<InputT>> apply(final WindowedValue<InputT> input) {
                    return Iterables.transform(input.getWindows(), new Function<BoundedWindow, WindowedValue<InputT>>(){

                        @Override
                        public WindowedValue<InputT> apply(BoundedWindow window) {
                            return WindowedValue.of(input.getValue(), input.getTimestamp(), window, input.getPane());
                        }
                    });
                }
            });
            Iterable<WindowedValue<InputT>> nonLateElements = Iterables.filter(Iterables.concat(windowsExpandedElements), new Predicate<WindowedValue<InputT>>(){

                @Override
                public boolean apply(WindowedValue<InputT> input) {
                    BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
                    if (LateDataFilter.this.canDropDueToExpiredWindow(window)) {
                        LateDataFilter.this.droppedDueToLateness.addValue(1L);
                        WindowTracing.debug("ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} since too far behind inputWatermark:{}; outputWatermark:{}", input.getTimestamp(), key, window, LateDataFilter.this.timerInternals.currentInputWatermarkTime(), LateDataFilter.this.timerInternals.currentOutputWatermarkTime());
                        return false;
                    }
                    return true;
                }
            });
            return nonLateElements;
        }

        private boolean canDropDueToExpiredWindow(BoundedWindow window) {
            Instant inputWM = this.timerInternals.currentInputWatermarkTime();
            return window.maxTimestamp().plus((ReadableDuration)this.windowingStrategy.getAllowedLateness()).isBefore((ReadableInstant)inputWM);
        }
    }
}

