package com.xunlei.channel.report.producer.impl;

import com.xunlei.channel.report.pojo.ReportData;
import com.xunlei.channel.report.pojo.ReportDataSet;
import com.xunlei.channel.report.producer.AbstractDataReporter;
import com.xunlei.channel.report.serialize.impl.JacksonSerializer;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

@Service
/* loaded from: input_file:WEB-INF/lib/pay-report-core-1.0-SNAPSHOT.jar:com/xunlei/channel/report/producer/impl/KafkaDataReporter.class */
public class KafkaDataReporter extends AbstractDataReporter<JacksonSerializer> implements InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(KafkaDataReporter.class);
    private static final Long TIME_OUT_SECONDS = 5L;
    private KafkaProducer kafkaProducer;
    private String brokers;
    private String topic;

    @Override // com.xunlei.channel.report.producer.DataReporter
    public boolean report(ReportDataSet reportDataSet) {
        Collection<ReportData> dataSet = reportDataSet.dataSet();
        HashSet hashSet = new HashSet();
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        for (ReportData reportData : dataSet) {
            try {
                String serialize = ((JacksonSerializer) this.reportDataSerializer).serialize((Object) reportData);
                concurrentHashMap.put(this.kafkaProducer.send(new ProducerRecord(this.topic, reportData.getId() != null ? reportData.getId() : null, serialize), new Callback() { // from class: com.xunlei.channel.report.producer.impl.KafkaDataReporter.1
                    @Override // org.apache.kafka.clients.producer.Callback
                    public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                        if (exc != null) {
                            KafkaDataReporter.logger.error("A exception acquired, message: " + exc.getMessage(), (Throwable) exc);
                        }
                    }
                }), reportData);
            } catch (Exception e) {
                logger.error("Caught exception when sending data: " + reportData + " message: " + e.getMessage(), (Throwable) e);
            }
        }
        for (Map.Entry entry : concurrentHashMap.entrySet()) {
            try {
                if (((Future) entry.getKey()).get(TIME_OUT_SECONDS.longValue(), TimeUnit.SECONDS) == null) {
                    hashSet.add(entry.getValue());
                }
            } catch (Exception e2) {
                logger.error("", (Throwable) e2);
                hashSet.add(entry.getValue());
            }
        }
        if (!CollectionUtils.isEmpty(hashSet)) {
            logger.warn("Returning failed records: {}", hashSet);
        }
        reportDataSet.setFailedRecords(hashSet);
        return true;
    }

    private void removeRecord(String str, Set<ReportData> set) {
        if (set == null) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Removing failed record: {} from records: {}", str, set);
        }
        Iterator<ReportData> it = set.iterator();
        while (it.hasNext()) {
            String id = it.next().getId();
            if (StringUtils.hasText(id) && id.equals(str)) {
                it.remove();
                if (logger.isDebugEnabled()) {
                    logger.debug("Removing failed record: {}", id);
                    return;
                }
                return;
            }
        }
    }

    @Override // org.springframework.beans.factory.InitializingBean
    public void afterPropertiesSet() throws Exception {
    }

    public String getTopic() {
        return this.topic;
    }

    public void setTopic(String str) {
        this.topic = str;
    }

    public String getBrokers() {
        return this.brokers;
    }

    public void setBrokers(String str) {
        this.brokers = str;
    }

    public void initKafka() {
        try {
            HashMap hashMap = new HashMap();
            hashMap.put("bootstrap.servers", this.brokers);
            hashMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            hashMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            logger.info("Initializing kafka producer with properties: {}", hashMap);
            this.kafkaProducer = new KafkaProducer(hashMap);
        } catch (Exception e) {
            logger.error("Initializing kafka error with message: " + e.getMessage(), (Throwable) e);
        }
    }
}
