package com.fr.swift.service;

import com.fr.swift.annotation.RpcMethod;
import com.fr.swift.annotation.RpcService;
import com.fr.swift.config.service.SwiftSegmentLocationService;
import com.fr.swift.config.service.SwiftSegmentService;
import com.fr.swift.context.SwiftContext;
import com.fr.swift.cube.io.Types;
import com.fr.swift.db.Database;
import com.fr.swift.db.Table;
import com.fr.swift.db.impl.SwiftDatabase;
import com.fr.swift.event.SwiftEventDispatcher;
import com.fr.swift.exception.SwiftServiceException;
import com.fr.swift.exception.TableNotExistException;
import com.fr.swift.log.SwiftLoggers;
import com.fr.swift.property.SwiftProperty;
import com.fr.swift.segment.Segment;
import com.fr.swift.segment.SegmentKey;
import com.fr.swift.segment.SegmentUtils;
import com.fr.swift.segment.SwiftSegmentManager;
import com.fr.swift.segment.collate.FragmentCollectRule;
import com.fr.swift.segment.collate.SwiftFragmentCollectRule;
import com.fr.swift.segment.event.SegmentEvent;
import com.fr.swift.segment.event.SyncSegmentLocationEvent;
import com.fr.swift.segment.impl.HisSegmentMergerImpl;
import com.fr.swift.source.SourceKey;
import com.fr.swift.source.alloter.impl.line.LineAllotRule;
import com.fr.swift.source.alloter.impl.line.LineSourceAlloter;
import com.fr.swift.util.concurrent.CommonExecutor;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;

@com.fr.swift.annotation.SwiftService(name = "collate")
@RpcService(value = CollateService.class, type = RpcService.RpcServiceType.INTERNAL)
/* loaded from: input_file:fine-swift-log-adaptor-10.0.jar:com/fr/swift/service/SwiftCollateService.class */
public class SwiftCollateService extends AbstractSwiftService implements CollateService {
    private static final long serialVersionUID = 7259915342007294244L;
    private transient SwiftSegmentManager segmentManager;
    private transient Database database;
    private transient SwiftSegmentService swiftSegmentService;
    private transient SwiftSegmentLocationService segLocationSvc;
    private transient FragmentCollectRule collectRule = new SwiftFragmentCollectRule();

    private SwiftCollateService() {
    }

    @Override // com.fr.swift.service.AbstractSwiftService, com.fr.swift.service.SwiftService
    public boolean start() throws SwiftServiceException {
        super.start();
        this.segmentManager = (SwiftSegmentManager) SwiftContext.get().getBean("localSegmentProvider", SwiftSegmentManager.class);
        this.database = SwiftDatabase.getInstance();
        this.swiftSegmentService = (SwiftSegmentService) SwiftContext.get().getBean(SwiftSegmentService.class);
        this.segLocationSvc = (SwiftSegmentLocationService) SwiftContext.get().getBean(SwiftSegmentLocationService.class);
        return true;
    }

    @Override // com.fr.swift.service.AbstractSwiftService, com.fr.swift.service.SwiftService
    public boolean shutdown() throws SwiftServiceException {
        super.shutdown();
        this.segmentManager = null;
        this.database = null;
        this.swiftSegmentService = null;
        this.segLocationSvc = null;
        return true;
    }

    @Override // com.fr.swift.service.CollateService
    public void autoCollateRealtime(SourceKey sourceKey) throws Exception {
        List<SegmentKey> segmentKeys = this.segmentManager.getSegmentKeys(sourceKey);
        checkSegmentKeys(segmentKeys, Types.StoreType.MEMORY);
        collateSegments(sourceKey, segmentKeys);
    }

    @Override // com.fr.swift.service.CollateService
    public void autoCollateHistory(SourceKey sourceKey) throws Exception {
        List<SegmentKey> segmentKeys = this.segmentManager.getSegmentKeys(sourceKey);
        checkSegmentKeys(segmentKeys, Types.StoreType.FINE_IO);
        collateSegments(sourceKey, segmentKeys);
    }

