package org.apache.flink.connector.rocketmq.source.enumerator;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.rocketmq.source.InnerConsumer;
import org.apache.flink.connector.rocketmq.source.InnerConsumerImpl;
import org.apache.flink.connector.rocketmq.source.RocketMQSourceOptions;
import org.apache.flink.connector.rocketmq.source.enumerator.allocate.AllocateStrategy;
import org.apache.flink.connector.rocketmq.source.enumerator.allocate.AllocateStrategyFactory;
import org.apache.flink.connector.rocketmq.source.enumerator.offset.OffsetsSelector;
import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.class */
public class RocketMQSourceEnumerator implements SplitEnumerator<RocketMQSourceSplit, RocketMQSourceEnumState> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) RocketMQSourceEnumerator.class);
    private final Configuration configuration;
    private final SplitEnumeratorContext<RocketMQSourceSplit> context;
    private final Boundedness boundedness;
    private InnerConsumer consumer;
    private final AllocateStrategy allocateStrategy;
    private final OffsetsSelector startingOffsetsSelector;
    private final OffsetsSelector stoppingOffsetsSelector;
    private final Set<MessageQueue> allocatedSet;
    private final Map<Integer, Set<RocketMQSourceSplit>> pendingSplitAssignmentMap;
    private final String groupId;
    private final long partitionDiscoveryIntervalMs;

    /* JADX INFO: Access modifiers changed from: private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator$SourceChangeResult.class */
    public static class SourceChangeResult {
        private final Set<MessageQueue> increaseSet;
        private final Set<MessageQueue> decreaseSet;

        public SourceChangeResult(Set<MessageQueue> set, Set<MessageQueue> set2) {
            this.increaseSet = set;
            this.decreaseSet = set2;
        }

        public Set<MessageQueue> getIncreaseSet() {
            return this.increaseSet;
        }

        public Set<MessageQueue> getDecreaseSet() {
            return this.decreaseSet;
        }

        public boolean isEmpty() {
            return this.increaseSet.isEmpty() && this.decreaseSet.isEmpty();
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator$SourceSplitChangeResult.class */
    public static class SourceSplitChangeResult {
        private final Set<RocketMQSourceSplit> increaseSet;
        private final Set<MessageQueue> decreaseSet;

        private SourceSplitChangeResult(Set<RocketMQSourceSplit> set) {
            this.increaseSet = Collections.unmodifiableSet(set);
            this.decreaseSet = Sets.newHashSet();
        }

        private SourceSplitChangeResult(Set<RocketMQSourceSplit> set, Set<MessageQueue> set2) {
            this.increaseSet = Collections.unmodifiableSet(set);
            this.decreaseSet = Collections.unmodifiableSet(set2);
        }

        public Set<RocketMQSourceSplit> getIncreaseSet() {
            return this.increaseSet;
        }

        public Set<MessageQueue> getDecreaseSet() {
            return this.decreaseSet;
        }
    }

    public RocketMQSourceEnumerator(OffsetsSelector offsetsSelector, OffsetsSelector offsetsSelector2, Boundedness boundedness, Configuration configuration, SplitEnumeratorContext<RocketMQSourceSplit> splitEnumeratorContext) {
        this(offsetsSelector, offsetsSelector2, boundedness, configuration, splitEnumeratorContext, new HashSet());
    }

    public RocketMQSourceEnumerator(OffsetsSelector offsetsSelector, OffsetsSelector offsetsSelector2, Boundedness boundedness, Configuration configuration, SplitEnumeratorContext<RocketMQSourceSplit> splitEnumeratorContext, Set<MessageQueue> set) {
        this.configuration = configuration;
        this.context = splitEnumeratorContext;
        this.boundedness = boundedness;
        this.pendingSplitAssignmentMap = new ConcurrentHashMap();
        this.allocatedSet = new HashSet(set);
        this.allocateStrategy = AllocateStrategyFactory.getStrategy(configuration, splitEnumeratorContext, new RocketMQSourceEnumState(this.allocatedSet));
        this.groupId = configuration.getString(RocketMQSourceOptions.CONSUMER_GROUP);
        this.startingOffsetsSelector = offsetsSelector;
        this.stoppingOffsetsSelector = offsetsSelector2;
        this.partitionDiscoveryIntervalMs = configuration.getLong(RocketMQSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS);
    }

    public void start() {
        this.consumer = new InnerConsumerImpl(this.configuration);
        this.consumer.start();
        if (this.partitionDiscoveryIntervalMs > 0) {
            log.info("Starting the RocketMQSourceEnumerator for consumer group {} with partition discovery interval of {} ms.", this.groupId, Long.valueOf(this.partitionDiscoveryIntervalMs));
            this.context.callAsync(this::requestServiceDiscovery, this::handleSourceQueueChange, 0L, this.partitionDiscoveryIntervalMs);
        } else {
            log.info("Starting the RocketMQSourceEnumerator for consumer group {} without periodic partition discovery.", this.groupId);
            this.context.callAsync(this::requestServiceDiscovery, this::handleSourceQueueChange);
        }
    }

    public void handleSplitRequest(int i, @Nullable String str) {
    }

    public void addSplitsBack(List<RocketMQSourceSplit> list, int i) {
        if (this.context.registeredReaders().containsKey(Integer.valueOf(i))) {
            sendSplitChangesToRemote(Collections.singleton(Integer.valueOf(i)));
        }
    }

    public void addReader(int i) {
        log.debug("Adding reader {} to RocketMQSourceEnumerator for consumer group {}.", Integer.valueOf(i), this.groupId);
        sendSplitChangesToRemote(Collections.singleton(Integer.valueOf(i)));
        if (this.boundedness == Boundedness.BOUNDED) {
            this.context.signalNoMoreSplits(i);
        }
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public RocketMQSourceEnumState m5488snapshotState(long j) {
        return new RocketMQSourceEnumState(this.allocatedSet);
    }

    public void close() {
        if (this.consumer != null) {
            try {
                this.consumer.close();
                this.consumer = null;
            } catch (Exception e) {
                log.error("Shutdown rocketmq internal consumer error", (Throwable) e);
            }
        }
    }

    private Set<MessageQueue> requestServiceDiscovery() {
        return (Set) Sets.newHashSet(this.configuration.getString(RocketMQSourceOptions.TOPIC).split(RocketMQSourceOptions.TOPIC_SEPARATOR)).stream().flatMap(str -> {
            try {
                return this.consumer.fetchMessageQueues(str).get().stream();
            } catch (Exception e) {
                log.error("Request topic route for service discovery error, topic={}", str, e);
                return Stream.empty();
            }
        }).collect(Collectors.toSet());
    }

    private void handleSourceQueueChange(Set<MessageQueue> set, Throwable th) {
        if (th != null) {
            throw new FlinkRuntimeException("Failed to handle source splits change due to ", th);
        }
        SourceChangeResult sourceChangeResult = getSourceChangeResult(set);
        if (sourceChangeResult.isEmpty()) {
            log.debug("Skip handle source allocated due to not queue change");
        } else {
            this.context.callAsync(() -> {
                return initializeSourceSplits(sourceChangeResult);
            }, this::handleSplitChanges);
        }
    }

    private SourceSplitChangeResult initializeSourceSplits(SourceChangeResult sourceChangeResult) {
        Set<MessageQueue> increaseSet = sourceChangeResult.getIncreaseSet();
        InnerConsumerImpl.RemotingOffsetsRetrieverImpl remotingOffsetsRetrieverImpl = new InnerConsumerImpl.RemotingOffsetsRetrieverImpl(this.consumer);
        Map<MessageQueue, Long> messageQueueOffsets = this.startingOffsetsSelector.getMessageQueueOffsets(increaseSet, remotingOffsetsRetrieverImpl);
        Map<MessageQueue, Long> messageQueueOffsets2 = this.stoppingOffsetsSelector.getMessageQueueOffsets(increaseSet, remotingOffsetsRetrieverImpl);
        return new SourceSplitChangeResult((Set) increaseSet.stream().map(messageQueue -> {
            return new RocketMQSourceSplit(messageQueue, ((Long) messageQueueOffsets.get(messageQueue)).longValue(), ((Long) messageQueueOffsets2.getOrDefault(messageQueue, -1L)).longValue());
        }).collect(Collectors.toSet()), sourceChangeResult.getDecreaseSet());
    }

    private void handleSplitChanges(SourceSplitChangeResult sourceSplitChangeResult, Throwable th) {
        if (th != null) {
            throw new FlinkRuntimeException("Failed to initialize partition splits due to ", th);
        }
        if (this.partitionDiscoveryIntervalMs <= 0) {
            log.info("Split changes, but dynamic partition discovery is disabled.");
        }
        calculateSplitAssignment(sourceSplitChangeResult);
        sendSplitChangesToRemote(this.context.registeredReaders().keySet());
    }

    private void calculateSplitAssignment(SourceSplitChangeResult sourceSplitChangeResult) {
        for (Map.Entry<Integer, Set<RocketMQSourceSplit>> entry : this.allocateStrategy.allocate(sourceSplitChangeResult.getIncreaseSet(), this.context.currentParallelism()).entrySet()) {
            this.pendingSplitAssignmentMap.computeIfAbsent(entry.getKey(), num -> {
                return new HashSet();
            }).addAll(entry.getValue());
        }
    }

    private void sendSplitChangesToRemote(Set<Integer> set) {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        for (Integer num : set) {
            if (!this.context.registeredReaders().containsKey(num)) {
                throw new IllegalStateException(String.format("Reader %d is not registered to source coordinator", num));
            }
            Set<RocketMQSourceSplit> set2 = this.pendingSplitAssignmentMap.get(num);
            if (set2 != null && !set2.isEmpty()) {
                ((List) concurrentHashMap.computeIfAbsent(num, num2 -> {
                    return new ArrayList();
                })).addAll(set2);
                set2.forEach(rocketMQSourceSplit -> {
                    this.allocatedSet.add(rocketMQSourceSplit.getMessageQueue());
                });
            }
        }
        if (!concurrentHashMap.isEmpty()) {
            log.info("Enumerator assigning split(s) to readers {}", JSON.toJSONString((Object) concurrentHashMap, false));
            this.context.assignSplits(new SplitsAssignment(concurrentHashMap));
        }
        if (this.partitionDiscoveryIntervalMs > 0 || this.boundedness != Boundedness.BOUNDED) {
            return;
        }
        log.info("No more rocketmq partition to assign. Sending NoMoreSplitsEvent to the readers in consumer group {}.", this.groupId);
        SplitEnumeratorContext<RocketMQSourceSplit> splitEnumeratorContext = this.context;
        splitEnumeratorContext.getClass();
        set.forEach((v1) -> {
            r1.signalNoMoreSplits(v1);
        });
    }

    @VisibleForTesting
    private SourceChangeResult getSourceChangeResult(Set<MessageQueue> set) {
        Set unmodifiableSet = Collections.unmodifiableSet(this.allocatedSet);
        Sets.SetView difference = Sets.difference(set, unmodifiableSet);
        Sets.SetView difference2 = Sets.difference(unmodifiableSet, set);
        SourceChangeResult sourceChangeResult = new SourceChangeResult(difference, difference2);
        if (sourceChangeResult.isEmpty()) {
            log.info("Request topic route for service discovery, current allocated queues size={}", Integer.valueOf(unmodifiableSet.size()));
        } else {
            log.info("Request topic route for service discovery, current allocated queues size: {}. Changed details, current={}, latest={}, increase={}, decrease={}", Integer.valueOf(unmodifiableSet.size()), unmodifiableSet, set, difference, difference2);
        }
        return sourceChangeResult;
    }
}
