/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.kafka.common.utils.Utils;
import org.apache.log4j.Logger;

public class KafkaMigrationTool {
    // Logger shared by the static entry point and the shutdown hook.
    private static final Logger log = Logger.getLogger((String)KafkaMigrationTool.class.getName());

    // Fully-qualified names of the Kafka 0.7 classes that main() loads reflectively
    // through the parent-last class loader (the 0.7 jar is supplied on the command line).
    private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer";
    private static final String KAFKA_07_CONSUMER_CONFIG_CLASS_NAME = "kafka.consumer.ConsumerConfig";
    private static final String KAFKA_07_CONSUMER_STREAM_CLASS_NAME = "kafka.consumer.KafkaStream";
    private static final String KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME = "kafka.consumer.ConsumerIterator";
    private static final String KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME = "kafka.javaapi.consumer.ConsumerConnector";
    private static final String KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME = "kafka.message.MessageAndMetadata";
    private static final String KAFKA_07_MESSAGE_CLASS_NAME = "kafka.message.Message";
    private static final String KAFKA_07_WHITE_LIST_CLASS_NAME = "kafka.consumer.Whitelist";
    private static final String KAFKA_07_TOPIC_FILTER_CLASS_NAME = "kafka.consumer.TopicFilter";
    private static final String KAFKA_07_BLACK_LIST_CLASS_NAME = "kafka.consumer.Blacklist";

    // Class handles for the names above. They are assigned once in main() before any
    // MigrationThread is started; MigrationThread.run() reads them to look up methods.
    private static Class<?> kafkaStaticConsumer07 = null;
    private static Class<?> consumerConfig07 = null;
    private static Class<?> consumerConnector07 = null;
    private static Class<?> kafkaStream07 = null;
    private static Class<?> topicFilter07 = null;
    private static Class<?> whiteList07 = null;
    private static Class<?> blackList07 = null;
    private static Class<?> kafkaConsumerIteratorClass07 = null;
    private static Class<?> kafkaMessageAndMetaDataClass07 = null;
    private static Class<?> kafkaMessageClass07 = null;

