package com.xunyi.beast.hand.websocket.pipe.factory;

import com.xunyi.beast.hand.websocket.handler.WSEvent;
import com.xunyi.beast.hand.websocket.handler.WSPushMessage;
import com.xunyi.beast.hand.websocket.pipe.Pipe;
import io.reactivex.Observer;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xunyi/beast/hand/websocket/pipe/factory/RocketMQPipeFactory.class */
public class RocketMQPipeFactory extends AbstractPipeFactory<Config> {
    private static final Logger log = LoggerFactory.getLogger(RocketMQPipeFactory.class);

    /* loaded from: input_file:com/xunyi/beast/hand/websocket/pipe/factory/RocketMQPipeFactory$Config.class */
    public static class Config {
        private String namesvrAddr;
        private String topic;
        private String subExpression;

        public String getNamesvrAddr() {
            return this.namesvrAddr;
        }

        public String getTopic() {
            return this.topic;
        }

        public String getSubExpression() {
            return this.subExpression;
        }

        public void setNamesvrAddr(String str) {
            this.namesvrAddr = str;
        }

        public void setTopic(String str) {
            this.topic = str;
        }

        public void setSubExpression(String str) {
            this.subExpression = str;
        }
    }

    /* loaded from: input_file:com/xunyi/beast/hand/websocket/pipe/factory/RocketMQPipeFactory$RocketMQPipe.class */
    public static class RocketMQPipe extends AbstractPipeEmitter implements Pipe {
        private MQPushConsumer consumer;

        public RocketMQPipe(MQPushConsumer mQPushConsumer) {
            this.consumer = mQPushConsumer;
        }

        private void init() {
        }

        @Override // com.xunyi.beast.hand.websocket.pipe.Pipe
        public void subscribe(final Observer<? super WSPushMessage> observer) {
            this.consumer.registerMessageListener(new MessageListenerOrderly() { // from class: com.xunyi.beast.hand.websocket.pipe.factory.RocketMQPipeFactory.RocketMQPipe.1
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                    Iterator<MessageExt> it = list.iterator();
                    while (it.hasNext()) {
                        try {
                            observer.onNext(RocketMQPipe.this.parse(it.next().getBody()));
                        } catch (IOException e) {
                            RocketMQPipeFactory.log.warn("parse message, exception", e);
                        }
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
        }

        @Override // com.xunyi.beast.hand.websocket.pipe.Pipe
        public void next(WSEvent wSEvent) {
        }
    }

    @Override // com.xunyi.beast.hand.websocket.pipe.factory.PipeFactory
    public Pipe apply(Config config) {
        try {
            DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("");
            defaultMQPushConsumer.setNamesrvAddr(config.getNamesvrAddr());
            defaultMQPushConsumer.subscribe(config.getTopic(), config.getSubExpression());
            return new RocketMQPipe(defaultMQPushConsumer);
        } catch (MQClientException e) {
            e.printStackTrace();
            throw new RuntimeException((Throwable) e);
        }
    }
}
