/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.ha.FailoverController;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.ha.TestNodeFencer;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobTrackerHADaemon;
import org.apache.hadoop.mapred.JobTrackerHAServiceTarget;
import org.apache.hadoop.mapred.MiniMRHACluster;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.test.MultithreadedTestUtil;
import org.apache.hadoop.util.ExitUtil;
import org.junit.Before;
import org.junit.Test;

public class TestHAStress {
    private static final Log LOG = LogFactory.getLog(TestHAStress.class);
    private static final HAServiceProtocol.StateChangeRequestInfo REQ_INFO = new HAServiceProtocol.StateChangeRequestInfo(HAServiceProtocol.RequestSource.REQUEST_BY_USER_FORCED);
    private static final int NUM_THREADS = 2;
    private static final long RUNTIME = 35000L;
    private MiniMRHACluster cluster;
    private JobTrackerHADaemon jt1;
    private JobTrackerHADaemon jt2;
    private JobTrackerHAServiceTarget target1;
    private JobTrackerHAServiceTarget target2;
    private Configuration conf;

    @Before
    public void setUp() throws Exception {
        ExitUtil.disableSystemExit();
        this.conf = new Configuration();
        this.conf.set("mapred.ha.fencing.methods", TestNodeFencer.AlwaysSucceedFencer.class.getName());
        this.conf.setLong("mapred.ha.jobtracker.active-check.millis", 1000L);
        this.conf.setBoolean("mapred.job.tracker.persist.jobstatus.active", true);
        this.conf.setInt("mapred.job.tracker.persist.jobstatus.hours", 1);
        this.conf.set("mapred.job.tracker.persist.jobstatus.dir", "/tmp/jobtracker/jobsInfo");
        this.cluster = new MiniMRHACluster(this.conf);
        this.cluster.getJobTrackerHaDaemon(0).makeActive();
        this.cluster.startTaskTracker(0, 1);
        this.cluster.waitActive();
        this.jt1 = this.cluster.getJobTrackerHaDaemon(0);
        this.jt2 = this.cluster.getJobTrackerHaDaemon(1);
        this.target1 = new JobTrackerHAServiceTarget(this.jt1.getConf());
        this.target2 = new JobTrackerHAServiceTarget(this.jt2.getConf());
    }

    @Test
    public void test() throws Exception {
        MultithreadedTestUtil.TestContext flippers = new MultithreadedTestUtil.TestContext();
        flippers.addThread((MultithreadedTestUtil.TestingThread)new FailoverThread(flippers));
        MultithreadedTestUtil.TestContext submitters = new MultithreadedTestUtil.TestContext();
        for (int i = 0; i < 2; ++i) {
            submitters.addThread((MultithreadedTestUtil.TestingThread)new JobSubmitterThread(submitters, this.conf));
        }
        flippers.startThreads();
        submitters.startThreads();
        submitters.waitFor(35000L);
        submitters.stop();
        flippers.stop();
    }

    private static class JobSubmitterThread
    extends MultithreadedTestUtil.RepeatingTestThread {
        private Configuration conf;

        public JobSubmitterThread(MultithreadedTestUtil.TestContext ctx, Configuration conf) {
            super(ctx);
            this.conf = conf;
            this.setName(((Object)((Object)this)).getClass().getName());
        }

        public void doAnAction() throws Exception {
            RunningJob rj;
            System.out.println("==============================\nSubmitting job\n==================================");
            SleepJob job = new SleepJob();
            job.setConf(this.conf);
            JobConf jobConf = job.setupJobConf(1, 0, 1L, 1, 1L, 1);
            JobClient jc = new JobClient(jobConf);
            try {
                rj = jc.submitJob(jobConf);
            }
            catch (IOException e) {
                System.out.println("==============================\nJob submission failed. Ignore.\n==================================");
                return;
            }
            System.out.println("==============================\nSuccessfully submitted job " + rj.getJobID() + "\n" + "==================================");
            if (!jc.monitorAndPrintJob(jobConf, rj)) {
                throw new IOException("Job failed! " + rj.getFailureInfo());
            }
        }
    }

    private class FailoverThread
    extends MultithreadedTestUtil.RepeatingTestThread {
        private long msBetweenFailovers;
        private FailoverController fc;

        public FailoverThread(MultithreadedTestUtil.TestContext ctx) {
            super(ctx);
            this.msBetweenFailovers = 10000L;
            this.fc = new FailoverController(TestHAStress.this.conf, HAServiceProtocol.RequestSource.REQUEST_BY_USER);
            this.setName(((Object)((Object)this)).getClass().getName());
        }

        public void doAnAction() throws Exception {
            System.out.println("==============================\nFailing over from 0->1\n==================================");
            this.fc.failover((HAServiceTarget)TestHAStress.this.target1, (HAServiceTarget)TestHAStress.this.target2, false, false);
            Thread.sleep(this.msBetweenFailovers);
            System.out.println("==============================\nFailing over from 1->0\n==================================");
            this.fc.failover((HAServiceTarget)TestHAStress.this.target2, (HAServiceTarget)TestHAStress.this.target1, false, false);
            Thread.sleep(this.msBetweenFailovers);
        }
    }
}