    /**
     * Command-line entry point of the migration tool.
     *
     * <p>Parses the options, loads the Kafka 0.7 consumer classes from the supplied jars through a
     * parent-last class loader, creates the 0.7 message streams via reflection, and then starts one
     * {@link MigrationThread} per stream and {@code num.producers} {@link ProducerThread}s that are
     * connected through a bounded {@link ProducerDataChannel}. A JVM shutdown hook stops the
     * consumer connector, the migration threads and the producer threads, in that order.
     *
     * <p>Any {@link Throwable} raised during setup is printed to standard output and logged; it is
     * not rethrown.
     *
     * @param args command-line arguments; {@code consumer.config}, {@code producer.config},
     *             {@code zkclient.01.jar} and {@code kafka.07.jar} are required, and exactly one of
     *             {@code whitelist} / {@code blacklist} must be given
     * @throws InterruptedException declared for interface compatibility
     * @throws IOException if the help text cannot be written
     */
    public static void main(String[] args) throws InterruptedException, IOException {
        OptionParser parser = new OptionParser();
        ArgumentAcceptingOptionSpec<String> consumerConfigOpt = parser
            .accepts("consumer.config", "Kafka 0.7 consumer config to consume from the source 0.7 cluster. You man specify multiple of these.")
            .withRequiredArg()
            .describedAs("config file")
            .ofType(String.class);
        ArgumentAcceptingOptionSpec<String> producerConfigOpt = parser
            .accepts("producer.config", "Producer config.")
            .withRequiredArg()
            .describedAs("config file")
            .ofType(String.class);
        ArgumentAcceptingOptionSpec<Integer> numProducersOpt = parser
            .accepts("num.producers", "Number of producer instances")
            .withRequiredArg()
            .describedAs("Number of producers")
            .ofType(Integer.class)
            .defaultsTo(1);
        ArgumentAcceptingOptionSpec<String> zkClient01JarOpt = parser
            .accepts("zkclient.01.jar", "zkClient 0.1 jar file")
            .withRequiredArg()
            .describedAs("zkClient 0.1 jar file required by Kafka 0.7")
            .ofType(String.class);
        ArgumentAcceptingOptionSpec<String> kafka07JarOpt = parser
            .accepts("kafka.07.jar", "Kafka 0.7 jar file")
            .withRequiredArg()
            .describedAs("kafka 0.7 jar")
            .ofType(String.class);
        ArgumentAcceptingOptionSpec<Integer> numStreamsOpt = parser
            .accepts("num.streams", "Number of consumer streams")
            .withRequiredArg()
            .describedAs("Number of consumer threads")
            .ofType(Integer.class)
            .defaultsTo(1);
        ArgumentAcceptingOptionSpec<String> whitelistOpt = parser
            .accepts("whitelist", "Whitelist of topics to migrate from the 0.7 cluster")
            .withRequiredArg()
            .describedAs("Java regex (String)")
            .ofType(String.class);
        ArgumentAcceptingOptionSpec<String> blacklistOpt = parser
            .accepts("blacklist", "Blacklist of topics to migrate from the 0.7 cluster")
            .withRequiredArg()
            .describedAs("Java regex (String)")
            .ofType(String.class);
        ArgumentAcceptingOptionSpec<Integer> queueSizeOpt = parser
            .accepts("queue.size", "Number of messages that are buffered between the 0.7 consumer and 0.8 producer")
            .withRequiredArg()
            .describedAs("Queue size in terms of number of messages")
            .ofType(Integer.class)
            .defaultsTo(10000);
        OptionSpecBuilder helpOpt = parser.accepts("help", "Print this message.");

        OptionSet options = parser.parse(args);
        if (options.has(helpOpt)) {
            parser.printHelpOn(System.out);
            System.exit(0);
        }
        checkRequiredArgs(parser, options, new OptionSpec[]{consumerConfigOpt, producerConfigOpt, zkClient01JarOpt, kafka07JarOpt});

        // Exactly one of the two filters must be present: both or neither is an error.
        boolean hasWhitelist = options.has(whitelistOpt);
        boolean hasBlacklist = options.has(blacklistOpt);
        if (hasWhitelist == hasBlacklist) {
            System.err.println("Exactly one of whitelist or blacklist is required.");
            System.exit(1);
        }

        String kafkaJarFile07 = options.valueOf(kafka07JarOpt);
        String zkClientJarFile = options.valueOf(zkClient01JarOpt);
        String consumerConfigFile07 = options.valueOf(consumerConfigOpt);
        int numConsumers = options.valueOf(numStreamsOpt);
        String producerConfigFile08 = options.valueOf(producerConfigOpt);
        int numProducers = options.valueOf(numProducersOpt);

        // Both lists are captured by the shutdown hook below, hence final.
        final List<MigrationThread> migrationThreads = new ArrayList<MigrationThread>(numConsumers);
        final List<ProducerThread> producerThreads = new ArrayList<ProducerThread>(numProducers);

        try {
            File kafkaJar07 = new File(kafkaJarFile07);
            File zkClientJar = new File(zkClientJarFile);
            ParentLastURLClassLoader classLoader07 = new ParentLastURLClassLoader(new URL[]{kafkaJar07.toURI().toURL(), zkClientJar.toURI().toURL()});

            // Resolve every 0.7 class up front; MigrationThread reads these static handles later.
            consumerConfig07 = classLoader07.loadClass(KAFKA_07_CONSUMER_CONFIG_CLASS_NAME);
            kafkaStaticConsumer07 = classLoader07.loadClass(KAFKA_07_STATIC_CONSUMER_CLASS_NAME);
            consumerConnector07 = classLoader07.loadClass(KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME);
            kafkaStream07 = classLoader07.loadClass(KAFKA_07_CONSUMER_STREAM_CLASS_NAME);
            topicFilter07 = classLoader07.loadClass(KAFKA_07_TOPIC_FILTER_CLASS_NAME);
            whiteList07 = classLoader07.loadClass(KAFKA_07_WHITE_LIST_CLASS_NAME);
            blackList07 = classLoader07.loadClass(KAFKA_07_BLACK_LIST_CLASS_NAME);
            kafkaMessageClass07 = classLoader07.loadClass(KAFKA_07_MESSAGE_CLASS_NAME);
            kafkaConsumerIteratorClass07 = classLoader07.loadClass(KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME);
            kafkaMessageAndMetaDataClass07 = classLoader07.loadClass(KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME);

            Constructor<?> consumerConfigConstructor07 = consumerConfig07.getConstructor(Properties.class);
            Properties kafkaConsumerProperties07 = loadProperties(consumerConfigFile07);
            if (kafkaConsumerProperties07.getProperty("shallow.iterator.enable", "").equals("true")) {
                log.warn("Shallow iterator should not be used in the migration tool");
                kafkaConsumerProperties07.setProperty("shallow.iterator.enable", "false");
            }

            Object consumerConfigInstance07 = consumerConfigConstructor07.newInstance(kafkaConsumerProperties07);
            Method consumerConnectorCreationMethod07 = kafkaStaticConsumer07.getMethod("createJavaConsumerConnector", consumerConfig07);
            final Object connectorInstance07 = consumerConnectorCreationMethod07.invoke(null, consumerConfigInstance07);
            Method consumerConnectorCreateMessageStreamsMethod07 = consumerConnector07.getMethod("createMessageStreamsByFilter", topicFilter07, Integer.TYPE);
            final Method consumerConnectorShutdownMethod07 = consumerConnector07.getMethod("shutdown");

            Constructor<?> whiteListConstructor07 = whiteList07.getConstructor(String.class);
            Constructor<?> blackListConstructor07 = blackList07.getConstructor(String.class);
            Object filterSpec = hasWhitelist
                ? whiteListConstructor07.newInstance(options.valueOf(whitelistOpt))
                : blackListConstructor07.newInstance(options.valueOf(blacklistOpt));
            Object retKafkaStreams = consumerConnectorCreateMessageStreamsMethod07.invoke(connectorInstance07, filterSpec, numConsumers);

            Properties kafkaProducerProperties08 = loadProperties(producerConfigFile08);
            // The tool forwards raw payload bytes, so the serializer from the config file is overridden.
            kafkaProducerProperties08.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");

            int queueSize = options.valueOf(queueSizeOpt);
            ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<byte[], byte[]>>(queueSize);

            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    try {
                        consumerConnectorShutdownMethod07.invoke(connectorInstance07);
                    } catch (Exception e) {
                        log.error("Error while shutting down Kafka consumer", e);
                    }
                    for (MigrationThread migrationThread : migrationThreads) {
                        migrationThread.shutdown();
                    }
                    // Enqueue every shutdown marker first, then wait, so producers stop in parallel.
                    for (ProducerThread producerThread : producerThreads) {
                        producerThread.shutdown();
                    }
                    for (ProducerThread producerThread : producerThreads) {
                        producerThread.awaitShutdown();
                    }
                    log.info("Kafka migration tool shutdown successfully");
                }
            });

            int threadId = 0;
            for (Object stream : (List<?>) retKafkaStreams) {
                MigrationThread thread = new MigrationThread(stream, producerDataChannel, threadId);
                ++threadId;
                thread.start();
                migrationThreads.add(thread);
            }

            // NOTE(review): when "client.id" is absent from the producer config this is null and the
            // generated ids become "null-0", "null-1", ...; behaviour kept as-is, confirm if intended.
            String clientId = kafkaProducerProperties08.getProperty("client.id");
            for (int i = 0; i < numProducers; ++i) {
                kafkaProducerProperties08.put("client.id", clientId + "-" + i);
                ProducerConfig producerConfig08 = new ProducerConfig(kafkaProducerProperties08);
                Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(producerConfig08);
                ProducerThread producerThread = new ProducerThread(producerDataChannel, producer, i);
                producerThread.start();
                producerThreads.add(producerThread);
            }
        } catch (Throwable e) {
            System.out.println("Kafka migration tool failed due to: " + Utils.stackTrace(e));
            log.error("Kafka migration tool failed: ", e);
        }
    }

    /**
     * Reads a Java properties file and closes the underlying stream afterwards.
     *
     * @param path path of the properties file
     * @return the properties read from {@code path}
     * @throws IOException if the file cannot be opened or read
     */
    private static Properties loadProperties(String path) throws IOException {
        Properties properties = new Properties();
        FileInputStream in = new FileInputStream(path);
        try {
            properties.load(in);
        } finally {
            in.close();
        }
        return properties;
    }

    /**
     * Verifies that every option in {@code required} was supplied on the command line.
     * For each missing option an error line and the parser's help text are written to
     * standard error and the JVM is terminated with exit status 1.
     *
     * @param parser   parser used to print the help text
     * @param options  the parsed command line
     * @param required options that must be present
     * @throws IOException if the help text cannot be written
     */
    private static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec[] required) throws IOException {
        for (int idx = 0; idx < required.length; idx++) {
            OptionSpec spec = required[idx];
            if (!options.has(spec)) {
                System.err.println("Missing required argument \"" + spec + "\"");
                parser.printHelpOn((OutputStream)System.err);
                System.exit(1);
            }
        }
    }

    /**
     * Class loader that looks a class up in the given URLs first and only falls back to the
     * parent (the context class loader of the constructing thread) when the URLs do not
     * contain it. This is the reverse of the default parent-first delegation.
     */
    private static class ParentLastURLClassLoader
    extends ClassLoader {
        private final ChildURLClassLoader childClassLoader;

        /**
         * @param urls locations that are searched before the parent class loader
         */
        public ParentLastURLClassLoader(URL[] urls) {
            super(Thread.currentThread().getContextClassLoader());
            this.childClassLoader = new ChildURLClassLoader(urls, new FindClassClassLoader(this.getParent()));
        }

        /**
         * Tries the child (URL) loader first; if it reports the class as not found, uses the
         * regular parent-first lookup of {@link ClassLoader}.
         */
        @Override
        protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            try {
                return this.childClassLoader.findClass(name);
            }
            catch (ClassNotFoundException e) {
                return super.loadClass(name, resolve);
            }
        }

        /**
         * URL class loader created with a {@code null} parent; delegation to the real parent
         * happens explicitly in {@link #findClass(String)} after the URLs were searched.
         */
        private static class ChildURLClassLoader
        extends URLClassLoader {
            private final FindClassClassLoader realParent;

            public ChildURLClassLoader(URL[] urls, FindClassClassLoader realParent) {
                super(urls, (ClassLoader)null);
                this.realParent = realParent;
            }

            /**
             * Returns the class with the given name, searching the URLs before the real parent.
             *
             * @throws ClassNotFoundException if neither the URLs nor the real parent know the class
             */
            @Override
            public Class<?> findClass(String name) throws ClassNotFoundException {
                // URLClassLoader.findClass defines the class on every call; asking for a class this
                // loader already loaded would otherwise fail with a LinkageError
                // (duplicate class definition), which callers do not catch.
                Class<?> alreadyLoaded = this.findLoadedClass(name);
                if (alreadyLoaded != null) {
                    return alreadyLoaded;
                }
                try {
                    return super.findClass(name);
                }
                catch (ClassNotFoundException e) {
                    return this.realParent.loadClass(name);
                }
            }
        }

        /**
         * Thin wrapper around the parent loader whose only change is to widen
         * {@link #findClass(String)} to public.
         */
        private static class FindClassClassLoader
        extends ClassLoader {
            public FindClassClassLoader(ClassLoader parent) {
                super(parent);
            }

            @Override
            public Class<?> findClass(String name) throws ClassNotFoundException {
                return super.findClass(name);
            }
        }
    }

    /**
     * Thread that takes messages from the shared {@link ProducerDataChannel} and sends them
     * with a Kafka 0.8 producer until it receives a message equal to its shutdown marker.
     */
    static class ProducerThread
    extends Thread {
        private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
        private final Producer<byte[], byte[]> producer;
        private final int threadId;
        private final String threadName;
        private final Logger logger;
        // Released by run() when the thread is about to exit, whether normally or after a failure.
        private final CountDownLatch shutdownComplete = new CountDownLatch(1);
        // Marker enqueued by shutdown(); run() stops on the first message that equals it.
        // The type arguments must match the field type, otherwise the assignment does not compile.
        private final KeyedMessage<byte[], byte[]> shutdownMessage = new KeyedMessage<byte[], byte[]>("shutdown", null, null);

        /**
         * @param producerDataChannel channel the messages are read from
         * @param producer            producer used to send the messages
         * @param threadId            numeric id, used to build the thread name
         */
        public ProducerThread(ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel, Producer<byte[], byte[]> producer, int threadId) {
            this.producerDataChannel = producerDataChannel;
            this.producer = producer;
            this.threadId = threadId;
            this.threadName = "ProducerThread-" + threadId;
            this.logger = Logger.getLogger(ProducerThread.class.getName());
            this.setName(this.threadName);
        }

        /**
         * Forwards messages from the channel to the producer until the shutdown marker arrives.
         * Any failure is logged as fatal; the completion latch is released in every case.
         */
        @Override
        public void run() {
            try {
                KeyedMessage<byte[], byte[]> data;
                while (!(data = this.producerDataChannel.receiveRequest()).equals(this.shutdownMessage)) {
                    this.producer.send(data);
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug(String.format("Sending message %s", new String(data.message())));
                    }
                }
                this.logger.info("Producer thread " + this.threadName + " finished running");
            }
            catch (Throwable t) {
                this.logger.fatal("Producer thread failure due to ", t);
            }
            finally {
                this.shutdownComplete.countDown();
            }
        }

        /**
         * Requests termination by enqueueing the shutdown marker on the channel.
         * Does not wait for the thread to finish; see {@link #awaitShutdown()}.
         */
        public void shutdown() {
            try {
                this.logger.info("Producer thread " + this.threadName + " shutting down");
                this.producerDataChannel.sendRequest(this.shutdownMessage);
            }
            catch (InterruptedException ie) {
                this.logger.warn("Interrupt during shutdown of ProducerThread", ie);
            }
        }

        /**
         * Blocks until {@link #run()} has finished and then closes the producer.
         * If the wait is interrupted the producer is not closed and a warning is logged.
         */
        public void awaitShutdown() {
            try {
                this.shutdownComplete.await();
                this.producer.close();
                this.logger.info("Producer thread " + this.threadName + " shutdown complete");
            }
            catch (InterruptedException ie) {
                this.logger.warn("Interrupt during shutdown of ProducerThread", ie);
            }
        }
    }

    /**
     * Thread that iterates over one Kafka 0.7 stream via reflection, copies each message payload
     * into a byte array and hands it to the {@link ProducerDataChannel} as a 0.8
     * {@link KeyedMessage} with a {@code null} key.
     */
    private static class MigrationThread
    extends Thread {
        private final Object stream;
        private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
        private final int threadId;
        private final String threadName;
        private final Logger logger;
        // Released by run() when the thread is about to exit, whether normally or after a failure.
        private final CountDownLatch shutdownComplete = new CountDownLatch(1);
        // Cleared by shutdown(); checked by run() before each message is fetched.
        private final AtomicBoolean isRunning = new AtomicBoolean(true);

        /**
         * @param stream              the 0.7 stream object to iterate (accessed only reflectively)
         * @param producerDataChannel channel the converted messages are written to
         * @param threadId            numeric id, used to build the thread name
         */
        MigrationThread(Object stream, ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel, int threadId) {
            this.stream = stream;
            this.producerDataChannel = producerDataChannel;
            this.threadId = threadId;
            this.threadName = "MigrationThread-" + threadId;
            this.logger = Logger.getLogger(MigrationThread.class.getName());
            this.setName(this.threadName);
        }

        /**
         * Consumes the stream until its iterator is exhausted or {@link #shutdown()} was called.
         * Failures are logged as fatal (for reflective calls the root cause is logged); the
         * completion latch is released in every case.
         */
        @Override
        public void run() {
            try {
                Method messageGetPayloadMethod07 = kafkaMessageClass07.getMethod("payload");
                Method kafkaGetMessageMethod07 = kafkaMessageAndMetaDataClass07.getMethod("message");
                Method kafkaGetTopicMethod07 = kafkaMessageAndMetaDataClass07.getMethod("topic");
                Method consumerIteratorMethod = kafkaStream07.getMethod("iterator");
                Method kafkaStreamHasNextMethod07 = kafkaConsumerIteratorClass07.getMethod("hasNext");
                Method kafkaStreamNextMethod07 = kafkaConsumerIteratorClass07.getMethod("next");

                Object iterator = consumerIteratorMethod.invoke(this.stream);
                // isRunning is tested first so that no further message is fetched once shutdown()
                // has been requested.
                while (this.isRunning.get() && ((Boolean)kafkaStreamHasNextMethod07.invoke(iterator)).booleanValue()) {
                    Object messageAndMetaData07 = kafkaStreamNextMethod07.invoke(iterator);
                    Object message07 = kafkaGetMessageMethod07.invoke(messageAndMetaData07);
                    Object topic = kafkaGetTopicMethod07.invoke(messageAndMetaData07);
                    ByteBuffer payload07 = (ByteBuffer)messageGetPayloadMethod07.invoke(message07);

                    // Copy the remaining bytes of the payload buffer into a fresh array.
                    byte[] bytes = new byte[payload07.remaining()];
                    payload07.get(bytes);
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Migration thread " + this.threadId + " sending message of size " + bytes.length + " to topic " + topic);
                    }
                    // Type arguments must match the channel's element type for sendRequest to compile.
                    KeyedMessage<byte[], byte[]> producerData = new KeyedMessage<byte[], byte[]>((String)topic, null, bytes);
                    this.producerDataChannel.sendRequest(producerData);
                }
                this.logger.info("Migration thread " + this.threadName + " finished running");
            }
            catch (InvocationTargetException t) {
                this.logger.fatal("Migration thread failure due to root cause ", t.getCause());
            }
            catch (Throwable t) {
                this.logger.fatal("Migration thread failure due to ", t);
            }
            finally {
                this.shutdownComplete.countDown();
            }
        }

        /**
         * Clears the running flag, interrupts the thread and blocks until {@link #run()} has
         * finished. If the wait itself is interrupted a warning is logged and the method returns.
         */
        public void shutdown() {
            this.logger.info("Migration thread " + this.threadName + " shutting down");
            this.isRunning.set(false);
            this.interrupt();
            try {
                this.shutdownComplete.await();
            }
            catch (InterruptedException ie) {
                this.logger.warn("Interrupt during shutdown of MigrationThread", ie);
            }
            this.logger.info("Migration thread " + this.threadName + " shutdown complete");
        }
    }

    /**
     * Bounded hand-off buffer between the consuming and the producing threads, backed by an
     * {@link ArrayBlockingQueue} of the capacity given at construction time.
     *
     * @param <T> type of the buffered requests
     */
    static class ProducerDataChannel<T> {
        private final int capacity;
        private final BlockingQueue<T> pending;

        /**
         * @param queueSize maximum number of requests held at any time
         */
        public ProducerDataChannel(int queueSize) {
            capacity = queueSize;
            pending = new ArrayBlockingQueue<T>(capacity);
        }

        /**
         * Appends a request, blocking while the buffer is full.
         *
         * @throws InterruptedException if interrupted while waiting for space
         */
        public void sendRequest(T data) throws InterruptedException {
            pending.put(data);
        }

        /**
         * Removes and returns the oldest request, blocking while the buffer is empty.
         *
         * @throws InterruptedException if interrupted while waiting for a request
         */
        public T receiveRequest() throws InterruptedException {
            T next = pending.take();
            return next;
        }
    }
}

