package org.apache.flink.connector.rocketmq.source;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.rocketmq.source.enumerator.RocketMQSourceEnumState;
import org.apache.flink.connector.rocketmq.source.enumerator.RocketMQSourceEnumStateSerializer;
import org.apache.flink.connector.rocketmq.source.enumerator.RocketMQSourceEnumerator;
import org.apache.flink.connector.rocketmq.source.enumerator.offset.OffsetsSelector;
import org.apache.flink.connector.rocketmq.source.metrics.RocketMQSourceReaderMetrics;
import org.apache.flink.connector.rocketmq.source.reader.RocketMQRecordEmitter;
import org.apache.flink.connector.rocketmq.source.reader.RocketMQSourceFetcherManager;
import org.apache.flink.connector.rocketmq.source.reader.RocketMQSourceReader;
import org.apache.flink.connector.rocketmq.source.reader.RocketMQSplitReader;
import org.apache.flink.connector.rocketmq.source.reader.deserializer.RocketMQDeserializationSchema;
import org.apache.flink.connector.rocketmq.source.split.RocketMQPartitionSplitSerializer;
import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.UserCodeClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/rocketmq/source/RocketMQSource.class */
/**
 * Flink {@link Source} implementation for RocketMQ.
 *
 * <p>An instance holds the starting and stopping {@link OffsetsSelector}s, the
 * {@link Boundedness}, the {@link RocketMQDeserializationSchema} and the
 * {@link Configuration}, and hands them to the split enumerator and the source
 * reader it creates.
 *
 * @param <OUT> type of the records produced by this source, as reported by the
 *     deserialization schema
 */
public class RocketMQSource<OUT> implements Source<OUT, RocketMQSourceSplit, RocketMQSourceEnumState>, ResultTypeQueryable<OUT> {
    private static final long serialVersionUID = -1L;
    private static final Logger log = LoggerFactory.getLogger(RocketMQSource.class);

    private final OffsetsSelector startingOffsetsSelector;
    private final OffsetsSelector stoppingOffsetsSelector;
    private final Configuration configuration;
    private final Boundedness boundedness;
    private final RocketMQDeserializationSchema<OUT> deserializationSchema;

    /**
     * Creates a source from its fully resolved parts. {@link #builder()} is the
     * fluent alternative.
     *
     * @param startingOffsetsSelector selector passed to the enumerator as the starting offsets
     * @param stoppingOffsetsSelector selector passed to the enumerator as the stopping offsets
     * @param boundedness value returned by {@link #getBoundedness()} and passed to the enumerator
     * @param rocketMQDeserializationSchema schema opened in {@link #createReader} and used to
     *     derive the produced type
     * @param configuration configuration passed to the enumerator, split readers and source reader
     */
    public RocketMQSource(OffsetsSelector startingOffsetsSelector, OffsetsSelector stoppingOffsetsSelector, Boundedness boundedness, RocketMQDeserializationSchema<OUT> rocketMQDeserializationSchema, Configuration configuration) {
        this.startingOffsetsSelector = startingOffsetsSelector;
        this.stoppingOffsetsSelector = stoppingOffsetsSelector;
        this.boundedness = boundedness;
        this.deserializationSchema = rocketMQDeserializationSchema;
        this.configuration = configuration;
    }

    /**
     * Returns a new, empty {@link RocketMQSourceBuilder}.
     *
     * @param <OUT> type of the records the built source will produce
     * @return a fresh builder instance
     */
    public static <OUT> RocketMQSourceBuilder<OUT> builder() {
        return new RocketMQSourceBuilder<>();
    }

    /** Returns the boundedness this source was constructed with. */
    @Override
    public Boundedness getBoundedness() {
        return this.boundedness;
    }

