package org.apache.flink.connector.rocketmq.source.reader;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.rocketmq.common.config.RocketMQOptions;
import org.apache.flink.connector.rocketmq.source.InnerConsumer;
import org.apache.flink.connector.rocketmq.source.InnerConsumerImpl;
import org.apache.flink.connector.rocketmq.source.RocketMQSourceOptions;
import org.apache.flink.connector.rocketmq.source.metrics.RocketMQSourceReaderMetrics;
import org.apache.flink.connector.rocketmq.source.reader.deserializer.RocketMQDeserializationSchema;
import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
import org.apache.flink.connector.rocketmq.source.util.UtilAll;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Split reader that polls {@link MessageView} records from an {@link InnerConsumer} and hands
 * them back grouped by split id.
 *
 * <p>Only {@link SplitsAddition} changes are accepted; every added split is assigned to the
 * consumer and the consumer is positioned at the split's starting offset.
 *
 * @param <T> type parameter of the {@link RocketMQDeserializationSchema} held by this reader
 */
@Internal
public class RocketMQSplitReader<T> implements SplitReader<MessageView, RocketMQSourceSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(RocketMQSplitReader.class);

    private final InnerConsumer consumer;
    private final Configuration configuration;
    private final SourceReaderContext sourceReaderContext;
    private final RocketMQDeserializationSchema<T> deserializationSchema;
    private final boolean commitOffsetsOnCheckpoint;
    private final RocketMQSourceReaderMetrics rocketmqSourceReaderMetrics;

    /** Set by {@link #wakeUp()} and reset at the start of {@link #fetch()}; not read elsewhere in this class. */
    private volatile boolean wakeup = false;

    private final SortedMap<Long, Map<MessageQueue, Long>> offsetsToCommit = new TreeMap<>();

    /** Only ever removed from in this class; nothing visible here adds entries. */
    private final ConcurrentMap<MessageQueue, Tuple2<Long, Long>> currentOffsetTable =
            new ConcurrentHashMap<>();

    /**
     * Container for one batch of fetched records, grouped by split id.
     *
     * <p>Records are collected through {@link #recordsForSplit(String)} and then iterated via
     * {@link #nextSplit()} / {@link #nextRecordFromSplit()}.
     *
     * @param <T> record type
     */
    private static class RocketMQRecordsWithSplitIds<T> implements RecordsWithSplitIds<T> {
        private String currentSplitId;
        private Iterator<T> recordIterator;
        private Iterator<Map.Entry<String, List<T>>> splitIterator;
        private final RocketMQSourceReaderMetrics readerMetrics;
        private final Set<String> finishedSplits = new HashSet<>();
        private final Map<String, List<T>> recordsBySplits = new HashMap<>();

        public RocketMQRecordsWithSplitIds(RocketMQSourceReaderMetrics rocketMQSourceReaderMetrics) {
            this.readerMetrics = rocketMQSourceReaderMetrics;
        }

        /**
         * Returns the mutable record collection of the given split, creating it on first use.
         *
         * @param splitId id of the split the records belong to
         * @return the collection that records of this split are appended to
         */
        private Collection<T> recordsForSplit(String splitId) {
            return this.recordsBySplits.computeIfAbsent(splitId, id -> new ArrayList<>());
        }

        private void addFinishedSplit(String splitId) {
            this.finishedSplits.add(splitId);
        }

        /** Creates the split iterator over everything collected so far. */
        private void prepareForRead() {
            this.splitIterator = this.recordsBySplits.entrySet().iterator();
        }

        /**
         * Advances to the next split that has collected records.
         *
         * @return the id of the next split, or {@code null} when all splits were visited
         */
        @Nullable
        public String nextSplit() {
            if (this.splitIterator == null) {
                // Guard against a caller that never invoked prepareForRead(): iterating an
                // unprepared (possibly empty) batch must not fail with a NullPointerException.
                prepareForRead();
            }
            if (!this.splitIterator.hasNext()) {
                this.currentSplitId = null;
                this.recordIterator = null;
                return null;
            }
            Map.Entry<String, List<T>> next = this.splitIterator.next();
            this.currentSplitId = next.getKey();
            this.recordIterator = next.getValue().iterator();
            return this.currentSplitId;
        }

        /**
         * Returns the next record of the split selected by the last {@link #nextSplit()} call.
         *
         * @return the next record, or {@code null} when the current split has no more records
         */
        @Nullable
        public T nextRecordFromSplit() {
            Preconditions.checkNotNull(this.currentSplitId, "Make sure nextSplit() did not return null before iterate over the records split.");
            return this.recordIterator.hasNext() ? this.recordIterator.next() : null;
        }

        /** Returns the (live) set of split ids registered as finished. */
        public Set<String> finishedSplits() {
            return this.finishedSplits;
        }
    }

    /**
     * Creates the reader and starts its consumer.
     *
     * @param configuration configuration used for the consumer and for reader options
     * @param sourceReaderContext context of the owning source reader
     * @param rocketMQDeserializationSchema deserialization schema kept by this reader
     * @param rocketMQSourceReaderMetrics metrics on which assigned message queues are registered
     */
    public RocketMQSplitReader(Configuration configuration, SourceReaderContext sourceReaderContext, RocketMQDeserializationSchema<T> rocketMQDeserializationSchema, RocketMQSourceReaderMetrics rocketMQSourceReaderMetrics) {
        this.configuration = configuration;
        this.sourceReaderContext = sourceReaderContext;
        this.deserializationSchema = rocketMQDeserializationSchema;
        this.rocketmqSourceReaderMetrics = rocketMQSourceReaderMetrics;
        this.commitOffsetsOnCheckpoint = configuration.getBoolean(RocketMQOptions.COMMIT_OFFSETS_ON_CHECKPOINT);
        // The consumer is created and started last so that a failure while reading the
        // configuration above cannot leave behind a started consumer that nobody closes.
        this.consumer = new InnerConsumerImpl(configuration);
        this.consumer.start();
    }

    /**
     * Polls the consumer once and groups the returned messages by split id.
     *
     * <p>Failures while polling or grouping are logged and not rethrown; whatever was collected
     * before the failure (possibly nothing) is still returned in an iterable state.
     *
     * @return the fetched records grouped by split id, never {@code null}
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    public RecordsWithSplitIds<MessageView> fetch() throws IOException {
        this.wakeup = false;
        RocketMQRecordsWithSplitIds<MessageView> records =
                new RocketMQRecordsWithSplitIds<>(this.rocketmqSourceReaderMetrics);
        try {
            Duration pollTimeout = Duration.ofMillis(this.configuration.getLong(RocketMQOptions.POLL_TIMEOUT));
            boolean debugMode = this.configuration.getBoolean(RocketMQSourceOptions.GLOBAL_DEBUG_MODE);
            for (MessageView messageView : this.consumer.poll(pollTimeout)) {
                String splitId = UtilAll.getSplitId(new MessageQueue(messageView.getTopic(), messageView.getBrokerName(), messageView.getQueueId()));
                records.recordsForSplit(splitId).add(messageView);
                if (debugMode) {
                    LOG.info("Reader fetch splitId: {}, messageId: {}", splitId, messageView.getMessageId());
                }
            }
        } catch (Exception e) {
            LOG.error("Reader fetch split error", e);
        }
        // Must run on the failure path as well: otherwise the returned object has no split
        // iterator and the first nextSplit() call fails with a NullPointerException.
        records.prepareForRead();
        return records;
    }

    /**
     * Assigns the added splits to the consumer and seeks each queue to its starting offset.
     *
     * @param splitsChange the change to apply; must be a {@link SplitsAddition}
     * @throws UnsupportedOperationException if the change is not a {@link SplitsAddition}
     * @throws FlinkRuntimeException if seeking a queue to its starting offset fails
     */
    public void handleSplitsChanges(SplitsChange<RocketMQSourceSplit> splitsChange) {
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChange.getClass()));
        }
        // Queue -> (starting offset, stopping offset) of the splits contained in this change.
        ConcurrentMap<MessageQueue, Tuple2<Long, Long>> addedOffsets = new ConcurrentHashMap<>();
        splitsChange.splits().forEach(split -> {
            MessageQueue queue = new MessageQueue(split.getTopic(), split.getBrokerName(), split.getQueueId());
            addedOffsets.put(queue, new Tuple2<>(split.getStartingOffset(), split.getStoppingOffset()));
            this.rocketmqSourceReaderMetrics.registerNewMessageQueue(queue);
        });
        this.consumer.assign(addedOffsets.keySet());
        for (Map.Entry<MessageQueue, Tuple2<Long, Long>> entry : addedOffsets.entrySet()) {
            MessageQueue queue = entry.getKey();
            Long startingOffset = entry.getValue().f0;
            try {
                this.consumer.seek(queue, startingOffset);
            } catch (Exception e) {
                throw new FlinkRuntimeException(String.format("messageQueue:%s, seek to starting offset:%s", queue, startingOffset), e);
            }
        }
    }

    /** Marks the reader as woken up and wakes the consumer. */
    public void wakeUp() {
        LOG.debug("Wake up the split reader in case the fetcher thread is blocking in fetch().");
        this.wakeup = true;
        this.consumer.wakeup();
    }

    /** Closes the consumer; a failure is logged and not propagated. */
    public void close() {
        try {
            this.consumer.close();
        } catch (Exception e) {
            LOG.error("close consumer error", e);
        }
    }

    /**
     * Commits the given offsets through the consumer, one call per message queue.
     *
     * @param map offsets to commit keyed by message queue; {@code null} is treated as nothing to commit
     */
    public void notifyCheckpointComplete(Map<MessageQueue, Long> map) {
        if (map == null) {
            return;
        }
        for (Map.Entry<MessageQueue, Long> entry : map.entrySet()) {
            this.consumer.commitOffset(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Logs that a queue reached its stopping offset and drops it from the current offset table.
     *
     * <p>NOTE(review): the split is not added to the finished splits of
     * {@code rocketMQRecordsWithSplitIds}, and this method has no caller in this class; confirm
     * whether that is intended.
     */
    private void finishSplitAtRecord(MessageQueue messageQueue, long currentOffset, RocketMQRecordsWithSplitIds<MessageView> rocketMQRecordsWithSplitIds) {
        LOG.info("message queue {} has reached stopping offset {}", messageQueue, currentOffset);
        this.currentOffsetTable.remove(messageQueue);
    }
}
