package com.fr.swift.service.transfer;

import com.fr.swift.config.service.SwiftSegmentService;
import com.fr.swift.context.SwiftContext;
import com.fr.swift.event.SwiftEventDispatcher;
import com.fr.swift.property.SwiftProperty;
import com.fr.swift.segment.RealTimeSegment;
import com.fr.swift.segment.RealtimeSegmentMemMeter;
import com.fr.swift.segment.Segment;
import com.fr.swift.segment.SegmentKey;
import com.fr.swift.segment.SegmentUtils;
import com.fr.swift.segment.event.SegmentEvent;
import com.fr.swift.service.SwiftCoreService;
import com.fr.swift.util.concurrent.PoolThreadFactory;
import com.fr.swift.util.concurrent.SwiftExecutors;
import com.fr.third.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Core service that periodically inspects the realtime segments of the current node and fires
 * {@link SegmentEvent#TRANSFER_REALTIME} for those that should be transferred.
 *
 * <p>On every scheduled run a segment is selected for transfer when any of the following holds:
 * <ol>
 *   <li>it is readable and holds at least {@link #ROW_COUNT_THRESHOLD} rows;</li>
 *   <li>the metered memory of the remaining segments is not below
 *       {@link #MEM_LIMIT_RATIO} of {@code TransferLimits.ALL_SEG_MEM_LIMIT}; the largest
 *       segments are then selected first until the total drops below that bound;</li>
 *   <li>it has been seen (and not transferred) in {@link #MAX_SEG_AGE} runs.</li>
 * </ol>
 *
 * <p>NOTE(review): {@link #segAges} is a plain {@link HashMap} touched by the scheduler thread in
 * {@link #run()} and by the caller of {@link #stop()}; no synchronization is visible here.
 */
@SwiftCoreService.CoreService
@Service
public class ScheduledRealtimeTransfer implements Runnable, SwiftCoreService {
    /** Readable segments with at least this many rows are transferred immediately. */
    private static final int ROW_COUNT_THRESHOLD = 50000;

    /** Number of runs a segment may stay untransferred before it is transferred because of age. */
    private static final int MAX_SEG_AGE = 24;

    /** Fraction of {@code TransferLimits.ALL_SEG_MEM_LIMIT} the remaining segments should stay below. */
    private static final double MEM_LIMIT_RATIO = 0.6d;

    private SwiftSegmentService segSvc;
    private ScheduledExecutorService executor;

    /** Number of runs in which each not-yet-transferred segment has been observed. */
    private Map<SegmentKey, AtomicInteger> segAges = new HashMap<SegmentKey, AtomicInteger>();

    /**
     * Orders (segment, metered memory) entries by memory, largest first.
     */
    public static class SegMemDescCmp implements Comparator<Map.Entry<SegmentKey, Long>> {
        private SegMemDescCmp() {
        }

        @Override // java.util.Comparator
        public int compare(Map.Entry<SegmentKey, Long> entry, Map.Entry<SegmentKey, Long> entry2) {
            // Compare instead of subtracting: narrowing a long difference to int can flip the
            // sign (or collapse to 0) when the two sizes differ by more than the int range.
            return entry2.getValue().compareTo(entry.getValue());
        }
    }

    /**
     * Looks up the segment service and schedules this task with a fixed delay of one hour
     * (the first run also happens after one hour).
     */
    @Override // com.fr.swift.service.SwiftCoreService
    public void start() {
        this.segSvc = (SwiftSegmentService) SwiftContext.get().getBean(SwiftSegmentService.class);
        this.executor = SwiftExecutors.newSingleThreadScheduledExecutor(new PoolThreadFactory(getClass()));
        this.executor.scheduleWithFixedDelay(this, 1L, 1L, TimeUnit.HOURS);
    }

    /**
     * Shuts the scheduler down and drops all tracked state. Safe to call when {@link #start()}
     * has not been called.
     */
    @Override // com.fr.swift.service.SwiftCoreService
    public void stop() {
        if (this.executor != null) {
            this.executor.shutdownNow();
        }
        this.segSvc = null;
        this.segAges.clear();
    }

    /**
     * One scheduled pass: transfers segments that reached the row threshold, meters the rest and
     * hands them to the memory- and age-based checks.
     *
     * <p>NOTE(review): per the {@link ScheduledExecutorService} contract an exception escaping
     * this method suppresses all subsequent executions; nothing is caught here.
     */
    @Override // java.lang.Runnable
    public void run() {
        Map<SegmentKey, Long> remainingMem = new HashMap<SegmentKey, Long>();
        long totalMem = 0;
        String clusterId = ((SwiftProperty) SwiftContext.get().getBean(SwiftProperty.class)).getClusterId();
        for (SegmentKey segKey : this.segSvc.getRealtimeSegKeyOnNode(clusterId)) {
            Segment seg = SegmentUtils.newSegment(segKey);
            if (seg.isReadable() && seg.getRowCount() >= ROW_COUNT_THRESHOLD) {
                SwiftEventDispatcher.fire(SegmentEvent.TRANSFER_REALTIME, segKey);
                this.segAges.remove(segKey);
                continue;
            }
            long mem = RealtimeSegmentMemMeter.meter((RealTimeSegment) seg);
            remainingMem.put(segKey, Long.valueOf(mem));
            totalMem += mem;
        }
        transferRemain(remainingMem, totalMem);
    }

    /**
     * Applies the memory-based and then the age-based transfer rules to the segments that were
     * not transferred by row count.
     *
     * @param map metered memory per remaining segment; entries transferred for size are removed
     * @param j   sum of the values in {@code map}
     */
    private void transferRemain(Map<SegmentKey, Long> map, long j) {
        transferBigSeg(map, j);
        updateSegAge(map.keySet());
        transferOldSeg();
    }

    /**
     * Transfers the largest segments first until the remaining total memory is below
     * {@link #MEM_LIMIT_RATIO} of {@code TransferLimits.ALL_SEG_MEM_LIMIT}.
     *
     * @param map metered memory per segment; transferred segments are removed from it
     * @param j   sum of the values in {@code map}
     */
    private void transferBigSeg(Map<SegmentKey, Long> map, long j) {
        // Sort a copy so that removing from the map while iterating is safe.
        ArrayList<Map.Entry<SegmentKey, Long>> bySizeDesc =
                new ArrayList<Map.Entry<SegmentKey, Long>>(map.entrySet());
        Collections.sort(bySizeDesc, new SegMemDescCmp());
        double limit = TransferLimits.ALL_SEG_MEM_LIMIT * MEM_LIMIT_RATIO;
        long remaining = j;
        for (Map.Entry<SegmentKey, Long> entry : bySizeDesc) {
            if (remaining < limit) {
                return;
            }
            SegmentKey segKey = entry.getKey();
            SwiftEventDispatcher.fire(SegmentEvent.TRANSFER_REALTIME, segKey);
            remaining -= entry.getValue().longValue();
            map.remove(segKey);
            // Same as the row-count path in run(): a transferred segment must not keep an age
            // entry, otherwise the entry is never incremented again and never cleaned up.
            this.segAges.remove(segKey);
        }
    }

    /**
     * Increments the age of every given segment, starting new segments at zero before the
     * increment.
     *
     * @param set segments that were observed but not transferred in the current run
     */
    private void updateSegAge(Set<SegmentKey> set) {
        for (SegmentKey segKey : set) {
            AtomicInteger age = this.segAges.get(segKey);
            if (age == null) {
                age = new AtomicInteger(0);
                this.segAges.put(segKey, age);
            }
            age.incrementAndGet();
        }
    }

    /**
     * Transfers every tracked segment whose age reached {@link #MAX_SEG_AGE} and stops
     * tracking it.
     */
    private void transferOldSeg() {
        Iterator<Map.Entry<SegmentKey, AtomicInteger>> it = this.segAges.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<SegmentKey, AtomicInteger> aged = it.next();
            if (aged.getValue().get() >= MAX_SEG_AGE) {
                SwiftEventDispatcher.fire(SegmentEvent.TRANSFER_REALTIME, aged.getKey());
                it.remove();
            }
        }
    }
}
