package org.apache.flink.runtime.operators.sort;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.WrappingRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/sort/ExternalSorter.class */
/**
 * {@link Sorter} implementation that owns the stage runners (an optional reader, a sorter and a
 * spiller), the memory segments and the auxiliary resources handed to its constructor, and that
 * exposes its result through the iterator future of the supplied {@link CircularQueues}.
 *
 * <p>The stage runners are started from the constructor. {@link #close()} shuts them down and
 * releases every resource exactly once; it is also invoked automatically when the iterator future
 * completes exceptionally.
 *
 * @param <E> type of the records handled by this sorter
 */
public class ExternalSorter<E> implements Sorter<E> {
    private static final Logger LOG = LoggerFactory.getLogger(ExternalSorter.class);

    /** Optional reading stage; {@code null} when no reader was supplied. */
    private final StageRunner readThread;
    private final StageRunner sortThread;
    private final StageRunner spillThread;

    /** Memory segments handed back to the {@link MemoryManager} on close. */
    private final List<MemorySegment> sortReadMemory;
    private final List<MemorySegment> writeMemory;
    private final MemoryManager memoryManager;

    /** Optional handler; {@code null} when none was supplied. */
    private final LargeRecordHandler<E> largeRecordHandler;
    private final SpillChannelManager spillChannelManager;
    private final CircularQueues<E> queues;

    /** Set once by {@link #close()}; volatile so {@link #isClosed()} can read it without locking. */
    private volatile boolean closed;
    private final Collection<InMemorySorter<E>> inMemorySorters;

    /**
     * Wires up the sorter, registers a callback that closes it when the iterator future fails, and
     * starts all stage runners.
     *
     * <p>Decompiler note: the access modifier of this constructor was changed from package-private.
     *
     * @param readThread optional reading stage, may be {@code null}
     * @param sortThread sorting stage, must not be {@code null}
     * @param spillThread spilling stage, must not be {@code null}
     * @param sortReadMemory memory segments released on close, must not be {@code null}
     * @param writeMemory memory segments released on close, must not be {@code null}
     * @param memoryManager manager the memory segments are released to, must not be {@code null}
     * @param largeRecordHandler optional handler closed on close, may be {@code null}
     * @param spillChannelManager channel manager closed on close, must not be {@code null}
     * @param inMemorySorters in-memory sorters disposed on close, must not be {@code null}
     * @param queues queues providing the iterator future, must not be {@code null}
     * @throws NullPointerException if a mandatory argument is {@code null}
     */
    public ExternalSorter(
            @Nullable StageRunner readThread,
            StageRunner sortThread,
            StageRunner spillThread,
            List<MemorySegment> sortReadMemory,
            List<MemorySegment> writeMemory,
            MemoryManager memoryManager,
            @Nullable LargeRecordHandler<E> largeRecordHandler,
            SpillChannelManager spillChannelManager,
            Collection<InMemorySorter<E>> inMemorySorters,
            CircularQueues<E> queues) {
        this.readThread = readThread;
        this.sortThread = Preconditions.checkNotNull(sortThread);
        this.spillThread = Preconditions.checkNotNull(spillThread);
        this.sortReadMemory = Preconditions.checkNotNull(sortReadMemory);
        this.writeMemory = Preconditions.checkNotNull(writeMemory);
        this.memoryManager = Preconditions.checkNotNull(memoryManager);
        this.largeRecordHandler = largeRecordHandler;
        this.spillChannelManager = Preconditions.checkNotNull(spillChannelManager);
        this.inMemorySorters = Preconditions.checkNotNull(inMemorySorters);
        this.queues = Preconditions.checkNotNull(queues);

        // A failed iterator future means the result can never be produced: release everything.
        this.queues
                .getIteratorFuture()
                .whenComplete(
                        (iterator, failure) -> {
                            if (failure != null) {
                                close();
                            }
                        });
        startThreads();
    }

    /** Starts the reader (if present), then the sorter, then the spiller. */
    private void startThreads() {
        if (this.readThread != null) {
            this.readThread.start();
        }
        this.sortThread.start();
        this.spillThread.start();
    }

