/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.testing;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.Source;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.values.KV;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SourceTestUtils {
    /** Logger for progress and give-up messages emitted by the concurrent splitting trials. */
    private static final Logger LOG = LoggerFactory.getLogger(SourceTestUtils.class);
    /**
     * Cap on concurrent splitting trials for a single item position:
     * {@code assertSplitAtFractionExhaustive} stops retrying an item once its trial count exceeds
     * this value.
     */
    private static final int MAX_CONCURRENT_SPLITTING_TRIALS_PER_ITEM = 100;
    /**
     * Cap on concurrent splitting trials summed over all item positions:
     * {@code assertSplitAtFractionExhaustive} stops the whole concurrent phase once the running
     * total exceeds this value.
     */
    private static final int MAX_CONCURRENT_SPLITTING_TRIALS_TOTAL = 1000;

    /**
     * Wraps every element of {@code list} together with the structural value that {@code coder}
     * computes for it.
     *
     * <p>The returned wrappers compare and hash by structural value only, while their
     * {@code toString()} also prints the original element.
     *
     * @param coder coder whose {@code structuralValue} is applied to each element
     * @param list elements to wrap, visited in iteration order
     * @return a new mutable list holding one wrapper per element, in the same order
     * @throws Exception if {@code coder.structuralValue} throws
     */
    public static <T> List<ReadableStructuralValue<T>> createStructuralValues(Coder<T> coder, List<T> list) throws Exception {
        List<ReadableStructuralValue<T>> wrapped = new ArrayList<ReadableStructuralValue<T>>();
        for (T value : list) {
            Object structural = coder.structuralValue(value);
            wrapped.add(new ReadableStructuralValue<T>(value, structural));
        }
        return wrapped;
    }

    /**
     * Reads all records of {@code source} through a freshly created reader.
     *
     * <p>The reader is closed before this method returns, whether reading succeeds or throws.
     *
     * @param source source to read
     * @param options options handed to {@code source.createReader}
     * @return the records in the order the reader produced them
     * @throws IOException if creating, reading or closing the reader fails
     */
    public static <T> List<T> readFromSource(BoundedSource<T> source, PipelineOptions options) throws IOException {
        try (BoundedSource.BoundedReader<T> freshReader = source.createReader(options)) {
            return readFromUnstartedReader(freshReader);
        }
    }

    /**
     * Reads every record from a reader on which {@code start()} has not been called yet.
     *
     * @param reader reader to start and drain; it is not closed here
     * @return the records in the order they were read
     * @throws IOException if the reader fails
     */
    public static <T> List<T> readFromUnstartedReader(Source.Reader<T> reader) throws IOException {
        final boolean alreadyStarted = false;
        return readRemainingFromReader(reader, alreadyStarted);
    }

    /**
     * Reads every remaining record from a reader that the caller has already started.
     *
     * <p>Only {@code advance()} is used, so the record the reader is currently positioned on is
     * not part of the result.
     *
     * @param reader started reader to drain; it is not closed here
     * @return the records in the order they were read
     * @throws IOException if the reader fails
     */
    public static <T> List<T> readFromStartedReader(Source.Reader<T> reader) throws IOException {
        final boolean alreadyStarted = true;
        return readRemainingFromReader(reader, alreadyStarted);
    }

    /**
     * Reads exactly {@code n} records from a reader on which {@code start()} has not been called.
     *
     * @param reader reader to start and read from; it is not closed here
     * @param n number of records to read
     * @return a mutable list holding the records read, in order
     * @throws IOException if the reader fails
     */
    public static <T> List<T> readNItemsFromUnstartedReader(Source.Reader<T> reader, int n) throws IOException {
        final boolean alreadyStarted = false;
        return readNItemsFromReader(reader, n, alreadyStarted);
    }

    /**
     * Reads exactly {@code n} further records from a reader that the caller has already started.
     *
     * @param reader started reader to advance; it is not closed here
     * @param n number of records to read
     * @return a mutable list holding the records read, in order
     * @throws IOException if the reader fails
     */
    public static <T> List<T> readNItemsFromStartedReader(Source.Reader<T> reader, int n) throws IOException {
        final boolean alreadyStarted = true;
        return readNItemsFromReader(reader, n, alreadyStarted);
    }

    /**
     * Reads up to {@code n} records from {@code reader}.
     *
     * <p>The first step calls {@code start()} when {@code started} is false; every other step
     * calls {@code advance()}. When {@code n} is {@code Integer.MAX_VALUE} the method reads
     * until the reader reports no more records; for any other {@code n} a JUnit assertion fails
     * if the reader runs out early.
     *
     * @param reader reader to pull records from; it is not closed here
     * @param n number of records wanted, or {@code Integer.MAX_VALUE} for "all remaining"
     * @param started whether the caller has already called {@code start()} on the reader
     * @return a new mutable list of the records read, in order
     * @throws IOException if the reader fails
     */
    private static <T> List<T> readNItemsFromReader(Source.Reader<T> reader, int n, boolean started) throws IOException {
        final boolean readToEnd = (n == Integer.MAX_VALUE);
        List<T> items = new ArrayList<T>();
        // Only the very first step may need start(); afterwards the reader is always advanced.
        boolean needsStart = !started;
        for (int count = 0; count < n; count++) {
            boolean hasRecord;
            if (needsStart) {
                hasRecord = reader.start();
            } else {
                hasRecord = reader.advance();
            }
            needsStart = false;
            if (!readToEnd) {
                Assert.assertTrue(hasRecord);
            }
            if (!hasRecord) {
                break;
            }
            items.add(reader.getCurrent());
        }
        return items;
    }

    /**
     * Drains {@code reader}, returning everything it still has to offer.
     *
     * @param reader reader to drain; it is not closed here
     * @param started whether {@code start()} has already been called on the reader
     * @return a mutable list of the records read, in order
     * @throws IOException if the reader fails
     */
    public static <T> List<T> readRemainingFromReader(Source.Reader<T> reader, boolean started) throws IOException {
        // Integer.MAX_VALUE is the "no limit" sentinel understood by readNItemsFromReader.
        final int noLimit = Integer.MAX_VALUE;
        return readNItemsFromReader(reader, noLimit, started);
    }

    /**
     * Asserts that the records read from {@code sources}, taken together, are the same as the
     * records read from {@code referenceSource}, ignoring order.
     *
     * <p>Each source must report a default output coder equal to the reference source's coder.
     * Records are compared through the structural values produced by that coder.
     *
     * @param referenceSource source whose contents are the expectation
     * @param sources sources whose concatenated contents are checked
     * @param options options used to create all readers
     * @throws Exception if reading fails or a structural value cannot be computed
     */
    public static <T> void assertSourcesEqualReferenceSource(BoundedSource<T> referenceSource, List<? extends BoundedSource<T>> sources, PipelineOptions options) throws Exception {
        Coder<T> referenceCoder = referenceSource.getDefaultOutputCoder();
        List<T> referenceRecords = readFromSource(referenceSource, options);
        List<T> combinedRecords = new ArrayList<T>();
        for (BoundedSource<T> candidate : sources) {
            String coderMismatch = "Coder type for source " + candidate + " is not compatible with Coder type for referenceSource " + referenceSource;
            Assert.assertThat(coderMismatch, candidate.getDefaultOutputCoder(), Matchers.equalTo(referenceCoder));
            combinedRecords.addAll(readFromSource(candidate, options));
        }
        List<ReadableStructuralValue<T>> combinedValues = createStructuralValues(referenceCoder, combinedRecords);
        List<ReadableStructuralValue<T>> referenceValues = createStructuralValues(referenceCoder, referenceRecords);
        Assert.assertThat(combinedValues, Matchers.containsInAnyOrder(referenceValues.toArray()));
    }

    /**
     * Asserts that draining {@code reader} yields the same records, ignoring order, as reading
     * the reader's current source from scratch.
     *
     * <p>Records are compared through structural values computed by the current source's default
     * output coder.
     *
     * @param reader reader on which {@code start()} has not been called; it is not closed here
     * @param options options used to create the second reader over the current source
     * @throws Exception if reading fails or a structural value cannot be computed
     */
    public static <T> void assertUnstartedReaderReadsSameAsItsSource(BoundedSource.BoundedReader<T> reader, PipelineOptions options) throws Exception {
        Coder<T> coder = reader.getCurrentSource().getDefaultOutputCoder();
        List<T> fromReader = readFromUnstartedReader(reader);
        List<T> fromSource = readFromSource(reader.getCurrentSource(), options);
        List<ReadableStructuralValue<T>> readerValues = createStructuralValues(coder, fromReader);
        List<ReadableStructuralValue<T>> sourceValues = createStructuralValues(coder, fromSource);
        Assert.assertThat(sourceValues, Matchers.containsInAnyOrder(readerValues.toArray()));
    }

    /**
     * Reads {@code numItemsToReadBeforeSplit} records from a fresh reader over {@code source},
     * asks it to split at {@code splitFraction}, and checks the result against
     * {@code expectedOutcome}.
     *
     * <p>The source is first read in full to obtain the expected records.
     *
     * @param source source under test
     * @param numItemsToReadBeforeSplit records to read before requesting the split
     * @param splitFraction fraction passed to {@code splitAtFraction}
     * @param expectedOutcome whether the split must succeed, must fail, or may do either
     * @param options options used to create readers
     * @return the primary and residual record counts observed
     * @throws Exception if reading fails or an assertion helper throws
     */
    public static <T> SplitAtFractionResult assertSplitAtFractionBehavior(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, ExpectedSplitOutcome expectedOutcome, PipelineOptions options) throws Exception {
        List<T> expectedItems = readFromSource(source, options);
        return assertSplitAtFractionBehaviorImpl(source, expectedItems, numItemsToReadBeforeSplit, splitFraction, expectedOutcome, options);
    }

    /**
     * Fails the test unless {@code expected} and {@code actual} hold equal elements in the same
     * order.
     *
     * <p>The failure message names the first differing position, or, when one list is a strict
     * prefix of the other, shows up to five of the surplus elements.
     *
     * @param message prefix for every failure message
     * @param expectedLabel name used for {@code expected} in messages
     * @param expected first list
     * @param actualLabel name used for {@code actual} in messages
     * @param actual second list
     */
    private static <T> void assertListsEqualInOrder(String message, String expectedLabel, List<T> expected, String actualLabel, List<T> actual) {
        final int expectedSize = expected.size();
        final int actualSize = actual.size();
        final int shared = Math.min(expectedSize, actualSize);
        for (int pos = 0; pos < shared; pos++) {
            T want = expected.get(pos);
            T got = actual.get(pos);
            if (!Objects.equals(want, got)) {
                Assert.fail(String.format("%s: %s and %s have %d items in common and then differ. Item in %s (%d more): %s, item in %s (%d more): %s", message, expectedLabel, actualLabel, pos, expectedLabel, expectedSize - pos - 1, want, actualLabel, actualSize - pos - 1, got));
            }
        }
        // The common prefix matched; any remaining difference is one list being longer.
        if (expectedSize > actualSize) {
            List<T> surplus = expected.subList(actualSize, Math.min(expectedSize, actualSize + 5));
            Assert.fail(String.format("%s: %s has %d more items after matching all %d from %s. First 5: %s", message, expectedLabel, expectedSize - actualSize, actualSize, actualLabel, surplus));
        } else if (actualSize > expectedSize) {
            List<T> surplus = actual.subList(expectedSize, Math.min(actualSize, expectedSize + 5));
            Assert.fail(String.format("%s: %s has %d more items after matching all %d from %s. First 5: %s", message, actualLabel, actualSize - expectedSize, expectedSize, expectedLabel, surplus));
        }
    }

    /**
     * Performs one split attempt on a fresh reader over {@code source} and verifies the outcome.
     *
     * <p>Steps: read {@code numItemsToReadBeforeSplit} records, call {@code splitAtFraction},
     * check that a successful split replaced the reader's current source with a new object that
     * is not the residual, apply the {@code expectedOutcome} assertion, drain the reader, and
     * finally compare primary, residual and expected records.
     *
     * @param source source under test
     * @param expectedItems all records of {@code source}, in reading order
     * @param numItemsToReadBeforeSplit records to read before requesting the split
     * @param splitFraction fraction passed to {@code splitAtFraction}
     * @param expectedOutcome whether the split must succeed, must fail, or may do either
     * @param options options used to create readers
     * @return primary and residual record counts; the residual count is -1 if the split failed
     * @throws Exception if reading fails or a structural value cannot be computed
     */
    private static <T> SplitAtFractionResult assertSplitAtFractionBehaviorImpl(BoundedSource<T> source, List<T> expectedItems, int numItemsToReadBeforeSplit, double splitFraction, ExpectedSplitOutcome expectedOutcome, PipelineOptions options) throws Exception {
        try (BoundedSource.BoundedReader<T> reader = source.createReader(options)) {
            // Typed as BoundedSource<T>: the verification helper below takes BoundedSource<T>,
            // so a raw Source local cannot be passed to it.
            BoundedSource<T> originalSource = reader.getCurrentSource();
            List<T> currentItems = readNItemsFromUnstartedReader(reader, numItemsToReadBeforeSplit);
            BoundedSource<T> residual = reader.splitAtFraction(splitFraction);
            if (residual != null) {
                // Identity comparison on purpose: a successful split must swap in new objects.
                Assert.assertFalse(String.format("Primary source didn't change after a successful split of %s at %f after reading %d items. Was the source object mutated instead of creating a new one? Source objects MUST be immutable.", source, splitFraction, numItemsToReadBeforeSplit), reader.getCurrentSource() == originalSource);
                Assert.assertFalse(String.format("Residual source equal to original source after a successful split of %s at %f after reading %d items. Was the source object mutated instead of creating a new one? Source objects MUST be immutable.", source, splitFraction, numItemsToReadBeforeSplit), reader.getCurrentSource() == residual);
            }
            switch (expectedOutcome) {
                case MUST_SUCCEED_AND_BE_CONSISTENT: {
                    Assert.assertNotNull("Failed to split reader of source: " + source + " at " + splitFraction + " after reading " + numItemsToReadBeforeSplit + " items", residual);
                    break;
                }
                case MUST_FAIL: {
                    Assert.assertEquals(null, residual);
                    break;
                }
                case MUST_BE_CONSISTENT_IF_SUCCEEDS: {
                    // Either outcome is acceptable; consistency is checked below on success.
                    break;
                }
            }
            // The reader counts as started only if at least one record was read before the split.
            currentItems.addAll(readRemainingFromReader(reader, numItemsToReadBeforeSplit > 0));
            BoundedSource<T> primary = reader.getCurrentSource();
            return verifySingleSplitAtFractionResult(source, expectedItems, currentItems, primary, residual, numItemsToReadBeforeSplit, splitFraction, options);
        }
    }

    /**
     * Checks the sources produced by one split attempt and reports how many records each holds.
     *
     * <p>When {@code residual} is null nothing is compared and the residual count is -1.
     * Otherwise two in-order comparisons on structural values are made: the records the
     * splitting reader actually produced must equal the primary source's records, and primary
     * followed by residual must equal {@code expectedItems}.
     *
     * @param source original source, used only in failure messages
     * @param expectedItems all records of the original source, in reading order
     * @param currentItems records produced by the reader that was split
     * @param primary reader's current source after the split attempt
     * @param residual source returned by the split, or null if the split failed
     * @param numItemsToReadBeforeSplit used only in failure messages
     * @param splitFraction used only in failure messages
     * @param options options used to create readers
     * @return primary and residual record counts
     * @throws Exception if reading fails or a structural value cannot be computed
     */
    private static <T> SplitAtFractionResult verifySingleSplitAtFractionResult(BoundedSource<T> source, List<T> expectedItems, List<T> currentItems, BoundedSource<T> primary, BoundedSource<T> residual, int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options) throws Exception {
        List<T> primaryItems = readFromSource(primary, options);
        if (residual == null) {
            // Failed split: nothing to cross-check, report "no residual" as -1.
            return new SplitAtFractionResult(primaryItems.size(), -1);
        }

        List<T> residualItems = readFromSource(residual, options);
        List<T> primaryThenResidual = new ArrayList<T>();
        primaryThenResidual.addAll(primaryItems);
        primaryThenResidual.addAll(residualItems);

        String primaryMismatch = String.format("Continued reading after split yielded different items than primary source: split at %s after reading %s items, original source: %s, primary source: %s", splitFraction, numItemsToReadBeforeSplit, source, primary);
        String totalMismatch = String.format("Items in primary and residual sources after split do not add up to items in the original source. Split at %s after reading %s items; original source: %s, primary: %s, residual: %s", splitFraction, numItemsToReadBeforeSplit, source, primary, residual);

        Coder<T> coder = primary.getDefaultOutputCoder();
        List<ReadableStructuralValue<T>> primaryValues = createStructuralValues(coder, primaryItems);
        List<ReadableStructuralValue<T>> currentValues = createStructuralValues(coder, currentItems);
        List<ReadableStructuralValue<T>> expectedValues = createStructuralValues(coder, expectedItems);
        List<ReadableStructuralValue<T>> totalValues = createStructuralValues(coder, primaryThenResidual);

        assertListsEqualInOrder(primaryMismatch, "current", currentValues, "primary", primaryValues);
        assertListsEqualInOrder(totalMismatch, "total", expectedValues, "primary+residual", totalValues);
        return new SplitAtFractionResult(primaryItems.size(), residualItems.size());
    }

    /**
     * Asserts that splitting a reader over {@code source} at {@code splitFraction}, after
     * reading {@code numItemsToReadBeforeSplit} records, succeeds and yields consistent primary
     * and residual sources.
     *
     * @param source source under test
     * @param numItemsToReadBeforeSplit records to read before requesting the split
     * @param splitFraction fraction passed to {@code splitAtFraction}
     * @param options options used to create readers
     * @throws Exception if reading fails or an assertion helper throws
     */
    public static <T> void assertSplitAtFractionSucceedsAndConsistent(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options) throws Exception {
        final ExpectedSplitOutcome outcome = ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT;
        assertSplitAtFractionBehavior(source, numItemsToReadBeforeSplit, splitFraction, outcome, options);
    }

    /**
     * Asserts that splitting a reader over {@code source} at {@code splitFraction}, after
     * reading {@code numItemsToReadBeforeSplit} records, is refused (returns no residual).
     *
     * @param source source under test
     * @param numItemsToReadBeforeSplit records to read before requesting the split
     * @param splitFraction fraction passed to {@code splitAtFraction}
     * @param options options used to create readers
     * @throws Exception if reading fails or an assertion helper throws
     */
    public static <T> void assertSplitAtFractionFails(BoundedSource<T> source, int numItemsToReadBeforeSplit, double splitFraction, PipelineOptions options) throws Exception {
        final ExpectedSplitOutcome outcome = ExpectedSplitOutcome.MUST_FAIL;
        assertSplitAtFractionBehavior(source, numItemsToReadBeforeSplit, splitFraction, outcome, options);
    }

    /**
     * Recursively bisects the fraction interval {@code [leftFraction, rightFraction]}, trying a
     * split at each midpoint and recording which fractions split successfully.
     *
     * <p>Recursion stops once the interval is narrower than 0.001. A half-interval is explored
     * further only when its endpoint's primary record count differs from the midpoint's.
     *
     * @param source source under test
     * @param expectedItems all records of {@code source}, in reading order
     * @param numItemsToBeReadBeforeSplit records to read before each split attempt
     * @param leftFraction lower end of the interval
     * @param leftResult split result at {@code leftFraction}, or null to compute it here
     * @param rightFraction upper end of the interval
     * @param rightResult split result at {@code rightFraction}, or null to compute it here
     * @param options options used to create readers
     * @param stats collector receiving midpoints whose split succeeded (residual count not -1)
     *     and midpoints whose split left a non-empty residual
     * @throws Exception if reading fails or an assertion helper throws
     */
    private static <T> void assertSplitAtFractionBinary(BoundedSource<T> source, List<T> expectedItems, int numItemsToBeReadBeforeSplit, double leftFraction, SplitAtFractionResult leftResult, double rightFraction, SplitAtFractionResult rightResult, PipelineOptions options, SplitFractionStatistics stats) throws Exception {
        if (rightFraction - leftFraction < 0.001) {
            return;
        }
        final double middleFraction = (rightFraction + leftFraction) / 2.0;
        final ExpectedSplitOutcome lenient = ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS;

        // Endpoints are evaluated lazily: callers pass null on the first call only.
        SplitAtFractionResult left = leftResult;
        if (left == null) {
            left = assertSplitAtFractionBehaviorImpl(source, expectedItems, numItemsToBeReadBeforeSplit, leftFraction, lenient, options);
        }
        SplitAtFractionResult right = rightResult;
        if (right == null) {
            right = assertSplitAtFractionBehaviorImpl(source, expectedItems, numItemsToBeReadBeforeSplit, rightFraction, lenient, options);
        }
        SplitAtFractionResult middle = assertSplitAtFractionBehaviorImpl(source, expectedItems, numItemsToBeReadBeforeSplit, middleFraction, lenient, options);

        final int middleResidual = middle.numResidualItems;
        if (middleResidual != -1) {
            stats.successfulFractions.add(middleFraction);
        }
        if (middleResidual > 0) {
            stats.nonTrivialFractions.add(middleFraction);
        }

        // Descend only into halves where the primary size actually changes.
        if (left.numPrimaryItems != middle.numPrimaryItems) {
            assertSplitAtFractionBinary(source, expectedItems, numItemsToBeReadBeforeSplit, leftFraction, left, middleFraction, middle, options, stats);
        }
        if (right.numPrimaryItems != middle.numPrimaryItems) {
            assertSplitAtFractionBinary(source, expectedItems, numItemsToBeReadBeforeSplit, middleFraction, middle, rightFraction, right, options, stats);
        }
    }

    /**
     * Exercises {@code splitAtFraction} on {@code source} at many fractions and after every
     * possible number of records read, first sequentially and then concurrently with reading.
     *
     * <p>Phase one bisects the fraction range for each read position and fails if no split ever
     * succeeded or no split ever produced a non-empty residual. Phase two takes, for each read
     * position, the smallest fraction that gave a non-empty residual and repeatedly races a
     * splitting thread against a reading thread until both a non-trivial split and its absence
     * have been seen, or until the per-item or total trial caps are exceeded.
     *
     * <p>The two-thread pool used for phase two is always shut down before this method returns,
     * including when a trial throws.
     *
     * @param source source under test; must yield at least two records
     * @param options options used to create readers
     * @throws Exception if reading fails, a worker task fails, or an assertion helper throws
     */
    public static <T> void assertSplitAtFractionExhaustive(BoundedSource<T> source, PipelineOptions options) throws Exception {
        List<T> expectedItems = readFromSource(source, options);
        Assert.assertFalse("Empty source", expectedItems.isEmpty());
        Assert.assertFalse("Source reads a single item", expectedItems.size() == 1);

        // Phase one: sequential bisection for every read position.
        List<List<Double>> allNonTrivialFractions = new ArrayList<List<Double>>();
        boolean anySuccessfulFractions = false;
        boolean anyNonTrivialFractions = false;
        for (int i = 0; i < expectedItems.size(); ++i) {
            SplitFractionStatistics stats = new SplitFractionStatistics();
            assertSplitAtFractionBinary(source, expectedItems, i, 0.0, null, 1.0, null, options, stats);
            if (!stats.successfulFractions.isEmpty()) {
                anySuccessfulFractions = true;
            }
            if (!stats.nonTrivialFractions.isEmpty()) {
                anyNonTrivialFractions = true;
            }
            allNonTrivialFractions.add(stats.nonTrivialFractions);
        }
        Assert.assertTrue("splitAtFraction test completed vacuously: no successful split fractions found", anySuccessfulFractions);
        Assert.assertTrue("splitAtFraction test completed vacuously: no non-trivial split fractions found", anyNonTrivialFractions);

        // Phase two: race a splitter against a reader at each position's smallest useful fraction.
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            int numTotalTrials = 0;
            for (int i = 0; i < expectedItems.size(); ++i) {
                double minNonTrivialFraction = 2.0; // sentinel above any fraction tried in phase one
                for (double fraction : allNonTrivialFractions.get(i)) {
                    minNonTrivialFraction = Math.min(minNonTrivialFraction, fraction);
                }
                if (minNonTrivialFraction == 2.0) {
                    // No non-trivial split was found at this position; nothing to race.
                    continue;
                }
                int numTrials = 0;
                boolean haveSuccess = false;
                boolean haveFailure = false;
                while (true) {
                    ++numTrials;
                    if (numTrials > MAX_CONCURRENT_SPLITTING_TRIALS_PER_ITEM) {
                        LOG.warn("After {} concurrent splitting trials at item #{}, observed only {}, giving up on this item", new Object[]{numTrials, i, haveSuccess ? "success" : "failure"});
                        break;
                    }
                    if (assertSplitAtFractionConcurrent(executor, source, expectedItems, i, minNonTrivialFraction, options)) {
                        haveSuccess = true;
                    } else {
                        haveFailure = true;
                    }
                    if (haveSuccess && haveFailure) {
                        LOG.info("{} trials to observe both success and failure of concurrent splitting at item #{}", numTrials, i);
                        break;
                    }
                }
                numTotalTrials += numTrials;
                if (numTotalTrials > MAX_CONCURRENT_SPLITTING_TRIALS_TOTAL) {
                    LOG.warn("After {} total concurrent splitting trials, considered only {} items, giving up.", numTotalTrials, i);
                    break;
                }
            }
            LOG.info("{} total concurrent splitting trials for {} items", numTotalTrials, expectedItems.size());
        } finally {
            // Without this the pool's two non-daemon threads outlive the test. shutdownNow also
            // interrupts a splitter task still waiting on its latch after a failed trial.
            executor.shutdownNow();
        }
    }

    /**
     * Runs one trial of splitting a reader while another thread is reading from it.
     *
     * <p>A reader task reads {@code numItemsToReadBeforeSplitting} records, releases a latch,
     * drains the rest and closes the reader. A splitter task waits for the latch and then calls
     * {@code splitAtFraction(fraction)}. If the split returned a residual, the resulting sources
     * are verified against the records the reader task produced.
     *
     * <p>If the reader task fails, the splitter task is cancelled so it does not stay blocked on
     * the latch, and the reader's failure is rethrown.
     *
     * @param executor pool on which both tasks run; needs at least two threads so they overlap
     * @param source source under test
     * @param expectedItems all records of {@code source}, in reading order
     * @param numItemsToReadBeforeSplitting records read before the splitter is released
     * @param fraction fraction passed to {@code splitAtFraction}
     * @param options options used to create readers
     * @return true if the split succeeded and the residual held at least one record
     * @throws Exception if either task fails or verification throws
     */
    private static <T> boolean assertSplitAtFractionConcurrent(ExecutorService executor, BoundedSource<T> source, List<T> expectedItems, final int numItemsToReadBeforeSplitting, final double fraction, PipelineOptions options) throws Exception {
        final BoundedSource.BoundedReader<T> reader = source.createReader(options);
        final CountDownLatch unblockSplitter = new CountDownLatch(1);
        Future<List<T>> readerThread = executor.submit(new Callable<List<T>>() {
            @Override
            public List<T> call() throws Exception {
                try {
                    List<T> items = readNItemsFromUnstartedReader(reader, numItemsToReadBeforeSplitting);
                    // From here on the splitter may run concurrently with the remaining reads.
                    unblockSplitter.countDown();
                    items.addAll(readRemainingFromReader(reader, numItemsToReadBeforeSplitting > 0));
                    return items;
                } finally {
                    reader.close();
                }
            }
        });
        Future<KV<BoundedSource<T>, BoundedSource<T>>> splitterThread = executor.submit(new Callable<KV<BoundedSource<T>, BoundedSource<T>>>() {
            @Override
            public KV<BoundedSource<T>, BoundedSource<T>> call() throws Exception {
                unblockSplitter.await();
                BoundedSource<T> residual = reader.splitAtFraction(fraction);
                if (residual == null) {
                    return null;
                }
                return KV.of(reader.getCurrentSource(), residual);
            }
        });

        List<T> currentItems;
        boolean readerSucceeded = false;
        try {
            currentItems = readerThread.get();
            readerSucceeded = true;
        } finally {
            if (!readerSucceeded) {
                // The reader may have failed before releasing the latch; interrupt the splitter
                // so it does not wait forever and pin one of the pool's threads.
                splitterThread.cancel(true);
            }
        }
        KV<BoundedSource<T>, BoundedSource<T>> splitSources = splitterThread.get();
        if (splitSources == null) {
            return false;
        }
        SplitAtFractionResult res = verifySingleSplitAtFractionResult(source, expectedItems, currentItems, splitSources.getKey(), splitSources.getValue(), numItemsToReadBeforeSplitting, fraction, options);
        return res.numResidualItems > 0;
    }

    /**
     * Wraps {@code boundedSource} in a source that reads the same data but never splits.
     *
     * @param boundedSource source to wrap; must not be null
     * @return a wrapper whose {@code splitIntoBundles} returns only itself and whose readers
     *     refuse {@code splitAtFraction}
     */
    public static <T> BoundedSource<T> toUnsplittableSource(BoundedSource<T> boundedSource) {
        BoundedSource<T> unsplittable = new UnsplittableSource<T>(boundedSource);
        return unsplittable;
    }

    /**
     * A {@link BoundedSource} wrapper that forwards everything to another source except
     * splitting: {@code splitIntoBundles} yields only this wrapper, and readers created here
     * always return null from {@code splitAtFraction}.
     */
    private static class UnsplittableSource<T> extends BoundedSource<T> {
        /** The wrapped source that does the real work. */
        private final BoundedSource<T> delegate;

        private UnsplittableSource(BoundedSource<T> boundedSource) {
            delegate = Preconditions.checkNotNull(boundedSource, "boundedSource");
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            delegate.populateDisplayData(builder);
        }

        /** Returns a single-element list containing this wrapper, whatever size is requested. */
        @Override
        public List<? extends BoundedSource<T>> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
            return ImmutableList.of(this);
        }

        @Override
        public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
            return delegate.getEstimatedSizeBytes(options);
        }

        @Override
        public boolean producesSortedKeys(PipelineOptions options) throws Exception {
            return delegate.producesSortedKeys(options);
        }

        /** Creates a reader over the wrapped source and wraps it so it cannot be split. */
        @Override
        public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException {
            BoundedSource.BoundedReader<T> inner = delegate.createReader(options);
            return new UnsplittableReader<T>(delegate, inner);
        }

        @Override
        public void validate() {
            delegate.validate();
        }

        @Override
        public Coder<T> getDefaultOutputCoder() {
            return delegate.getDefaultOutputCoder();
        }

        /**
         * Reader that forwards to a wrapped reader but refuses every {@code splitAtFraction}
         * request. It reports the source it was constructed with as its current source.
         */
        private static class UnsplittableReader<T> extends BoundedSource.BoundedReader<T> {
            /** Source returned by {@link #getCurrentSource()}. */
            private final BoundedSource<T> source;
            /** Reader that performs the actual reads. */
            private final BoundedSource.BoundedReader<T> delegate;

            private UnsplittableReader(BoundedSource<T> boundedSource, BoundedSource.BoundedReader<T> boundedReader) {
                source = Preconditions.checkNotNull(boundedSource, "boundedSource");
                delegate = Preconditions.checkNotNull(boundedReader, "boundedReader");
            }

            @Override
            public BoundedSource<T> getCurrentSource() {
                return source;
            }

            @Override
            public boolean start() throws IOException {
                return delegate.start();
            }

            @Override
            public boolean advance() throws IOException {
                return delegate.advance();
            }

            @Override
            public T getCurrent() throws NoSuchElementException {
                return delegate.getCurrent();
            }

            @Override
            public void close() throws IOException {
                delegate.close();
            }

            /** Always refuses to split. */
            @Override
            @Nullable
            public BoundedSource<T> splitAtFraction(double fraction) {
                return null;
            }

            @Override
            @Nullable
            public Double getFractionConsumed() {
                return delegate.getFractionConsumed();
            }

            @Override
            public long getSplitPointsConsumed() {
                return delegate.getSplitPointsConsumed();
            }

            @Override
            public long getSplitPointsRemaining() {
                return delegate.getSplitPointsRemaining();
            }

            @Override
            public Instant getCurrentTimestamp() throws NoSuchElementException {
                return delegate.getCurrentTimestamp();
            }
        }
    }

    /**
     * Accumulates the fractions tried during bisection for one read position.
     */
    private static class SplitFractionStatistics {
        /** Fractions at which the split returned a residual (residual count was not -1). */
        final List<Double> successfulFractions = new ArrayList<Double>();
        /** Fractions at which the split returned a residual holding at least one record. */
        final List<Double> nonTrivialFractions = new ArrayList<Double>();

        private SplitFractionStatistics() {
            // Instantiated only by the enclosing class.
        }
    }

    /**
     * Record counts observed after one split attempt.
     */
    private static class SplitAtFractionResult {
        /** Number of records read from the primary source. */
        public final int numPrimaryItems;
        /** Number of records read from the residual source, or -1 when the split failed. */
        public final int numResidualItems;

        public SplitAtFractionResult(int numPrimaryItems, int numResidualItems) {
            this.numResidualItems = numResidualItems;
            this.numPrimaryItems = numPrimaryItems;
        }
    }

    /**
     * What a split attempt is required to do in the split-at-fraction assertions.
     */
    public enum ExpectedSplitOutcome {
        /** The split must return a residual; primary and residual are then cross-checked. */
        MUST_SUCCEED_AND_BE_CONSISTENT,
        /** The split must be refused, i.e. return no residual. */
        MUST_FAIL,
        /** The split may succeed or be refused; if it succeeds the results are cross-checked. */
        MUST_BE_CONSISTENT_IF_SUCCEEDS
    }

    /**
     * Pairs an element with its structural value so that equality and hashing use the
     * structural value while {@code toString()} still shows the original element.
     */
    private static class ReadableStructuralValue<T> {
        /** The element as read; used only for display. */
        private final T originalValue;
        /** The structural value; the sole input to {@code equals} and {@code hashCode}. */
        private final Object structuralValue;

        public ReadableStructuralValue(T originalValue, Object structuralValue) {
            this.structuralValue = structuralValue;
            this.originalValue = originalValue;
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(structuralValue);
        }

        @Override
        public boolean equals(Object obj) {
            // instanceof is false for null, so no separate null check is needed.
            if (!(obj instanceof ReadableStructuralValue)) {
                return false;
            }
            ReadableStructuralValue<?> other = (ReadableStructuralValue<?>) obj;
            return Objects.equals(structuralValue, other.structuralValue);
        }

        @Override
        public String toString() {
            return String.format("[%s (structural %s)]", originalValue, structuralValue);
        }
    }
}

