/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.io.bigtable;

import com.google.bigtable.v1.Mutation;
import com.google.bigtable.v1.Row;
import com.google.bigtable.v1.RowFilter;
import com.google.bigtable.v1.SampleRowKeysResponse;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.config.RetryOptions;
import com.google.cloud.dataflow.sdk.annotations.Experimental;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
import com.google.cloud.dataflow.sdk.coders.protobuf.ProtoCoder;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.Sink;
import com.google.cloud.dataflow.sdk.io.bigtable.BigtableService;
import com.google.cloud.dataflow.sdk.io.bigtable.BigtableServiceImpl;
import com.google.cloud.dataflow.sdk.io.range.ByteKey;
import com.google.cloud.dataflow.sdk.io.range.ByteKeyRange;
import com.google.cloud.dataflow.sdk.io.range.ByteKeyRangeTracker;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.MoreObjects;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableCollection;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.FutureCallback;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.Futures;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.DataflowReleaseInfo;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PBegin;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import io.grpc.Status;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
public class BigtableIO {
    private static final Logger logger = LoggerFactory.getLogger(BigtableIO.class);

    @Experimental
    public static Read read() {
        return new Read(null, "", null, null);
    }

    @Experimental
    public static Write write() {
        return new Write(null, "", null);
    }

    private BigtableIO() {
    }

    private static String getUserAgent() {
        String javaVersion = System.getProperty("java.specification.version");
        DataflowReleaseInfo info = DataflowReleaseInfo.getReleaseInfo();
        return String.format("%s/%s (%s); %s", info.getName(), info.getVersion(), javaVersion, "0.3.0");
    }

    private static RetryOptions.Builder retryOptionsToBuilder(RetryOptions options) {
        RetryOptions.Builder builder = new RetryOptions.Builder();
        builder.setEnableRetries(options.enableRetries());
        builder.setInitialBackoffMillis(options.getInitialBackoffMillis());
        builder.setBackoffMultiplier(options.getBackoffMultiplier());
        builder.setMaxElapsedBackoffMillis(options.getMaxElaspedBackoffMillis());
        builder.setStreamingBufferSize(options.getStreamingBufferSize());
        builder.setStreamingBatchSize(options.getStreamingBatchSize());
        builder.setReadPartialRowTimeoutMillis(options.getReadPartialRowTimeoutMillis());
        builder.setMaxScanTimeoutRetries(options.getMaxScanTimeoutRetries());
        builder.setAllowRetriesWithoutTimestamp(options.allowRetriesWithoutTimestamp());
        for (Status.Code code : Status.Code.values()) {
            if (!options.isRetryable(code)) continue;
            builder.addStatusToRetryOn(code);
        }
        return builder;
    }

    static class BigtableWriteException
    extends IOException {
        public BigtableWriteException(KV<ByteString, Iterable<Mutation>> record, Throwable cause) {
            super(String.format("Error mutating row %s with mutations %s", record.getKey().toStringUtf8(), record.getValue()), cause);
        }
    }

