package org.apache.flink.streaming.runtime.tasks.mailbox;

import java.io.Closeable;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.metrics.TimerGauge;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.WrappingRuntimeException;
import org.apache.flink.util.function.RunnableWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.class */
public class MailboxProcessor implements Closeable {
    private static final Logger LOG;
    protected final TaskMailbox mailbox;
    protected final MailboxDefaultAction mailboxDefaultAction;
    private boolean mailboxLoopRunning;
    private boolean suspended;
    private DefaultActionSuspension suspendedDefaultAction;
    private final StreamTaskActionExecutor actionExecutor;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor$DefaultActionSuspension.class */
    public final class DefaultActionSuspension implements MailboxDefaultAction.Suspension {

        @Nullable
        private final TimerGauge suspensionTimer;

        public DefaultActionSuspension(@Nullable TimerGauge timerGauge) {
            this.suspensionTimer = timerGauge;
        }

        @Override // org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction.Suspension
        public void resume() {
            if (MailboxProcessor.this.mailbox.isMailboxThread()) {
                resumeInternal();
            } else {
                try {
                    MailboxProcessor.this.sendControlMail(this::resumeInternal, "resume default action", new Object[0]);
                } catch (TaskMailbox.MailboxClosedException e) {
                }
            }
        }

        private void resumeInternal() {
            if (MailboxProcessor.this.suspendedDefaultAction == this) {
                MailboxProcessor.this.suspendedDefaultAction = null;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor$MailboxController.class */
    public static final class MailboxController implements MailboxDefaultAction.Controller {
        private final MailboxProcessor mailboxProcessor;

        protected MailboxController(MailboxProcessor mailboxProcessor) {
            this.mailboxProcessor = mailboxProcessor;
        }

        @Override // org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction.Controller
        public void allActionsCompleted() {
            this.mailboxProcessor.allActionsCompleted();
        }

        @Override // org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction.Controller
        public MailboxDefaultAction.Suspension suspendDefaultAction(TimerGauge timerGauge) {
            return this.mailboxProcessor.suspendDefaultAction(timerGauge);
        }

        @Override // org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction.Controller
        public MailboxDefaultAction.Suspension suspendDefaultAction() {
            return this.mailboxProcessor.suspendDefaultAction(null);
        }
    }

    @VisibleForTesting
    public MailboxProcessor() {
        this((v0) -> {
            v0.suspendDefaultAction();
        });
    }

    public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) {
        this(mailboxDefaultAction, StreamTaskActionExecutor.IMMEDIATE);
    }

    public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction, StreamTaskActionExecutor streamTaskActionExecutor) {
        this(mailboxDefaultAction, new TaskMailboxImpl(Thread.currentThread()), streamTaskActionExecutor);
    }

    public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction, TaskMailbox taskMailbox, StreamTaskActionExecutor streamTaskActionExecutor) {
        this.mailboxDefaultAction = (MailboxDefaultAction) Preconditions.checkNotNull(mailboxDefaultAction);
        this.actionExecutor = (StreamTaskActionExecutor) Preconditions.checkNotNull(streamTaskActionExecutor);
        this.mailbox = (TaskMailbox) Preconditions.checkNotNull(taskMailbox);
        this.mailboxLoopRunning = true;
        this.suspendedDefaultAction = null;
    }

    public MailboxExecutor getMainMailboxExecutor() {
        return new MailboxExecutorImpl(this.mailbox, -1, this.actionExecutor);
    }

    public MailboxExecutor getMailboxExecutor(int i) {
        return new MailboxExecutorImpl(this.mailbox, i, this.actionExecutor, this);
    }

    public void prepareClose() {
        this.mailbox.quiesce();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        List<Mail> close = this.mailbox.close();
        if (close.isEmpty()) {
            return;
        }
        LOG.debug("Closing the mailbox dropped mails {}.", close);
        Optional empty = Optional.empty();
        Iterator<Mail> it = close.iterator();
        while (it.hasNext()) {
            try {
                it.next().tryCancel(false);
            } catch (RuntimeException e) {
                empty = Optional.of(ExceptionUtils.firstOrSuppressed(e, (Throwable) empty.orElse(null)));
            }
        }
        empty.ifPresent(runtimeException -> {
            throw runtimeException;
        });
    }

    public void drain() throws Exception {
        Iterator<Mail> it = this.mailbox.drain().iterator();
        while (it.hasNext()) {
            it.next().run();
        }
    }

    public void runMailboxLoop() throws Exception {
        this.suspended = !this.mailboxLoopRunning;
        TaskMailbox taskMailbox = this.mailbox;
        Preconditions.checkState(taskMailbox.isMailboxThread(), "Method must be executed by declared mailbox thread!");
        if (!$assertionsDisabled && taskMailbox.getState() != TaskMailbox.State.OPEN) {
            throw new AssertionError("Mailbox must be opened!");
        }
        MailboxController mailboxController = new MailboxController(this);
        while (isNextLoopPossible()) {
            processMail(taskMailbox, false);
            if (isNextLoopPossible()) {
                this.mailboxDefaultAction.runDefaultAction(mailboxController);
            }
        }
    }

