package com.xunlei.niux.serialization;

import com.google.common.collect.Lists;
import com.xunlei.niux.util.EventUtil;
import com.xunlei.niux.util.JsonUtil;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.serialization.EventDeserializer;
import org.apache.flume.serialization.ResettableInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flume {@link EventDeserializer} that reads newline-terminated text lines from a
 * {@link ResettableInputStream}, converts each line into a {@code Map<String, String>} via
 * {@link JsonUtil}, and turns that map into an {@link Event} via
 * {@link EventUtil#buildEvent} using the schema loaded from {@value #SCHEMA_PATH_KEY}.
 *
 * <p>Two line formats are supported, selected by {@value #IS_FOR_FIELD_KEY}:
 * <ul>
 *   <li>{@code false} (default): the raw line is handed to {@code JsonUtil.buildMap(line)}
 *       (presumably a JSON document; confirm against {@code JsonUtil}).</li>
 *   <li>{@code true}: the line, the configured separator and the configured field names are
 *       handed to {@code JsonUtil.buildMap(line, separator, fieldList)}.</li>
 * </ul>
 *
 * <p>Line reading notes: only {@code '\n'} terminates a line, so a preceding carriage return is
 * kept as part of the line. A line longer than {@code maxLineLength} characters is cut at that
 * length and the remainder is read as the next line.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class AvroDeserializer implements EventDeserializer {
    /** Context key: path handed to {@code EventUtil.buildSchema}; mandatory. */
    public static final String SCHEMA_PATH_KEY = "schemaPath";
    /** Context key: whether lines are separator-delimited fields; defaults to {@code false}. */
    public static final String IS_FOR_FIELD_KEY = "isForField";
    /** Context key: whether each map and event is logged at INFO; defaults to {@code false}. */
    public static final String IS_DEBUG_KEY = "isDebug";
    /** Context key: comma-separated field names; mandatory when {@code isForField} is true. */
    public static final String FIELDS_KEY = "fields";
    /** Context key: field separator used when {@code isForField} is true; defaults to a tab. */
    public static final String CONTENT_SEPERATOR_KEY = "contentSeperator";
    /** Context key: name of the output charset. */
    public static final String OUT_CHARSET_KEY = "outputCharset";
    /** Default value for {@link #OUT_CHARSET_KEY}. */
    public static final String CHARSET_DFLT = "UTF-8";
    /** Context key: maximum number of characters read for a single line. */
    public static final String MAXLINE_KEY = "maxLineLength";
    /** Default value for {@link #MAXLINE_KEY}. */
    public static final int MAXLINE_DFLT = 2048;

    private static final Logger logger = LoggerFactory.getLogger(AvroDeserializer.class);

    private final ResettableInputStream in;
    // Resolved (and thereby validated) at construction; not read anywhere else in this class.
    private final Charset outputCharset;
    private final int maxLineLength;
    private volatile boolean isOpen = true;
    private final String schemaPath;
    private final Schema schema;
    private final String contentSeperator;
    private final String fields;
    private final List<String> fieldList;
    private final boolean isForField;
    private final boolean isDebug;

    /** Factory used to create {@link AvroDeserializer} instances from a context and a stream. */
    public static class Builder implements EventDeserializer.Builder {
        /**
         * Creates a deserializer reading from the given stream.
         *
         * @param context configuration; see the {@code *_KEY} constants of {@link AvroDeserializer}
         * @param resettableInputStream stream the lines are read from
         * @return a new {@link AvroDeserializer}
         */
        public EventDeserializer build(Context context, ResettableInputStream resettableInputStream) {
            return new AvroDeserializer(context, resettableInputStream);
        }
    }

    /**
     * Reads the configuration, loads the schema and logs the effective settings.
     *
     * @param context configuration source
     * @param resettableInputStream stream the lines are read from
     * @throws RuntimeException if {@code schemaPath} is missing or empty, if the schema cannot be
     *     built, or if {@code isForField} is true and {@code fields} is missing or empty
     */
    AvroDeserializer(Context context, ResettableInputStream resettableInputStream) {
        this.in = resettableInputStream;
        this.outputCharset = Charset.forName(context.getString(OUT_CHARSET_KEY, CHARSET_DFLT));
        this.maxLineLength = context.getInteger(MAXLINE_KEY, Integer.valueOf(MAXLINE_DFLT)).intValue();

        this.schemaPath = context.getString(SCHEMA_PATH_KEY);
        if (this.schemaPath == null || this.schemaPath.isEmpty()) {
            throw new RuntimeException("schemaPath没有配置。");
        }
        // Only schema loading is wrapped in the "schema initialisation failed" error, so that
        // unrelated configuration problems below surface with their own message.
        this.schema = loadSchema(this.schemaPath);

        this.isDebug = context.getBoolean(IS_DEBUG_KEY, Boolean.FALSE).booleanValue();
        this.isForField = context.getBoolean(IS_FOR_FIELD_KEY, Boolean.FALSE).booleanValue();
        if (this.isForField) {
            String configuredFields = context.getString(FIELDS_KEY);
            if (configuredFields == null || configuredFields.isEmpty()) {
                throw new RuntimeException("fields没有配置。");
            }
            this.contentSeperator = context.getString(CONTENT_SEPERATOR_KEY, "\t");
            // Spaces are removed so that "a, b" and "a,b" configure the same field names.
            this.fields = configuredFields.replace(" ", "");
            this.fieldList = Arrays.asList(this.fields.split(","));
        } else {
            this.contentSeperator = null;
            this.fields = null;
            this.fieldList = null;
        }

        logger.info("schemaPath: {}", this.schemaPath);
        logger.info("schema: {}", this.schema);
        logger.info("isDebug: {}", Boolean.valueOf(this.isDebug));
        logger.info("isForField: {}", Boolean.valueOf(this.isForField));
        logger.info("contentSeperator: {}", this.contentSeperator);
        logger.info("fields: {}", this.fields);
        logger.info("fieldList: {}", this.fieldList);
    }

    /**
     * Builds the schema for the given path, wrapping any failure with the offending path.
     *
     * @param path value of the {@code schemaPath} setting
     * @return the schema produced by {@code EventUtil.buildSchema}
     * @throws RuntimeException if the schema cannot be built; the original failure is the cause
     */
    private static Schema loadSchema(String path) {
        try {
            return EventUtil.buildSchema(path);
        } catch (Exception e) {
            throw new RuntimeException("初始化schema出错，schemaPath: " + path, e);
        }
    }

    /**
     * Reads lines until one can be converted into an event.
     *
     * <p>Lines for which {@code JsonUtil.buildMap} yields {@code null} are skipped with a warning.
     * Returning {@code null} for such a line would look like end-of-stream to the caller and
     * could cause the remainder of the input to be abandoned.
     *
     * @return the next event, or {@code null} when the end of the stream has been reached
     * @throws IOException if reading from the underlying stream fails
     * @throws IllegalStateException if this deserializer has been closed
     */
    public Event readEvent() throws IOException {
        ensureOpen();
        String line;
        while ((line = readLine()) != null) {
            Map<String, String> record = toMap(line);
            if (record == null) {
                logger.warn("Skipping line that could not be converted to a field map: {}", line);
                continue;
            }
            if (this.isDebug) {
                logger.info("====================================================================");
                logger.info("{}", record);
            }
            Event event = EventUtil.buildEvent(record, this.schema, null, null);
            if (this.isDebug) {
                logger.info("{}", event);
            }
            return event;
        }
        return null;
    }

    /**
     * Converts one line into a field map using the configured line format.
     *
     * @param line line content without the terminating line feed
     * @return the map produced by {@code JsonUtil}, possibly {@code null}
     */
    private Map<String, String> toMap(String line) {
        if (this.isForField) {
            return JsonUtil.buildMap(line, this.contentSeperator, this.fieldList);
        }
        return JsonUtil.buildMap(line);
    }

    /**
     * Reads up to {@code i} events.
     *
     * @param i maximum number of events to read
     * @return the events read, in stream order; shorter than {@code i} (possibly empty) when the
     *     end of the stream is reached first
     * @throws IOException if reading from the underlying stream fails
     * @throws IllegalStateException if this deserializer has been closed
     */
    public List<Event> readEvents(int i) throws IOException {
        ensureOpen();
        List<Event> events = Lists.newLinkedList();
        for (int count = 0; count < i; count++) {
            Event event = readEvent();
            if (event == null) {
                break;
            }
            events.add(event);
        }
        return events;
    }

    /**
     * Marks the current position of the underlying stream.
     *
     * @throws IOException if the underlying stream fails to mark
     * @throws IllegalStateException if this deserializer has been closed
     */
    public void mark() throws IOException {
        ensureOpen();
        this.in.mark();
    }

    /**
     * Resets the underlying stream to its last marked position.
     *
     * @throws IOException if the underlying stream fails to reset
     * @throws IllegalStateException if this deserializer has been closed
     */
    public void reset() throws IOException {
        ensureOpen();
        this.in.reset();
    }

    /**
     * Resets the stream to the last mark and closes it. Calling this on an already closed
     * deserializer does nothing.
     *
     * @throws IOException if resetting or closing the underlying stream fails
     */
    public void close() throws IOException {
        if (!this.isOpen) {
            return;
        }
        try {
            reset();
        } finally {
            // Release the stream even when reset() fails, and never leave a half-closed
            // instance reporting itself as open.
            this.isOpen = false;
            this.in.close();
        }
    }

    /**
     * Guards operations that need an open stream.
     *
     * @throws IllegalStateException if {@link #close()} has already been called
     */
    private void ensureOpen() {
        if (!this.isOpen) {
            throw new IllegalStateException("Serializer has been closed");
        }
    }

    /**
     * Reads characters up to and excluding the next {@code '\n'}, the end of the stream, or until
     * {@code maxLineLength} characters have been consumed, whichever comes first.
     *
     * @return the line content (empty for a blank line), or {@code null} if the stream was already
     *     at its end
     * @throws IOException if reading from the underlying stream fails
     */
    private String readLine() throws IOException {
        StringBuilder line = new StringBuilder();
        int charsRead = 0;
        int c;
        while ((c = this.in.readChar()) != -1) {
            charsRead++;
            if (c == '\n') {
                break;
            }
            line.append((char) c);
            if (charsRead >= this.maxLineLength) {
                logger.warn("Line length exceeds max ({}), truncating line!", Integer.valueOf(this.maxLineLength));
                break;
            }
        }
        // Zero characters consumed means end of stream, as opposed to an empty line.
        return charsRead > 0 ? line.toString() : null;
    }
}
