package io.zeebe.broker.system.management.deployment;

import io.atomix.core.Atomix;
import io.zeebe.broker.Loggers;
import io.zeebe.broker.clustering.base.partitions.Partition;
import io.zeebe.broker.engine.impl.DeploymentDistributorImpl;
import io.zeebe.clustering.management.MessageHeaderDecoder;
import io.zeebe.engine.processor.workflow.DeploymentResponder;
import io.zeebe.logstreams.log.LogStreamRecordWriter;
import io.zeebe.logstreams.log.LogStreamWriterImpl;
import io.zeebe.msgpack.UnpackedObject;
import io.zeebe.protocol.impl.encoding.ErrorResponse;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.zeebe.protocol.record.ErrorCode;
import io.zeebe.protocol.record.RecordType;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.protocol.record.intent.DeploymentIntent;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.sched.ActorControl;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.agrona.DirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/system/management/deployment/PushDeploymentRequestHandler.class */
public class PushDeploymentRequestHandler implements Function<byte[], CompletableFuture<byte[]>>, DeploymentResponder {
    private static final Logger LOG = Loggers.WORKFLOW_REPOSITORY_LOGGER;
    private final Int2ObjectHashMap<Partition> leaderPartitions;
    private final ActorControl actor;
    private final Atomix atomix;
    private final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
    private final LogStreamRecordWriter logStreamWriter = new LogStreamWriterImpl();
    private final RecordMetadata recordMetadata = new RecordMetadata();
    private final PushDeploymentResponse deploymentResponse = new PushDeploymentResponse();

    public PushDeploymentRequestHandler(Int2ObjectHashMap<Partition> int2ObjectHashMap, ActorControl actorControl, Atomix atomix) {
        this.leaderPartitions = int2ObjectHashMap;
        this.actor = actorControl;
        this.atomix = atomix;
    }

    @Override // java.util.function.Function
    public CompletableFuture<byte[]> apply(byte[] bArr) {
        CompletableFuture<byte[]> completableFuture = new CompletableFuture<>();
        this.actor.call(() -> {
            DirectBuffer unsafeBuffer = new UnsafeBuffer(bArr);
            int capacity = unsafeBuffer.capacity();
            this.messageHeaderDecoder.m119wrap(unsafeBuffer, 0);
            int schemaId = this.messageHeaderDecoder.schemaId();
            if (5 != schemaId) {
                completableFuture.completeExceptionally(new RuntimeException(String.format("Expected to have schema id %d, but got %d.", 5, Integer.valueOf(schemaId))));
                return;
            }
            int templateId = this.messageHeaderDecoder.templateId();
            if (2 == templateId) {
                handleValidRequest(completableFuture, unsafeBuffer, 0, capacity);
            } else {
                completableFuture.completeExceptionally(new RuntimeException(String.format("Expected to have template id %d, but got %d.", 2, Integer.valueOf(templateId))));
            }
        });
        return completableFuture;
    }

    public void sendDeploymentResponse(long j, int i) {
        this.deploymentResponse.reset();
        this.deploymentResponse.deploymentKey(j).partitionId(i);
        String deploymentResponseTopic = DeploymentDistributorImpl.getDeploymentResponseTopic(j);
        this.atomix.getEventService().broadcast(deploymentResponseTopic, this.deploymentResponse.toBytes());
        LOG.trace("Send deployment response on topic {}", deploymentResponseTopic);
    }

    private void handleValidRequest(CompletableFuture<byte[]> completableFuture, DirectBuffer directBuffer, int i, int i2) {
        PushDeploymentRequest pushDeploymentRequest = new PushDeploymentRequest();
        pushDeploymentRequest.wrap(directBuffer, i, i2);
        long deploymentKey = pushDeploymentRequest.deploymentKey();
        int partitionId = pushDeploymentRequest.partitionId();
        DirectBuffer deployment = pushDeploymentRequest.deployment();
        if (((Partition) this.leaderPartitions.get(partitionId)) != null) {
            LOG.debug("Handling deployment {} for partition {} as leader", Long.valueOf(deploymentKey), Integer.valueOf(partitionId));
            handlePushDeploymentRequest(completableFuture, deployment, deploymentKey, partitionId);
        } else {
            LOG.debug("Rejecting deployment {} for partition {} as not leader", Long.valueOf(deploymentKey), Integer.valueOf(partitionId));
            sendNotLeaderRejection(completableFuture, partitionId);
        }
    }

    private void handlePushDeploymentRequest(CompletableFuture<byte[]> completableFuture, DirectBuffer directBuffer, long j, int i) {
        DeploymentRecord deploymentRecord = new DeploymentRecord();
        deploymentRecord.wrap(directBuffer);
        this.actor.runUntilDone(() -> {
            Partition partition = (Partition) this.leaderPartitions.get(i);
            if (partition == null) {
                LOG.debug("Leader change on partition {}, ignore push deployment request", Integer.valueOf(i));
                this.actor.done();
            } else {
                if (!writeCreatingDeployment(partition, j, deploymentRecord)) {
                    this.actor.yield();
                    return;
                }
                LOG.debug("Deployment CREATE command was written on partition {}", Integer.valueOf(i));
                this.actor.done();
                sendResponse(completableFuture, j, i);
            }
        });
    }

    private void sendResponse(CompletableFuture<byte[]> completableFuture, long j, int i) {
        PushDeploymentResponse pushDeploymentResponse = new PushDeploymentResponse();
        pushDeploymentResponse.deploymentKey(j);
        pushDeploymentResponse.partitionId(i);
        completableFuture.complete(pushDeploymentResponse.toBytes());
    }

    private void sendNotLeaderRejection(CompletableFuture<byte[]> completableFuture, int i) {
        ErrorResponse errorResponse = new ErrorResponse();
        errorResponse.setErrorCode(ErrorCode.PARTITION_LEADER_MISMATCH).setErrorData(BufferUtil.wrapString(String.format("Not leader of partition %d", Integer.valueOf(i))));
        completableFuture.complete(errorResponse.toBytes());
    }

    private boolean writeCreatingDeployment(Partition partition, long j, UnpackedObject unpackedObject) {
        RecordType recordType = RecordType.COMMAND;
        ValueType valueType = ValueType.DEPLOYMENT;
        DeploymentIntent deploymentIntent = DeploymentIntent.CREATE;
        this.logStreamWriter.wrap(partition.getLogStream());
        this.recordMetadata.reset().recordType(recordType).valueType(valueType).intent(deploymentIntent);
        return this.logStreamWriter.key(j).metadataWriter(this.recordMetadata).valueWriter(unpackedObject).tryWrite() > 0;
    }
}
