package com.xunyi.micro.propagation.instrument.reactor;

import com.xunyi.micro.propagation.context.CurrentContext;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.ConfigurableApplicationContext;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

/* loaded from: input_file:com/xunyi/micro/propagation/instrument/reactor/Reactors.class */
public class Reactors {
    private static final Logger log = LoggerFactory.getLogger(Reactors.class);
    private static Map<BeanFactory, CurrentContext> CACHE = new ConcurrentHashMap();

    public static <T> Function<? super Publisher<T>, ? extends Publisher<T>> scopePassingSpanOperator(BeanFactory beanFactory) {
        BooleanSupplier booleanSupplier;
        if (beanFactory instanceof ConfigurableApplicationContext) {
            ConfigurableApplicationContext configurableApplicationContext = (ConfigurableApplicationContext) beanFactory;
            Objects.requireNonNull(configurableApplicationContext);
            booleanSupplier = configurableApplicationContext::isActive;
        } else {
            booleanSupplier = () -> {
                return true;
            };
        }
        BooleanSupplier booleanSupplier2 = booleanSupplier;
        return Operators.liftPublisher((publisher, coreSubscriber) -> {
            if (publisher instanceof Fuseable.ScalarCallable) {
                return coreSubscriber;
            }
            Scannable from = Scannable.from(publisher);
            if (booleanSupplier2.getAsBoolean()) {
                if (log.isTraceEnabled()) {
                    log.trace("Spring Context [{}}] already refreshed. Creating a scope passing propagation subscriber with Reactor Context [{}] and name [{}]", new Object[]{beanFactory, coreSubscriber.currentContext(), from.name()});
                }
                return scopePassingSpanSubscription(beanFactory, coreSubscriber);
            }
            if (log.isTraceEnabled()) {
                log.trace("Spring Context [" + beanFactory + "] is not yet refreshed, falling back to lazy span subscriber. Reactor Context is [" + coreSubscriber.currentContext() + "] and name is [" + from.name() + "]");
            }
            return new LazyPassingSubscriber(lazyScopePassingSubscription(beanFactory, from, coreSubscriber));
        });
    }

    static <T> PassingSubscriptionProvider<T> lazyScopePassingSubscription(BeanFactory beanFactory, Scannable scannable, CoreSubscriber<? super T> coreSubscriber) {
        return new PassingSubscriptionProvider<>(beanFactory, coreSubscriber, coreSubscriber.currentContext());
    }

    static <T> CoreSubscriber<? super T> scopePassingSpanSubscription(BeanFactory beanFactory, CoreSubscriber<? super T> coreSubscriber) {
        CurrentContext computeIfAbsent = CACHE.computeIfAbsent(beanFactory, beanFactory2 -> {
            return (CurrentContext) beanFactory2.getBean(CurrentContext.class);
        });
        Context currentContext = coreSubscriber.currentContext();
        com.xunyi.micro.propagation.context.Context context = currentContext.hasKey(com.xunyi.micro.propagation.context.Context.class) ? (com.xunyi.micro.propagation.context.Context) currentContext.get(com.xunyi.micro.propagation.context.Context.class) : computeIfAbsent.get();
        return context != null ? new ScopePassingSubscriber(coreSubscriber, currentContext, computeIfAbsent, context) : coreSubscriber;
    }
}
