/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.server.embedded;

import com.alibaba.otter.canal.common.AbstractCanalLifeCycle;
import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.instance.core.CanalInstanceGenerator;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.position.Position;
import com.alibaba.otter.canal.protocol.position.PositionRange;
import com.alibaba.otter.canal.server.CanalServer;
import com.alibaba.otter.canal.server.CanalService;
import com.alibaba.otter.canal.server.exception.CanalServerException;
import com.alibaba.otter.canal.store.CanalEventStore;
import com.alibaba.otter.canal.store.model.Event;
import com.alibaba.otter.canal.store.model.Events;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.MigrateMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.util.CollectionUtils;

/**
 * Embedded canal server: keeps one {@link CanalInstance} per destination and
 * implements the subscribe / get / ack / rollback operations of
 * {@link CanalService} on top of each instance's meta manager and event store.
 *
 * <p>Instances are held in a computing map, so looking up an unknown
 * destination with {@code get} asks the configured
 * {@link CanalInstanceGenerator} to create it. {@link #isStart(String)} uses
 * {@code containsKey} first and therefore never creates an instance.
 *
 * <p>{@code get}, {@code getWithoutAck} and both {@code rollback} variants
 * synchronize on the destination's {@link CanalInstance}; {@code ack},
 * {@code subscribe} and {@code unsubscribe} take no lock in this class.
 */
