package com.xunlei.niux.dc;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/xunlei/niux/dc/RpcClientFacade.class */
/**
 * Facade that buffers Flume {@link Event}s in a bounded in-memory queue and ships them to Flume
 * in batches from background consumer threads.
 *
 * <p>The queue and the delivered-event counter are static, so they are shared by every facade
 * instance in the JVM. Producers call {@link #sendDataToFlume(Event)}; consumers are started by
 * {@link #init(Properties)} and stopped by {@link #cleanUp()}.
 *
 * <p>Recognised properties (besides whatever {@link RpcClientFactory} itself reads):
 * <ul>
 *   <li>{@code dcclient.is.open} - {@code true}/{@code false}, default {@code true}; when false,
 *       events are silently discarded and no consumer is started.</li>
 *   <li>{@code dcclient.customer.thread.number} - number of consumer threads, default 1.</li>
 *   <li>{@code batch-size} - maximum events per {@code appendBatch} call, default 1000.</li>
 * </ul>
 */
public class RpcClientFacade {
    private static final Logger LOG = Logger.getLogger(RpcClientFacade.class);

    /** How long a producer waits for free queue space before the event is dropped. */
    private static final long QUEUE_OFFER_TIME_OUT_MILLIS = 100;
    /** How long a consumer waits for each single event while assembling a batch. */
    private static final long QUEUE_POLL_TIME_OUT_MILLIS = 10;
    /** Default number of consumer threads. */
    private static final int THREAD_NUMBER = 1;
    private static final int QUEUE_CAPACITY = 100000;

