package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.class */
/**
 * Adapter that offers the request-oriented {@link SlotPool} interface on top of the declarative
 * slot pool owned by {@link DeclarativeSlotPoolService}.
 *
 * <p>Every slot request is translated into a resource requirement on the declarative slot pool:
 * requesting a slot increases the requirements by one, while releasing, cancelling or timing out
 * a request decreases them again. Requests which are not yet served are kept in
 * {@code pendingRequests}; once a slot has been reserved for a request it is tracked in
 * {@code fulfilledRequests}.
 *
 * <p>All public entry points that call {@code assertRunningInMainThread()} must be invoked from
 * the main thread executor handed to {@link #onStart(ComponentMainThreadExecutor)}.
 */
public class DeclarativeSlotPoolBridge extends DeclarativeSlotPoolService implements SlotPool {

    /** Requests still waiting for a slot; iteration follows insertion order. */
    private final Map<SlotRequestId, PendingRequest> pendingRequests;

    /** Requests for which a slot has been reserved, mapped to the reserved allocation. */
    private final Map<SlotRequestId, AllocationID> fulfilledRequests;

    /** Interval of the periodic idle slot check. */
    private final Time idleSlotTimeout;

    /** Main thread executor; {@code null} until {@link #onStart(ComponentMainThreadExecutor)}. */
    @Nullable
    private ComponentMainThreadExecutor componentMainThreadExecutor;

    /** Interval of the periodic batch request check and maximum time a request may stay unfulfillable. */
    private final Time batchSlotTimeout;

    /** Set by {@link #disableBatchSlotRequestTimeoutCheck()}; never reset afterwards. */
    private boolean isBatchSlotRequestTimeoutCheckDisabled;

    /**
     * A slot request that has not been served yet, together with the future handed out to the
     * requester.
     */
    public static final class PendingRequest {

        /** Sentinel for {@code unfulfillableSince}: the request is currently considered fulfillable. */
        private static final long FULFILLABLE = Long.MAX_VALUE;

        private final SlotRequestId slotRequestId;
        private final ResourceProfile resourceProfile;
        private final boolean isBatchRequest;
        private final CompletableFuture<PhysicalSlot> slotFuture = new CompletableFuture<>();

        /** Timestamp at which the request became unfulfillable, or {@link #FULFILLABLE}. */
        private long unfulfillableSince = FULFILLABLE;

        private PendingRequest(
                SlotRequestId slotRequestId, ResourceProfile resourceProfile, boolean isBatchRequest) {
            this.slotRequestId = slotRequestId;
            this.resourceProfile = resourceProfile;
            this.isBatchRequest = isBatchRequest;
        }

        /** Creates a pending request that is flagged as a batch request. */
        static PendingRequest createBatchRequest(SlotRequestId slotRequestId, ResourceProfile resourceProfile) {
            return new PendingRequest(slotRequestId, resourceProfile, true);
        }

        /** Creates a pending request that is not flagged as a batch request. */
        static PendingRequest createNormalRequest(SlotRequestId slotRequestId, ResourceProfile resourceProfile) {
            return new PendingRequest(slotRequestId, resourceProfile, false);
        }

        SlotRequestId getSlotRequestId() {
            return slotRequestId;
        }

        ResourceProfile getResourceProfile() {
            return resourceProfile;
        }

        CompletableFuture<PhysicalSlot> getSlotFuture() {
            return slotFuture;
        }

        /** Completes the slot future exceptionally with the given cause. */
        void failRequest(Exception cause) {
            slotFuture.completeExceptionally(cause);
        }

        public boolean isBatchRequest() {
            return isBatchRequest;
        }

        /** Resets the unfulfillable timestamp, i.e. the request counts as fulfillable again. */
        public void markFulfillable() {
            unfulfillableSince = FULFILLABLE;
        }

        /**
         * Records that the request cannot be fulfilled at the moment.
         *
         * <p>The timestamp is only stored on the transition from fulfillable to unfulfillable.
         * Repeated calls keep the first timestamp, so that the elapsed unfulfillable time can grow
         * between successive checks.
         *
         * @param currentTimestamp timestamp of the check that found the request unfulfillable
         */
        public void markUnfulfillable(long currentTimestamp) {
            if (isFulfillable()) {
                unfulfillableSince = currentTimestamp;
            }
        }

