package org.apache.flink.connector.rocketmq.source;

import com.alibaba.fastjson.JSON;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.rocketmq.source.enumerator.offset.OffsetsSelector;
import org.apache.flink.connector.rocketmq.source.reader.MessageView;
import org.apache.flink.connector.rocketmq.source.reader.MessageViewExt;
import org.apache.flink.connector.rocketmq.source.util.UtilAll;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.StringUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.class */
/**
 * {@link InnerConsumer} implementation backed by a RocketMQ {@link DefaultLitePullConsumer}
 * (assignment, polling, seeking and the offset store) and a {@link DefaultMQAdminExt} (min/max
 * offset and timestamp lookups).
 *
 * <p>All asynchronous operations exposed by this class are executed on a dedicated, fixed-size
 * thread pool created by {@link #buildExecutorService(Configuration)}; the pool, the admin client
 * and the consumer are released by {@link #close()}.
 */
public class InnerConsumerImpl implements InnerConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(InnerConsumerImpl.class);

    /** Capacity of the bounded work queue in front of the common executor. */
    private static final int EXECUTOR_QUEUE_CAPACITY = 1024;

    private final Configuration configuration;
    private final DefaultMQAdminExt adminExt;
    private final DefaultLitePullConsumer consumer;
    private final ExecutorService commonExecutorService;

    /**
     * Resolves offsets for a batch of message queues by fanning the lookups out through an {@link
     * InnerConsumer} and waiting for all of them to finish.
     *
     * <p>A lookup that fails is logged and its queue is simply absent from the returned map; it
     * does not fail the whole batch.
     */
    @VisibleForTesting
    public static class RemotingOffsetsRetrieverImpl
            implements OffsetsSelector.MessageQueueOffsetsRetriever, AutoCloseable {
        private final InnerConsumer innerConsumer;

        /**
         * @param innerConsumer consumer used for every offset lookup; it is closed when this
         *     retriever is closed
         */
        public RemotingOffsetsRetrieverImpl(InnerConsumer innerConsumer) {
            this.innerConsumer = innerConsumer;
        }

        /** Closes the wrapped {@link InnerConsumer}. */
        @Override
        public void close() throws Exception {
            this.innerConsumer.close();
        }

        /**
         * Looks up the committed offset of every queue in {@code collection}.
         *
         * @param collection queues to look up
         * @return queue-to-offset map containing only the queues whose lookup succeeded
         */
        @Override
        public Map<MessageQueue, Long> committedOffsets(Collection<MessageQueue> collection) {
            Map<MessageQueue, Long> offsets = new ConcurrentHashMap<>();
            CompletableFuture.allOf(
                            collection.stream()
                                    .map(
                                            queue ->
                                                    storeOffset(
                                                            offsets,
                                                            queue,
                                                            this.innerConsumer.seekCommittedOffset(
                                                                    queue),
                                                            "Consumer offsets retriever fetch committed offset error"))
                                    .toArray(CompletableFuture[]::new))
                    .join();
            return offsets;
        }

        /**
         * Looks up the minimum offset of every queue in {@code collection}.
         *
         * @param collection queues to look up
         * @return queue-to-offset map containing only the queues whose lookup succeeded
         */
        @Override
        public Map<MessageQueue, Long> minOffsets(Collection<MessageQueue> collection) {
            Map<MessageQueue, Long> offsets = new ConcurrentHashMap<>();
            CompletableFuture.allOf(
                            collection.stream()
                                    .map(
                                            queue ->
                                                    storeOffset(
                                                            offsets,
                                                            queue,
                                                            this.innerConsumer.seekMinOffset(queue),
                                                            "Consumer offsets retriever fetch min offset error"))
                                    .toArray(CompletableFuture[]::new))
                    .join();
            return offsets;
        }

        /**
         * Looks up the maximum offset of every queue in {@code collection}.
         *
         * @param collection queues to look up
         * @return queue-to-offset map containing only the queues whose lookup succeeded
         */
        @Override
        public Map<MessageQueue, Long> maxOffsets(Collection<MessageQueue> collection) {
            Map<MessageQueue, Long> offsets = new ConcurrentHashMap<>();
            CompletableFuture.allOf(
                            collection.stream()
                                    .map(
                                            queue ->
                                                    storeOffset(
                                                            offsets,
                                                            queue,
                                                            this.innerConsumer.seekMaxOffset(queue),
                                                            "Consumer offsets retriever fetch max offset error"))
                                    .toArray(CompletableFuture[]::new))
                    .join();
            return offsets;
        }

        /**
         * Looks up, for every entry of {@code map}, the offset that corresponds to the entry's
         * timestamp on the entry's queue.
         *
         * @param map queue-to-timestamp pairs to resolve
         * @return queue-to-offset map containing only the queues whose lookup succeeded
         */
        @Override
        public Map<MessageQueue, Long> offsetsForTimes(Map<MessageQueue, Long> map) {
            Map<MessageQueue, Long> offsets = new ConcurrentHashMap<>();
            CompletableFuture.allOf(
                            map.entrySet().stream()
                                    .map(
                                            entry ->
                                                    storeOffset(
                                                            offsets,
                                                            entry.getKey(),
                                                            this.innerConsumer
                                                                    .seekOffsetByTimestamp(
                                                                            entry.getKey(),
                                                                            entry.getValue()),
                                                            "Consumer offsets retriever fetch offset by timestamp error"))
                                    .toArray(CompletableFuture[]::new))
                    .join();
            return offsets;
        }

        /**
         * Chains onto {@code lookup} so that a successful result is written into {@code sink} and
         * a failure is logged with {@code failureMessage}.
         *
         * <p>The returned stage always completes normally, so one failed lookup cannot make the
         * surrounding {@code allOf(...).join()} throw. Composing on the future (instead of calling
         * {@code get()} from another pool thread) avoids parking a thread per pending lookup.
         */
        private static CompletableFuture<Void> storeOffset(
                Map<MessageQueue, Long> sink,
                MessageQueue messageQueue,
                CompletableFuture<Long> lookup,
                String failureMessage) {
            return lookup.handle(
                    (offset, failure) -> {
                        // ConcurrentHashMap rejects null values, so a null result is treated as a
                        // failed lookup rather than stored.
                        if (failure == null && offset != null) {
                            sink.put(messageQueue, offset);
                        } else {
                            LOG.error(failureMessage, failure);
                        }
                        return null;
                    });
        }
    }

    /**
     * Creates (but does not start) the underlying consumer and admin client.
     *
     * <p>When both the access key and the secret key are configured, the clients are created with
     * an ACL RPC hook carrying those credentials; otherwise they are created without one.
     *
     * @param configuration source of the consumer group, endpoints, credentials and thread count
     */
    public InnerConsumerImpl(Configuration configuration) {
        this.configuration = configuration;
        this.commonExecutorService = buildExecutorService(configuration);

        String accessKey = configuration.getString(RocketMQSourceOptions.OPTIONAL_ACCESS_KEY);
        String secretKey = configuration.getString(RocketMQSourceOptions.OPTIONAL_SECRET_KEY);
        boolean credentialsMissing =
                StringUtils.isNullOrWhitespaceOnly(accessKey)
                        || StringUtils.isNullOrWhitespaceOnly(secretKey);
        if (credentialsMissing) {
            this.adminExt = new DefaultMQAdminExt();
            this.consumer = new DefaultLitePullConsumer();
        } else {
            AclClientRPCHook aclHook =
                    new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
            this.adminExt = new DefaultMQAdminExt(aclHook);
            this.consumer = new DefaultLitePullConsumer(aclHook);
        }

        String groupId = configuration.getString(RocketMQSourceOptions.CONSUMER_GROUP);
        String endpoints = configuration.getString(RocketMQSourceOptions.ENDPOINTS);
        String runtimeName = ManagementFactory.getRuntimeMXBean().getName();

        this.consumer.setNamesrvAddr(endpoints);
        this.consumer.setConsumerGroup(groupId);
        this.consumer.setAutoCommit(false);
        this.consumer.setVipChannelEnabled(false);
        // Each client gets its own random suffix so the two instance names never collide.
        this.consumer.setInstanceName(
                String.join(UtilAll.SEPARATOR, runtimeName, groupId, UUID.randomUUID().toString()));

        this.adminExt.setNamesrvAddr(endpoints);
        this.adminExt.setAdminExtGroup(groupId);
        this.adminExt.setVipChannelEnabled(false);
        this.adminExt.setInstanceName(
                String.join(UtilAll.SEPARATOR, runtimeName, groupId, UUID.randomUUID().toString()));
    }

    /**
     * Starts the admin client and then the consumer.
     *
     * @throws FlinkRuntimeException if either client fails to start
     */
    @Override
    public void start() {
        try {
            this.adminExt.start();
            this.consumer.start();
            LOG.info(
                    "RocketMQ consumer started success, group={}, consumerId={}",
                    this.consumer.getConsumerGroup(),
                    this.consumer.getInstanceName());
        } catch (Throwable th) {
            LOG.error("RocketMQ consumer started failed", th);
            throw new FlinkRuntimeException("RocketMQ consumer started failed.", th);
        }
    }

    /**
     * Shuts down the common executor, the admin client and the consumer, in that order.
     *
     * <p>Each shutdown is attempted even if an earlier one throws, so a failure in one component
     * does not leave the others running.
     */
    @Override
    public void close() throws Exception {
        try {
            this.commonExecutorService.shutdown();
        } finally {
            try {
                this.adminExt.shutdown();
            } finally {
                this.consumer.shutdown();
            }
        }
    }

    /**
     * Builds the fixed-size pool used for all asynchronous operations of this consumer.
     *
     * <p>The pool size is read from {@link RocketMQSourceOptions#PULL_THREADS_NUM}, with the
     * number of available processors passed as the fallback value.
     */
    private ExecutorService buildExecutorService(Configuration configuration) {
        int threadCount =
                configuration.getInteger(
                        RocketMQSourceOptions.PULL_THREADS_NUM,
                        Runtime.getRuntime().availableProcessors());
        return new ThreadPoolExecutor(
                threadCount,
                threadCount,
                1L,
                TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(EXECUTOR_QUEUE_CAPACITY),
                new ThreadFactoryImpl("RocketMQCommonExecutorThread_"));
    }

    /** @return the consumer group read from the configuration */
    @Override
    public String getConsumerGroup() {
        return this.configuration.getString(RocketMQSourceOptions.CONSUMER_GROUP);
    }

    /**
     * Asynchronously fetches the message queues of a topic.
     *
     * @param str topic name
     * @return future holding the topic's queues, or an empty list if the fetch failed (the failure
     *     is logged, not propagated)
     */
    @Override
    public CompletableFuture<Collection<MessageQueue>> fetchMessageQueues(String str) {
        return CompletableFuture.supplyAsync(
                () -> {
                    try {
                        Collection<MessageQueue> queues = this.consumer.fetchMessageQueues(str);
                        LOG.info(
                                "Consumer request topic route for service discovery, topic={}, route={}",
                                str,
                                JSON.toJSONString(queues));
                        return queues;
                    } catch (Exception e) {
                        LOG.error(
                                "Consumer request topic route for service discovery, topic={}, nsAddress={}",
                                str,
                                this.consumer.getNamesrvAddr(),
                                e);
                        return Collections.emptyList();
                    }
                },
                this.commonExecutorService);
    }

    /** Assigns the given queues to the underlying consumer. */
    @Override
    public void assign(Collection<MessageQueue> collection) {
        this.consumer.assign(collection);
    }

    /**
     * @return the queues currently assigned to the underlying consumer
     * @throws FlinkRuntimeException if the consumer cannot report its assignment
     */
    @Override
    public Set<MessageQueue> assignment() {
        try {
            return this.consumer.assignment();
        } catch (MQClientException e) {
            throw new FlinkRuntimeException(e);
        }
    }

    /**
     * Polls the underlying consumer and wraps every received message in a {@link MessageViewExt}.
     *
     * @param duration maximum time to wait, passed to the consumer in milliseconds
     * @return the polled messages as views
     */
    @Override
    public List<MessageView> poll(Duration duration) {
        return this.consumer.poll(duration.toMillis()).stream()
                .<MessageView>map(MessageViewExt::new)
                .collect(Collectors.toList());
    }

    /**
     * Pauses every queue currently assigned to the underlying consumer.
     *
     * @throws FlinkRuntimeException if the current assignment cannot be read
     */
    @Override
    public void wakeup() {
        try {
            Set<MessageQueue> assigned = this.consumer.assignment();
            if (assigned != null) {
                this.consumer.pause(assigned);
            }
        } catch (MQClientException e) {
            LOG.warn("Consume wakeup long polling failed", e);
            // Same exception type as assignment()/seek(); still a RuntimeException for callers.
            throw new FlinkRuntimeException(e);
        }
    }

    /**
     * Repositions the consumer on a queue.
     *
     * @param messageQueue queue to reposition
     * @param j offset to seek to
     * @throws FlinkRuntimeException if the seek is rejected by the consumer
     */
    @Override
    public void seek(MessageQueue messageQueue, long j) {
        try {
            this.consumer.seek(messageQueue, j);
            LOG.info(
                    "Consumer current offset has been reset, mq={}, next poll will start from offset={}",
                    UtilAll.getQueueDescription(messageQueue),
                    j);
        } catch (MQClientException e) {
            throw new FlinkRuntimeException(e);
        }
    }

    /** Pauses fetching from the given queues. */
    @Override
    public void pause(Collection<MessageQueue> collection) {
        this.consumer.pause(collection);
        LOG.info("Consumer pause fetch messages, mq(s)={}", collection);
    }

    /** Resumes fetching from the given queues. */
    @Override
    public void resume(Collection<MessageQueue> collection) {
        this.consumer.resume(collection);
        LOG.info("Consumer resume fetch messages, mq(s)={}", collection);
    }

    /**
     * Asynchronously reads the committed offset of a queue from the consumer's offset store using
     * {@link ReadOffsetType#READ_FROM_STORE}.
     *
     * @return future holding the offset; completes exceptionally if the read fails
     */
    @Override
    public CompletableFuture<Long> seekCommittedOffset(MessageQueue messageQueue) {
        return CompletableFuture.supplyAsync(
                () -> {
                    try {
                        long committed =
                                this.consumer
                                        .getOffsetStore()
                                        .readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE);
                        LOG.info(
                                "Consumer seek committed offset from remote, mq={}, offset={}",
                                UtilAll.getQueueDescription(messageQueue),
                                committed);
                        return committed;
                    } catch (Exception e) {
                        LOG.error(
                                "Consumer seek committed offset from remote error, mq={}",
                                UtilAll.getQueueDescription(messageQueue),
                                e);
                        throw new RuntimeException(e);
                    }
                },
                this.commonExecutorService);
    }

    /**
     * Asynchronously queries the admin client for the minimum offset of a queue.
     *
     * @return future holding the offset; completes exceptionally if the query fails
     */
    @Override
    public CompletableFuture<Long> seekMinOffset(MessageQueue messageQueue) {
        return CompletableFuture.supplyAsync(
                () -> {
                    try {
                        long earliest = this.adminExt.minOffset(messageQueue);
                        LOG.info(
                                "Consumer seek min offset from remote, mq={}, offset={}",
                                UtilAll.getQueueDescription(messageQueue),
                                earliest);
                        return earliest;
                    } catch (Exception e) {
                        LOG.error(
                                "Consumer seek min offset from remote error, mq={}",
                                UtilAll.getQueueDescription(messageQueue),
                                e);
                        throw new RuntimeException(e);
                    }
                },
                this.commonExecutorService);
    }

    /**
     * Asynchronously queries the admin client for the maximum offset of a queue.
     *
     * @return future holding the offset; completes exceptionally if the query fails
     */
    @Override
    public CompletableFuture<Long> seekMaxOffset(MessageQueue messageQueue) {
        return CompletableFuture.supplyAsync(
                () -> {
                    try {
                        long latest = this.adminExt.maxOffset(messageQueue);
                        LOG.info(
                                "Consumer seek max offset from remote, mq={}, offset={}",
                                UtilAll.getQueueDescription(messageQueue),
                                latest);
                        return latest;
                    } catch (Exception e) {
                        LOG.error(
                                "Consumer seek max offset from remote error, mq={}",
                                UtilAll.getQueueDescription(messageQueue),
                                e);
                        throw new RuntimeException(e);
                    }
                },
                this.commonExecutorService);
    }

    /**
     * Asynchronously asks the admin client which offset of a queue corresponds to a timestamp.
     *
     * @param messageQueue queue to search
     * @param j timestamp handed to the admin client's {@code searchOffset}
     * @return future holding the offset; completes exceptionally if the search fails
     */
    @Override
    public CompletableFuture<Long> seekOffsetByTimestamp(MessageQueue messageQueue, long j) {
        return CompletableFuture.supplyAsync(
                () -> {
                    try {
                        long located = this.adminExt.searchOffset(messageQueue, j);
                        LOG.info(
                                "Consumer seek offset by timestamp from remote, mq={}, timestamp={}, offset={}",
                                UtilAll.getQueueDescription(messageQueue),
                                j,
                                located);
                        return located;
                    } catch (MQClientException e) {
                        LOG.error(
                                "Consumer seek offset by timestamp from remote error, mq={}, timestamp={}",
                                UtilAll.getQueueDescription(messageQueue),
                                j,
                                e);
                        throw new RuntimeException(e);
                    }
                },
                this.commonExecutorService);
    }

    /**
     * Asynchronously records an offset for a queue in the consumer's offset store.
     *
     * <p>NOTE(review): the third argument of {@code updateOffset} is passed as {@code true};
     * in the RocketMQ {@code OffsetStore} API this is the increase-only flag - confirm against the
     * client version in use.
     *
     * @param messageQueue queue whose offset is updated
     * @param j offset to record
     * @return future that completes once the offset store has been updated
     */
    @Override
    public CompletableFuture<Void> commitOffset(MessageQueue messageQueue, long j) {
        return CompletableFuture.runAsync(
                () -> this.consumer.getOffsetStore().updateOffset(messageQueue, j, true),
                this.commonExecutorService);
    }
}
