package io.zeebe.logstreams.impl.service;

import io.zeebe.dispatcher.Dispatcher;
import io.zeebe.dispatcher.DispatcherBuilder;
import io.zeebe.dispatcher.Dispatchers;
import io.zeebe.dispatcher.Subscription;
import io.zeebe.logstreams.impl.LogStorageAppender;
import io.zeebe.logstreams.impl.LogStreamBuilder;
import io.zeebe.logstreams.impl.Loggers;
import io.zeebe.logstreams.log.BufferedLogStreamReader;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.spi.LogStorage;
import io.zeebe.servicecontainer.CompositeServiceBuilder;
import io.zeebe.servicecontainer.Injector;
import io.zeebe.servicecontainer.Service;
import io.zeebe.servicecontainer.ServiceContainer;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.servicecontainer.ServiceStartContext;
import io.zeebe.servicecontainer.ServiceStopContext;
import io.zeebe.util.ByteValue;
import io.zeebe.util.sched.ActorCondition;
import io.zeebe.util.sched.channel.ActorConditions;
import io.zeebe.util.sched.future.ActorFuture;
import org.agrona.concurrent.status.Position;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/logstreams/impl/service/LogStreamService.class */
public class LogStreamService implements LogStream, Service<LogStream> {
    public static final long INVALID_ADDRESS = -1;
    private static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;
    private static final String APPENDER_SUBSCRIPTION_NAME = "appender";
    private final Injector<LogStorage> logStorageInjector = new Injector<>();
    private final ServiceContainer serviceContainer;
    private final ActorConditions onCommitPositionUpdatedConditions;
    private final String logName;
    private final int partitionId;
    private final ByteValue writeBufferSize;
    private final int maxAppendBlockSize;
    private final Position commitPosition;
    private BufferedLogStreamReader reader;
    private ServiceStartContext serviceContext;
    private LogStorage logStorage;
    private ActorFuture<Dispatcher> writeBufferFuture;
    private ActorFuture<LogStorageAppender> appenderFuture;
    private Dispatcher writeBuffer;
    private LogStorageAppender appender;

    public LogStreamService(LogStreamBuilder logStreamBuilder) {
        this.logName = logStreamBuilder.getLogName();
        this.partitionId = logStreamBuilder.getPartitionId();
        this.serviceContainer = logStreamBuilder.getServiceContainer();
        this.onCommitPositionUpdatedConditions = logStreamBuilder.getOnCommitPositionUpdatedConditions();
        this.commitPosition = logStreamBuilder.getCommitPosition();
        this.writeBufferSize = ByteValue.ofBytes(logStreamBuilder.getWriteBufferSize());
        this.maxAppendBlockSize = logStreamBuilder.getMaxAppendBlockSize();
    }

    public void start(ServiceStartContext serviceStartContext) {
        this.commitPosition.setVolatile(-1L);
        this.serviceContext = serviceStartContext;
        this.logStorage = (LogStorage) this.logStorageInjector.getValue();
        this.reader = new BufferedLogStreamReader(this);
    }

    public void stop(ServiceStopContext serviceStopContext) {
    }

