package org.apache.flink.contrib.streaming.state.snapshot;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnegative;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
import org.apache.flink.contrib.streaming.state.iterator.RocksQueueIterator;
import org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator;
import org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper;
import org.apache.flink.contrib.streaming.state.iterator.SingleStateIterator;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.state.FullSnapshotResources;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyValueStateIterator;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueStateSnapshot;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.ResourceGuard;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/snapshot/RocksDBFullSnapshotResources.class */
public class RocksDBFullSnapshotResources<K> implements FullSnapshotResources<K> {
    private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
    private final ResourceGuard.Lease lease;
    private final Snapshot snapshot;
    private final RocksDB db;
    private final List<MetaData> metaData;

    @Nonnegative
    private final int keyGroupPrefixBytes;
    private final KeyGroupRange keyGroupRange;
    private final TypeSerializer<K> keySerializer;
    private final StreamCompressionDecorator streamCompressionDecorator;
    private final List<HeapPriorityQueueStateSnapshot<?>> heapPriorityQueuesSnapshots;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/contrib/streaming/state/snapshot/RocksDBFullSnapshotResources$MetaData.class */
    public static class MetaData {
        final RocksDBKeyedStateBackend.RocksDbKvStateInfo rocksDbKvStateInfo;
        final StateSnapshotTransformer<byte[]> stateSnapshotTransformer;

        private MetaData(RocksDBKeyedStateBackend.RocksDbKvStateInfo rocksDbKvStateInfo, StateSnapshotTransformer<byte[]> stateSnapshotTransformer) {
            this.rocksDbKvStateInfo = rocksDbKvStateInfo;
            this.stateSnapshotTransformer = stateSnapshotTransformer;
        }
    }

    public RocksDBFullSnapshotResources(ResourceGuard.Lease lease, Snapshot snapshot, List<RocksDBKeyedStateBackend.RocksDbKvStateInfo> list, List<HeapPriorityQueueStateSnapshot<?>> list2, List<StateMetaInfoSnapshot> list3, RocksDB rocksDB, int i, KeyGroupRange keyGroupRange, TypeSerializer<K> typeSerializer, StreamCompressionDecorator streamCompressionDecorator) {
        this.lease = lease;
        this.snapshot = snapshot;
        this.stateMetaInfoSnapshots = list3;
        this.heapPriorityQueuesSnapshots = list2;
        this.db = rocksDB;
        this.keyGroupPrefixBytes = i;
        this.keyGroupRange = keyGroupRange;
        this.keySerializer = typeSerializer;
        this.streamCompressionDecorator = streamCompressionDecorator;
        this.metaData = fillMetaData(list);
    }

    public static <K> RocksDBFullSnapshotResources<K> create(LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> linkedHashMap, Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> map, RocksDB rocksDB, ResourceGuard resourceGuard, KeyGroupRange keyGroupRange, TypeSerializer<K> typeSerializer, int i, StreamCompressionDecorator streamCompressionDecorator) throws IOException {
        ArrayList arrayList = new ArrayList(linkedHashMap.size());
        ArrayList arrayList2 = new ArrayList(linkedHashMap.size());
        for (RocksDBKeyedStateBackend.RocksDbKvStateInfo rocksDbKvStateInfo : linkedHashMap.values()) {
            arrayList.add(rocksDbKvStateInfo.metaInfo.snapshot());
            arrayList2.add(rocksDbKvStateInfo);
        }
        ArrayList arrayList3 = new ArrayList(map.size());
        for (HeapPriorityQueueSnapshotRestoreWrapper<?> heapPriorityQueueSnapshotRestoreWrapper : map.values()) {
            arrayList.add(heapPriorityQueueSnapshotRestoreWrapper.getMetaInfo().snapshot());
            arrayList3.add(heapPriorityQueueSnapshotRestoreWrapper.stateSnapshot());
        }
        return new RocksDBFullSnapshotResources<>(resourceGuard.acquireResource(), rocksDB.getSnapshot(), arrayList2, arrayList3, arrayList, rocksDB, i, keyGroupRange, typeSerializer, streamCompressionDecorator);
    }

