package org.apache.activemq.broker.region;

import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.ResourceAllocationException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.broker.region.group.MessageGroupSet;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.thread.Valve;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.task.AsyncTaskExecutor;

/* loaded from: input_file:WEB-INF/lib/activemq-core-4.1.1.jar:org/apache/activemq/broker/region/Queue.class */
public class Queue implements Destination {
    /** Logger whose name is this class name plus the destination's physical name (assigned in the constructor). */
    private final Log log;
    /** The destination this queue serves. */
    protected final ActiveMQDestination destination;
    /** Usage manager created in the constructor as a child of the one passed in. */
    protected final UsageManager usageManager;
    /** Lock owner currently holding the exclusive lock, or {@code null} when there is none; see {@link #lock}. */
    private LockOwner exclusiveOwner;
    /** Created on first use by {@link #getMessageGroupOwners()}. */
    private MessageGroupMap messageGroupOwners;
    /** Message store passed to the constructor; may be {@code null}, and most methods check for that before using it. */
    protected final MessageStore store;
    /** Recomputed by {@code calcHighestSubscriptionPriority()} whenever a subscription is added or removed. */
    protected int highestSubscriptionPriority;
    /** Shared by all queues; runs the sends that were deferred through {@code notifyCallbackWhenNotFull}. */
    static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue());
    /** Subscriptions currently attached to this queue; also used as a monitor when the list is read or changed. */
    protected final List consumers = new CopyOnWriteArrayList();
    /** Turned off while subscriptions are added or removed; incremented and decremented around each dispatch. */
    protected final Valve dispatchValve = new Valve(true);
    /** Counters for this queue; its parent is set in the constructor and cleared in {@link #dispose}. */
    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
    /** Cursor over the queue's message references; the object itself is the monitor guarding cursor access. */
    protected PendingMessageCursor messages = new VMPendingMessageCursor();
    /** Number of drop events not yet reclaimed: incremented by {@link #dropEvent(boolean)}, decremented by {@link #gc()}. */
    protected long garbageSize = 0;
    /** Once {@link #garbageSize} exceeds this value, a drop event triggers {@link #gc()}. */
    protected long garbageSizeBeforeCollection = 1000;
    /** Strategy used to hand a message reference to the attached consumers. */
    private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
    /** Exposed through {@link #getDeadLetterStrategy()}; not otherwise used in this class. */
    private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
    /** Factory used to create {@link #messageGroupOwners} on first use. */
    private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();

    /* renamed from: org.apache.activemq.broker.region.Queue$2, reason: invalid class name */
    /* loaded from: input_file:WEB-INF/lib/activemq-core-4.1.1.jar:org/apache/activemq/broker/region/Queue$2.class */
    /**
     * Callback registered with the usage manager by {@link Queue#send} when the
     * manager reports it is full. When invoked it hands the deferred send to
     * {@link Queue#threadPool} and afterwards replies to the producer with either
     * a {@link Response} or an {@link ExceptionResponse}, correlated to the
     * message's command id.
     */
    class AnonymousClass2 implements Runnable {
        private final Queue queue;
        private final ConnectionContext context;
        private final Message message;

        /**
         * @param queue             the queue that performs the deferred send
         * @param connectionContext context of the producer connection
         * @param message           the message whose send was deferred
         */
        AnonymousClass2(Queue queue, ConnectionContext connectionContext, Message message) {
            this.queue = queue;
            this.context = connectionContext;
            this.message = message;
        }

        /** Schedules the deferred send on the shared thread pool. */
        @Override // java.lang.Runnable
        public void run() {
            Queue.threadPool.execute(new Runnable() { // from class: org.apache.activemq.broker.region.Queue.2.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        AnonymousClass2.this.queue.sendMessage(AnonymousClass2.this.context, AnonymousClass2.this.message);
                        Response response = new Response();
                        response.setCorrelationId(AnonymousClass2.this.message.getCommandId());
                        AnonymousClass2.this.context.getConnection().dispatchAsync(response);
                    } catch (Exception e) {
                        // The producer is waiting for a reply, so report the failure instead of dropping it.
                        ExceptionResponse exceptionResponse = new ExceptionResponse(e);
                        exceptionResponse.setCorrelationId(AnonymousClass2.this.message.getCommandId());
                        AnonymousClass2.this.context.getConnection().dispatchAsync(exceptionResponse);
                    }
                }
            });
        }
    }

    /**
     * Creates a queue for the given destination.
     *
     * @param activeMQDestination   the destination this queue serves
     * @param usageManager          parent usage manager; a child manager with an
     *                              effectively unlimited limit ({@code Long.MAX_VALUE}) is created from it
     * @param messageStore          message store, or {@code null}; when present it is
     *                              given this queue's usage manager
     * @param destinationStatistics parent for this queue's statistics
     * @param taskRunnerFactory     not used by this constructor
     * @throws Exception declared for callers; nothing in the body throws a checked exception directly
     */
    public Queue(ActiveMQDestination activeMQDestination, UsageManager usageManager, MessageStore messageStore, DestinationStatistics destinationStatistics, TaskRunnerFactory taskRunnerFactory) throws Exception {
        this.destination = activeMQDestination;
        this.usageManager = new UsageManager(usageManager);
        // Long.MAX_VALUE is the value the decompiler mis-attributed to Spring's
        // AsyncTaskExecutor.TIMEOUT_INDEFINITE; using the JDK constant avoids the Spring dependency.
        this.usageManager.setLimit(Long.MAX_VALUE);
        this.store = messageStore;
        if (messageStore != null) {
            messageStore.setUsageManager(this.usageManager);
        }
        this.destinationStatistics.setParent(destinationStatistics);
        this.log = LogFactory.getLog(getClass().getName() + "." + activeMQDestination.getPhysicalName());
    }

    /**
     * Loads the messages already held in the store into the cursor. Does nothing
     * when the queue has no store.
     *
     * @throws Exception if the store fails while recovering
     */
    public void initialize() throws Exception {
        if (this.store == null) {
            return;
        }
        this.store.recover(new MessageRecoveryListener() { // from class: org.apache.activemq.broker.region.Queue.1
            /** Wraps a recovered message in a reference, appends it to the cursor and counts it. */
            @Override // org.apache.activemq.store.MessageRecoveryListener
            public void recoverMessage(Message message) {
                message.setRegionDestination(Queue.this);
                MessageReference reference = Queue.this.createMessageReference(message);
                synchronized (Queue.this.messages) {
                    try {
                        Queue.this.messages.addMessageLast(reference);
                    } catch (Exception e) {
                        Queue.this.log.fatal("Failed to add message to cursor", e);
                    }
                }
                reference.decrementReferenceCount();
                Queue.this.destinationStatistics.getMessages().increment();
            }

            /** Always fails: recovery by reference is not expected for this queue. */
            @Override // org.apache.activemq.store.MessageRecoveryListener
            public void recoverMessageReference(String str) throws Exception {
                throw new RuntimeException("Should not be called.");
            }

            @Override // org.apache.activemq.store.MessageRecoveryListener
            public void finished() {
                // Nothing to do once recovery completes.
            }
        });
    }

    /**
     * Decides whether {@code lockOwner} may lock a message of this queue.
     *
     * @param messageReference the message being locked (not consulted here)
     * @param lockOwner        the party requesting the lock
     * @return {@code true} if the requester already is the exclusive owner, or if
     *         nobody holds the exclusive lock and the requester's lock priority is
     *         not below the highest subscription priority; {@code false} otherwise
     */
    @Override // org.apache.activemq.broker.region.Destination
    public synchronized boolean lock(MessageReference messageReference, LockOwner lockOwner) {
        final LockOwner currentOwner = exclusiveOwner;
        if (currentOwner == lockOwner) {
            return true;
        }
        if (currentOwner != null) {
            // Somebody else holds the exclusive lock.
            return false;
        }
        if (lockOwner.getLockPriority() < highestSubscriptionPriority) {
            return false;
        }
        if (lockOwner.isLockExclusive()) {
            exclusiveOwner = lockOwner;
        }
        return true;
    }

    /**
     * Attaches a subscription to this queue and offers it every message that is
     * currently in the cursor, not dropped, and matched by the subscription.
     * Dispatching is paused (valve turned off) while this happens.
     *
     * @param connectionContext context of the subscribing connection
     * @param subscription      the subscription to attach
     * @throws Exception if the subscription rejects the add or matching fails
     */
    @Override // org.apache.activemq.broker.region.Destination
    public void addSubscription(ConnectionContext connectionContext, Subscription subscription) throws Exception {
        subscription.add(connectionContext, this);
        destinationStatistics.getConsumers().increment();
        dispatchValve.turnOff();
        MessageEvaluationContext evalContext = connectionContext.getMessageEvaluationContext();
        try {
            synchronized (consumers) {
                consumers.add(subscription);
            }
            highestSubscriptionPriority = calcHighestSubscriptionPriority();
            evalContext.setDestination(destination);
            synchronized (messages) {
                for (messages.reset(); messages.hasNext(); ) {
                    QueueMessageReference candidate = (QueueMessageReference) messages.next();
                    if (candidate.isDropped()) {
                        continue;
                    }
                    try {
                        evalContext.setMessageReference(candidate);
                        if (subscription.matches(candidate, evalContext)) {
                            subscription.add(candidate);
                        }
                    } catch (IOException e) {
                        log.warn("Could not load message: " + e, e);
                    }
                }
            }
        } finally {
            evalContext.clear();
            dispatchValve.turnOn();
        }
    }

    /**
     * Detaches a subscription from this queue. Unless the subscription is a
     * browser, every non-dropped message that was locked by it, or that belongs to
     * a message group it owned, is unlocked, has its redelivery counter
     * incremented and is dispatched again to the remaining consumers. If the
     * subscription was the exclusive owner, that applies to all non-dropped
     * messages. Dispatching is paused (valve turned off) for the duration.
     *
     * @param connectionContext context of the connection removing the subscription
     * @param subscription      the subscription to detach
     * @throws Exception if removal or re-dispatch fails
     */
    @Override // org.apache.activemq.broker.region.Destination
    public void removeSubscription(ConnectionContext connectionContext, Subscription subscription) throws Exception {
        this.destinationStatistics.getConsumers().decrement();
        this.dispatchValve.turnOff();
        try {
            synchronized (this.consumers) {
                this.consumers.remove(subscription);
            }
            subscription.remove(connectionContext, this);
            this.highestSubscriptionPriority = calcHighestSubscriptionPriority();

            boolean wasExclusiveOwner = false;
            if (this.exclusiveOwner == subscription) {
                this.exclusiveOwner = null;
                wasExclusiveOwner = true;
            }
            MessageGroupSet ownedGroups = getMessageGroupOwners().removeConsumer(subscription.getConsumerInfo().getConsumerId());
            if (subscription.getConsumerInfo().isBrowser()) {
                // Browsers never own messages, so there is nothing to redistribute.
                return;
            }

            MessageEvaluationContext evalContext = connectionContext.getMessageEvaluationContext();
            try {
                evalContext.setDestination(this.destination);
                // Collect first, dispatch later, so the cursor lock is not held while dispatching.
                List<QueueMessageReference> toRedispatch = new ArrayList<QueueMessageReference>();
                synchronized (this.messages) {
                    this.messages.reset();
                    while (this.messages.hasNext()) {
                        QueueMessageReference ref = (QueueMessageReference) this.messages.next();
                        if (ref.isDropped()) {
                            continue;
                        }
                        String groupID = ref.getGroupID();
                        if (ref.getLockOwner() == subscription || wasExclusiveOwner || (groupID != null && ownedGroups.contains(groupID))) {
                            toRedispatch.add(ref);
                        }
                    }
                }
                for (QueueMessageReference ref : toRedispatch) {
                    ref.incrementRedeliveryCounter();
                    ref.unlock();
                    evalContext.setMessageReference(ref);
                    this.dispatchPolicy.dispatch(connectionContext, ref, evalContext, this.consumers);
                }
            } finally {
                // Runs exactly once on every exit path.
                evalContext.clear();
            }
        } finally {
            this.dispatchValve.turnOn();
        }
    }

    /**
     * Accepts a message from a producer, applying producer flow control first
     * when the connection has it enabled.
     * <ul>
     *   <li>Sends that need no response and do not come from a network connection:
     *       fail with {@link ResourceAllocationException} when the usage manager is
     *       full and configured to fail sends; otherwise block, polling once a
     *       second, until space is available or the connection is stopping.</li>
     *   <li>All other sends: if the usage manager is full and accepts a
     *       not-full callback, the send is deferred to that callback, the context
     *       is told not to send a response, and this method returns.</li>
     * </ul>
     *
     * @param connectionContext context of the producing connection
     * @param message           the message to enqueue
     * @throws ResourceAllocationException if memory is exhausted and sends are configured to fail
     * @throws IOException                 if the connection is stopping while waiting for space
     * @throws Exception                   if storing or dispatching the message fails
     */
    @Override // org.apache.activemq.broker.region.Destination
    public void send(ConnectionContext connectionContext, Message message) throws Exception {
        if (connectionContext.isProducerFlowControl()) {
            if (!message.isResponseRequired() && !connectionContext.isNetworkConnection()) {
                // Only fail when there really is no space; previously the exception was thrown
                // whenever send-fail-if-no-space was configured, even with memory available.
                if (this.usageManager.isSendFailIfNoSpace() && this.usageManager.isFull()) {
                    throw new ResourceAllocationException("Usage Manager memory limit reached");
                }
                while (!this.usageManager.waitForSpace(1000L)) {
                    if (connectionContext.getStopping().get()) {
                        throw new IOException("Connection closed, send aborted.");
                    }
                }
            } else if (this.usageManager.isFull()) {
                if (this.usageManager.notifyCallbackWhenNotFull(new AnonymousClass2(this, connectionContext, message))) {
                    // The callback answers the producer once the deferred send has run.
                    connectionContext.setDontSendReponse(true);
                    return;
                }
            }
        }
        sendMessage(connectionContext, message);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Stores the message (when it is persistent and a store exists) and dispatches
     * it. Inside a transaction the dispatch is postponed until after commit.
     *
     * @param connectionContext context of the producing connection
     * @param message           the message to enqueue
     * @throws IOException if the store fails
     * @throws Exception   if dispatching fails
     */
    public void sendMessage(ConnectionContext connectionContext, Message message) throws IOException, Exception {
        message.setRegionDestination(this);
        if (this.store != null && message.isPersistent()) {
            this.store.addMessage(connectionContext, message);
        }
        // Final copies so the anonymous Synchronization below can capture them.
        final ConnectionContext context = connectionContext;
        final Message sentMessage = message;
        final MessageReference node = createMessageReference(message);
        try {
            if (context.isInTransaction()) {
                context.getTransaction().addSynchronization(new Synchronization() { // from class: org.apache.activemq.broker.region.Queue.3
                    @Override // org.apache.activemq.transaction.Synchronization
                    public void afterCommit() throws Exception {
                        Queue.this.dispatch(context, node, sentMessage);
                    }
                });
            } else {
                dispatch(context, node, sentMessage);
            }
        } finally {
            node.decrementReferenceCount();
        }
    }

    /**
     * Releases this queue: removes all of its messages from the store (if there
     * is one) and detaches its statistics from their parent.
     *
     * @param connectionContext context used for the store operation
     * @throws IOException if the store fails to remove the messages
     */
    @Override // org.apache.activemq.broker.region.Destination
    public void dispose(ConnectionContext connectionContext) throws IOException {
        final MessageStore backing = store;
        if (backing != null) {
            backing.removeAllMessages(connectionContext);
        }
        destinationStatistics.setParent(null);
    }

    /** Records that one message was dropped, allowing garbage collection of the cursor to run. */
    public void dropEvent() {
        final boolean skipGc = false;
        dropEvent(skipGc);
    }

    /**
     * Records that one message was dropped: decrements the message counter and
     * increments the garbage count.
     *
     * @param z when {@code true}, {@link #gc()} is never triggered by this call;
     *          when {@code false}, it runs once the garbage count exceeds
     *          {@code garbageSizeBeforeCollection}
     */
    public void dropEvent(boolean z) {
        destinationStatistics.getMessages().decrement();
        synchronized (messages) {
            garbageSize += 1;
            if (z) {
                return;
            }
            if (garbageSize > garbageSizeBeforeCollection) {
                gc();
            }
        }
    }

    /**
     * Walks the cursor and removes every reference that has been dropped,
     * decrementing the garbage count for each one removed.
     */
    @Override // org.apache.activemq.broker.region.Destination
    public void gc() {
        synchronized (messages) {
            for (messages.resetForGC(); messages.hasNext(); ) {
                QueueMessageReference candidate = (QueueMessageReference) messages.next();
                if (!candidate.isDropped()) {
                    continue;
                }
                garbageSize--;
                messages.remove();
            }
        }
    }

    /**
     * Removes an acknowledged persistent message from the store. Does nothing
     * when there is no store or the message is not persistent.
     *
     * @param connectionContext context used for the store operation
     * @param subscription      the acknowledging subscription (not consulted here)
     * @param messageAck        the acknowledgement; when its message count is
     *                          positive, a copy narrowed to exactly
     *                          {@code messageReference} is sent to the store instead
     * @param messageReference  the message being acknowledged
     * @throws IOException if the store fails to remove the message
     */
    @Override // org.apache.activemq.broker.region.Destination
    public void acknowledge(ConnectionContext connectionContext, Subscription subscription, MessageAck messageAck, MessageReference messageReference) throws IOException {
        if (store != null && messageReference.isPersistent()) {
            MessageAck storeAck = messageAck;
            if (messageAck.getMessageCount() > 0) {
                // Work on a copy so the caller's ack is left untouched.
                storeAck = new MessageAck();
                messageAck.copy(storeAck);
                storeAck.setFirstMessageId(messageReference.getMessageId());
                storeAck.setLastMessageId(messageReference.getMessageId());
                storeAck.setMessageCount(1);
            }
            store.removeMessage(connectionContext, storeAck);
        }
    }

    /**
     * Fetches a message from the store and marks this queue as its region destination.
     *
     * @param messageId id of the message to load
     * @return the loaded message, or {@code null} if the store returned none
     * @throws IOException if the store fails
     */
    Message loadMessage(MessageId messageId) throws IOException {
        final Message loaded = store.getMessage(messageId);
        if (loaded == null) {
            return null;
        }
        loaded.setRegionDestination(this);
        return loaded;
    }

    /**
     * @return a one-line summary: destination name, number of subscriptions,
     *         memory usage percentage, cursor size and the message group owners
     */
    public String toString() {
        final int cursorSize;
        synchronized (messages) {
            cursorSize = messages.size();
        }
        return "Queue: destination=" + destination.getPhysicalName()
                + ", subscriptions=" + consumers.size()
                + ", memory=" + usageManager.getPercentUsage()
                + "%, size=" + cursorSize
                + ", in flight groups=" + messageGroupOwners;
    }

    /** No-op: this queue has nothing to start. */
    @Override // org.apache.activemq.Service
    public void start() throws Exception {
    }

    /** No-op: this queue has nothing to stop. */
    @Override // org.apache.activemq.Service
    public void stop() throws Exception {
    }

    /** @return the destination this queue serves */
    @Override // org.apache.activemq.broker.region.Destination
    public ActiveMQDestination getActiveMQDestination() {
        return destination;
    }

    /** @return the physical name of this queue's destination */
    public String getDestination() {
        final String physicalName = destination.getPhysicalName();
        return physicalName;
    }

    /** @return the usage manager created for this queue in the constructor */
    @Override // org.apache.activemq.broker.region.Destination
    public UsageManager getUsageManager() {
        return usageManager;
    }

    /** @return the statistics counters of this queue */
    @Override // org.apache.activemq.broker.region.Destination
    public DestinationStatistics getDestinationStatistics() {
        return destinationStatistics;
    }

    /**
     * @return the message group map, created through the configured factory on
     *         the first call
     */
    public MessageGroupMap getMessageGroupOwners() {
        if (messageGroupOwners != null) {
            return messageGroupOwners;
        }
        messageGroupOwners = getMessageGroupMapFactory().createMessageGroupMap();
        return messageGroupOwners;
    }

    /** @return the policy used to dispatch messages to consumers */
    public DispatchPolicy getDispatchPolicy() {
        return dispatchPolicy;
    }

    /** @param dispatchPolicy the policy to use for dispatching messages to consumers from now on */
    public void setDispatchPolicy(DispatchPolicy dispatchPolicy) {
        this.dispatchPolicy = dispatchPolicy;
    }

    /** @return the dead letter strategy configured for this queue */
    @Override // org.apache.activemq.broker.region.Destination
    public DeadLetterStrategy getDeadLetterStrategy() {
        return deadLetterStrategy;
    }

    /** @param deadLetterStrategy the dead letter strategy to report from {@link #getDeadLetterStrategy()} */
    public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
        this.deadLetterStrategy = deadLetterStrategy;
    }

    /** @return the factory used to create the message group map on first use */
    public MessageGroupMapFactory getMessageGroupMapFactory() {
        return messageGroupMapFactory;
    }

    /**
     * @param messageGroupMapFactory the factory to use when the message group map
     *                               is created; a map that already exists is not replaced
     */
    public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
        this.messageGroupMapFactory = messageGroupMapFactory;
    }

    /** @return the physical name of this queue's destination */
    @Override // org.apache.activemq.broker.region.Destination
    public String getName() {
        final ActiveMQDestination dest = getActiveMQDestination();
        return dest.getPhysicalName();
    }

    /** @return the cursor holding this queue's message references */
    public PendingMessageCursor getMessages() {
        return messages;
    }

    /**
     * Replaces the cursor holding this queue's message references.
     * NOTE(review): the cursor object is also the monitor used by the
     * {@code synchronized (messages)} blocks in this class, so swapping it while
     * the queue is in use presumably needs care; confirm with callers.
     *
     * @param pendingMessageCursor the new cursor
     */
    public void setMessages(PendingMessageCursor pendingMessageCursor) {
        this.messages = pendingMessageCursor;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * @param message the message to wrap
     * @return a new {@link IndirectMessageReference} tying the message to this queue and its store
     */
    public MessageReference createMessageReference(Message message) {
        final MessageReference reference = new IndirectMessageReference(this, store, message);
        return reference;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Appends a message reference to the cursor and, if any consumers are
     * attached, hands it to the dispatch policy. The dispatch valve is
     * incremented on entry and decremented exactly once on exit.
     *
     * @param connectionContext context of the producing connection
     * @param messageReference  reference to enqueue and dispatch
     * @param message           the underlying message (not consulted here)
     * @throws Exception if the cursor or the dispatch policy fails
     */
    public void dispatch(ConnectionContext connectionContext, MessageReference messageReference, Message message) throws Exception {
        this.dispatchValve.increment();
        MessageEvaluationContext evalContext = connectionContext.getMessageEvaluationContext();
        try {
            this.destinationStatistics.getEnqueues().increment();
            this.destinationStatistics.getMessages().increment();
            synchronized (this.messages) {
                this.messages.addMessageLast(messageReference);
            }
            synchronized (this.consumers) {
                if (this.consumers.isEmpty()) {
                    this.log.debug("No subscriptions registered, will not dispatch message at this time.");
                    return;
                }
                evalContext.setDestination(this.destination);
                evalContext.setMessageReference(messageReference);
                this.dispatchPolicy.dispatch(connectionContext, messageReference, evalContext, this.consumers);
            }
        } finally {
            // Single cleanup point: doing this inside the try as well would
            // decrement the valve twice for one increment.
            evalContext.clear();
            this.dispatchValve.decrement();
        }
    }

    /**
     * @return the highest consumer priority among the attached subscriptions, or
     *         {@link Integer#MIN_VALUE} when there are none
     */
    private int calcHighestSubscriptionPriority() {
        // Must be an int: Integer.MIN_VALUE does not fit into a byte.
        int highest = Integer.MIN_VALUE;
        synchronized (this.consumers) {
            // consumers is a raw List, so each element needs an explicit cast.
            for (Object consumer : this.consumers) {
                int priority = ((Subscription) consumer).getConsumerInfo().getPriority();
                if (priority > highest) {
                    highest = priority;
                }
            }
        }
        return highest;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the message store backing this queue, or {@code null} if none was supplied */
    public MessageStore getMessageStore() {
        return store;
    }

    /**
     * Returns the messages currently reachable through the cursor. References
     * whose message cannot be loaded (an {@link IOException}, or a {@code null}
     * message) are skipped.
     *
     * @return the loadable messages in cursor order; never {@code null}
     */
    @Override // org.apache.activemq.broker.region.Destination
    public Message[] browse() {
        List<Message> found = new ArrayList<Message>();
        synchronized (this.messages) {
            this.messages.reset();
            while (this.messages.hasNext()) {
                try {
                    MessageReference ref = this.messages.next();
                    ref.incrementReferenceCount();
                    try {
                        Message message = ref.getMessage();
                        if (message != null) {
                            found.add(message);
                        }
                    } finally {
                        ref.decrementReferenceCount();
                    }
                } catch (IOException e) {
                    // Browsing is best effort: report the failure and keep going.
                    this.log.warn("Could not load message: " + e, e);
                }
            }
        }
        return found.toArray(new Message[found.size()]);
    }

    /**
     * Looks up a message in the cursor by the string form of its message id.
     *
     * @param str the message id, compared against {@code getMessageId().toString()}
     * @return the first matching message that could be loaded, or {@code null} if
     *         none matched or the matching message could not be loaded
     */
    public Message getMessage(String str) {
        synchronized (this.messages) {
            this.messages.reset();
            while (this.messages.hasNext()) {
                try {
                    MessageReference ref = this.messages.next();
                    if (!str.equals(ref.getMessageId().toString())) {
                        continue;
                    }
                    ref.incrementReferenceCount();
                    try {
                        Message message = ref.getMessage();
                        if (message != null) {
                            return message;
                        }
                    } finally {
                        // Exactly one decrement per increment, whether or not a message was found.
                        ref.decrementReferenceCount();
                    }
                } catch (IOException e) {
                    // Lookup is best effort: report the failure and keep scanning.
                    this.log.warn("Could not load message: " + e, e);
                }
            }
            return null;
        }
    }

    /**
     * Removes every message that can be locked with
     * {@link LockOwner#HIGH_PRIORITY_LOCK_OWNER}: each one is acknowledged
     * against the store, dropped and counted as garbage, and the cursor is
     * garbage-collected once at the end. A message whose removal fails with an
     * {@link IOException} is logged and left in place.
     */
    public void purge() {
        synchronized (this.messages) {
            ConnectionContext purgeContext = createConnectionContext();
            this.messages.reset();
            while (this.messages.hasNext()) {
                try {
                    QueueMessageReference ref = (QueueMessageReference) this.messages.next();
                    if (ref.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) {
                        MessageAck ack = new MessageAck();
                        ack.setAckType((byte) 2);
                        ack.setDestination(this.destination);
                        ack.setMessageID(ref.getMessageId());
                        acknowledge(purgeContext, null, ack, ref);
                        ref.drop();
                        // Suppress per-message gc while iterating; one gc() follows the loop.
                        dropEvent(true);
                    }
                } catch (IOException e) {
                    this.log.warn("Could not purge message: " + e, e);
                }
            }
            gc();
        }
    }

    /**
     * Removes the message with the given id.
     *
     * @param str the message id in string form
     * @return {@code true} if a message was removed
     * @throws Exception if the removal fails
     */
    public boolean removeMessage(String str) throws Exception {
        final int removed = removeMatchingMessages(createMessageIdFilter(str), 1);
        return removed > 0;
    }

    /**
     * Removes all messages matching a selector, without an upper limit.
     *
     * @param str the selector expression
     * @return the number of messages removed
     * @throws Exception if the selector is invalid or the removal fails
     */
    public int removeMatchingMessages(String str) throws Exception {
        final int noLimit = -1;
        return removeMatchingMessages(str, noLimit);
    }

    /**
     * Removes messages matching a selector.
     *
     * @param str the selector expression
     * @param i   maximum number of messages to remove; zero or negative means no limit
     * @return the number of messages removed
     * @throws Exception if the selector is invalid or the removal fails
     */
    public int removeMatchingMessages(String str, int i) throws Exception {
        final MessageReferenceFilter selectorFilter = createSelectorFilter(str);
        return removeMatchingMessages(selectorFilter, i);
    }

    /**
     * Removes messages accepted by a filter. A message is removed only if the
     * filter accepts it and it can be locked.
     *
     * @param messageReferenceFilter decides which messages to remove
     * @param i                      maximum number of messages to remove; zero or negative means no limit
     * @return the number of messages removed
     * @throws Exception if the filter or the removal fails
     */
    public int removeMatchingMessages(MessageReferenceFilter messageReferenceFilter, int i) throws Exception {
        int removed = 0;
        synchronized (messages) {
            final ConnectionContext removalContext = createConnectionContext();
            for (messages.reset(); messages.hasNext(); ) {
                IndirectMessageReference candidate = (IndirectMessageReference) messages.next();
                if (!messageReferenceFilter.evaluate(removalContext, candidate)) {
                    continue;
                }
                if (!lockMessage(candidate)) {
                    continue;
                }
                removeMessage(removalContext, candidate);
                removed++;
                if (i > 0 && removed >= i) {
                    break;
                }
            }
        }
        return removed;
    }

    /**
     * Copies the message with the given id to another destination.
     *
     * @param connectionContext   context used for the resend
     * @param str                 the message id in string form
     * @param activeMQDestination the destination to copy to
     * @return {@code true} if a message was copied
     * @throws Exception if the copy fails
     */
    public boolean copyMessageTo(ConnectionContext connectionContext, String str, ActiveMQDestination activeMQDestination) throws Exception {
        final MessageReferenceFilter idFilter = createMessageIdFilter(str);
        final int copied = copyMatchingMessages(connectionContext, idFilter, activeMQDestination, 1);
        return copied > 0;
    }

    /**
     * Copies all messages matching a selector to another destination, without an upper limit.
     *
     * @param connectionContext   context used for the resend
     * @param str                 the selector expression
     * @param activeMQDestination the destination to copy to
     * @return the number of messages copied
     * @throws Exception if the selector is invalid or the copy fails
     */
    public int copyMatchingMessagesTo(ConnectionContext connectionContext, String str, ActiveMQDestination activeMQDestination) throws Exception {
        final int noLimit = -1;
        return copyMatchingMessagesTo(connectionContext, str, activeMQDestination, noLimit);
    }

    /**
     * Copies messages matching a selector to another destination.
     *
     * @param connectionContext   context used for the resend
     * @param str                 the selector expression
     * @param activeMQDestination the destination to copy to
     * @param i                   maximum number of messages to copy; zero or negative means no limit
     * @return the number of messages copied
     * @throws Exception if the selector is invalid or the copy fails
     */
    public int copyMatchingMessagesTo(ConnectionContext connectionContext, String str, ActiveMQDestination activeMQDestination, int i) throws Exception {
        final MessageReferenceFilter selectorFilter = createSelectorFilter(str);
        return copyMatchingMessages(connectionContext, selectorFilter, activeMQDestination, i);
    }

    /**
     * Resends every message accepted by a filter to another destination, leaving
     * the originals in this queue.
     *
     * @param connectionContext      context used for filtering and resending
     * @param messageReferenceFilter decides which messages to copy
     * @param activeMQDestination    the destination to copy to
     * @param i                      maximum number of messages to copy; zero or negative means no limit
     * @return the number of messages copied
     * @throws Exception if the filter or the resend fails
     */
    public int copyMatchingMessages(ConnectionContext connectionContext, MessageReferenceFilter messageReferenceFilter, ActiveMQDestination activeMQDestination, int i) throws Exception {
        int copied = 0;
        synchronized (this.messages) {
            this.messages.reset();
            while (this.messages.hasNext()) {
                MessageReference ref = this.messages.next();
                if (!messageReferenceFilter.evaluate(connectionContext, ref)) {
                    continue;
                }
                ref.incrementReferenceCount();
                try {
                    BrokerSupport.resend(connectionContext, ref.getMessage(), activeMQDestination);
                    copied++;
                } finally {
                    // One decrement per increment on every exit path.
                    ref.decrementReferenceCount();
                }
                if (i > 0 && copied >= i) {
                    break;
                }
            }
        }
        return copied;
    }

    /**
     * Moves the message with the given id to another destination.
     *
     * @param connectionContext   context used for the resend and removal
     * @param str                 the message id in string form
     * @param activeMQDestination the destination to move to
     * @return {@code true} if a message was moved
     * @throws Exception if the move fails
     */
    public boolean moveMessageTo(ConnectionContext connectionContext, String str, ActiveMQDestination activeMQDestination) throws Exception {
        final MessageReferenceFilter idFilter = createMessageIdFilter(str);
        final int moved = moveMatchingMessagesTo(connectionContext, idFilter, activeMQDestination, 1);
        return moved > 0;
    }

    /**
     * Moves all messages matching a selector to another destination, without an upper limit.
     *
     * @param connectionContext   context used for the resend and removal
     * @param str                 the selector expression
     * @param activeMQDestination the destination to move to
     * @return the number of messages moved
     * @throws Exception if the selector is invalid or the move fails
     */
    public int moveMatchingMessagesTo(ConnectionContext connectionContext, String str, ActiveMQDestination activeMQDestination) throws Exception {
        final int noLimit = -1;
        return moveMatchingMessagesTo(connectionContext, str, activeMQDestination, noLimit);
    }

    /**
     * Moves messages matching a selector to another destination.
     *
     * @param connectionContext   context used for the resend and removal
     * @param str                 the selector expression
     * @param activeMQDestination the destination to move to
     * @param i                   maximum number of messages to move; zero or negative means no limit
     * @return the number of messages moved
     * @throws Exception if the selector is invalid or the move fails
     */
    public int moveMatchingMessagesTo(ConnectionContext connectionContext, String str, ActiveMQDestination activeMQDestination, int i) throws Exception {
        final MessageReferenceFilter selectorFilter = createSelectorFilter(str);
        return moveMatchingMessagesTo(connectionContext, selectorFilter, activeMQDestination, i);
    }

    /**
     * Resends every message accepted by a filter to another destination and then
     * removes it from this queue. A message is moved only if the filter accepts
     * it and it can be locked.
     *
     * @param connectionContext      context used for filtering, resending and removal
     * @param messageReferenceFilter decides which messages to move
     * @param activeMQDestination    the destination to move to
     * @param i                      maximum number of messages to move; zero or negative means no limit
     * @return the number of messages moved
     * @throws Exception if the filter, the resend or the removal fails
     */
    public int moveMatchingMessagesTo(ConnectionContext connectionContext, MessageReferenceFilter messageReferenceFilter, ActiveMQDestination activeMQDestination, int i) throws Exception {
        int moved = 0;
        synchronized (this.messages) {
            this.messages.reset();
            while (this.messages.hasNext()) {
                IndirectMessageReference ref = (IndirectMessageReference) this.messages.next();
                if (!messageReferenceFilter.evaluate(connectionContext, ref) || !lockMessage(ref)) {
                    continue;
                }
                ref.incrementReferenceCount();
                try {
                    BrokerSupport.resend(connectionContext, ref.getMessage(), activeMQDestination);
                    removeMessage(connectionContext, ref);
                    moved++;
                } finally {
                    // One decrement per increment on every exit path.
                    ref.decrementReferenceCount();
                }
                if (i > 0 && moved >= i) {
                    break;
                }
            }
        }
        return moved;
    }

    /**
     * Builds a filter that accepts exactly the references whose message id, in
     * string form, equals the given id.
     *
     * @param str the message id to match
     * @return the filter
     */
    protected MessageReferenceFilter createMessageIdFilter(String str) {
        // Final copy so the anonymous filter can capture it.
        final String messageId = str;
        return new MessageReferenceFilter() { // from class: org.apache.activemq.broker.region.Queue.4
            @Override // org.apache.activemq.broker.region.MessageReferenceFilter
            public boolean evaluate(ConnectionContext connectionContext, MessageReference messageReference) {
                return messageId.equals(messageReference.getMessageId().toString());
            }
        };
    }

    /**
     * Builds a filter that accepts the references matched by a selector. The
     * selector is parsed once, when the filter is created.
     *
     * @param str the selector expression
     * @return the filter
     * @throws InvalidSelectorException if the selector cannot be parsed
     */
    protected MessageReferenceFilter createSelectorFilter(String str) throws InvalidSelectorException {
        final BooleanExpression selectorExpression = new SelectorParser().parse(str);
        return new MessageReferenceFilter() { // from class: org.apache.activemq.broker.region.Queue.5
            @Override // org.apache.activemq.broker.region.MessageReferenceFilter
            public boolean evaluate(ConnectionContext connectionContext, MessageReference messageReference) throws JMSException {
                MessageEvaluationContext evalContext = connectionContext.getMessageEvaluationContext();
                evalContext.setMessageReference(messageReference);
                if (evalContext.getDestination() == null) {
                    // Fall back to this queue's destination when the context has none.
                    evalContext.setDestination(Queue.this.getActiveMQDestination());
                }
                return selectorExpression.matches(evalContext);
            }
        };
    }

    /**
     * Removes a single message: acknowledges it against the store with a freshly
     * built ack, marks the reference dropped and records the drop event.
     * NOTE(review): {@code dropEvent()} may run {@code gc()} on the cursor;
     * callers that invoke this while iterating the cursor should confirm that is safe.
     *
     * @param connectionContext        context used for the store operation
     * @param indirectMessageReference the message to remove
     * @throws IOException if the store fails to remove the message
     */
    protected void removeMessage(ConnectionContext connectionContext, IndirectMessageReference indirectMessageReference) throws IOException {
        final MessageAck removalAck = new MessageAck();
        removalAck.setAckType((byte) 2);
        removalAck.setDestination(destination);
        removalAck.setMessageID(indirectMessageReference.getMessageId());

        acknowledge(connectionContext, null, removalAck, indirectMessageReference);
        indirectMessageReference.drop();
        dropEvent();
    }

    /**
     * @param indirectMessageReference the message to lock
     * @return whether the message could be locked for {@link LockOwner#HIGH_PRIORITY_LOCK_OWNER}
     */
    protected boolean lockMessage(IndirectMessageReference indirectMessageReference) {
        final boolean locked = indirectMessageReference.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER);
        return locked;
    }

    /**
     * @return a new connection context whose message evaluation context is
     *         already set to this queue's destination
     */
    protected ConnectionContext createConnectionContext() {
        final ConnectionContext fresh = new ConnectionContext();
        final MessageEvaluationContext evalContext = fresh.getMessageEvaluationContext();
        evalContext.setDestination(getActiveMQDestination());
        return fresh;
    }
}
