/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.io;

import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.OffsetBasedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Iterables;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.Futures;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.ListenableFuture;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.ListeningExecutorService;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.MoreExecutors;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.IOChannelFactory;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class FileBasedSource<T>
extends OffsetBasedSource<T> {
    /** Logger shared by this class and its nested reader classes. */
    private static final Logger LOG = LoggerFactory.getLogger(FileBasedSource.class);
    /** Fraction (1%) of the matched files that is stat'ed when a pattern's size is estimated by sampling. */
    private static final float FRACTION_OF_FILES_TO_STAT = 0.01f;
    /**
     * Patterns matching at most this many files get an exact size (every file is stat'ed);
     * it is also the lower bound on the sample size when sampling is used.
     */
    static final int MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT = 100;
    /** Size of the fixed thread pools used to stat or split the files of a pattern concurrently. */
    static final int THREAD_POOL_SIZE = 128;
    /** Either a file pattern or a single file name, depending on {@link #mode}. */
    private final String fileOrPatternSpec;
    /** Set once by the constructor; tells whether this source is a pattern or a single file/sub-range. */
    private final Mode mode;

    /**
     * Creates a source in {@link Mode#FILEPATTERN} mode.
     *
     * <p>The offset range handed to the superclass is always {@code [0, Long.MAX_VALUE)};
     * {@link #validate()} later insists on exactly these bounds for pattern sources.
     *
     * @param fileOrPatternSpec the file pattern (or single file name) this source stands for
     * @param minBundleSize     minimum bundle size, forwarded unchanged to the superclass
     */
    public FileBasedSource(String fileOrPatternSpec, long minBundleSize) {
        super(0L, Long.MAX_VALUE, minBundleSize);
        this.fileOrPatternSpec = fileOrPatternSpec;
        this.mode = Mode.FILEPATTERN;
    }

    /**
     * Creates a source in {@link Mode#SINGLE_FILE_OR_SUBRANGE} mode for one file and an
     * explicit offset range.
     *
     * @param fileName      name of the single file to read
     * @param minBundleSize minimum bundle size, forwarded unchanged to the superclass
     * @param startOffset   start of the offset range, forwarded to the superclass
     * @param endOffset     end of the offset range, forwarded to the superclass
     */
    public FileBasedSource(String fileName, long minBundleSize, long startOffset, long endOffset) {
        super(startOffset, endOffset, minBundleSize);
        this.fileOrPatternSpec = fileName;
        this.mode = Mode.SINGLE_FILE_OR_SUBRANGE;
    }

    /**
     * Returns the string given at construction time: a file pattern or a single file name,
     * depending on {@link #getMode()}.
     */
    public final String getFileOrPatternSpec() {
        return fileOrPatternSpec;
    }

    /** Returns the mode fixed by whichever constructor created this source. */
    public final Mode getMode() {
        return mode;
    }

    /**
     * Builds a source for the sub-range {@code [start, end)} of this source's file.
     *
     * @param start start offset; must not be smaller than this source's start offset
     * @param end   end offset; must not be larger than this source's end offset
     * @return the source produced by {@link #createForSubrangeOfFile}
     * @throws IllegalArgumentException if this source is a file pattern, if the requested range
     *         is not contained in this source's range, or if a source created for a proper
     *         sub-range is not in {@link Mode#SINGLE_FILE_OR_SUBRANGE} mode
     */
    @Override
    public final FileBasedSource<T> createSourceForSubrange(long start, long end) {
        final boolean positionBased = mode != Mode.FILEPATTERN;
        Preconditions.checkArgument(positionBased, "Cannot split a file pattern based source based on positions");

        final String startMessage = "Start offset value " + start
                + " of the subrange cannot be smaller than the start offset value " + getStartOffset()
                + " of the parent source";
        Preconditions.checkArgument(getStartOffset() <= start, startMessage);

        final String endMessage = "End offset value " + end
                + " of the subrange cannot be larger than the end offset value " + getEndOffset()
                + " of the parent source";
        Preconditions.checkArgument(getEndOffset() >= end, endMessage);

        final FileBasedSource<T> subrange = createForSubrangeOfFile(fileOrPatternSpec, start, end);

        // Only the full range [0, Long.MAX_VALUE) is exempt from the mode check.
        final boolean coversWholeRange = start <= 0L && end == Long.MAX_VALUE;
        if (!coversWholeRange) {
            Preconditions.checkArgument(
                    subrange.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE,
                    "Source created for the range [" + start + "," + end + ")" + " must be a subrange source");
        }
        return subrange;
    }

    /**
     * Subclass hook that creates a source of the concrete type for part of one file.
     *
     * <p>Callers in this class pass the arguments as (file name, start offset, end offset);
     * a whole file is requested as {@code (file, 0L, Long.MAX_VALUE)}. The parameter names
     * below are decompiler placeholders.
     *
     * @param var1 name of the file the new source reads
     * @param var2 start offset of the range
     * @param var4 end offset of the range
     * @return the new source; {@link #createSourceForSubrange} requires it to be in
     *         {@link Mode#SINGLE_FILE_OR_SUBRANGE} mode for any proper sub-range
     */
    protected abstract FileBasedSource<T> createForSubrangeOfFile(String var1, long var2, long var4);

    /**
     * Subclass hook that creates the reader for this source when it represents a single file
     * or sub-range; {@link #createReader} calls it once per file.
     *
     * @param var1 the pipeline options passed to {@link #createReader} (decompiler placeholder name)
     * @return a reader for this source
     */
    protected abstract FileBasedReader<T> createSingleFileReader(PipelineOptions var1);

    /**
     * Estimates the number of bytes this source covers.
     *
     * <p>For a single file or sub-range the result is the distance between the start offset and
     * the smaller of the declared end offset and {@link #getMaxEndOffset}. For a file pattern the
     * pattern is expanded; when it matches at most
     * {@link #MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT} files every file is stat'ed, otherwise the
     * total is extrapolated from a random sample.
     *
     * @param options pipeline options, only used to resolve the maximum end offset
     * @return the estimated size in bytes
     * @throws Exception if the channel factory lookup, pattern matching or a size lookup fails
     */
    @Override
    public final long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
        final IOChannelFactory channelFactory = IOChannelUtils.getFactory(fileOrPatternSpec);

        if (mode != Mode.FILEPATTERN) {
            final long rangeStart = getStartOffset();
            final long rangeEnd = Math.min(getEndOffset(), getMaxEndOffset(options));
            return rangeEnd - rangeStart;
        }

        final long began = System.currentTimeMillis();
        final Collection<String> matched = channelFactory.match(fileOrPatternSpec);
        final boolean statEveryFile = matched.size() <= MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT;

        final long estimate;
        if (statEveryFile) {
            estimate = FileBasedSource.getExactTotalSizeOfFiles(matched, channelFactory);
            LOG.debug("Size estimation of all files of pattern " + fileOrPatternSpec + " took " + (System.currentTimeMillis() - began) + " ms");
        } else {
            estimate = FileBasedSource.getEstimatedSizeOfFilesBySampling(matched, channelFactory);
            LOG.debug("Size estimation of pattern " + fileOrPatternSpec + " by sampling took " + (System.currentTimeMillis() - began) + " ms");
        }
        return estimate;
    }

    /**
     * Sums the sizes of all given files, looking the sizes up concurrently.
     *
     * <p>One size lookup per file is submitted to a fixed pool of {@link #THREAD_POOL_SIZE}
     * threads; the method then blocks until every lookup has finished. The pool is shut down
     * in all cases.
     *
     * @param files            names of the files to stat
     * @param ioChannelFactory factory used to look up each file's size
     * @return the sum of the sizes in bytes ({@code 0} for an empty collection)
     * @throws Exception if waiting for the lookups is interrupted or any lookup fails
     */
    private static long getExactTotalSizeOfFiles(Collection<String> files, IOChannelFactory ioChannelFactory) throws Exception {
        List<ListenableFuture<Long>> futures = new ArrayList<ListenableFuture<Long>>(files.size());
        ListeningExecutorService service =
                MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_POOL_SIZE));
        try {
            for (String file : files) {
                futures.add(FileBasedSource.createFutureForSizeEstimation(file, ioChannelFactory, service));
            }
            // Keep the element type: iterating a raw List into a Long variable does not type-check.
            List<Long> sizes = Futures.allAsList(futures).get();
            long totalSize = 0L;
            for (Long size : sizes) {
                totalSize += size.longValue();
            }
            return totalSize;
        } finally {
            service.shutdown();
        }
    }

    /**
     * Submits a task that looks up the size of one file.
     *
     * @param file             name of the file to stat
     * @param ioChannelFactory factory whose {@code getSizeBytes} is called by the task
     * @param service          executor the task is submitted to
     * @return a future that completes with the file's size in bytes
     */
    private static ListenableFuture<Long> createFutureForSizeEstimation(final String file, final IOChannelFactory ioChannelFactory, ListeningExecutorService service) {
        final Callable<Long> statTask = new Callable<Long>() {
            @Override
            public Long call() throws Exception {
                return ioChannelFactory.getSizeBytes(file);
            }
        };
        return service.submit(statTask);
    }

    /**
     * Estimates the total size of the given files from a random sample.
     *
     * <p>The sample contains {@link #FRACTION_OF_FILES_TO_STAT} of the files, but at least
     * {@link #MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT} and never more than the number of files
     * available. The exact total of the sample is scaled up by
     * {@code files.size() / sampleSize}.
     *
     * @param files            names of all files matched by the pattern
     * @param ioChannelFactory factory used to look up file sizes
     * @return the extrapolated total size in bytes; {@code 0} when {@code files} is empty
     * @throws Exception if stat'ing the sampled files fails
     */
    private static long getEstimatedSizeOfFilesBySampling(Collection<String> files, IOChannelFactory ioChannelFactory) throws Exception {
        if (files.isEmpty()) {
            // Nothing to sample; also avoids dividing by a zero sample size below.
            return 0L;
        }
        int sampleSize = (int) (FRACTION_OF_FILES_TO_STAT * (float) files.size());
        sampleSize = Math.max(MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT, sampleSize);
        // Never ask for more files than exist, otherwise subList would throw.
        sampleSize = Math.min(sampleSize, files.size());

        List<String> shuffled = new ArrayList<String>(files);
        Collections.shuffle(shuffled);
        List<String> sample = shuffled.subList(0, sampleSize);

        long sampledBytes = FileBasedSource.getExactTotalSizeOfFiles(sample, ioChannelFactory);
        return (long) files.size() * sampledBytes / (long) sample.size();
    }

    /**
     * Adds this source's file pattern to the display data, after whatever the superclass
     * contributes.
     *
     * @param builder the display-data builder to add to
     */
    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        builder.add(
                DisplayData.item("filePattern", getFileOrPatternSpec())
                        .withLabel("File Pattern"));
    }

    /**
     * Submits a task that creates a whole-file source for {@code file} and splits it into bundles.
     *
     * @param file                   name of the file to split
     * @param desiredBundleSizeBytes desired bundle size, passed on to {@code splitIntoBundles}
     * @param options                pipeline options, passed on to {@code splitIntoBundles}
     * @param service                executor the task is submitted to
     * @return a future that completes with the bundles of that file
     */
    private ListenableFuture<List<? extends FileBasedSource<T>>> createFutureForFileSplit(final String file, final long desiredBundleSizeBytes, final PipelineOptions options, ListeningExecutorService service) {
        final Callable<List<? extends FileBasedSource<T>>> splitTask = new Callable<List<? extends FileBasedSource<T>>>() {
            @Override
            public List<? extends FileBasedSource<T>> call() throws Exception {
                // The whole file is expressed as the range [0, Long.MAX_VALUE).
                final FileBasedSource<T> wholeFile =
                        FileBasedSource.this.createForSubrangeOfFile(file, 0L, Long.MAX_VALUE);
                return wholeFile.splitIntoBundles(desiredBundleSizeBytes, options);
            }
        };
        return service.submit(splitTask);
    }

    /**
     * Splits this source into bundles.
     *
     * <ul>
     *   <li>File pattern: the pattern is expanded and every matched file is split concurrently
     *       on a fixed pool of {@link #THREAD_POOL_SIZE} threads; the per-file results are
     *       concatenated into one immutable list.</li>
     *   <li>Single file for which {@link #isSplittable()} is true: the offset-based split of the
     *       superclass is used.</li>
     *   <li>Otherwise: the source is returned unsplit as a one-element list.</li>
     * </ul>
     *
     * @param desiredBundleSizeBytes desired size of each bundle in bytes
     * @param options                pipeline options forwarded to the per-file splits
     * @return the resulting sources
     * @throws Exception if pattern expansion fails, a per-file split fails, or waiting for the
     *         splits is interrupted
     */
    @Override
    public final List<? extends FileBasedSource<T>> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
        if (this.mode == Mode.FILEPATTERN) {
            long startTime = System.currentTimeMillis();
            // The element type must match what createFutureForFileSplit returns.
            List<ListenableFuture<List<? extends FileBasedSource<T>>>> futures =
                    new ArrayList<ListenableFuture<List<? extends FileBasedSource<T>>>>();
            ListeningExecutorService service =
                    MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_POOL_SIZE));
            try {
                for (String file : FileBasedSource.expandFilePattern(this.fileOrPatternSpec)) {
                    futures.add(this.createFutureForFileSplit(file, desiredBundleSizeBytes, options, service));
                }
                List<? extends FileBasedSource<T>> splitResults =
                        ImmutableList.copyOf(Iterables.concat(Futures.allAsList(futures).get()));
                LOG.debug("Splitting the source based on file pattern " + this.fileOrPatternSpec + " took " + (System.currentTimeMillis() - startTime) + " ms");
                return splitResults;
            } finally {
                service.shutdown();
            }
        }
        if (this.isSplittable()) {
            List<FileBasedSource<T>> splitResults = new ArrayList<FileBasedSource<T>>();
            for (OffsetBasedSource<T> split : super.splitIntoBundles(desiredBundleSizeBytes, options)) {
                // The superclass creates its splits through createSourceForSubrange, which this
                // class declares to return FileBasedSource<T>.
                splitResults.add((FileBasedSource<T>) split);
            }
            return splitResults;
        }
        LOG.debug("The source for file " + this.fileOrPatternSpec + " is not split into sub-range based sources since the file is not seekable");
        return ImmutableList.of(this);
    }

    /**
     * Tells whether this source's file may be split into offset sub-ranges.
     *
     * @return whatever the channel factory for this file reports from {@code isReadSeekEfficient}
     * @throws Exception if the channel factory lookup or the query fails
     */
    protected boolean isSplittable() throws Exception {
        return IOChannelUtils.getFactory(fileOrPatternSpec).isReadSeekEfficient(fileOrPatternSpec);
    }

    /**
     * Creates a reader for this source.
     *
     * <p>A single-file source delegates to {@link #createSingleFileReader}. A pattern source is
     * expanded and one single-file reader is created per matched file, each covering
     * {@code [0, fileSize)}. If a file's size cannot be determined the failure is logged and
     * {@code Long.MAX_VALUE} is used as the end offset. When exactly one file matches, its
     * reader is returned directly; otherwise the readers are wrapped in a reader that goes
     * through them in order.
     *
     * @param options pipeline options handed to the single-file readers
     * @return the reader
     * @throws IOException if expanding the pattern fails
     */
    @Override
    public final BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException {
        this.validate();
        if (this.mode != Mode.FILEPATTERN) {
            return this.createSingleFileReader(options);
        }

        long startTime = System.currentTimeMillis();
        Collection<String> files = FileBasedSource.expandFilePattern(this.fileOrPatternSpec);
        List<FileBasedReader<T>> fileReaders = new ArrayList<FileBasedReader<T>>(files.size());
        for (String fileName : files) {
            long endOffset;
            try {
                endOffset = IOChannelUtils.getFactory(fileName).getSizeBytes(fileName);
            } catch (IOException e) {
                // Best effort: fall back to an open-ended range rather than failing the read.
                LOG.warn("Failed to get size of " + fileName, e);
                endOffset = Long.MAX_VALUE;
            }
            fileReaders.add(this.createForSubrangeOfFile(fileName, 0L, endOffset).createSingleFileReader(options));
        }
        LOG.debug("Creating a reader for file pattern " + this.fileOrPatternSpec + " took " + (System.currentTimeMillis() - startTime) + " ms");

        if (fileReaders.size() == 1) {
            return fileReaders.get(0);
        }
        return new FilePatternReader(this, fileReaders);
    }

    /**
     * Returns the file pattern for a pattern source, or the file name followed by
     * {@code " range "} and the superclass's string form for a single file or sub-range.
     *
     * @throws IllegalStateException if the mode is neither of the two known values
     */
    @Override
    public String toString() {
        switch (mode) {
            case SINGLE_FILE_OR_SUBRANGE:
                return fileOrPatternSpec + " range " + super.toString();
            case FILEPATTERN:
                return fileOrPatternSpec;
            default:
                throw new IllegalStateException("Unexpected mode: " + mode);
        }
    }

    /**
     * Validates this source after the superclass's own validation.
     *
     * <p>A pattern source must span exactly {@code [0, Long.MAX_VALUE)}; a single file or
     * sub-range source has no additional constraints here.
     *
     * @throws IllegalArgumentException if a pattern source has a non-zero start offset or an end
     *         offset other than {@code Long.MAX_VALUE}
     * @throws IllegalStateException if the mode is neither of the two known values
     */
    @Override
    public void validate() {
        super.validate();
        switch (mode) {
            case SINGLE_FILE_OR_SUBRANGE:
                // Nothing further to check for a single file or sub-range.
                return;
            case FILEPATTERN:
                break;
            default:
                throw new IllegalStateException("Unknown mode: " + mode);
        }

        final boolean startsAtZero = getStartOffset() == 0L;
        Preconditions.checkArgument(startsAtZero, "FileBasedSource is based on a file pattern or a full single file but the starting offset proposed " + getStartOffset() + " is not zero");

        final boolean openEnded = getEndOffset() == Long.MAX_VALUE;
        Preconditions.checkArgument(openEnded, "FileBasedSource is based on a file pattern or a full single file but the ending offset proposed " + getEndOffset() + " is not Long.MAX_VALUE");
    }

    /**
     * Returns the largest end offset that makes sense for this single-file source.
     *
     * <p>If the declared end offset is {@code Long.MAX_VALUE}, the size reported by the file's
     * channel factory is returned; otherwise the declared end offset is returned as is.
     *
     * @param options pipeline options (not used by this implementation)
     * @return the maximum end offset
     * @throws IllegalArgumentException if this source is a file pattern
     * @throws Exception if the channel factory lookup or the size lookup fails
     */
    @Override
    public final long getMaxEndOffset(PipelineOptions options) throws Exception {
        if (mode == Mode.FILEPATTERN) {
            throw new IllegalArgumentException("Cannot determine the exact end offset of a file pattern");
        }
        final boolean openEnded = getEndOffset() == Long.MAX_VALUE;
        if (!openEnded) {
            return getEndOffset();
        }
        return IOChannelUtils.getFactory(fileOrPatternSpec).getSizeBytes(fileOrPatternSpec);
    }

    /**
     * Expands a file name or pattern into the files it matches and logs how many were found.
     *
     * @param fileOrPatternSpec the file name or pattern to expand
     * @return the matches returned by the channel factory responsible for the spec
     * @throws IOException if the channel factory lookup or the match fails
     */
    protected static final Collection<String> expandFilePattern(String fileOrPatternSpec) throws IOException {
        final Collection<String> matchedFiles =
                IOChannelUtils.getFactory(fileOrPatternSpec).match(fileOrPatternSpec);
        final int matchCount = matchedFiles.size();
        LOG.info("Matched {} files for pattern {}", matchCount, fileOrPatternSpec);
        return matchedFiles;
    }

    /**
     * Reader for a file-pattern source: goes through a list of single-file readers in order and
     * exposes their records as one sequence. Readers that turn out to be empty are closed and
     * skipped.
     */
    private class FilePatternReader
    extends BoundedSource.BoundedReader<T> {
        private final FileBasedSource<T> source;
        private final List<FileBasedReader<T>> fileReaders;
        // Positioned just past currentReader once a reader has been started.
        final ListIterator<FileBasedReader<T>> fileReadersIterator;
        // The reader currently being consumed; null until start() has picked one.
        FileBasedReader<T> currentReader = null;

        /**
         * @param source      the pattern source this reader reports as its current source
         * @param fileReaders one reader per matched file, in the order they should be read
         */
        public FilePatternReader(FileBasedSource<T> source, List<FileBasedReader<T>> fileReaders) {
            this.source = source;
            this.fileReaders = fileReaders;
            this.fileReadersIterator = fileReaders.listIterator();
        }

        /** Starts the first reader that has a record; returns false if none has. */
        @Override
        public boolean start() throws IOException {
            return this.startNextNonemptyReader();
        }

        /**
         * Advances within the current reader, moving on to the next non-empty reader once the
         * current one is exhausted.
         *
         * @throws IllegalStateException if {@link #start()} has not selected a reader yet
         */
        @Override
        public boolean advance() throws IOException {
            Preconditions.checkState(this.currentReader != null, "Call start() before advance()");
            if (this.currentReader.advance()) {
                return true;
            }
            return this.startNextNonemptyReader();
        }

        /**
         * Starts the remaining readers one after another until one yields a record. Readers that
         * start empty are closed right away.
         */
        private boolean startNextNonemptyReader() throws IOException {
            while (this.fileReadersIterator.hasNext()) {
                this.currentReader = this.fileReadersIterator.next();
                if (this.currentReader.start()) {
                    return true;
                }
                this.currentReader.close();
            }
            return false;
        }

        /** Returns the current record of the reader being consumed. */
        @Override
        public T getCurrent() throws NoSuchElementException {
            return this.currentReader.getCurrent();
        }

        /** Returns the timestamp of the current record of the reader being consumed. */
        @Override
        public Instant getCurrentTimestamp() throws NoSuchElementException {
            return this.currentReader.getCurrentTimestamp();
        }

        /**
         * Closes the current reader and every reader that has not been visited yet.
         *
         * <p>All of them are closed even if some fail; the first {@link IOException} is rethrown
         * afterwards with any later ones attached as suppressed exceptions, so that one failing
         * reader cannot leave the others open.
         */
        @Override
        public void close() throws IOException {
            IOException firstFailure = null;
            if (this.currentReader != null) {
                try {
                    this.currentReader.close();
                } catch (IOException e) {
                    firstFailure = e;
                }
            }
            while (this.fileReadersIterator.hasNext()) {
                try {
                    this.fileReadersIterator.next().close();
                } catch (IOException e) {
                    if (firstFailure == null) {
                        firstFailure = e;
                    } else if (firstFailure != e) {
                        // addSuppressed rejects self-suppression, hence the identity check.
                        firstFailure.addSuppressed(e);
                    }
                }
            }
            if (firstFailure != null) {
                throw firstFailure;
            }
        }

        /** Returns the pattern source this reader was created for. */
        @Override
        public FileBasedSource<T> getCurrentSource() {
            return this.source;
        }

        /** Dynamic splitting is not supported; always returns {@code null}. */
        @Override
        public FileBasedSource<T> splitAtFraction(double fraction) {
            LOG.debug("Dynamic splitting of FilePatternReader is unsupported.");
            return null;
        }

        /**
         * Estimates progress by giving every file an equal share: the share of the files before
         * the current one, plus the current reader's own fraction scaled to one file's share.
         *
         * @return {@code 0.0} before any reader has been selected, otherwise the estimate
         */
        @Override
        public Double getFractionConsumed() {
            if (this.currentReader == null) {
                return 0.0;
            }
            // Defensive: a current reader can only come from a non-empty list.
            if (this.fileReaders.isEmpty()) {
                return 1.0;
            }
            // Index of the current reader: the iterator sits just past it.
            int index = this.fileReadersIterator.previousIndex();
            int numReaders = this.fileReaders.size();
            // Defensive: previousIndex() never exceeds size - 1.
            if (index == numReaders) {
                return 1.0;
            }
            double before = 1.0 * (double)index / (double)numReaders;
            double after = 1.0 * (double)(index + 1) / (double)numReaders;
            Double fractionOfCurrentReader = this.currentReader.getFractionConsumed();
            if (fractionOfCurrentReader == null) {
                // The current reader cannot report progress; count only the finished files.
                return before;
            }
            return before + fractionOfCurrentReader * (after - before);
        }
    }

    /**
     * Base class for readers of a single file or sub-range. It opens the file's channel, seeks
     * to the source's start offset when the channel is seekable, and then hands the channel to
     * the subclass, which does the actual record parsing.
     */
    public static abstract class FileBasedReader<T>
    extends OffsetBasedSource.OffsetBasedReader<T> {
        // Channel opened by startImpl(); stays null until the reader has been started.
        private ReadableByteChannel channel = null;

        /**
         * @param source the source to read; must not be in {@link Mode#FILEPATTERN} mode
         * @throws IllegalArgumentException if {@code source} is a file pattern
         */
        public FileBasedReader(FileBasedSource<T> source) {
            super(source);
            final boolean singleFile = source.getMode() != Mode.FILEPATTERN;
            Preconditions.checkArgument(singleFile, "FileBasedReader does not support reading file patterns");
        }

        /** Returns the superclass's current source, narrowed to {@link FileBasedSource}. */
        @Override
        public synchronized FileBasedSource<T> getCurrentSource() {
            return (FileBasedSource<T>) super.getCurrentSource();
        }

        /**
         * Opens the file, positions the channel, lets the subclass initialise itself through
         * {@link #startReading} and then reads the first record.
         *
         * @return the result of the first {@link #advanceImpl()} call
         * @throws IllegalArgumentException if the channel is not seekable but the source is a
         *         single-file/sub-range source or starts at a non-zero offset
         */
        @Override
        protected final boolean startImpl() throws IOException {
            final FileBasedSource<T> source = getCurrentSource();
            final String spec = source.getFileOrPatternSpec();
            channel = IOChannelUtils.getFactory(spec).open(spec);

            if (!(channel instanceof SeekableByteChannel)) {
                // Without seeking, only a read that starts at offset zero is possible.
                Preconditions.checkArgument(source.getMode() != Mode.SINGLE_FILE_OR_SUBRANGE, "Subrange-based sources must only be defined for file types that support seekable  read channels");
                Preconditions.checkArgument(source.getStartOffset() == 0L, "Start offset " + source.getStartOffset() + " is not zero but channel for reading the file is not seekable.");
            } else {
                ((SeekableByteChannel) channel).position(source.getStartOffset());
            }

            startReading(channel);
            return advanceImpl();
        }

        /** Advancing is simply reading the next record. */
        @Override
        protected final boolean advanceImpl() throws IOException {
            return readNextRecord();
        }

        /** Closes the channel if one has been opened. */
        @Override
        public void close() throws IOException {
            if (channel == null) {
                return;
            }
            channel.close();
        }

        /**
         * Called once from {@link #startImpl()} after the channel has been opened (and
         * positioned, if seekable) and before the first record is read.
         *
         * @param var1 the opened channel (decompiler placeholder name)
         */
        protected abstract void startReading(ReadableByteChannel var1) throws IOException;

        /**
         * Reads the next record.
         *
         * @return the value that {@link #advanceImpl()} passes back to its caller
         */
        protected abstract boolean readNextRecord() throws IOException;
    }

    /**
     * What a {@link FileBasedSource} stands for. The mode is fixed by the constructor used to
     * create the source.
     */
    public static enum Mode {
        // Set by the (spec, minBundleSize) constructor; such a source cannot be split by position.
        FILEPATTERN,
        // Set by the (fileName, minBundleSize, startOffset, endOffset) constructor.
        SINGLE_FILE_OR_SUBRANGE;

    }
}