    @Override // com.fr.swift.service.CollateService
    @RpcMethod(methodName = "appointCollate")
    public void appointCollate(SourceKey sourceKey, List<SegmentKey> list) throws Exception {
        collateSegments(sourceKey, list);
    }

    @Override // com.fr.swift.service.CollateService
    public void autoCollate(SourceKey sourceKey) throws Exception {
        collateSegments(sourceKey);
    }

    @Override // com.fr.swift.service.SwiftService
    public ServiceType getServiceType() {
        return ServiceType.COLLATE;
    }

    private void collateSegments(SourceKey sourceKey) throws Exception {
        collateSegments(sourceKey, this.segmentManager.getSegmentKeys(sourceKey));
    }

    private void collateSegments(SourceKey sourceKey, List<SegmentKey> list) throws Exception {
        SwiftLoggers.getLogger().info("Prepare collate task! Original segs: {}! ", list.toString());
        if (!this.database.existsTable(sourceKey)) {
            throw new TableNotExistException(sourceKey);
        }
        if (this.swiftSegmentService.getTableSegKeys(sourceKey).containsAll(list)) {
            Table table = this.database.getTable(sourceKey);
            int step = ((LineAllotRule) new LineSourceAlloter(table.getSourceKey()).getAllotRule()).getStep();
            HisSegmentMergerImpl hisSegmentMergerImpl = new HisSegmentMergerImpl();
            List<Segment> segmentsByKeys = getSegmentsByKeys(list);
            if (!list.isEmpty() && segmentsByKeys.isEmpty()) {
                clearCollatedSegment(list, sourceKey);
                return;
            }
            List<SegmentKey> merge = hisSegmentMergerImpl.merge(table, segmentsByKeys, step);
            if (merge.isEmpty()) {
                SwiftLoggers.getLogger().info("Failed collate! Collate segs: {}", list);
                return;
            }
            fireSyncNewKeys(merge);
            clearCollatedSegment(list, sourceKey);
            SwiftLoggers.getLogger().info("Finish collate ! Collated segs: {} ; New segs: {}", list.toString(), merge);
        }
    }

    private void fireSyncNewKeys(List<SegmentKey> list) {
        SwiftSegmentManager swiftSegmentManager = (SwiftSegmentManager) SwiftContext.get().getBean("localSegmentProvider", SwiftSegmentManager.class);
        Iterator<SegmentKey> it = list.iterator();
        while (it.hasNext()) {
            swiftSegmentManager.getSegment(it.next());
        }
        SwiftEventDispatcher.fire(SyncSegmentLocationEvent.PUSH_SEG, list);
    }

    private void checkSegmentKeys(List<SegmentKey> list, Types.StoreType storeType) {
        Iterator<SegmentKey> it = list.iterator();
        while (it.hasNext()) {
            if (it.next().getStoreType() != storeType) {
                it.remove();
            }
        }
    }

    private List<Segment> getSegmentsByKeys(List<SegmentKey> list) {
        ArrayList arrayList = new ArrayList();
        Iterator<SegmentKey> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(this.segmentManager.getSegment(it.next()));
        }
        return arrayList;
    }

    private void clearCollatedSegment(final List<SegmentKey> list, SourceKey sourceKey) {
        this.segLocationSvc.deleteOnNode(((SwiftProperty) SwiftContext.get().getBean(SwiftProperty.class)).getClusterId(), new HashSet(list));
        this.swiftSegmentService.delete(list);
        CommonExecutor.get().execute(new Runnable() { // from class: com.fr.swift.service.SwiftCollateService.1
            @Override // java.lang.Runnable
            public void run() {
                for (SegmentKey segmentKey : list) {
                    SegmentUtils.clearSegment(segmentKey);
                    SwiftLoggers.getLogger().info("SwiftCollateService.clearCollatedSegment {}", segmentKey);
                    if (segmentKey.getStoreType().isPersistent()) {
                        SwiftEventDispatcher.fire(SegmentEvent.REMOVE_HISTORY, segmentKey);
                    }
                }
            }
        });
    }
}