        private boolean isFulfillable() {
            return unfulfillableSince == FULFILLABLE;
        }

        /** Returns the stored unfulfillable timestamp, or {@code Long.MAX_VALUE} if none is set. */
        public long getUnfulfillableSince() {
            return unfulfillableSince;
        }

        /**
         * Completes the slot future with the given slot.
         *
         * @return {@code true} if this call completed the future, {@code false} if the future had
         *     already been completed
         */
        public boolean fulfill(PhysicalSlot physicalSlot) {
            return slotFuture.complete(physicalSlot);
        }

        @Override
        public String toString() {
            return "PendingRequest{slotRequestId=" + slotRequestId
                    + ", resourceProfile=" + resourceProfile
                    + ", isBatchRequest=" + isBatchRequest
                    + ", unfulfillableSince=" + unfulfillableSince
                    + '}';
        }
    }

    /** Pairs a pending request with the slot that has been reserved for it. */
    private static final class PendingRequestSlotMatching {
        private final PendingRequest pendingRequest;
        private final PhysicalSlot matchedSlot;

        private PendingRequestSlotMatching(PendingRequest pendingRequest, PhysicalSlot matchedSlot) {
            this.pendingRequest = pendingRequest;
            this.matchedSlot = matchedSlot;
        }

        public static PendingRequestSlotMatching createFor(PendingRequest pendingRequest, PhysicalSlot matchedSlot) {
            return new PendingRequestSlotMatching(pendingRequest, matchedSlot);
        }

        /**
         * Completes the request's future with the matched slot.
         *
         * @throws IllegalStateException if the future had already been completed
         */
        public void fulfillPendingRequest() {
            Preconditions.checkState(pendingRequest.fulfill(matchedSlot), "Pending requests must be fulfillable.");
        }
    }

    /**
     * Creates the bridge.
     *
     * @param jobID forwarded to the superclass constructor
     * @param declarativeSlotPoolFactory forwarded to the superclass constructor
     * @param clock forwarded to the superclass constructor
     * @param rpcTimeout forwarded to the superclass constructor as its last argument (presumably
     *     its RPC timeout; confirm against {@code DeclarativeSlotPoolService})
     * @param idleSlotTimeout forwarded to the superclass and used as interval of the idle slot check
     * @param batchSlotTimeout interval of the batch request check and maximum unfulfillable time
     *     of a batch request; must not be {@code null}
     */
    public DeclarativeSlotPoolBridge(
            JobID jobID,
            DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
            Clock clock,
            Time rpcTimeout,
            Time idleSlotTimeout,
            Time batchSlotTimeout) {
        super(jobID, declarativeSlotPoolFactory, clock, idleSlotTimeout, rpcTimeout);
        this.idleSlotTimeout = idleSlotTimeout;
        this.batchSlotTimeout = Preconditions.checkNotNull(batchSlotTimeout);
        this.isBatchSlotRequestTimeoutCheckDisabled = false;
        // Insertion order matters: findMatchingPendingRequest serves the oldest matching request.
        this.pendingRequests = new LinkedHashMap<>();
        this.fulfilledRequests = new HashMap<>();
    }

    /** Returns this instance cast to {@code cls} if it is an instance of it, otherwise empty. */
    @Override
    public <T> Optional<T> castInto(Class<T> cls) {
        if (cls.isAssignableFrom(getClass())) {
            return Optional.of(cls.cast(this));
        }
        return Optional.empty();
    }