    /**
     * Opens the deserialization schema and assembles the source reader.
     *
     * <p>The schema is opened with an initialization context that exposes the
     * reader's metric group under the {@code "deserializer"} sub-group and the
     * reader's user-code class loader.
     *
     * @param sourceReaderContext runtime context of the reader being created
     * @return a new {@link RocketMQSourceReader}
     * @throws Exception if opening the deserialization schema fails
     */
    // The generic arguments of the queue/reader classes are not visible from this
    // file, so the raw forms are kept and the resulting warnings are suppressed
    // here only. NOTE(review): restore the type arguments once the element type
    // of the queue is confirmed.
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override
    public SourceReader<OUT, RocketMQSourceSplit> createReader(final SourceReaderContext sourceReaderContext) throws Exception {
        FutureCompletingBlockingQueue elementsQueue = new FutureCompletingBlockingQueue();

        this.deserializationSchema.open(new DeserializationSchema.InitializationContext() {
            @Override
            public MetricGroup getMetricGroup() {
                return sourceReaderContext.metricGroup().addGroup("deserializer");
            }

            @Override
            public UserCodeClassLoader getUserCodeClassLoader() {
                return sourceReaderContext.getUserCodeClassLoader();
            }
        });

        // One metrics object is shared by the source reader and every split reader.
        RocketMQSourceReaderMetrics readerMetrics = new RocketMQSourceReaderMetrics(sourceReaderContext.metricGroup());

        RocketMQSourceFetcherManager fetcherManager = new RocketMQSourceFetcherManager(
                elementsQueue,
                () -> new RocketMQSplitReader(this.configuration, sourceReaderContext, this.deserializationSchema, readerMetrics),
                // Intentionally a no-op; presumably the split-finished hook -- confirm
                // against RocketMQSourceFetcherManager.
                ignored -> {
                });

        return new RocketMQSourceReader(
                elementsQueue,
                fetcherManager,
                new RocketMQRecordEmitter(this.deserializationSchema),
                this.configuration,
                sourceReaderContext,
                readerMetrics);
    }

    /**
     * Creates a fresh enumerator from this source's selectors, boundedness and
     * configuration.
     *
     * @param splitEnumeratorContext context handed to the new enumerator
     * @return a new {@link RocketMQSourceEnumerator} without restored state
     */
    @Override
    public SplitEnumerator<RocketMQSourceSplit, RocketMQSourceEnumState> createEnumerator(SplitEnumeratorContext<RocketMQSourceSplit> splitEnumeratorContext) {
        return new RocketMQSourceEnumerator(this.startingOffsetsSelector, this.stoppingOffsetsSelector, this.boundedness, this.configuration, splitEnumeratorContext);
    }

    /**
     * Creates an enumerator seeded with the split assignment stored in a checkpoint.
     *
     * @param splitEnumeratorContext context handed to the new enumerator
     * @param rocketMQSourceEnumState checkpointed state; its current split assignment
     *     is passed to the enumerator
     * @return a new {@link RocketMQSourceEnumerator} carrying the restored assignment
     */
    @Override
    public SplitEnumerator<RocketMQSourceSplit, RocketMQSourceEnumState> restoreEnumerator(SplitEnumeratorContext<RocketMQSourceSplit> splitEnumeratorContext, RocketMQSourceEnumState rocketMQSourceEnumState) {
        return new RocketMQSourceEnumerator(this.startingOffsetsSelector, this.stoppingOffsetsSelector, this.boundedness, this.configuration, splitEnumeratorContext, rocketMQSourceEnumState.getCurrentSplitAssignment());
    }

    /** Returns a new {@link RocketMQPartitionSplitSerializer} for the source splits. */
    @Override
    public SimpleVersionedSerializer<RocketMQSourceSplit> getSplitSerializer() {
        return new RocketMQPartitionSplitSerializer();
    }

    /** Returns a new {@link RocketMQSourceEnumStateSerializer} for the enumerator checkpoint. */
    @Override
    public SimpleVersionedSerializer<RocketMQSourceEnumState> getEnumeratorCheckpointSerializer() {
        return new RocketMQSourceEnumStateSerializer();
    }

    /** Returns the produced type as reported by the deserialization schema. */
    @Override
    public TypeInformation<OUT> getProducedType() {
        return this.deserializationSchema.getProducedType();
    }
}