    private static class BigtableWriter
    extends Sink.Writer<KV<ByteString, Iterable<Mutation>>, Long> {
        private final BigtableWriteOperation writeOperation;
        private final Sink sink;
        private BigtableService.Writer bigtableWriter;
        private long recordsWritten;
        private final ConcurrentLinkedQueue<BigtableWriteException> failures;

        public BigtableWriter(BigtableWriteOperation writeOperation) {
            this.writeOperation = writeOperation;
            this.sink = writeOperation.getSink();
            this.failures = new ConcurrentLinkedQueue();
        }

        @Override
        public void open(String uId) throws Exception {
            this.bigtableWriter = this.sink.getBigtableService().openForWriting(this.sink.getTableId());
            this.recordsWritten = 0L;
        }

        private void checkForFailures() throws IOException {
            int i;
            if (this.failures.isEmpty()) {
                return;
            }
            StringBuilder logEntry = new StringBuilder();
            for (i = 0; i < 10 && !this.failures.isEmpty(); ++i) {
                BigtableWriteException exc = (BigtableWriteException)this.failures.remove();
                logEntry.append("\n").append(exc.getMessage());
                if (exc.getCause() == null) continue;
                logEntry.append(": ").append(exc.getCause().getMessage());
            }
            String message = String.format("At least %d errors occurred writing to Bigtable. First %d errors: %s", i + this.failures.size(), i, logEntry.toString());
            logger.error(message);
            throw new IOException(message);
        }

        @Override
        public void write(KV<ByteString, Iterable<Mutation>> rowMutations) throws Exception {
            this.checkForFailures();
            Futures.addCallback(this.bigtableWriter.writeRecord(rowMutations), new WriteExceptionCallback(rowMutations));
            ++this.recordsWritten;
        }

        @Override
        public Long close() throws Exception {
            this.bigtableWriter.close();
            this.bigtableWriter = null;
            this.checkForFailures();
            logger.info("Wrote {} records", (Object)this.recordsWritten);
            return this.recordsWritten;
        }

        @Override
        public Sink.WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> getWriteOperation() {
            return this.writeOperation;
        }

        private class WriteExceptionCallback
        implements FutureCallback<Empty> {
            private final KV<ByteString, Iterable<Mutation>> value;

            public WriteExceptionCallback(KV<ByteString, Iterable<Mutation>> value) {
                this.value = value;
            }

            @Override
            public void onFailure(Throwable cause) {
                BigtableWriter.this.failures.add(new BigtableWriteException(this.value, cause));
            }

            @Override
            public void onSuccess(Empty produced) {
            }
        }
    }

    private static class BigtableWriteOperation
    extends Sink.WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> {
        private final Sink sink;

        public BigtableWriteOperation(Sink sink) {
            this.sink = sink;
        }

        @Override
        public Sink.Writer<KV<ByteString, Iterable<Mutation>>, Long> createWriter(PipelineOptions options) throws Exception {
            return new BigtableWriter(this);
        }

        @Override
        public void initialize(PipelineOptions options) {
        }

        @Override
        public void finalize(Iterable<Long> writerResults, PipelineOptions options) {
            long count = 0L;
            for (Long value : writerResults) {
                value = value + count;
            }
            logger.debug("Wrote {} elements to BigtableIO.Sink {}", (Object)this.sink);
        }

        public Sink getSink() {
            return this.sink;
        }

        @Override
        public Coder<Long> getWriterResultCoder() {
            return VarLongCoder.of();
        }
    }

    private static class Sink
    extends com.google.cloud.dataflow.sdk.io.Sink<KV<ByteString, Iterable<Mutation>>> {
        private final String tableId;
        private final BigtableService bigtableService;

        public Sink(String tableId, BigtableService bigtableService) {
            this.tableId = Preconditions.checkNotNull(tableId, "tableId");
            this.bigtableService = Preconditions.checkNotNull(bigtableService, "bigtableService");
        }

        public String getTableId() {
            return this.tableId;
        }

        public BigtableService getBigtableService() {
            return this.bigtableService;
        }

        public String toString() {
            return MoreObjects.toStringHelper(Sink.class).add("bigtableService", this.bigtableService).add("tableId", this.tableId).toString();
        }

        @Override
        public Sink.WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> createWriteOperation(PipelineOptions options) {
            return new BigtableWriteOperation(this);
        }

        @Override
        public void validate(PipelineOptions options) {
        }
    }

