package io.zeebe.broker.logstreams.restore;

import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.zeebe.distributedlog.restore.RestoreServer;
import io.zeebe.distributedlog.restore.impl.DefaultRestoreInfoRequestHandler;
import io.zeebe.distributedlog.restore.log.impl.DefaultLogReplicationRequestHandler;
import io.zeebe.distributedlog.restore.snapshot.impl.DefaultSnapshotRequestHandler;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.spi.SnapshotController;
import io.zeebe.util.ZbLogger;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/logstreams/restore/BrokerRestoreServer.class */
public class BrokerRestoreServer implements RestoreServer {
    private final ClusterCommunicationService communicationService;
    private final String logReplicationTopic;
    private final String restoreInfoTopic;
    private final String snapshotRequestTopic;
    private final String snapshotInfoRequestTopic;
    private final ExecutorService executor;
    private final Logger logger;

    public BrokerRestoreServer(ClusterCommunicationService clusterCommunicationService, int i) {
        this(clusterCommunicationService, BrokerRestoreFactory.getLogReplicationTopic(i), BrokerRestoreFactory.getRestoreInfoTopic(i), BrokerRestoreFactory.getSnapshotRequestTopic(i), BrokerRestoreFactory.getSnapshotInfoRequestTopic(i), Executors.newSingleThreadExecutor(runnable -> {
            return new Thread(runnable, String.format(BrokerRestoreServer.class.getName(), Integer.valueOf(i)));
        }), new ZbLogger(String.format(BrokerRestoreServer.class.getName(), Integer.valueOf(i))));
    }

    public BrokerRestoreServer(ClusterCommunicationService clusterCommunicationService, String str, String str2, String str3, String str4, ExecutorService executorService, Logger logger) {
        this.communicationService = clusterCommunicationService;
        this.logReplicationTopic = str;
        this.restoreInfoTopic = str2;
        this.snapshotRequestTopic = str3;
        this.snapshotInfoRequestTopic = str4;
        this.executor = executorService;
        this.logger = logger;
    }

    public CompletableFuture<Void> start(LogStream logStream, SnapshotController snapshotController) {
        DefaultLogReplicationRequestHandler defaultLogReplicationRequestHandler = new DefaultLogReplicationRequestHandler(logStream);
        DefaultRestoreInfoRequestHandler defaultRestoreInfoRequestHandler = new DefaultRestoreInfoRequestHandler(logStream, snapshotController);
        DefaultSnapshotRequestHandler defaultSnapshotRequestHandler = new DefaultSnapshotRequestHandler(snapshotController);
        return serve((RestoreServer.LogReplicationRequestHandler) defaultLogReplicationRequestHandler).thenCompose(r5 -> {
            return serve(defaultRestoreInfoRequestHandler);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r52 -> {
            return serve(defaultSnapshotRequestHandler);
        }).thenRun(this::logServerStart);
    }

    public CompletableFuture<Void> serve(RestoreServer.SnapshotRequestHandler snapshotRequestHandler) {
        return this.communicationService.subscribe(this.snapshotRequestTopic, SbeSnapshotRestoreRequest::new, sbeSnapshotRestoreRequest -> {
            return snapshotRequestHandler.onSnapshotRequest(sbeSnapshotRestoreRequest, this.logger);
        }, SbeSnapshotRestoreResponse::serialize, this.executor);
    }

    public CompletableFuture<Void> serve(RestoreServer.RestoreInfoRequestHandler restoreInfoRequestHandler) {
        return this.communicationService.subscribe(this.restoreInfoTopic, SbeRestoreInfoRequest::new, sbeRestoreInfoRequest -> {
            return restoreInfoRequestHandler.onRestoreInfoRequest(sbeRestoreInfoRequest, this.logger);
        }, SbeRestoreInfoResponse::serialize, this.executor);
    }

    public void close() {
        this.communicationService.unsubscribe(this.logReplicationTopic);
        this.communicationService.unsubscribe(this.restoreInfoTopic);
        this.communicationService.unsubscribe(this.snapshotRequestTopic);
        this.communicationService.unsubscribe(this.snapshotInfoRequestTopic);
        this.executor.shutdownNow();
        this.logger.debug("Closed restore server for topics: {}, {}, {}, {}", new Object[]{this.logReplicationTopic, this.restoreInfoTopic, this.snapshotRequestTopic, this.snapshotInfoRequestTopic});
    }

    public CompletableFuture<Void> serve(RestoreServer.LogReplicationRequestHandler logReplicationRequestHandler) {
        return this.communicationService.subscribe(this.logReplicationTopic, SbeLogReplicationRequest::new, sbeLogReplicationRequest -> {
            return logReplicationRequestHandler.onReplicationRequest(sbeLogReplicationRequest, this.logger);
        }, SbeLogReplicationResponse::serialize, this.executor);
    }

    private void logServerStart() {
        this.logger.debug("Started restore server for topics: {}, {}, {}, {}", new Object[]{this.logReplicationTopic, this.restoreInfoTopic, this.snapshotRequestTopic, this.snapshotInfoRequestTopic});
    }
}