    public void suspend() {
        sendPoisonMail(() -> {
            this.suspended = true;
        });
    }

    @VisibleForTesting
    public boolean runMailboxStep() throws Exception {
        this.suspended = !this.mailboxLoopRunning;
        if (processMail(this.mailbox, true)) {
            return true;
        }
        if (isDefaultActionUnavailable() || !isNextLoopPossible()) {
            return false;
        }
        this.mailboxDefaultAction.runDefaultAction(new MailboxController(this));
        return true;
    }

    public boolean isMailboxThread() {
        return this.mailbox.isMailboxThread();
    }

    public void reportThrowable(Throwable th) {
        sendControlMail(() -> {
            if (th instanceof Exception) {
                throw ((Exception) th);
            }
            if (!(th instanceof Error)) {
                throw WrappingRuntimeException.wrapIfNecessary(th);
            }
            throw ((Error) th);
        }, "Report throwable %s", th);
    }

    public void allActionsCompleted() {
        sendPoisonMail(() -> {
            this.mailboxLoopRunning = false;
            this.suspended = true;
        });
    }

    private void sendPoisonMail(RunnableWithException runnableWithException) {
        this.mailbox.runExclusively(() -> {
            if (this.mailbox.getState() == TaskMailbox.State.OPEN) {
                sendControlMail(runnableWithException, "poison mail", new Object[0]);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendControlMail(RunnableWithException runnableWithException, String str, Object... objArr) {
        this.mailbox.putFirst(new Mail(runnableWithException, Integer.MAX_VALUE, str, objArr));
    }

    private boolean processMail(TaskMailbox taskMailbox, boolean z) throws Exception {
        if (!taskMailbox.createBatch()) {
            return false;
        }
        boolean processMailsNonBlocking = processMailsNonBlocking(z);
        return z ? processMailsNonBlocking : processMailsNonBlocking | processMailsWhenDefaultActionUnavailable();
    }

    private boolean processMailsWhenDefaultActionUnavailable() throws Exception {
        boolean z;
        boolean z2 = false;
        while (true) {
            z = z2;
            if (!isDefaultActionUnavailable() || !isNextLoopPossible()) {
                break;
            }
            Optional<Mail> tryTake = this.mailbox.tryTake(-1);
            if (!tryTake.isPresent()) {
                tryTake = Optional.of(this.mailbox.take(-1));
            }
            maybePauseIdleTimer();
            tryTake.get().run();
            maybeRestartIdleTimer();
            z2 = true;
        }
        return z;
    }

    private boolean processMailsNonBlocking(boolean z) throws Exception {
        long j = 0;
        while (isNextLoopPossible()) {
            Optional<Mail> tryTakeFromBatch = this.mailbox.tryTakeFromBatch();
            if (!tryTakeFromBatch.isPresent()) {
                break;
            }
            long j2 = j;
            j = j2 + 1;
            if (j2 == 0) {
                maybePauseIdleTimer();
            }
            tryTakeFromBatch.get().run();
            if (z) {
                break;
            }
        }
        if (j <= 0) {
            return false;
        }
        maybeRestartIdleTimer();
        return true;
    }

    private void maybePauseIdleTimer() {
        if (this.suspendedDefaultAction == null || this.suspendedDefaultAction.suspensionTimer == null) {
            return;
        }
        this.suspendedDefaultAction.suspensionTimer.markEnd();
    }

    private void maybeRestartIdleTimer() {
        if (this.suspendedDefaultAction == null || this.suspendedDefaultAction.suspensionTimer == null) {
            return;
        }
        this.suspendedDefaultAction.suspensionTimer.markStart();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public MailboxDefaultAction.Suspension suspendDefaultAction(@Nullable TimerGauge timerGauge) {
        Preconditions.checkState(this.mailbox.isMailboxThread(), "Suspending must only be called from the mailbox thread!");
        Preconditions.checkState(this.suspendedDefaultAction == null, "Default action has already been suspended");
        if (this.suspendedDefaultAction == null) {
            this.suspendedDefaultAction = new DefaultActionSuspension(timerGauge);
            ensureControlFlowSignalCheck();
        }
        return this.suspendedDefaultAction;
    }

    @VisibleForTesting
    public boolean isDefaultActionUnavailable() {
        return this.suspendedDefaultAction != null;
    }

    private boolean isNextLoopPossible() {
        return !this.suspended;
    }

    @VisibleForTesting
    public boolean isMailboxLoopRunning() {
        return this.mailboxLoopRunning;
    }

    @VisibleForTesting
    public boolean hasMail() {
        return this.mailbox.hasMail();
    }

    private void ensureControlFlowSignalCheck() {
        if (this.mailbox.hasMail()) {
            return;
        }
        sendControlMail(() -> {
        }, "signal check", new Object[0]);
    }

    static {
        $assertionsDisabled = !MailboxProcessor.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger((Class<?>) MailboxProcessor.class);
    }
}
