package org.apache.flink.connector.rocketmq.source.reader;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.rocketmq.source.RocketMQSourceOptions;
import org.apache.flink.connector.rocketmq.source.metrics.RocketMQSourceReaderMetrics;
import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplitState;
import org.apache.flink.connector.rocketmq.source.util.UtilAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceReader.class */
public class RocketMQSourceReader<T> extends SingleThreadMultiplexSourceReaderBase<MessageView, T, RocketMQSourceSplit, RocketMQSourceSplitState> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) RocketMQSourceReader.class);
    private final boolean commitOffsetsOnCheckpoint;
    private final SortedMap<Long, Map<MessageQueue, Long>> offsetsToCommit;
    private final ConcurrentMap<MessageQueue, Long> offsetsOfFinishedSplits;
    private final RocketMQSourceReaderMetrics rocketmqSourceReaderMetrics;

    public RocketMQSourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<MessageView>> futureCompletingBlockingQueue, RocketMQSourceFetcherManager rocketMQSourceFetcherManager, RecordEmitter<MessageView, T, RocketMQSourceSplitState> recordEmitter, Configuration configuration, SourceReaderContext sourceReaderContext, RocketMQSourceReaderMetrics rocketMQSourceReaderMetrics) {
        super(futureCompletingBlockingQueue, rocketMQSourceFetcherManager, recordEmitter, configuration, sourceReaderContext);
        this.offsetsToCommit = Collections.synchronizedSortedMap(new TreeMap());
        this.offsetsOfFinishedSplits = new ConcurrentHashMap();
        this.commitOffsetsOnCheckpoint = ((Boolean) configuration.get(RocketMQSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT)).booleanValue();
        this.rocketmqSourceReaderMetrics = rocketMQSourceReaderMetrics;
    }

    protected void onSplitFinished(Map<String, RocketMQSourceSplitState> map) {
        map.forEach((str, rocketMQSourceSplitState) -> {
            if (rocketMQSourceSplitState.getCurrentOffset() >= 0) {
                this.offsetsOfFinishedSplits.put(rocketMQSourceSplitState.getMessageQueue(), Long.valueOf(rocketMQSourceSplitState.getCurrentOffset()));
            }
        });
    }

    public List<RocketMQSourceSplit> snapshotState(long j) {
        List<RocketMQSourceSplit> snapshotState = super.snapshotState(j);
        if (!this.commitOffsetsOnCheckpoint) {
            return snapshotState;
        }
        if (snapshotState.isEmpty() && this.offsetsOfFinishedSplits.isEmpty()) {
            this.offsetsToCommit.put(Long.valueOf(j), Collections.emptyMap());
        } else {
            Map<MessageQueue, Long> computeIfAbsent = this.offsetsToCommit.computeIfAbsent(Long.valueOf(j), l -> {
                return new HashMap();
            });
            for (RocketMQSourceSplit rocketMQSourceSplit : snapshotState) {
                if (rocketMQSourceSplit.getStartingOffset() >= 0) {
                    computeIfAbsent.put(UtilAll.getMessageQueue(rocketMQSourceSplit), Long.valueOf(rocketMQSourceSplit.getStartingOffset()));
                }
            }
            computeIfAbsent.putAll(this.offsetsOfFinishedSplits);
        }
        return snapshotState;
    }

    public void notifyCheckpointComplete(long j) {
        LOG.debug("Committing offsets for checkpoint {}", Long.valueOf(j));
        if (this.commitOffsetsOnCheckpoint) {
            Map<MessageQueue, Long> map = this.offsetsToCommit.get(Long.valueOf(j));
            if (map == null) {
                LOG.info("Offsets for checkpoint {} either do not exist or have already been committed.", Long.valueOf(j));
            } else {
                this.splitFetcherManager.commitOffsets(map);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RocketMQSourceSplitState initializedState(RocketMQSourceSplit rocketMQSourceSplit) {
        return new RocketMQSourceSplitState(rocketMQSourceSplit);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RocketMQSourceSplit toSplitType(String str, RocketMQSourceSplitState rocketMQSourceSplitState) {
        return rocketMQSourceSplitState.getSourceSplit();
    }

    @VisibleForTesting
    SortedMap<Long, Map<MessageQueue, Long>> getOffsetsToCommit() {
        return this.offsetsToCommit;
    }

    @VisibleForTesting
    int getNumAliveFetchers() {
        return this.splitFetcherManager.getNumAliveFetchers();
    }
}
