/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.sink.entry;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.position.LogIdentity;
import com.alibaba.otter.canal.sink.AbstractCanalEventSink;
import com.alibaba.otter.canal.sink.CanalEventDownStreamHandler;
import com.alibaba.otter.canal.sink.CanalEventSink;
import com.alibaba.otter.canal.sink.entry.HeartBeatEntryEventHandler;
import com.alibaba.otter.canal.sink.exception.CanalSinkException;
import com.alibaba.otter.canal.store.CanalEventStore;
import com.alibaba.otter.canal.store.model.Event;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/**
 * Event sink that wraps incoming {@link CanalEntry.Entry} objects into {@link Event}s and
 * pushes them into the configured {@link CanalEventStore}.
 *
 * <p>Behaviour visible in this class:
 * <ul>
 *   <li>optionally forwards only {@code ROWDATA} entries ({@link #filterTransactionEntry});</li>
 *   <li>applies the inherited {@code filter} to {@code ROWDATA} entries by
 *       {@code schema.table} name;</li>
 *   <li>batches containing row data or a heartbeat are always stored; any other batch is
 *       stored only occasionally, governed by {@link #emptyTransactionInterval} and
 *       {@link #emptyTransctionThresold}, and otherwise dropped.</li>
 * </ul>
 *
 * <p>The misspelled member names ({@code filterEmtryTransactionEntry},
 * {@code emptyTransctionThresold}) are part of the existing interface and are kept as-is.
 */
