package org.apache.rocketmq.flink;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.lang.Validate;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.consumer.PullTaskCallback;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/rocketmq/flink/RocketMQSource.class */
public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT> implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(RocketMQSource.class);
    // Pull scheduler created in open(); run() registers the pull callback on it and starts it, cancel() shuts it down.
    private transient MQPullConsumerScheduleService pullConsumerScheduleService;
    // Pull consumer obtained from pullConsumerScheduleService in open(); used to look up and to commit offsets.
    private DefaultMQPullConsumer consumer;
    // Deserialization schema handed to the constructor; getProducedType() delegates to it.
    private KeyValueDeserializationSchema<OUT> schema;
    // Created in open(); set to running by run() and to stopped by cancel(). Gates snapshotState()/notifyCheckpointComplete().
    private RunningChecker runningChecker;
    // Union list state registered in initializeState() under OFFSETS_STATE_NAME; cleared and refilled on every snapshotState().
    private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;
    // Offset per message queue as tracked by this instance (a ConcurrentHashMap created in open()).
    private Map<MessageQueue, Long> offsetTable;
    // Offsets read back from unionOffsetStates in initializeState(); consulted by getMessageQueueOffset() when restored is true.
    private Map<MessageQueue, Long> restoredOffsets;
    // Checkpoint id -> copy of offsetTable taken in snapshotState(); drained in notifyCheckpointComplete().
    private LinkedMap pendingOffsetsToCommit;
    // Consumer properties handed to the constructor.
    private Properties props;
    // Read from props (RocketMQConfig.CONSUMER_TOPIC) in open().
    private String topic;
    // Read from props (RocketMQConfig.CONSUMER_GROUP) in open().
    private String group;
    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
    // Set in initializeState() from FunctionInitializationContext.isRestored().
    private volatile transient boolean restored;
    // Set in open() from the runtime context; when true, putMessageQueueOffset() does not commit offsets itself.
    private transient boolean enableCheckpoint;

    /* renamed from: org.apache.rocketmq.flink.RocketMQSource$3, reason: invalid class name */
    /* loaded from: input_file:org/apache/rocketmq/flink/RocketMQSource$3.class */
    /**
     * Compiler-synthesised lookup table for a {@code switch} over {@link PullStatus}.
     *
     * <p>The array is indexed by {@code PullStatus.ordinal()} and holds the case number
     * (FOUND = 1, NO_MATCHED_MSG = 2, NO_NEW_MSG = 3, OFFSET_ILLEGAL = 4). Each constant is
     * registered in its own try block so that a constant missing at link time
     * ({@link NoSuchFieldError}) only leaves its own slot at 0.
     *
     * <p>NOTE(review): nothing in the visible source reads this table; presumably it belongs to
     * the {@code doPullTask} callback in {@code run()}, whose body was not decompiled - confirm.
     */
    static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$rocketmq$client$consumer$PullStatus = new int[PullStatus.values().length];

        static {
            try {
                $SwitchMap$org$apache$rocketmq$client$consumer$PullStatus[PullStatus.FOUND.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$client$consumer$PullStatus[PullStatus.NO_MATCHED_MSG.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$client$consumer$PullStatus[PullStatus.NO_NEW_MSG.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$rocketmq$client$consumer$PullStatus[PullStatus.OFFSET_ILLEGAL.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /**
     * Creates a source that is configured by {@code properties} and uses
     * {@code keyValueDeserializationSchema} as its deserialization schema.
     *
     * <p>Neither argument is checked here; both are validated in {@link #open(Configuration)}.
     *
     * @param keyValueDeserializationSchema schema stored for later use by this source
     * @param properties consumer properties (topic, group, ...) read in {@code open()} and {@code run()}
     */
    public RocketMQSource(KeyValueDeserializationSchema<OUT> keyValueDeserializationSchema, Properties properties) {
        this.props = properties;
        this.schema = keyValueDeserializationSchema;
    }

    /**
     * Validates the configuration and prepares the pull consumer.
     *
     * <p>Reads topic and group from the consumer properties, records whether checkpointing is
     * enabled, creates the offset bookkeeping maps that do not exist yet, and builds the
     * {@link MQPullConsumerScheduleService} together with its {@link DefaultMQPullConsumer}.
     * The scheduler is only started later, in {@code run()}.
     *
     * @param configuration not used by this implementation
     * @throws Exception if validation of the properties, schema, topic or group fails, or if
     *                   building the consumer configuration fails
     */
    public void open(Configuration configuration) throws Exception {
        LOG.debug("source open....");

        // Fail fast on an incomplete setup.
        Validate.notEmpty(this.props, "Consumer properties can not be empty");
        Validate.notNull(this.schema, "KeyValueDeserializationSchema can not be null");

        // Both fields are assigned before either one is validated.
        this.topic = this.props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
        this.group = this.props.getProperty(RocketMQConfig.CONSUMER_GROUP);
        Validate.notEmpty(this.topic, "Consumer topic can not be empty");
        Validate.notEmpty(this.group, "Consumer group can not be empty");

        this.enableCheckpoint = getRuntimeContext().isCheckpointingEnabled();

        // Keep maps that already exist (restoredOffsets may have been filled by initializeState()).
        if (null == this.offsetTable) {
            this.offsetTable = new ConcurrentHashMap<>();
        }
        if (null == this.restoredOffsets) {
            this.restoredOffsets = new ConcurrentHashMap<>();
        }
        if (null == this.pendingOffsetsToCommit) {
            this.pendingOffsetsToCommit = new LinkedMap();
        }

        this.runningChecker = new RunningChecker();

        MQPullConsumerScheduleService scheduleService =
                new MQPullConsumerScheduleService(this.group, RocketMQConfig.buildAclRPCHook(this.props));
        this.pullConsumerScheduleService = scheduleService;
        this.consumer = scheduleService.getDefaultMQPullConsumer();

        // Instance name is "<subtask index>_<random UUID>".
        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        this.consumer.setInstanceName(subtaskIndex + "_" + UUID.randomUUID());
        RocketMQConfig.buildConsumerConfigs(this.props, this.consumer);
    }

    /**
     * Registers the pull callback for the configured topic, starts the pull scheduler and then
     * blocks until the running flag is cleared (see {@code awaitTermination()}).
     *
     * <p>Reads the following settings from the consumer properties, with the defaults shown:
     * delay when no message is found (10), consumer tag ({@code RocketMQConfig.DEFAULT_CONSUMER_TAG}),
     * pull thread pool size (20) and pull batch size (32).
     *
     * <p>NOTE(review): the body of {@code doPullTask} could not be decompiled; as written here it
     * only throws {@link UnsupportedOperationException}. It must be restored from the original
     * source or bytecode before this class can actually consume messages.
     *
     * @param sourceContext Flink source context; only its checkpoint lock is fetched in the visible code
     * @throws Exception {@link RuntimeException} wrapping an {@link MQClientException} from starting
     *                   the scheduler, or {@link InterruptedException} while waiting for termination
     */
    public void run(final SourceFunction.SourceContext sourceContext) throws Exception {
        LOG.debug("source run....");
        // The final locals below are not used in the visible code; presumably they are captured by
        // the doPullTask callback whose body is missing - confirm against the original source.
        final Object checkpointLock = sourceContext.getCheckpointLock();
        final int integer = RocketMQUtils.getInteger(this.props, RocketMQConfig.CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND, 10);
        final String property = this.props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);
        int integer2 = RocketMQUtils.getInteger(this.props, RocketMQConfig.CONSUMER_PULL_POOL_SIZE, 20);
        final int integer3 = RocketMQUtils.getInteger(this.props, RocketMQConfig.CONSUMER_BATCH_SIZE, 32);
        this.pullConsumerScheduleService.setPullThreadNums(integer2);
        this.pullConsumerScheduleService.registerPullTaskCallback(this.topic, new PullTaskCallback() { // from class: org.apache.rocketmq.flink.RocketMQSource.1
            /* JADX WARN: Can't fix incorrect switch cases order, some code will duplicate */
            /* JADX WARN: Failed to find 'out' block for switch in B:7:0x0035. Please report as an issue. */
            /* JADX WARN: Removed duplicated region for block: B:35:0x010a A[EXC_TOP_SPLITTER, SYNTHETIC] */
            /*
                Code decompiled incorrectly, please refer to instructions dump.
                To view partially-correct add '--show-bad-code' argument
            */
            public void doPullTask(org.apache.rocketmq.common.message.MessageQueue r8, org.apache.rocketmq.client.consumer.PullTaskContext r9) {
                /*
                    Method dump skipped, instructions count: 328
                    To view this dump add '--comments-level debug' option
                */
                throw new UnsupportedOperationException("Method not decompiled: org.apache.rocketmq.flink.RocketMQSource.AnonymousClass1.doPullTask(org.apache.rocketmq.common.message.MessageQueue, org.apache.rocketmq.client.consumer.PullTaskContext):void");
            }
        });
        try {
            this.pullConsumerScheduleService.start();
            // Mark the source as running only after the scheduler started successfully.
            this.runningChecker.setRunning(true);
            // Blocks this thread until cancel() clears the running flag.
            awaitTermination();
        } catch (MQClientException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    /**
     * Blocks the calling thread until the running flag is cleared, polling it every 50 ms.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void awaitTermination() throws InterruptedException {
        for (;;) {
            if (!this.runningChecker.isRunning()) {
                return;
            }
            Thread.sleep(50L);
        }
    }

    /**
     * Resolves the offset recorded for {@code messageQueue} and stores it in {@code offsetTable}.
     *
     * <p>Lookup order:
     * <ol>
     *   <li>the offset already tracked in {@code offsetTable};</li>
     *   <li>if this instance was restored, the offset in {@code restoredOffsets};</li>
     *   <li>the offset returned by {@code consumer.fetchConsumeOffset(messageQueue, false)};</li>
     *   <li>if that offset is negative, the reset policy configured under
     *       {@code RocketMQConfig.CONSUMER_OFFSET_RESET_TO} (default: latest): earliest maps to
     *       {@code minOffset}, latest to {@code maxOffset}, timestamp to {@code searchOffset}
     *       with {@code RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP} (default: now).</li>
     * </ol>
     *
     * @param messageQueue queue whose offset is requested
     * @return the resolved offset, which is also written to {@code offsetTable}
     * @throws MQClientException if one of the consumer offset queries fails
     * @throws IllegalArgumentException if the configured reset policy is not one of the known values
     */
    private long getMessageQueueOffset(MessageQueue messageQueue) throws MQClientException {
        Long offset = this.offsetTable.get(messageQueue);
        if (this.restored && offset == null) {
            offset = this.restoredOffsets.get(messageQueue);
        }
        if (offset == null) {
            offset = this.consumer.fetchConsumeOffset(messageQueue, false);
            if (offset < 0) {
                // Negative offset from fetchConsumeOffset: fall back to the configured reset policy.
                String resetPolicy = this.props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, RocketMQConfig.CONSUMER_OFFSET_LATEST);
                switch (resetPolicy) {
                    case RocketMQConfig.CONSUMER_OFFSET_EARLIEST:
                        offset = this.consumer.minOffset(messageQueue);
                        break;
                    case RocketMQConfig.CONSUMER_OFFSET_LATEST:
                        offset = this.consumer.maxOffset(messageQueue);
                        break;
                    case RocketMQConfig.CONSUMER_OFFSET_TIMESTAMP:
                        offset = this.consumer.searchOffset(messageQueue, RocketMQUtils.getLong(this.props, RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis()));
                        break;
                    default:
                        throw new IllegalArgumentException("Unknown value for CONSUMER_OFFSET_RESET_TO.");
                }
            }
        }
        this.offsetTable.put(messageQueue, offset);
        // Return the resolved value directly: re-reading the table could yield null (and an NPE on
        // unboxing) if cancel() clears offsetTable between the put and the get.
        return offset;
    }

    /**
     * Records {@code j} as the current offset of {@code messageQueue}.
     *
     * <p>The offset is always written to {@code offsetTable}. It is additionally pushed to the
     * consumer via {@code updateConsumeOffset} only when checkpointing is disabled; with
     * checkpointing enabled that call is made from {@code notifyCheckpointComplete()} instead.
     *
     * @param messageQueue queue the offset belongs to
     * @param j offset to record
     * @throws MQClientException if updating the consume offset on the consumer fails
     */
    private void putMessageQueueOffset(MessageQueue messageQueue, long j) throws MQClientException {
        this.offsetTable.put(messageQueue, Long.valueOf(j));
        if (!this.enableCheckpoint) {
            this.consumer.updateConsumeOffset(messageQueue, j);
        }
    }

    /**
     * Stops the source: clears the running flag (which releases {@code run()}), shuts down the
     * pull scheduler and empties the offset bookkeeping maps.
     *
     * <p>Every field is null-checked because this method is also reached through
     * {@link #close()}, which can be invoked when {@code open()} failed before the fields were
     * created.
     */
    public void cancel() {
        LOG.debug("cancel ...");
        // runningChecker is only created in open(); guard it like the other fields so that a
        // failed open() followed by close() does not end in a NullPointerException.
        if (this.runningChecker != null) {
            this.runningChecker.setRunning(false);
        }
        if (this.pullConsumerScheduleService != null) {
            this.pullConsumerScheduleService.shutdown();
        }
        if (this.offsetTable != null) {
            this.offsetTable.clear();
        }
        if (this.restoredOffsets != null) {
            this.restoredOffsets.clear();
        }
        if (this.pendingOffsetsToCommit != null) {
            this.pendingOffsetsToCommit.clear();
        }
    }

    /**
     * Releases the resources of this source by delegating to {@link #cancel()}, then runs the
     * superclass close logic.
     *
     * @throws Exception if {@code cancel()} or the superclass {@code close()} fails
     */
    public void close() throws Exception {
        LOG.debug("close ...");
        try {
            this.cancel();
        } finally {
            // The superclass close must run even when cancel() throws.
            super.close();
        }
    }

    /**
     * Writes the current per-queue offsets into the union list state and remembers them under
     * the checkpoint id so they can be committed in {@code notifyCheckpointComplete()}.
     *
     * <p>Before snapshotting, entries of {@code offsetTable} whose queue is not in the set
     * returned by {@code consumer.fetchMessageQueuesInBalance(topic)} are dropped. Nothing is
     * done when the source is not running.
     *
     * @param functionSnapshotContext supplies the checkpoint id and timestamp
     * @throws Exception if the state backend or the consumer query fails
     */
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (!this.runningChecker.isRunning()) {
            LOG.debug("snapshotState() called on closed source; returning null.");
            return;
        }

        final long checkpointId = functionSnapshotContext.getCheckpointId();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Snapshotting state {} ...", checkpointId);
        }

        this.unionOffsetStates.clear();
        Map<MessageQueue, Long> offsetsForCheckpoint = new HashMap<>(this.offsetTable.size());

        // Forget queues that are not in the consumer's current balance set.
        Set<MessageQueue> queuesInBalance = this.consumer.fetchMessageQueuesInBalance(this.topic);
        this.offsetTable.entrySet().removeIf(tracked -> !queuesInBalance.contains(tracked.getKey()));

        for (Map.Entry<MessageQueue, Long> tracked : this.offsetTable.entrySet()) {
            MessageQueue queue = tracked.getKey();
            Long offset = tracked.getValue();
            this.unionOffsetStates.add(Tuple2.of(queue, offset));
            offsetsForCheckpoint.put(queue, offset);
        }
        this.pendingOffsetsToCommit.put(Long.valueOf(checkpointId), offsetsForCheckpoint);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
                    this.offsetTable, checkpointId, functionSnapshotContext.getCheckpointTimestamp());
        }
    }

    /**
     * Registers the union list state holding (queue, offset) pairs and, when the job is
     * restored, loads those pairs into {@code restoredOffsets}.
     *
     * <p>If the state contains several offsets for the same queue, the largest one is kept.
     *
     * @param functionInitializationContext gives access to the operator state store and the restore flag
     * @throws Exception if the state cannot be registered or read
     */
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        LOG.debug("initialize State ...");

        ListStateDescriptor<Tuple2<MessageQueue, Long>> offsetsDescriptor = new ListStateDescriptor<>(
                OFFSETS_STATE_NAME,
                TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() {
                }));
        this.unionOffsetStates = functionInitializationContext.getOperatorStateStore().getUnionListState(offsetsDescriptor);

        this.restored = functionInitializationContext.isRestored();
        if (this.restored) {
            if (this.restoredOffsets == null) {
                this.restoredOffsets = new ConcurrentHashMap<>();
            }
            for (Tuple2<MessageQueue, Long> restoredEntry : this.unionOffsetStates.get()) {
                Long known = this.restoredOffsets.get(restoredEntry.f0);
                // Keep the highest offset seen for each queue.
                if (known == null || known.longValue() < restoredEntry.f1.longValue()) {
                    this.restoredOffsets.put(restoredEntry.f0, restoredEntry.f1);
                }
            }
            LOG.info("Setting restore state in the consumer. Using the following offsets: {}", this.restoredOffsets);
        } else {
            LOG.info("No restore state for the consumer.");
        }
    }

    /**
     * Returns the type information of the produced records, as reported by the
     * deserialization schema.
     *
     * @return the schema's produced type
     */
    public TypeInformation<OUT> getProducedType() {
        final TypeInformation<OUT> producedType = this.schema.getProducedType();
        return producedType;
    }

    /**
     * Commits the offsets that were snapshotted for checkpoint {@code j} to the consumer.
     *
     * <p>The entry for {@code j} and every entry queued before it are removed from
     * {@code pendingOffsetsToCommit}. Nothing is done when the source is not running; an unknown
     * checkpoint id is only logged.
     *
     * @param j id of the completed checkpoint
     * @throws Exception if updating a consume offset on the consumer fails
     */
    @SuppressWarnings("unchecked")
    public void notifyCheckpointComplete(long j) throws Exception {
        if (!this.runningChecker.isRunning()) {
            LOG.debug("notifyCheckpointComplete() called on closed source; returning null.");
            return;
        }

        final int position = this.pendingOffsetsToCommit.indexOf(Long.valueOf(j));
        if (position == -1) {
            LOG.warn("Received confirmation for unknown checkpoint id {}", Long.valueOf(j));
            return;
        }

        // Values are only ever stored by snapshotState() as Map<MessageQueue, Long>.
        Map<MessageQueue, Long> offsetsToCommit = (Map<MessageQueue, Long>) this.pendingOffsetsToCommit.remove(position);

        // Discard the entries that were queued before the confirmed checkpoint.
        for (int remaining = position; remaining > 0; remaining--) {
            this.pendingOffsetsToCommit.remove(0);
        }

        if (offsetsToCommit == null || offsetsToCommit.isEmpty()) {
            LOG.debug("Checkpoint state was empty.");
            return;
        }

        for (Map.Entry<MessageQueue, Long> committed : offsetsToCommit.entrySet()) {
            this.consumer.updateConsumeOffset(committed.getKey(), committed.getValue().longValue());
        }
    }
}