    /**
     * Shuts down the stage runners and releases all resources held by this sorter.
     *
     * <p>Only the first invocation does any work; later calls return immediately. The resource
     * release runs in a {@code finally} block so that it happens exactly once, whether or not
     * shutting down the stage runners fails.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        synchronized (this) {
            if (this.closed) {
                return;
            }
            this.closed = true;

            try {
                if (this.readThread != null) {
                    closeThread(this.readThread, "reader");
                }
                closeThread(this.sortThread, "sorter");
                closeThread(this.spillThread, "spilling");
            } finally {
                releaseResources();
            }
        }
    }

    /**
     * Releases queues, in-memory sorters, memory segments, spill channels and the large record
     * handler. The steps are chained with {@code finally} so that a failure in one step neither
     * skips nor repeats any of the others; failures of the non-quiet steps still propagate.
     */
    private void releaseResources() {
        try {
            this.queues.close();
        } finally {
            try {
                for (InMemorySorter<E> sorter : this.inMemorySorters) {
                    sorter.dispose();
                }
            } finally {
                releaseMemoryQuietly(this.writeMemory, "write");
                releaseMemoryQuietly(this.sortReadMemory, "sort/read");
                try {
                    this.spillChannelManager.close();
                } finally {
                    closeLargeRecordHandlerQuietly();
                }
            }
        }
    }

    /** Returns the given segments to the memory manager and clears the list; never throws. */
    private void releaseMemoryQuietly(List<MemorySegment> segments, String description) {
        try {
            if (!segments.isEmpty()) {
                this.memoryManager.release(segments);
            }
            segments.clear();
        } catch (Throwable t) {
            // Best effort: a failed release must not prevent the remaining cleanup.
            LOG.debug("Could not release the {} memory while closing the sorter.", description, t);
        }
    }

    /** Closes the large record handler if one was supplied; never throws. */
    private void closeLargeRecordHandlerQuietly() {
        try {
            if (this.largeRecordHandler != null) {
                this.largeRecordHandler.close();
            }
        } catch (Throwable t) {
            // Best effort: nothing else is left to clean up after this step.
            LOG.debug("Could not close the large record handler while closing the sorter.", t);
        }
    }

    /**
     * Closes a single stage runner, logging instead of propagating any failure so that the
     * remaining stage runners are still closed.
     *
     * @param stageRunner the stage to close
     * @param str human-readable stage name used in log messages
     */
    private void closeThread(StageRunner stageRunner, String str) {
        try {
            stageRunner.close();
        } catch (InterruptedException e) {
            // NOTE(review): the interrupt status is not restored here; confirm whether callers
            // rely on the flag before changing this.
            LOG.debug(
                    "Closing of {} was interrupted. The {} thread may still be working.",
                    str,
                    str,
                    e);
        } catch (Throwable th) {
            LOG.error("Error shutting down {} thread: {}", str, th.getMessage(), th);
        }
    }

    /**
     * Tells whether {@link #close()} has been entered.
     *
     * @return {@code true} once closing has started
     */
    public boolean isClosed() {
        return this.closed;
    }

    /**
     * Blocks until the iterator future of the queues completes and returns its result.
     *
     * @return the iterator produced by the iterator future
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws WrappingRuntimeException if the future completed exceptionally; the sorter is closed
     *     before this exception is thrown
     */
    @Override // org.apache.flink.runtime.operators.util.CloseableInputProvider
    public MutableObjectIterator<E> getIterator() throws InterruptedException {
        try {
            return this.queues
                    .getIteratorFuture()
                    .exceptionally(
                            th -> {
                                throw new RuntimeException(
                                        "Error obtaining the sorted input: " + th.getMessage(), th);
                            })
                    .get();
        } catch (ExecutionException e) {
            close();
            throw new WrappingRuntimeException(e);
        }
    }

    /**
     * Creates a builder for an {@link ExternalSorter}.
     *
     * @param memoryManager memory manager passed to the builder, must not be {@code null}
     * @param abstractInvokable invokable passed to the builder, must not be {@code null}
     * @param typeSerializer serializer passed to the builder, must not be {@code null}
     * @param typeComparator comparator passed to the builder, must not be {@code null}
     * @param <E> type of the records to sort
     * @return a new builder
     * @throws NullPointerException if any argument is {@code null}
     */
    public static <E> ExternalSorterBuilder<E> newBuilder(
            MemoryManager memoryManager,
            AbstractInvokable abstractInvokable,
            TypeSerializer<E> typeSerializer,
            TypeComparator<E> typeComparator) {
        return new ExternalSorterBuilder<>(
                Preconditions.checkNotNull(memoryManager),
                Preconditions.checkNotNull(abstractInvokable),
                Preconditions.checkNotNull(typeSerializer),
                Preconditions.checkNotNull(typeComparator));
    }
}
