package com.fr.swift.service;

import com.fr.swift.basics.base.selector.ProxySelector;
import com.fr.swift.config.service.SwiftSegmentService;
import com.fr.swift.context.SwiftContext;
import com.fr.swift.cube.io.ResourceDiscovery;
import com.fr.swift.db.Table;
import com.fr.swift.db.impl.SwiftDatabase;
import com.fr.swift.exception.SwiftServiceException;
import com.fr.swift.execute.task.RealtimeInsertExecutorTask;
import com.fr.swift.execute.task.RecoveryExecutorTask;
import com.fr.swift.executor.TaskProducer;
import com.fr.swift.log.SwiftLoggers;
import com.fr.swift.netty.rpc.server.RpcServer;
import com.fr.swift.query.info.bean.query.QueryInfoBean;
import com.fr.swift.query.query.QueryBeanFactory;
import com.fr.swift.segment.SwiftSegmentManager;
import com.fr.swift.source.SourceKey;
import com.fr.swift.source.SwiftResultSet;
import com.fr.swift.util.JsonBuilder;
import java.io.Serializable;
import java.sql.SQLException;
import java.util.Iterator;

/**
 * Realtime service registered under the name {@code "realtime"}.
 *
 * <p>On start it resolves its collaborators from {@link SwiftContext} and, while the
 * {@code recoverable} flag is set, submits one {@link RecoveryExecutorTask} per table known to
 * {@link SwiftDatabase}. On shutdown it releases everything held by {@link ResourceDiscovery},
 * re-arms recovery and drops the collaborator references.
 *
 * <p>All collaborator fields are {@code transient}; they are only populated by {@link #start()}.
 */
@com.fr.swift.annotation.SwiftService(name = "realtime")
public class SwiftRealtimeService extends AbstractSwiftService implements RealtimeService, Serializable {
    private static final long serialVersionUID = 4719723736240190155L;

    /** Resolved in {@link #start()}, cleared in {@link #shutdown()}. */
    private transient RpcServer server;

    /** Bean named {@code "localSegmentProvider"}; resolved in {@link #start()}. */
    private transient SwiftSegmentManager segmentManager;

    /** Used by {@link #query(String)} to turn the query text into a {@link QueryInfoBean}. */
    private transient QueryBeanFactory queryBeanFactory;

    /** Resolved in {@link #start()}, cleared in {@link #shutdown()}. */
    private transient SwiftSegmentService segmentService;

    /**
     * {@code true} while the next {@link #start()} should trigger recovery; flipped to
     * {@code false} after recovery has been submitted and back to {@code true} on shutdown.
     */
    private transient volatile boolean recoverable = true;

    // NOTE(review): private constructor — presumably instantiated reflectively by the
    // service container; confirm against the code that discovers @SwiftService classes.
    private SwiftRealtimeService() {
    }

    /**
     * Starts the service: delegates to the superclass, resolves collaborators from the
     * {@link SwiftContext} and, if recovery is still pending, submits the recovery tasks.
     *
     * @return always {@code true}
     * @throws SwiftServiceException declared by the overridden method
     */
    @Override
    public boolean start() throws SwiftServiceException {
        super.start();

        this.server = (RpcServer) SwiftContext.get().getBean(RpcServer.class);
        this.segmentManager = (SwiftSegmentManager) SwiftContext.get().getBean("localSegmentProvider", SwiftSegmentManager.class);
        this.queryBeanFactory = (QueryBeanFactory) SwiftContext.get().getBean(QueryBeanFactory.class);
        this.segmentService = (SwiftSegmentService) SwiftContext.get().getBean(SwiftSegmentService.class);

        if (this.recoverable) {
            recover0();
            // Only cleared after recover0() returns, so a failure there leaves recovery armed.
            this.recoverable = false;
        }
        return true;
    }

    /**
     * Stops the service: delegates to the superclass, releases all resources tracked by
     * {@link ResourceDiscovery}, re-arms recovery for the next start and clears the
     * collaborator references.
     *
     * @return always {@code true}
     * @throws SwiftServiceException declared by the overridden method
     */
    @Override
    public boolean shutdown() throws SwiftServiceException {
        super.shutdown();
        ResourceDiscovery.getInstance().releaseAll();

        // Next start() must run recovery again.
        this.recoverable = true;

        this.server = null;
        this.segmentManager = null;
        this.queryBeanFactory = null;
        this.segmentService = null;
        return true;
    }

    /**
     * Hands the rows to the task pipeline by producing a {@link RealtimeInsertExecutorTask}.
     *
     * @param sourceKey      key of the target table
     * @param swiftResultSet rows to insert
     * @throws Exception if the task cannot be created or produced
     */
    @Override
    public void insert(SourceKey sourceKey, SwiftResultSet swiftResultSet) throws Exception {
        RealtimeInsertExecutorTask insertTask = new RealtimeInsertExecutorTask(sourceKey, swiftResultSet);
        TaskProducer.produceTask(insertTask);
    }

    /**
     * Produces one {@link RecoveryExecutorTask} for every table returned by
     * {@link SwiftDatabase}. A failure for one table is logged at warn level and does not
     * stop the remaining tables from being processed.
     */
    private void recover0() {
        for (Iterator<Table> tables = SwiftDatabase.getInstance().getAllTables().iterator(); tables.hasNext(); ) {
            try {
                // next() stays inside the try so that iteration failures are logged as well.
                Table table = tables.next();
                TaskProducer.produceTask(new RecoveryExecutorTask(table.getSourceKey()));
            } catch (Exception failure) {
                SwiftLoggers.getLogger().warn(failure);
            }
        }
    }

    /**
     * Runs a query: the text is converted to a {@link QueryInfoBean} by the query bean factory,
     * serialized with {@link JsonBuilder} and passed to the {@link ServiceContext} proxy.
     *
     * @param str query text understood by the {@link QueryBeanFactory}
     * @return the result set returned by the service context
     * @throws SQLException wrapping any exception raised while building or executing the query
     */
    @Override
    public SwiftResultSet query(String str) throws SQLException {
        try {
            ServiceContext serviceContext = (ServiceContext) ProxySelector.getProxy(ServiceContext.class);
            // NOTE(review): the meaning of the `false` flag is not visible from this file.
            QueryInfoBean queryInfo = (QueryInfoBean) this.queryBeanFactory.create(str, false);
            return serviceContext.getQueryResult(JsonBuilder.writeJsonString(queryInfo));
        } catch (Exception cause) {
            throw new SQLException(cause);
        }
    }

    /**
     * @return {@link ServiceType#REAL_TIME}
     */
    @Override
    public ServiceType getServiceType() {
        return ServiceType.REAL_TIME;
    }
}