public class CanalServerWithEmbedded
extends AbstractCanalLifeCycle
implements CanalServer,
CanalService {
    private static final Logger logger = LoggerFactory.getLogger(CanalServerWithEmbedded.class);

    /** MDC key set while an instance is being started or stopped. */
    private static final String MDC_DESTINATION = "destination";

    /** Batch id carried by the message returned when no events are available. */
    private static final long EMPTY_BATCH_ID = -1L;

    /** destination -> instance; assigned in {@link #start()}, null before that. */
    private Map<String, CanalInstance> canalInstances;
    private CanalInstanceGenerator canalInstanceGenerator;

    /**
     * Returns the shared instance held by {@link SingletonHolder}.
     */
    public static CanalServerWithEmbedded instance() {
        return SingletonHolder.CANAL_SERVER_WITH_EMBEDDED;
    }

    /**
     * Starts the server if it is not already started and (re)creates the
     * computing map that builds instances through the configured generator.
     */
    @Override
    public void start() {
        if (!isStart()) {
            super.start();
            canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>() {

                public CanalInstance apply(String destination) {
                    return canalInstanceGenerator.generate(destination);
                }
            });
        }
    }

    /**
     * Stops the server and every started instance. A failure while stopping
     * one instance is logged and does not prevent the remaining instances
     * from being stopped.
     */
    @Override
    public void stop() {
        super.stop();
        for (Map.Entry<String, CanalInstance> entry : canalInstances.entrySet()) {
            try {
                CanalInstance instance = entry.getValue();
                if (instance.isStart()) {
                    stopInstance(entry.getKey(), instance);
                }
            } catch (Exception e) {
                logger.error(String.format("stop CanalInstance[%s] has an error", entry.getKey()), e);
            }
        }
    }

    /**
     * Starts the instance for {@code destination} unless it is already
     * started. The lookup goes through the computing map, so the instance is
     * generated on first use.
     *
     * @param destination name of the instance to start
     */
    public void start(String destination) {
        CanalInstance canalInstance = canalInstances.get(destination);
        if (!canalInstance.isStart()) {
            try {
                MDC.put(MDC_DESTINATION, destination);
                canalInstance.start();
                logger.info("start CanalInstances[{}] successfully", destination);
            } finally {
                MDC.remove(MDC_DESTINATION);
            }
        }
    }

    /**
     * Removes the instance for {@code destination} from the map and stops it
     * if it was started. Does nothing when no such instance is registered.
     *
     * @param destination name of the instance to stop
     */
    public void stop(String destination) {
        CanalInstance canalInstance = canalInstances.remove(destination);
        if (canalInstance != null && canalInstance.isStart()) {
            stopInstance(destination, canalInstance);
        }
    }

    /**
     * Tells whether an instance for {@code destination} is registered and
     * started.
     *
     * @param destination name of the instance to check
     * @return {@code false} when the server itself was never started, when the
     *         destination is unknown, or when its instance is not started
     */
    public boolean isStart(String destination) {
        // canalInstances is only assigned in start(); report "not started"
        // instead of failing with a NullPointerException before that.
        return canalInstances != null
            && canalInstances.containsKey(destination)
            && canalInstances.get(destination).isStart();
    }

    /**
     * Registers the client with the destination's meta manager. When the
     * client has no cursor yet, the cursor is initialised to the event store's
     * first position (if the store has one).
     *
     * @param clientIdentity client to subscribe
     * @throws CanalServerException if the destination is not started
     */
    @Override
    public void subscribe(ClientIdentity clientIdentity) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        if (!canalInstance.getMetaManager().isStart()) {
            canalInstance.getMetaManager().start();
        }
        canalInstance.getMetaManager().subscribe(clientIdentity);
        Position position = canalInstance.getMetaManager().getCursor(clientIdentity);
        if (position == null) {
            position = canalInstance.getEventStore().getFirstPosition();
            if (position != null) {
                canalInstance.getMetaManager().updateCursor(clientIdentity, position);
            }
            logger.info("subscribe successfully, {} with first position:{} ", clientIdentity, position);
        } else {
            // Two placeholders for two arguments: previously the single placeholder
            // printed the client identity and silently dropped the position.
            logger.info("subscribe successfully, {} use last cursor position:{} ", clientIdentity, position);
        }
        canalInstance.subscribeChange(clientIdentity);
    }

    /**
     * Removes the client's subscription from the destination's meta manager.
     *
     * @param clientIdentity client to unsubscribe
     */
    @Override
    public void unsubscribe(ClientIdentity clientIdentity) throws CanalServerException {
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        canalInstance.getMetaManager().unsubscribe(clientIdentity);
        logger.info("unsubscribe successfully, {}", clientIdentity);
    }

    /**
     * Lists the subscriptions the meta manager holds for {@code destination}.
     *
     * @param destination name of the instance to query
     * @return the subscribed client identities as reported by the meta manager
     */
    public List<ClientIdentity> listAllSubscribe(String destination) throws CanalServerException {
        CanalInstance canalInstance = canalInstances.get(destination);
        return canalInstance.getMetaManager().listAllSubscribeInfo(destination);
    }

    /**
     * Non-blocking variant of {@link #get(ClientIdentity, int, Long, TimeUnit)}
     * (no timeout).
     */
    @Override
    public Message get(ClientIdentity clientIdentity, int batchSize) throws CanalServerException {
        return get(clientIdentity, batchSize, null, null);
    }

    /**
     * Fetches up to {@code batchSize} events starting at the client's cursor
     * and acknowledges the batch immediately before returning it.
     *
     * @param clientIdentity subscribed client
     * @param batchSize      maximum number of events requested from the store
     * @param timeout        {@code null} for a non-blocking read; {@code <= 0}
     *                       for a blocking read without timeout; otherwise the
     *                       maximum wait, expressed in {@code unit}
     * @param unit           unit of {@code timeout}; only used when
     *                       {@code timeout > 0}
     * @return the fetched entries, or a message with id {@code -1} and no
     *         entries when the store returned nothing
     * @throws CanalServerException if the destination is not started, the
     *                              client is not subscribed, an earlier batch
     *                              is still unacknowledged, or the store fails
     */
    @Override
    public Message get(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        checkSubscribe(clientIdentity);
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        synchronized (canalInstance) {
            PositionRange positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);
            if (positionRanges != null) {
                throw new CanalServerException(String.format("clientId:%s has last batch:[%s] isn't ack , maybe loss data", clientIdentity.getClientId(), positionRanges));
            }
            Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
            Events<Event> events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);
            if (CollectionUtils.isEmpty(events.getEvents())) {
                logger.debug("get successfully, clientId:{} batchSize:{} but result is null", clientIdentity.getClientId(), batchSize);
                return emptyMessage();
            }
            Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
            List<CanalEntry.Entry> entrys = toEntries(events);
            logger.info("get successfully, clientId:{} batchSize:{} real size is {} and result is [batchId:{} , position:{}]", new Object[]{clientIdentity.getClientId(), batchSize, entrys.size(), batchId, events.getPositionRange()});
            // get() is the auto-ack flavour: confirm the batch right away.
            ack(clientIdentity, batchId);
            return new Message(batchId.longValue(), entrys);
        }
    }

    /**
     * Non-blocking variant of
     * {@link #getWithoutAck(ClientIdentity, int, Long, TimeUnit)} (no timeout).
     */
    @Override
    public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize) throws CanalServerException {
        return getWithoutAck(clientIdentity, batchSize, null, null);
    }

    /**
     * Fetches up to {@code batchSize} events and records them as a pending
     * batch without acknowledging it. When the client already has a pending
     * batch, reading restarts from that batch's start position; otherwise it
     * starts at the client's cursor, falling back to the store's first
     * position when there is no cursor.
     *
     * @param clientIdentity subscribed client
     * @param batchSize      maximum number of events requested from the store
     * @param timeout        {@code null} for a non-blocking read; {@code <= 0}
     *                       for a blocking read without timeout; otherwise the
     *                       maximum wait, expressed in {@code unit}
     * @param unit           unit of {@code timeout}; only used when
     *                       {@code timeout > 0}
     * @return the fetched entries, or a message with id {@code -1} and no
     *         entries when the store returned nothing
     * @throws CanalServerException if the destination is not started, the
     *                              client is not subscribed, or the store fails
     */
    @Override
    public Message getWithoutAck(ClientIdentity clientIdentity, int batchSize, Long timeout, TimeUnit unit) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        checkSubscribe(clientIdentity);
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        synchronized (canalInstance) {
            PositionRange positionRanges = canalInstance.getMetaManager().getLastestBatch(clientIdentity);
            Events<Event> events;
            if (positionRanges != null) {
                events = getEvents(canalInstance.getEventStore(), positionRanges.getStart(), batchSize, timeout, unit);
            } else {
                Position start = canalInstance.getMetaManager().getCursor(clientIdentity);
                if (start == null) {
                    start = canalInstance.getEventStore().getFirstPosition();
                }
                events = getEvents(canalInstance.getEventStore(), start, batchSize, timeout, unit);
            }
            if (CollectionUtils.isEmpty(events.getEvents())) {
                logger.debug("getWithoutAck successfully, clientId:{} batchSize:{} but result is null", clientIdentity.getClientId(), batchSize);
                return emptyMessage();
            }
            Long batchId = canalInstance.getMetaManager().addBatch(clientIdentity, events.getPositionRange());
            List<CanalEntry.Entry> entrys = toEntries(events);
            logger.info("getWithoutAck successfully, clientId:{} batchSize:{}  real size is {} and result is [batchId:{} , position:{}]", new Object[]{clientIdentity.getClientId(), batchSize, entrys.size(), batchId, events.getPositionRange()});
            return new Message(batchId.longValue(), entrys);
        }
    }

    /**
     * Lists the ids of the client's pending batches in ascending order.
     *
     * @param clientIdentity subscribed client
     * @return sorted batch ids; a fresh list owned by the caller
     * @throws CanalServerException if the destination is not started or the
     *                              client is not subscribed
     */
    public List<Long> listBatchIds(ClientIdentity clientIdentity) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        checkSubscribe(clientIdentity);
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        Map<Long, ?> batchs = canalInstance.getMetaManager().listAllBatchs(clientIdentity);
        List<Long> result = new ArrayList<Long>(batchs.keySet());
        Collections.sort(result);
        return result;
    }

    /**
     * Acknowledges a pending batch: removes it from the meta manager, moves
     * the client's cursor to the batch's ack position (when it has one) and
     * acks the batch's end position on the event store.
     *
     * @param clientIdentity subscribed client
     * @param batchId        id of the batch to acknowledge
     * @throws CanalServerException if the destination is not started, the
     *                              client is not subscribed, or the batch id
     *                              is unknown
     */
    @Override
    public void ack(ClientIdentity clientIdentity, long batchId) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        checkSubscribe(clientIdentity);
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        PositionRange positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, Long.valueOf(batchId));
        if (positionRanges == null) {
            throw new CanalServerException(String.format("ack error , clientId:%s batchId:%d is not exist , please check", clientIdentity.getClientId(), batchId));
        }
        if (positionRanges.getAck() != null) {
            canalInstance.getMetaManager().updateCursor(clientIdentity, positionRanges.getAck());
            logger.info("ack successfully, clientId:{} batchId:{} position:{}", new Object[]{clientIdentity.getClientId(), batchId, positionRanges});
        }
        canalInstance.getEventStore().ack(positionRanges.getEnd());
    }

    /**
     * Discards all pending batches of the client and rolls the event store
     * back. Returns silently when the client is not subscribed.
     *
     * @param clientIdentity client whose pending batches are discarded
     * @throws CanalServerException if the destination is not started
     */
    @Override
    public void rollback(ClientIdentity clientIdentity) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        if (!canalInstance.getMetaManager().hasSubscribe(clientIdentity)) {
            return;
        }
        synchronized (canalInstance) {
            canalInstance.getMetaManager().clearAllBatchs(clientIdentity);
            canalInstance.getEventStore().rollback();
            logger.info("rollback successfully, clientId:{}", clientIdentity.getClientId());
        }
    }

    /**
     * Discards one pending batch of the client and rolls the event store
     * back. Returns silently when the client is not subscribed.
     *
     * @param clientIdentity client owning the batch
     * @param batchId        id of the batch to discard
     * @throws CanalServerException if the destination is not started or the
     *                              batch id is unknown
     */
    @Override
    public void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException {
        checkStart(clientIdentity.getDestination());
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        if (!canalInstance.getMetaManager().hasSubscribe(clientIdentity)) {
            return;
        }
        synchronized (canalInstance) {
            PositionRange positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity, batchId);
            if (positionRanges == null) {
                throw new CanalServerException(String.format("rollback error, clientId:%s batchId:%d is not exist , please check", clientIdentity.getClientId(), batchId));
            }
            canalInstance.getEventStore().rollback();
            logger.info("rollback successfully, clientId:{} batchId:{} position:{}", new Object[]{clientIdentity.getClientId(), batchId, positionRanges});
        }
    }

    /**
     * Returns a snapshot copy of the destination -> instance map; changes to
     * the returned map do not affect the server.
     */
    public Map<String, CanalInstance> getCanalInstances() {
        return Maps.newHashMap(canalInstances);
    }

    public void setCanalInstanceGenerator(CanalInstanceGenerator canalInstanceGenerator) {
        this.canalInstanceGenerator = canalInstanceGenerator;
    }

    /** Stops one instance with its destination published in the MDC for the duration. */
    private void stopInstance(String destination, CanalInstance canalInstance) {
        try {
            MDC.put(MDC_DESTINATION, destination);
            canalInstance.stop();
            logger.info("stop CanalInstances[{}] successfully", destination);
        } finally {
            MDC.remove(MDC_DESTINATION);
        }
    }

    /** Builds the message returned when the store yields no events. */
    private static Message emptyMessage() {
        return new Message(EMPTY_BATCH_ID, new ArrayList<CanalEntry.Entry>());
    }

    /** Returns a view of the events' entries, in the same order as the events. */
    private static List<CanalEntry.Entry> toEntries(Events<Event> events) {
        return Lists.transform(events.getEvents(), new Function<Event, CanalEntry.Entry>() {

            public CanalEntry.Entry apply(Event input) {
                return input.getEntry();
            }
        });
    }

    /**
     * Reads events from the store, choosing the store call from
     * {@code timeout}: {@code null} -> {@code tryGet}; {@code <= 0} ->
     * {@code get} without timeout; otherwise {@code get} with timeout.
     * Exceptions from the two {@code get} calls are wrapped in a
     * {@link CanalServerException}.
     */
    private Events<Event> getEvents(CanalEventStore eventStore, Position start, int batchSize, Long timeout, TimeUnit unit) {
        if (timeout == null) {
            return eventStore.tryGet(start, batchSize);
        }
        try {
            if (timeout <= 0L) {
                return eventStore.get(start, batchSize);
            }
            return eventStore.get(start, batchSize, timeout.longValue(), unit);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to callers further up the stack;
                // wrapping the exception alone would silently clear it.
                Thread.currentThread().interrupt();
            }
            throw new CanalServerException(e);
        }
    }

    /** Throws unless the destination's meta manager reports the client as subscribed. */
    private void checkSubscribe(ClientIdentity clientIdentity) {
        CanalInstance canalInstance = canalInstances.get(clientIdentity.getDestination());
        if (!canalInstance.getMetaManager().hasSubscribe(clientIdentity)) {
            throw new CanalServerException(String.format("ClientIdentity:%s should subscribe first", clientIdentity.toString()));
        }
    }

    /** Throws unless {@link #isStart(String)} holds for the destination. */
    private void checkStart(String destination) {
        if (!isStart(destination)) {
            throw new CanalServerException(String.format("destination:%s should start first", destination));
        }
    }

    /** Holder class whose static field carries the shared server instance. */
    private static class SingletonHolder {
        private static final CanalServerWithEmbedded CANAL_SERVER_WITH_EMBEDDED = new CanalServerWithEmbedded();

        private SingletonHolder() {
        }
    }
}

