/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.io;

import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.Sleeper;
import com.google.api.services.datastore.DatastoreV1;
import com.google.api.services.datastore.client.Datastore;
import com.google.api.services.datastore.client.DatastoreException;
import com.google.api.services.datastore.client.DatastoreFactory;
import com.google.api.services.datastore.client.DatastoreHelper;
import com.google.api.services.datastore.client.DatastoreOptions;
import com.google.api.services.datastore.client.QuerySplitter;
import com.google.cloud.dataflow.sdk.annotations.Experimental;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.EntityCoder;
import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.Sink;
import com.google.cloud.dataflow.sdk.io.Write;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions;
import com.google.cloud.dataflow.sdk.options.GcpOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.MoreObjects;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Verify;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.primitives.Ints;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.FluentBackoff;
import com.google.cloud.dataflow.sdk.util.RetryHttpRequestInitializer;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Deprecated
@Experimental(value=Experimental.Kind.SOURCE_SINK)
public class DatastoreIO {
    public static final String DEFAULT_HOST = "https://www.googleapis.com";
    public static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;

    /**
     * Returns an unconfigured {@link Source} bound to {@link #DEFAULT_HOST}.
     *
     * @return the result of {@link #source()}
     * @deprecated this method only forwards to {@link #source()}.
     */
    @Deprecated
    public static Source read() {
        return source();
    }

    /**
     * Creates a {@link Source} that targets {@link #DEFAULT_HOST} and has no dataset, query, or
     * namespace set yet.
     *
     * @return a new, otherwise unconfigured source
     */
    public static Source source() {
        final Source unconfigured = new Source(DEFAULT_HOST, null, null, null);
        return unconfigured;
    }

    /**
     * Builds a bounded read of {@code query} against {@code datasetId} on {@link #DEFAULT_HOST}.
     *
     * @param datasetId dataset to read from
     * @param query query whose results are read
     * @return a {@link Read.Bounded} wrapping a {@link Source} with no namespace
     */
    public static Read.Bounded<DatastoreV1.Entity> readFrom(String datasetId, DatastoreV1.Query query) {
        final Source configured = new Source(DEFAULT_HOST, datasetId, query, null);
        return Read.from(configured);
    }

    /**
     * Builds a bounded read of {@code query} against {@code datasetId} on an explicit host.
     *
     * @param host Datastore service host; must not be {@code null}
     * @param datasetId dataset to read from
     * @param query query whose results are read
     * @return a {@link Read.Bounded} wrapping a {@link Source} with no namespace
     */
    @Deprecated
    public static Read.Bounded<DatastoreV1.Entity> readFrom(String host, String datasetId, DatastoreV1.Query query) {
        final Source configured = new Source(host, datasetId, query, null);
        return Read.from(configured);
    }

    /**
     * Creates a {@link Sink} that targets {@link #DEFAULT_HOST} and has no dataset set yet.
     *
     * @return a new sink whose dataset still has to be supplied
     */
    public static Sink sink() {
        final Sink unconfigured = new Sink(DEFAULT_HOST, null);
        return unconfigured;
    }

    /**
     * Builds a write of entities into {@code datasetId} on {@link #DEFAULT_HOST}.
     *
     * @param datasetId dataset to write to; must not be {@code null}
     * @return a {@link Write.Bound} wrapping the configured {@link Sink}
     */
    public static Write.Bound<DatastoreV1.Entity> writeTo(String datasetId) {
        final Sink target = sink().withDataset(datasetId);
        return Write.to(target);
    }

    /**
     * A {@link BoundedSource.BoundedReader} over the results of the query held by a {@link Source}.
     *
     * <p>Entities are fetched in batches of at most {@link #QUERY_BATCH_LIMIT}. Each follow-up
     * request starts at the end cursor of the previous batch when that batch carries one. The
     * reader exposes a single split point and does not support dynamic splitting.
     */
    public static class DatastoreReader extends BoundedSource.BoundedReader<DatastoreV1.Entity> {
        /** Maximum number of entities requested from the service in one query. */
        private static final int QUERY_BATCH_LIMIT = 500;