    private static class BigtableReader
    extends BoundedSource.BoundedReader<Row> {
        private BigtableSource source;
        private BigtableService service;
        private BigtableService.Reader reader;
        private final ByteKeyRangeTracker rangeTracker;
        private long recordsReturned;

        public BigtableReader(BigtableSource source, BigtableService service) {
            this.source = source;
            this.service = service;
            this.rangeTracker = ByteKeyRangeTracker.of(source.getRange());
        }

        @Override
        public boolean start() throws IOException {
            boolean hasRecord;
            this.reader = this.service.createReader(this.getCurrentSource());
            boolean bl = hasRecord = this.reader.start() && this.rangeTracker.tryReturnRecordAt(true, ByteKey.of(this.reader.getCurrentRow().getKey())) || this.rangeTracker.markDone();
            if (hasRecord) {
                ++this.recordsReturned;
            }
            return hasRecord;
        }

        @Override
        public synchronized BigtableSource getCurrentSource() {
            return this.source;
        }

        @Override
        public boolean advance() throws IOException {
            boolean hasRecord;
            boolean bl = hasRecord = this.reader.advance() && this.rangeTracker.tryReturnRecordAt(true, ByteKey.of(this.reader.getCurrentRow().getKey())) || this.rangeTracker.markDone();
            if (hasRecord) {
                ++this.recordsReturned;
            }
            return hasRecord;
        }

        @Override
        public Row getCurrent() throws NoSuchElementException {
            return this.reader.getCurrentRow();
        }

        @Override
        public void close() throws IOException {
            logger.info("Closing reader after reading {} records.", (Object)this.recordsReturned);
            if (this.reader != null) {
                this.reader.close();
                this.reader = null;
            }
        }

        @Override
        public final Double getFractionConsumed() {
            return this.rangeTracker.getFractionConsumed();
        }

        @Override
        public final long getSplitPointsConsumed() {
            return this.rangeTracker.getSplitPointsConsumed();
        }

        public final synchronized BigtableSource splitAtFraction(double fraction) {
            ByteKey splitKey;
            try {
                splitKey = this.rangeTracker.getRange().interpolateKey(fraction);
            }
            catch (IllegalArgumentException e) {
                logger.info("%s: Failed to interpolate key for fraction %s.", (Object)this.rangeTracker.getRange(), (Object)fraction);
                return null;
            }
            logger.debug("Proposing to split {} at fraction {} (key {})", new Object[]{this.rangeTracker, fraction, splitKey});
            BigtableSource primary = this.source.withEndKey(splitKey);
            BigtableSource residual = this.source.withStartKey(splitKey);
            if (!this.rangeTracker.trySplitAtPosition(splitKey)) {
                return null;
            }
            this.source = primary;
            return residual;
        }
    }

    static class BigtableSource
    extends BoundedSource<Row> {
        private final BigtableService service;
        @Nullable
        private final String tableId;
        @Nullable
        private final RowFilter filter;
        private final ByteKeyRange range;
        @Nullable
        private Long estimatedSizeBytes;
        @Nullable
        private transient List<SampleRowKeysResponse> sampleRowKeys;

        public BigtableSource(BigtableService service, String tableId, @Nullable RowFilter filter, ByteKeyRange range, Long estimatedSizeBytes) {
            this.service = service;
            this.tableId = tableId;
            this.filter = filter;
            this.range = range;
            this.estimatedSizeBytes = estimatedSizeBytes;
        }

        public String toString() {
            return MoreObjects.toStringHelper(BigtableSource.class).add("tableId", this.tableId).add("filter", this.filter).add("range", this.range).add("estimatedSizeBytes", this.estimatedSizeBytes).toString();
        }

        protected BigtableSource withStartKey(ByteKey startKey) {
            Preconditions.checkNotNull(startKey, "startKey");
            return new BigtableSource(this.service, this.tableId, this.filter, this.range.withStartKey(startKey), this.estimatedSizeBytes);
        }

        protected BigtableSource withEndKey(ByteKey endKey) {
            Preconditions.checkNotNull(endKey, "endKey");
            return new BigtableSource(this.service, this.tableId, this.filter, this.range.withEndKey(endKey), this.estimatedSizeBytes);
        }

        protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) {
            Preconditions.checkNotNull(estimatedSizeBytes, "estimatedSizeBytes");
            return new BigtableSource(this.service, this.tableId, this.filter, this.range, estimatedSizeBytes);
        }

        private List<SampleRowKeysResponse> getSampleRowKeys() throws IOException {
            return this.service.getSampleRowKeys(this);
        }

