/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.io;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.coders.VoidCoder;
import com.google.cloud.dataflow.sdk.io.PubsubUnboundedSink;
import com.google.cloud.dataflow.sdk.io.PubsubUnboundedSource;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Strings;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.PubsubClient;
import com.google.cloud.dataflow.sdk.util.PubsubJsonClient;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.cloud.dataflow.sdk.values.PInput;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubIO {
    private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class);
    private static final PubsubClient.PubsubClientFactory FACTORY = PubsubJsonClient.FACTORY;
    public static final Coder<String> DEFAULT_PUBSUB_CODER = StringUtf8Coder.of();
    private static final Pattern PROJECT_ID_REGEXP = Pattern.compile("[a-z][-a-z0-9:.]{4,61}[a-z0-9]");
    private static final Pattern SUBSCRIPTION_REGEXP = Pattern.compile("projects/([^/]+)/subscriptions/(.+)");
    private static final Pattern TOPIC_REGEXP = Pattern.compile("projects/([^/]+)/topics/(.+)");
    private static final Pattern V1BETA1_SUBSCRIPTION_REGEXP = Pattern.compile("/subscriptions/([^/]+)/(.+)");
    private static final Pattern V1BETA1_TOPIC_REGEXP = Pattern.compile("/topics/([^/]+)/(.+)");
    private static final Pattern PUBSUB_NAME_REGEXP = Pattern.compile("[a-zA-Z][-._~%+a-zA-Z0-9]+");
    private static final int PUBSUB_NAME_MIN_LENGTH = 3;
    private static final int PUBSUB_NAME_MAX_LENGTH = 255;
    private static final String SUBSCRIPTION_RANDOM_TEST_PREFIX = "_random/";
    private static final String SUBSCRIPTION_STARTING_SIGNAL = "_starting_signal/";
    private static final String TOPIC_DEV_NULL_TEST_NAME = "/topics/dev/null";

    private static void validateProjectName(String project) {
        Matcher match = PROJECT_ID_REGEXP.matcher(project);
        if (!match.matches()) {
            throw new IllegalArgumentException("Illegal project name specified in Pubsub subscription: " + project);
        }
    }

    private static void validatePubsubName(String name) {
        if (name.length() < 3) {
            throw new IllegalArgumentException("Pubsub object name is shorter than 3 characters: " + name);
        }
        if (name.length() > 255) {
            throw new IllegalArgumentException("Pubsub object name is longer than 255 characters: " + name);
        }
        if (name.startsWith("goog")) {
            throw new IllegalArgumentException("Pubsub object name cannot start with goog: " + name);
        }
        Matcher match = PUBSUB_NAME_REGEXP.matcher(name);
        if (!match.matches()) {
            throw new IllegalArgumentException("Illegal Pubsub object name specified: " + name + " Please see Javadoc for naming rules.");
        }
    }

    private static void populateCommonDisplayData(DisplayData.Builder builder, String timestampLabel, String idLabel, PubsubTopic topic) {
        builder.addIfNotNull(DisplayData.item("timestampLabel", timestampLabel).withLabel("Timestamp Label Attribute")).addIfNotNull(DisplayData.item("idLabel", idLabel).withLabel("ID Label Attribute"));
        if (topic != null) {
            builder.add(DisplayData.item("topic", topic.asPath()).withLabel("Pubsub Topic"));
        }
    }

    private PubsubIO() {
    }

    public static class Write {
        public static Bound<String> named(String name) {
            return new Bound(DEFAULT_PUBSUB_CODER).named(name);
        }

        public static Bound<String> topic(String topic) {
            return new Bound(DEFAULT_PUBSUB_CODER).topic(topic);
        }

        public static Bound<String> timestampLabel(String timestampLabel) {
            return new Bound(DEFAULT_PUBSUB_CODER).timestampLabel(timestampLabel);
        }

        public static Bound<String> idLabel(String idLabel) {
            return new Bound(DEFAULT_PUBSUB_CODER).idLabel(idLabel);
        }

        public static <T> Bound<T> withCoder(Coder<T> coder) {
            return new Bound(coder);
        }

        private Write() {
        }

        public static class Bound<T>
        extends PTransform<PCollection<T>, PDone> {
            @Nullable
            private final PubsubTopic topic;
            @Nullable
            private final String timestampLabel;
            @Nullable
            private final String idLabel;
            private final Coder<T> coder;

            private Bound(Coder<T> coder) {
                this(null, null, null, null, coder);
            }

            private Bound(String name, PubsubTopic topic, String timestampLabel, String idLabel, Coder<T> coder) {
                super(name);
                this.topic = topic;
                this.timestampLabel = timestampLabel;
                this.idLabel = idLabel;
                this.coder = coder;
            }

            public Bound<T> named(String name) {
                return new Bound<T>(name, this.topic, this.timestampLabel, this.idLabel, this.coder);
            }

            public Bound<T> topic(String topic) {
                return new Bound<T>(this.name, PubsubTopic.fromPath(topic), this.timestampLabel, this.idLabel, this.coder);
            }

            public Bound<T> timestampLabel(String timestampLabel) {
                return new Bound<T>(this.name, this.topic, timestampLabel, this.idLabel, this.coder);
            }

            public Bound<T> idLabel(String idLabel) {
                return new Bound<T>(this.name, this.topic, this.timestampLabel, idLabel, this.coder);
            }

            public <X> Bound<X> withCoder(Coder<X> coder) {
                return new Bound<X>(this.name, this.topic, this.timestampLabel, this.idLabel, coder);
            }

            @Override
            public PDone apply(PCollection<T> input) {
                if (this.topic == null) {
                    throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
                }
                switch (input.isBounded()) {
                    case BOUNDED: {
                        input.apply(ParDo.of(new PubsubWriter()));
                        return PDone.in(input.getPipeline());
                    }
                    case UNBOUNDED: {
                        return (PDone)input.apply(new PubsubUnboundedSink<T>(FACTORY, PubsubClient.topicPathFromName(this.topic.project, this.topic.topic), this.coder, this.timestampLabel, this.idLabel, 100));
                    }
                }
                throw new RuntimeException();
            }

            @Override
            public void populateDisplayData(DisplayData.Builder builder) {
                super.populateDisplayData(builder);
                PubsubIO.populateCommonDisplayData(builder, this.timestampLabel, this.idLabel, this.topic);
            }

            @Override
            protected Coder<Void> getDefaultOutputCoder() {
                return VoidCoder.of();
            }

            public PubsubTopic getTopic() {
                return this.topic;
            }

            public String getTimestampLabel() {
                return this.timestampLabel;
            }

            public String getIdLabel() {
                return this.idLabel;
            }

            public Coder<T> getCoder() {
                return this.coder;
            }

            public class PubsubWriter
            extends DoFn<T, Void> {
                private static final int MAX_PUBLISH_BATCH_SIZE = 100;
                private transient List<PubsubClient.OutgoingMessage> output;
                private transient PubsubClient pubsubClient;

                @Override
                public void startBundle(DoFn.Context c) throws IOException {
                    this.output = new ArrayList<PubsubClient.OutgoingMessage>();
                    this.pubsubClient = FACTORY.newClient(Bound.this.timestampLabel, null, c.getPipelineOptions().as(DataflowPipelineOptions.class));
                }

                @Override
                public void processElement(DoFn.ProcessContext c) throws IOException {
                    PubsubClient.OutgoingMessage message = new PubsubClient.OutgoingMessage(CoderUtils.encodeToByteArray(Bound.this.getCoder(), c.element()), c.timestamp().getMillis(), null);
                    this.output.add(message);
                    if (this.output.size() >= 100) {
                        this.publish();
                    }
                }

                @Override
                public void finishBundle(DoFn.Context c) throws IOException {
                    if (!this.output.isEmpty()) {
                        this.publish();
                    }
                    this.output = null;
                    this.pubsubClient.close();
                    this.pubsubClient = null;
                }

                private void publish() throws IOException {
                    int n = this.pubsubClient.publish(PubsubClient.topicPathFromName(Bound.this.getTopic().project, Bound.this.getTopic().topic), this.output);
                    Preconditions.checkState(n == this.output.size());
                    this.output.clear();
                }

                @Override
                public void populateDisplayData(DisplayData.Builder builder) {
                    super.populateDisplayData(builder);
                    Bound.this.populateDisplayData(builder);
                }
            }
        }
    }

    public static class Read {
        public static Bound<String> named(String name) {
            return new Bound(DEFAULT_PUBSUB_CODER).named(name);
        }

        public static Bound<String> topic(String topic) {
            return new Bound(DEFAULT_PUBSUB_CODER).topic(topic);
        }

        public static Bound<String> subscription(String subscription) {
            return new Bound(DEFAULT_PUBSUB_CODER).subscription(subscription);
        }

        public static Bound<String> timestampLabel(String timestampLabel) {
            return new Bound(DEFAULT_PUBSUB_CODER).timestampLabel(timestampLabel);
        }

        public static Bound<String> idLabel(String idLabel) {
            return new Bound(DEFAULT_PUBSUB_CODER).idLabel(idLabel);
        }

        public static <T> Bound<T> withCoder(Coder<T> coder) {
            return new Bound(coder);
        }

        public static Bound<String> maxNumRecords(int maxNumRecords) {
            return new Bound(DEFAULT_PUBSUB_CODER).maxNumRecords(maxNumRecords);
        }

        public static Bound<String> maxReadTime(Duration maxReadTime) {
            return new Bound(DEFAULT_PUBSUB_CODER).maxReadTime(maxReadTime);
        }

        private Read() {
        }

        public static class Bound<T>
        extends PTransform<PInput, PCollection<T>> {
            @Nullable
            private final PubsubTopic topic;
            @Nullable
            private final PubsubSubscription subscription;
            @Nullable
            private final String timestampLabel;
            @Nullable
            private final String idLabel;
            @Nullable
            private final Coder<T> coder;
            private final int maxNumRecords;
            @Nullable
            private final Duration maxReadTime;

            private Bound(Coder<T> coder) {
                this(null, null, null, null, coder, null, 0, null);
            }

            private Bound(String name, PubsubSubscription subscription, PubsubTopic topic, String timestampLabel, Coder<T> coder, String idLabel, int maxNumRecords, Duration maxReadTime) {
                super(name);
                this.subscription = subscription;
                this.topic = topic;
                this.timestampLabel = timestampLabel;
                this.coder = coder;
                this.idLabel = idLabel;
                this.maxNumRecords = maxNumRecords;
                this.maxReadTime = maxReadTime;
            }

            public Bound<T> named(String name) {
                return new Bound<T>(name, this.subscription, this.topic, this.timestampLabel, this.coder, this.idLabel, this.maxNumRecords, this.maxReadTime);
            }

            public Bound<T> subscription(String subscription) {
                return new Bound<T>(this.name, PubsubSubscription.fromPath(subscription), this.topic, this.timestampLabel, this.coder, this.idLabel, this.maxNumRecords, this.maxReadTime);
            }

            public Bound<T> topic(String topic) {
                return new Bound<T>(this.name, this.subscription, PubsubTopic.fromPath(topic), this.timestampLabel, this.coder, this.idLabel, this.maxNumRecords, this.maxReadTime);
            }

            public Bound<T> timestampLabel(String timestampLabel) {
                return new Bound<T>(this.name, this.subscription, this.topic, timestampLabel, this.coder, this.idLabel, this.maxNumRecords, this.maxReadTime);
            }

            public Bound<T> idLabel(String idLabel) {
                return new Bound<T>(this.name, this.subscription, this.topic, this.timestampLabel, this.coder, idLabel, this.maxNumRecords, this.maxReadTime);
            }

            public <X> Bound<X> withCoder(Coder<X> coder) {
                return new Bound<X>(this.name, this.subscription, this.topic, this.timestampLabel, coder, this.idLabel, this.maxNumRecords, this.maxReadTime);
            }

            public Bound<T> maxNumRecords(int maxNumRecords) {
                return new Bound<T>(this.name, this.subscription, this.topic, this.timestampLabel, this.coder, this.idLabel, maxNumRecords, this.maxReadTime);
            }

            public Bound<T> maxReadTime(Duration maxReadTime) {
                return new Bound<T>(this.name, this.subscription, this.topic, this.timestampLabel, this.coder, this.idLabel, this.maxNumRecords, maxReadTime);
            }

            @Override
            public PCollection<T> apply(PInput input) {
                boolean boundedOutput;
                if (this.topic == null && this.subscription == null) {
                    throw new IllegalStateException("Need to set either the topic or the subscription for a PubsubIO.Read transform");
                }
                if (this.topic != null && this.subscription != null) {
                    throw new IllegalStateException("Can't set both the topic and the subscription for a PubsubIO.Read transform");
                }
                boolean bl = boundedOutput = this.getMaxNumRecords() > 0 || this.getMaxReadTime() != null;
                if (boundedOutput) {
                    return ((PCollection)((PCollection)((PCollection)input.getPipeline().begin().apply(Create.of(new Void[]{null}))).setCoder((Coder)VoidCoder.of())).apply(ParDo.of(new PubsubReader()))).setCoder((Coder)this.coder);
                }
                PubsubClient.ProjectPath projectPath = this.topic == null ? null : PubsubClient.projectPathFromId(this.topic.project);
                PubsubClient.TopicPath topicPath = this.topic == null ? null : PubsubClient.topicPathFromName(this.topic.project, this.topic.topic);
                PubsubClient.SubscriptionPath subscriptionPath = this.subscription == null ? null : PubsubClient.subscriptionPathFromName(this.subscription.project, this.subscription.subscription);
                return (PCollection)input.getPipeline().begin().apply(new PubsubUnboundedSource<T>(FACTORY, projectPath, topicPath, subscriptionPath, this.coder, this.timestampLabel, this.idLabel));
            }

            @Override
            public void populateDisplayData(DisplayData.Builder builder) {
                super.populateDisplayData(builder);
                PubsubIO.populateCommonDisplayData(builder, this.timestampLabel, this.idLabel, this.topic);
                builder.addIfNotNull(DisplayData.item("maxReadTime", this.maxReadTime).withLabel("Maximum Read Time")).addIfNotDefault(DisplayData.item("maxNumRecords", this.maxNumRecords).withLabel("Maximum Read Records"), 0);
                if (this.subscription != null) {
                    builder.add(DisplayData.item("subscription", this.subscription.asPath()).withLabel("Pubsub Subscription"));
                }
            }

            @Override
            protected Coder<T> getDefaultOutputCoder() {
                return this.coder;
            }

            public PubsubTopic getTopic() {
                return this.topic;
            }

            public PubsubSubscription getSubscription() {
                return this.subscription;
            }

            public String getTimestampLabel() {
                return this.timestampLabel;
            }

            public Coder<T> getCoder() {
                return this.coder;
            }

            public String getIdLabel() {
                return this.idLabel;
            }

            public int getMaxNumRecords() {
                return this.maxNumRecords;
            }

            public Duration getMaxReadTime() {
                return this.maxReadTime;
            }

            public class PubsubReader
            extends DoFn<Void, T> {
                private static final int DEFAULT_PULL_SIZE = 100;
                private static final int ACK_TIMEOUT_SEC = 60;

                @Override
                public void processElement(DoFn.ProcessContext c) throws IOException {
                    try (PubsubClient pubsubClient = FACTORY.newClient(Bound.this.timestampLabel, Bound.this.idLabel, c.getPipelineOptions().as(DataflowPipelineOptions.class));){
                        PubsubClient.SubscriptionPath subscriptionPath;
                        if (Bound.this.getSubscription() == null) {
                            PubsubClient.TopicPath topicPath = PubsubClient.topicPathFromName(Bound.this.getTopic().project, Bound.this.getTopic().topic);
                            String projectId = c.getPipelineOptions().as(DataflowPipelineOptions.class).getProject();
                            if (Strings.isNullOrEmpty(projectId)) {
                                projectId = Bound.this.getTopic().project;
                            }
                            PubsubClient.ProjectPath projectPath = PubsubClient.projectPathFromId(projectId);
                            try {
                                subscriptionPath = pubsubClient.createRandomSubscription(projectPath, topicPath, 60);
                            }
                            catch (Exception e) {
                                throw new RuntimeException("Failed to create subscription: ", e);
                            }
                        } else {
                            subscriptionPath = PubsubClient.subscriptionPathFromName(Bound.this.getSubscription().project, Bound.this.getSubscription().subscription);
                        }
                        Instant endTime = Bound.this.getMaxReadTime() == null ? new Instant(Long.MAX_VALUE) : Instant.now().plus((ReadableDuration)Bound.this.getMaxReadTime());
                        ArrayList<PubsubClient.IncomingMessage> messages = new ArrayList<PubsubClient.IncomingMessage>();
                        Exception finallyBlockException = null;
                        try {
                            while ((Bound.this.getMaxNumRecords() == 0 || messages.size() < Bound.this.getMaxNumRecords()) && Instant.now().isBefore((ReadableInstant)endTime)) {
                                int batchSize = 100;
                                if (Bound.this.getMaxNumRecords() > 0) {
                                    batchSize = Math.min(batchSize, Bound.this.getMaxNumRecords() - messages.size());
                                }
                                List<PubsubClient.IncomingMessage> batchMessages = pubsubClient.pull(System.currentTimeMillis(), subscriptionPath, batchSize, false);
                                ArrayList<String> ackIds = new ArrayList<String>();
                                for (PubsubClient.IncomingMessage message : batchMessages) {
                                    messages.add(message);
                                    ackIds.add(message.ackId);
                                }
                                if (ackIds.size() == 0) continue;
                                pubsubClient.acknowledge(subscriptionPath, ackIds);
                            }
                        }
                        catch (IOException e) {
                            throw new RuntimeException("Unexpected exception while reading from Pubsub: ", e);
                        }
                        finally {
                            if (Bound.this.getSubscription() == null) {
                                try {
                                    pubsubClient.deleteSubscription(subscriptionPath);
                                }
                                catch (Exception e) {
                                    finallyBlockException = e;
                                }
                            }
                        }
                        if (finallyBlockException != null) {
                            throw new RuntimeException("Failed to delete subscription: ", finallyBlockException);
                        }
                        for (PubsubClient.IncomingMessage message : messages) {
                            c.outputWithTimestamp(CoderUtils.decodeFromByteArray(Bound.this.getCoder(), message.elementBytes), new Instant(message.timestampMsSinceEpoch));
                        }
                    }
                }

                @Override
                public void populateDisplayData(DisplayData.Builder builder) {
                    super.populateDisplayData(builder);
                    Bound.this.populateDisplayData(builder);
                }
            }
        }
    }

    public static class PubsubTopic
    implements Serializable {
        private final Type type;
        private final String project;
        private final String topic;

        private PubsubTopic(Type type, String project, String topic) {
            this.type = type;
            this.project = project;
            this.topic = topic;
        }

        public static PubsubTopic fromPath(String path) {
            String topicName;
            String projectName;
            if (path.equals(PubsubIO.TOPIC_DEV_NULL_TEST_NAME)) {
                return new PubsubTopic(Type.FAKE, "", path);
            }
            Matcher v1beta1Match = V1BETA1_TOPIC_REGEXP.matcher(path);
            if (v1beta1Match.matches()) {
                LOG.warn("Saw topic in v1beta1 format.  Topics should be in the format projects/<project_id>/topics/<topic_name>");
                projectName = v1beta1Match.group(1);
                topicName = v1beta1Match.group(2);
            } else {
                Matcher match = TOPIC_REGEXP.matcher(path);
                if (!match.matches()) {
                    throw new IllegalArgumentException("Pubsub topic is not in projects/<project_id>/topics/<topic_name> format: " + path);
                }
                projectName = match.group(1);
                topicName = match.group(2);
            }
            PubsubIO.validateProjectName(projectName);
            PubsubIO.validatePubsubName(topicName);
            return new PubsubTopic(Type.NORMAL, projectName, topicName);
        }

        @Deprecated
        public String asV1Beta1Path() {
            if (this.type == Type.NORMAL) {
                return "/topics/" + this.project + "/" + this.topic;
            }
            return this.topic;
        }

        @Deprecated
        public String asV1Beta2Path() {
            if (this.type == Type.NORMAL) {
                return "projects/" + this.project + "/topics/" + this.topic;
            }
            return this.topic;
        }

        public String asPath() {
            if (this.type == Type.NORMAL) {
                return "projects/" + this.project + "/topics/" + this.topic;
            }
            return this.topic;
        }

        private static enum Type {
            NORMAL,
            FAKE;

        }
    }

    public static class PubsubSubscription
    implements Serializable {
        private final Type type;
        private final String project;
        private final String subscription;

        private PubsubSubscription(Type type, String project, String subscription) {
            this.type = type;
            this.project = project;
            this.subscription = subscription;
        }

        public static PubsubSubscription fromPath(String path) {
            String subscriptionName;
            String projectName;
            if (path.startsWith(PubsubIO.SUBSCRIPTION_RANDOM_TEST_PREFIX) || path.startsWith(PubsubIO.SUBSCRIPTION_STARTING_SIGNAL)) {
                return new PubsubSubscription(Type.FAKE, "", path);
            }
            Matcher v1beta1Match = V1BETA1_SUBSCRIPTION_REGEXP.matcher(path);
            if (v1beta1Match.matches()) {
                LOG.warn("Saw subscription in v1beta1 format. Subscriptions should be in the format projects/<project_id>/subscriptions/<subscription_name>");
                projectName = v1beta1Match.group(1);
                subscriptionName = v1beta1Match.group(2);
            } else {
                Matcher match = SUBSCRIPTION_REGEXP.matcher(path);
                if (!match.matches()) {
                    throw new IllegalArgumentException("Pubsub subscription is not in projects/<project_id>/subscriptions/<subscription_name> format: " + path);
                }
                projectName = match.group(1);
                subscriptionName = match.group(2);
            }
            PubsubIO.validateProjectName(projectName);
            PubsubIO.validatePubsubName(subscriptionName);
            return new PubsubSubscription(Type.NORMAL, projectName, subscriptionName);
        }

        @Deprecated
        public String asV1Beta1Path() {
            if (this.type == Type.NORMAL) {
                return "/subscriptions/" + this.project + "/" + this.subscription;
            }
            return this.subscription;
        }

        @Deprecated
        public String asV1Beta2Path() {
            if (this.type == Type.NORMAL) {
                return "projects/" + this.project + "/subscriptions/" + this.subscription;
            }
            return this.subscription;
        }

        public String asPath() {
            if (this.type == Type.NORMAL) {
                return "projects/" + this.project + "/subscriptions/" + this.subscription;
            }
            return this.subscription;
        }

        private static enum Type {
            NORMAL,
            FAKE;

        }
    }
}

