package org.apache.flink.runtime.state.restore;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.FullSnapshotUtil;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.RestoreOperation;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/runtime/state/restore/FullSnapshotRestoreOperation.class */
public class FullSnapshotRestoreOperation<K> implements RestoreOperation<ThrowingIterator<SavepointRestoreResult>> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) FullSnapshotRestoreOperation.class);
    private final KeyGroupRange keyGroupRange;
    private final ClassLoader userCodeClassLoader;
    private final Collection<KeyedStateHandle> restoreStateHandles;
    private final StateSerializerProvider<K> keySerializerProvider;
    private boolean isKeySerializerCompatibilityChecked;

    /**
     * Iterator over the entries of one key group, decoded lazily from a {@link DataInputViewStreamWrapper}.
     *
     * <p>The stream is read as: an unsigned-short state id, followed by (key bytes, value bytes) pairs.
     * When {@link FullSnapshotUtil#hasMetaDataFollowsFlag(byte[])} reports the flag on a key, the flag is
     * cleared and another unsigned-short state id is read; if that id equals {@code 0xFFFF} the iterator
     * is exhausted.
     *
     * <p>The no-argument variant represents a key group without entries: it has no input view and
     * {@link #hasNext()} is {@code false} from the start.
     */
    public static class KeyGroupEntriesIterator implements ThrowingIterator<KeyGroupEntry> {

        /** Mask converting a short to its unsigned value; a state id equal to it terminates the iteration. */
        private static final int END_OF_KEY_GROUP_MARK = 0xFFFF;

        /** Source of the serialized entries; {@code null} for the empty iterator. */
        private final DataInputViewStreamWrapper kgInputView;

        /** State id the next returned entry is tagged with; {@code null} once nothing is left. */
        private Integer currentKvStateId;

        /**
         * Creates an iterator over the given view and immediately reads the first state id from it.
         *
         * @param dataInputViewStreamWrapper view positioned at the start of the key group's data
         * @throws IOException if reading the first state id fails
         */
        private KeyGroupEntriesIterator(@Nonnull DataInputViewStreamWrapper dataInputViewStreamWrapper) throws IOException {
            this.kgInputView = dataInputViewStreamWrapper;
            // Note: the very first id is taken as-is; only ids read inside next() are checked against the end mark.
            this.currentKvStateId = Integer.valueOf(dataInputViewStreamWrapper.readShort() & END_OF_KEY_GROUP_MARK);
        }

        /** Creates an iterator that has no entries and no underlying input view. */
        private KeyGroupEntriesIterator() {
            this.kgInputView = null;
            this.currentKvStateId = null;
        }

        /** @return {@code true} while a current state id is set, i.e. another entry can be read */
        @Override
        public boolean hasNext() {
            return this.currentKvStateId != null;
        }

        /**
         * Reads the next key/value pair and tags it with the state id that was current before the read.
         *
         * @return the decoded entry
         * @throws NoSuchElementException if the iterator is exhausted
         * @throws IOException if reading from the input view fails
         */
        @Override
        public KeyGroupEntry next() throws IOException {
            if (this.currentKvStateId == null) {
                throw new NoSuchElementException();
            }
            final byte[] key = BytePrimitiveArraySerializer.INSTANCE.mo2373deserialize((DataInputView) this.kgInputView);
            final byte[] value = BytePrimitiveArraySerializer.INSTANCE.mo2373deserialize((DataInputView) this.kgInputView);
            final int entryStateId = this.currentKvStateId.intValue();

            if (FullSnapshotUtil.hasMetaDataFollowsFlag(key)) {
                // Strip the flag from the key before handing it out, then advance to the following state id.
                FullSnapshotUtil.clearMetaDataFollowsFlag(key);
                final int followingStateId = this.kgInputView.readShort() & END_OF_KEY_GROUP_MARK;
                if (followingStateId == END_OF_KEY_GROUP_MARK) {
                    this.currentKvStateId = null;
                } else {
                    this.currentKvStateId = Integer.valueOf(followingStateId);
                }
            }
            return new KeyGroupEntry(entryStateId, key, value);
        }

        /**
         * Closes the underlying input view, if this iterator has one.
         *
         * @throws IOException if closing the view fails
         */
        @Override
        public void close() throws IOException {
            final DataInputViewStreamWrapper view = this.kgInputView;
            if (view == null) {
                return;
            }
            view.close();
        }
    }

    /**
     * Iterator over the key groups recorded in one {@link KeyGroupsStateHandle}.
     *
     * <p>Each step takes the next (key group, offset) pair from the handle's group range offsets.
     * A zero offset yields a {@link KeyGroup} with an empty entries iterator; any other offset seeks
     * the shared input stream there and wraps it with the configured compression decorator.
     *
     * <p>{@link #close()} closes the input stream this iterator was given.
     */
    public static class KeyGroupsIterator implements ThrowingIterator<KeyGroup> {

        /** Range every returned key group is verified against. */
        @Nonnull
        private final KeyGroupRange keyGroupRange;

        /** Remaining (key group id, stream offset) pairs. */
        @Nonnull
        private final Iterator<Tuple2<Integer, Long>> keyGroups;

        /** Stream over the state handle's data, repositioned for each non-empty key group. */
        @Nonnull
        private final FSDataInputStream currentStateHandleInStream;

        /** Decorator applied to the stream before key group entries are read. */
        @Nonnull
        private final StreamCompressionDecorator keygroupStreamCompressionDecorator;

        /** Handle being restored; only used for log output here. */
        @Nonnull
        private final KeyGroupsStateHandle currentKeyGroupsStateHandle;

        /**
         * @param keyGroupRange range the key groups are checked against in {@link #next()}
         * @param keyGroupsStateHandle handle supplying the group range offsets
         * @param fSDataInputStream stream to read the key group data from
         * @param streamCompressionDecorator decorator used to wrap the stream for each key group
         */
        private KeyGroupsIterator(@Nonnull KeyGroupRange keyGroupRange, @Nonnull KeyGroupsStateHandle keyGroupsStateHandle, @Nonnull FSDataInputStream fSDataInputStream, @Nonnull StreamCompressionDecorator streamCompressionDecorator) {
            this.keyGroupRange = keyGroupRange;
            this.currentKeyGroupsStateHandle = keyGroupsStateHandle;
            this.currentStateHandleInStream = fSDataInputStream;
            this.keygroupStreamCompressionDecorator = streamCompressionDecorator;
            this.keyGroups = keyGroupsStateHandle.getGroupRangeOffsets().iterator();
            LOG.info("Starting to restore from state handle: {}.", keyGroupsStateHandle);
        }

        /** @return {@code true} if more (key group, offset) pairs remain */
        @Override
        public boolean hasNext() {
            return this.keyGroups.hasNext();
        }

        /**
         * Advances to the next key group and prepares an iterator over its entries.
         *
         * @return the next key group
         * @throws IllegalStateException via {@link Preconditions#checkState} if the key group is
         *     outside {@code keyGroupRange}
         * @throws IOException if seeking or reading the stream fails
         */
        @Override
        public KeyGroup next() throws IOException {
            final Tuple2<Integer, Long> groupAndOffset = this.keyGroups.next();
            final int keyGroupId = groupAndOffset.f0.intValue();
            Preconditions.checkState(this.keyGroupRange.contains(keyGroupId), "The key group must belong to the backend");
            final long offset = groupAndOffset.f1.longValue();

            final KeyGroupEntriesIterator entries;
            if (offset != 0L) {
                this.currentStateHandleInStream.seek(offset);
                final DataInputViewStreamWrapper keyGroupView =
                        new DataInputViewStreamWrapper(
                                this.keygroupStreamCompressionDecorator.decorateWithCompression(this.currentStateHandleInStream));
                entries = new KeyGroupEntriesIterator(keyGroupView);
            } else {
                // Offset 0 is treated as "no data for this key group": the stream is left untouched.
                entries = new KeyGroupEntriesIterator();
            }
            return new KeyGroup(keyGroupId, entries);
        }

        /**
         * Closes the state handle's input stream and logs completion.
         *
         * @throws IOException if closing the stream fails (the completion message is then not logged)
         */
        @Override
        public void close() throws IOException {
            this.currentStateHandleInStream.close();
            LOG.info("Finished restoring from state handle: {}.", this.currentKeyGroupsStateHandle);
        }
    }

    /**
     * Creates a restore operation over the given state handles.
     *
     * @param keyGroupRange key group range passed on to the key group iterators
     * @param classLoader class loader handed to the serialization proxy when reading meta data
     * @param collection state handles to restore from; {@code null} elements are dropped and the
     *     remaining handles are copied into a new list
     * @param stateSerializerProvider provider used for the key serializer compatibility check
     */
    public FullSnapshotRestoreOperation(KeyGroupRange keyGroupRange, ClassLoader classLoader, Collection<KeyedStateHandle> collection, StateSerializerProvider<K> stateSerializerProvider) {
        this.keyGroupRange = keyGroupRange;
        this.userCodeClassLoader = classLoader;
        this.keySerializerProvider = stateSerializerProvider;
        this.restoreStateHandles = collection.stream()
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    /**
     * Returns a lazy iterator that produces one {@link SavepointRestoreResult} per state handle.
     *
     * <p>No handle is opened until the iterator's {@code next()} is called. Each call takes the next
     * handle, requires it to be a {@link KeyGroupsStateHandle} and delegates to
     * {@link #restoreKeyGroupsInStateHandle(KeyGroupsStateHandle)}. Closing the returned iterator
     * does nothing.
     *
     * @return iterator over the restore results, in the order of the stored handles
     */
    @Override // org.apache.flink.runtime.state.RestoreOperation
    public ThrowingIterator<SavepointRestoreResult> restore() throws IOException, StateMigrationException {
        return new ThrowingIterator<SavepointRestoreResult>() {
            /** Handles that have not been turned into a restore result yet. */
            private final Iterator<KeyedStateHandle> pendingHandles =
                    FullSnapshotRestoreOperation.this.restoreStateHandles.iterator();

            @Override // org.apache.flink.runtime.state.restore.ThrowingIterator
            public boolean hasNext() {
                return this.pendingHandles.hasNext();
            }

            @Override // org.apache.flink.runtime.state.restore.ThrowingIterator
            public SavepointRestoreResult next() throws IOException, StateMigrationException {
                final KeyedStateHandle handle = this.pendingHandles.next();
                if (handle instanceof KeyGroupsStateHandle) {
                    return FullSnapshotRestoreOperation.this.restoreKeyGroupsInStateHandle((KeyGroupsStateHandle) handle);
                }
                // Any other handle type cannot be restored by this operation.
                throw StateUtil.unexpectedStateHandleException(
                        (Class<? extends StateObject>) KeyGroupsStateHandle.class,
                        (Class<? extends StateObject>) handle.getClass());
            }

            @Override // java.io.Closeable, java.lang.AutoCloseable
            public void close() {
                // Nothing to release here.
            }
        };
    }

    /**
     * Opens the given state handle, reads its meta data and returns a restore result whose key group
     * iterator reads from the opened stream.
     *
     * <p>On success the opened stream is handed to the returned {@link KeyGroupsIterator}, which
     * closes it in its own {@code close()}. If anything fails before the result is built (for
     * example the meta data cannot be read, or the key serializer check in {@code readMetaData}
     * throws), the stream is closed here so it does not leak; a failure while closing is attached
     * to the original exception as suppressed.
     *
     * @param keyGroupsStateHandle handle to restore from
     * @return the state meta info snapshots plus an iterator over the handle's key groups
     * @throws IOException if opening or reading the handle fails
     * @throws StateMigrationException if the key serializer compatibility check fails
     */
    public SavepointRestoreResult restoreKeyGroupsInStateHandle(@Nonnull KeyGroupsStateHandle keyGroupsStateHandle) throws IOException, StateMigrationException {
        final FSDataInputStream stateStream = keyGroupsStateHandle.openInputStream();
        try {
            final KeyedBackendSerializationProxy<K> metaData = readMetaData(new DataInputViewStreamWrapper(stateStream));
            final StreamCompressionDecorator compressionDecorator =
                    metaData.isUsingKeyGroupCompression()
                            ? SnappyStreamCompressionDecorator.INSTANCE
                            : UncompressedStreamCompressionDecorator.INSTANCE;
            return new SavepointRestoreResult(
                    metaData.getStateMetaInfoSnapshots(),
                    new KeyGroupsIterator(this.keyGroupRange, keyGroupsStateHandle, stateStream, compressionDecorator));
        } catch (Throwable failure) {
            // Ownership of the stream was never transferred to an iterator, so release it here.
            try {
                stateStream.close();
            } catch (IOException | RuntimeException closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            // Precise rethrow: only IOException, StateMigrationException or unchecked throwables can reach this point.
            throw failure;
        }
    }

    /**
     * Reads the keyed backend serialization proxy from the given view and, on the first call only,
     * verifies the key serializer against the restored serializer snapshot.
     *
     * <p>The check registers the restored key serializer snapshot with the key serializer provider.
     * If the resulting compatibility is "compatible after migration" or "incompatible", restoring is
     * rejected. Once the check has passed, later calls on this instance skip it.
     *
     * @param dataInputView view positioned at the start of the serialized meta data
     * @return the proxy populated from the view
     * @throws IOException if reading the meta data fails
     * @throws StateMigrationException if the key serializer is not compatible as-is
     */
    private KeyedBackendSerializationProxy<K> readMetaData(DataInputView dataInputView) throws IOException, StateMigrationException {
        final KeyedBackendSerializationProxy<K> proxy = new KeyedBackendSerializationProxy<>(this.userCodeClassLoader);
        proxy.read(dataInputView);

        if (this.isKeySerializerCompatibilityChecked) {
            return proxy;
        }

        final TypeSerializer<K> newKeySerializer = this.keySerializerProvider.currentSchemaSerializer();
        final TypeSerializerSchemaCompatibility<K> compatibility =
                this.keySerializerProvider.setPreviousSerializerSnapshotForRestoredState(proxy.getKeySerializerSnapshot());

        final boolean usableWithoutMigration = !compatibility.isCompatibleAfterMigration() && !compatibility.isIncompatible();
        if (!usableWithoutMigration) {
            throw new StateMigrationException(
                    "The new key serializer (" + newKeySerializer
                            + ") must be compatible with the previous key serializer ("
                            + this.keySerializerProvider.previousSchemaSerializer() + ").");
        }

        this.isKeySerializerCompatibilityChecked = true;
        return proxy;
    }
}