        private final Source source;
        private final Datastore datastore;

        /** Whether the last fetch indicated that another batch may be available. */
        private boolean moreResults;

        /** Iterator over the most recently fetched batch; {@code null} before the first fetch or after an empty one. */
        private Iterator<DatastoreV1.EntityResult> entities;

        /** Most recently fetched batch; supplies the start cursor of the next request. */
        private DatastoreV1.QueryResultBatch currentBatch;

        /** Remaining entities allowed by the query's limit, or {@link Integer#MAX_VALUE} when it has none. */
        private int userLimit;

        /** Set once {@link #advance()} has reported that no entity is left. */
        private volatile boolean done = false;

        private DatastoreV1.Entity currentEntity;

        /**
         * @param source source whose query is read
         * @param datastore client used to run the query
         */
        public DatastoreReader(Source source, Datastore datastore) {
            this.source = source;
            this.datastore = datastore;
            this.userLimit = source.query.hasLimit() ? source.query.getLimit() : Integer.MAX_VALUE;
        }

        /** Returns the entity selected by the last successful {@link #advance()}, or {@code null} once exhausted. */
        @Override
        public DatastoreV1.Entity getCurrent() {
            return currentEntity;
        }

        /** Returns 1 once the reader is exhausted and 0 before that. */
        @Override
        public final long getSplitPointsConsumed() {
            return done ? 1L : 0L;
        }

        /** Returns 0 once the reader is exhausted and 1 before that. */
        @Override
        public final long getSplitPointsRemaining() {
            return done ? 0L : 1L;
        }

        /** Positions the reader on the first entity; identical to {@link #advance()}. */
        @Override
        public boolean start() throws IOException {
            return advance();
        }

        /**
         * Moves to the next entity, fetching a new batch when none has been fetched yet or when
         * the current one is used up and more results were signalled.
         *
         * @return {@code true} if an entity is now current, {@code false} if the reader is exhausted
         * @throws IOException wrapping any {@link DatastoreException} raised by the fetch
         */
        @Override
        public boolean advance() throws IOException {
            boolean needsFetch = entities == null || (!entities.hasNext() && moreResults);
            if (needsFetch) {
                try {
                    entities = getIteratorAndMoveCursor();
                } catch (DatastoreException e) {
                    throw new IOException(e);
                }
            }
            // NOTE(review): an empty batch ends the read here even when moreResults is true;
            // confirm the service never returns an empty batch together with NOT_FINISHED.
            if (entities == null || !entities.hasNext()) {
                currentEntity = null;
                done = true;
                return false;
            }
            currentEntity = entities.next().getEntity();
            return true;
        }

        /** No resources are released here. */
        @Override
        public void close() throws IOException {
        }

        @Override
        public Source getCurrentSource() {
            return source;
        }

        /** Dynamic splitting is not supported; always returns {@code null}. */
        public Source splitAtFraction(double fraction) {
            return null;
        }

        /** Progress is not reported; always returns {@code null}. */
        @Override
        public Double getFractionConsumed() {
            return null;
        }

        /**
         * Runs the source query for the next batch and updates {@link #currentBatch},
         * {@link #userLimit} and {@link #moreResults}.
         *
         * @return an iterator over the fetched batch, or {@code null} if the batch is empty
         * @throws DatastoreException if the query fails
         */
        private Iterator<DatastoreV1.EntityResult> getIteratorAndMoveCursor() throws DatastoreException {
            // Work on a copy so the source's own query is never modified.
            DatastoreV1.Query.Builder query = (DatastoreV1.Query.Builder) source.query.toBuilder().clone();
            query.setLimit(Math.min(userLimit, QUERY_BATCH_LIMIT));
            if (currentBatch != null && currentBatch.hasEndCursor()) {
                query.setStartCursor(currentBatch.getEndCursor());
            }

            DatastoreV1.RunQueryRequest request = source.makeRequest(query.build());
            DatastoreV1.RunQueryResponse response = datastore.runQuery(request);
            currentBatch = response.getBatch();

            int numFetch = currentBatch.getEntityResultCount();
            if (source.query.hasLimit()) {
                Verify.verify(userLimit >= numFetch, "Expected userLimit %s >= numFetch %s, because query limit %s should be <= userLimit", userLimit, numFetch, query.getLimit());
                userLimit -= numFetch;
            }

            // Another batch is expected only while the user limit is not used up and either a
            // full batch came back or the service reported NOT_FINISHED.
            moreResults = userLimit > 0
                    && (numFetch == QUERY_BATCH_LIMIT
                            || currentBatch.getMoreResults() == DatastoreV1.QueryResultBatch.MoreResultsType.NOT_FINISHED);

            if (numFetch == 0) {
                return null;
            }
            return currentBatch.getEntityResultList().iterator();
        }
    }

