/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.client.impl.worker;

import io.grpc.stub.StreamObserver;
import io.zeebe.client.api.response.ActivatedJob;
import io.zeebe.client.impl.Loggers;
import io.zeebe.client.impl.ZeebeObjectMapper;
import io.zeebe.client.impl.response.ActivatedJobImpl;
import io.zeebe.gateway.protocol.GatewayGrpc;
import io.zeebe.gateway.protocol.GatewayOuterClass;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;

public class JobPoller
implements StreamObserver<GatewayOuterClass.ActivateJobsResponse> {
    private static final Logger LOG = Loggers.JOB_POLLER_LOGGER;
    private final GatewayGrpc.GatewayStub gatewayStub;
    private final GatewayOuterClass.ActivateJobsRequest.Builder requestBuilder;
    private final ZeebeObjectMapper objectMapper;
    private final long requestTimeout;
    private Consumer<ActivatedJob> jobConsumer;
    private Consumer<Integer> doneCallback;
    private int activatedJobs;

    public JobPoller(GatewayGrpc.GatewayStub gatewayStub, GatewayOuterClass.ActivateJobsRequest.Builder requestBuilder, ZeebeObjectMapper objectMapper, Duration requestTimeout) {
        this.gatewayStub = gatewayStub;
        this.requestBuilder = requestBuilder;
        this.objectMapper = objectMapper;
        this.requestTimeout = requestTimeout.toMillis();
    }

    private void reset() {
        this.activatedJobs = 0;
    }

    public void poll(int maxJobsToActivate, Consumer<ActivatedJob> jobConsumer, Consumer<Integer> doneCallback) {
        this.reset();
        this.requestBuilder.setMaxJobsToActivate(maxJobsToActivate);
        this.jobConsumer = jobConsumer;
        this.doneCallback = doneCallback;
        this.poll();
    }

    private void poll() {
        LOG.trace("Polling at max {} jobs for worker {} and job type {}", new Object[]{this.requestBuilder.getMaxJobsToActivate(), this.requestBuilder.getWorker(), this.requestBuilder.getType()});
        ((GatewayGrpc.GatewayStub)this.gatewayStub.withDeadlineAfter(this.requestTimeout, TimeUnit.MILLISECONDS)).activateJobs(this.requestBuilder.build(), (StreamObserver)this);
    }

    public void onNext(GatewayOuterClass.ActivateJobsResponse activateJobsResponse) {
        this.activatedJobs += activateJobsResponse.getJobsCount();
        activateJobsResponse.getJobsList().stream().map(job -> new ActivatedJobImpl(this.objectMapper, (GatewayOuterClass.ActivatedJob)job)).forEach(this.jobConsumer);
    }

    public void onError(Throwable throwable) {
        LOG.warn("Failed to activated jobs for worker {} and job type {}", new Object[]{this.requestBuilder.getWorker(), this.requestBuilder.getType(), throwable});
        this.pollingDone();
    }

    public void onCompleted() {
        this.pollingDone();
    }

    private void pollingDone() {
        if (this.activatedJobs > 0) {
            LOG.info("Activated {} jobs for worker {} and job type {}", new Object[]{this.activatedJobs, this.requestBuilder.getWorker(), this.requestBuilder.getType()});
        } else {
            LOG.trace("No jobs activated for worker {} and job type {}", (Object)this.requestBuilder.getWorker(), (Object)this.requestBuilder.getType());
        }
        this.doneCallback.accept(this.activatedJobs);
    }
}