    /* renamed from: get, reason: merged with bridge method [inline-methods] */
    public LogStream m27get() {
        return this;
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public int getPartitionId() {
        return this.partitionId;
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public String getLogName() {
        return this.logName;
    }

    @Override // io.zeebe.logstreams.log.LogStream, java.lang.AutoCloseable
    public void close() {
        closeAsync().join();
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public ActorFuture<Void> closeAsync() {
        return this.serviceContainer.removeService(LogStreamServiceNames.logStreamRootServiceName(this.logName));
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public long getCommitPosition() {
        return this.commitPosition.get();
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public void setCommitPosition(long j) {
        this.commitPosition.setOrdered(j);
        this.onCommitPositionUpdatedConditions.signalConsumers();
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public LogStorage getLogStorage() {
        return this.logStorage;
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public Dispatcher getWriteBuffer() {
        if (this.writeBuffer == null && this.writeBufferFuture != null) {
            this.writeBuffer = (Dispatcher) this.writeBufferFuture.join();
        }
        return this.writeBuffer;
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public LogStorageAppender getLogStorageAppender() {
        if (this.appender == null && this.appenderFuture != null) {
            this.appender = (LogStorageAppender) this.appenderFuture.join();
        }
        return this.appender;
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public ActorFuture<Void> closeAppender() {
        this.appenderFuture = null;
        this.writeBufferFuture = null;
        this.appender = null;
        this.writeBuffer = null;
        return this.serviceContext.removeService(LogStreamServiceNames.logStorageAppenderRootService(getLogName()));
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public ActorFuture<LogStorageAppender> openAppender() {
        String logName = getLogName();
        ServiceName<Void> logStorageAppenderRootService = LogStreamServiceNames.logStorageAppenderRootService(logName);
        ServiceName<Dispatcher> logWriteBufferServiceName = LogStreamServiceNames.logWriteBufferServiceName(logName);
        ServiceName<Subscription> logWriteBufferSubscriptionServiceName = LogStreamServiceNames.logWriteBufferSubscriptionServiceName(logName, APPENDER_SUBSCRIPTION_NAME);
        ServiceName<LogStorageAppender> logStorageAppenderServiceName = LogStreamServiceNames.logStorageAppenderServiceName(logName);
        DispatcherBuilder bufferSize = Dispatchers.create(logWriteBufferServiceName.getName()).bufferSize(this.writeBufferSize);
        CompositeServiceBuilder createComposite = this.serviceContext.createComposite(logStorageAppenderRootService);
        LogWriteBufferService logWriteBufferService = new LogWriteBufferService(bufferSize);
        this.writeBufferFuture = createComposite.createService(logWriteBufferServiceName, logWriteBufferService).dependency(this.logStorageInjector.getInjectedServiceName(), logWriteBufferService.getLogStorageInjector()).install();
        LogWriteBufferSubscriptionService logWriteBufferSubscriptionService = new LogWriteBufferSubscriptionService(APPENDER_SUBSCRIPTION_NAME);
        createComposite.createService(logWriteBufferSubscriptionServiceName, logWriteBufferSubscriptionService).dependency(logWriteBufferServiceName, logWriteBufferSubscriptionService.getWritebufferInjector()).install();
        LogStorageAppenderService logStorageAppenderService = new LogStorageAppenderService(this.maxAppendBlockSize);
        this.appenderFuture = createComposite.createService(logStorageAppenderServiceName, logStorageAppenderService).dependency(logWriteBufferSubscriptionServiceName, logStorageAppenderService.getAppenderSubscriptionInjector()).dependency(LogStreamServiceNames.distributedLogPartitionServiceName(logName), logStorageAppenderService.getDistributedLogstreamInjector()).install();
        return createComposite.installAndReturn(logStorageAppenderServiceName);
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public void delete(long j) {
        if (!this.reader.seek(j)) {
            LOG.debug("Tried to delete from log stream, but found no corresponding address in the log block index for the given position {}.", Long.valueOf(j));
            return;
        }
        long lastReadAddress = this.reader.lastReadAddress();
        LOG.debug("Delete data from log stream until position '{}' (address: '{}').", Long.valueOf(j), Long.valueOf(lastReadAddress));
        this.logStorage.delete(lastReadAddress);
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public void registerOnCommitPositionUpdatedCondition(ActorCondition actorCondition) {
        this.onCommitPositionUpdatedConditions.registerConsumer(actorCondition);
    }

    @Override // io.zeebe.logstreams.log.LogStream
    public void removeOnCommitPositionUpdatedCondition(ActorCondition actorCondition) {
        this.onCommitPositionUpdatedConditions.removeConsumer(actorCondition);
    }

    public Injector<LogStorage> getLogStorageInjector() {
        return this.logStorageInjector;
    }
}
