/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.io;

import com.google.cloud.dataflow.sdk.annotations.Experimental;
import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.io.BlockBasedSource;
import com.google.cloud.dataflow.sdk.io.FileBasedSource;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.util.AvroUtils;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.util.Arrays;
import java.util.Collection;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
import javax.annotation.concurrent.GuardedBy;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream;
import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
import org.apache.commons.compress.utils.CountingInputStream;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public class AvroSource<T>
extends BlockBasedSource<T> {
    /** Minimum bundle size (128000) that the static factory methods pass to the constructor. */
    static final long DEFAULT_MIN_BUNDLE_SIZE = 128000L;
    /**
     * String form of the schema records are read with. May be {@code null} until a schema is
     * supplied via {@code withSchema} or copied from file metadata in
     * {@code createForSubrangeOfFile}.
     */
    private final String readSchemaString;
    /**
     * String form of the schema taken from the file's metadata. Always {@code null} for sources
     * built by the pattern constructor; set for sources built by the subrange constructor.
     */
    private final String fileSchemaString;
    /** Java type of the records produced; {@code GenericRecord.class} selects the generic reader. */
    private final Class<T> type;
    /** Name of the block compression codec (e.g. "deflate"); {@code null} until read from metadata. */
    private final String codec;
    /** Sync marker bytes that terminate each block; {@code null} until read from metadata. */
    private final byte[] syncMarker;
    // The three transient fields below are lazily populated caches derived from the schema strings.
    private transient AvroCoder<T> coder = null;
    private transient Schema fileSchema;
    private transient Schema readSchema;

    /**
     * Builds a bounded read over the files matching {@code filePattern}, producing records of
     * type {@code clazz}. The read schema is derived from {@code clazz} by reflection.
     *
     * @param filePattern file name or pattern to read
     * @param clazz       record type to produce
     * @return a {@link Read.Bounded} wrapping a new {@code AvroSource}
     */
    public static <T> Read.Bounded<T> readFromFileWithClass(String filePattern, Class<T> clazz) {
        String reflectedSchema = ReflectData.get().getSchema(clazz).toString();
        AvroSource<T> source =
                new AvroSource<T>(filePattern, DEFAULT_MIN_BUNDLE_SIZE, reflectedSchema, clazz, null, null);
        return Read.from(source);
    }

    /**
     * Creates a {@link GenericRecord} source for the given file name or pattern, using the default
     * minimum bundle size. No read schema, codec or sync marker is set on the returned source.
     *
     * @param fileNameOrPattern file name or pattern to read
     * @return a new source
     */
    public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
        return new AvroSource<GenericRecord>(
                fileNameOrPattern, DEFAULT_MIN_BUNDLE_SIZE, null, GenericRecord.class, null, null);
    }

    /**
     * Returns a new {@link GenericRecord} source that reads with the given schema string. The file
     * spec, minimum bundle size, codec and sync marker are carried over from this source.
     *
     * @param schema string form of the read schema
     * @return a new source; this instance is not modified
     */
    public AvroSource<GenericRecord> withSchema(String schema) {
        String spec = getFileOrPatternSpec();
        long minBundle = getMinBundleSize();
        return new AvroSource<GenericRecord>(spec, minBundle, schema, GenericRecord.class, codec, syncMarker);
    }

    /**
     * Returns a new {@link GenericRecord} source that reads with the given schema. The schema is
     * stored in its string form; all other settings are carried over from this source.
     *
     * @param schema the read schema
     * @return a new source; this instance is not modified
     */
    public AvroSource<GenericRecord> withSchema(Schema schema) {
        // The String overload performs the same construction from the serialized schema.
        String schemaText = schema.toString();
        return withSchema(schemaText);
    }

    /**
     * Returns a new source producing records of type {@code clazz}, with the read schema derived
     * from {@code clazz} by reflection. All other settings are carried over from this source.
     *
     * @param clazz record type to produce
     * @return a new source; this instance is not modified
     */
    public <X> AvroSource<X> withSchema(Class<X> clazz) {
        String spec = getFileOrPatternSpec();
        long minBundle = getMinBundleSize();
        String reflectedSchema = ReflectData.get().getSchema(clazz).toString();
        return new AvroSource<X>(spec, minBundle, reflectedSchema, clazz, codec, syncMarker);
    }

    /**
     * Returns a new source identical to this one except for its minimum bundle size.
     *
     * @param minBundleSize the new minimum bundle size
     * @return a new source; this instance is not modified
     */
    public AvroSource<T> withMinBundleSize(long minBundleSize) {
        String spec = getFileOrPatternSpec();
        return new AvroSource<T>(spec, minBundleSize, readSchemaString, type, codec, syncMarker);
    }

    /**
     * Creates a source over a file name or pattern. The file schema is left unset
     * ({@code null}) for sources created this way.
     *
     * @param fileNameOrPattern file name or pattern to read
     * @param minBundleSize     minimum bundle size passed to the superclass
     * @param schema            string form of the read schema, may be {@code null}
     * @param type              record type to produce
     * @param codec             codec name, may be {@code null}
     * @param syncMarker        sync marker bytes, may be {@code null}
     */
    private AvroSource(String fileNameOrPattern, long minBundleSize, String schema, Class<T> type, String codec, byte[] syncMarker) {
        super(fileNameOrPattern, minBundleSize);
        this.type = type;
        this.readSchemaString = schema;
        this.fileSchemaString = null;
        this.codec = codec;
        this.syncMarker = syncMarker;
    }

    /**
     * Creates a source over the byte range {@code [startOffset, endOffset)} of a single file,
     * carrying the metadata (codec, sync marker, file schema) needed to decode it.
     *
     * @param fileName      the file to read
     * @param minBundleSize minimum bundle size passed to the superclass
     * @param startOffset   start of the range, passed to the superclass
     * @param endOffset     end of the range, passed to the superclass
     * @param schema        string form of the read schema
     * @param type          record type to produce
     * @param codec         codec name
     * @param syncMarker    sync marker bytes
     * @param fileSchema    string form of the schema stored in the file
     */
    private AvroSource(String fileName, long minBundleSize, long startOffset, long endOffset, String schema, Class<T> type, String codec, byte[] syncMarker, String fileSchema) {
        super(fileName, minBundleSize, startOffset, endOffset);
        this.type = type;
        this.readSchemaString = schema;
        this.fileSchemaString = fileSchema;
        this.codec = codec;
        this.syncMarker = syncMarker;
    }

    /** Validates this source by delegating to the superclass; no Avro-specific checks are added. */
    @Override
    public void validate() {
        super.validate();
    }

    /**
     * Creates a source for the byte range {@code [start, end)} of a single file.
     *
     * <p>If this source does not yet know the codec, sync marker or file schema, they are read
     * from the file's metadata. In that case the read schema also defaults to the file's schema
     * when none was configured.
     *
     * @param fileName the single file to read
     * @param start    start offset of the subrange
     * @param end      end offset of the subrange
     * @return a new source bound to the given file and range
     * @throws IllegalArgumentException if {@code fileName} expands to more than one file
     * @throws RuntimeException         if the file's metadata cannot be read
     */
    @Override
    public BlockBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) {
        byte[] syncMarker = this.syncMarker;
        String codec = this.codec;
        String readSchemaString = this.readSchemaString;
        String fileSchemaString = this.fileSchemaString;

        boolean metadataMissing = codec == null || syncMarker == null || fileSchemaString == null;
        if (metadataMissing) {
            AvroUtils.AvroMetadata metadata;
            try {
                Collection<String> files = FileBasedSource.expandFilePattern(fileName);
                // The template's %s needs the file name supplied, otherwise the message is useless.
                Preconditions.checkArgument(files.size() <= 1, "More than 1 file matched %s", fileName);
                metadata = AvroUtils.readMetadataFromFile(fileName);
            } catch (IOException e) {
                throw new RuntimeException("Error reading metadata from file " + fileName, e);
            }
            codec = metadata.getCodec();
            syncMarker = metadata.getSyncMarker();
            fileSchemaString = metadata.getSchemaString();
            if (readSchemaString == null) {
                // No explicit read schema: read records with the schema they were written with.
                readSchemaString = metadata.getSchemaString();
            }
        }
        return new AvroSource<T>(fileName, this.getMinBundleSize(), start, end, readSchemaString, this.type, codec, syncMarker, fileSchemaString);
    }

    /**
     * Creates the reader that reads this source's single file.
     *
     * @param options pipeline options (not used here)
     * @return a new {@link AvroReader} over this source
     */
    @Override
    protected BlockBasedSource.BlockBasedReader<T> createSingleFileReader(PipelineOptions options) {
        AvroReader<T> reader = new AvroReader<T>(this);
        return reader;
    }

    /** Always returns {@code false}. */
    @Override
    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
        return false;
    }

    /**
     * Returns the coder for records produced by this source, creating and caching it on first use
     * from the record type and the parsed read schema.
     *
     * @return the cached {@link AvroCoder}
     */
    @Override
    public AvroCoder<T> getDefaultOutputCoder() {
        if (coder != null) {
            return coder;
        }
        Schema.Parser schemaParser = new Schema.Parser();
        coder = AvroCoder.of(type, schemaParser.parse(readSchemaString));
        return coder;
    }

    /**
     * Returns the string form of the read schema.
     *
     * @return the read schema string, or {@code null} if none has been set
     */
    public String getSchema() {
        return readSchemaString;
    }

    /**
     * Returns the parsed read schema, parsing and caching it on first use.
     *
     * @return the read schema, or {@code null} when no read schema string is set
     */
    private Schema getReadSchema() {
        // readSchema is only ever assigned here, so it stays null while the string is null.
        if (readSchemaString != null && readSchema == null) {
            readSchema = new Schema.Parser().parse(readSchemaString);
        }
        return readSchema;
    }

    /**
     * Returns the parsed file schema, parsing and caching it on first use.
     *
     * @return the file schema, or {@code null} when no file schema string is set
     */
    private Schema getFileSchema() {
        // fileSchema is only ever assigned here, so it stays null while the string is null.
        if (fileSchemaString != null && fileSchema == null) {
            fileSchema = new Schema.Parser().parse(fileSchemaString);
        }
        return fileSchema;
    }

    /** Returns the sync marker bytes held by this source (the array itself, not a copy). */
    private byte[] getSyncMarker() {
        return syncMarker;
    }

    /** Returns the codec name held by this source. */
    private String getCodec() {
        return codec;
    }

    /**
     * Creates a datum reader that decodes with the file schema and resolves to the read schema.
     * A generic reader is used when the record type is {@link GenericRecord}; otherwise a
     * reflection-based reader is used.
     *
     * @return a new datum reader
     * @throws NullPointerException if the read schema or the file schema is not available
     */
    private DatumReader<T> createDatumReader() {
        Schema resolved = getReadSchema();
        Schema written = getFileSchema();
        Preconditions.checkNotNull(resolved, "No read schema has been initialized for source %s", this);
        Preconditions.checkNotNull(written, "No file schema has been initialized for source %s", this);
        if (type != GenericRecord.class) {
            return new ReflectDatumReader<T>(written, resolved);
        }
        return new GenericDatumReader<T>(written, resolved);
    }

    @Experimental(value=Experimental.Kind.SOURCE_SINK)
    public static class AvroReader<T>
    extends BlockBasedSource.BlockBasedReader<T> {
        /** The block most recently read by {@code readNextBlock}. */
        private AvroBlock<T> currentBlock;
        /** Lock guarding the two block-position fields below. */
        private final Object progressLock = new Object();
        /** Offset of the current block, as computed in {@code startReading}/{@code readNextBlock}. */
        @GuardedBy(value="progressLock")
        private long currentBlockOffset = 0L;
        /** Size of the current block: header bytes + data bytes + sync marker length. */
        @GuardedBy(value="progressLock")
        private long currentBlockSizeBytes = 0L;
        /** Underlying input; pushback capacity equals the sync marker length. */
        private PushbackInputStream stream;
        /** Wraps {@code stream} so the number of bytes taken by a block header can be measured. */
        private CountingInputStream countStream;
        /** Decoder used to read block headers; passed back to the factory for reuse on each block. */
        private BinaryDecoder decoder;

        /**
         * Creates a reader over the given source.
         *
         * @param source the source to read, passed to the superclass
         */
        public AvroReader(AvroSource<T> source) {
            super(source);
        }

        /**
         * Returns the source this reader is currently reading, narrowed to {@code AvroSource}.
         *
         * @return the current source
         */
        @Override
        public synchronized AvroSource<T> getCurrentSource() {
            AvroSource<T> current = (AvroSource<T>) super.getCurrentSource();
            return current;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Reads the next block from the stream and makes it the current block.
         *
         * <p>A block is read as: record count and data size (two longs via the binary decoder),
         * then the data bytes, then the sync marker, which must match the source's sync marker.
         * On success the current block offset and size are updated under {@code progressLock}.
         *
         * @return {@code true} if a block was read; {@code false} if the end of the stream was
         *     reached while reading the record count
         * @throws IOException           if reading from the stream fails
         * @throws IllegalStateException if the block size is out of range, the block or sync
         *     marker is truncated, or the sync marker does not match
         */
        @Override
        public boolean readNextBlock() throws IOException {
            long startOfNextBlock;
            synchronized (this.progressLock) {
                startOfNextBlock = this.currentBlockOffset + this.currentBlockSizeBytes;
            }

            // Read the header through the counting stream so its encoded length can be measured.
            long preHeaderCount = this.countStream.getBytesRead();
            this.decoder = DecoderFactory.get().directBinaryDecoder((InputStream) this.countStream, this.decoder);
            long numRecords;
            try {
                numRecords = this.decoder.readLong();
            } catch (EOFException e) {
                // No further block header: end of input.
                return false;
            }
            long blockSize = this.decoder.readLong();
            long headerSize = this.countStream.getBytesRead() - preHeaderCount;

            // Guard the narrowing cast below; a corrupt size would otherwise truncate silently
            // or fail with NegativeArraySizeException.
            Preconditions.checkState(blockSize >= 0L && blockSize <= (long) Integer.MAX_VALUE,
                    "Invalid block size %s", blockSize);
            byte[] data = new byte[(int) blockSize];
            // A single read() may return fewer bytes than requested, so loop until full or EOF.
            int read = AvroReader.readUntilFullOrEof(this.stream, data);
            Preconditions.checkState(blockSize == (long) read, "Only %s/%s bytes in the block were read", read, blockSize);
            this.currentBlock = new AvroBlock<T>(data, numRecords, this.getCurrentSource());

            byte[] syncMarker = this.getCurrentSource().getSyncMarker();
            byte[] readSyncMarker = new byte[syncMarker.length];
            long syncMarkerOffset = startOfNextBlock + headerSize + blockSize;
            long bytesRead = AvroReader.readUntilFullOrEof(this.stream, readSyncMarker);
            Preconditions.checkState(bytesRead == (long) syncMarker.length, "When trying to read a sync marker at position %s, only able to read %s/%s bytes", syncMarkerOffset, bytesRead, syncMarker.length);
            if (!Arrays.equals(syncMarker, readSyncMarker)) {
                throw new IllegalStateException(String.format("Expected the bytes [%d,%d) in file %s to be a sync marker, but found %s", syncMarkerOffset, syncMarkerOffset + (long) syncMarker.length, this.getCurrentSource().getFileOrPatternSpec(), Arrays.toString(readSyncMarker)));
            }

            synchronized (this.progressLock) {
                this.currentBlockOffset = startOfNextBlock;
                this.currentBlockSizeBytes = headerSize + blockSize + (long) syncMarker.length;
            }
            return true;
        }

        /** Reads into {@code buffer} until it is full or the stream ends; returns the bytes read. */
        private static int readUntilFullOrEof(InputStream in, byte[] buffer) throws IOException {
            int total = 0;
            while (total < buffer.length) {
                int n = in.read(buffer, total, buffer.length - total);
                if (n < 0) {
                    break;
                }
                total += n;
            }
            return total;
        }

        /**
         * Returns the block most recently read by {@code readNextBlock}.
         *
         * @return the current block, or {@code null} if no block has been read yet
         */
        @Override
        public AvroBlock<T> getCurrentBlock() {
            return currentBlock;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Returns the offset of the current block, read under {@code progressLock}.
         *
         * @return the current block offset
         */
        @Override
        public long getCurrentBlockOffset() {
            synchronized (progressLock) {
                return currentBlockOffset;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Returns the size in bytes of the current block, read under {@code progressLock}.
         *
         * @return the current block size
         */
        @Override
        public long getCurrentBlockSize() {
            synchronized (progressLock) {
                return currentBlockSizeBytes;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Returns the number of split points remaining.
         *
         * @return 0 when the reader is done; 1 when the current block ends at or beyond the
         *     source's end offset; otherwise whatever the superclass reports
         */
        @Override
        public long getSplitPointsRemaining() {
            if (isDone()) {
                return 0L;
            }
            synchronized (progressLock) {
                long endOfCurrentBlock = currentBlockOffset + currentBlockSizeBytes;
                boolean currentBlockIsLast = endOfCurrentBlock >= getCurrentSource().getEndOffset();
                if (currentBlockIsLast) {
                    return 1L;
                }
            }
            return super.getSplitPointsRemaining();
        }

        /**
         * Wraps the channel in a pushback stream whose pushback capacity equals the sync marker
         * length.
         *
         * @param channel the channel to read from
         * @return a new pushback stream over the channel
         */
        private PushbackInputStream createStream(ReadableByteChannel channel) {
            InputStream channelInput = Channels.newInputStream(channel);
            int pushbackCapacity = getCurrentSource().getSyncMarker().length;
            return new PushbackInputStream(channelInput, pushbackCapacity);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Positions the reader at the first block boundary at or after the source's start offset.
         *
         * <p>For a non-zero start offset the channel is first rewound by the sync marker length
         * (clamped at 0). The stream is then scanned forward past the next sync marker, and the
         * resulting position is recorded as the current block offset with a block size of 0.
         *
         * @param channel the channel to read; cast to {@link SeekableByteChannel} when the start
         *     offset is non-zero
         * @throws IOException if seeking or reading fails
         */
        @Override
        protected void startReading(ReadableByteChannel channel) throws IOException {
            long scanStart = getCurrentSource().getStartOffset();
            byte[] marker = getCurrentSource().getSyncMarker();
            if (scanStart != 0L) {
                // Back up by one marker length before scanning forward for the next marker.
                scanStart = Math.max(0L, scanStart - (long) marker.length);
                ((SeekableByteChannel) channel).position(scanStart);
            }
            stream = createStream(channel);
            countStream = new CountingInputStream((InputStream) stream);
            synchronized (progressLock) {
                currentBlockOffset = scanStart + AvroReader.advancePastNextSyncMarker(stream, marker);
                currentBlockSizeBytes = 0L;
            }
        }

        /**
         * Consumes bytes from {@code stream} up to and including the next occurrence of
         * {@code syncMarker}. Bytes read beyond the marker are pushed back onto the stream.
         *
         * @param stream     stream to scan; needs pushback capacity of at least the marker length
         * @param syncMarker the marker to search for
         * @return the number of bytes consumed; if the stream ends before a marker is found, this
         *     is the number of bytes that were read
         * @throws IOException if reading or unreading fails
         */
        static long advancePastNextSyncMarker(PushbackInputStream stream, byte[] syncMarker) throws IOException {
            Seeker seeker = new Seeker(syncMarker);
            byte[] chunk = new byte[syncMarker.length];
            long consumed = 0L;
            while (true) {
                int count = stream.read(chunk);
                if (count < 0) {
                    // End of stream without finding a marker.
                    return consumed;
                }
                int markerEnd = seeker.find(chunk, count);
                consumed += (long) count;
                if (markerEnd >= 0) {
                    // Return the bytes after the marker's last byte to the stream.
                    int surplus = count - (markerEnd + 1);
                    stream.unread(chunk, markerEnd + 1, surplus);
                    return consumed - (long) surplus;
                }
                if (count == 0) {
                    // Nothing was read and nothing matched; stop rather than spin.
                    return consumed;
                }
            }
        }

        /**
         * Incrementally searches a byte sequence for a fixed marker. It keeps a sliding window of
         * the most recent bytes across calls to {@link #find}, so a marker split across two
         * buffers is still detected.
         */
        static class Seeker {
            private final byte[] marker;
            /** Sliding window holding the last {@code marker.length} bytes seen. */
            private final byte[] searchBuffer;
            /** Number of valid bytes in the window since construction or the last match. */
            private int available = 0;

            /**
             * @param marker the byte sequence to search for
             */
            public Seeker(byte[] marker) {
                this.marker = marker;
                this.searchBuffer = new byte[marker.length];
            }

            /**
             * Feeds the first {@code length} bytes of {@code buffer} into the window.
             *
             * @param buffer bytes to scan
             * @param length number of bytes of {@code buffer} to scan
             * @return index in {@code buffer} of the last byte of the first marker found, or -1
             *     if no marker ends within these bytes
             */
            public int find(byte[] buffer, int length) {
                final int window = searchBuffer.length;
                for (int i = 0; i < length; ++i) {
                    // Slide the window left by one and append the new byte.
                    System.arraycopy(searchBuffer, 1, searchBuffer, 0, window - 1);
                    searchBuffer[window - 1] = buffer[i];
                    if (available < window) {
                        available++;
                    }
                    // Only a completely filled window can equal the marker.
                    if (available == marker.length && Arrays.equals(searchBuffer, marker)) {
                        available = 0;
                        return i;
                    }
                }
                return -1;
            }
        }
    }

    /**
     * A single block of an Avro file: a fixed number of records decoded from an in-memory byte
     * array that is decompressed according to the source's codec.
     */
    @Experimental(value=Experimental.Kind.SOURCE_SINK)
    static class AvroBlock<T>
    extends BlockBasedSource.Block<T> {
        /** Number of records in this block, as given to the constructor. */
        private final long numRecords;
        /** The record most recently decoded by {@link #readNextRecord}. */
        private T currentRecord;
        /** Number of records decoded so far. */
        private long currentRecordIndex = 0L;
        private final DatumReader<T> reader;
        private final BinaryDecoder decoder;

        /**
         * Wraps the raw block bytes in a stream that decompresses them for the given codec.
         *
         * @param data  the block's bytes as stored in the file
         * @param codec one of "snappy", "deflate", "xz", "bzip2" or "null"
         * @return a stream yielding the decompressed bytes
         * @throws IOException              if a decompressing stream cannot be created
         * @throws IllegalArgumentException if the codec is not one of the supported names
         */
        private static InputStream decodeAsInputStream(byte[] data, String codec) throws IOException {
            ByteArrayInputStream byteStream = new ByteArrayInputStream(data);
            switch (codec) {
                case "snappy": {
                    return new SnappyCompressorInputStream((InputStream) byteStream, 65536);
                }
                case "deflate": {
                    // 'true' selects the Inflater's nowrap mode.
                    Inflater inflater = new Inflater(true);
                    return new InflaterInputStream(byteStream, inflater);
                }
                case "xz": {
                    return new XZCompressorInputStream((InputStream) byteStream);
                }
                case "bzip2": {
                    return new BZip2CompressorInputStream((InputStream) byteStream);
                }
                case "null": {
                    return byteStream;
                }
            }
            throw new IllegalArgumentException("Unsupported codec: " + codec);
        }

        /**
         * @param data       the block's bytes as stored in the file
         * @param numRecords number of records contained in the block
         * @param source     source supplying the datum reader and the codec name
         * @throws IOException if the decompressing stream cannot be created
         */
        AvroBlock(byte[] data, long numRecords, AvroSource<T> source) throws IOException {
            this.numRecords = numRecords;
            this.reader = source.createDatumReader();
            InputStream decompressed = AvroBlock.decodeAsInputStream(data, source.getCodec());
            this.decoder = DecoderFactory.get().binaryDecoder(decompressed, null);
        }

        /**
         * @return the record most recently decoded, or {@code null} before the first read
         */
        @Override
        public T getCurrentRecord() {
            return this.currentRecord;
        }

        /**
         * Decodes the next record of the block, if any remain.
         *
         * @return {@code true} if a record was decoded; {@code false} once all
         *     {@code numRecords} records have been read
         * @throws IOException if decoding fails
         */
        @Override
        public boolean readNextRecord() throws IOException {
            if (this.currentRecordIndex >= this.numRecords) {
                return false;
            }
            this.currentRecord = this.reader.read(null, (Decoder) this.decoder);
            ++this.currentRecordIndex;
            return true;
        }

        /**
         * @return the fraction of this block's records decoded so far; a block with no records
         *     is reported as fully consumed
         */
        @Override
        public double getFractionOfBlockConsumed() {
            if (this.numRecords == 0L) {
                // Avoid 0.0 / 0.0 (NaN): an empty block has nothing left to consume.
                return 1.0;
            }
            return (double) this.currentRecordIndex / (double) this.numRecords;
        }
    }
}