public class EntryEventSink
extends AbstractCanalEventSink<List<CanalEntry.Entry>>
implements CanalEventSink<List<CanalEntry.Entry>> {
    private static final Logger logger = LoggerFactory.getLogger(EntryEventSink.class);

    /** Upper bound applied to the retry counter when computing the back-off in {@link #applyWait(int)}. */
    private static final int maxFullTimes = 10;

    /** Destination store; must be set before {@link #start()} is called. */
    private CanalEventStore<Event> eventStore;

    /** When {@code true}, {@link #sink} forwards only {@code ROWDATA} entries. */
    protected boolean filterTransactionEntry = false;

    /**
     * When {@code true}, batches without row data or heartbeat are stored only when the
     * interval/threshold rule fires; when {@code false}, such batches are never stored.
     */
    protected boolean filterEmtryTransactionEntry = true;

    /**
     * Minimum difference between header execute times before a batch without row data or
     * heartbeat is stored again. Same unit as the entry header's execute time
     * (presumably milliseconds -- not established by this class).
     */
    protected long emptyTransactionInterval = 5000L;

    /** Number of skipped batches after which one is stored regardless of the interval. */
    protected long emptyTransctionThresold = 8192L;

    /** Header execute time of the last stored batch that had neither row data nor heartbeat. */
    protected volatile long lastEmptyTransactionTimestamp = 0L;

    /** Count of batches skipped since the last stored one. */
    protected AtomicLong lastEmptyTransactionCount = new AtomicLong(0L);

    /** Creates the sink and registers a {@link HeartBeatEntryEventHandler}. */
    public EntryEventSink() {
        this.addHandler(new HeartBeatEntryEventHandler());
    }

    /**
     * Starts this sink and every registered handler that is not already started.
     *
     * @throws IllegalArgumentException if no event store has been configured
     */
    public void start() {
        super.start();
        Assert.notNull(this.eventStore, "eventStore must not be null");
        for (CanalEventDownStreamHandler handler : this.getHandlers()) {
            if (!handler.isStart()) {
                handler.start();
            }
        }
    }

    /** Stops this sink and every registered handler that is currently started. */
    public void stop() {
        super.stop();
        for (CanalEventDownStreamHandler handler : this.getHandlers()) {
            if (handler.isStart()) {
                handler.stop();
            }
        }
    }

    /**
     * Always returns {@code false}; the arguments are ignored.
     *
     * @param event         unused
     * @param remoteAddress unused
     * @param destination   unused
     * @return {@code false}
     */
    public boolean filter(List<CanalEntry.Entry> event, InetSocketAddress remoteAddress, String destination) {
        return false;
    }

    /**
     * Sinks a batch of entries, first dropping every non-{@code ROWDATA} entry when
     * {@link #filterTransactionEntry} is enabled.
     *
     * @param entrys        entries to sink
     * @param remoteAddress source address recorded in each event's {@link LogIdentity}
     * @param destination   not used by this implementation
     * @return {@code true} if the batch was stored or intentionally skipped, {@code false}
     *         if storing was abandoned (see {@link #doSink(List)})
     */
    @Override
    public boolean sink(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress, String destination) throws CanalSinkException, InterruptedException {
        if (!this.filterTransactionEntry) {
            return this.sinkData(entrys, remoteAddress);
        }
        List<CanalEntry.Entry> rowDatas = new ArrayList<CanalEntry.Entry>();
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                rowDatas.add(entry);
            }
        }
        return this.sinkData(rowDatas, remoteAddress);
    }

    /**
     * Wraps the entries that pass {@link #doFilter(Event)} into events and decides whether
     * the batch is stored.
     *
     * @param entrys        entries to process
     * @param remoteAddress source address recorded in each event's {@link LogIdentity}
     * @return result of {@link #doSink(List)} when the batch is stored, otherwise {@code true}
     */
    private boolean sinkData(List<CanalEntry.Entry> entrys, InetSocketAddress remoteAddress) throws InterruptedException {
        boolean hasRowData = false;
        boolean hasHeartBeat = false;
        List<Event> events = new ArrayList<Event>();
        for (CanalEntry.Entry entry : entrys) {
            Event event = new Event(new LogIdentity(remoteAddress, Long.valueOf(-1L)), entry);
            if (!this.doFilter(event)) {
                continue;
            }
            events.add(event);
            CanalEntry.EntryType entryType = entry.getEntryType();
            hasRowData |= entryType == CanalEntry.EntryType.ROWDATA;
            hasHeartBeat |= entryType == CanalEntry.EntryType.HEARTBEAT;
        }

        // Batches carrying row data or a heartbeat are always handed to the store.
        if (hasRowData || hasHeartBeat) {
            return this.doSink(events);
        }

        // Nothing to store, or throttled forwarding is disabled: report success and drop.
        if (!this.filterEmtryTransactionEntry || CollectionUtils.isEmpty(events)) {
            return true;
        }

        long currentTimestamp = events.get(0).getEntry().getHeader().getExecuteTime();
        boolean intervalElapsed =
                Math.abs(currentTimestamp - this.lastEmptyTransactionTimestamp) > this.emptyTransactionInterval;
        // The counter is only incremented when the interval has NOT elapsed (short-circuit).
        if (intervalElapsed || this.lastEmptyTransactionCount.incrementAndGet() > this.emptyTransctionThresold) {
            this.lastEmptyTransactionCount.set(0L);
            this.lastEmptyTransactionTimestamp = currentTimestamp;
            return this.doSink(events);
        }
        return true;
    }

    /**
     * Decides whether an event is kept. Only {@code ROWDATA} entries are checked, and only
     * when a filter is configured; everything else is kept.
     *
     * @param event event to check
     * @return {@code true} to keep the event, {@code false} to drop it
     */
    protected boolean doFilter(Event event) {
        if (this.filter == null || event.getEntry().getEntryType() != CanalEntry.EntryType.ROWDATA) {
            return true;
        }
        String name = this.getSchemaNameAndTableName(event.getEntry());
        boolean need = this.filter.filter(name);
        if (!need) {
            logger.debug("filter name[{}] entry : {}:{}", new Object[]{name, event.getEntry().getHeader().getLogfileName(), event.getEntry().getHeader().getLogfileOffset()});
        }
        return need;
    }

    /**
     * Runs the handlers' {@code before} hooks, then repeatedly tries to put the events into
     * the store until it succeeds, this sink stops running, or the thread is interrupted.
     * After a successful put the {@code after} hooks run; after each failed attempt the
     * thread backs off and the {@code retry} hooks run.
     *
     * <p>The thread's interrupt status is inspected without being cleared, so a caller can
     * still observe the interruption after this method returns {@code false}.
     *
     * @param events events to store
     * @return {@code true} if the events were stored, {@code false} if the attempt was
     *         abandoned because the sink stopped or the thread was interrupted
     */
    protected boolean doSink(List<Event> events) {
        for (CanalEventDownStreamHandler handler : this.getHandlers()) {
            events = handler.before(events);
        }
        int fullTimes = 0;
        do {
            if (this.eventStore.tryPut(events)) {
                for (CanalEventDownStreamHandler handler : this.getHandlers()) {
                    events = handler.after(events);
                }
                return true;
            }
            this.applyWait(++fullTimes);
            for (CanalEventDownStreamHandler handler : this.getHandlers()) {
                events = handler.retry(events);
            }
        } while (this.running && !Thread.currentThread().isInterrupted());
        return false;
    }

    /**
     * Backs off after a failed put: yields for the first three attempts, then parks for
     * {@code min(fullTimes, maxFullTimes)} milliseconds.
     *
     * @param fullTimes number of consecutive failed attempts so far (1-based)
     */
    private void applyWait(int fullTimes) {
        if (fullTimes <= 3) {
            Thread.yield();
            return;
        }
        int cappedFullTimes = Math.min(fullTimes, maxFullTimes);
        // 1,000,000 ns == 1 ms per counted attempt.
        LockSupport.parkNanos(1000000L * (long)cappedFullTimes);
    }

    /** Returns {@code "<schemaName>.<tableName>"} taken from the entry's header. */
    private String getSchemaNameAndTableName(CanalEntry.Entry entry) {
        return entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName();
    }

    /** Sets the store that receives the events; required before {@link #start()}. */
    public void setEventStore(CanalEventStore<Event> eventStore) {
        this.eventStore = eventStore;
    }

    /** Sets {@link #filterTransactionEntry}. */
    public void setFilterTransactionEntry(boolean filterTransactionEntry) {
        this.filterTransactionEntry = filterTransactionEntry;
    }

    /** Sets {@link #filterEmtryTransactionEntry}. */
    public void setFilterEmtryTransactionEntry(boolean filterEmtryTransactionEntry) {
        this.filterEmtryTransactionEntry = filterEmtryTransactionEntry;
    }

    /** Sets {@link #emptyTransactionInterval}. */
    public void setEmptyTransactionInterval(long emptyTransactionInterval) {
        this.emptyTransactionInterval = emptyTransactionInterval;
    }

    /** Sets {@link #emptyTransctionThresold}. */
    public void setEmptyTransctionThresold(long emptyTransctionThresold) {
        this.emptyTransctionThresold = emptyTransctionThresold;
    }
}

