/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.logstreams.impl;

import io.zeebe.dispatcher.BlockPeek;
import io.zeebe.dispatcher.Subscription;
import io.zeebe.distributedlog.impl.DistributedLogstreamPartition;
import io.zeebe.logstreams.impl.LoggedEventImpl;
import io.zeebe.logstreams.impl.Loggers;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.channel.ConsumableChannel;
import io.zeebe.util.sched.future.ActorFuture;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

/**
 * Actor that takes blocks from a write-buffer {@link Subscription} and hands them to a
 * {@link DistributedLogstreamPartition}.
 *
 * <p>For every block peeked from the subscription the appender copies the block's bytes into a
 * fresh array, determines the position of the last event contained in that array and passes both
 * to {@link DistributedLogstreamPartition#asyncAppend}. The peeked block is marked as completed
 * right after the append call has been issued.
 */
public class LogStorageAppender extends Actor {
    public static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;

    /**
     * Failure flag exposed through {@link #isFailed()}. Nothing in this class ever sets it to
     * {@code true}.
     */
    private final AtomicBoolean isFailed = new AtomicBoolean(false);

    /** Reused holder for the block most recently peeked from the subscription. */
    private final BlockPeek blockPeek = new BlockPeek();

    /** Copy of the bytes of the block currently being appended. */
    private byte[] bytesToAppend;

    /** Position of the last event inside {@link #bytesToAppend}. */
    private long commitPosition;

    private final String name;
    private final Subscription writeBufferSubscription;
    private final int maxAppendBlockSize;
    private final DistributedLogstreamPartition distributedLog;

    /**
     * @param name the name reported by {@link #getName()}
     * @param distributedLog the target every block is appended to
     * @param writeBufferSubscription the subscription blocks are peeked from
     * @param maxBlockSize the maximum block size requested on each peek
     */
    public LogStorageAppender(String name, DistributedLogstreamPartition distributedLog, Subscription writeBufferSubscription, int maxBlockSize) {
        this.name = name;
        this.distributedLog = distributedLog;
        this.writeBufferSubscription = writeBufferSubscription;
        this.maxAppendBlockSize = maxBlockSize;
    }

    /** Returns the name given at construction time. */
    public String getName() {
        return name;
    }

    /**
     * Registers {@link #peekBlock()} as the consumer of the write-buffer subscription.
     *
     * <p>NOTE(review): presumably invoked by the actor framework while the actor starts; confirm
     * against {@code Actor}.
     */
    protected void onActorStarting() {
        actor.consume((ConsumableChannel) writeBufferSubscription, this::peekBlock);
    }

    /**
     * Peeks the next block (at most {@code maxAppendBlockSize}) and appends it; yields the actor
     * when the peek returns nothing.
     */
    private void peekBlock() {
        // NOTE(review): the meaning of the trailing 'true' flag is defined by
        // Subscription#peekBlock and is not visible here.
        int peekedBytes = writeBufferSubscription.peekBlock(blockPeek, maxAppendBlockSize, true);
        if (peekedBytes > 0) {
            appendBlock();
        } else {
            actor.yield();
        }
    }

    /**
     * Copies the peeked block out of its raw buffer, records the position of its last event and
     * schedules the write.
     */
    private void appendBlock() {
        ByteBuffer rawBuffer = blockPeek.getRawBuffer();
        byte[] blockCopy = new byte[rawBuffer.remaining()];
        rawBuffer.get(blockCopy);

        bytesToAppend = blockCopy;
        commitPosition = getLastEventPosition(blockCopy);
        actor.runUntilDone(this::tryWrite);
    }

    /**
     * Issues the append for the current block, marks the peeked block as completed and signals
     * the actor that this job is done. The result of {@code asyncAppend} is not inspected.
     */
    private void tryWrite() {
        distributedLog.asyncAppend(bytesToAppend, commitPosition);
        blockPeek.markCompleted();
        actor.done();
    }

    /**
     * Walks the events stored back to back in {@code buffer} and returns the position of the
     * last one.
     *
     * @param buffer the bytes of one block
     * @return the position of the last event, or {@code -1} when {@code buffer} is empty
     * @throws IllegalStateException if an event does not advance the read offset (non-positive
     *     fragment length or offset overflow); without this check the loop would never end
     */
    private long getLastEventPosition(byte[] buffer) {
        DirectBuffer view = new UnsafeBuffer(buffer);
        LoggedEventImpl event = new LoggedEventImpl();
        long lastEventPosition = -1L;
        int offset = 0;

        while (offset < buffer.length) {
            event.wrap(view, offset);
            lastEventPosition = event.getPosition();

            int nextOffset = offset;
            nextOffset += event.getFragmentLength();
            if (nextOffset <= offset) {
                // A malformed entry would otherwise keep this loop spinning forever.
                throw new IllegalStateException(
                    "Log entry at offset " + offset + " of a " + buffer.length
                        + " byte block does not advance the read offset; fragment length must be positive");
            }
            offset = nextOffset;
        }
        return lastEventPosition;
    }

    /** Closes the underlying actor and returns the future produced by that close request. */
    public ActorFuture<Void> close() {
        return actor.close();
    }

    /** Returns the current value of the failure flag. */
    public boolean isFailed() {
        return isFailed.get();
    }

    /** Returns the current position of the write-buffer subscription. */
    public long getCurrentAppenderPosition() {
        return writeBufferSubscription.getPosition();
    }
}