    private List<MetaData> fillMetaData(List<RocksDBKeyedStateBackend.RocksDbKvStateInfo> list) {
        ArrayList arrayList = new ArrayList(list.size());
        for (RocksDBKeyedStateBackend.RocksDbKvStateInfo rocksDbKvStateInfo : list) {
            StateSnapshotTransformer<byte[]> stateSnapshotTransformer = null;
            if (rocksDbKvStateInfo.metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) {
                stateSnapshotTransformer = ((RegisteredKeyValueStateBackendMetaInfo) rocksDbKvStateInfo.metaInfo).getStateSnapshotTransformFactory().createForSerializedState().orElse(null);
            }
            arrayList.add(new MetaData(rocksDbKvStateInfo, stateSnapshotTransformer));
        }
        return arrayList;
    }

    @Override // org.apache.flink.runtime.state.FullSnapshotResources
    public KeyValueStateIterator createKVStateIterator() throws IOException {
        CloseableRegistry closeableRegistry = new CloseableRegistry();
        try {
            ReadOptions readOptions = new ReadOptions();
            readOptions.getClass();
            closeableRegistry.registerCloseable(readOptions::close);
            readOptions.setSnapshot(this.snapshot);
            return new RocksStatesPerKeyGroupMergeIterator(closeableRegistry, createKVStateIterators(closeableRegistry, readOptions), createHeapPriorityQueueIterators(), this.keyGroupPrefixBytes);
        } catch (Throwable th) {
            IOUtils.closeQuietly(closeableRegistry);
            throw new IOException("Error creating merge iterator", th);
        }
    }

    private List<SingleStateIterator> createHeapPriorityQueueIterators() {
        int size = this.metaData.size();
        ArrayList arrayList = new ArrayList(this.heapPriorityQueuesSnapshots.size());
        Iterator<HeapPriorityQueueStateSnapshot<?>> it = this.heapPriorityQueuesSnapshots.iterator();
        while (it.hasNext()) {
            int i = size;
            size++;
            arrayList.add(new RocksQueueIterator(it.next(), this.keyGroupRange, this.keyGroupPrefixBytes, i));
        }
        return arrayList;
    }

    private List<Tuple2<RocksIteratorWrapper, Integer>> createKVStateIterators(CloseableRegistry closeableRegistry, ReadOptions readOptions) throws IOException {
        ArrayList arrayList = new ArrayList(this.metaData.size());
        int i = 0;
        for (MetaData metaData : this.metaData) {
            RocksIteratorWrapper createRocksIteratorWrapper = createRocksIteratorWrapper(this.db, metaData.rocksDbKvStateInfo.columnFamilyHandle, metaData.stateSnapshotTransformer, readOptions);
            arrayList.add(Tuple2.of(createRocksIteratorWrapper, Integer.valueOf(i)));
            closeableRegistry.registerCloseable(createRocksIteratorWrapper);
            i++;
        }
        return arrayList;
    }

    private static RocksIteratorWrapper createRocksIteratorWrapper(RocksDB rocksDB, ColumnFamilyHandle columnFamilyHandle, StateSnapshotTransformer<byte[]> stateSnapshotTransformer, ReadOptions readOptions) {
        RocksIterator newIterator = rocksDB.newIterator(columnFamilyHandle, readOptions);
        return stateSnapshotTransformer == null ? new RocksIteratorWrapper(newIterator) : new RocksTransformingIteratorWrapper(newIterator, stateSnapshotTransformer);
    }

    @Override // org.apache.flink.runtime.state.FullSnapshotResources
    public List<StateMetaInfoSnapshot> getMetaInfoSnapshots() {
        return this.stateMetaInfoSnapshots;
    }

    @Override // org.apache.flink.runtime.state.FullSnapshotResources
    public KeyGroupRange getKeyGroupRange() {
        return this.keyGroupRange;
    }

    @Override // org.apache.flink.runtime.state.FullSnapshotResources
    public TypeSerializer<K> getKeySerializer() {
        return this.keySerializer;
    }

    @Override // org.apache.flink.runtime.state.FullSnapshotResources
    public StreamCompressionDecorator getStreamCompressionDecorator() {
        return this.streamCompressionDecorator;
    }

    @Override // org.apache.flink.runtime.state.SnapshotResources
    public void release() {
        this.db.releaseSnapshot(this.snapshot);
        IOUtils.closeQuietly(this.snapshot);
        IOUtils.closeQuietly(this.lease);
    }
}
