/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.test;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.examples.OrderedWordCount;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.processor.SleepProcessor;
import org.junit.Assert;
import org.junit.Test;

public class TestLocalMode {
    private static final File TEST_DIR = new File(System.getProperty("test.build.data", System.getProperty("java.io.tmpdir")), "TestLocalMode-tez-localmode");

    @Test(timeout=30000L)
    public void testMultipleClientsWithSession() throws TezException, InterruptedException, IOException {
        TezConfiguration tezConf1 = new TezConfiguration();
        tezConf1.setBoolean("tez.local.mode", true);
        tezConf1.set("fs.defaultFS", "file:///");
        tezConf1.setBoolean("tez.runtime.optimize.local.fetch", true);
        TezClient tezClient1 = TezClient.create((String)"commonName", (TezConfiguration)tezConf1, (boolean)true);
        tezClient1.start();
        DAG dag1 = this.createSimpleDAG("dag1", SleepProcessor.class.getName());
        DAGClient dagClient1 = tezClient1.submitDAG(dag1);
        dagClient1.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagClient1.getDAGStatus(null).getState());
        dagClient1.close();
        tezClient1.stop();
        TezConfiguration tezConf2 = new TezConfiguration();
        tezConf2.setBoolean("tez.local.mode", true);
        tezConf2.set("fs.defaultFS", "file:///");
        tezConf2.setBoolean("tez.runtime.optimize.local.fetch", true);
        DAG dag2 = this.createSimpleDAG("dag2", SleepProcessor.class.getName());
        TezClient tezClient2 = TezClient.create((String)"commonName", (TezConfiguration)tezConf2, (boolean)true);
        tezClient2.start();
        DAGClient dagClient2 = tezClient2.submitDAG(dag2);
        dagClient2.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagClient2.getDAGStatus(null).getState());
        Assert.assertFalse((boolean)dagClient1.getExecutionContext().equals(dagClient2.getExecutionContext()));
        dagClient2.close();
        tezClient2.stop();
    }

    @Test(timeout=10000L)
    public void testMultipleClientsWithoutSession() throws TezException, InterruptedException, IOException {
        TezConfiguration tezConf1 = new TezConfiguration();
        tezConf1.setBoolean("tez.local.mode", true);
        tezConf1.set("fs.defaultFS", "file:///");
        tezConf1.setBoolean("tez.runtime.optimize.local.fetch", true);
        TezClient tezClient1 = TezClient.create((String)"commonName", (TezConfiguration)tezConf1, (boolean)false);
        tezClient1.start();
        DAG dag1 = this.createSimpleDAG("dag1", SleepProcessor.class.getName());
        DAGClient dagClient1 = tezClient1.submitDAG(dag1);
        dagClient1.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagClient1.getDAGStatus(null).getState());
        dagClient1.close();
        tezClient1.stop();
        TezConfiguration tezConf2 = new TezConfiguration();
        tezConf2.setBoolean("tez.local.mode", true);
        tezConf2.set("fs.defaultFS", "file:///");
        tezConf2.setBoolean("tez.runtime.optimize.local.fetch", true);
        DAG dag2 = this.createSimpleDAG("dag2", SleepProcessor.class.getName());
        TezClient tezClient2 = TezClient.create((String)"commonName", (TezConfiguration)tezConf2, (boolean)false);
        tezClient2.start();
        DAGClient dagClient2 = tezClient2.submitDAG(dag2);
        dagClient2.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagClient2.getDAGStatus(null).getState());
        Assert.assertFalse((boolean)dagClient1.getExecutionContext().equals(dagClient2.getExecutionContext()));
        dagClient2.close();
        tezClient2.stop();
    }

    @Test(timeout=20000L)
    public void testNoSysExitOnSuccessfulDAG() throws TezException, InterruptedException, IOException {
        TezConfiguration tezConf1 = new TezConfiguration();
        tezConf1.setBoolean("tez.local.mode", true);
        tezConf1.set("fs.defaultFS", "file:///");
        tezConf1.setBoolean("tez.runtime.optimize.local.fetch", true);
        TezClient tezClient1 = TezClient.create((String)"commonName", (TezConfiguration)tezConf1, (boolean)false);
        tezClient1.start();
        DAG dag1 = this.createSimpleDAG("dag1", SleepProcessor.class.getName());
        DAGClient dagClient1 = tezClient1.submitDAG(dag1);
        dagClient1.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagClient1.getDAGStatus(null).getState());
        Thread.sleep(7500L);
        dagClient1.close();
        tezClient1.stop();
    }

    @Test(timeout=20000L)
    public void testNoSysExitOnFailinglDAG() throws TezException, InterruptedException, IOException {
        TezConfiguration tezConf1 = new TezConfiguration();
        tezConf1.setBoolean("tez.local.mode", true);
        tezConf1.set("fs.defaultFS", "file:///");
        tezConf1.setBoolean("tez.runtime.optimize.local.fetch", true);
        TezClient tezClient1 = TezClient.create((String)"commonName", (TezConfiguration)tezConf1, (boolean)false);
        tezClient1.start();
        DAG dag1 = this.createSimpleDAG("dag1", FailingProcessor.class.getName());
        DAGClient dagClient1 = tezClient1.submitDAG(dag1);
        dagClient1.waitForCompletion();
        Assert.assertEquals((Object)DAGStatus.State.FAILED, (Object)dagClient1.getDAGStatus(null).getState());
        Thread.sleep(7500L);
        dagClient1.close();
        tezClient1.stop();
    }

    private DAG createSimpleDAG(String dagName, String processorName) {
        DAG dag = DAG.create((String)dagName).addVertex(Vertex.create((String)"Sleep", (ProcessorDescriptor)((ProcessorDescriptor)ProcessorDescriptor.create((String)processorName).setUserPayload(new SleepProcessor.SleepProcessorConfig(1).toUserPayload())), (int)1));
        return dag;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=30000L)
    public void testMultiDAGsOnSession() throws IOException, TezException, InterruptedException {
        int i;
        int dags = 2;
        String[] inputPaths = new String[dags];
        String[] outputPaths = new String[dags];
        DAGClient[] dagClients = new DAGClient[dags];
        TezConfiguration tezConf = new TezConfiguration();
        tezConf.setBoolean("tez.local.mode", true);
        tezConf.set("fs.defaultFS", "file:///");
        tezConf.setBoolean("tez.runtime.optimize.local.fetch", true);
        TezClient tezClient = TezClient.create((String)"testMultiDAGOnSession", (TezConfiguration)tezConf, (boolean)true);
        tezClient.start();
        FileSystem fs = FileSystem.get((Configuration)tezConf);
        for (i = 0; i < dags; ++i) {
            inputPaths[i] = new Path(TEST_DIR.getAbsolutePath(), "in-" + i).toString();
            this.createInputFile(fs, inputPaths[i]);
            outputPaths[i] = new Path(TEST_DIR.getAbsolutePath(), "out-" + i).toString();
        }
        try {
            for (i = 0; i < inputPaths.length; ++i) {
                DAG dag = OrderedWordCount.createDAG((TezConfiguration)tezConf, (String)inputPaths[i], (String)outputPaths[i], (int)1, (boolean)false, (boolean)false, (String)("DAG-Iteration-" + i));
                tezClient.waitTillReady();
                System.out.println("Running dag number " + i);
                dagClients[i] = tezClient.submitDAG(dag);
                DAGStatus dagStatus = dagClients[i].waitForCompletion();
                if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
                    Assert.fail((String)("Iteration " + i + " failed with diagnostics: " + dagStatus.getDiagnostics()));
                }
                if (i <= 0) continue;
                Assert.assertTrue((boolean)dagClients[i - 1].getExecutionContext().equals(dagClients[i].getExecutionContext()));
            }
        }
        finally {
            tezClient.stop();
        }
    }

    private void createInputFile(FileSystem fs, String path) throws IOException {
        Path file = new Path(new Path(path), "input.txt");
        try {
            FSDataOutputStream fsdos = fs.create(file);
            fsdos.write("This is a small test file !".getBytes());
            fsdos.flush();
            fsdos.close();
        }
        catch (IOException ioe) {
            Assert.fail((String)"Can not create input File!");
        }
    }

    public static class FailingProcessor
    extends AbstractLogicalIOProcessor {
        public FailingProcessor(ProcessorContext context) {
            super(context);
        }

        public void initialize() throws Exception {
        }

        public void handleEvents(List<Event> processorEvents) {
        }

        public void close() throws Exception {
        }

        public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception {
            throw new TezException("FailingProcessor");
        }
    }
}

