/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.DoFnRunner;
import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
import com.google.cloud.dataflow.sdk.util.ReduceFnRunner;
import com.google.cloud.dataflow.sdk.util.SystemDoFnInternal;
import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
import com.google.cloud.dataflow.sdk.util.TimerInternals;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.util.state.StateInternals;
import com.google.cloud.dataflow.sdk.values.KV;

/**
 * A {@link DoFn} over {@link KeyedWorkItem}s that, for every work item, builds a
 * {@link ReduceFnRunner} for the item's key, feeds it the item's elements and timers,
 * and then persists the runner.
 *
 * <p>Instances are obtained through {@link #create}; the constructor is private.
 *
 * @param <K> key type of the work items and of the emitted {@link KV}s
 * @param <InputT> type of the values carried by each work item
 * @param <OutputT> value type of the emitted {@link KV}s
 * @param <W> window type of the {@link WindowingStrategy}
 * @param <RinT> concrete {@link KeyedWorkItem} type consumed by this {@link DoFn}
 */
@SystemDoFnInternal
public class GroupAlsoByWindowViaWindowSetDoFn<K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem<K, InputT>>
extends DoFn<RinT, KV<K, OutputT>>
implements DoFnRunner.ReduceFnExecutor<K, InputT, OutputT, W> {
    /**
     * Aggregator registered under the name {@code "DroppedDueToClosedWindow"}; it is handed
     * to every {@link ReduceFnRunner} created in {@link #processElement}.
     */
    protected final Aggregator<Long, Long> droppedDueToClosedWindow = this.createAggregator("DroppedDueToClosedWindow", new Sum.SumLongFn());

    /**
     * Aggregator registered under the name {@code "DroppedDueToLateness"}; this class only
     * exposes it through {@link #getDroppedDueToLatenessAggregator()}.
     */
    protected final Aggregator<Long, Long> droppedDueToLateness = this.createAggregator("DroppedDueToLateness", new Sum.SumLongFn());

    /** Windowing strategy forwarded to every {@link ReduceFnRunner}. */
    private final WindowingStrategy<Object, W> windowingStrategy;

    /** Reduce function forwarded to every {@link ReduceFnRunner}. */
    private final SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;

    /**
     * Creates a new instance, typed as a {@link DoFn} over plain {@link KeyedWorkItem}s.
     *
     * @param strategy windowing strategy to hand to each {@link ReduceFnRunner}
     * @param reduceFn reduce function to hand to each {@link ReduceFnRunner}
     * @return the newly constructed {@link DoFn}
     */
    public static <K, InputT, OutputT, W extends BoundedWindow> DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(WindowingStrategy<?, W> strategy, SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
        // The diamond infers RinT = KeyedWorkItem<K, InputT> from the return type,
        // so no raw type or unchecked conversion is needed here.
        return new GroupAlsoByWindowViaWindowSetDoFn<>(strategy, reduceFn);
    }

    private GroupAlsoByWindowViaWindowSetDoFn(WindowingStrategy<?, W> windowingStrategy, SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) {
        // A WindowingStrategy<?, W> is not assignable to WindowingStrategy<Object, W>
        // without an explicit cast. The cast only changes the static type argument;
        // this class does nothing with the strategy except forward it to ReduceFnRunner.
        @SuppressWarnings("unchecked")
        WindowingStrategy<Object, W> noWildcard = (WindowingStrategy<Object, W>) windowingStrategy;
        this.windowingStrategy = noWildcard;
        this.reduceFn = reduceFn;
    }

    /**
     * Processes one {@link KeyedWorkItem}: constructs a {@link ReduceFnRunner} for its key,
     * passes the item's elements to {@code processElements}, invokes {@code onTimer} for each
     * of the item's timers in iteration order, and finally calls {@code persist}.
     *
     * @param c context supplying the work item, the windowing internals and the pipeline options
     * @throws Exception propagated unchanged from the {@link ReduceFnRunner} calls
     */
    @Override
    public void processElement(ProcessContext c) throws Exception {
        KeyedWorkItem<K, InputT> element = c.element();
        K key = element.key();
        TimerInternals timerInternals = c.windowingInternals().timerInternals();

        // NOTE(review): the windowing internals expose StateInternals with a wildcard type
        // argument; it is narrowed to K on the assumption that the caller provides internals
        // matching this DoFn's key type - confirm against the runner that supplies them.
        @SuppressWarnings("unchecked")
        StateInternals<K> stateInternals = (StateInternals<K>) c.windowingInternals().stateInternals();

        ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner = new ReduceFnRunner<>(key, this.windowingStrategy, stateInternals, timerInternals, c.windowingInternals(), this.droppedDueToClosedWindow, this.reduceFn, c.getPipelineOptions());

        reduceFnRunner.processElements(element.elementsIterable());
        for (TimerInternals.TimerData timer : element.timersIterable()) {
            reduceFnRunner.onTimer(timer);
        }
        reduceFnRunner.persist();
    }

    /**
     * Returns this object viewed as a {@link DoFn} over plain {@link KeyedWorkItem}s.
     *
     * @return {@code this}, with {@code RinT} widened to {@code KeyedWorkItem<K, InputT>}
     */
    @Override
    public DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
        // Unchecked: RinT is statically widened to its bound KeyedWorkItem<K, InputT>.
        // processElement only uses the element through that bound.
        @SuppressWarnings("unchecked")
        DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asFn = (DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) this;
        return asFn;
    }

    /**
     * Returns the aggregator registered under the name {@code "DroppedDueToLateness"}.
     *
     * @return the {@link #droppedDueToLateness} aggregator
     */
    @Override
    public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() {
        return this.droppedDueToLateness;
    }
}