    /**
     * Stores the main thread executor, registers for new slot notifications and schedules the
     * first idle slot check and the first batch request check.
     */
    @Override
    protected void onStart(ComponentMainThreadExecutor componentMainThreadExecutor) {
        this.componentMainThreadExecutor = componentMainThreadExecutor;
        getDeclarativeSlotPool().registerNewSlotsListener(this::newSlotsAreAvailable);
        componentMainThreadExecutor.schedule(
                this::checkIdleSlotTimeout, idleSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        componentMainThreadExecutor.schedule(
                this::checkBatchSlotTimeout, batchSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
    }

    /** Fails every pending request because the slot pool is being closed. */
    @Override
    protected void onClose() {
        cancelPendingRequests(request -> true, new FlinkException("Closing slot pool"));
    }

    /**
     * Fails all pending requests accepted by the predicate and lowers the resource requirements
     * accordingly; requests rejected by the predicate stay pending.
     *
     * @param predicate selects the requests to fail
     * @param flinkException cause the selected requests are failed with
     */
    private void cancelPendingRequests(Predicate<PendingRequest> predicate, FlinkException flinkException) {
        ResourceCounter releasedRequirements = ResourceCounter.empty();

        // Work on a snapshot and clear the map up front: failing a request completes its future,
        // and callbacks on that future may modify pendingRequests while we are still iterating.
        final List<PendingRequest> snapshot = new ArrayList<>(pendingRequests.values());
        pendingRequests.clear();

        for (PendingRequest request : snapshot) {
            if (predicate.test(request)) {
                request.failRequest(flinkException);
                releasedRequirements = releasedRequirements.add(request.getResourceProfile(), 1);
            } else {
                pendingRequests.put(request.getSlotRequestId(), request);
            }
        }

        getDeclarativeSlotPool().decreaseResourceRequirementsBy(releasedRequirements);
    }

    /** Lowers the resource requirements by the given counter. */
    @Override
    protected void onReleaseTaskManager(ResourceCounter resourceCounter) {
        getDeclarativeSlotPool().decreaseResourceRequirementsBy(resourceCounter);
    }

    /**
     * Matches newly available slots against the pending requests, reserves each matched slot and
     * afterwards completes the futures of the matched requests.
     *
     * @param newSlots slots that have become available
     */
    @VisibleForTesting
    void newSlotsAreAvailable(Collection<? extends PhysicalSlot> newSlots) {
        final List<PendingRequestSlotMatching> matchingsToFulfill = new ArrayList<>();

        for (PhysicalSlot newSlot : newSlots) {
            findMatchingPendingRequest(newSlot).ifPresent(pendingRequest -> {
                Preconditions.checkNotNull(
                        pendingRequests.remove(pendingRequest.getSlotRequestId()),
                        "Cannot fulfill a non existing pending slot request.");
                reserveFreeSlot(
                        pendingRequest.getSlotRequestId(),
                        newSlot.getAllocationId(),
                        pendingRequest.getResourceProfile());
                matchingsToFulfill.add(PendingRequestSlotMatching.createFor(pendingRequest, newSlot));
            });
        }

        // Futures are completed only after every matched slot has been reserved: completing a
        // future may run callbacks that call back into this pool, and those must not be able to
        // grab one of the new slots that is about to be assigned here.
        for (PendingRequestSlotMatching matching : matchingsToFulfill) {
            matching.fulfillPendingRequest();
        }
    }

    /** Reserves the given free slot and records the request as fulfilled by that allocation. */
    private void reserveFreeSlot(SlotRequestId slotRequestId, AllocationID allocationID, ResourceProfile resourceProfile) {
        log.debug("Reserve slot {} for slot request id {}", allocationID, slotRequestId);
        getDeclarativeSlotPool().reserveFreeSlot(allocationID, resourceProfile);
        fulfilledRequests.put(slotRequestId, allocationID);
    }

    /**
     * Returns the first pending request (in insertion order) whose resource profile is matched by
     * the profile of the given slot, or empty if there is none.
     */
    private Optional<PendingRequest> findMatchingPendingRequest(PhysicalSlot physicalSlot) {
        final ResourceProfile slotProfile = physicalSlot.getResourceProfile();

        for (PendingRequest candidate : pendingRequests.values()) {
            if (slotProfile.isMatching(candidate.getResourceProfile())) {
                log.debug("Matched slot {} to pending request {}.", physicalSlot, candidate);
                return Optional.of(candidate);
            }
        }

        log.debug("Could not match slot {} to any pending request.", physicalSlot);
        return Optional.empty();
    }

    /**
     * Reserves the free slot with the given allocation id for the given request.
     *
     * <p>This implementation always returns a non-empty {@link Optional}; it relies on the
     * declarative slot pool to reject allocation ids which cannot be reserved.
     *
     * @param slotRequestId id of the request the slot is reserved for
     * @param allocationID allocation id of the free slot to reserve
     * @param resourceProfile resource profile the slot is reserved with; must not be {@code null}
     * @return the reserved slot
     */
    @Override
    public Optional<PhysicalSlot> allocateAvailableSlot(
            @Nonnull SlotRequestId slotRequestId,
            @Nonnull AllocationID allocationID,
            @Nonnull ResourceProfile resourceProfile) {
        assertRunningInMainThread();
        Preconditions.checkNotNull(resourceProfile, "The requiredSlotProfile must not be null.");
        log.debug(
                "Reserving free slot {} for slot request id {} and profile {}.",
                allocationID,
                slotRequestId,
                resourceProfile);
        return Optional.of(reserveFreeSlotForResource(slotRequestId, allocationID, resourceProfile));
    }

    /**
     * Increases the resource requirements by one slot of the given profile, reserves the free slot
     * and records the request as fulfilled.
     */
    private PhysicalSlot reserveFreeSlotForResource(
            SlotRequestId slotRequestId, AllocationID allocationID, ResourceProfile resourceProfile) {
        getDeclarativeSlotPool().increaseResourceRequirementsBy(ResourceCounter.withResource(resourceProfile, 1));
        final PhysicalSlot reservedSlot = getDeclarativeSlotPool().reserveFreeSlot(allocationID, resourceProfile);
        fulfilledRequests.put(slotRequestId, allocationID);
        return reservedSlot;
    }

    /**
     * Registers a normal (non-batch) slot request.
     *
     * @param slotRequestId id of the request
     * @param resourceProfile required resources
     * @param time optional timeout; if {@code null} the returned future never times out by itself
     * @return future which is completed with the slot once the request has been fulfilled
     */
    @Override
    @Nonnull
    public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
            @Nonnull SlotRequestId slotRequestId,
            @Nonnull ResourceProfile resourceProfile,
            @Nullable Time time) {
        assertRunningInMainThread();
        log.debug(
                "Request new allocated slot with slot request id {} and resource profile {}",
                slotRequestId,
                resourceProfile);
        return internalRequestNewSlot(PendingRequest.createNormalRequest(slotRequestId, resourceProfile), time);
    }

    /**
     * Registers a batch slot request. No per-request timeout is attached; batch requests are
     * only timed out by the periodic batch request check.
     *
     * @param slotRequestId id of the request
     * @param resourceProfile required resources
     * @return future which is completed with the slot once the request has been fulfilled
     */
    @Override
    @Nonnull
    public CompletableFuture<PhysicalSlot> requestNewAllocatedBatchSlot(
            @Nonnull SlotRequestId slotRequestId, @Nonnull ResourceProfile resourceProfile) {
        assertRunningInMainThread();
        log.debug(
                "Request new allocated batch slot with slot request id {} and resource profile {}",
                slotRequestId,
                resourceProfile);
        return internalRequestNewSlot(PendingRequest.createBatchRequest(slotRequestId, resourceProfile), null);
    }

    /**
     * Registers the pending request and, if a timeout is given, wraps its future so that the
     * request is released when the timeout fires.
     */
    private CompletableFuture<PhysicalSlot> internalRequestNewSlot(PendingRequest pendingRequest, @Nullable Time timeout) {
        internalRequestNewAllocatedSlot(pendingRequest);

        if (timeout == null) {
            return pendingRequest.getSlotFuture();
        }

        return FutureUtils.orTimeout(
                        pendingRequest.getSlotFuture(),
                        timeout.toMilliseconds(),
                        TimeUnit.MILLISECONDS,
                        componentMainThreadExecutor)
                .whenComplete((ignoredSlot, failure) -> {
                    // Only a timeout needs clean-up here; other failures come from code paths
                    // that already removed the request themselves.
                    if (failure instanceof TimeoutException) {
                        timeoutPendingSlotRequest(pendingRequest.getSlotRequestId());
                    }
                });
    }

    /** Releases the request with a {@link TimeoutException} as cause. */
    private void timeoutPendingSlotRequest(SlotRequestId slotRequestId) {
        releaseSlot(slotRequestId, new TimeoutException("Pending slot request timed out in slot pool."));
    }

    /** Stores the request as pending and increases the resource requirements by one slot. */
    private void internalRequestNewAllocatedSlot(PendingRequest pendingRequest) {
        pendingRequests.put(pendingRequest.getSlotRequestId(), pendingRequest);
        getDeclarativeSlotPool()
                .increaseResourceRequirementsBy(ResourceCounter.withResource(pendingRequest.getResourceProfile(), 1));
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Optional<ResourceID> failAllocation(AllocationID allocationID, Exception exc) {
        throw new UnsupportedOperationException("Please call failAllocation(ResourceID, AllocationID, Exception)");
    }

    /** Lowers the resource requirements by the given counter. */
    @Override
    protected void onFailAllocation(ResourceCounter resourceCounter) {
        getDeclarativeSlotPool().decreaseResourceRequirementsBy(resourceCounter);
    }

    /**
     * Releases the slot request with the given id.
     *
     * <p>A pending request is removed and failed; a fulfilled request has its reserved slot freed.
     * In both cases the resource requirements are lowered. Unknown ids are ignored.
     *
     * @param slotRequestId id of the request to release
     * @param th optional cause of the release
     */
    @Override
    public void releaseSlot(@Nonnull SlotRequestId slotRequestId, @Nullable Throwable th) {
        log.debug("Release slot with slot request id {}", slotRequestId);
        assertRunningInMainThread();

        final PendingRequest pendingRequest = pendingRequests.remove(slotRequestId);
        if (pendingRequest != null) {
            getDeclarativeSlotPool()
                    .decreaseResourceRequirementsBy(ResourceCounter.withResource(pendingRequest.getResourceProfile(), 1));
            pendingRequest.failRequest(
                    new FlinkException(
                            String.format(
                                    "Pending slot request with %s has been released.",
                                    pendingRequest.getSlotRequestId()),
                            th));
            return;
        }

        final AllocationID allocationId = fulfilledRequests.remove(slotRequestId);
        if (allocationId != null) {
            final ResourceCounter previouslyFulfilledRequirement =
                    getDeclarativeSlotPool().freeReservedSlot(allocationId, th, getRelativeTimeMillis());
            getDeclarativeSlotPool().decreaseResourceRequirementsBy(previouslyFulfilledRequirement);
        } else {
            log.debug(
                    "Could not find slot which has fulfilled slot request {}. Ignoring the release operation.",
                    slotRequestId);
        }
    }

    /** Fails the pending requests because not enough resources could be acquired. */
    @Override
    public void notifyNotEnoughResourcesAvailable(Collection<ResourceRequirement> collection) {
        assertRunningInMainThread();
        failPendingRequests(collection);
    }

    /**
     * Fails all pending requests, except batch requests while the batch timeout check is disabled.
     *
     * @param acquiredResources resources reported as acquired; only used for logging
     */
    private void failPendingRequests(Collection<ResourceRequirement> acquiredResources) {
        final Predicate<PendingRequest> shouldFail =
                request -> !(isBatchSlotRequestTimeoutCheckDisabled && request.isBatchRequest());

        if (pendingRequests.values().stream().anyMatch(shouldFail)) {
            log.warn(
                    "Could not acquire the minimum required resources, failing slot requests. Acquired: {}. Current slot pool status: {}",
                    acquiredResources,
                    getSlotServiceStatus());
            cancelPendingRequests(
                    shouldFail,
                    new NoResourceAvailableException("Could not acquire the minimum required resources."));
        }
    }

    /** Returns information about all slots of the declarative slot pool which are not free. */
    @Override
    public Collection<SlotInfo> getAllocatedSlotsInformation() {
        assertRunningInMainThread();

        final Collection<? extends SlotInfo> allSlotsInformation = getDeclarativeSlotPool().getAllSlotsInformation();
        final Set<AllocationID> freeSlotIds =
                getDeclarativeSlotPool().getFreeSlotsInformation().stream()
                        .map(freeSlot -> freeSlot.getAllocationId())
                        .collect(Collectors.toSet());

        return allSlotsInformation.stream()
                .filter(slotInfo -> !freeSlotIds.contains(slotInfo.getAllocationId()))
                .collect(Collectors.toList());
    }

    /** Returns information about the free slots of the declarative slot pool. */
    @Override
    @Nonnull
    public Collection<SlotInfoWithUtilization> getAvailableSlotsInformation() {
        assertRunningInMainThread();
        return getDeclarativeSlotPool().getFreeSlotsInformation();
    }

    /**
     * Disables the batch request timeout check. The periodic check stops rescheduling itself the
     * next time it runs, and batch requests are no longer failed by
     * {@link #notifyNotEnoughResourcesAvailable(Collection)}.
     */
    @Override
    public void disableBatchSlotRequestTimeoutCheck() {
        isBatchSlotRequestTimeoutCheckDisabled = true;
    }

    /**
     * Verifies that the pool has been started and that the caller runs in the main thread.
     *
     * @throws IllegalStateException if {@link #onStart(ComponentMainThreadExecutor)} has not been
     *     called yet
     */
    private void assertRunningInMainThread() {
        if (componentMainThreadExecutor == null) {
            throw new IllegalStateException("The FutureSlotPool has not been started yet.");
        }
        componentMainThreadExecutor.assertRunningInMainThread();
    }

    /** Releases idle slots and reschedules itself after {@code idleSlotTimeout}. */
    private void checkIdleSlotTimeout() {
        getDeclarativeSlotPool().releaseIdleSlots(getRelativeTimeMillis());

        if (componentMainThreadExecutor != null) {
            componentMainThreadExecutor.schedule(
                    this::checkIdleSlotTimeout, idleSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Periodic check which times out batch requests that have been unfulfillable for at least
     * {@code batchSlotTimeout}.
     *
     * <p>A batch request counts as fulfillable if the profile of any slot known to the pool (free
     * or allocated) matches its resource profile. The check reschedules itself unless it has been
     * disabled.
     */
    private void checkBatchSlotTimeout() {
        if (isBatchSlotRequestTimeoutCheckDisabled) {
            // Deliberately not rescheduled: the flag is never reset.
            return;
        }

        final Collection<PendingRequest> pendingBatchRequests = getPendingBatchRequests();

        if (!pendingBatchRequests.isEmpty()) {
            final Set<ResourceProfile> knownSlotProfiles = getResourceProfilesFromAllSlots();
            final Map<Boolean, List<PendingRequest>> requestsByFulfillability =
                    pendingBatchRequests.stream()
                            .collect(Collectors.partitioningBy(canBeFulfilledWithAnySlot(knownSlotProfiles)));

            final long currentTimestamp = getRelativeTimeMillis();
            final long timeoutMillis = batchSlotTimeout.toMilliseconds();

            for (PendingRequest fulfillableRequest : requestsByFulfillability.get(true)) {
                fulfillableRequest.markFulfillable();
            }

            for (PendingRequest unfulfillableRequest : requestsByFulfillability.get(false)) {
                unfulfillableRequest.markUnfulfillable(currentTimestamp);

                // Compare the elapsed time instead of "since + timeout" so that a very large
                // timeout cannot overflow into an immediate expiry.
                if (currentTimestamp - unfulfillableRequest.getUnfulfillableSince() >= timeoutMillis) {
                    timeoutPendingSlotRequest(unfulfillableRequest.getSlotRequestId());
                }
            }
        }

        if (componentMainThreadExecutor != null) {
            componentMainThreadExecutor.schedule(
                    this::checkBatchSlotTimeout, batchSlotTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        }
    }

    /** Collects the distinct resource profiles of all free and allocated slots. */
    private Set<ResourceProfile> getResourceProfilesFromAllSlots() {
        return Stream.concat(getAvailableSlotsInformation().stream(), getAllocatedSlotsInformation().stream())
                .map(SlotInfo::getResourceProfile)
                .collect(Collectors.toSet());
    }

    /** Returns the pending requests which are flagged as batch requests. */
    private Collection<PendingRequest> getPendingBatchRequests() {
        return pendingRequests.values().stream()
                .filter(PendingRequest::isBatchRequest)
                .collect(Collectors.toList());
    }

    /**
     * Creates a predicate that accepts a request if at least one of the given slot profiles
     * matches the request's resource profile.
     */
    private static Predicate<PendingRequest> canBeFulfilledWithAnySlot(Set<ResourceProfile> slotProfiles) {
        return pendingRequest -> {
            for (ResourceProfile slotProfile : slotProfiles) {
                if (slotProfile.isMatching(pendingRequest.getResourceProfile())) {
                    return true;
                }
            }
            return false;
        };
    }
}
