/*
 * Decompiled with CFR 0.152.
 */
package com.xunlei.channel.sms.queue.kafka;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.xunlei.channel.sms.health.exception.ExceptionHealthEvent;
import com.xunlei.channel.sms.health.exception.ExceptionReporter;
import com.xunlei.channel.sms.health.exception.ExceptionType;
import com.xunlei.channel.sms.queue.Queue;
import com.xunlei.channel.sms.queue.QueueElement;
import com.xunlei.channel.sms.queue.kafka.KafkaQueueSubmitter;
import com.xunlei.channel.sms.serialize.QueueDataConverter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

public class KafkaQueue<T>
implements Queue<T> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaQueue.class);
    public static final long DEFAULT_TIMEOUT_MILLS = 5000L;
    public static final long DEFAULT_READ_TIMEOUT_MILLS = 1000L;
    private long timeoutMills = 5000L;
    private long readTimeoutMills = 1000L;
    private String topic;
    private String brokers;
    private String groupId;
    private QueueDataConverter<QueueElement<T>, String> queueDataConverter;
    private ExecutorService executorService = new ThreadPoolExecutor(10, 10, 5000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setNameFormat("kafka-queue-thread-%d").build());
    private final ThreadLocal<Producer<String, String>> producer = new ThreadLocal<Producer<String, String>>(){

        @Override
        protected Producer<String, String> initialValue() {
            Producer producer = KafkaQueue.this.createProducer();
            KafkaQueueSubmitter.watchProducer(producer, KafkaQueue.this.readTimeoutMills);
            logger.info("Initialized kafka producer: {}", (Object)producer);
            return producer;
        }
    };
    private final ThreadLocal<Consumer<String, String>> consumer = new ThreadLocal<Consumer<String, String>>(){

        @Override
        protected Consumer<String, String> initialValue() {
            Consumer consumer = KafkaQueue.this.createConsumer();
            logger.info("Initialized kafka consumer: {}", (Object)consumer);
            return consumer;
        }
    };

    public KafkaQueue(QueueDataConverter<? extends QueueElement<T>, String> queueDataConverter, String brokers, String topic, String groupId) {
        this.queueDataConverter = queueDataConverter;
        this.brokers = brokers;
        this.topic = topic;
        this.groupId = groupId;
    }

    @Override
    public boolean push(QueueElement<T> t) {
        return this.pushToKafka(t);
    }

    @Override
    public boolean asynchronousPush(QueueElement<T> t) {
        Future<RecordMetadata> submit = this.pushToKafkaAndReturnFuture(t);
        return submit != null;
    }

    public boolean pushToKafka(QueueElement<T> t) {
        Future<RecordMetadata> recordMetadataFuture = this.pushToKafkaAndReturnFuture(t);
        try {
            boolean success;
            RecordMetadata recordMetadata = recordMetadataFuture.get(this.timeoutMills, TimeUnit.MILLISECONDS);
            boolean bl = success = recordMetadata != null;
            if (success) {
                this.reportRecordMetadata(recordMetadata);
                if (logger.isDebugEnabled()) {
                    logger.debug("Succeed push data to queue. data: {}, RecordMetadata: {}", t, (Object)recordMetadata);
                }
            } else {
                logger.warn("Error push data to kafka. data: {}, RecordMetadata: {}", t, (Object)recordMetadata);
            }
            return success;
        }
        catch (Exception e) {
            logger.error("Failed to push element to queue, data: " + t + ". Message: " + e.getMessage(), (Throwable)e);
            return false;
        }
    }

    private void reportRecordMetadata(RecordMetadata recordMetadata) {
        if (recordMetadata == null) {
            return;
        }
        long timestamp = recordMetadata.timestamp();
        long now = System.currentTimeMillis();
        long timeConsuming = now - timestamp;
        if (timeConsuming >= 5000L) {
            logger.warn("Too long to consume this message: {}, timeConsuming: {}", (Object)recordMetadata, (Object)timeConsuming);
        } else if (logger.isDebugEnabled()) {
            logger.warn("Use time: {} to consume this message: {}", (Object)timeConsuming, (Object)recordMetadata);
        }
    }

    public Future<RecordMetadata> pushToKafkaAndReturnFuture(final QueueElement<T> t) {
        Future<RecordMetadata> future = this.executorService.submit(new Callable<RecordMetadata>(){

            @Override
            public RecordMetadata call() throws Exception {
                Future future = KafkaQueue.this.pushToKafkaAndReturnsMetadata(t);
                if (future == null) {
                    return null;
                }
                RecordMetadata recordMetadata = (RecordMetadata)future.get(KafkaQueue.this.timeoutMills, TimeUnit.MILLISECONDS);
                if (logger.isDebugEnabled()) {
                    logger.debug("Succeed send element: {} to kafka", (Object)t, (Object)recordMetadata);
                }
                return recordMetadata;
            }
        });
        return future;
    }

    private Future<RecordMetadata> pushToKafkaAndReturnsMetadata(QueueElement<T> t) {
        String rawData;
        try {
            rawData = this.queueDataConverter.serialize(t);
        }
        catch (Exception e) {
            logger.error("", (Throwable)e);
            return null;
        }
        try {
            ProducerRecord record = new ProducerRecord(this.topic, (Object)rawData);
            Future future = this.producer.get().send(record);
            KafkaQueueSubmitter.submitted(this.producer.get());
            return future;
        }
        catch (Exception e) {
            logger.error("Failed to send element: " + t + " to kafka. Error message: " + e.getMessage(), (Throwable)e);
            ExceptionReporter.reportHealthEvent((ExceptionHealthEvent)ExceptionHealthEvent.newHealthEvent((ExceptionType)ExceptionType.PUSH_TO_QUEUE_ERROR));
            return null;
        }
    }

    @Override
    public void setQueueId(String queueId) {
    }

    @Override
    public String getQueueId() {
        return this.topic;
    }

    @Override
    public List<QueueElement<T>> pull() {
        ConsumerRecords records = null;
        try {
            records = this.consumer.get().poll(this.readTimeoutMills);
        }
        catch (Exception e) {
            logger.error("Pull from queue error. Error message: " + e.getMessage(), (Throwable)e);
            ExceptionReporter.reportHealthEvent((ExceptionHealthEvent)ExceptionHealthEvent.newHealthEvent((ExceptionType)ExceptionType.PULL_FROM_QUEUE_ERROR));
        }
        if (records == null || records.isEmpty()) {
            return null;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Succeed pull data from queue: {}", (Object)records);
        }
        ArrayList<QueueElement<T>> queueElements = new ArrayList<QueueElement<T>>(records.count());
        for (ConsumerRecord record : records) {
            String topic = record.topic();
            if (!topic.equals(this.topic)) {
                logger.error("Poll error topic record! topic: {} our topic: {}", (Object)topic, (Object)this.topic);
                continue;
            }
            String value = (String)record.value();
            try {
                QueueElement<T> queueElement = this.queueDataConverter.deserialize(value);
                if (logger.isDebugEnabled()) {
                    logger.debug("Poll a new record: {} from raw data: {}", queueElement, (Object)value);
                }
                queueElements.add(queueElement);
            }
            catch (Exception e) {
                logger.error("", (Throwable)e);
            }
        }
        return queueElements;
    }

    @Override
    public List<QueueElement<T>> pull(boolean blocking) {
        List<QueueElement<T>> queueElements = this.pull();
        while (blocking && CollectionUtils.isEmpty(queueElements)) {
            queueElements = this.pull();
        }
        return queueElements;
    }

    public String getTopic() {
        return this.topic;
    }

    public void initKafka() {
        Assert.notNull((Object)this.brokers, (String)"Brokers must presents!");
        Assert.notNull((Object)this.groupId, (String)"GroupId must presents!");
    }

    private Consumer<String, String> createConsumer() {
        HashMap<String, Object> consumerProps = new HashMap<String, Object>();
        consumerProps.put("bootstrap.servers", this.brokers);
        consumerProps.put("group.id", this.groupId);
        consumerProps.put("enable.auto.commit", "true");
        consumerProps.put("auto.commit.interval.ms", "1000");
        consumerProps.put("session.timeout.ms", 30000);
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        logger.info("Initializing kafka consumer with properties: {}", consumerProps);
        KafkaConsumer consumer = new KafkaConsumer(consumerProps);
        consumer.subscribe(Arrays.asList(this.topic));
        return consumer;
    }

    private Producer<String, String> createProducer() {
        HashMap<String, String> producerProps = new HashMap<String, String>();
        producerProps.put("bootstrap.servers", this.brokers);
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("linger.ms", this.readTimeoutMills + "");
        logger.info("Initializing kafka producer with properties: {}", producerProps);
        KafkaProducer producer = new KafkaProducer(producerProps);
        return producer;
    }

    public Producer<String, String> getProducer() {
        return this.producer.get();
    }

    public void setProducer(Producer<String, String> producer) {
        this.producer.set(producer);
    }

    public Consumer<String, String> getConsumer() {
        return this.consumer.get();
    }

    public void setConsumer(Consumer<String, String> consumer) {
        this.consumer.set(consumer);
    }

    public long getTimeoutMills() {
        return this.timeoutMills;
    }

    public void setTimeoutMills(long timeoutMills) {
        this.timeoutMills = timeoutMills;
    }

    public long getReadTimeoutMills() {
        return this.readTimeoutMills;
    }

    public void setReadTimeoutMills(long readTimeoutMills) {
        this.readTimeoutMills = readTimeoutMills;
    }

    public static void main(String[] args) {
        KafkaQueue stringKafkaQueue = new KafkaQueue(null, null, "", "");
        stringKafkaQueue.getConsumer();
    }
}

