package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;

/**
 * Network-backed stream task input that keeps one
 * {@link SpillingAdaptiveSpanningRecordDeserializer} per input channel of the wrapped
 * {@link CheckpointedInputGate}.
 *
 * <p>Record processing itself is inherited from {@link AbstractStreamTaskNetworkInput}; this class
 * only contributes the deserializer setup, the snapshot preparation and the shutdown of the gate.
 *
 * @param <T> type of the records handled by the given {@link TypeSerializer}
 */
@Internal
public final class StreamTaskNetworkInput<T> extends AbstractStreamTaskNetworkInput<T, SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>> {

    /**
     * Creates the input and eagerly builds one record deserializer for every channel reported by
     * {@code checkpointedInputGate}.
     *
     * @param checkpointedInputGate gate whose channels are read; it is closed by {@link #close()}
     * @param typeSerializer serializer handed to the superclass constructor
     * @param iOManager provides the spilling directories used by the deserializers
     * @param statusWatermarkValve valve handed to the superclass constructor
     * @param i handed unchanged to the superclass constructor (presumably the index of this
     *     input; confirm against {@code AbstractStreamTaskNetworkInput})
     */
    public StreamTaskNetworkInput(CheckpointedInputGate checkpointedInputGate, TypeSerializer<T> typeSerializer, IOManager iOManager, StatusWatermarkValve statusWatermarkValve, int i) {
        super(checkpointedInputGate, typeSerializer, statusWatermarkValve, i, getRecordDeserializers(checkpointedInputGate, iOManager));
    }

    /**
     * Builds the channel-to-deserializer map: every channel info of the gate is mapped to its own
     * freshly created deserializer that uses the spilling directories of {@code iOManager}.
     *
     * @param checkpointedInputGate gate providing the channel infos used as map keys
     * @param iOManager source of the spilling directory paths
     * @return map with exactly one deserializer per channel info
     */
    private static Map<InputChannelInfo, SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>> getRecordDeserializers(CheckpointedInputGate checkpointedInputGate, IOManager iOManager) {
        // The diamond lets the compiler check the value type instead of relying on a raw cast.
        return checkpointedInputGate.getChannelInfos().stream()
                .collect(Collectors.toMap(
                        Function.identity(),
                        unused -> new SpillingAdaptiveSpanningRecordDeserializer<>(iOManager.getSpillingDirectoriesPaths())));
    }

    /**
     * Hands the unconsumed buffer of every channel's deserializer to {@code channelStateWriter}
     * for checkpoint {@code j}.
     *
     * @param channelStateWriter writer receiving the unconsumed data of each channel
     * @param j checkpoint id, used both for the writer and for the barrier future lookup
     * @return the gate's future for "all barriers received" of checkpoint {@code j}
     * @throws CheckpointException wrapping the {@link IOException} if reading the unconsumed data
     *     of a channel fails
     */
    @Override // org.apache.flink.streaming.runtime.io.StreamTaskInput
    public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long j) throws CheckpointException {
        for (Map.Entry<InputChannelInfo, SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>> entry : this.recordDeserializers.entrySet()) {
            try {
                // NOTE(review): -2 presumably mirrors ChannelStateWriter's "sequence number
                // unknown" constant; confirm and reference the constant instead of the literal.
                channelStateWriter.addInputData(j, entry.getKey(), -2, entry.getValue().getUnconsumedBuffer());
            } catch (IOException e) {
                throw new CheckpointException(CheckpointFailureReason.EXCEPTION, e);
            }
        }
        return this.checkpointedInputGate.getAllBarriersReceivedFuture(j);
    }

    /**
     * Closes the inherited resources first and then the checkpointed input gate.
     *
     * <p>The gate is closed even when the superclass cleanup fails. In that case the original
     * failure is rethrown and a failure of the gate is attached to it as a suppressed exception.
     *
     * @throws IOException if closing the inherited resources or the gate fails
     */
    @Override // org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            super.close();
        } catch (Throwable primary) {
            // Do not leak the gate just because the superclass cleanup failed.
            try {
                this.checkpointedInputGate.close();
            } catch (Throwable secondary) {
                primary.addSuppressed(secondary);
            }
            throw primary;
        }
        this.checkpointedInputGate.close();
    }
}
