package com.xunyi.beast.hand.websocket.pipe.factory;

import com.xunyi.beast.hand.websocket.handler.WSEvent;
import com.xunyi.beast.hand.websocket.handler.WSPushMessage;
import com.xunyi.beast.hand.websocket.pipe.Pipe;
import io.reactivex.Observer;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.lang.NonNull;

/* loaded from: input_file:com/xunyi/beast/hand/websocket/pipe/factory/RedisPipeFactory.class */
public class RedisPipeFactory extends AbstractPipeFactory<Config> {
    private static final Logger log = LoggerFactory.getLogger(RedisPipeFactory.class);

    /* loaded from: input_file:com/xunyi/beast/hand/websocket/pipe/factory/RedisPipeFactory$Config.class */
    public static class Config {
        private String hostname;
        private int port;
        private String password;
        private int database;
        private String channel;

        public String getHostname() {
            return this.hostname;
        }

        public int getPort() {
            return this.port;
        }

        public String getPassword() {
            return this.password;
        }

        public int getDatabase() {
            return this.database;
        }

        public String getChannel() {
            return this.channel;
        }

        public void setHostname(String str) {
            this.hostname = str;
        }

        public void setPort(int i) {
            this.port = i;
        }

        public void setPassword(String str) {
            this.password = str;
        }

        public void setDatabase(int i) {
            this.database = i;
        }

        public void setChannel(String str) {
            this.channel = str;
        }
    }

    /* loaded from: input_file:com/xunyi/beast/hand/websocket/pipe/factory/RedisPipeFactory$RedisPipe.class */
    public static class RedisPipe extends AbstractPipeEmitter implements Pipe {
        private String channel;
        private JedisConnectionFactory connectionFactory;
        private RedisMessageListenerContainer container;

        public RedisPipe(String str, JedisConnectionFactory jedisConnectionFactory) {
            this.channel = str;
            this.connectionFactory = jedisConnectionFactory;
            init();
        }

        private void init() {
            RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
            redisMessageListenerContainer.setConnectionFactory(this.connectionFactory);
            redisMessageListenerContainer.afterPropertiesSet();
            redisMessageListenerContainer.start();
            this.container = redisMessageListenerContainer;
        }

        @Override // com.xunyi.beast.hand.websocket.pipe.Pipe
        public void subscribe(final Observer<? super WSPushMessage> observer) {
            this.container.addMessageListener(new MessageListener() { // from class: com.xunyi.beast.hand.websocket.pipe.factory.RedisPipeFactory.RedisPipe.1
                public void onMessage(@NonNull Message message, byte[] bArr) {
                    try {
                        observer.onNext(RedisPipe.this.parse(message.getBody()));
                    } catch (IOException e) {
                        e.printStackTrace();
                        RedisPipeFactory.log.warn("parse message, exception", e);
                    }
                }
            }, new ChannelTopic(this.channel));
        }

        @Override // com.xunyi.beast.hand.websocket.pipe.Pipe
        public void next(WSEvent wSEvent) {
            this.connectionFactory.getConnection().publish(this.channel.getBytes(), "协议".getBytes());
        }
    }

    @Override // com.xunyi.beast.hand.websocket.pipe.factory.PipeFactory
    public Pipe apply(Config config) {
        String channel = config.getChannel();
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
        redisStandaloneConfiguration.setDatabase(config.getDatabase());
        redisStandaloneConfiguration.setHostName(config.getHostname());
        redisStandaloneConfiguration.setPassword(RedisPassword.of(config.getPassword()));
        return new RedisPipe(channel, new JedisConnectionFactory(redisStandaloneConfiguration));
    }
}