    /** Serializable value object carrying the number of entities reported by one writer. */
    private static class DatastoreWriteResult implements Serializable {
        /** Count passed in at construction time. */
        final long entitiesWritten;

        /**
         * @param recordsWritten value stored as {@link #entitiesWritten}
         */
        public DatastoreWriteResult(long recordsWritten) {
            entitiesWritten = recordsWritten;
        }
    }

    /**
     * A {@link Sink.Writer} that buffers entities and upserts them to the Datastore in
     * non-transactional batches of at most {@link DatastoreIO#DATASTORE_BATCH_UPDATE_LIMIT}.
     *
     * <p>A failed commit is retried according to {@link #BUNDLE_WRITE_BACKOFF}; once the backoff
     * is exhausted the last {@link DatastoreException} is rethrown.
     */
    static class DatastoreWriter extends Sink.Writer<DatastoreV1.Entity, DatastoreWriteResult> {
        private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriter.class);

        /** Number of retries allowed for one batch commit. */
        private static final int MAX_RETRIES = 5;

        /** Backoff policy for retrying a failed batch commit. */
        private static final FluentBackoff BUNDLE_WRITE_BACKOFF =
                FluentBackoff.DEFAULT.withMaxRetries(MAX_RETRIES).withInitialBackoff(Duration.standardSeconds(5L));

        private final DatastoreWriteOperation writeOp;
        private final Datastore datastore;

        /** Entities successfully committed so far. */
        private long totalWritten = 0L;

        /** Entities accepted by {@link #write} but not yet committed. */
        final List<DatastoreV1.Entity> entities = new ArrayList<>();

        /**
         * Reports whether {@code key} is complete, i.e. has at least one path element and its last
         * path element carries an id or a name.
         *
         * @param key key to inspect
         * @return {@code true} if the key is complete
         */
        static boolean isValidKey(DatastoreV1.Key key) {
            List<DatastoreV1.Key.PathElement> elementList = key.getPathElementList();
            if (elementList.isEmpty()) {
                return false;
            }
            DatastoreV1.Key.PathElement lastElement = elementList.get(elementList.size() - 1);
            return lastElement.hasId() || lastElement.hasName();
        }

        /**
         * @param writeOp write operation this writer belongs to
         * @param datastore client used to commit batches
         */
        DatastoreWriter(DatastoreWriteOperation writeOp, Datastore datastore) {
            this.writeOp = writeOp;
            this.datastore = datastore;
        }

        /** Nothing to set up. */
        @Override
        public void open(String uId) throws Exception {
        }

        /**
         * Buffers {@code value} and flushes the buffer once it reaches the batch limit.
         *
         * @throws IllegalArgumentException if the entity's key is not complete
         */
        @Override
        public void write(DatastoreV1.Entity value) throws Exception {
            if (!isValidKey(value.getKey())) {
                throw new IllegalArgumentException("Entities to be written to the Datastore must have complete keys");
            }
            entities.add(value);
            if (entities.size() >= DatastoreIO.DATASTORE_BATCH_UPDATE_LIMIT) {
                flushBatch();
            }
        }

