package org.apache.flink.connector.rocketmq.legacy;

import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.commons.lang.Validate;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.rocketmq.legacy.common.config.OffsetResetStrategy;
import org.apache.flink.connector.rocketmq.legacy.common.config.StartupMode;
import org.apache.flink.connector.rocketmq.legacy.common.serialization.KeyValueDeserializationSchema;
import org.apache.flink.connector.rocketmq.legacy.common.util.MetricUtils;
import org.apache.flink.connector.rocketmq.legacy.common.util.RetryUtil;
import org.apache.flink.connector.rocketmq.legacy.common.util.RocketMQUtils;
import org.apache.flink.connector.rocketmq.legacy.common.watermark.WaterMarkForAll;
import org.apache.flink.connector.rocketmq.legacy.common.watermark.WaterMarkPerQueue;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.curator5.com.google.common.collect.Lists;
import org.apache.flink.shaded.curator5.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.class */
public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT> implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> {
    private static final long serialVersionUID = 1;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) RocketMQSourceFunction.class);
    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
    private RunningChecker runningChecker;
    private transient DefaultLitePullConsumer consumer;
    private KeyValueDeserializationSchema<OUT> schema;
    private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;
    private Map<MessageQueue, Long> offsetTable;
    private Map<MessageQueue, Long> restoredOffsets;
    private List<MessageQueue> messageQueues;
    private ExecutorService executor;
    private WaterMarkPerQueue waterMarkPerQueue;
    private WaterMarkForAll waterMarkForAll;
    private ScheduledExecutorService timer;
    private LinkedMap pendingOffsetsToCommit;
    private Properties props;
    private String topic;
    private String group;
    private volatile transient boolean restored;
    private transient boolean enableCheckpoint;
    private volatile Object checkPointLock;
    private Meter tpsMetric;
    private MetricUtils.TimestampGauge fetchDelay = new MetricUtils.TimestampGauge();
    private MetricUtils.TimestampGauge emitDelay = new MetricUtils.TimestampGauge();
    private StartupMode startMode = StartupMode.GROUP_OFFSETS;
    private OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.LATEST;
    private Map<MessageQueue, Long> specificStartupOffsets;
    private long specificTimeStamp;

    /**
     * Creates a source function from a deserialization schema and consumer properties.
     *
     * @param keyValueDeserializationSchema schema kept for later use; {@code getProducedType()}
     *     delegates to it
     * @param properties consumer properties; they are read when the source is opened and run
     */
    public RocketMQSourceFunction(KeyValueDeserializationSchema<OUT> keyValueDeserializationSchema, Properties properties) {
        this.props = properties;
        this.schema = keyValueDeserializationSchema;
    }

    /**
     * Prepares the source for running: validates the consumer properties, creates any working
     * state that is still missing, starts the lite pull consumer, registers the metrics and
     * seeds the offset table for the message queues allocated to this subtask.
     *
     * @param configuration not read by this method
     * @throws Exception if validation, consumer start-up or offset initialisation fails
     */
    public void open(Configuration configuration) throws Exception {
        log.debug("source open....");

        // --- configuration checks ---
        Validate.notEmpty(this.props, "Consumer properties can not be empty");
        this.topic = this.props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
        this.group = this.props.getProperty(RocketMQConfig.CONSUMER_GROUP);
        Validate.notEmpty(this.topic, "Consumer topic can not be empty");
        Validate.notEmpty(this.group, "Consumer group can not be empty");
        // A tag filter and a SQL filter must not be configured together.
        boolean tagAndSqlBothSet =
                StringUtils.isNotEmpty(this.props.getProperty(RocketMQConfig.CONSUMER_TAG))
                        && StringUtils.isNotEmpty(this.props.getProperty(RocketMQConfig.CONSUMER_SQL));
        Validate.isTrue(!tagAndSqlBothSet, "Consumer tag and sql can not set value at the same time");

        RuntimeContext context = getRuntimeContext();
        this.enableCheckpoint = context.isCheckpointingEnabled();

        // --- lazily created working state (anything already present is left alone) ---
        if (this.offsetTable == null) {
            this.offsetTable = new ConcurrentHashMap<>();
        }
        if (this.restoredOffsets == null) {
            this.restoredOffsets = new ConcurrentHashMap<>();
        }
        if (this.pendingOffsetsToCommit == null) {
            this.pendingOffsetsToCommit = new LinkedMap();
        }
        if (this.checkPointLock == null) {
            this.checkPointLock = new ReentrantLock();
        }
        if (this.waterMarkPerQueue == null) {
            this.waterMarkPerQueue = new WaterMarkPerQueue(5000L);
        }
        if (this.waterMarkForAll == null) {
            this.waterMarkForAll = new WaterMarkForAll(5000L);
        }
        if (this.timer == null) {
            this.timer = Executors.newSingleThreadScheduledExecutor();
        }

        this.runningChecker = new RunningChecker();
        this.runningChecker.setRunning(true);
        this.executor =
                Executors.newCachedThreadPool(
                        new ThreadFactoryBuilder()
                                .setDaemon(true)
                                .setNameFormat("rmq-pull-thread-%d")
                                .build());

        // --- consumer start-up ---
        int subtaskIndex = context.getIndexOfThisSubtask();
        this.consumer = new DefaultLitePullConsumer(this.group, RocketMQConfig.buildAclRPCHook(this.props));
        RocketMQConfig.buildConsumerConfigs(this.props, this.consumer);
        // The instance name combines JVM name, topic, group, subtask index and a nano timestamp.
        String instanceName =
                RocketMQUtils.getInstanceName(
                        ManagementFactory.getRuntimeMXBean().getName(),
                        this.topic,
                        this.group,
                        String.valueOf(subtaskIndex),
                        String.valueOf(System.nanoTime()));
        this.consumer.setInstanceName(instanceName);
        this.consumer.start();

        // --- metrics ---
        SimpleCounter tpsCounter = context.getMetricGroup().counter("tps_counter", new SimpleCounter());
        this.tpsMetric = context.getMetricGroup().meter(MetricUtils.METRICS_TPS, new MeterView(tpsCounter, 60));
        context.getMetricGroup().gauge(MetricUtils.CURRENT_FETCH_EVENT_TIME_LAG, this.fetchDelay);
        context.getMetricGroup().gauge(MetricUtils.CURRENT_EMIT_EVENT_TIME_LAG, this.emitDelay);

        // --- queue allocation and starting offsets ---
        int parallelism = context.getNumberOfParallelSubtasks();
        log.info("Source run, NumberOfTotalTask={}, IndexOfThisSubTask={}", parallelism, subtaskIndex);
        this.messageQueues =
                RocketMQUtils.allocate(this.consumer.fetchMessageQueues(this.topic), parallelism, subtaskIndex);
        if (!this.restored) {
            initOffsets(this.messageQueues);
        } else {
            initOffsetTableFromRestoredOffsets(this.messageQueues);
        }
    }

    /**
     * Schedules a watermark emission every 5 seconds, subscribes the consumer to the topic
     * (through a SQL selector when one is configured, otherwise through the tag expression,
     * which defaults to {@code "*"}), submits one task per allocated message queue to the
     * executor and then blocks until the running flag is cleared.
     *
     * <p>NOTE(review): the per-queue task submitted below has an empty body in this source.
     *
     * @param sourceContext context used to emit watermarks
     * @throws Exception if subscribing fails or the waiting thread is interrupted
     */
    public void run(SourceFunction.SourceContext sourceContext) throws Exception {
        final String sqlFilter = this.props.getProperty(RocketMQConfig.CONSUMER_SQL);
        final String tagFilter = this.props.getProperty(RocketMQConfig.CONSUMER_TAG, "*");
        // The value is not used further down; the lookup itself is retained as in the original.
        final int pullBatchSize = RocketMQUtils.getInteger(this.props, RocketMQConfig.CONSUMER_BATCH_SIZE, 32);

        this.timer.scheduleAtFixedRate(
                () -> sourceContext.emitWatermark(this.waterMarkForAll.getCurrentWatermark()),
                5L,
                5L,
                TimeUnit.SECONDS);

        if (StringUtils.isNotEmpty(sqlFilter)) {
            this.consumer.subscribe(this.topic, MessageSelector.bySql(sqlFilter));
        } else {
            this.consumer.subscribe(this.topic, tagFilter);
        }

        this.messageQueues.forEach(
                queue ->
                        this.executor.execute(
                                () -> {
                                }));

        awaitTermination();
    }

    /**
     * Blocks the calling thread, polling every 50 ms, until the running checker reports that
     * the source is no longer running.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void awaitTermination() throws InterruptedException {
        for (;;) {
            if (!this.runningChecker.isRunning()) {
                return;
            }
            Thread.sleep(50L);
        }
    }

    /**
     * Determines the starting offset of every given queue according to the configured start
     * mode and records it in the offset table.
     *
     * <ul>
     *   <li>{@code LATEST} / {@code EARLIEST}: seek to the end / beginning, then read the
     *       committed offset.
     *   <li>{@code GROUP_OFFSETS}: use the committed offset, applying the reset strategy when
     *       there is none (see {@link #resolveGroupOffset(MessageQueue)}).
     *   <li>{@code TIMESTAMP}: look up the offset for the configured timestamp.
     *   <li>{@code SPECIFIC_OFFSETS}: use the caller-supplied offset for the queue, falling back
     *       to the committed offset when the queue has no entry.
     * </ul>
     *
     * @param list queues allocated to this subtask
     * @throws MQClientException if the consumer fails to seek or to look up an offset
     * @throws RuntimeException if the mode is {@code SPECIFIC_OFFSETS} but no offsets were set
     * @throws IllegalArgumentException if the start mode is not one of the handled values
     */
    private void initOffsets(List<MessageQueue> list) throws MQClientException {
        for (MessageQueue messageQueue : list) {
            long startOffset;
            switch (this.startMode) {
                case LATEST:
                    this.consumer.seekToEnd(messageQueue);
                    startOffset = this.consumer.committed(messageQueue).longValue();
                    break;
                case EARLIEST:
                    this.consumer.seekToBegin(messageQueue);
                    startOffset = this.consumer.committed(messageQueue).longValue();
                    break;
                case GROUP_OFFSETS:
                    startOffset = resolveGroupOffset(messageQueue);
                    // Explicit break: without it control fell through into TIMESTAMP and the
                    // offset chosen by the reset strategy was overwritten.
                    break;
                case TIMESTAMP:
                    startOffset = this.consumer.offsetForTimestamp(messageQueue, Long.valueOf(this.specificTimeStamp)).longValue();
                    break;
                case SPECIFIC_OFFSETS:
                    if (this.specificStartupOffsets == null) {
                        throw new RuntimeException("StartMode is specific_offsets.But none offsets has been specified");
                    }
                    Long specified = this.specificStartupOffsets.get(messageQueue);
                    startOffset = specified != null
                            ? specified.longValue()
                            : this.consumer.committed(messageQueue).longValue();
                    break;
                default:
                    throw new IllegalArgumentException("current startMode is not supported" + this.startMode);
            }
            log.info("current consumer queue:{} start from offset of: {}", messageQueue.getBrokerName() + HelpFormatter.DEFAULT_OPT_PREFIX + messageQueue.getQueueId(), Long.valueOf(startOffset));
            this.offsetTable.put(messageQueue, Long.valueOf(startOffset));
        }
    }

    /**
     * Resolves the starting offset for the group-offsets start mode: returns the committed
     * offset when it is positive, otherwise seeks according to the reset strategy and returns
     * the committed offset read after that seek.
     *
     * @param messageQueue queue whose starting offset is wanted
     * @return the offset to start consuming from
     * @throws MQClientException if the consumer fails to seek or to read the committed offset
     */
    private long resolveGroupOffset(MessageQueue messageQueue) throws MQClientException {
        long offset = this.consumer.committed(messageQueue).longValue();
        if (offset > 0) {
            return offset;
        }
        switch (this.offsetResetStrategy) {
            case LATEST:
                this.consumer.seekToEnd(messageQueue);
                offset = this.consumer.committed(messageQueue).longValue();
                log.info("current consumer thread:{} has no committed offset,use Strategy:{} instead", messageQueue, this.offsetResetStrategy);
                break;
            case EARLIEST:
                log.info("current consumer thread:{} has no committed offset,use Strategy:{} instead", messageQueue, this.offsetResetStrategy);
                this.consumer.seekToBegin(messageQueue);
                offset = this.consumer.committed(messageQueue).longValue();
                break;
            default:
                // Any other strategy keeps the (non-positive) committed offset unchanged.
                break;
        }
        return offset;
    }

    /**
     * Selects {@code StartupMode.EARLIEST} as the start mode.
     *
     * @return this instance, so calls can be chained
     */
    public RocketMQSourceFunction<OUT> setStartFromEarliest() {
        this.startMode = StartupMode.EARLIEST;
        return this;
    }

    /**
     * Selects {@code StartupMode.LATEST} as the start mode.
     *
     * @return this instance, so calls can be chained
     */
    public RocketMQSourceFunction<OUT> setStartFromLatest() {
        this.startMode = StartupMode.LATEST;
        return this;
    }

    /**
     * Selects {@code StartupMode.TIMESTAMP} as the start mode and stores the timestamp that is
     * later used to look up each queue's starting offset.
     *
     * @param j timestamp handed to the consumer's offset-for-timestamp lookup
     * @return this instance, so calls can be chained
     */
    public RocketMQSourceFunction<OUT> setStartFromTimeStamp(long j) {
        this.specificTimeStamp = j;
        this.startMode = StartupMode.TIMESTAMP;
        return this;
    }

    /**
     * Selects {@code StartupMode.GROUP_OFFSETS} as the start mode and leaves the currently
     * configured offset reset strategy as it is.
     *
     * @return this instance, so calls can be chained
     */
    public RocketMQSourceFunction<OUT> setStartFromGroupOffsets() {
        // Re-applying the current strategy keeps it unchanged while switching the mode.
        return setStartFromGroupOffsets(this.offsetResetStrategy);
    }

    /**
     * Selects {@code StartupMode.GROUP_OFFSETS} as the start mode and sets the strategy applied
     * when a queue has no committed offset.
     *
     * @param offsetResetStrategy strategy to store; it is not validated here
     * @return this instance, so calls can be chained
     */
    public RocketMQSourceFunction<OUT> setStartFromGroupOffsets(OffsetResetStrategy offsetResetStrategy) {
        this.offsetResetStrategy = offsetResetStrategy;
        this.startMode = StartupMode.GROUP_OFFSETS;
        return this;
    }

    /**
     * Selects {@code StartupMode.SPECIFIC_OFFSETS} as the start mode and stores the per-queue
     * starting offsets.
     *
     * @param map starting offset for each queue; the reference is stored as-is, not copied
     * @return this instance, so calls can be chained
     */
    public RocketMQSourceFunction<OUT> setStartFromSpecificOffsets(Map<MessageQueue, Long> map) {
        this.startMode = StartupMode.SPECIFIC_OFFSETS;
        this.specificStartupOffsets = map;
        return this;
    }

    /**
     * Records the latest offset of a queue in the offset table and, when checkpointing is
     * disabled, also pushes it to the consumer's offset store.
     *
     * @param messageQueue queue whose offset advanced
     * @param j new offset for that queue
     * @throws MQClientException declared by the signature for callers to handle
     */
    private void updateMessageQueueOffset(MessageQueue messageQueue, long j) throws MQClientException {
        this.offsetTable.put(messageQueue, Long.valueOf(j));
        // With checkpointing enabled, the offset store is updated in notifyCheckpointComplete.
        if (!this.enableCheckpoint) {
            this.consumer.getOffsetStore().updateOffset(messageQueue, j, false);
        }
    }

    /**
     * Stops the source and releases what it holds: clears the running flag, shuts down the
     * watermark timer, the pull executor and the consumer, and clears the offset bookkeeping.
     * Every released field is set to {@code null}, so calling this method again is harmless.
     */
    public void cancel() {
        log.debug("cancel ...");
        // open() creates the checker only after validations that may throw, and close() always
        // delegates here; guard so a failed open() does not turn into a NullPointerException
        // that would also skip the cleanup below.
        if (this.runningChecker != null) {
            this.runningChecker.setRunning(false);
        }
        if (this.timer != null) {
            this.timer.shutdown();
            this.timer = null;
        }
        if (this.executor != null) {
            this.executor.shutdown();
            this.executor = null;
        }
        if (this.consumer != null) {
            this.consumer.shutdown();
            this.consumer = null;
        }
        if (this.offsetTable != null) {
            this.offsetTable.clear();
            this.offsetTable = null;
        }
        if (this.restoredOffsets != null) {
            this.restoredOffsets.clear();
            this.restoredOffsets = null;
        }
        if (this.pendingOffsetsToCommit != null) {
            this.pendingOffsetsToCommit.clear();
            this.pendingOffsetsToCommit = null;
        }
    }

    /**
     * Closes the source by running the same cleanup as {@code cancel()} and then delegating to
     * the superclass.
     *
     * @throws Exception if the cleanup or the superclass {@code close()} fails
     */
    public void close() throws Exception {
        log.debug("close ...");
        try {
            cancel();
        } finally {
            // Runs even when cancel() throws, so the superclass is always closed.
            super.close();
        }
    }

    /**
     * Copies into the offset table every restored offset whose queue is among the given queues.
     *
     * @param list queues allocated to this subtask; restored entries for other queues are skipped
     * @throws NullPointerException if no restored offsets are available
     */
    public void initOffsetTableFromRestoredOffsets(List<MessageQueue> list) {
        Preconditions.checkNotNull(this.restoredOffsets, "restoredOffsets can't be null");
        for (Map.Entry<MessageQueue, Long> restoredEntry : this.restoredOffsets.entrySet()) {
            MessageQueue queue = restoredEntry.getKey();
            if (!list.contains(queue)) {
                continue;
            }
            this.offsetTable.put(queue, restoredEntry.getValue());
        }
        log.info("init offset table [{}] from restoredOffsets successful.", this.offsetTable);
    }

    /**
     * Writes the current per-queue offsets into the union list state and remembers them under
     * the checkpoint id so they can be committed once the checkpoint is confirmed.
     *
     * <p>Before snapshotting, the queue allocation for this subtask is recomputed (with
     * retries) and compared with the allocation made in {@code open}. When it still matches,
     * the state is cleared and rewritten from the offset table. When the retries fail, the
     * state is not cleared and the offsets it already holds are carried into the pending map,
     * with the current offset table taking precedence for queues present in both.
     *
     * @param functionSnapshotContext supplies the checkpoint id and timestamp
     * @throws Exception if reading or writing the operator state fails
     */
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        final long checkpointId = functionSnapshotContext.getCheckpointId();
        log.info("Snapshotting state {} ...", Long.valueOf(checkpointId));
        if (!this.runningChecker.isRunning()) {
            log.info("snapshotState() called on closed source; returning null.");
            return;
        }

        Map<MessageQueue, Long> currentOffsets;
        try {
            RetryUtil.call(() -> {
                Collection<MessageQueue> topicQueues = this.consumer.fetchMessageQueues(this.topic);
                int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
                int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
                List<MessageQueue> reallocated = RocketMQUtils.allocate(topicQueues, parallelism, taskIndex);
                Collections.sort(reallocated);
                if (!this.messageQueues.equals(reallocated)) {
                    throw new RuntimeException();
                }
                // Logged only after the comparison has actually confirmed it.
                log.debug("{} Topic route is same.", Integer.valueOf(taskIndex));
                return true;
            }, "RuntimeException due to topic route changed");
            this.unionOffsetStates.clear();
            currentOffsets = new HashMap<>(this.offsetTable.size());
        } catch (RuntimeException e) {
            log.warn("Retry failed multiple times for topic route change, keep previous offset.");
            // Carry over the queue offsets of the previous snapshot; the copy loop used to be
            // empty, so nothing was actually kept.
            List<Tuple2<MessageQueue, Long>> previousOffsets = Lists.newArrayList(this.unionOffsetStates.get().iterator());
            currentOffsets = new HashMap<>(previousOffsets.size() + this.offsetTable.size());
            for (Tuple2<MessageQueue, Long> previous : previousOffsets) {
                currentOffsets.put(previous.f0, previous.f1);
            }
        }

        for (Map.Entry<MessageQueue, Long> entry : this.offsetTable.entrySet()) {
            this.unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
            currentOffsets.put(entry.getKey(), entry.getValue());
        }
        this.pendingOffsetsToCommit.put(Long.valueOf(checkpointId), currentOffsets);
        log.info("Snapshot state, last processed offsets: {}, checkpoint id: {}, timestamp: {}", this.offsetTable, Long.valueOf(checkpointId), Long.valueOf(functionSnapshotContext.getCheckpointTimestamp()));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Obtains the union list state that holds the (queue, offset) pairs and, when the job is
     * being restored, collapses it into a map that keeps the highest offset seen per queue.
     *
     * @param functionInitializationContext gives access to the operator state store and tells
     *     whether this start is a restore
     * @throws Exception if the state cannot be obtained or read
     */
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        log.info("initialize State ...");
        ListStateDescriptor<Tuple2<MessageQueue, Long>> offsetsDescriptor =
                new ListStateDescriptor<>(
                        OFFSETS_STATE_NAME,
                        TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() {
                        }));
        this.unionOffsetStates = functionInitializationContext.getOperatorStateStore().getUnionListState(offsetsDescriptor);
        this.restored = functionInitializationContext.isRestored();

        if (this.restored) {
            if (this.restoredOffsets == null) {
                this.restoredOffsets = new ConcurrentHashMap<>();
            }
            for (Tuple2<MessageQueue, Long> queueOffset : this.unionOffsetStates.get()) {
                MessageQueue queue = queueOffset.f0;
                Long known = this.restoredOffsets.get(queue);
                // Keep the largest offset when the same queue appears more than once.
                if (known == null || known.longValue() < queueOffset.f1.longValue()) {
                    this.restoredOffsets.put(queue, queueOffset.f1);
                }
            }
            log.info("Setting restore state in the consumer. Using the following offsets: {}", this.restoredOffsets);
        } else {
            log.info("No restore state for the consumer.");
        }
    }

    /**
     * Reports the type of the records this source produces, as declared by the schema.
     *
     * @return the schema's produced type
     */
    public TypeInformation<OUT> getProducedType() {
        final TypeInformation<OUT> producedType = this.schema.getProducedType();
        return producedType;
    }

    /**
     * Commits the offsets that were recorded for a confirmed checkpoint: removes that
     * checkpoint's entry and all earlier entries from the pending map, then updates and
     * persists each recorded queue offset through the consumer's offset store.
     *
     * @param j id of the completed checkpoint
     * @throws Exception declared by the signature for callers to handle
     */
    public void notifyCheckpointComplete(long j) throws Exception {
        if (!this.runningChecker.isRunning()) {
            log.info("notifyCheckpointComplete() called on closed source; returning null.");
            return;
        }

        final int position = this.pendingOffsetsToCommit.indexOf(Long.valueOf(j));
        if (position == -1) {
            log.warn("Received confirmation for unknown checkpoint id {}", Long.valueOf(j));
            return;
        }

        // Index-based removal: returns the offsets stored for this checkpoint.
        @SuppressWarnings("unchecked")
        Map<MessageQueue, Long> confirmedOffsets = (Map<MessageQueue, Long>) this.pendingOffsetsToCommit.remove(position);

        // Everything that was queued before the confirmed checkpoint is now obsolete.
        int obsolete = position;
        while (obsolete > 0) {
            this.pendingOffsetsToCommit.remove(0);
            obsolete--;
        }

        if (confirmedOffsets == null || confirmedOffsets.isEmpty()) {
            log.debug("Checkpoint state was empty.");
            return;
        }

        for (Map.Entry<MessageQueue, Long> committed : confirmedOffsets.entrySet()) {
            MessageQueue queue = committed.getKey();
            this.consumer.getOffsetStore().updateOffset(queue, committed.getValue().longValue(), false);
            this.consumer.getOffsetStore().persist(this.consumer.queueWithNamespace(queue));
        }
    }
}