    /** Count of events successfully handed to Flume, across all consumers. */
    private static final AtomicInteger totalNumber = new AtomicInteger(0);
    private static final LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);

    private boolean isOpen = true;
    private Properties properties;
    /** Consumer threads started by the most recent {@link #init(Properties)}; null when none. */
    private List<Thread> dataConsumerThreadList;

    /**
     * Worker that repeatedly drains up to {@code batch-size} events from the shared queue and
     * sends them with a single {@code appendBatch} call. The loop ends when the thread is
     * interrupted; the RPC client is closed on the way out.
     */
    static class DataConsumer implements Runnable {
        private static final int DEFAULT_BATCH_SIZE = 1000;
        /** Pause between reconnect attempts while no client can be created. */
        private static final long RECONNECT_BACKOFF_MILLIS = 1000;

        private final Properties properties;
        private final int batchSize;
        /** Current RPC client; null while disconnected after a failed reconnect. */
        private RpcClient client;

        /**
         * Reads {@code batch-size} from the properties and opens the first RPC client.
         *
         * @param properties client configuration, passed on to {@link RpcClientFactory}
         * @throws IllegalArgumentException if {@code batch-size} is not a positive integer
         *         (a non-numeric value surfaces as {@link NumberFormatException})
         */
        public DataConsumer(Properties properties) {
            this.properties = properties;
            this.batchSize = Integer.parseInt(
                    properties.getProperty("batch-size", String.valueOf(DEFAULT_BATCH_SIZE)));
            if (this.batchSize < 1) {
                // A non-positive size would make run() spin without ever polling the queue.
                throw new IllegalArgumentException("batch-size must be positive: " + this.batchSize);
            }
            init();
        }

        /** Creates a new RPC client from the configured properties, replacing the current one. */
        public void init() {
            this.client = RpcClientFactory.getInstance(this.properties);
        }

        /**
         * Consume loop: build a batch, deliver it, log timings; repeat until interrupted.
         */
        @Override
        public void run() {
            long startedAt = System.currentTimeMillis();
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // Do not pull events off the queue while there is no client to send them.
                    if (this.client == null && !reconnect()) {
                        pauseBeforeRetry();
                        continue;
                    }

                    long buildStartedAt = System.currentTimeMillis();
                    List<Event> batch = pollBatch();
                    if (Thread.currentThread().isInterrupted()) {
                        // Shutting down: hand unsent events back for whichever consumer runs next.
                        requeue(batch);
                        break;
                    }
                    int size = batch.size();
                    LOG.info("build: " + size + "条记录耗费：" + (System.currentTimeMillis() - buildStartedAt) + "ms");

                    long appendStartedAt = System.currentTimeMillis();
                    if (size > 0) {
                        deliver(batch);
                    }
                    LOG.info("append: " + size + "条记录耗费：" + (System.currentTimeMillis() - appendStartedAt) + "ms");
                    LOG.info("当前总共消费记录数: " + totalNumber.get() + ", 队列中记录数； " + queue.size() + ", 总耗时: " + ((System.currentTimeMillis() - startedAt) / 1000) + "s");
                }
            } finally {
                closeClient();
            }
        }

        /** Polls the shared queue up to {@code batchSize} times; stops early if interrupted. */
        private List<Event> pollBatch() {
            List<Event> batch = new ArrayList<>();
            for (int i = 0; i < this.batchSize; i++) {
                try {
                    Event event = queue.poll(QUEUE_POLL_TIME_OUT_MILLIS, TimeUnit.MILLISECONDS);
                    if (event != null) {
                        batch.add(event);
                    }
                } catch (InterruptedException e) {
                    // Restore the flag so run() sees the shutdown request.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            return batch;
        }

        /** Sends one batch; on delivery failure the batch is dropped and the client is rebuilt. */
        private void deliver(List<Event> batch) {
            try {
                this.client.appendBatch(batch);
                totalNumber.addAndGet(batch.size());
            } catch (EventDeliveryException e) {
                LOG.error("appendBatch failed, " + batch.size() + " events dropped; reconnecting", e);
                closeClient();
                reconnect();
            }
        }

        /** Tries to create a fresh client; returns false (leaving {@code client} null) on failure. */
        private boolean reconnect() {
            try {
                init();
                return true;
            } catch (FlumeException e) {
                LOG.error("could not create Flume RPC client, will retry", e);
                return false;
            }
        }

        /** Closes and forgets the current client, tolerating a failing {@code close()}. */
        private void closeClient() {
            RpcClient stale = this.client;
            this.client = null;
            if (stale == null) {
                return;
            }
            try {
                stale.close();
            } catch (FlumeException e) {
                LOG.warn("error while closing Flume RPC client", e);
            }
        }

        /** Sleeps between reconnect attempts, preserving an interrupt that arrives meanwhile. */
        private void pauseBeforeRetry() {
            try {
                Thread.sleep(RECONNECT_BACKOFF_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        /** Puts events back on the shared queue without blocking; logs how many did not fit. */
        private void requeue(List<Event> batch) {
            int lost = 0;
            for (Event event : batch) {
                if (!queue.offer(event)) {
                    lost++;
                }
            }
            if (lost > 0) {
                LOG.warn("queue full during shutdown, " + lost + " events dropped");
            }
        }
    }

    /**
     * (Re)configures the facade. Consumers from a previous call are always asked to stop first;
     * new ones are started only when {@code dcclient.is.open} is true.
     *
     * @param properties configuration; see the class documentation for the keys read here
     * @throws NumberFormatException if {@code dcclient.customer.thread.number} is not an integer
     */
    public void init(Properties properties) {
        this.properties = properties;
        this.isOpen = Boolean.parseBoolean(properties.getProperty("dcclient.is.open", "true"));
        LOG.info("isOpen: " + this.isOpen);
        cleanUp();
        if (!this.isOpen) {
            return;
        }
        int threadCount = Integer.parseInt(
                properties.getProperty("dcclient.customer.thread.number", String.valueOf(THREAD_NUMBER)));
        // Publish the list before starting anything so that threads already running are still
        // tracked if constructing a later consumer throws.
        List<Thread> started = new ArrayList<>();
        this.dataConsumerThreadList = started;
        for (int i = 0; i < threadCount; i++) {
            Thread worker = new Thread(new DataConsumer(this.properties));
            worker.start();
            started.add(worker);
        }
    }

    /**
     * Enqueues every event of the list, in order, via {@link #sendDataToFlume(Event)}.
     *
     * @param list events to enqueue
     * @throws InterruptedException if interrupted while waiting for queue space
     */
    public void sendDataToFlume(List<Event> list) throws InterruptedException {
        for (Event event : list) {
            sendDataToFlume(event);
        }
    }

    /**
     * Enqueues one event for asynchronous delivery. Does nothing when the facade is closed. If
     * the queue stays full for {@value #QUEUE_OFFER_TIME_OUT_MILLIS} ms the event is dropped and
     * a warning is logged.
     *
     * @param event event to enqueue
     * @throws InterruptedException if interrupted while waiting for queue space
     */
    public void sendDataToFlume(Event event) throws InterruptedException {
        if (!this.isOpen) {
            return;
        }
        if (!queue.offer(event, QUEUE_OFFER_TIME_OUT_MILLIS, TimeUnit.MILLISECONDS)) {
            LOG.warn("offer记录失败，超时。event: " + event);
        }
    }

    /**
     * Asks all tracked consumer threads to stop by interrupting them and forgets them. The call
     * does not wait for the threads to finish. Safe to call when nothing is running.
     */
    public void cleanUp() {
        List<Thread> threads = this.dataConsumerThreadList;
        this.dataConsumerThreadList = null;
        if (threads == null || threads.isEmpty()) {
            return;
        }
        for (Thread thread : threads) {
            if (thread != null && thread.isAlive()) {
                // Cooperative shutdown; DataConsumer.run() exits once it observes the interrupt.
                thread.interrupt();
            }
        }
    }
}
