/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.otter.canal.server.netty.handler;

import com.alibaba.otter.canal.common.zookeeper.running.ServerRunningMonitor;
import com.alibaba.otter.canal.common.zookeeper.running.ServerRunningMonitors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalPacket;
import com.alibaba.otter.canal.protocol.ClientIdentity;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.server.embedded.CanalServerWithEmbedded;
import com.alibaba.otter.canal.server.netty.NettyUtils;
import com.google.protobuf.ByteString;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.slf4j.helpers.MessageFormatter;
import org.springframework.util.CollectionUtils;

public class SessionHandler
extends SimpleChannelHandler {
    private static final Logger logger = LoggerFactory.getLogger(SessionHandler.class);
    private CanalServerWithEmbedded embeddedServer;

    public SessionHandler() {
    }

    public SessionHandler(CanalServerWithEmbedded embeddedServer) {
        this.embeddedServer = embeddedServer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        logger.info("message receives in session handler...");
        ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
        CanalPacket.Packet packet = CanalPacket.Packet.parseFrom((byte[])buffer.readBytes(buffer.readableBytes()).array());
        ClientIdentity clientIdentity = null;
        try {
            switch (packet.getType()) {
                case SUBSCRIPTION: {
                    CanalPacket.Sub sub = CanalPacket.Sub.parseFrom((ByteString)packet.getBody());
                    if (StringUtils.isNotEmpty((String)sub.getDestination()) && StringUtils.isNotEmpty((String)sub.getClientId())) {
                        ServerRunningMonitor runningMonitor;
                        clientIdentity = new ClientIdentity(sub.getDestination(), Short.valueOf(sub.getClientId()).shortValue(), sub.getFilter());
                        MDC.put((String)"destination", (String)clientIdentity.getDestination());
                        if (!this.embeddedServer.isStart(clientIdentity.getDestination()) && !(runningMonitor = ServerRunningMonitors.getRunningMonitor((String)clientIdentity.getDestination())).isStart()) {
                            runningMonitor.start();
                        }
                        this.embeddedServer.subscribe(clientIdentity);
                        ctx.setAttachment((Object)clientIdentity);
                        NettyUtils.ack(ctx.getChannel(), null);
                        return;
                    } else {
                        NettyUtils.error(401, MessageFormatter.format((String)"destination or clientId is null", (Object)sub.toString()).getMessage(), ctx.getChannel(), null);
                        return;
                    }
                }
                case UNSUBSCRIPTION: {
                    CanalPacket.Unsub unsub = CanalPacket.Unsub.parseFrom((ByteString)packet.getBody());
                    if (StringUtils.isNotEmpty((String)unsub.getDestination()) && StringUtils.isNotEmpty((String)unsub.getClientId())) {
                        clientIdentity = new ClientIdentity(unsub.getDestination(), Short.valueOf(unsub.getClientId()).shortValue(), unsub.getFilter());
                        MDC.put((String)"destination", (String)clientIdentity.getDestination());
                        this.embeddedServer.unsubscribe(clientIdentity);
                        this.stopCanalInstanceIfNecessary(clientIdentity);
                        NettyUtils.ack(ctx.getChannel(), null);
                        return;
                    } else {
                        NettyUtils.error(401, MessageFormatter.format((String)"destination or clientId is null", (Object)unsub.toString()).getMessage(), ctx.getChannel(), null);
                        return;
                    }
                }
                case GET: {
                    CanalPacket.Get get = CanalPacket.Get.parseFrom((ByteString)packet.getBody());
                    if (StringUtils.isNotEmpty((String)get.getDestination()) && StringUtils.isNotEmpty((String)get.getClientId())) {
                        clientIdentity = new ClientIdentity(get.getDestination(), Short.valueOf(get.getClientId()).shortValue());
                        MDC.put((String)"destination", (String)clientIdentity.getDestination());
                        Message message = null;
                        if (get.getTimeout() == -1L) {
                            message = this.embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize());
                        } else {
                            TimeUnit unit = this.convertTimeUnit(get.getUnit());
                            message = this.embeddedServer.getWithoutAck(clientIdentity, get.getFetchSize(), get.getTimeout(), unit);
                        }
                        CanalPacket.Packet.Builder packetBuilder = CanalPacket.Packet.newBuilder();
                        packetBuilder.setType(CanalPacket.PacketType.MESSAGES);
                        CanalPacket.Messages.Builder messageBuilder = CanalPacket.Messages.newBuilder();
                        messageBuilder.setBatchId(message.getId());
                        if (message.getId() != -1L && !CollectionUtils.isEmpty((Collection)message.getEntries())) {
                            for (CanalEntry.Entry entry : message.getEntries()) {
                                messageBuilder.addMessages(entry.toByteString());
                            }
                        }
                        packetBuilder.setBody(messageBuilder.build().toByteString());
                        NettyUtils.write(ctx.getChannel(), packetBuilder.build().toByteArray(), null);
                        return;
                    } else {
                        NettyUtils.error(401, MessageFormatter.format((String)"destination or clientId is null", (Object)get.toString()).getMessage(), ctx.getChannel(), null);
                        return;
                    }
                }
                case CLIENTACK: {
                    CanalPacket.ClientAck ack = CanalPacket.ClientAck.parseFrom((ByteString)packet.getBody());
                    MDC.put((String)"destination", (String)ack.getDestination());
                    if (StringUtils.isNotEmpty((String)ack.getDestination()) && StringUtils.isNotEmpty((String)ack.getClientId())) {
                        if (ack.getBatchId() == 0L) {
                            NettyUtils.error(402, MessageFormatter.format((String)"batchId should assign value", (Object)ack.toString()).getMessage(), ctx.getChannel(), null);
                            return;
                        } else if (ack.getBatchId() == -1L) {
                            return;
                        } else {
                            clientIdentity = new ClientIdentity(ack.getDestination(), Short.valueOf(ack.getClientId()).shortValue());
                            this.embeddedServer.ack(clientIdentity, ack.getBatchId());
                            return;
                        }
                    } else {
                        NettyUtils.error(401, MessageFormatter.format((String)"destination or clientId is null", (Object)ack.toString()).getMessage(), ctx.getChannel(), null);
                        return;
                    }
                }
                case CLIENTROLLBACK: {
                    CanalPacket.ClientRollback rollback = CanalPacket.ClientRollback.parseFrom((ByteString)packet.getBody());
                    MDC.put((String)"destination", (String)rollback.getDestination());
                    if (StringUtils.isNotEmpty((String)rollback.getDestination()) && StringUtils.isNotEmpty((String)rollback.getClientId())) {
                        clientIdentity = new ClientIdentity(rollback.getDestination(), Short.valueOf(rollback.getClientId()).shortValue());
                        if (rollback.getBatchId() == 0L) {
                            this.embeddedServer.rollback(clientIdentity);
                            return;
                        } else {
                            this.embeddedServer.rollback(clientIdentity, rollback.getBatchId());
                            return;
                        }
                    } else {
                        NettyUtils.error(401, MessageFormatter.format((String)"destination or clientId is null", (Object)rollback.toString()).getMessage(), ctx.getChannel(), null);
                        return;
                    }
                }
                default: {
                    NettyUtils.error(400, MessageFormatter.format((String)"packet type={} is NOT supported!", (Object)packet.getType()).getMessage(), ctx.getChannel(), null);
                    return;
                }
            }
        }
        catch (Throwable exception) {
            NettyUtils.error(400, MessageFormatter.format((String)"something goes wrong with channel:{}, exception={}", (Object)ctx.getChannel(), (Object)ExceptionUtils.getStackTrace((Throwable)exception)).getMessage(), ctx.getChannel(), null);
            return;
        }
        finally {
            MDC.remove((String)"destination");
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        logger.error("something goes wrong with channel:{}, exception={}", (Object)ctx.getChannel(), (Object)ExceptionUtils.getStackTrace((Throwable)e.getCause()));
        ctx.getChannel().close();
    }

    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
    }

    private void stopCanalInstanceIfNecessary(ClientIdentity clientIdentity) {
        ServerRunningMonitor runningMonitor;
        List<ClientIdentity> clientIdentitys = this.embeddedServer.listAllSubscribe(clientIdentity.getDestination());
        if (clientIdentitys != null && clientIdentitys.size() == 1 && clientIdentitys.contains(clientIdentity) && (runningMonitor = ServerRunningMonitors.getRunningMonitor((String)clientIdentity.getDestination())).isStart()) {
            runningMonitor.release();
        }
    }

    private TimeUnit convertTimeUnit(int unit) {
        switch (unit) {
            case 0: {
                return TimeUnit.NANOSECONDS;
            }
            case 1: {
                return TimeUnit.MICROSECONDS;
            }
            case 2: {
                return TimeUnit.MILLISECONDS;
            }
            case 3: {
                return TimeUnit.SECONDS;
            }
            case 4: {
                return TimeUnit.MINUTES;
            }
            case 5: {
                return TimeUnit.HOURS;
            }
            case 6: {
                return TimeUnit.DAYS;
            }
        }
        return TimeUnit.MILLISECONDS;
    }

    public void setEmbeddedServer(CanalServerWithEmbedded embeddedServer) {
        this.embeddedServer = embeddedServer;
    }
}

