package org.apache.hadoop.mapred;

import com.google.common.base.Supplier;
import java.io.File;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HealthMonitor;
import org.apache.hadoop.ha.TestNodeFencer;
import org.apache.hadoop.ha.ZKFCTestUtil;
import org.apache.hadoop.mapred.tools.MRHAAdmin;
import org.apache.hadoop.mapred.tools.MRZKFailoverController;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MultithreadedTestUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/hadoop/mapred/TestMRZKFailoverController.class */
public class TestMRZKFailoverController extends ClientBaseWithFixes {
    private static final Log LOG = LogFactory.getLog(TestMRZKFailoverController.class);
    private static final Path TEST_DIR = new Path("/tmp/tst");
    private Configuration conf;
    private MiniMRHACluster cluster;
    private MultithreadedTestUtil.TestContext ctx;
    private ZKFCThread thr1;
    private ZKFCThread thr2;

    /* loaded from: input_file:org/apache/hadoop/mapred/TestMRZKFailoverController$ZKFCThread.class */
    private class ZKFCThread extends MultithreadedTestUtil.TestingThread {
        private final MRZKFailoverController zkfc;

        public ZKFCThread(MultithreadedTestUtil.TestContext testContext, int i) {
            super(testContext);
            this.zkfc = MRZKFailoverController.create(TestMRZKFailoverController.this.cluster.getJobTrackerHaDaemon(i).getConf());
        }

        public void doWork() throws Exception {
            try {
                Assert.assertEquals(0L, this.zkfc.run(new String[0]));
            } catch (InterruptedException e) {
            }
        }
    }

    @Before
    public void setup() throws Exception {
        this.conf = new Configuration();
        this.conf.set(HAUtil.addKeySuffixes("ha.zookeeper.quorum", new String[]{MiniMRHACluster.LOGICAL_NAME}), this.hostPort);
        this.conf.set("mapred.ha.fencing.methods", TestNodeFencer.AlwaysSucceedFencer.class.getName());
        this.conf.setBoolean("mapred.ha.automatic-failover.enabled", true);
        this.conf.setInt("ipc.client.connection.maxidletime", 0);
        this.conf.setInt(HAUtil.addKeySuffixes("mapred.ha.zkfc.port", new String[]{MiniMRHACluster.LOGICAL_NAME, "jt1"}), 10003);
        this.conf.setInt(HAUtil.addKeySuffixes("mapred.ha.zkfc.port", new String[]{MiniMRHACluster.LOGICAL_NAME, "jt2"}), 10004);
        this.cluster = new MiniMRHACluster(this.conf);
        this.ctx = new MultithreadedTestUtil.TestContext();
        MultithreadedTestUtil.TestContext testContext = this.ctx;
        ZKFCThread zKFCThread = new ZKFCThread(this.ctx, 0);
        this.thr1 = zKFCThread;
        testContext.addThread(zKFCThread);
        Assert.assertEquals(0L, this.thr1.zkfc.run(new String[]{"-formatZK"}));
        this.thr1.start();
        waitForHAState(0, HAServiceProtocol.HAServiceState.ACTIVE);
        MultithreadedTestUtil.TestContext testContext2 = this.ctx;
        ZKFCThread zKFCThread2 = new ZKFCThread(this.ctx, 1);
        this.thr2 = zKFCThread2;
        testContext2.addThread(zKFCThread2);
        this.thr2.start();
        this.cluster.startTaskTracker(0, 1);
        this.cluster.waitActive();
        ZKFCTestUtil.waitForHealthState(this.thr1.zkfc, HealthMonitor.State.SERVICE_HEALTHY, this.ctx);
        ZKFCTestUtil.waitForHealthState(this.thr2.zkfc, HealthMonitor.State.SERVICE_HEALTHY, this.ctx);
    }

    @After
    public void shutdown() throws Exception {
        this.cluster.shutdown();
        if (this.thr1 != null) {
            this.thr1.interrupt();
        }
        if (this.thr2 != null) {
            this.thr2.interrupt();
        }
        if (this.ctx != null) {
            this.ctx.stop();
        }
    }

    @Test(timeout = 60000)
    public void testFailoverWhileRunningJob() throws Exception {
        LOG.info("Running job failover test");
        FileUtil.fullyDelete(new File("/tmp/tst"));
        JobConf jobConf = new JobConf(this.conf);
        String path = new Path(TEST_DIR, "signal").toString();
        UtilsForTests.configureWaitingJobConf(jobConf, new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output3"), 2, 0, "test-resubmission", path, path);
        RunningJob submitJob = new JobClient(jobConf).submitJob(jobConf);
        while (submitJob.mapProgress() < 0.5f) {
            LOG.info("Waiting for job " + submitJob.getID() + " to be 50% done: " + submitJob.mapProgress());
            UtilsForTests.waitFor(500L);
        }
        LOG.info("Waiting for job " + submitJob.getID() + " to be 50% done: " + submitJob.mapProgress());
        LOG.info("Shutting down jt1");
        this.cluster.shutdownJobTracker(0);
        FileSystem.getLocal(this.conf).create(new Path(TEST_DIR, "signal"));
        while (!submitJob.isComplete()) {
            LOG.info("Waiting for job " + submitJob.getID() + " to be successful: " + submitJob.mapProgress());
            UtilsForTests.waitFor(500L);
        }
        Assert.assertTrue("Job should be successful", submitJob.isSuccessful());
    }

    @Test(timeout = 60000)
    public void testManualFailover() throws Exception {
        LOG.info("Running manual failover test");
        this.thr2.zkfc.getLocalTarget().getZKFCProxy(this.conf, 15000).gracefulFailover();
        waitForHAState(0, HAServiceProtocol.HAServiceState.STANDBY);
        waitForHAState(1, HAServiceProtocol.HAServiceState.ACTIVE);
        this.thr1.zkfc.getLocalTarget().getZKFCProxy(this.conf, 15000).gracefulFailover();
        waitForHAState(0, HAServiceProtocol.HAServiceState.ACTIVE);
        waitForHAState(1, HAServiceProtocol.HAServiceState.STANDBY);
    }

    @Test(timeout = 60000)
    public void testManualFailoverWithMRHAAdmin() throws Exception {
        LOG.info("Running manual failover test with MRHAAdmin");
        new MRHAAdmin().setConf(this.conf);
        Assert.assertEquals(0L, r0.run(new String[]{"-failover", "jt1", "jt2"}));
        waitForHAState(0, HAServiceProtocol.HAServiceState.STANDBY);
        waitForHAState(1, HAServiceProtocol.HAServiceState.ACTIVE);
        Assert.assertEquals(0L, r0.run(new String[]{"-failover", "jt2", "jt1"}));
        waitForHAState(0, HAServiceProtocol.HAServiceState.ACTIVE);
        waitForHAState(1, HAServiceProtocol.HAServiceState.STANDBY);
    }

    private void waitForHAState(int i, final HAServiceProtocol.HAServiceState hAServiceState) throws TimeoutException, InterruptedException {
        final JobTrackerHADaemon jobTrackerHaDaemon = this.cluster.getJobTrackerHaDaemon(i);
        GenericTestUtils.waitFor(new Supplier<Boolean>() { // from class: org.apache.hadoop.mapred.TestMRZKFailoverController.1
            /* renamed from: get, reason: merged with bridge method [inline-methods] */
            public Boolean m104get() {
                try {
                    return Boolean.valueOf(jobTrackerHaDaemon.getServiceStatus().getState() == hAServiceState);
                } catch (Exception e) {
                    e.printStackTrace();
                    return false;
                }
            }
        }, 50, 5000);
    }
}
