/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.io.datastore;

import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.Sleeper;
import com.google.cloud.dataflow.sdk.annotations.Experimental;
import com.google.cloud.dataflow.sdk.options.GcpOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.MoreObjects;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Verify;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Flatten;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.MapElements;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.SimpleFunction;
import com.google.cloud.dataflow.sdk.transforms.Values;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.FluentBackoff;
import com.google.cloud.dataflow.sdk.util.RetryHttpRequestInitializer;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PBegin;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.datastore.v1.CommitRequest;
import com.google.datastore.v1.Entity;
import com.google.datastore.v1.EntityResult;
import com.google.datastore.v1.Filter;
import com.google.datastore.v1.Key;
import com.google.datastore.v1.Mutation;
import com.google.datastore.v1.PartitionId;
import com.google.datastore.v1.PropertyFilter;
import com.google.datastore.v1.PropertyOrder;
import com.google.datastore.v1.Query;
import com.google.datastore.v1.QueryResultBatch;
import com.google.datastore.v1.RunQueryRequest;
import com.google.datastore.v1.RunQueryResponse;
import com.google.datastore.v1.Value;
import com.google.datastore.v1.client.Datastore;
import com.google.datastore.v1.client.DatastoreException;
import com.google.datastore.v1.client.DatastoreFactory;
import com.google.datastore.v1.client.DatastoreHelper;
import com.google.datastore.v1.client.DatastoreOptions;
import com.google.datastore.v1.client.QuerySplitter;
import com.google.protobuf.Int32Value;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public class DatastoreV1 {
    @VisibleForTesting
    static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;

    /** Package-private no-argument constructor; it performs no initialization. */
    DatastoreV1() {}

    /**
     * Creates a {@link Read} transform with no project id, query, or namespace set and a
     * requested split count of zero. Configure it through the {@code with*} methods on
     * {@link Read}.
     *
     * @return a new, unconfigured {@link Read}
     */
    public Read read() {
        final String unsetProjectId = null;
        final Query unsetQuery = null;
        final String unsetNamespace = null;
        final int unsetNumQuerySplits = 0;
        return new Read(unsetProjectId, unsetQuery, unsetNamespace, unsetNumQuerySplits);
    }

    /**
     * Creates a {@link Write} transform with no project id set; supply one through
     * {@link Write#withProjectId(String)}.
     *
     * @return a new {@link Write} whose project id is {@code null}
     */
    public Write write() {
        final String unsetProjectId = null;
        return new Write(unsetProjectId);
    }

    /**
     * Creates a {@link DeleteEntity} transform with no project id set; supply one through
     * {@link DeleteEntity#withProjectId(String)}.
     *
     * @return a new {@link DeleteEntity} whose project id is {@code null}
     */
    public DeleteEntity deleteEntity() {
        final String unsetProjectId = null;
        return new DeleteEntity(unsetProjectId);
    }

    /**
     * Creates a {@link DeleteKey} transform with no project id set; supply one through
     * {@link DeleteKey#withProjectId(String)}.
     *
     * @return a new {@link DeleteKey} whose project id is {@code null}
     */
    public DeleteKey deleteKey() {
        final String unsetProjectId = null;
        return new DeleteKey(unsetProjectId);
    }

    /**
     * Reports whether a key is complete enough to be used in a mutation.
     *
     * <p>A key passes when its path is non-empty and the last path element carries either a
     * non-zero id or a non-empty name.
     *
     * @param key the key to inspect
     * @return {@code true} if the last path element has an id or a name; {@code false} if the
     *     path is empty or the last element has neither
     */
    static boolean isValidKey(Key key) {
        // Typed list instead of a raw List: no downcast is needed when reading elements.
        List<Key.PathElement> elementList = key.getPathList();
        if (elementList.isEmpty()) {
            return false;
        }
        Key.PathElement lastElement = elementList.get(elementList.size() - 1);
        return lastElement.getId() != 0L || !lastElement.getName().isEmpty();
    }

    /**
     * Serializable factory for the {@link Datastore} client and {@link QuerySplitter} used by
     * the functions in this file.
     */
    @VisibleForTesting
    static class V1DatastoreFactory implements Serializable {
        V1DatastoreFactory() {}

        /**
         * Builds a {@link Datastore} client for the given project.
         *
         * <p>The client is configured with a {@link RetryHttpRequestInitializer}; the GCP
         * credential from the pipeline options is attached only when one is present.
         *
         * @param pipelineOptions options viewed as {@link GcpOptions} to obtain the credential
         * @param projectId project the client is created for
         * @return a client created from the assembled {@link DatastoreOptions}
         */
        public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
            HttpRequestInitializer retryInitializer = (HttpRequestInitializer)new RetryHttpRequestInitializer();
            DatastoreOptions.Builder optionsBuilder = new DatastoreOptions.Builder()
                .projectId(projectId)
                .initializer(retryInitializer);

            GcpOptions gcpOptions = pipelineOptions.as(GcpOptions.class);
            Credential gcpCredential = gcpOptions.getGcpCredential();
            if (gcpCredential != null) {
                optionsBuilder.credential(gcpCredential);
            }

            DatastoreOptions datastoreOptions = optionsBuilder.build();
            return DatastoreFactory.get().create(datastoreOptions);
        }

        /**
         * Returns the query splitter supplied by {@link DatastoreHelper#getQuerySplitter()}.
         */
        public QuerySplitter getQuerySplitter() {
            return DatastoreHelper.getQuerySplitter();
        }
    }

    @VisibleForTesting
    static class DeleteKeyFn
    extends SimpleFunction<Key, Mutation> {
        DeleteKeyFn() {
        }

        @Override
        public Mutation apply(Key key) {
            Preconditions.checkArgument(DatastoreV1.isValidKey(key), "Keys to be deleted from the Cloud Datastore must be complete:\n%s", key);
            return DatastoreHelper.makeDelete((Key)key).build();
        }
    }

    @VisibleForTesting
    static class DeleteEntityFn
    extends SimpleFunction<Entity, Mutation> {
        DeleteEntityFn() {
        }

        @Override
        public Mutation apply(Entity entity) {
            Preconditions.checkArgument(DatastoreV1.isValidKey(entity.getKey()), "Entities to be deleted from the Cloud Datastore must have complete keys:\n%s", entity);
            return DatastoreHelper.makeDelete((Key)entity.getKey()).build();
        }
    }

    @VisibleForTesting
    static class UpsertFn
    extends SimpleFunction<Entity, Mutation> {
        UpsertFn() {
        }

        @Override
        public Mutation apply(Entity entity) {
            Preconditions.checkArgument(DatastoreV1.isValidKey(entity.getKey()), "Entities to be written to the Cloud Datastore must have complete keys:\n%s", entity);
            return DatastoreHelper.makeUpsert((Entity)entity).build();
        }
    }

    /**
     * {@link DoFn} that buffers incoming {@link Mutation}s and commits them to Cloud Datastore
     * in batches.
     *
     * <p>A batch is flushed once it holds {@link DatastoreV1#DATASTORE_BATCH_UPDATE_LIMIT}
     * mutations, and any remainder is flushed when the bundle finishes. Failed commits are
     * retried with {@link #BUNDLE_WRITE_BACKOFF}; once the backoff is exhausted the last
     * {@link DatastoreException} is rethrown.
     */
    @VisibleForTesting
    static class DatastoreWriterFn extends DoFn<Mutation, Void> {
        private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class);
        private final String projectId;
        // Created in startBundle; transient so it is not serialized with the function.
        private transient Datastore datastore;
        private final V1DatastoreFactory datastoreFactory;
        // Mutations accumulated since the last successful flush.
        private final List<Mutation> mutations = new ArrayList<Mutation>();
        private static final int MAX_RETRIES = 5;
        // The retry count comes from MAX_RETRIES so the backoff and the abort log message agree.
        private static final FluentBackoff BUNDLE_WRITE_BACKOFF = FluentBackoff.DEFAULT.withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5L));

        /**
         * Creates a writer for {@code projectId} using a default {@link V1DatastoreFactory}.
         *
         * @param projectId project to write to; must not be {@code null}
         */
        DatastoreWriterFn(String projectId) {
            this(projectId, new V1DatastoreFactory());
        }

        /**
         * Creates a writer with an explicit client factory.
         *
         * @param projectId project to write to; must not be {@code null}
         * @param datastoreFactory factory used to create the client in {@code startBundle}
         * @throws NullPointerException if {@code projectId} is {@code null}
         */
        @VisibleForTesting
        DatastoreWriterFn(String projectId, V1DatastoreFactory datastoreFactory) {
            this.projectId = Preconditions.checkNotNull(projectId, "projectId");
            this.datastoreFactory = datastoreFactory;
        }

        /** Creates the Datastore client for this bundle from the pipeline options. */
        @Override
        public void startBundle(DoFn.Context c) {
            this.datastore = this.datastoreFactory.getDatastore(c.getPipelineOptions(), this.projectId);
        }

        /** Buffers the element and flushes once the batch limit is reached. */
        @Override
        public void processElement(DoFn.ProcessContext c) throws Exception {
            this.mutations.add((Mutation)c.element());
            if (this.mutations.size() >= DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT) {
                this.flushBatch();
            }
        }

        /** Flushes whatever is still buffered at the end of the bundle. */
        @Override
        public void finishBundle(DoFn.Context c) throws Exception {
            if (!this.mutations.isEmpty()) {
                this.flushBatch();
            }
        }

        /**
         * Commits all buffered mutations in a single non-transactional request, retrying on
         * {@link DatastoreException} until the backoff is exhausted, then clears the buffer.
         *
         * @throws DatastoreException the last commit failure once retries are exhausted
         * @throws IOException if the backoff cannot compute its next delay
         * @throws InterruptedException if interrupted while sleeping between retries
         */
        private void flushBatch() throws DatastoreException, IOException, InterruptedException {
            LOG.debug("Writing batch of {} mutations", this.mutations.size());
            Sleeper sleeper = Sleeper.DEFAULT;
            BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();
            while (true) {
                try {
                    CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
                    commitRequest.addAllMutations(this.mutations);
                    commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
                    this.datastore.commit(commitRequest.build());
                    // The commit did not throw, so stop retrying.
                    break;
                }
                catch (DatastoreException exception) {
                    // Log only code and message here; the exception itself is rethrown on the final attempt.
                    LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(), exception.getMessage());
                    if (!BackOffUtils.next(sleeper, backoff)) {
                        LOG.error("Aborting after {} retries.", MAX_RETRIES);
                        throw exception;
                    }
                }
            }
            LOG.debug("Successfully wrote {} mutations", this.mutations.size());
            this.mutations.clear();
        }

        /** Adds the target project id to the display data when it is non-null. */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item("projectId", this.projectId).withLabel("Output Project"));
        }
    }

    /**
     * Base transform that converts each input element to a {@link Mutation} with
     * {@code mutationFn} and writes the mutations to Cloud Datastore through
     * {@link DatastoreWriterFn}.
     *
     * @param <T> element type of the input collection
     */
    private static abstract class Mutate<T> extends PTransform<PCollection<T>, PDone> {
        @Nullable
        private final String projectId;
        private final SimpleFunction<T, Mutation> mutationFn;

        /**
         * @param projectId target project, or {@code null} if not yet configured
         * @param mutationFn function turning an element into a mutation; must not be {@code null}
         * @throws NullPointerException if {@code mutationFn} is {@code null}
         */
        Mutate(@Nullable String projectId, SimpleFunction<T, Mutation> mutationFn) {
            this.projectId = projectId;
            this.mutationFn = Preconditions.checkNotNull(mutationFn);
        }

        /**
         * Applies the element-to-mutation mapping followed by the Datastore writer.
         *
         * @param input elements to convert and write
         * @return {@link PDone} in the input's pipeline
         */
        @Override
        public PDone apply(PCollection<T> input) {
            // Fully typed chain: no raw PCollection cast between the two steps.
            input
                .apply("Convert to Mutation", MapElements.via(this.mutationFn))
                .apply("Write Mutation to Datastore", ParDo.of(new DatastoreWriterFn(this.projectId)));
            return PDone.in(input.getPipeline());
        }

        /**
         * Checks that a project id and a mutation function are present.
         *
         * @throws NullPointerException if either is {@code null}
         */
        @Override
        public void validate(PCollection<T> input) {
            Preconditions.checkNotNull(this.projectId, "projectId");
            Preconditions.checkNotNull(this.mutationFn, "mutationFn");
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this.getClass())
                .add("projectId", this.projectId)
                .add("mutationFn", this.mutationFn.getClass().getName())
                .toString();
        }

        /** Adds the project id (when non-null) and the mutation function class. */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder
                .addIfNotNull(DisplayData.item("projectId", this.projectId).withLabel("Output Project"))
                .add(DisplayData.item("mutationFn", this.mutationFn.getClass()).withLabel("Datastore Mutation Function"));
        }

        /**
         * @return the configured project id, or {@code null} if none has been set
         */
        @Nullable
        public String getProjectId() {
            return this.projectId;
        }
    }

    /** Transform that deletes the entities identified by the input {@link Key}s. */
    public static class DeleteKey extends Mutate<Key> {
        /**
         * @param projectId target project, or {@code null} if not yet configured
         */
        DeleteKey(@Nullable String projectId) {
            super(projectId, new DeleteKeyFn());
        }

        /**
         * Returns a new {@link DeleteKey} that targets {@code projectId}.
         *
         * @param projectId target project; must not be {@code null}
         * @throws NullPointerException if {@code projectId} is {@code null}
         */
        public DeleteKey withProjectId(String projectId) {
            String checkedProjectId = Preconditions.checkNotNull(projectId, "projectId");
            return new DeleteKey(checkedProjectId);
        }
    }

    /** Transform that deletes the input {@link Entity} elements by their keys. */
    public static class DeleteEntity extends Mutate<Entity> {
        /**
         * @param projectId target project, or {@code null} if not yet configured
         */
        DeleteEntity(@Nullable String projectId) {
            super(projectId, new DeleteEntityFn());
        }

        /**
         * Returns a new {@link DeleteEntity} that targets {@code projectId}.
         *
         * @param projectId target project; must not be {@code null}
         * @throws NullPointerException if {@code projectId} is {@code null}
         */
        public DeleteEntity withProjectId(String projectId) {
            String checkedProjectId = Preconditions.checkNotNull(projectId, "projectId");
            return new DeleteEntity(checkedProjectId);
        }
    }

    /** Transform that upserts the input {@link Entity} elements. */
    public static class Write extends Mutate<Entity> {
        /**
         * @param projectId target project, or {@code null} if not yet configured
         */
        Write(@Nullable String projectId) {
            super(projectId, new UpsertFn());
        }

        /**
         * Returns a new {@link Write} that targets {@code projectId}.
         *
         * @param projectId target project; must not be {@code null}
         * @throws NullPointerException if {@code projectId} is {@code null}
         */
        public Write withProjectId(String projectId) {
            String checkedProjectId = Preconditions.checkNotNull(projectId, "projectId");
            return new Write(checkedProjectId);
        }
    }

    public static class Read
    extends PTransform<PBegin, PCollection<Entity>> {
        private static final Logger LOG = LoggerFactory.getLogger(Read.class);
        public static final int NUM_QUERY_SPLITS_MAX = 50000;
        static final int NUM_QUERY_SPLITS_MIN = 12;
        static final long DEFAULT_BUNDLE_SIZE_BYTES = 0x4000000L;
        static final int QUERY_BATCH_LIMIT = 500;
        @Nullable
        private final String projectId;
        @Nullable
        private final Query query;
        @Nullable
        private final String namespace;
        private final int numQuerySplits;

        /**
         * Estimates how many splits to request for {@code query}.
         *
         * <p>The estimate is the query's estimated size divided by
         * {@link #DEFAULT_BUNDLE_SIZE_BYTES}, rounded, capped at {@link #NUM_QUERY_SPLITS_MAX}
         * and never below {@link #NUM_QUERY_SPLITS_MIN}. If the size cannot be obtained for any
         * reason, the failure is logged and {@link #NUM_QUERY_SPLITS_MIN} is returned.
         *
         * @param datastore client used to look up statistics
         * @param query query whose result size is estimated
         * @param namespace namespace of the query, or {@code null}
         * @return a split count between the minimum and maximum, inclusive
         */
        static int getEstimatedNumSplits(Datastore datastore, Query query, @Nullable String namespace) {
            int numSplits;
            try {
                long estimatedSizeBytes = Read.getEstimatedSizeBytes(datastore, query, namespace);
                LOG.info("Estimated size bytes for the query is: {}", estimatedSizeBytes);
                // Named constants replace the inlined literals so the limits stay in one place.
                long roundedSplits = Math.round((double)estimatedSizeBytes / (double)DEFAULT_BUNDLE_SIZE_BYTES);
                numSplits = (int)Math.min((long)NUM_QUERY_SPLITS_MAX, roundedSplits);
            }
            catch (Exception e) {
                // Best effort: any failure to fetch statistics falls back to the minimum.
                LOG.warn("Failed the fetch estimatedSizeBytes for query: {}", query, e);
                numSplits = NUM_QUERY_SPLITS_MIN;
            }
            return Math.max(numSplits, NUM_QUERY_SPLITS_MIN);
        }

        /**
         * Looks up the timestamp of the most recent total-statistics entity.
         *
         * <p>Queries the {@code __Stat_Total__} kind (or {@code __Stat_Ns_Total__} when a
         * namespace is given), ordered by {@code timestamp} descending with a limit of one.
         *
         * @param datastore client used to run the query
         * @param namespace namespace to query, or {@code null}
         * @return the {@code timestamp} property's seconds value multiplied by 1,000,000
         * @throws DatastoreException if the query fails
         * @throws NoSuchElementException if the query returns no entity
         */
        private static long queryLatestStatisticsTimestamp(Datastore datastore, @Nullable String namespace) throws DatastoreException {
            // The statistics kind depends on whether a namespace was supplied.
            String statsKind = namespace == null ? "__Stat_Total__" : "__Stat_Ns_Total__";

            Query.Builder statsQuery = Query.newBuilder();
            statsQuery.addKindBuilder().setName(statsKind);
            statsQuery.addOrder(DatastoreHelper.makeOrder("timestamp", PropertyOrder.Direction.DESCENDING));
            statsQuery.setLimit(Int32Value.newBuilder().setValue(1));

            RunQueryResponse response = datastore.runQuery(Read.makeRequest(statsQuery.build(), namespace));
            QueryResultBatch batch = response.getBatch();
            if (batch.getEntityResultsCount() == 0) {
                throw new NoSuchElementException("Datastore total statistics unavailable");
            }

            Entity latestStats = batch.getEntityResults(0).getEntity();
            Value timestamp = (Value)latestStats.getProperties().get("timestamp");
            return timestamp.getTimestampValue().getSeconds() * 1000000L;
        }

        /**
         * Reads the {@code entity_bytes} statistic for the first kind named in {@code query}.
         *
         * <p>The lookup queries {@code __Stat_Kind__} (or {@code __Stat_Ns_Kind__} when a
         * namespace is given), filtered to the query's kind and to the latest statistics
         * timestamp.
         *
         * @param datastore client used to run the statistics queries
         * @param query query whose first kind is looked up
         * @param namespace namespace to query, or {@code null}
         * @return the integer value of the {@code entity_bytes} property
         * @throws DatastoreException if a statistics query fails
         * @throws NoSuchElementException if no statistics entity is returned
         */
        static long getEstimatedSizeBytes(Datastore datastore, Query query, @Nullable String namespace) throws DatastoreException {
            String ourKind = query.getKind(0).getName();
            long latestTimestamp = Read.queryLatestStatisticsTimestamp(datastore, namespace);
            LOG.info("Latest stats timestamp for kind {} is {}", ourKind, latestTimestamp);

            String statsKind = namespace == null ? "__Stat_Kind__" : "__Stat_Ns_Kind__";
            Filter kindFilter = DatastoreHelper.makeFilter("kind_name", PropertyFilter.Operator.EQUAL, DatastoreHelper.makeValue(ourKind).build()).build();
            Filter timestampFilter = DatastoreHelper.makeFilter("timestamp", PropertyFilter.Operator.EQUAL, DatastoreHelper.makeValue(latestTimestamp).build()).build();

            Query.Builder statsQuery = Query.newBuilder();
            statsQuery.addKindBuilder().setName(statsKind);
            statsQuery.setFilter(DatastoreHelper.makeAndFilter(new Filter[]{kindFilter, timestampFilter}));
            RunQueryRequest request = Read.makeRequest(statsQuery.build(), namespace);

            // Time only the statistics query itself for the debug log.
            long startMillis = System.currentTimeMillis();
            RunQueryResponse response = datastore.runQuery(request);
            long elapsedMillis = System.currentTimeMillis() - startMillis;
            LOG.debug("Query for per-kind statistics took {}ms", elapsedMillis);

            QueryResultBatch batch = response.getBatch();
            if (batch.getEntityResultsCount() == 0) {
                throw new NoSuchElementException("Cloud Datastore statistics for kind " + ourKind + " unavailable");
            }
            Entity kindStats = batch.getEntityResults(0).getEntity();
            Value entityBytes = (Value)kindStats.getProperties().get("entity_bytes");
            return entityBytes.getIntegerValue();
        }

        /**
         * Wraps {@code query} in a {@link RunQueryRequest}, setting the partition's namespace
         * id only when a namespace is supplied.
         *
         * @param query the query to run
         * @param namespace namespace to query, or {@code null} to leave the partition untouched
         * @return the built request
         */
        static RunQueryRequest makeRequest(Query query, @Nullable String namespace) {
            RunQueryRequest.Builder request = RunQueryRequest.newBuilder().setQuery(query);
            if (namespace == null) {
                return request.build();
            }
            request.getPartitionIdBuilder().setNamespaceId(namespace);
            return request.build();
        }

        /**
         * Delegates to {@code querySplitter} to split {@code query} into {@code numSplits}
         * parts, using a partition whose namespace id is set only when a namespace is supplied.
         *
         * @param query the query to split
         * @param namespace namespace of the query, or {@code null}
         * @param datastore client handed to the splitter
         * @param querySplitter splitter that computes the sub-queries
         * @param numSplits requested number of splits
         * @return the sub-queries returned by the splitter
         * @throws DatastoreException if the splitter's Datastore access fails
         */
        private static List<Query> splitQuery(Query query, @Nullable String namespace, Datastore datastore, QuerySplitter querySplitter, int numSplits) throws DatastoreException {
            PartitionId.Builder partition = PartitionId.newBuilder();
            if (namespace != null) {
                partition.setNamespaceId(namespace);
            }
            PartitionId partitionId = partition.build();
            return querySplitter.getSplits(query, partitionId, numSplits, datastore);
        }

        /**
         * Stores the given configuration as-is; no argument is validated here.
         *
         * @param projectId project to read from, or {@code null} if not yet configured
         * @param query query to run, or {@code null} if not yet configured
         * @param namespace namespace to read from, or {@code null}
         * @param numQuerySplits requested number of query splits
         */
        private Read(@Nullable String projectId, @Nullable Query query, @Nullable String namespace, int numQuerySplits) {
            this.numQuerySplits = numQuerySplits;
            this.namespace = namespace;
            this.query = query;
            this.projectId = projectId;
        }

        /**
         * Returns a copy of this transform that reads from {@code projectId}.
         *
         * @param projectId project to read from; must not be {@code null}
         * @throws NullPointerException if {@code projectId} is {@code null}
         */
        public Read withProjectId(String projectId) {
            String checkedProjectId = Preconditions.checkNotNull(projectId, "projectId");
            return new Read(checkedProjectId, query, namespace, numQuerySplits);
        }

        /**
         * Returns a copy of this transform that runs {@code query}.
         *
         * @param query query to run; must not be {@code null}, and any limit it sets must be
         *     positive
         * @throws NullPointerException if {@code query} is {@code null}
         * @throws IllegalArgumentException if the query has a limit that is not positive
         */
        public Read withQuery(Query query) {
            Preconditions.checkNotNull(query, "query");
            int limitValue = query.getLimit().getValue();
            boolean limitAcceptable = !query.hasLimit() || limitValue > 0;
            Preconditions.checkArgument(limitAcceptable, "Invalid query limit %s: must be positive", limitValue);
            return new Read(projectId, query, namespace, numQuerySplits);
        }

        /**
         * Returns a copy of this transform that reads from {@code namespace}. The value is
         * stored without validation.
         *
         * @param namespace namespace to read from
         */
        public Read withNamespace(String namespace) {
            return new Read(projectId, query, namespace, numQuerySplits);
        }

        /**
         * Returns a copy of this transform with the requested number of query splits.
         *
         * <p>The value is clamped to the range {@code [0, NUM_QUERY_SPLITS_MAX]}: negative
         * values become zero and values above {@link #NUM_QUERY_SPLITS_MAX} become the maximum.
         *
         * @param numQuerySplits requested number of splits
         */
        public Read withNumQuerySplits(int numQuerySplits) {
            // Use the declared maximum rather than repeating its literal value.
            int clampedSplits = Math.min(Math.max(numQuerySplits, 0), NUM_QUERY_SPLITS_MAX);
            return new Read(this.projectId, this.query, this.namespace, clampedSplits);
        }

        /**
         * @return the configured query, or {@code null} if none has been set
         */
        @Nullable
        public Query getQuery() {
            return query;
        }

        /**
         * @return the configured project id, or {@code null} if none has been set
         */
        @Nullable
        public String getProjectId() {
            return projectId;
        }

        /**
         * @return the configured namespace, or {@code null} if none has been set
         */
        @Nullable
        public String getNamespace() {
            return namespace;
        }

        /**
         * Expands this transform into three steps: split the query with {@link SplitQueryFn},
         * regroup the keyed sub-queries via {@code GroupByKey}/{@code Values}/{@code Flatten},
         * and read each sub-query with {@link ReadFn}.
         *
         * @param input the pipeline start
         * @return the entities emitted by {@link ReadFn}
         * @throws NullPointerException if the project id or query is unset, because
         *     {@link V1Options} rejects {@code null} for both
         */
        @Override
        public PCollection<Entity> apply(PBegin input) {
            V1Options v1Options = V1Options.from(this.getProjectId(), this.getQuery(), this.getNamespace());

            // Every intermediate collection is fully typed; no raw PCollection casts remain.
            PCollection<KV<Integer, Query>> queries = input
                .apply(Create.of(this.query))
                .apply(ParDo.of(new SplitQueryFn(v1Options, this.numQuerySplits)));

            PCollection<Query> shardedQueries = queries
                .apply(GroupByKey.<Integer, Query>create())
                .apply(Values.<Iterable<Query>>create())
                .apply(Flatten.<Query>iterables());

            PCollection<Entity> entities = shardedQueries
                .apply(ParDo.of(new ReadFn(v1Options)));
            return entities;
        }

        /**
         * Checks that both a project id and a query have been configured.
         *
         * @throws NullPointerException if either is {@code null}
         */
        @Override
        public void validate(PBegin input) {
            Preconditions.checkNotNull(projectId, "projectId");
            Preconditions.checkNotNull(query, "query");
        }

        /**
         * Adds the project id, namespace and query text to the display data, each only when
         * it is non-null.
         */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            // query is @Nullable: guard toString() so an unconfigured Read does not throw here.
            String queryText = this.query == null ? null : this.query.toString();
            builder
                .addIfNotNull(DisplayData.item("projectId", this.projectId).withLabel("ProjectId"))
                .addIfNotNull(DisplayData.item("namespace", this.namespace).withLabel("Namespace"))
                .addIfNotNull(DisplayData.item("query", queryText).withLabel("Query"));
        }

        /** Renders the class name together with the project id, query and namespace. */
        @Override
        public String toString() {
            return MoreObjects.toStringHelper(getClass())
                .add("projectId", projectId)
                .add("query", query)
                .add("namespace", namespace)
                .toString();
        }

        /**
         * {@link DoFn} that runs each input {@link Query} against Cloud Datastore and outputs
         * the resulting entities, fetching at most {@link Read#QUERY_BATCH_LIMIT} per request.
         */
        @VisibleForTesting
        static class ReadFn extends DoFn<Query, Entity> {
            private final V1Options options;
            private final V1DatastoreFactory datastoreFactory;
            // Created in startBundle; transient so it is not serialized with the function.
            private transient Datastore datastore;

            /**
             * Creates a reader using a default {@link V1DatastoreFactory}.
             *
             * @param options project id and namespace used for the reads
             */
            public ReadFn(V1Options options) {
                this(options, new V1DatastoreFactory());
            }

            /**
             * Creates a reader with an explicit client factory.
             *
             * @param options project id and namespace used for the reads
             * @param datastoreFactory factory used to create the client in {@code startBundle}
             */
            @VisibleForTesting
            ReadFn(V1Options options, V1DatastoreFactory datastoreFactory) {
                this.options = options;
                this.datastoreFactory = datastoreFactory;
            }

            /** Creates the Datastore client for this bundle from the pipeline options. */
            @Override
            public void startBundle(DoFn.Context c) throws Exception {
                this.datastore = this.datastoreFactory.getDatastore(c.getPipelineOptions(), this.options.getProjectId());
            }

            /**
             * Runs the element's query page by page and outputs every returned entity.
             *
             * <p>Each request is limited to the smaller of the remaining user limit and
             * {@link Read#QUERY_BATCH_LIMIT}, and continues from the previous batch's end
             * cursor when one is present. Paging stops once the user limit is used up, or when
             * a batch is smaller than the batch limit and is not marked {@code NOT_FINISHED}.
             */
            @Override
            public void processElement(DoFn.ProcessContext context) throws Exception {
                Query query = (Query)context.element();
                String namespace = this.options.getNamespace();
                // Without an explicit limit, treat the remaining budget as unbounded.
                int userLimit = query.hasLimit() ? query.getLimit().getValue() : Integer.MAX_VALUE;
                boolean moreResults = true;
                QueryResultBatch currentBatch = null;
                while (moreResults) {
                    Query.Builder queryBuilder = (Query.Builder)query.toBuilder().clone();
                    queryBuilder.setLimit(Int32Value.newBuilder().setValue(Math.min(userLimit, Read.QUERY_BATCH_LIMIT)));
                    // Resume after the previous page when it reported an end cursor.
                    if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
                        queryBuilder.setStartCursor(currentBatch.getEndCursor());
                    }
                    RunQueryRequest request = Read.makeRequest(queryBuilder.build(), namespace);
                    RunQueryResponse response = this.datastore.runQuery(request);
                    currentBatch = response.getBatch();
                    int numFetch = currentBatch.getEntityResultsCount();
                    if (query.hasLimit()) {
                        Verify.verify(userLimit >= numFetch, "Expected userLimit %s >= numFetch %s, because query limit %s must be <= userLimit", userLimit, numFetch, query.getLimit());
                        userLimit -= numFetch;
                    }
                    for (EntityResult entityResult : currentBatch.getEntityResultsList()) {
                        context.output(entityResult.getEntity());
                    }
                    // A full page is taken as a sign that more results may exist.
                    moreResults = userLimit > 0 && (numFetch == Read.QUERY_BATCH_LIMIT || currentBatch.getMoreResults() == QueryResultBatch.MoreResultsType.NOT_FINISHED);
                }
            }
        }

        /**
         * {@link DoFn} that splits each input {@link Query} into sub-queries and outputs them
         * keyed by consecutive integers starting at 1.
         */
        @VisibleForTesting
        static class SplitQueryFn extends DoFn<Query, KV<Integer, Query>> {
            private final V1Options options;
            // Requested split count; a value <= 0 means the count is estimated per query.
            private final int numSplits;
            private final V1DatastoreFactory datastoreFactory;
            // Both are created in startBundle; transient so they are not serialized.
            private transient Datastore datastore;
            private transient QuerySplitter querySplitter;

            /**
             * Creates a splitter using a default {@link V1DatastoreFactory}.
             *
             * @param options project id, query and namespace for the read
             * @param numSplits requested split count, or a value {@code <= 0} to estimate it
             */
            public SplitQueryFn(V1Options options, int numSplits) {
                this(options, numSplits, new V1DatastoreFactory());
            }

            /**
             * Creates a splitter with an explicit client factory.
             *
             * @param options project id, query and namespace for the read
             * @param numSplits requested split count, or a value {@code <= 0} to estimate it
             * @param datastoreFactory factory used to create the client and query splitter
             */
            @VisibleForTesting
            SplitQueryFn(V1Options options, int numSplits, V1DatastoreFactory datastoreFactory) {
                this.options = options;
                this.numSplits = numSplits;
                this.datastoreFactory = datastoreFactory;
            }

            /** Creates the Datastore client and query splitter for this bundle. */
            @Override
            public void startBundle(DoFn.Context c) throws Exception {
                // Use the accessor, as every other caller of V1Options in this file does.
                this.datastore = this.datastoreFactory.getDatastore(c.getPipelineOptions(), this.options.getProjectId());
                this.querySplitter = this.datastoreFactory.getQuerySplitter();
            }

            /**
             * Outputs the element's sub-queries as {@code KV<key, query>} pairs.
             *
             * <p>A query with a limit is passed through unsplit under key 1. Otherwise the
             * query is split into {@code numSplits} parts, or into an estimated number when
             * {@code numSplits <= 0}. If splitting fails for any reason, the failure is logged
             * and the original query is output as the only split.
             */
            @Override
            public void processElement(DoFn.ProcessContext c) throws Exception {
                int key = 1;
                Query query = (Query)c.element();
                if (query.hasLimit()) {
                    c.output(KV.of(key, query));
                    return;
                }
                int estimatedNumSplits = this.numSplits <= 0 ? Read.getEstimatedNumSplits(this.datastore, query, this.options.getNamespace()) : this.numSplits;
                LOG.info("Splitting the query into {} splits", estimatedNumSplits);
                // Element type is Query, not Object, so the loop below type-checks.
                List<Query> querySplits;
                try {
                    querySplits = Read.splitQuery(query, this.options.getNamespace(), this.datastore, this.querySplitter, estimatedNumSplits);
                }
                catch (Exception e) {
                    // Best effort: fall back to reading the unsplit query.
                    LOG.warn("Unable to parallelize the given query: {}", query, e);
                    querySplits = ImmutableList.of(query);
                }
                for (Query split : querySplits) {
                    c.output(KV.of(key++, split));
                }
            }

            /** Adds the project id, namespace and query text from the options. */
            @Override
            public void populateDisplayData(DisplayData.Builder builder) {
                super.populateDisplayData(builder);
                builder
                    .addIfNotNull(DisplayData.item("projectId", this.options.getProjectId()).withLabel("ProjectId"))
                    .addIfNotNull(DisplayData.item("namespace", this.options.getNamespace()).withLabel("Namespace"))
                    .addIfNotNull(DisplayData.item("query", this.options.getQuery().toString()).withLabel("Query"));
            }
        }

        /**
         * Serializable holder for the project id, query and optional namespace of a read.
         * The project id and query are required to be non-null at construction.
         */
        @VisibleForTesting
        static class V1Options implements Serializable {
            private final Query query;
            private final String projectId;
            @Nullable
            private final String namespace;

            /**
             * @throws NullPointerException if {@code projectId} or {@code query} is {@code null}
             */
            private V1Options(String projectId, Query query, @Nullable String namespace) {
                // Check in the same order as the assignments: project id first, then query.
                Preconditions.checkNotNull(projectId, "projectId");
                Preconditions.checkNotNull(query, "query");
                this.projectId = projectId;
                this.query = query;
                this.namespace = namespace;
            }

            /**
             * Creates options from the given values.
             *
             * @param projectId project to read from; must not be {@code null}
             * @param query query to run; must not be {@code null}
             * @param namespace namespace to read from, or {@code null}
             * @throws NullPointerException if {@code projectId} or {@code query} is {@code null}
             */
            public static V1Options from(String projectId, Query query, @Nullable String namespace) {
                return new V1Options(projectId, query, namespace);
            }

            /** @return the query; never {@code null} */
            public Query getQuery() {
                return query;
            }

            /** @return the project id; never {@code null} */
            public String getProjectId() {
                return projectId;
            }

            /** @return the namespace, or {@code null} if none was given */
            @Nullable
            public String getNamespace() {
                return namespace;
            }
        }
    }
}