        /**
         * Flushes any buffered entities and reports the total number written.
         *
         * @return a result carrying the count of committed entities
         */
        @Override
        public DatastoreWriteResult close() throws Exception {
            if (!entities.isEmpty()) {
                flushBatch();
            }
            return new DatastoreWriteResult(totalWritten);
        }

        public DatastoreWriteOperation getWriteOperation() {
            return writeOp;
        }

        /**
         * Commits the buffered entities as one non-transactional upsert, retrying with backoff on
         * {@link DatastoreException}, then clears the buffer.
         *
         * @throws DatastoreException the last failure once the backoff is exhausted
         * @throws IOException if the backoff itself fails
         * @throws InterruptedException if interrupted while sleeping between attempts
         */
        private void flushBatch() throws DatastoreException, IOException, InterruptedException {
            LOG.debug("Writing batch of {} entities", entities.size());
            Sleeper sleeper = Sleeper.DEFAULT;
            BackOff backoff = BUNDLE_WRITE_BACKOFF.backoff();

            while (true) {
                try {
                    DatastoreV1.CommitRequest.Builder commitRequest = DatastoreV1.CommitRequest.newBuilder();
                    commitRequest.getMutationBuilder().addAllUpsert(entities);
                    commitRequest.setMode(DatastoreV1.CommitRequest.Mode.NON_TRANSACTIONAL);
                    datastore.commit(commitRequest.build());
                    // Commit succeeded; stop retrying.
                    break;
                } catch (DatastoreException exception) {
                    LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(), exception.getMessage());
                    if (!BackOffUtils.next(sleeper, backoff)) {
                        LOG.error("Aborting after {} retries.", MAX_RETRIES);
                        throw exception;
                    }
                }
            }

