/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.io;

import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.OffsetBasedSource;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class CountingSource {
    /**
     * Creates a bounded source of longs backed by the offset range starting at {@code 0} and
     * ending at {@code numElements}.
     *
     * <p>How the end offset is treated (inclusive vs. exclusive) is decided by
     * {@code OffsetBasedSource}, which is not part of this file.
     *
     * @param numElements the end offset of the range; must be strictly positive
     * @return a bounded counting source over that range
     * @throws IllegalArgumentException if {@code numElements} is zero or negative
     */
    @Deprecated
    public static BoundedSource<Long> upTo(long numElements) {
        final boolean isPositive = numElements > 0L;
        Preconditions.checkArgument(isPositive, "numElements (%s) must be greater than 0", numElements);
        final BoundedSource<Long> counting = new BoundedCountingSource(0L, numElements);
        return counting;
    }

    /**
     * Package-private factory for the default unbounded source: start {@code 0}, stride {@code 1},
     * a zero-length period and timestamps taken from {@link NowTimestampFn}.
     *
     * <p>Returns the concrete type so callers in this package can chain
     * {@code withRate} / {@code withTimestampFn}.
     */
    static UnboundedCountingSource createUnbounded() {
        final SerializableFunction<Long, Instant> wallClock = new NowTimestampFn();
        final UnboundedCountingSource defaults =
            new UnboundedCountingSource(0L, 1L, 1L, Duration.ZERO, wallClock);
        return defaults;
    }

    /**
     * Creates an unbounded counting source whose elements are timestamped with the wall-clock
     * time at which they are produced (see {@link NowTimestampFn}).
     *
     * @return an unbounded source of longs starting at {@code 0}
     */
    @Deprecated
    public static UnboundedSource<Long, CounterMark> unbounded() {
        final SerializableFunction<Long, Instant> wallClock = new NowTimestampFn();
        return unboundedWithTimestampFn(wallClock);
    }

    /**
     * Creates an unbounded counting source that starts at {@code 0}, advances by {@code 1} and
     * uses the supplied function to assign a timestamp to every emitted value.
     *
     * @param timestampFn maps an emitted value to its element timestamp
     * @return an unbounded source of longs configured with a zero-length period
     */
    @Deprecated
    public static UnboundedSource<Long, CounterMark> unboundedWithTimestampFn(SerializableFunction<Long, Instant> timestampFn) {
        final long firstValue = 0L;
        final long step = 1L;
        final long perPeriod = 1L;
        return new UnboundedCountingSource(firstValue, step, perPeriod, Duration.ZERO, timestampFn);
    }

    /** Not instantiable: this class only exposes static factory methods and nested types. */
    private CountingSource() {}

    /**
     * Checkpoint state for an unbounded counting reader: the last value that was emitted and a
     * start instant.
     *
     * <p>The {@code @DefaultCoder} annotation selects {@link AvroCoder} for this type, so the
     * field names and their declaration order are left untouched.
     */
    @DefaultCoder(value=AvroCoder.class)
    public static class CounterMark
    implements UnboundedSource.CheckpointMark {
        /** The most recent value emitted when the mark was taken. */
        private final long lastEmitted;
        /** The start instant supplied by whoever created the mark. */
        private final Instant startTime;

        /**
         * No-argument constructor yielding a mark at value {@code 0} with the current time.
         * Presumably present for reflective instantiation by the Avro coder -- TODO confirm.
         */
        private CounterMark() {
            this(0L, Instant.now());
        }

        /**
         * Creates a mark.
         *
         * @param lastEmitted the last value that was emitted
         * @param startTime the instant to record as the start time
         */
        public CounterMark(long lastEmitted, Instant startTime) {
            this.startTime = startTime;
            this.lastEmitted = lastEmitted;
        }

        /** Returns the instant recorded as the start time. */
        public Instant getStartTime() {
            return startTime;
        }

        /** Returns the last value that was emitted. */
        public long getLastEmitted() {
            return lastEmitted;
        }

        /** Nothing needs to be committed for this checkpoint, so this is deliberately a no-op. */
        @Override
        public void finalizeCheckpoint() throws IOException {
            // Intentionally empty.
        }
    }

    /**
     * Reader for {@link UnboundedCountingSource}. It hands out the arithmetic sequence
     * {@code start, start + stride, start + 2 * stride, ...} and, when the source has a non-zero
     * period, refuses to advance past the value the configured rate allows at the current time.
     */
    private static class UnboundedCountingReader
    extends UnboundedSource.UnboundedReader<Long> {
        /** The source this reader belongs to; supplies start, stride, rate and timestamp function. */
        private final UnboundedCountingSource source;
        /** The value most recently emitted (or {@code start - stride} before anything was emitted). */
        private long current;
        /** Timestamp computed for {@link #current} by the last successful {@link #advance()}. */
        private Instant currentTimestamp;
        /** Reference instant for rate limiting; restored from the checkpoint when resuming. */
        private Instant firstStarted;

        /**
         * @param source the source being read
         * @param mark checkpoint to resume from, or {@code null} to begin at the source's start
         */
        public UnboundedCountingReader(UnboundedCountingSource source, CounterMark mark) {
            this.source = source;
            if (mark != null) {
                // Resume: continue after the last emitted value and keep the original start time.
                this.current = mark.getLastEmitted();
                this.firstStarted = mark.getStartTime();
            } else {
                // Fresh read: position one stride before start so the first advance() yields start.
                this.current = source.start - source.stride;
            }
        }

        /** Records the first-start instant if there is none yet, then tries to emit a value. */
        @Override
        public boolean start() throws IOException {
            firstStarted = (firstStarted != null) ? firstStarted : Instant.now();
            return advance();
        }

        /**
         * Moves to the next value of the sequence.
         *
         * @return {@code false} if the next value would overflow {@code long} or is not yet
         *     permitted by the rate limit; {@code true} once {@link #current} has been updated
         */
        @Override
        public boolean advance() throws IOException {
            final long step = source.stride;
            // Overflow-safe form of "current + step > Long.MAX_VALUE".
            if (current > Long.MAX_VALUE - step) {
                return false;
            }
            final long candidate = current + step;
            if (candidate > expectedValue()) {
                // The rate limit has not released this value yet.
                return false;
            }
            current = candidate;
            currentTimestamp = source.timestampFn.apply(candidate);
            return true;
        }

        /**
         * Upper bound on the value that may have been emitted by now.
         *
         * @return {@code Long.MAX_VALUE} when the period is zero (no throttling); otherwise
         *     {@code elementsPerPeriod} multiplied by the number of periods elapsed since
         *     {@link #firstStarted}, truncated to a long
         */
        private long expectedValue() {
            final long periodMillis = source.period.getMillis();
            if (periodMillis == 0L) {
                return Long.MAX_VALUE;
            }
            final long elapsedMillis = Instant.now().getMillis() - firstStarted.getMillis();
            final double periodsElapsed = (double) elapsedMillis / (double) periodMillis;
            return (long) ((double) source.elementsPerPeriod * periodsElapsed);
        }

        /** Returns the timestamp function applied to the current value, evaluated on each call. */
        @Override
        public Instant getWatermark() {
            return source.timestampFn.apply(current);
        }

        /** Captures the current value together with the first-start instant. */
        @Override
        public CounterMark getCheckpointMark() {
            final CounterMark mark = new CounterMark(current, firstStarted);
            return mark;
        }

        @Override
        public UnboundedSource<Long, CounterMark> getCurrentSource() {
            return source;
        }

        /** Returns the stored value; no check is made that the reader has been started. */
        @Override
        public Long getCurrent() throws NoSuchElementException {
            return Long.valueOf(current);
        }

        /** Returns the timestamp stored by the last successful {@link #advance()}. */
        @Override
        public Instant getCurrentTimestamp() throws NoSuchElementException {
            return currentTimestamp;
        }

        /** Holds no resources, so closing does nothing. */
        @Override
        public void close() throws IOException {
            // Intentionally empty.
        }

        /**
         * Estimates the backlog as 8 bytes for every element between the current value and the
         * expected value, clamped at zero.
         *
         * <p>NOTE(review): with a zero period {@link #expectedValue()} is {@code Long.MAX_VALUE},
         * so the subtraction/multiplication below wraps around; the clamp then decides the result.
         */
        @Override
        public long getSplitBacklogBytes() {
            final long target = expectedValue();
            final long scaledGap = 8L * (target - current);
            final long backlogBytes = scaledGap / source.stride;
            return backlogBytes > 0L ? backlogBytes : 0L;
        }
    }

    /**
     * Unbounded source producing the arithmetic sequence
     * {@code start, start + stride, start + 2 * stride, ...}.
     *
     * <p>The rate is described by {@code elementsPerPeriod} and {@code period}; the reader in this
     * file treats a zero-length period as "no rate limit". Each emitted value is timestamped by
     * {@code timestampFn}.
     */
    static class UnboundedCountingSource
    extends UnboundedSource<Long, CounterMark> {
        /** First value of the sequence. */
        private final long start;
        /** Distance between two consecutive values of the sequence. */
        private final long stride;
        /** Rate numerator: how far the expected value grows per {@link #period}. */
        private final long elementsPerPeriod;
        /** Rate denominator; zero length disables throttling in the reader. */
        private final Duration period;
        /** Maps an emitted value to its element timestamp; never {@code null}. */
        private final SerializableFunction<Long, Instant> timestampFn;

        /**
         * Validates and stores the configuration.
         *
         * @throws IllegalArgumentException if {@code elementsPerPeriod} is not positive or
         *     {@code period} is negative
         * @throws NullPointerException if {@code period} or {@code timestampFn} is {@code null}
         */
        private UnboundedCountingSource(long start, long stride, long elementsPerPeriod, Duration period, SerializableFunction<Long, Instant> timestampFn) {
            Preconditions.checkArgument(elementsPerPeriod > 0L, "Must produce at least one element per period, got %s", elementsPerPeriod);
            // Name the offending argument instead of failing with a bare NPE on period.getMillis().
            Preconditions.checkNotNull(period, "period");
            Preconditions.checkArgument(period.getMillis() >= 0L, "Must have a non-negative period length, got %s", period);
            // Fail at construction time rather than later inside the reader, where a null
            // function would only surface on the first advance()/getWatermark() call.
            Preconditions.checkNotNull(timestampFn, "timestampFn");
            this.start = start;
            this.stride = stride;
            this.elementsPerPeriod = elementsPerPeriod;
            this.period = period;
            this.timestampFn = timestampFn;
        }

        /**
         * Returns a copy of this source with a different rate.
         *
         * @param elementsPerPeriod must be positive
         * @param period must be non-null and non-negative
         */
        public UnboundedCountingSource withRate(long elementsPerPeriod, Duration period) {
            return new UnboundedCountingSource(this.start, this.stride, elementsPerPeriod, period, this.timestampFn);
        }

        /**
         * Returns a copy of this source that timestamps elements with {@code timestampFn}.
         *
         * @throws NullPointerException if {@code timestampFn} is {@code null}
         */
        public UnboundedCountingSource withTimestampFn(SerializableFunction<Long, Instant> timestampFn) {
            Preconditions.checkNotNull(timestampFn);
            return new UnboundedCountingSource(this.start, this.stride, this.elementsPerPeriod, this.period, timestampFn);
        }

        /**
         * Splits this source into {@code desiredNumSplits} interleaved sources.
         *
         * <p>Split {@code i} starts at {@code start + i * stride} and uses a stride of
         * {@code stride * desiredNumSplits}, so together the splits cover the same sequence as
         * this source. A non-positive {@code desiredNumSplits} yields an empty list.
         */
        @Override
        public List<? extends UnboundedSource<Long, CounterMark>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
            long newStride = this.stride * (long)desiredNumSplits;
            // Parameterised builder: the raw type forced an unchecked conversion on return.
            ImmutableList.Builder<UnboundedCountingSource> splits = ImmutableList.builder();
            for (int i = 0; i < desiredNumSplits; ++i) {
                splits.add(new UnboundedCountingSource(this.start + (long)i * this.stride, newStride, this.elementsPerPeriod, this.period, this.timestampFn));
            }
            return splits.build();
        }

        /**
         * Creates a reader for this source.
         *
         * @param checkpointMark checkpoint to resume from, or {@code null} to start fresh
         */
        @Override
        public UnboundedSource.UnboundedReader<Long> createReader(PipelineOptions options, CounterMark checkpointMark) {
            return new UnboundedCountingReader(this, checkpointMark);
        }

        /** Checkpoints are encoded with an Avro coder for {@link CounterMark}. */
        @Override
        public Coder<CounterMark> getCheckpointMarkCoder() {
            return AvroCoder.of(CounterMark.class);
        }

        /** All validation happens in the constructor, so there is nothing to check here. */
        @Override
        public void validate() {
        }

        /** Emitted values are encoded with {@link VarLongCoder}. */
        @Override
        public Coder<Long> getDefaultOutputCoder() {
            return VarLongCoder.of();
        }
    }

    /**
     * Reader for {@link BoundedCountingSource}: the emitted value is the current offset itself,
     * beginning at the source's start offset and growing by one per advance.
     */
    private static class BoundedCountingReader
    extends OffsetBasedSource.OffsetBasedReader<Long> {
        /** Current offset, which doubles as the current element. */
        private long current;

        public BoundedCountingReader(OffsetBasedSource<Long> source) {
            super(source);
        }

        /** Positions the reader on the source's start offset; always reports a record. */
        @Override
        protected boolean startImpl() throws IOException {
            final BoundedCountingSource owner = getCurrentSource();
            current = owner.getStartOffset();
            return true;
        }

        /** Steps to the next offset; always reports a record. */
        @Override
        protected boolean advanceImpl() throws IOException {
            current += 1L;
            return true;
        }

        @Override
        protected long getCurrentOffset() throws NoSuchElementException {
            return current;
        }

        /** Returns the current offset boxed as the element value. */
        @Override
        public Long getCurrent() throws NoSuchElementException {
            return Long.valueOf(current);
        }

        /** Returns the enclosing source, narrowed to its concrete type. */
        @Override
        public synchronized BoundedCountingSource getCurrentSource() {
            final Object owner = super.getCurrentSource();
            return (BoundedCountingSource) owner;
        }

        /** Returns the distance from the current offset to the end offset, never below zero. */
        @Override
        public synchronized long getSplitPointsRemaining() {
            final long remaining = getCurrentSource().getEndOffset() - current;
            return remaining > 0L ? remaining : 0L;
        }

        /** Holds no resources, so closing does nothing. */
        @Override
        public void close() throws IOException {
            // Intentionally empty.
        }
    }

    /**
     * Offset-based bounded source in which every offset of the range is itself the element
     * that gets emitted.
     */
    private static class BoundedCountingSource
    extends OffsetBasedSource<Long> {
        /**
         * @param start start offset of the range
         * @param end end offset of the range
         */
        public BoundedCountingSource(long start, long end) {
            // The third superclass argument is fixed at 1; presumably the minimum bundle
            // size -- TODO confirm against OffsetBasedSource.
            super(start, end, 1L);
        }

        /** Creates a reader that walks this source's offset range. */
        @Override
        public BoundedSource.BoundedReader<Long> createReader(PipelineOptions options) throws IOException {
            final BoundedCountingReader reader = new BoundedCountingReader(this);
            return reader;
        }

        /** Sub-ranges are plain counting sources over the narrower range. */
        @Override
        public OffsetBasedSource<Long> createSourceForSubrange(long start, long end) {
            final BoundedCountingSource subrange = new BoundedCountingSource(start, end);
            return subrange;
        }

        /** The range is fixed up front, so the maximum end offset is simply the end offset. */
        @Override
        public long getMaxEndOffset(PipelineOptions options) throws Exception {
            final long end = getEndOffset();
            return end;
        }

        /** Each offset is accounted for as 8 bytes. */
        @Override
        public long getBytesPerOffset() {
            return 8L;
        }

        /** Always reports {@code true}. */
        @Override
        public boolean producesSortedKeys(PipelineOptions options) throws Exception {
            return true;
        }

        /** Emitted values are encoded with {@link VarLongCoder}. */
        @Override
        public Coder<Long> getDefaultOutputCoder() {
            final Coder<Long> coder = VarLongCoder.of();
            return coder;
        }
    }

    /**
     * Timestamp function that ignores the element value and answers with the current time at the
     * moment it is invoked.
     */
    static class NowTimestampFn
    implements SerializableFunction<Long, Instant> {
        /** Package-private constructor; the function carries no state. */
        NowTimestampFn() {
        }

        /**
         * @param input the emitted value; not used
         * @return the current instant
         */
        @Override
        public Instant apply(Long input) {
            final Instant now = Instant.now();
            return now;
        }
    }
}