        @Override
        public List<BigtableSource> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
            long maximumNumberOfSplits = 4000L;
            long sizeEstimate = this.getEstimatedSizeBytes(options);
            desiredBundleSizeBytes = Math.max(sizeEstimate / maximumNumberOfSplits, desiredBundleSizeBytes);
            return this.splitIntoBundlesBasedOnSamples(desiredBundleSizeBytes, this.getSampleRowKeys());
        }

        private List<BigtableSource> splitIntoBundlesBasedOnSamples(long desiredBundleSizeBytes, List<SampleRowKeysResponse> sampleRowKeys) {
            if (sampleRowKeys.isEmpty()) {
                logger.info("Not splitting source {} because no sample row keys are available.", (Object)this);
                return Collections.singletonList(this);
            }
            logger.info("About to split into bundles of size {} with sampleRowKeys length {} first element {}", new Object[]{desiredBundleSizeBytes, sampleRowKeys.size(), sampleRowKeys.get(0)});
            ByteKey lastEndKey = ByteKey.EMPTY;
            long lastOffset = 0L;
            ImmutableList.Builder splits = ImmutableList.builder();
            for (SampleRowKeysResponse response : sampleRowKeys) {
                ByteKey splitEndKey;
                ByteKey responseEndKey = ByteKey.of(response.getRowKey());
                long responseOffset = response.getOffsetBytes();
                Preconditions.checkState(responseOffset >= lastOffset, "Expected response byte offset %s to come after the last offset %s", responseOffset, lastOffset);
                if (!this.range.overlaps(ByteKeyRange.of(lastEndKey, responseEndKey)).booleanValue()) {
                    lastOffset = responseOffset;
                    lastEndKey = responseEndKey;
                    continue;
                }
                ByteKey splitStartKey = lastEndKey;
                if (splitStartKey.compareTo(this.range.getStartKey()) < 0) {
                    splitStartKey = this.range.getStartKey();
                }
                if (!this.range.containsKey(splitEndKey = responseEndKey).booleanValue()) {
                    splitEndKey = this.range.getEndKey();
                }
                long sampleSizeBytes = responseOffset - lastOffset;
                List<BigtableSource> subSplits = this.splitKeyRangeIntoBundleSizedSubranges(sampleSizeBytes, desiredBundleSizeBytes, ByteKeyRange.of(splitStartKey, splitEndKey));
                splits.addAll(subSplits);
                lastEndKey = responseEndKey;
                lastOffset = responseOffset;
            }
            if (!lastEndKey.isEmpty() && (this.range.getEndKey().isEmpty() || lastEndKey.compareTo(this.range.getEndKey()) < 0)) {
                splits.add(this.withStartKey(lastEndKey).withEndKey(this.range.getEndKey()));
            }
            ImmutableCollection ret = splits.build();
            logger.info("Generated {} splits. First split: {}", (Object)ret.size(), ret.get(0));
            return ret;
        }

        @Override
        public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
            if (this.estimatedSizeBytes == null) {
                this.estimatedSizeBytes = this.getEstimatedSizeBytesBasedOnSamples(this.getSampleRowKeys());
            }
            return this.estimatedSizeBytes;
        }

        private long getEstimatedSizeBytesBasedOnSamples(List<SampleRowKeysResponse> samples) {
            long estimatedSizeBytes = 0L;
            long lastOffset = 0L;
            ByteKey currentStartKey = ByteKey.EMPTY;
            for (SampleRowKeysResponse response : samples) {
                ByteKey currentEndKey = ByteKey.of(response.getRowKey());
                long currentOffset = response.getOffsetBytes();
                if (!currentStartKey.isEmpty() && currentStartKey.equals(currentEndKey)) {
                    lastOffset = currentOffset;
                    continue;
                }
                if (this.range.overlaps(ByteKeyRange.of(currentStartKey, currentEndKey)).booleanValue()) {
                    estimatedSizeBytes += currentOffset - lastOffset;
                }
                currentStartKey = currentEndKey;
                lastOffset = currentOffset;
            }
            return estimatedSizeBytes;
        }

        @Override
        public boolean producesSortedKeys(PipelineOptions options) throws Exception {
            return true;
        }

        @Override
        public BoundedSource.BoundedReader<Row> createReader(PipelineOptions options) throws IOException {
            return new BigtableReader(this, this.service);
        }

        @Override
        public void validate() {
            Preconditions.checkArgument(!this.tableId.isEmpty(), "tableId cannot be empty");
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("tableId", this.tableId).withLabel("Table ID"));
            if (this.filter != null) {
                builder.add(DisplayData.item("rowFilter", this.filter.toString()).withLabel("Table Row Filter"));
            }
        }

        @Override
        public Coder<Row> getDefaultOutputCoder() {
            return ProtoCoder.of(Row.class);
        }

        private List<BigtableSource> splitKeyRangeIntoBundleSizedSubranges(long sampleSizeBytes, long desiredBundleSizeBytes, ByteKeyRange range) {
            logger.debug("Subsplit for sampleSizeBytes {} and desiredBundleSizeBytes {}", (Object)sampleSizeBytes, (Object)desiredBundleSizeBytes);
            if (sampleSizeBytes <= desiredBundleSizeBytes) {
                return Collections.singletonList(this.withStartKey(range.getStartKey()).withEndKey(range.getEndKey()));
            }
            Preconditions.checkArgument(sampleSizeBytes > 0L, "Sample size %s bytes must be greater than 0.", sampleSizeBytes);
            Preconditions.checkArgument(desiredBundleSizeBytes > 0L, "Desired bundle size %s bytes must be greater than 0.", desiredBundleSizeBytes);
            int splitCount = (int)Math.ceil((double)sampleSizeBytes / (double)desiredBundleSizeBytes);
            List<ByteKey> splitKeys = range.split(splitCount);
            ImmutableList.Builder splits = ImmutableList.builder();
            Iterator<ByteKey> keys = splitKeys.iterator();
            ByteKey prev = keys.next();
            while (keys.hasNext()) {
                ByteKey next = keys.next();
                splits.add(this.withStartKey(prev).withEndKey(next).withEstimatedSizeBytes(sampleSizeBytes / (long)splitCount));
                prev = next;
            }
            return splits.build();
        }

        public ByteKeyRange getRange() {
            return this.range;
        }

        public RowFilter getRowFilter() {
            return this.filter;
        }

        public String getTableId() {
            return this.tableId;
        }
    }

    @Experimental
    public static class Write
    extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> {
        @Nullable
        private final BigtableOptions options;
        private final String tableId;
        @Nullable
        private final BigtableService bigtableService;

        private Write(@Nullable BigtableOptions options, String tableId, @Nullable BigtableService bigtableService) {
            this.options = options;
            this.tableId = Preconditions.checkNotNull(tableId, "tableId");
            this.bigtableService = bigtableService;
        }

        public Write withBigtableOptions(BigtableOptions options) {
            Preconditions.checkNotNull(options, "options");
            return this.withBigtableOptions(options.toBuilder());
        }

        public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
            Preconditions.checkNotNull(optionsBuilder, "optionsBuilder");
            BigtableOptions options = optionsBuilder.build();
            RetryOptions retryOptions = options.getRetryOptions();
            BigtableOptions.Builder clonedBuilder = options.toBuilder().setBulkOptions(options.getBulkOptions().toBuilder().setUseBulkApi(true).build()).setRetryOptions(BigtableIO.retryOptionsToBuilder(retryOptions).setStreamingBatchSize(Math.min(retryOptions.getStreamingBatchSize(), retryOptions.getStreamingBufferSize() / 2)).build());
            BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(BigtableIO.getUserAgent()).build();
            return new Write(optionsWithAgent, this.tableId, this.bigtableService);
        }

        public Write withTableId(String tableId) {
            Preconditions.checkNotNull(tableId, "tableId");
            return new Write(this.options, tableId, this.bigtableService);
        }

        public BigtableOptions getBigtableOptions() {
            return this.options;
        }

        public String getTableId() {
            return this.tableId;
        }

        @Override
        public PDone apply(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
            Sink sink = new Sink(this.tableId, this.getBigtableService());
            return (PDone)input.apply(com.google.cloud.dataflow.sdk.io.Write.to(sink));
        }

        @Override
        public void validate(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
            Preconditions.checkArgument(this.options != null, "BigtableOptions not specified");
            Preconditions.checkArgument(!this.tableId.isEmpty(), "Table ID not specified");
            try {
                Preconditions.checkArgument(this.getBigtableService().tableExists(this.tableId), "Table %s does not exist", this.tableId);
            }
            catch (IOException e) {
                logger.warn("Error checking whether table {} exists; proceeding.", (Object)this.tableId, (Object)e);
            }
        }

        Write withBigtableService(BigtableService bigtableService) {
            Preconditions.checkNotNull(bigtableService, "bigtableService");
            return new Write(this.options, this.tableId, bigtableService);
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("tableId", this.tableId).withLabel("Table ID"));
            if (this.options != null) {
                builder.add(DisplayData.item("bigtableOptions", this.options.toString()).withLabel("Bigtable Options"));
            }
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(Write.class).add("options", this.options).add("tableId", this.tableId).toString();
        }

        private BigtableService getBigtableService() {
            if (this.bigtableService != null) {
                return this.bigtableService;
            }
            return new BigtableServiceImpl(this.options);
        }
    }

    @Experimental
    public static class Read
    extends PTransform<PBegin, PCollection<Row>> {
        @Nullable
        private final BigtableOptions options;
        private final String tableId;
        @Nullable
        private final RowFilter filter;
        @Nullable
        private final BigtableService bigtableService;

        public Read withBigtableOptions(BigtableOptions options) {
            Preconditions.checkNotNull(options, "options");
            return this.withBigtableOptions(options.toBuilder());
        }

        public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
            Preconditions.checkNotNull(optionsBuilder, "optionsBuilder");
            BigtableOptions options = optionsBuilder.build();
            RetryOptions retryOptions = options.getRetryOptions();
            BigtableOptions.Builder clonedBuilder = options.toBuilder().setDataChannelCount(1).setRetryOptions(BigtableIO.retryOptionsToBuilder(retryOptions).setStreamingBatchSize(Math.min(retryOptions.getStreamingBatchSize(), retryOptions.getStreamingBufferSize() / 2)).build());
            BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(BigtableIO.getUserAgent()).build();
            return new Read(optionsWithAgent, this.tableId, this.filter, this.bigtableService);
        }

        public Read withRowFilter(RowFilter filter) {
            Preconditions.checkNotNull(filter, "filter");
            return new Read(this.options, this.tableId, filter, this.bigtableService);
        }

        public Read withTableId(String tableId) {
            Preconditions.checkNotNull(tableId, "tableId");
            return new Read(this.options, tableId, this.filter, this.bigtableService);
        }

        public BigtableOptions getBigtableOptions() {
            return this.options;
        }

        public String getTableId() {
            return this.tableId;
        }

        @Override
        public PCollection<Row> apply(PBegin input) {
            BigtableSource source = new BigtableSource(this.getBigtableService(), this.tableId, this.filter, ByteKeyRange.ALL_KEYS, null);
            return (PCollection)input.getPipeline().apply(com.google.cloud.dataflow.sdk.io.Read.from(source));
        }

        @Override
        public void validate(PBegin input) {
            Preconditions.checkArgument(this.options != null, "BigtableOptions not specified");
            Preconditions.checkArgument(!this.tableId.isEmpty(), "Table ID not specified");
            try {
                Preconditions.checkArgument(this.getBigtableService().tableExists(this.tableId), "Table %s does not exist", this.tableId);
            }
            catch (IOException e) {
                logger.warn("Error checking whether table {} exists; proceeding.", (Object)this.tableId, (Object)e);
            }
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("tableId", this.tableId).withLinkUrl("Table ID"));
            if (this.options != null) {
                builder.add(DisplayData.item("bigtableOptions", this.options.toString()).withLabel("Bigtable Options"));
            }
            if (this.filter != null) {
                builder.add(DisplayData.item("rowFilter", this.filter.toString()).withLabel("Table Row Filter"));
            }
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(Read.class).add("options", this.options).add("tableId", this.tableId).add("filter", this.filter).toString();
        }

        private Read(@Nullable BigtableOptions options, String tableId, @Nullable RowFilter filter, @Nullable BigtableService bigtableService) {
            this.options = options;
            this.tableId = Preconditions.checkNotNull(tableId, "tableId");
            this.filter = filter;
            this.bigtableService = bigtableService;
        }

        Read withBigtableService(BigtableService bigtableService) {
            Preconditions.checkNotNull(bigtableService, "bigtableService");
            return new Read(this.options, this.tableId, this.filter, bigtableService);
        }

        private BigtableService getBigtableService() {
            if (this.bigtableService != null) {
                return this.bigtableService;
            }
            return new BigtableServiceImpl(this.options);
        }
    }
}

