package io.zeebe.distributedlog.impl;

import io.atomix.primitive.service.AbstractPrimitiveService;
import io.atomix.primitive.service.BackupInput;
import io.atomix.primitive.service.BackupOutput;
import io.atomix.primitive.service.ServiceExecutor;
import io.atomix.primitive.service.impl.DefaultServiceExecutor;
import io.atomix.protocols.raft.impl.RaftContext;
import io.atomix.protocols.raft.service.RaftServiceContext;
import io.atomix.utils.concurrent.SingleThreadContext;
import io.atomix.utils.concurrent.ThreadContext;
import io.zeebe.distributedlog.DistributedLogstreamClient;
import io.zeebe.distributedlog.DistributedLogstreamService;
import io.zeebe.distributedlog.DistributedLogstreamType;
import io.zeebe.distributedlog.StorageConfiguration;
import io.zeebe.distributedlog.restore.RestoreClient;
import io.zeebe.distributedlog.restore.RestoreFactory;
import io.zeebe.distributedlog.restore.RestoreNodeProvider;
import io.zeebe.distributedlog.restore.impl.RestoreController;
import io.zeebe.distributedlog.restore.log.LogReplicationAppender;
import io.zeebe.distributedlog.restore.log.LogReplicator;
import io.zeebe.distributedlog.restore.snapshot.RestoreSnapshotReplicator;
import io.zeebe.distributedlog.restore.snapshot.SnapshotRestoreContext;
import io.zeebe.logstreams.LogStreams;
import io.zeebe.logstreams.impl.service.LogStreamServiceNames;
import io.zeebe.logstreams.log.BufferedLogStreamReader;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.spi.LogStorage;
import io.zeebe.logstreams.state.FileSnapshotConsumer;
import io.zeebe.servicecontainer.ServiceContainer;
import io.zeebe.util.ZbLogger;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/zeebe/distributedlog/impl/DefaultDistributedLogstreamService.class */
public class DefaultDistributedLogstreamService extends AbstractPrimitiveService<DistributedLogstreamClient> implements DistributedLogstreamService, LogReplicationAppender {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultDistributedLogstreamService.class);
    private LogStream logStream;
    private LogStorage logStorage;
    private String logName;
    private int partitionId;
    private String currentLeader;
    private long currentLeaderTerm;
    private long lastPosition;
    private ServiceContainer serviceContainer;
    private String localMemberId;
    private Logger logger;

    public DefaultDistributedLogstreamService() {
        super(DistributedLogstreamType.instance(), DistributedLogstreamClient.class);
        this.currentLeaderTerm = -1L;
        this.lastPosition = -1L;
    }

    protected void configure(ServiceExecutor serviceExecutor) {
        super.configure(serviceExecutor);
        this.localMemberId = (String) getLocalMemberId().id();
        try {
            this.logName = getRaftPartitionName(serviceExecutor);
            configureFromLogName(this.logName);
            this.logger.debug("Configuring {} on node {} with logName {}", new Object[]{getServiceName(), getLocalMemberId().id(), this.logName});
            this.logStream = getOrCreateLogStream(this.logName);
            this.logStorage = this.logStream.getLogStorage();
            initLastPosition();
            this.logger.debug("Configured with LogStream {} and last appended event at position {}", this.logName, Long.valueOf(this.lastPosition));
        } catch (Exception e) {
            LOG.error("Failed to configure {} on node {} with logName {}", new Object[]{getServiceName(), getLocalMemberId().id(), this.logName, e});
            throw e;
        }
    }

    private void configureFromLogName(String str) {
        this.partitionId = getPartitionIdFromLogName(str);
        this.logger = new ZbLogger(String.format("%s-%d", getClass().getName(), Integer.valueOf(this.partitionId)));
    }

    private int getPartitionIdFromLogName(String str) {
        String[] split = str.split("-");
        return Integer.valueOf(split[split.length - 1]).intValue();
    }

    private String getRaftPartitionName(ServiceExecutor serviceExecutor) {
        try {
            Field declaredField = DefaultServiceExecutor.class.getDeclaredField("context");
            declaredField.setAccessible(true);
            RaftServiceContext raftServiceContext = (RaftServiceContext) declaredField.get(serviceExecutor);
            Field declaredField2 = RaftServiceContext.class.getDeclaredField("raft");
            declaredField2.setAccessible(true);
            String name = ((RaftContext) declaredField2.get(raftServiceContext)).getName();
            declaredField2.setAccessible(false);
            declaredField.setAccessible(false);
            return name;
        } catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException(e);
        }
    }

    private LogStream getOrCreateLogStream(String str) {
        this.serviceContainer = LogstreamConfig.getServiceContainer(this.localMemberId);
        LogStream logStream = ((Boolean) this.serviceContainer.hasService(LogStreamServiceNames.logStreamServiceName(str)).join()).booleanValue() ? LogstreamConfig.getLogStream(this.localMemberId, this.partitionId) : createLogStream(str);
        LogstreamConfig.putLogStream(this.localMemberId, this.partitionId, logStream);
        return logStream;
    }

    private LogStream createLogStream(String str) {
        StorageConfiguration storageConfiguration = (StorageConfiguration) LogstreamConfig.getConfig(this.localMemberId, this.partitionId).join();
        return (LogStream) LogStreams.createFsLogStream(this.partitionId).logDirectory(storageConfiguration.getLogDirectory().getAbsolutePath()).logSegmentSize((int) storageConfiguration.getLogSegmentSize()).logName(str).serviceContainer(this.serviceContainer).build().join();
    }

    private void initLastPosition() {
        BufferedLogStreamReader bufferedLogStreamReader = new BufferedLogStreamReader(this.logStream);
        bufferedLogStreamReader.seekToLastEvent();
        this.lastPosition = bufferedLogStreamReader.getPosition();
        if (this.lastPosition > 0) {
            this.logStream.setCommitPosition(this.lastPosition);
        }
        bufferedLogStreamReader.close();
    }

    @Override // io.zeebe.distributedlog.DistributedLogstreamService
    public long append(String str, long j, byte[] bArr) {
        if (this.currentLeader.equals(str)) {
            return append(j, bArr);
        }
        this.logger.warn("Append request from follower node {}. Current leader is {}.", str, this.currentLeader);
        return 0L;
    }

    @Override // io.zeebe.distributedlog.restore.log.LogReplicationAppender
    public long append(long j, byte[] bArr) {
        if (j <= this.lastPosition) {
            this.logger.trace("Rejecting append request at position {}", Long.valueOf(j));
            return 1L;
        }
        long appendBlock = appendBlock(ByteBuffer.wrap(bArr));
        updateCommitPosition(j);
        return appendBlock;
    }

    private long appendBlock(ByteBuffer byteBuffer) {
        long j = -1;
        boolean z = true;
        do {
            try {
                try {
                    j = this.logStorage.append(byteBuffer);
                    z = false;
                } catch (IOException e) {
                    LOG.error("Expected to append new buffer, but caught IOException. Will retry this operation.", e);
                }
            } catch (Exception e2) {
                LOG.error("Expected to append new buffer, but caught an non recoverable exception. Will block the primitive.", e2);
                while (true) {
                    try {
                        new CountDownLatch(1).await();
                    } catch (InterruptedException e3) {
                        LOG.error("Blocking the primitive was interrupted.", e3);
                    }
                }
            }
        } while (z);
        return j;
    }

    @Override // io.zeebe.distributedlog.DistributedLogstreamService
    public boolean claimLeaderShip(String str, long j) {
        this.logger.debug("Node {} claiming leadership for LogStream partition {} at term {}.", new Object[]{str, Integer.valueOf(this.logStream.getPartitionId()), Long.valueOf(j)});
        if (this.currentLeaderTerm >= j) {
            return false;
        }
        this.currentLeader = str;
        this.currentLeaderTerm = j;
        return true;
    }

    public void backup(BackupOutput backupOutput) {
        this.logger.info("Backup log {} at position {}", this.logName, Long.valueOf(this.lastPosition));
        backupOutput.writeLong(this.lastPosition);
        backupOutput.writeString(this.currentLeader);
        backupOutput.writeLong(this.currentLeaderTerm);
    }

    public void restore(BackupInput backupInput) {
        long readLong = backupInput.readLong();
        if (this.lastPosition < readLong) {
            LogstreamConfig.startRestore(this.localMemberId, this.partitionId);
            SingleThreadContext singleThreadContext = new SingleThreadContext(String.format("log-restore-%d", Integer.valueOf(this.partitionId)));
            RestoreController createRestoreController = createRestoreController(singleThreadContext);
            while (this.lastPosition < readLong) {
                long j = this.lastPosition;
                this.logger.debug("Restoring local log from position {} to {}", Long.valueOf(j), Long.valueOf(readLong));
                try {
                    this.lastPosition = createRestoreController.restore(this.lastPosition, readLong);
                    this.logger.debug("Restored local log from position {} to {}", Long.valueOf(j), Long.valueOf(this.lastPosition));
                } catch (RuntimeException e) {
                    this.lastPosition = this.logStream.getCommitPosition();
                    this.logger.debug("Restoring local log failed at position {}, retrying.", Long.valueOf(this.lastPosition), e);
                }
            }
            singleThreadContext.close();
            LogstreamConfig.completeRestore(this.localMemberId, this.partitionId);
        }
        this.logger.debug("Restored local log to position {}", Long.valueOf(this.lastPosition));
        this.currentLeader = backupInput.readString();
        this.currentLeaderTerm = backupInput.readLong();
    }

    private RestoreController createRestoreController(ThreadContext threadContext) {
        RestoreFactory restoreFactory = LogstreamConfig.getRestoreFactory(this.localMemberId);
        RestoreClient createClient = restoreFactory.createClient(this.partitionId);
        RestoreNodeProvider createNodeProvider = restoreFactory.createNodeProvider(this.partitionId);
        LogReplicator logReplicator = new LogReplicator(this, createClient, threadContext);
        SnapshotRestoreContext createSnapshotRestoreContext = restoreFactory.createSnapshotRestoreContext(this.partitionId, this.logger);
        return new RestoreController(createClient, createNodeProvider, logReplicator, new RestoreSnapshotReplicator(createClient, createSnapshotRestoreContext, new FileSnapshotConsumer(createSnapshotRestoreContext.getStateStorage(), LOG), threadContext, this.logger), threadContext, this.logger);
    }

    private void updateCommitPosition(long j) {
        this.logStream.setCommitPosition(j);
        this.lastPosition = j;
    }

    public void close() {
        super.close();
        this.logger.info("Closing {}", getServiceName());
    }
}
