/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.io;

import com.google.api.client.util.Clock;
import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.ListCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Charsets;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterables;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Lists;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.BucketingFunction;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.MovingFunction;
import com.google.cloud.dataflow.sdk.util.PubsubClient;
import com.google.cloud.dataflow.sdk.values.PBegin;
import com.google.cloud.dataflow.sdk.values.PCollection;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.GeneralSecurityException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubUnboundedSource<T>
extends PTransform<PBegin, PCollection<T>> {
    /** Logger shared by this transform and its nested source, reader and checkpoint classes. */
    private static final Logger LOG = LoggerFactory.getLogger(PubsubUnboundedSource.class);
    // NOTE(review): "DEAULT" looks like a typo for "DEFAULT". Nothing in this file references the
    // constant by name; apply() passes the literal 60 to createRandomSubscription, presumably this value.
    private static final int DEAULT_ACK_TIMEOUT_SEC = 60;
    /** Shared checkpoint coder instance handed out by PubsubSource.getCheckpointMarkCoder(). */
    private static final PubsubCheckpointCoder<?> CHECKPOINT_CODER = new PubsubCheckpointCoder();
    // The int constants below are not referenced by name in this file; the same values appear as
    // literals at the use sites noted on each one (presumably inlined at compile time).
    // 1000 is the batch-size argument passed to PubsubClient.pull() in PubsubReader.pull().
    private static final int PULL_BATCH_SIZE = 1000;
    // 2000 is the batch cut-off used by extend(), finalizeCheckpoint() and nackAll().
    private static final int ACK_BATCH_SIZE = 2000;
    // 20000 is the in-flight size at which PubsubReader.pull() stops pulling.
    private static final int MAX_IN_FLIGHT = 20000;
    /** Messages requested longer ago than this are dropped from in-flight tracking by extend(). */
    private static final Duration PROCESSING_TIMEOUT = Duration.standardSeconds((long)120L);
    // 50 (percent of the ACK timeout) is the extension applied by extend()/extendBatch().
    private static final int ACK_EXTENSION_PCT = 50;
    // 20 (percent of the ACK timeout) is the margin before the deadline at which extend() starts acting.
    private static final int ACK_SAFETY_PCT = 20;
    /** Messages this close to (or past) their deadline are assumed expired by extend() rather than extended. */
    private static final Duration ACK_TOO_LATE = Duration.standardSeconds((long)2L);
    /** First argument to every MovingFunction built by newFun(); also used by getWatermark() and stats(). */
    private static final Duration SAMPLE_PERIOD = Duration.standardMinutes((long)1L);
    /** Second argument to MovingFunction and first argument to BucketingFunction. */
    private static final Duration SAMPLE_UPDATE = Duration.standardSeconds((long)5L);
    /** Minimum interval between two statistics log lines emitted by PubsubReader.stats(). */
    private static final Duration LOG_PERIOD = Duration.standardSeconds((long)30L);
    // NOTE(review): the literals 10 and 2 passed to the MovingFunction/BucketingFunction constructors
    // presumably correspond to these two constants - confirm against those classes.
    private static final int MIN_WATERMARK_MESSAGES = 10;
    private static final int MIN_WATERMARK_SPREAD = 2;
    // 4 is the multiplier applied to desiredNumSplits in PubsubSource.generateInitialSplits().
    private static final int SCALE_OUT = 4;
    /** Combiner yielding the minimum of its inputs; Long.MAX_VALUE means "no value seen". */
    private static final Combine.BinaryCombineLongFn MIN = new Combine.BinaryCombineLongFn(){

        @Override
        public long apply(long left, long right) {
            return Math.min(left, right);
        }

        @Override
        public long identity() {
            return Long.MAX_VALUE;
        }
    };
    /** Combiner yielding the maximum of its inputs; Long.MIN_VALUE means "no value seen". */
    private static final Combine.BinaryCombineLongFn MAX = new Combine.BinaryCombineLongFn(){

        @Override
        public long apply(long left, long right) {
            return Math.max(left, right);
        }

        @Override
        public long identity() {
            return Long.MIN_VALUE;
        }
    };
    /** Summing combiner used for the reader's counters. */
    private static final Combine.BinaryCombineLongFn SUM = new Sum.SumLongFn();
    /** Time source for the reader; when null the reader falls back to System.currentTimeMillis(). */
    @Nullable
    private Clock clock;
    /** Factory used to create a PubsubClient in apply() and in each PubsubReader. */
    private final PubsubClient.PubsubClientFactory pubsubFactory;
    /** Project in which apply() creates a subscription; the constructor requires it exactly when a topic is given. */
    @Nullable
    private final PubsubClient.ProjectPath project;
    /** Topic to subscribe to; the constructor requires exactly one of topic and subscription. */
    @Nullable
    private final PubsubClient.TopicPath topic;
    /** Subscription to read from. Not final: apply() assigns it when only a topic was given. */
    @Nullable
    private PubsubClient.SubscriptionPath subscription;
    /** Coder used by the reader to decode message payload bytes into elements. */
    private final Coder<T> elementCoder;
    /** Passed through to the client factory and shown in display data; may be null. */
    @Nullable
    private final String timestampLabel;
    /** Passed through to the client factory and shown in display data; may be null. */
    @Nullable
    private final String idLabel;

    @VisibleForTesting
    PubsubUnboundedSource(Clock clock, PubsubClient.PubsubClientFactory pubsubFactory, @Nullable PubsubClient.ProjectPath project, @Nullable PubsubClient.TopicPath topic, @Nullable PubsubClient.SubscriptionPath subscription, Coder<T> elementCoder, @Nullable String timestampLabel, @Nullable String idLabel) {
        Preconditions.checkArgument(topic == null != (subscription == null), "Exactly one of topic and subscription must be given");
        Preconditions.checkArgument(topic == null == (project == null), "Project must be given if topic is given");
        this.clock = clock;
        this.pubsubFactory = Preconditions.checkNotNull(pubsubFactory);
        this.project = project;
        this.topic = topic;
        this.subscription = subscription;
        this.elementCoder = Preconditions.checkNotNull(elementCoder);
        this.timestampLabel = timestampLabel;
        this.idLabel = idLabel;
    }

    /**
     * Builds the transform without an explicit clock: delegates to the test constructor with a
     * {@code null} clock, so the reader uses {@code System.currentTimeMillis()}.
     */
    public PubsubUnboundedSource(PubsubClient.PubsubClientFactory pubsubFactory, @Nullable PubsubClient.ProjectPath project, @Nullable PubsubClient.TopicPath topic, @Nullable PubsubClient.SubscriptionPath subscription, Coder<T> elementCoder, @Nullable String timestampLabel, @Nullable String idLabel) {
        this(null, pubsubFactory, project, topic, subscription, elementCoder, timestampLabel, idLabel);
    }

    /** Returns the coder the reader uses to decode message payloads into elements. */
    public Coder<T> getElementCoder() {
        return this.elementCoder;
    }

    /** Returns the project given at construction; null when the transform was built from a subscription. */
    @Nullable
    public PubsubClient.ProjectPath getProject() {
        return this.project;
    }

    /** Returns the topic given at construction; null when the transform was built from a subscription. */
    @Nullable
    public PubsubClient.TopicPath getTopic() {
        return this.topic;
    }

    /**
     * Returns the subscription being read. When the transform was built from a topic this is null
     * until {@code apply} has created a subscription.
     */
    @Nullable
    public PubsubClient.SubscriptionPath getSubscription() {
        return this.subscription;
    }

    /** Returns the timestamp label given at construction, or null if none was given. */
    @Nullable
    public String getTimestampLabel() {
        return this.timestampLabel;
    }

    /** Returns the id label given at construction, or null if none was given. */
    @Nullable
    public String getIdLabel() {
        return this.idLabel;
    }

    /**
     * Expands this transform into a read from {@link PubsubSource} followed by a pass-through
     * statistics step.
     *
     * <p>If no subscription was supplied, a random one is first created on the configured topic.
     * That subscription is stored on this transform and is not deleted when the pipeline ends.
     *
     * @throws RuntimeException wrapping any failure while creating the subscription
     */
    @Override
    public PCollection<T> apply(PBegin input) {
        if (this.subscription == null) {
            try {
                DataflowPipelineOptions options = input.getPipeline().getOptions().as(DataflowPipelineOptions.class);
                try (PubsubClient pubsubClient = this.pubsubFactory.newClient(this.timestampLabel, this.idLabel, options)) {
                    // 60 is the ACK deadline in seconds (same value as DEAULT_ACK_TIMEOUT_SEC).
                    this.subscription = pubsubClient.createRandomSubscription(this.project, this.topic, 60);
                    LOG.warn("Created subscription {} to topic {}. Note this subscription WILL NOT be deleted when the pipeline terminates", (Object)this.subscription, (Object)this.topic);
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to create subscription: ", e);
            }
        }
        PCollection messages = (PCollection)input.getPipeline().begin().apply(Read.from(new PubsubSource(this)));
        StatsFn statsFn = new StatsFn(this.pubsubFactory, this.subscription, this.timestampLabel, this.idLabel);
        return (PCollection)messages.apply(ParDo.named("PubsubUnboundedSource.Stats").of(statsFn));
    }

    /**
     * Pass-through {@link DoFn} applied after the read: it counts elements in an aggregator and
     * carries the source's configuration so it can be reported as display data.
     */
    private static class StatsFn<T>
    extends DoFn<T, T> {
        /** Aggregator named "elements"; incremented once per processed element. */
        private final Aggregator<Long, Long> elementCounter = this.createAggregator("elements", new Sum.SumLongFn());
        private final PubsubClient.PubsubClientFactory pubsubFactory;
        private final PubsubClient.SubscriptionPath subscription;
        @Nullable
        private final String timestampLabel;
        @Nullable
        private final String idLabel;

        /** Captures the configuration values that populateDisplayData reports. */
        public StatsFn(PubsubClient.PubsubClientFactory pubsubFactory, PubsubClient.SubscriptionPath subscription, @Nullable String timestampLabel, @Nullable String idLabel) {
            this.pubsubFactory = pubsubFactory;
            this.subscription = subscription;
            this.timestampLabel = timestampLabel;
            this.idLabel = idLabel;
        }

        /** Counts the element and re-emits it unchanged. */
        @Override
        public void processElement(DoFn.ProcessContext c) throws Exception {
            this.elementCounter.addValue(1L);
            c.output(c.element());
        }

        /** Adds the subscription path and client kind, plus the two labels when they are non-null. */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("subscription", this.subscription.getPath()));
            builder.add(DisplayData.item("transport", this.pubsubFactory.getKind()));
            builder.addIfNotNull(DisplayData.item("timestampLabel", this.timestampLabel));
            builder.addIfNotNull(DisplayData.item("idLabel", this.idLabel));
        }
    }

    @VisibleForTesting
    static class PubsubSource<T>
    extends UnboundedSource<T, PubsubCheckpoint<T>> {
        public final PubsubUnboundedSource<T> outer;

        public PubsubSource(PubsubUnboundedSource<T> outer) {
            this.outer = outer;
        }

        @Override
        public List<PubsubSource<T>> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception {
            ArrayList<PubsubSource<T>> result = new ArrayList<PubsubSource<T>>(desiredNumSplits);
            for (int i = 0; i < desiredNumSplits * 4; ++i) {
                result.add(this);
            }
            return result;
        }

        @Override
        public PubsubReader<T> createReader(PipelineOptions options, @Nullable PubsubCheckpoint<T> checkpoint) {
            PubsubReader reader;
            try {
                reader = new PubsubReader(options.as(DataflowPipelineOptions.class), this);
            }
            catch (IOException | GeneralSecurityException e) {
                throw new RuntimeException("Unable to subscribe to " + ((PubsubUnboundedSource)this.outer).subscription + ": ", e);
            }
            if (checkpoint != null) {
                try {
                    checkpoint.nackAll(reader);
                }
                catch (IOException e) {
                    LOG.error("Pubsub {} cannot have {} lost messages NACKed, ignoring: {}", new Object[]{((PubsubUnboundedSource)this.outer).subscription, checkpoint.notYetReadIds.size(), e});
                }
            }
            return reader;
        }

        @Override
        @Nullable
        public Coder<PubsubCheckpoint<T>> getCheckpointMarkCoder() {
            PubsubCheckpointCoder typedCoder = CHECKPOINT_CODER;
            return typedCoder;
        }

        @Override
        public Coder<T> getDefaultOutputCoder() {
            return ((PubsubUnboundedSource)this.outer).elementCoder;
        }

        @Override
        public void validate() {
        }

        @Override
        public boolean requiresDeduping() {
            return true;
        }
    }

    @VisibleForTesting
    static class PubsubReader<T>
    extends UnboundedSource.UnboundedReader<T> {
        /** Source that created this reader; gives access to the transform's configuration. */
        private final PubsubSource<T> outer;
        /** Client for all Pubsub calls; set to null by close(). */
        @Nullable
        private PubsubClient pubsubClient;
        /** Subscription ACK deadline in milliseconds; -1 until start() queries it. */
        private int ackTimeoutMs;
        /** ACK ids of messages handed out by advance() and not yet retired; snapshotted into checkpoints. */
        private Set<String> safeToAckIds;
        /** Messages pulled from Pubsub but not yet returned by advance(). */
        private final Queue<PubsubClient.IncomingMessage> notYetRead;
        /** ACK id to request/deadline state for every tracked message, in insertion order. */
        private final LinkedHashMap<String, InFlightState> inFlight;
        /** Batches of ACK ids added by ackBatch() and drained by retire(); a concurrent queue. */
        private final Queue<List<String>> ackedIds;
        /** Total payload bytes of the messages in notYetRead. */
        private long notYetReadBytes;
        /** Minimum timestamp over messages pulled but not yet consumed; feeds getWatermark(). */
        private BucketingFunction minUnreadTimestampMsSinceEpoch;
        /** Minimum timestamp over recently read messages; feeds getWatermark(). */
        private MovingFunction minReadTimestampMsSinceEpoch;
        /** Request time of the last non-empty pull, or -1 if there has been none. */
        private long lastReceivedMsSinceEpoch;
        /** Watermark most recently computed by getWatermark(). */
        private long lastWatermarkMsSinceEpoch;
        /** Message most recently returned by advance(), or null. */
        @Nullable
        private PubsubClient.IncomingMessage current;
        /** Time of the last statistics log line, or -1 before the first stats() call. */
        private long lastLogTimestampMsSinceEpoch;
        /** Total number of messages received by this reader. */
        private long numReceived;
        // The remaining MovingFunction fields are windowed counters and min/max trackers that are
        // only read back by stats() for logging.
        private MovingFunction numReceivedRecently;
        private MovingFunction numExtendedDeadlines;
        private MovingFunction numLateDeadlines;
        private MovingFunction numAcked;
        private MovingFunction numExpired;
        private MovingFunction numNacked;
        private MovingFunction numReadBytes;
        private MovingFunction minReceivedTimestampMsSinceEpoch;
        private MovingFunction maxReceivedTimestampMsSinceEpoch;
        private MovingFunction minWatermarkMsSinceEpoch;
        private MovingFunction maxWatermarkMsSinceEpoch;
        private MovingFunction numLateMessages;
        /** Checkpoints created by getCheckpointMark() and not yet finalized. */
        private AtomicInteger numInFlightCheckpoints;
        /** Largest value numInFlightCheckpoints has reached; reported by stats(). */
        private int maxInFlightCheckpoints;

        /**
         * Creates a {@link MovingFunction} over the standard sample period and update interval,
         * combining samples with {@code function}.
         */
        private static MovingFunction newFun(Combine.BinaryCombineLongFn function) {
            long samplePeriodMs = SAMPLE_PERIOD.getMillis();
            long sampleUpdateMs = SAMPLE_UPDATE.getMillis();
            return new MovingFunction(samplePeriodMs, sampleUpdateMs, 2, 10, function);
        }

        /**
         * Creates a reader for {@code outer}, opening a new Pubsub client through the transform's
         * factory and initialising all tracking state to "nothing received yet".
         *
         * @throws IOException if the client factory fails
         * @throws GeneralSecurityException if the client factory fails
         */
        public PubsubReader(DataflowPipelineOptions options, PubsubSource<T> outer) throws IOException, GeneralSecurityException {
            this.outer = outer;
            this.pubsubClient = outer.outer.pubsubFactory.newClient(outer.outer.timestampLabel, outer.outer.idLabel, options);
            // Placeholder; start() replaces it with the subscription's real ACK deadline.
            this.ackTimeoutMs = -1;
            this.safeToAckIds = new HashSet<String>();
            this.notYetRead = new ArrayDeque<PubsubClient.IncomingMessage>();
            this.inFlight = new LinkedHashMap();
            // Concurrent queue: filled by ackBatch(), drained by retire().
            this.ackedIds = new ConcurrentLinkedQueue<List<String>>();
            this.notYetReadBytes = 0L;
            this.minUnreadTimestampMsSinceEpoch = new BucketingFunction(SAMPLE_UPDATE.getMillis(), 2, 10, MIN);
            this.minReadTimestampMsSinceEpoch = PubsubReader.newFun(MIN);
            // -1 marks "no message received yet" / "nothing logged yet".
            this.lastReceivedMsSinceEpoch = -1L;
            this.lastWatermarkMsSinceEpoch = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
            this.current = null;
            this.lastLogTimestampMsSinceEpoch = -1L;
            this.numReceived = 0L;
            // Windowed statistics reported by stats().
            this.numReceivedRecently = PubsubReader.newFun(SUM);
            this.numExtendedDeadlines = PubsubReader.newFun(SUM);
            this.numLateDeadlines = PubsubReader.newFun(SUM);
            this.numAcked = PubsubReader.newFun(SUM);
            this.numExpired = PubsubReader.newFun(SUM);
            this.numNacked = PubsubReader.newFun(SUM);
            this.numReadBytes = PubsubReader.newFun(SUM);
            this.minReceivedTimestampMsSinceEpoch = PubsubReader.newFun(MIN);
            this.maxReceivedTimestampMsSinceEpoch = PubsubReader.newFun(MAX);
            this.minWatermarkMsSinceEpoch = PubsubReader.newFun(MIN);
            this.maxWatermarkMsSinceEpoch = PubsubReader.newFun(MAX);
            this.numLateMessages = PubsubReader.newFun(SUM);
            this.numInFlightCheckpoints = new AtomicInteger();
            this.maxInFlightCheckpoints = 0;
        }

        /** Returns the underlying client for tests; null once close() has been called. */
        @VisibleForTesting
        PubsubClient getPubsubClient() {
            return this.pubsubClient;
        }

        /**
         * Acknowledges {@code ackIds} with Pubsub and then queues the list for retire().
         * The list itself is retained, so callers must not modify it afterwards.
         *
         * @throws IOException if the acknowledge call fails; the list is then not queued
         */
        void ackBatch(List<String> ackIds) throws IOException {
            PubsubClient.SubscriptionPath target = this.outer.outer.subscription;
            this.pubsubClient.acknowledge(target, ackIds);
            this.ackedIds.add(ackIds);
        }

        /**
         * NACKs {@code ackIds} by setting their ACK deadline to zero, then records the count.
         *
         * @param nowMsSinceEpoch timestamp under which the NACK count is recorded
         * @throws IOException if the deadline modification fails
         */
        public void nackBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException {
            PubsubClient.SubscriptionPath target = this.outer.outer.subscription;
            this.pubsubClient.modifyAckDeadline(target, ackIds, 0);
            int nacked = ackIds.size();
            this.numNacked.add(nowMsSinceEpoch, nacked);
        }

        /**
         * Extends the ACK deadline of {@code ackIds} by 50% of the subscription's ACK timeout
         * (converted from milliseconds to whole seconds), then records the count.
         *
         * @throws IOException if the deadline modification fails
         */
        private void extendBatch(long nowMsSinceEpoch, List<String> ackIds) throws IOException {
            // percent -> fraction (100) and ms -> s (1000) folded into one integer division.
            int extensionSec = (this.ackTimeoutMs * 50) / (100 * 1000);
            PubsubClient.SubscriptionPath target = this.outer.outer.subscription;
            this.pubsubClient.modifyAckDeadline(target, ackIds, extensionSec);
            int extended = ackIds.size();
            this.numExtendedDeadlines.add(nowMsSinceEpoch, extended);
        }

        /** Returns the current time in ms since the epoch, from the transform's clock if one was given. */
        private long now() {
            Clock clock = this.outer.outer.clock;
            return clock != null ? clock.currentTimeMillis() : System.currentTimeMillis();
        }

        /**
         * Drains the queue of ACKed batches filled by ackBatch(): each batch is counted and its
         * ids are dropped from both the in-flight map and the safe-to-ACK set.
         */
        private void retire() throws IOException {
            long nowMsSinceEpoch = this.now();
            List<String> ackIds;
            // poll() returns null once the queue is empty.
            while ((ackIds = this.ackedIds.poll()) != null) {
                this.numAcked.add(nowMsSinceEpoch, ackIds.size());
                for (String ackId : ackIds) {
                    this.inFlight.remove(ackId);
                    this.safeToAckIds.remove(ackId);
                }
            }
        }

        /**
         * Walks the in-flight messages and, for those approaching their ACK deadline, either
         * extends the deadline or stops tracking them. Repeats in batches until a pass finds
         * nothing left to do.
         *
         * @throws IOException if extending a batch fails
         */
        private void extend() throws IOException {
            while (true) {
                long nowMsSinceEpoch = this.now();
                ArrayList<String> assumeExpired = new ArrayList<String>();
                ArrayList<String> toBeExtended = new ArrayList<String>();
                ArrayList<String> toBeExpired = new ArrayList<String>();
                // inFlight iterates in insertion order; pull() appends and the loop below re-inserts
                // extended entries at the tail, so entries are presumably in increasing deadline order,
                // which is what stopping at the first "not yet due" entry relies on.
                for (Map.Entry<String, InFlightState> entry : this.inFlight.entrySet()) {
                    // Deadline is still more than 20% of the ACK timeout away: stop scanning.
                    if (entry.getValue().ackDeadlineMsSinceEpoch - (long)(this.ackTimeoutMs * 20 / 100) > nowMsSinceEpoch) break;
                    // Within ACK_TOO_LATE of the deadline (or past it): treat as already expired.
                    if (entry.getValue().ackDeadlineMsSinceEpoch - ACK_TOO_LATE.getMillis() < nowMsSinceEpoch) {
                        assumeExpired.add(entry.getKey());
                        continue;
                    }
                    // Requested more than PROCESSING_TIMEOUT ago: give up on it instead of extending.
                    if (entry.getValue().requestTimeMsSinceEpoch + PROCESSING_TIMEOUT.getMillis() < nowMsSinceEpoch) {
                        toBeExpired.add(entry.getKey());
                        continue;
                    }
                    toBeExtended.add(entry.getKey());
                    // Cap one extension request at 2000 ids (same value as ACK_BATCH_SIZE).
                    if (toBeExtended.size() < 2000) continue;
                    break;
                }
                if (assumeExpired.isEmpty() && toBeExtended.isEmpty() && toBeExpired.isEmpty()) {
                    return;
                }
                if (!assumeExpired.isEmpty()) {
                    this.numLateDeadlines.add(nowMsSinceEpoch, assumeExpired.size());
                    for (String ackId : assumeExpired) {
                        this.inFlight.remove(ackId);
                    }
                }
                if (!toBeExpired.isEmpty()) {
                    this.numExpired.add(nowMsSinceEpoch, toBeExpired.size());
                    for (String ackId : toBeExpired) {
                        this.inFlight.remove(ackId);
                    }
                }
                // Nothing to extend this pass, but entries were removed: rescan.
                if (toBeExtended.isEmpty()) continue;
                // Locally tracked new deadline: now plus 50% of the ACK timeout.
                long newDeadlineMsSinceEpoch = nowMsSinceEpoch + (long)(this.ackTimeoutMs * 50 / 100);
                for (String ackId : toBeExtended) {
                    // Remove and re-put so the entry moves to the end of the insertion order.
                    InFlightState state = (InFlightState)this.inFlight.remove(ackId);
                    this.inFlight.put(ackId, new InFlightState(state.requestTimeMsSinceEpoch, newDeadlineMsSinceEpoch));
                }
                this.extendBatch(nowMsSinceEpoch, toBeExtended);
            }
        }

        /**
         * Pulls one batch of messages from Pubsub unless too many are already in flight, and
         * registers each received message as unread and in flight.
         *
         * @throws IOException if the pull request fails
         */
        private void pull() throws IOException {
            // 20000 is the in-flight cap (same value as MAX_IN_FLIGHT).
            if (this.inFlight.size() >= 20000) {
                return;
            }
            long pullTimeMs = this.now();
            long ackDeadlineMs = pullTimeMs + (long)this.ackTimeoutMs;
            // 1000 is the batch-size argument (same value as PULL_BATCH_SIZE).
            List<PubsubClient.IncomingMessage> batch = this.pubsubClient.pull(pullTimeMs, this.outer.outer.subscription, 1000, true);
            if (batch.isEmpty()) {
                return;
            }
            this.lastReceivedMsSinceEpoch = pullTimeMs;
            for (PubsubClient.IncomingMessage message : batch) {
                this.notYetRead.add(message);
                this.notYetReadBytes += (long)message.elementBytes.length;
                this.inFlight.put(message.ackId, new InFlightState(pullTimeMs, ackDeadlineMs));
                this.numReceived++;
                this.numReceivedRecently.add(pullTimeMs, 1L);
                this.minReceivedTimestampMsSinceEpoch.add(pullTimeMs, message.timestampMsSinceEpoch);
                this.maxReceivedTimestampMsSinceEpoch.add(pullTimeMs, message.timestampMsSinceEpoch);
                this.minUnreadTimestampMsSinceEpoch.add(pullTimeMs, message.timestampMsSinceEpoch);
            }
        }

        /**
         * Logs one line of reader statistics, at most once per LOG_PERIOD. The first call only
         * records the time and logs nothing.
         */
        private void stats() {
            long nowMsSinceEpoch = this.now();
            if (this.lastLogTimestampMsSinceEpoch < 0L) {
                this.lastLogTimestampMsSinceEpoch = nowMsSinceEpoch;
                return;
            }
            if (nowMsSinceEpoch - this.lastLogTimestampMsSinceEpoch < LOG_PERIOD.getMillis()) {
                return;
            }

            // A MIN still at Long.MAX_VALUE, or a MAX still at Long.MIN_VALUE, means no samples.
            long oldestMessageTs = this.minReceivedTimestampMsSinceEpoch.get(nowMsSinceEpoch);
            long newestMessageTs = this.maxReceivedTimestampMsSinceEpoch.get(nowMsSinceEpoch);
            boolean haveMessageTs = oldestMessageTs < Long.MAX_VALUE && newestMessageTs > Long.MIN_VALUE;
            String messageSkew = haveMessageTs ? (newestMessageTs - oldestMessageTs) + "ms" : "unknown";

            long lowWatermark = this.minWatermarkMsSinceEpoch.get(nowMsSinceEpoch);
            long highWatermark = this.maxWatermarkMsSinceEpoch.get(nowMsSinceEpoch);
            boolean haveWatermarks = lowWatermark < Long.MAX_VALUE && highWatermark > Long.MIN_VALUE;
            String watermarkSkew = haveWatermarks ? (highWatermark - lowWatermark) + "ms" : "unknown";

            // The first key of the insertion-ordered map is the entry that has been there longest.
            String oldestAckId = Iterables.getFirst(this.inFlight.keySet(), null);
            String oldestInFlight = oldestAckId == null
                ? "no"
                : (nowMsSinceEpoch - this.inFlight.get(oldestAckId).requestTimeMsSinceEpoch) + "ms";

            Object[] logArgs = new Object[]{
                this.outer.outer.subscription,
                this.numReceived,
                this.notYetRead.size(),
                this.notYetReadBytes,
                this.inFlight.size(),
                oldestInFlight,
                this.numInFlightCheckpoints.get(),
                this.maxInFlightCheckpoints,
                this.numReadBytes.get(nowMsSinceEpoch) / (SAMPLE_PERIOD.getMillis() / 1000L),
                this.numReceivedRecently.get(nowMsSinceEpoch),
                this.numExtendedDeadlines.get(nowMsSinceEpoch),
                this.numLateDeadlines.get(nowMsSinceEpoch),
                this.numAcked.get(nowMsSinceEpoch),
                this.numNacked.get(nowMsSinceEpoch),
                this.numExpired.get(nowMsSinceEpoch),
                messageSkew,
                watermarkSkew,
                this.numLateMessages.get(nowMsSinceEpoch),
                new Instant(this.lastWatermarkMsSinceEpoch)
            };
            LOG.info("Pubsub {} has {} received messages, {} current unread messages, {} current unread bytes, {} current in-flight msgs, {} oldest in-flight, {} current in-flight checkpoints, {} max in-flight checkpoints, {}B/s recent read, {} recent received, {} recent extended, {} recent late extended, {} recent ACKed, {} recent NACKed, {} recent expired, {} recent message timestamp skew, {} recent watermark skew, {} recent late messages, {} last reported watermark", logArgs);
            this.lastLogTimestampMsSinceEpoch = nowMsSinceEpoch;
        }

        /**
         * Queries the subscription's ACK deadline, stores it in milliseconds, then advances to
         * the first message.
         *
         * @return whether a first message is available
         */
        @Override
        public boolean start() throws IOException {
            int ackDeadlineSec = this.pubsubClient.ackDeadlineSeconds(this.outer.outer.subscription);
            this.ackTimeoutMs = ackDeadlineSec * 1000;
            return this.advance();
        }

        /**
         * Moves to the next message: performs housekeeping (stats, retire, extend), pulls a new
         * batch if nothing is buffered, and makes the next buffered message current.
         *
         * @return true if a message is now current, false if none is available
         * @throws IOException if a Pubsub call made during housekeeping or pulling fails
         */
        @Override
        public boolean advance() throws IOException {
            this.stats();
            if (this.current != null) {
                // The previous message is consumed: drop its contribution to the unread minimum.
                this.minUnreadTimestampMsSinceEpoch.remove(this.current.requestTimeMsSinceEpoch);
                this.current = null;
            }
            this.retire();
            this.extend();
            // Only go to Pubsub when the local buffer is exhausted.
            if (this.notYetRead.isEmpty()) {
                this.pull();
            }
            this.current = this.notYetRead.poll();
            if (this.current == null) {
                return false;
            }
            this.notYetReadBytes -= (long)this.current.elementBytes.length;
            Preconditions.checkState(this.notYetReadBytes >= 0L);
            long nowMsSinceEpoch = this.now();
            this.numReadBytes.add(nowMsSinceEpoch, this.current.elementBytes.length);
            this.minReadTimestampMsSinceEpoch.add(nowMsSinceEpoch, this.current.timestampMsSinceEpoch);
            // Count messages whose timestamp is behind the last watermark this reader reported.
            if (this.current.timestampMsSinceEpoch < this.lastWatermarkMsSinceEpoch) {
                this.numLateMessages.add(nowMsSinceEpoch, 1L);
            }
            // Once handed out, the message is eligible to be ACKed by the next checkpoint.
            this.safeToAckIds.add(this.current.ackId);
            return true;
        }

        /**
         * Decodes and returns the current message's payload.
         *
         * @throws NoSuchElementException if there is no current message
         * @throws RuntimeException if the payload cannot be decoded with the element coder
         */
        @Override
        public T getCurrent() throws NoSuchElementException {
            PubsubClient.IncomingMessage message = this.current;
            if (message == null) {
                throw new NoSuchElementException();
            }
            Coder<T> coder = this.outer.outer.elementCoder;
            try {
                return CoderUtils.decodeFromByteArray(coder, message.elementBytes);
            }
            catch (CoderException e) {
                throw new RuntimeException("Unable to decode element from Pubsub message: ", e);
            }
        }

        /**
         * Returns the current message's timestamp.
         *
         * @throws NoSuchElementException if there is no current message
         */
        @Override
        public Instant getCurrentTimestamp() throws NoSuchElementException {
            PubsubClient.IncomingMessage message = this.current;
            if (message == null) {
                throw new NoSuchElementException();
            }
            long timestampMs = message.timestampMsSinceEpoch;
            return new Instant(timestampMs);
        }

        /**
         * Returns the current message's record id as UTF-8 bytes.
         *
         * @throws NoSuchElementException if there is no current message
         */
        @Override
        public byte[] getCurrentRecordId() throws NoSuchElementException {
            PubsubClient.IncomingMessage message = this.current;
            if (message == null) {
                throw new NoSuchElementException();
            }
            String recordId = message.recordId;
            return recordId.getBytes(Charsets.UTF_8);
        }

        /**
         * Closes the Pubsub client, if still open, and drops the reference so that a second
         * call does nothing.
         *
         * @throws IOException if closing the client fails; the reference is then kept
         */
        @Override
        public void close() throws IOException {
            if (this.pubsubClient == null) {
                return;
            }
            this.pubsubClient.close();
            this.pubsubClient = null;
        }

        /** Returns the source that created this reader. */
        @Override
        public PubsubSource<T> getCurrentSource() {
            return this.outer;
        }

        /**
         * Estimates the watermark from the timestamps of unread and recently read messages, and
         * records the estimate for the statistics log.
         *
         * <p>The result is not clamped against the previous value, so it may move backwards
         * between calls.
         */
        @Override
        public Instant getWatermark() {
            // Client reports end-of-input and the buffer is drained: jump to the maximum timestamp.
            if (this.pubsubClient.isEOF() && this.notYetRead.isEmpty()) {
                return BoundedWindow.TIMESTAMP_MAX_VALUE;
            }
            long nowMsSinceEpoch = this.now();
            long readMin = this.minReadTimestampMsSinceEpoch.get(nowMsSinceEpoch);
            long unreadMin = this.minUnreadTimestampMsSinceEpoch.get();
            // Long.MAX_VALUE is the MIN combiner's identity, i.e. no samples. With nothing unread,
            // nothing recently read, and no non-empty pull for over SAMPLE_PERIOD, use current time.
            if (readMin == Long.MAX_VALUE && unreadMin == Long.MAX_VALUE && this.lastReceivedMsSinceEpoch >= 0L && nowMsSinceEpoch > this.lastReceivedMsSinceEpoch + SAMPLE_PERIOD.getMillis()) {
                this.lastWatermarkMsSinceEpoch = nowMsSinceEpoch;
            } else if (this.minReadTimestampMsSinceEpoch.isSignificant() || this.minUnreadTimestampMsSinceEpoch.isSignificant()) {
                this.lastWatermarkMsSinceEpoch = Math.min(readMin, unreadMin);
            }
            // Otherwise neither tracker reports itself significant: keep the previous watermark.
            this.minWatermarkMsSinceEpoch.add(nowMsSinceEpoch, this.lastWatermarkMsSinceEpoch);
            this.maxWatermarkMsSinceEpoch.add(nowMsSinceEpoch, this.lastWatermarkMsSinceEpoch);
            return new Instant(this.lastWatermarkMsSinceEpoch);
        }

        /**
         * Creates a checkpoint holding snapshots of the ACK ids that are safe to acknowledge and
         * of the ACK ids of messages received but not yet read. Also bumps the count of
         * outstanding checkpoints, which finalizeCheckpoint() decrements again.
         */
        @Override
        public PubsubCheckpoint<T> getCheckpointMark() {
            int outstanding = this.numInFlightCheckpoints.incrementAndGet();
            if (outstanding > this.maxInFlightCheckpoints) {
                this.maxInFlightCheckpoints = outstanding;
            }
            // Copies, so later reader activity cannot change what the checkpoint covers.
            ArrayList<String> safeIds = Lists.newArrayList(this.safeToAckIds);
            ArrayList<String> unreadIds = new ArrayList<String>(this.notYetRead.size());
            Iterator<PubsubClient.IncomingMessage> pending = this.notYetRead.iterator();
            while (pending.hasNext()) {
                unreadIds.add(pending.next().ackId);
            }
            return new PubsubCheckpoint<T>(this, safeIds, unreadIds);
        }

        /** Returns the payload bytes buffered locally but not yet read; nothing still held by Pubsub is included. */
        @Override
        public long getSplitBacklogBytes() {
            return this.notYetReadBytes;
        }

        /** Timing state tracked for each in-flight message. Instances are replaced, not mutated, by extend(). */
        private static class InFlightState {
            /** Time of the pull request that returned the message. */
            long requestTimeMsSinceEpoch;
            /** Locally tracked ACK deadline; extend() compares it against the current time. */
            long ackDeadlineMsSinceEpoch;

            public InFlightState(long requestTimeMsSinceEpoch, long ackDeadlineMsSinceEpoch) {
                this.requestTimeMsSinceEpoch = requestTimeMsSinceEpoch;
                this.ackDeadlineMsSinceEpoch = ackDeadlineMsSinceEpoch;
            }
        }
    }

    /**
     * Coder for {@link PubsubCheckpoint}. Only the list of not-yet-read ACK ids is written; a
     * decoded checkpoint therefore has no reader and no safe-to-ACK ids.
     */
    private static class PubsubCheckpointCoder<T>
    extends AtomicCoder<PubsubCheckpoint<T>> {
        private static final Coder<List<String>> LIST_CODER = ListCoder.of(StringUtf8Coder.of());

        private PubsubCheckpointCoder() {
        }

        /** Writes the checkpoint's not-yet-read ACK ids as a list of UTF-8 strings. */
        @Override
        public void encode(PubsubCheckpoint<T> value, OutputStream outStream, Coder.Context context) throws IOException {
            List<String> pendingAckIds = value.notYetReadIds;
            LIST_CODER.encode(pendingAckIds, outStream, context);
        }

        /** Reads the ACK id list back and wraps it in a checkpoint that has no reader attached. */
        @Override
        public PubsubCheckpoint<T> decode(InputStream inStream, Coder.Context context) throws IOException {
            List<String> pendingAckIds = LIST_CODER.decode(inStream, context);
            return new PubsubCheckpoint<T>(null, null, pendingAckIds);
        }
    }

    /**
     * Checkpoint for a {@link PubsubReader}.
     *
     * <p>A checkpoint taken from a live reader carries that reader and the ACK ids that are safe
     * to acknowledge, and is completed with {@link #finalizeCheckpoint()}. A checkpoint restored
     * by the coder carries only the not-yet-read ACK ids, which {@link #nackAll} hands back to
     * Pubsub.
     */
    @VisibleForTesting
    static class PubsubCheckpoint<T>
    implements UnboundedSource.CheckpointMark {
        /** Reader that produced this checkpoint; null when restored, and after finalization. */
        @Nullable
        private PubsubReader<T> reader;
        /** ACK ids to acknowledge on finalization; null when restored, and after finalization. */
        @Nullable
        private List<String> safeToAckIds;
        /** ACK ids of messages that had been received but not read when the checkpoint was taken. */
        @VisibleForTesting
        final List<String> notYetReadIds;

        public PubsubCheckpoint(@Nullable PubsubReader<T> reader, @Nullable List<String> safeToAckIds, List<String> notYetReadIds) {
            this.reader = reader;
            this.safeToAckIds = safeToAckIds;
            this.notYetReadIds = notYetReadIds;
        }

        /**
         * Acknowledges all safe-to-ACK ids in batches of at most 2000 (the value of
         * ACK_BATCH_SIZE on the enclosing class).
         *
         * <p>Whether or not acknowledging succeeds, the reader's count of outstanding
         * checkpoints is decremented exactly once and this checkpoint releases its reader and id
         * list, so it cannot be finalized twice.
         *
         * @throws IOException if an acknowledge call fails
         * @throws IllegalStateException if this checkpoint was restored or already finalized, or
         *     if the outstanding-checkpoint count would go negative
         */
        @Override
        public void finalizeCheckpoint() throws IOException {
            Preconditions.checkState(this.reader != null && this.safeToAckIds != null, "Cannot finalize a restored checkpoint");
            PubsubReader<T> activeReader = this.reader;
            List<String> idsToAck = this.safeToAckIds;
            try {
                int remaining = idsToAck.size();
                List<String> batch = new ArrayList<String>(Math.min(remaining, 2000));
                for (String ackId : idsToAck) {
                    batch.add(ackId);
                    if (batch.size() >= 2000) {
                        activeReader.ackBatch(batch);
                        remaining -= batch.size();
                        // A fresh list each time: ackBatch keeps a reference to the one it was given.
                        batch = new ArrayList<String>(Math.min(remaining, 2000));
                    }
                }
                if (!batch.isEmpty()) {
                    activeReader.ackBatch(batch);
                }
            }
            finally {
                int remainingInFlight = activeReader.numInFlightCheckpoints.decrementAndGet();
                // Release references before the sanity check so they are dropped even if it fails.
                this.reader = null;
                this.safeToAckIds = null;
                Preconditions.checkState(remainingInFlight >= 0, "Miscounted in-flight checkpoints");
            }
        }

        /** Returns the current time from the reader's transform clock, or the system clock if none was given. */
        private static long now(PubsubReader<?> reader) {
            Clock clock = reader.outer.outer.clock;
            return clock == null ? System.currentTimeMillis() : clock.currentTimeMillis();
        }

        /**
         * NACKs every not-yet-read message through {@code reader}, in batches of at most 2000.
         * Only valid on a checkpoint that has no reader of its own (restored or finalized).
         *
         * @param reader the reader through which the NACKs are sent
         * @throws IOException if a NACK request fails
         * @throws IllegalStateException if this checkpoint still holds its originating reader
         */
        public void nackAll(PubsubReader<T> reader) throws IOException {
            Preconditions.checkState(this.reader == null, "Cannot nackAll on persisting checkpoint");
            List<String> batchYetToAckIds = new ArrayList<String>(Math.min(this.notYetReadIds.size(), 2000));
            for (String ackId : this.notYetReadIds) {
                batchYetToAckIds.add(ackId);
                if (batchYetToAckIds.size() >= 2000) {
                    reader.nackBatch(PubsubCheckpoint.now(reader), batchYetToAckIds);
                    // Reuse is fine here: nackBatch does not keep the list.
                    batchYetToAckIds.clear();
                }
            }
            if (!batchYetToAckIds.isEmpty()) {
                reader.nackBatch(PubsubCheckpoint.now(reader), batchYetToAckIds);
            }
        }
    }
}