            totalWritten += entities.size();
            LOG.debug("Successfully wrote {} entities", entities.size());
            entities.clear();
        }
    }

    /**
     * The {@link Sink.WriteOperation} for a Datastore {@link Sink}: it creates
     * {@link DatastoreWriter}s and, on finalization, logs how many entities they wrote in total.
     */
    private static class DatastoreWriteOperation
            extends Sink.WriteOperation<DatastoreV1.Entity, DatastoreWriteResult> {
        private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriteOperation.class);

        /** Sink that created this operation; supplies host and dataset for new writers. */
        private final Sink sink;

        public DatastoreWriteOperation(Sink sink) {
            this.sink = sink;
        }

        /** Writer results are encoded with Java serialization. */
        @Override
        public Coder<DatastoreWriteResult> getWriterResultCoder() {
            return SerializableCoder.of(DatastoreWriteResult.class);
        }

        /** Nothing to initialize. */
        @Override
        public void initialize(PipelineOptions options) throws Exception {
        }

        /**
         * Adds up the per-writer entity counts and logs the total.
         *
         * @param writerResults results returned by the writers
         * @param options pipeline options (unused)
         */
        @Override
        public void finalize(Iterable<DatastoreWriteResult> writerResults, PipelineOptions options) throws Exception {
            long sum = 0L;
            for (DatastoreWriteResult writerResult : writerResults) {
                sum += writerResult.entitiesWritten;
            }
            LOG.info("Wrote {} elements.", (Object) sum);
        }

        /**
         * Creates a writer backed by a new {@link Datastore} client for the sink's host and
         * dataset, attaching the pipeline's GCP credential when one is available.
         *
         * @param options pipeline options, viewed as {@link GcpOptions} to obtain the credential
         * @return a new writer bound to this operation
         */
        public DatastoreWriter createWriter(PipelineOptions options) throws Exception {
            HttpRequestInitializer retrying = new RetryHttpRequestInitializer();
            DatastoreOptions.Builder clientOptions = new DatastoreOptions.Builder()
                    .host(sink.host)
                    .dataset(sink.datasetId)
                    .initializer(retrying);

            Credential gcpCredential = options.as(GcpOptions.class).getGcpCredential();
            if (gcpCredential != null) {
                clientOptions.credential(gcpCredential);
            }

            Datastore client = DatastoreFactory.get().create(clientOptions.build());
            return new DatastoreWriter(this, client);
        }

        public Sink getSink() {
            return sink;
        }
    }

    /**
     * A sink that writes {@link DatastoreV1.Entity} values to a Datastore dataset.
     *
     * <p>Instances are immutable; {@link #withDataset} and {@link #withHost} return modified
     * copies. The dataset must be set before {@link #validate} is called.
     */
    public static class Sink extends com.google.cloud.dataflow.sdk.io.Sink<DatastoreV1.Entity> {
        /** Datastore service host; never {@code null}. */
        final String host;

        /** Target dataset; {@code null} until set through {@link #withDataset}. */
        final String datasetId;

        /**
         * @param host Datastore service host; must not be {@code null}
         * @param datasetId target dataset, may be {@code null} until configured
         * @throws NullPointerException if {@code host} is {@code null}
         */
        protected Sink(String host, String datasetId) {
            this.host = Preconditions.checkNotNull(host, "host");
            this.datasetId = datasetId;
        }

        /**
         * Returns a copy of this sink that writes to {@code datasetId}.
         *
         * @throws NullPointerException if {@code datasetId} is {@code null}
         */
        public Sink withDataset(String datasetId) {
            Preconditions.checkNotNull(datasetId, "datasetId");
            return new Sink(this.host, datasetId);
        }

        /**
         * Returns a copy of this sink that talks to {@code host}.
         *
         * @throws NullPointerException if {@code host} is {@code null}
         */
        public Sink withHost(String host) {
            Preconditions.checkNotNull(host, "host");
            return new Sink(host, this.datasetId);
        }

        /**
         * Checks that both host and dataset have been configured.
         *
         * @throws NullPointerException if either is missing
         */
        @Override
        public void validate(PipelineOptions options) {
            Preconditions.checkNotNull(this.host, "Host is a required parameter. Please use withHost to set the host.");
            Preconditions.checkNotNull(this.datasetId, "Dataset ID is a required parameter. Please use withDataset to set the datasetId.");
        }

        /** Creates the write operation bound to this sink. */
        public DatastoreWriteOperation createWriteOperation(PipelineOptions options) {
            return new DatastoreWriteOperation(this);
        }

        /** Registers the host (only when it differs from the default) and the dataset (when set). */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder
                    .addIfNotDefault(DisplayData.item("host", this.host).withLabel("Datastore Service"), DatastoreIO.DEFAULT_HOST)
                    .addIfNotNull(DisplayData.item("dataset", this.datasetId).withLabel("Output Dataset"));
        }
    }

    /**
     * A {@link BoundedSource} that reads the results of a Datastore query as
     * {@link DatastoreV1.Entity} values.
     *
     * <p>Host, dataset, query and namespace are immutable; the {@code with*} methods return
     * modified copies. Host, query and dataset must all be set before {@link #validate()} is
     * called; the namespace is optional.
     */
    public static class Source extends BoundedSource<DatastoreV1.Entity> {
        private static final Logger LOG = LoggerFactory.getLogger(Source.class);

        /** Number of splits used when neither a size estimate nor a worker count is available. */
        private static final int DEFAULT_NUM_SPLITS = 12;

        private final String host;
        @Nullable
        private final String datasetId;
        @Nullable
        private final DatastoreV1.Query query;
        @Nullable
        private final String namespace;

        /** When set, used instead of the real query splitter. */
        @Nullable
        private QuerySplitter mockSplitter;

        /** When set, returned by {@link #getEstimatedSizeBytes} without contacting the service. */
        @Nullable
        private Long mockEstimateSizeBytes;

        public String getHost() {
            return host;
        }

        public String getDataset() {
            return datasetId;
        }

        public DatastoreV1.Query getQuery() {
            return query;
        }

        @Nullable
        public String getNamespace() {
            return namespace;
        }

        /**
         * Returns a copy of this source that reads from {@code datasetId}.
         *
         * @throws NullPointerException if {@code datasetId} is {@code null}
         */
        public Source withDataset(String datasetId) {
            Preconditions.checkNotNull(datasetId, "datasetId");
            return new Source(this.host, datasetId, this.query, this.namespace);
        }

        /**
         * Returns a copy of this source that reads the results of {@code query}.
         *
         * @throws NullPointerException if {@code query} is {@code null}
         * @throws IllegalArgumentException if the query has a limit that is not positive
         */
        public Source withQuery(DatastoreV1.Query query) {
            Preconditions.checkNotNull(query, "query");
            Preconditions.checkArgument(!query.hasLimit() || query.getLimit() > 0, "Invalid query limit %s: must be positive", query.getLimit());
            return new Source(this.host, this.datasetId, query, this.namespace);
        }

        /**
         * Returns a copy of this source that talks to {@code host}.
         *
         * @throws NullPointerException if {@code host} is {@code null}
         */
        public Source withHost(String host) {
            Preconditions.checkNotNull(host, "host");
            return new Source(host, this.datasetId, this.query, this.namespace);
        }

        /** Returns a copy of this source that uses {@code namespace}; {@code null} clears it. */
        public Source withNamespace(@Nullable String namespace) {
            return new Source(this.host, this.datasetId, this.query, namespace);
        }

        @Override
        public Coder<DatastoreV1.Entity> getDefaultOutputCoder() {
            return EntityCoder.of();
        }

        @Override
        public boolean producesSortedKeys(PipelineOptions options) {
            return false;
        }

        /**
         * Splits this source into sub-sources, one per split query.
         *
         * <p>A query with a limit is never split. Otherwise the number of splits is the estimated
         * size divided by {@code desiredBundleSizeBytes}, rounded. If the estimate cannot be
         * obtained, the configured number of workers is used when it is positive, and
         * {@link #DEFAULT_NUM_SPLITS} otherwise. If at most one split results, or the query
         * cannot be split, this source is returned on its own.
         *
         * @param desiredBundleSizeBytes target size of each bundle
         * @param options pipeline options used for credentials and the worker-count fallback
         * @return this source alone, or one source per split query
         */
        @Override
        public List<Source> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
            if (query.hasLimit()) {
                return ImmutableList.of(this);
            }

            long numSplits;
            try {
                numSplits = Math.round((double) getEstimatedSizeBytes(options) / (double) desiredBundleSizeBytes);
            } catch (Exception e) {
                // The size estimate is unavailable: prefer the worker count and only fall back to
                // the fixed default when no worker count is configured.
                DataflowPipelineWorkerPoolOptions poolOptions = options.as(DataflowPipelineWorkerPoolOptions.class);
                int numWorkers = poolOptions.getNumWorkers();
                if (numWorkers > 0) {
                    LOG.warn("Estimated size of unavailable, using the number of workers {}", numWorkers, e);
                    numSplits = numWorkers;
                } else {
                    numSplits = DEFAULT_NUM_SPLITS;
                }
            }

            if (numSplits <= 1L) {
                return ImmutableList.of(this);
            }

            List<DatastoreV1.Query> datastoreSplits;
            try {
                // checkedCast throws IllegalArgumentException when numSplits does not fit an int;
                // that case is handled below like any other failure to split.
                datastoreSplits = getSplitQueries(Ints.checkedCast(numSplits), options);
            } catch (DatastoreException | IllegalArgumentException e) {
                LOG.warn("Unable to parallelize the given query: {}", query, e);
                return ImmutableList.of(this);
            }

            ImmutableList.Builder<Source> splits = ImmutableList.builder();
            for (DatastoreV1.Query splitQuery : datastoreSplits) {
                splits.add(new Source(host, datasetId, splitQuery, namespace));
            }
            return splits.build();
        }

        /** Creates a reader backed by a new {@link Datastore} client. */
        @Override
        public BoundedSource.BoundedReader<DatastoreV1.Entity> createReader(PipelineOptions pipelineOptions) throws IOException {
            return new DatastoreReader(this, getDatastore(pipelineOptions));
        }

        /**
         * Checks that host, query and dataset have all been set.
         *
         * @throws NullPointerException if any of them is missing
         */
        @Override
        public void validate() {
            Preconditions.checkNotNull(host, "host");
            Preconditions.checkNotNull(query, "query");
            Preconditions.checkNotNull(datasetId, "datasetId");
        }

        /**
         * Estimates the size of the queried kind from the Datastore statistics entities.
         *
         * <p>Returns the mock estimate when one is set. Otherwise it looks up the latest
         * statistics timestamp, then reads the {@code entity_bytes} property of the per-kind
         * statistics entity ({@code __Stat_Kind__}, or {@code __Ns_Stat_Kind__} when a namespace
         * is set) matching the query's kind and that timestamp.
         *
         * @throws UnsupportedOperationException if the query does not specify exactly one kind
         * @throws NoSuchElementException if no matching statistics entity exists
         */
        @Override
        public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
            if (mockEstimateSizeBytes != null) {
                return mockEstimateSizeBytes;
            }

            Datastore datastore = getDatastore(options);
            if (query.getKindCount() != 1) {
                throw new UnsupportedOperationException("Can only estimate size for queries specifying exactly 1 kind.");
            }
            String ourKind = query.getKind(0).getName();
            long latestTimestamp = queryLatestStatisticsTimestamp(datastore);

            DatastoreV1.Query.Builder statQuery = DatastoreV1.Query.newBuilder();
            statQuery.addKindBuilder().setName(namespace == null ? "__Stat_Kind__" : "__Ns_Stat_Kind__");
            DatastoreV1.Filter kindFilter =
                    DatastoreHelper.makeFilter("kind_name", DatastoreV1.PropertyFilter.Operator.EQUAL, DatastoreHelper.makeValue(ourKind)).build();
            DatastoreV1.Filter timestampFilter =
                    DatastoreHelper.makeFilter("timestamp", DatastoreV1.PropertyFilter.Operator.EQUAL, DatastoreHelper.makeValue(latestTimestamp)).build();
            statQuery.setFilter(DatastoreHelper.makeFilter(new DatastoreV1.Filter[] {kindFilter, timestampFilter}));

            DatastoreV1.RunQueryRequest request = makeRequest(statQuery.build());
            long now = System.currentTimeMillis();
            DatastoreV1.RunQueryResponse response = datastore.runQuery(request);
            LOG.info("Query for per-kind statistics took {}ms", System.currentTimeMillis() - now);

            DatastoreV1.QueryResultBatch batch = response.getBatch();
            if (batch.getEntityResultCount() == 0) {
                throw new NoSuchElementException("Datastore statistics for kind " + ourKind + " unavailable");
            }
            DatastoreV1.Entity entity = batch.getEntityResult(0).getEntity();
            return DatastoreHelper.getPropertyMap(entity).get("entity_bytes").getIntegerValue();
        }

        /**
         * Registers the host (only when it differs from the default), the dataset and namespace
         * (when set), and the query text (when a query is set).
         */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder
                    .addIfNotDefault(DisplayData.item("host", host).withLabel("Datastore Service"), DatastoreIO.DEFAULT_HOST)
                    .addIfNotNull(DisplayData.item("dataset", datasetId).withLabel("Input Dataset"))
                    .addIfNotNull(DisplayData.item("namespace", namespace).withLabel("App Engine Namespace"));
            if (query != null) {
                builder.add(DisplayData.item("query", query.toString()).withLabel("Query"));
            }
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(getClass())
                    .add("host", host)
                    .add("dataset", datasetId)
                    .add("query", query)
                    .add("namespace", namespace)
                    .toString();
        }

        /**
         * @param host Datastore service host; must not be {@code null}
         * @param datasetId dataset to read from, may be {@code null} until configured
         * @param query query to run, may be {@code null} until configured
         * @param namespace namespace to query in, or {@code null} for none
         * @throws NullPointerException if {@code host} is {@code null}
         */
        private Source(String host, @Nullable String datasetId, @Nullable DatastoreV1.Query query, @Nullable String namespace) {
            this.host = Preconditions.checkNotNull(host, "host");
            this.datasetId = datasetId;
            this.query = query;
            this.namespace = namespace;
        }

        /**
         * Asks the query splitter (the mock one when set) for {@code numSplits} sub-queries of
         * this source's query, scoped to the namespace when one is set.
         */
        private List<DatastoreV1.Query> getSplitQueries(int numSplits, PipelineOptions options) throws DatastoreException {
            DatastoreV1.PartitionId.Builder partitionBuilder = DatastoreV1.PartitionId.newBuilder();
            if (namespace != null) {
                partitionBuilder.setNamespace(namespace);
            }
            if (mockSplitter != null) {
                // The mock splitter is called without a Datastore client.
                return mockSplitter.getSplits(query, partitionBuilder.build(), numSplits, null);
            }
            return DatastoreHelper.getQuerySplitter().getSplits(query, partitionBuilder.build(), numSplits, getDatastore(options));
        }

        /** Wraps {@code query} in a run-query request, scoped to the namespace when one is set. */
        private DatastoreV1.RunQueryRequest makeRequest(DatastoreV1.Query query) {
            DatastoreV1.RunQueryRequest.Builder requestBuilder = DatastoreV1.RunQueryRequest.newBuilder().setQuery(query);
            if (namespace != null) {
                requestBuilder.getPartitionIdBuilder().setNamespace(namespace);
            }
            return requestBuilder.build();
        }

        /**
         * Reads the {@code timestamp} of the newest {@code __Stat_Total__} statistics entity.
         *
         * @return the timestamp in microseconds
         * @throws NoSuchElementException if no such statistics entity exists
         */
        private long queryLatestStatisticsTimestamp(Datastore datastore) throws DatastoreException {
            DatastoreV1.Query.Builder totalQuery = DatastoreV1.Query.newBuilder();
            totalQuery.addKindBuilder().setName("__Stat_Total__");
            totalQuery.addOrder(DatastoreHelper.makeOrder("timestamp", DatastoreV1.PropertyOrder.Direction.DESCENDING));
            totalQuery.setLimit(1);

            DatastoreV1.RunQueryRequest request = makeRequest(totalQuery.build());
            long now = System.currentTimeMillis();
            DatastoreV1.RunQueryResponse response = datastore.runQuery(request);
            LOG.info("Query for latest stats timestamp of dataset {} took {}ms", datasetId, System.currentTimeMillis() - now);

            DatastoreV1.QueryResultBatch batch = response.getBatch();
            if (batch.getEntityResultCount() == 0) {
                throw new NoSuchElementException("Datastore total statistics for dataset " + datasetId + " unavailable");
            }
            DatastoreV1.Entity entity = batch.getEntityResult(0).getEntity();
            return DatastoreHelper.getPropertyMap(entity).get("timestamp").getTimestampMicrosecondsValue();
        }

        /**
         * Creates a {@link Datastore} client for this source's host and dataset, attaching the
         * pipeline's GCP credential when one is available.
         */
        private Datastore getDatastore(PipelineOptions pipelineOptions) {
            DatastoreOptions.Builder builder = new DatastoreOptions.Builder()
                    .host(host)
                    .dataset(datasetId)
                    .initializer(new RetryHttpRequestInitializer());
            Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
            if (credential != null) {
                builder.credential(credential);
            }
            return DatastoreFactory.get().create(builder.build());
        }

        /** Returns a copy of this source that uses {@code splitter}, keeping any mock size estimate. */
        Source withMockSplitter(QuerySplitter splitter) {
            Source res = new Source(host, datasetId, query, namespace);
            res.mockSplitter = splitter;
            res.mockEstimateSizeBytes = mockEstimateSizeBytes;
            return res;
        }

        /** Returns a copy of this source that reports {@code estimateSizeBytes}, keeping any mock splitter. */
        Source withMockEstimateSizeBytes(Long estimateSizeBytes) {
            Source res = new Source(host, datasetId, query, namespace);
            res.mockSplitter = mockSplitter;
            res.mockEstimateSizeBytes = estimateSizeBytes;
            return res;
        }
    }
}

