/*
 * Decompiled with CFR 0.152.
 */
package com.xunlei.netty.httpserver.async;

import com.xunlei.netty.httpserver.async.AsyncCallback;
import com.xunlei.netty.httpserver.component.XLContextAttachment;
import com.xunlei.netty.httpserver.exception.ClosedChannelError;
import com.xunlei.netty.httpserver.handler.TextResponseHandlerManager;
import com.xunlei.netty.httpserver.util.NetUtil;
import com.xunlei.util.Log;
import com.xunlei.util.stat.AbstarctTimeSpanStat;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.WriteCompletionEvent;
import org.slf4j.Logger;

public class AsyncProxyHandler
extends SimpleChannelHandler {
    /**
     * Shared marker object. It is not referenced anywhere else in this class;
     * presumably callers use it to signal that a response will be produced
     * asynchronously -- TODO confirm against the call sites.
     */
    public static final Object ASYNC_RESPONSE = new Object();
    private static final Logger log = Log.getLogger();
    /** Statistics per backend address, filled lazily by {@code getAsyncStat}. */
    protected Map<SocketAddress, AsyncStat> addressStatMap = new ConcurrentHashMap<SocketAddress, AsyncStat>();
    /** Bootstrap used by {@code newChannel} to open connections to the backend. */
    protected ClientBootstrap backstageClientBootstrap;
    /**
     * Attachments waiting for a backend reply, keyed by backend channel.
     * Entries are added by {@code registerChannelAndAttach} and removed by {@code channelClosed}.
     */
    protected final ConcurrentHashMap<Channel, Queue<XLContextAttachment>> channelAttachMap = new ConcurrentHashMap<Channel, Queue<XLContextAttachment>>();
    /** A request/response span of at least this many milliseconds is counted as slow. */
    protected int slowThreshold = 1000;

    /**
     * Hands {@code cmdReturnObj} to the response manager so it is written for {@code attach}.
     * A {@link ClosedChannelError} raised by the write is logged and not rethrown.
     *
     * @param localHttpServerResponseHandlerManager manager that performs the write
     * @param attach attachment identifying the front-side request
     * @param cmdReturnObj object to write as the response
     */
    public static void writeResponseToFront(TextResponseHandlerManager localHttpServerResponseHandlerManager, XLContextAttachment attach, Object cmdReturnObj) {
        try {
            localHttpServerResponseHandlerManager.writeResponse(attach, cmdReturnObj);
        } catch (ClosedChannelError closed) {
            // The error itself is intentionally not logged; only the affected channel is.
            Object frontChannel = attach.getChannelHandlerContext().getChannel();
            log.error("channelClosed    :{}", frontChannel);
        }
    }

    /**
     * Creates a handler that opens backend connections through the given bootstrap.
     *
     * @param backstageClientBootstrap bootstrap later used by {@code newChannel} to connect
     */
    public AsyncProxyHandler(ClientBootstrap backstageClientBootstrap) {
        this.backstageClientBootstrap = backstageClientBootstrap;
    }

    /**
     * Counts the close in the per-address statistics and drops the channel's attachment queue.
     *
     * <p>The event is not forwarded upstream, and attachments still waiting in the
     * queue are discarded without their callbacks being invoked.
     * NOTE(review): confirm both are intended.
     */
    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        XLClientContextAttachment clientAttach = this.getClientAttach(ctx);
        clientAttach.asyncStat.channelCloseCounter.incrementAndGet();
        Channel closedChannel = ctx.getChannel();
        this.channelAttachMap.remove(closedChannel);
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
    }

    /**
     * Passes the event to {@code NetUtil.exceptionCaught}, then reports the cause to the
     * oldest attachment queued for this channel, if there is one. That attachment is
     * removed from the queue.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        NetUtil.exceptionCaught(ctx, e);
        Queue<XLContextAttachment> pending = this.channelAttachMap.get(ctx.getChannel());
        if (pending == null) {
            return;
        }
        XLContextAttachment head = pending.poll();
        if (head != null) {
            this.exceptionCaught(ctx, e.getCause(), head);
        }
    }

    /**
     * Forwards a failure to the callback stored as the attachment's inner attach.
     * If {@code attach} is {@code null} or carries no callback, an error is logged instead.
     *
     * @param ctx context of the backend channel on which the failure occurred
     * @param e the failure to report
     * @param attach attachment whose callback should be notified; may be {@code null}
     * @throws Exception whatever the callback throws
     */
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e, XLContextAttachment attach) throws Exception {
        // A null attachment has no callback; fall into the logging branch instead of throwing NPE.
        AsyncCallback asyncProxyNextHandler = attach == null ? null : (AsyncCallback)attach.getInnerAttach();
        if (asyncProxyNextHandler == null) {
            // The format has three placeholders; the throwable's description fills the last one.
            log.error("cannot find asyncCallback,channel:{},attach:{} ,reason:[attach didn't register asyncCallback],message:{}", new Object[]{ctx.getChannel(), attach, String.valueOf(e)});
            return;
        }
        asyncProxyNextHandler.exceptionCaught(ctx, e, attach);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the statistics object for {@code addr}, creating and storing one on first use.
     * Creation happens while holding the monitor of the stat map, after a second lookup.
     *
     * @param addr backend address
     * @return the statistics object mapped to {@code addr}
     */
    public AsyncStat getAsyncStat(SocketAddress addr) {
        AsyncStat existing = this.addressStatMap.get(addr);
        if (existing != null) {
            return existing;
        }
        Map<SocketAddress, AsyncStat> lock = this.addressStatMap;
        synchronized (lock) {
            // Look again under the lock: another thread may have created the entry meanwhile.
            AsyncStat stat = this.addressStatMap.get(addr);
            if (stat == null) {
                stat = new AsyncStat(addr);
                this.addressStatMap.put(addr, stat);
            }
            return stat;
        }
    }

    /**
     * Supplies a channel to the given backend address. This implementation always
     * opens a new one through {@code newChannel}.
     *
     * @param backstageHostAddress backend address to connect to
     * @return the channel returned by {@code newChannel}
     */
    protected Channel getChannel(SocketAddress backstageHostAddress) {
        return this.newChannel(backstageHostAddress);
    }

    /**
     * Obtains a backend channel, queues {@code attach} on it, and registers
     * {@code asyncCallback} on the attachment.
     *
     * @param attach attachment to queue for the backend reply
     * @param asyncCallback callback to register on {@code attach}
     * @param backstageHostAddress backend address to connect to
     * @return the backend channel
     */
    public Channel getChannel(XLContextAttachment attach, AsyncCallback asyncCallback, SocketAddress backstageHostAddress) {
        Channel backstageChannel = this.getChannel(backstageHostAddress);
        // The attachment is queued first, then the callback is registered on it.
        this.registerChannelAndAttach(backstageChannel, attach);
        attach.registerAsyncCallback(asyncCallback);
        return backstageChannel;
    }

    /**
     * Returns the live channel-to-attachment-queue map itself, not a copy.
     *
     * @return the internal map of queued attachments per backend channel
     */
    public ConcurrentHashMap<Channel, Queue<XLContextAttachment>> getChannelAttachMap() {
        return this.channelAttachMap;
    }

    /**
     * Returns the client attachment stored on {@code ctx}. If none is set, a new
     * one is created, stored on the context, and returned.
     *
     * @param ctx handler context of a backend channel
     * @return the existing or newly created client attachment
     */
    public XLClientContextAttachment getClientAttach(ChannelHandlerContext ctx) {
        XLClientContextAttachment existing = (XLClientContextAttachment)ctx.getAttachment();
        if (existing != null) {
            return existing;
        }
        XLClientContextAttachment created = new XLClientContextAttachment(ctx);
        ctx.setAttachment(created);
        return created;
    }

    /**
     * Dispatches a backend reply to the callback of the oldest attachment queued for the channel.
     *
     * <p>Steps:
     * <ol>
     * <li>Records the receive time on the client attachment, if one is set on the context.</li>
     * <li>Looks up the channel's queue and polls its head. If the queue or the head is
     *     missing, the channel is closed and the method returns.</li>
     * <li>Invokes the attachment's callback. Any throwable is logged and routed to
     *     {@code exceptionCaught(ctx, Throwable, attach)}.</li>
     * <li>Calls {@code messageReceivedFinally} exactly once after dispatch, whether it succeeded or failed.</li>
     * </ol>
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        Channel c = ctx.getChannel();
        XLClientContextAttachment clientAttach = (XLClientContextAttachment)ctx.getAttachment();
        if (clientAttach == null) {
            log.error("cannot find clientAttach when messageReceived,channel:{},message:{}", c, e.getMessage());
        } else {
            clientAttach.messageReceived();
        }
        Queue<XLContextAttachment> q = this.channelAttachMap.get(c);
        if (q == null) {
            log.error("cannot find channnel:{} mapped ATTACH QUEUE,reason:[didn't getChannel() properly],message:{}", c, e.getMessage());
            c.close();
            return;
        }
        XLContextAttachment attach = q.poll();
        if (attach == null) {
            log.error("cannot find channnel:{} mapped ATTACH,reason:[getChannel() then send more request,i.e channel.write(req) twice],message:{}", c, e.getMessage());
            c.close();
            // Nothing to dispatch to; continuing would dereference a null attachment.
            return;
        }
        try {
            AsyncCallback asyncCallback = attach.getAsyncCallback();
            if (asyncCallback == null) {
                // Three placeholders, so the received message is passed as the third argument.
                log.error("cannot find asyncCallback,channel:{},attach:{} ,reason:[attach didn't register asyncCallback],message:{}", new Object[]{c, attach, e.getMessage()});
            } else {
                asyncCallback.messageReceived(ctx, e, attach);
            }
        } catch (Throwable t) {
            log.error("exceptionCaught  :{}", attach, (Object)t);
            this.exceptionCaught(ctx, t, attach);
        } finally {
            // Single post-processing call; the finally block covers the error path too.
            this.messageReceivedFinally(ctx, e);
        }
    }

    /**
     * Post-processing step invoked by {@code messageReceived} after a reply has been
     * dispatched. This implementation closes the backend channel.
     *
     * @param ctx context of the backend channel
     * @param e the message event that was just handled (unused here)
     */
    protected void messageReceivedFinally(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        ctx.getChannel().close();
    }

    /**
     * Opens a new connection to the backend and waits for the connect attempt to finish.
     * The per-address success or failure counter is updated accordingly.
     *
     * @param backstageHostAddress backend address to connect to
     * @return the connected channel
     * @throws RuntimeException if the channel is not connected after the attempt; the
     *         connect future's failure cause, if any, is attached as the exception cause
     */
    public Channel newChannel(SocketAddress backstageHostAddress) {
        ChannelFuture cf = this.backstageClientBootstrap.connect(backstageHostAddress);
        Channel c = cf.awaitUninterruptibly().getChannel();
        AsyncStat as = this.getAsyncStat(backstageHostAddress);
        if (!c.isConnected()) {
            int count = as.newChannelFailCounter.incrementAndGet();
            log.error("cannot creat a connected channel:{},historyCount:{}", backstageHostAddress, (Object)count);
            // Keep the underlying connect failure so callers can see why the connection failed.
            throw new RuntimeException("cannt creat a connected channel:" + backstageHostAddress + ",historyCount:" + count, cf.getCause());
        }
        int count = as.newChannelOkCounter.incrementAndGet();
        log.debug("create one channel:{},historyCount:{}", c, (Object)count);
        return c;
    }

    /**
     * Appends every per-address statistics entry to {@code sb}, each followed by a blank line.
     *
     * @param sb buffer to append to
     * @return the same {@code sb}
     */
    protected StringBuilder printAddressStatMap(StringBuilder sb) {
        for (AsyncStat stat : this.addressStatMap.values()) {
            sb.append(stat);
            sb.append("\n\n");
        }
        return sb;
    }

    /**
     * Appends one line describing a channel and its attachment queue.
     * The line holds the channel, a tab, then {@code null}, {@code empty}, or the queue contents.
     *
     * @param sb buffer to append to
     * @param c channel to describe
     * @param q the channel's attachment queue; may be {@code null}
     * @return {@code true} if the queue exists and holds at least one attachment
     */
    protected boolean printChannel(StringBuilder sb, Channel c, Queue<XLContextAttachment> q) {
        sb.append(c).append("\t");
        boolean hasPending = q != null && !q.isEmpty();
        if (hasPending) {
            sb.append(q);
        } else {
            sb.append(q == null ? "null" : "empty");
        }
        sb.append("\n");
        return hasPending;
    }

    /**
     * Builds a text report with one line per tracked channel, the number of channels
     * that have queued attachments, a separator, and the per-address statistics.
     *
     * @return a new buffer holding the report
     */
    public StringBuilder printStatInfo() {
        StringBuilder report = new StringBuilder();
        int busyChannels = 0;
        for (Map.Entry<Channel, Queue<XLContextAttachment>> entry : this.channelAttachMap.entrySet()) {
            if (this.printChannel(report, entry.getKey(), entry.getValue())) {
                busyChannels++;
            }
        }
        report.append("\n");
        report.append("runningChannel:");
        report.append(busyChannels);
        report.append("\n----------------------------------------------------\n");
        this.printAddressStatMap(report);
        return report;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Queues {@code attach} on {@code channel} and counts the registration in the
     * statistics of the channel's remote address.
     *
     * <p>The queue is created atomically with {@code putIfAbsent}, so no lock is
     * taken on the channel object. If the queue already holds attachments, an error
     * is logged and the attachment is still enqueued.
     *
     * @param channel backend channel the request will be written to
     * @param attach attachment waiting for the reply on that channel
     */
    protected void registerChannelAndAttach(Channel channel, XLContextAttachment attach) {
        this.getAsyncStat(channel.getRemoteAddress()).registerChannelAndAttachCounter.incrementAndGet();
        Queue<XLContextAttachment> q = this.channelAttachMap.get(channel);
        if (q == null) {
            Queue<XLContextAttachment> fresh = new LinkedBlockingQueue<XLContextAttachment>();
            // putIfAbsent returns the queue another thread installed first, or null if ours won.
            Queue<XLContextAttachment> raced = this.channelAttachMap.putIfAbsent(channel, fresh);
            q = raced != null ? raced : fresh;
        }
        if (q.size() > 0) {
            log.error("channnel:{} has {} attach:{}", new Object[]{channel, q.size(), q});
        }
        q.offer(attach);
    }

    /**
     * Notifies the channel's client attachment that a write finished, creating the
     * attachment if the context has none. The event is not forwarded upstream.
     */
    @Override
    public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
        this.getClientAttach(ctx).writeComplete();
    }

    /**
     * Counters and timing statistics kept for one backend address.
     */
    public class AsyncStat {
        private final SocketAddress address;
        /** Span statistics, recorded by {@code XLClientContextAttachment.messageReceived}. */
        protected StageTimeSpanStat asyncClientStat = new StageTimeSpanStat("\u8be6\u7ec6", true);
        /** Incremented by {@code channelClosed}. */
        protected AtomicInteger channelCloseCounter = new AtomicInteger();
        /** Incremented by {@code newChannel} when the connect attempt fails. */
        protected AtomicInteger newChannelFailCounter = new AtomicInteger();
        /** Incremented by {@code newChannel} when the channel is connected. */
        protected AtomicInteger newChannelOkCounter = new AtomicInteger();
        /** Incremented by {@code registerChannelAndAttach}. */
        protected AtomicInteger registerChannelAndAttachCounter = new AtomicInteger();
        /** Incremented by {@code XLClientContextAttachment.writeComplete}. */
        protected AtomicInteger requestCounter = new AtomicInteger();
        /** Not modified anywhere in this file. */
        protected AtomicInteger responseCounter = new AtomicInteger();

        /**
         * @param address backend address these statistics belong to
         */
        public AsyncStat(SocketAddress address) {
            this.address = address;
        }

        /** Renders the address, all counters, and the span statistics under the shared table header. */
        @Override
        public String toString() {
            Object[] fields = new Object[]{
                this.address,
                this.newChannelOkCounter,
                this.newChannelFailCounter,
                this.registerChannelAndAttachCounter,
                this.channelCloseCounter,
                this.requestCounter,
                this.responseCounter,
                AbstarctTimeSpanStat.tableHeader,
                this.asyncClientStat
            };
            return String.format("%s:\nnewChannelOkCounter=%s\nnewChannelFailCounter=%s\nregisterChannelAndAttachCounter=%s\nchannelCloseCounter=%s\nrequestCounter=%s\nresponseCounter=%s\nasyncClientStat:\n%s%s", fields);
        }
    }

    public class StageTimeSpanStat
    extends AbstarctTimeSpanStat {
        private boolean warn;

        public StageTimeSpanStat(String name, boolean warn) {
            super(name);
            this.warn = warn;
        }

        public void record(long end, long begin, XLClientContextAttachment attach) {
            if (begin <= 0L || end <= 0L) {
                return;
            }
            this.all_num.incrementAndGet();
            long span = end - begin;
            this.all_span.addAndGet(span);
            if (span >= (long)AsyncProxyHandler.this.slowThreshold) {
                this.slow_num.incrementAndGet();
                this.slow_span.addAndGet(span);
                if (this.warn) {
                    log.error("ASYNC_CIENT_SLOW:{}:{} [{}ms]\n[attach:{}]", new Object[]{this.name, attach, span, attach});
                }
            }
            if (span > this.max_span) {
                this.max_span = span;
            }
        }
    }

    /**
     * State attached to a backend channel's handler context: the statistics for the
     * channel's remote address and the timestamps of the last completed write and
     * last received message.
     */
    public class XLClientContextAttachment {
        protected AsyncStat asyncStat;
        private ChannelHandlerContext ctx;
        /** {@code System.currentTimeMillis()} at the last {@link #messageReceived()} call. */
        protected long messageReceived;
        /** {@code System.currentTimeMillis()} at the last {@link #writeComplete()} call. */
        protected long writeComplete;

        /**
         * Binds the attachment to {@code ctx} and resolves the statistics object
         * for the channel's remote address.
         *
         * @param ctx handler context of the backend channel
         */
        public XLClientContextAttachment(ChannelHandlerContext ctx) {
            this.ctx = ctx;
            this.asyncStat = AsyncProxyHandler.this.getAsyncStat(ctx.getChannel().getRemoteAddress());
        }

        /** Stamps the receive time and records the span since the last completed write. */
        public void messageReceived() {
            long now = System.currentTimeMillis();
            this.messageReceived = now;
            this.asyncStat.asyncClientStat.record(now, this.writeComplete, this);
        }

        @Override
        public String toString() {
            return "XLClientContextAttachment:" + this.ctx.getChannel();
        }

        /** Stamps the write-complete time and increments the request counter. */
        public void writeComplete() {
            this.writeComplete = System.currentTimeMillis();
            this.asyncStat.requestCounter.incrementAndGet();
        }
    }
}

