package com.xunlei.channel.sms.queue.kafka;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.xunlei.channel.sms.health.exception.ExceptionHealthEvent;
import com.xunlei.channel.sms.health.exception.ExceptionReporter;
import com.xunlei.channel.sms.health.exception.ExceptionType;
import com.xunlei.channel.sms.queue.Queue;
import com.xunlei.channel.sms.queue.QueueElement;
import com.xunlei.channel.sms.serialize.QueueDataConverter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/xunlei/channel/sms/queue/kafka/KafkaQueue.class */
public class KafkaQueue<T> implements Queue<T> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaQueue.class);
    public static final long DEFAULT_TIMEOUT_MILLS = 5000;
    public static final long DEFAULT_READ_TIMEOUT_MILLS = 1000;
    private String topic;
    private String brokers;
    private String groupId;
    private QueueDataConverter<QueueElement<T>, String> queueDataConverter;
    private long timeoutMills = DEFAULT_TIMEOUT_MILLS;
    private long readTimeoutMills = 1000;
    private ExecutorService executorService = new ThreadPoolExecutor(10, 10, DEFAULT_TIMEOUT_MILLS, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new ThreadFactoryBuilder().setNameFormat("kafka-queue-thread-%d").build());
    private final ThreadLocal<Consumer<String, String>> consumer = new ThreadLocal<Consumer<String, String>>() { // from class: com.xunlei.channel.sms.queue.kafka.KafkaQueue.1
        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.lang.ThreadLocal
        public Consumer<String, String> initialValue() {
            Consumer<String, String> createConsumer = KafkaQueue.this.createConsumer();
            KafkaQueue.logger.info("Initialized kafka consumer: {}", createConsumer);
            return createConsumer;
        }
    };
    private final Producer<String, String> producer = createProducer();

    /* JADX WARN: Multi-variable type inference failed */
    public KafkaQueue(QueueDataConverter<? extends QueueElement<T>, String> queueDataConverter, String str, String str2, String str3) {
        this.queueDataConverter = queueDataConverter;
        this.brokers = str;
        this.topic = str2;
        this.groupId = str3;
    }

    @Override // com.xunlei.channel.sms.queue.Queue
    public boolean push(QueueElement<T> queueElement) {
        return pushToKafka(queueElement);
    }

    @Override // com.xunlei.channel.sms.queue.Queue
    public boolean asynchronousPush(QueueElement<T> queueElement) {
        return pushToKafkaAndReturnFuture(queueElement) != null;
    }

    public boolean pushToKafka(QueueElement<T> queueElement) {
        try {
            RecordMetadata recordMetadata = pushToKafkaAndReturnFuture(queueElement).get(this.timeoutMills, TimeUnit.MILLISECONDS);
            boolean z = recordMetadata != null;
            if (z) {
                reportRecordMetadata(recordMetadata);
                if (logger.isDebugEnabled()) {
                    logger.debug("Succeed push data to queue. data: {}, RecordMetadata: {}", queueElement, recordMetadata);
                }
            } else {
                logger.warn("Error push data to kafka. data: {}, RecordMetadata: {}", queueElement, recordMetadata);
            }
            return z;
        } catch (Exception e) {
            logger.error("Failed to push element to queue, data: " + queueElement + ". Message: " + e.getMessage(), e);
            return false;
        }
    }

    private void reportRecordMetadata(RecordMetadata recordMetadata) {
        if (recordMetadata == null) {
            return;
        }
        long currentTimeMillis = System.currentTimeMillis() - recordMetadata.timestamp();
        if (currentTimeMillis >= DEFAULT_TIMEOUT_MILLS) {
            logger.warn("Too long to consume this message: {}, timeConsuming: {}", recordMetadata, Long.valueOf(currentTimeMillis));
        } else if (logger.isDebugEnabled()) {
            logger.warn("Use time: {} to consume this message: {}", Long.valueOf(currentTimeMillis), recordMetadata);
        }
    }

    public Future<RecordMetadata> pushToKafkaAndReturnFuture(final QueueElement<T> queueElement) {
        return this.executorService.submit(new Callable<RecordMetadata>() { // from class: com.xunlei.channel.sms.queue.kafka.KafkaQueue.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public RecordMetadata call() throws Exception {
                Future pushToKafkaAndReturnsMetadata = KafkaQueue.this.pushToKafkaAndReturnsMetadata(queueElement);
                if (pushToKafkaAndReturnsMetadata == null) {
                    return null;
                }
                RecordMetadata recordMetadata = (RecordMetadata) pushToKafkaAndReturnsMetadata.get(KafkaQueue.this.timeoutMills, TimeUnit.MILLISECONDS);
                if (KafkaQueue.logger.isDebugEnabled()) {
                    KafkaQueue.logger.debug("Succeed send element: {} to kafka", queueElement, recordMetadata);
                }
                return recordMetadata;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Future<RecordMetadata> pushToKafkaAndReturnsMetadata(QueueElement<T> queueElement) {
        try {
            try {
                return this.producer.send(new ProducerRecord(this.topic, this.queueDataConverter.serialize(queueElement)));
            } catch (Exception e) {
                logger.error("Failed to send element: " + queueElement + " to kafka. Error message: " + e.getMessage(), e);
                ExceptionReporter.reportHealthEvent(ExceptionHealthEvent.newHealthEvent(ExceptionType.PUSH_TO_QUEUE_ERROR));
                return null;
            }
        } catch (Exception e2) {
            logger.error("", e2);
            return null;
        }
    }

    @Override // com.xunlei.channel.sms.queue.Queue
    public void setQueueId(String str) {
    }

    @Override // com.xunlei.channel.sms.queue.Queue
    public String getQueueId() {
        return this.topic;
    }

    @Override // com.xunlei.channel.sms.queue.Queue
    public List<QueueElement<T>> pull() {
        ConsumerRecords consumerRecords = null;
        try {
            consumerRecords = this.consumer.get().poll(this.readTimeoutMills);
        } catch (Exception e) {
            logger.error("Pull from queue error. Error message: " + e.getMessage(), e);
            ExceptionReporter.reportHealthEvent(ExceptionHealthEvent.newHealthEvent(ExceptionType.PULL_FROM_QUEUE_ERROR));
        }
        if (consumerRecords == null || consumerRecords.isEmpty()) {
            return null;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Succeed pull data from queue: {}", consumerRecords);
        }
        ArrayList arrayList = new ArrayList(consumerRecords.count());
        Iterator it = consumerRecords.iterator();
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            String str = consumerRecord.topic();
            if (str.equals(this.topic)) {
                String str2 = (String) consumerRecord.value();
                try {
                    QueueElement<T> deserialize = this.queueDataConverter.deserialize(str2);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Poll a new record: {} from raw data: {}", deserialize, str2);
                    }
                    arrayList.add(deserialize);
                } catch (Exception e2) {
                    logger.error("", e2);
                }
            } else {
                logger.error("Poll error topic record! topic: {} our topic: {}", str, this.topic);
            }
        }
        return arrayList;
    }

    @Override // com.xunlei.channel.sms.queue.Queue
    public List<QueueElement<T>> pull(boolean z) {
        List<QueueElement<T>> list;
        List<QueueElement<T>> pull = pull();
        while (true) {
            list = pull;
            if (!z || !CollectionUtils.isEmpty(list)) {
                break;
            }
            pull = pull();
        }
        return list;
    }

    public String getTopic() {
        return this.topic;
    }

    public void initKafka() {
        Assert.notNull(this.brokers, "Brokers must presents!");
        Assert.notNull(this.groupId, "GroupId must presents!");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Consumer<String, String> createConsumer() {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", this.brokers);
        hashMap.put("group.id", this.groupId);
        hashMap.put("enable.auto.commit", "true");
        hashMap.put("auto.commit.interval.ms", "10000");
        hashMap.put("session.timeout.ms", 30000);
        hashMap.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        hashMap.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        logger.info("Initializing kafka consumer with properties: {}", hashMap);
        KafkaConsumer kafkaConsumer = new KafkaConsumer(hashMap);
        kafkaConsumer.subscribe(Arrays.asList(this.topic));
        return kafkaConsumer;
    }

    private Producer<String, String> createProducer() {
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", this.brokers);
        hashMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        hashMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        logger.info("Initializing kafka producer with properties: {}", hashMap);
        return new KafkaProducer(hashMap);
    }

    public Producer<String, String> getProducer() {
        return this.producer;
    }

    public Consumer<String, String> getConsumer() {
        return this.consumer.get();
    }

    public void setConsumer(Consumer<String, String> consumer) {
        this.consumer.set(consumer);
    }

    public long getTimeoutMills() {
        return this.timeoutMills;
    }

    public void setTimeoutMills(long j) {
        this.timeoutMills = j;
    }

    public long getReadTimeoutMills() {
        return this.readTimeoutMills;
    }

    public void setReadTimeoutMills(long j) {
        this.readTimeoutMills = j;
    }

    public static void main(String[] strArr) {
        new KafkaQueue(null, null, "", "").getConsumer();
    }
}
