/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.distributedlog.impl;

import io.atomix.core.Atomix;
import io.atomix.primitive.partition.Partitioner;
import io.atomix.primitive.protocol.ProxyProtocol;
import io.atomix.protocols.raft.MultiRaftProtocol;
import io.zeebe.distributedlog.DistributedLogstream;
import io.zeebe.distributedlog.DistributedLogstreamBuilder;
import io.zeebe.distributedlog.DistributedLogstreamType;
import io.zeebe.distributedlog.impl.DistributedLogstreamName;
import io.zeebe.servicecontainer.Injector;
import io.zeebe.servicecontainer.Service;
import io.zeebe.servicecontainer.ServiceStartContext;
import io.zeebe.servicecontainer.ServiceStopContext;
import io.zeebe.util.ZbLogger;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.slf4j.Logger;

public class DistributedLogstreamPartition
implements Service<DistributedLogstreamPartition> {
    private static final Logger LOG = new ZbLogger(DistributedLogstreamPartition.class);
    private static final MultiRaftProtocol PROTOCOL = MultiRaftProtocol.builder().withPartitioner((Partitioner)DistributedLogstreamName.getInstance()).build();
    private final int partitionId;
    private final String partitionName;
    private final String primitiveName;
    private final long currentLeaderTerm;
    private final Injector<Atomix> atomixInjector = new Injector();
    private DistributedLogstream distributedLog;
    private Atomix atomix;
    private String memberId;

    public DistributedLogstreamPartition(int partitionId, long leaderTerm) {
        this.partitionId = partitionId;
        this.currentLeaderTerm = leaderTerm;
        this.primitiveName = "distributed-log";
        this.partitionName = DistributedLogstreamName.getPartitionKey(partitionId);
    }

    public long append(byte[] blockBuffer, long commitPosition) {
        return this.distributedLog.append(this.partitionName, this.memberId, commitPosition, blockBuffer);
    }

    public CompletableFuture<Long> asyncAppend(byte[] blockBuffer, long commitPosition) {
        return this.distributedLog.async().append(this.partitionName, this.memberId, commitPosition, blockBuffer);
    }

    public CompletableFuture<Boolean> claimLeaderShip() {
        return this.distributedLog.async().claimLeaderShip(this.partitionName, this.memberId, this.currentLeaderTerm);
    }

    public void start(ServiceStartContext startContext) {
        CompletableActorFuture startFuture = new CompletableActorFuture();
        this.atomix = (Atomix)this.atomixInjector.getValue();
        this.memberId = (String)((Object)this.atomix.getMembershipService().getLocalMember().id().id());
        startContext.async((ActorFuture)startFuture, true);
        this.tryStart((CompletableActorFuture<Void>)startFuture);
    }

    public void stop(ServiceStopContext stopContext) {
        this.distributedLog.async().close();
    }

    public DistributedLogstreamPartition get() {
        return this;
    }

    public Injector<Atomix> getAtomixInjector() {
        return this.atomixInjector;
    }

    private void tryStart(CompletableActorFuture<Void> startFuture) {
        CompletionStage<Boolean> leadershipClaim = this.distributedLog == null ? this.buildPrimitiveAsync().thenCompose(this::onPrimitiveBuilt) : this.onPrimitiveBuilt(this.distributedLog);
        leadershipClaim.whenComplete((nothing, error) -> this.onLeadershipClaimed(startFuture, (Throwable)error));
    }

    private void onLeadershipClaimed(CompletableActorFuture<Void> startFuture, Throwable error) {
        if (error == null) {
            LOG.debug("Partition {} for node {} claimed leadership", (Object)this.partitionId, (Object)this.memberId);
            startFuture.complete(null);
        }
        if (error != null) {
            LOG.error("Partition {} for node {} failed to start, retrying.", new Object[]{this.partitionId, this.memberId, error});
            this.tryStart(startFuture);
        }
    }

    private CompletableFuture<Boolean> onPrimitiveBuilt(DistributedLogstream distributedLog) {
        this.distributedLog = distributedLog;
        return this.claimLeaderShip();
    }

    private CompletableFuture<DistributedLogstream> buildPrimitiveAsync() {
        return ((DistributedLogstreamBuilder)((DistributedLogstreamBuilder)this.atomix.primitiveBuilder(this.primitiveName, DistributedLogstreamType.instance())).withProtocol((ProxyProtocol)PROTOCOL)).buildAsync();
    }
}

