package com.fr.io.sync.cluster;

import com.fr.cluster.core.ClusterNode;
import com.fr.cluster.engine.base.ClusterEngineUtils;
import com.fr.concurrent.NamedThreadFactory;
import com.fr.event.EventDispatcher;
import com.fr.io.SyncModuleContext;
import com.fr.io.base.arch.DiffElement;
import com.fr.io.base.arch.DiffUtils;
import com.fr.io.base.arch.ResourceArchitectureProvider;
import com.fr.io.base.events.SyncCacheEvents;
import com.fr.io.base.msg.FsyncMessagePublisher;
import com.fr.io.monitor.ResourceEntry;
import com.fr.io.sync.config.ResourceSyncConfig;
import com.fr.io.utils.ResourceIOUtils;
import com.fr.log.FineLoggerFactory;
import com.fr.third.jgroups.Address;
import com.fr.third.jgroups.JChannel;
import com.fr.third.jgroups.Message;
import com.fr.third.jgroups.ReceiverAdapter;
import com.fr.third.jgroups.View;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/* loaded from: input_file:fine-core-10.0.jar:com/fr/io/sync/cluster/ResourceSyncReceiver.class */
public class ResourceSyncReceiver extends ReceiverAdapter {
    private static final int BATCH_MSG_SIZE = 100;
    private final ResourceSyncCluster SYNC_CLUSTER;
    private final ExecutorService HANDLING_THREAD_POOL;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ResourceSyncReceiver(JChannel jChannel, ResourceSyncCluster resourceSyncCluster) {
        jChannel.setReceiver(this);
        this.SYNC_CLUSTER = resourceSyncCluster;
        this.HANDLING_THREAD_POOL = Executors.newFixedThreadPool(ResourceSyncConfig.getInstance().getReceivePoolSize(), new NamedThreadFactory(getClass().getSimpleName()));
    }

    @Override // com.fr.third.jgroups.ReceiverAdapter, com.fr.third.jgroups.MessageListener
    public void receive(Message message) {
        if (message == null || message.getLength() == 0) {
            FineLoggerFactory.getLogger().error("[Resource] Message received is empty.");
            return;
        }
        Object object = message.getObject();
        if (object == null) {
            FineLoggerFactory.getLogger().error("[Resource] Message {}'s data object is null.", message);
            return;
        }
        Address src = message.getSrc();
        if (src == null) {
            FineLoggerFactory.getLogger().error("[Resource] Message {}'s address is null.", message);
            return;
        }
        if (object instanceof SyncMessage) {
            handle((SyncMessage) object, src);
            return;
        }
        if (!(object instanceof List)) {
            FineLoggerFactory.getLogger().error("[Resource] Unknown message received:{}.", message);
            return;
        }
        for (Object obj : (List) object) {
            if (obj instanceof SyncMessage) {
                handle((SyncMessage) obj, src);
            } else {
                FineLoggerFactory.getLogger().error("[Resource] Unknown message type:{}.", obj.getClass());
            }
        }
    }

    private void handle(SyncMessage syncMessage, Address address) {
        this.HANDLING_THREAD_POOL.submit(() -> {
            handleSyncMessage(syncMessage, address);
        });
    }

    private void handleSyncMessage(SyncMessage syncMessage, Address address) {
        int event = syncMessage.getEvent();
        if (FineLoggerFactory.getLogger().isDebugEnabled()) {
            FineLoggerFactory.getLogger().debug("[Resource] Process received {} message of key: {}.", syncMessage.getEventName(), syncMessage.getKey());
        }
        switch (event) {
            case 20:
                handleSyncReq(syncMessage, address);
                return;
            case 21:
                handleSyncAck(syncMessage, address);
                return;
            case 22:
                handleSyncRcv(syncMessage, address);
                return;
            case 23:
                handleSyncData(syncMessage, address);
                return;
            case 24:
                handleSyncOK(syncMessage, address);
                return;
            default:
                return;
        }
    }

    private void handleSyncOK(SyncMessage syncMessage, Address address) {
        if (syncMessage != null) {
            this.SYNC_CLUSTER.confirmReceived(syncMessage, address);
        }
    }

    private void handleSyncRcv(SyncMessage syncMessage, Address address) {
    }

    private void handleSyncData(SyncMessage syncMessage, Address address) {
        Serializable data = syncMessage.getData();
        if (data != null) {
            Address[] addressArr = {address};
            if (data instanceof DiffElement) {
                ResourceSyncHelper.persist((DiffElement) data, syncMessage.isRequireBackup());
                this.SYNC_CLUSTER.confirmReceived(syncMessage, address);
            } else if (data instanceof List) {
                List list = (List) data;
                FineLoggerFactory.getLogger().info("[Resource] Batch message {} of {} DiffElements is received.", syncMessage.getKey(), Integer.valueOf(list.size()));
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    ResourceSyncHelper.persist((DiffElement) it.next(), syncMessage.isRequireBackup());
                }
            } else if (!(data instanceof ConsistenceEntry)) {
                FineLoggerFactory.getLogger().error("[Resource] Unknown sync data message type: {}.", addressArr.getClass());
                return;
            } else if (syncMessage.getKey().startsWith(SyncConstants.CONSISTENCE_CHECK_PREFIX)) {
                this.SYNC_CLUSTER.confirmReceived(syncMessage, address);
                EventDispatcher.fire(SyncCacheEvents.CONSISTENCE_CHECKED, (ConsistenceEntry) data);
            }
            this.SYNC_CLUSTER.send(addressArr, new SyncMessage(24, syncMessage.getKey(), null));
        }
    }

    private void handleSyncReq(SyncMessage syncMessage, Address address) {
        Serializable data = syncMessage.getData();
        String key = syncMessage.getKey();
        if (data != null) {
            Address[] addressArr = {address};
            if (data instanceof ResourceArchitectureProvider) {
                handleBootstrapMsg(addressArr, key, data);
                return;
            }
            if (data instanceof ResourceEntry) {
                handleBootstrapEntryMsg(addressArr, key, data);
            } else if (data instanceof DiffElement) {
                handleSupportMsg(addressArr, key, data);
            } else if (data instanceof ConsistenceEntry) {
                handleConsistentCheckReq(addressArr, key, data);
            }
        }
    }

    private void handleSyncAck(SyncMessage syncMessage, Address address) {
        if (checkShouldReceive(syncMessage.getKey())) {
            if (syncMessage.getKey().equals(SyncConstants.BOOTSTRAP_SYNC_ACK)) {
                processBootstrapAck(syncMessage, address);
            } else {
                this.SYNC_CLUSTER.send(new Address[]{address}, new SyncMessage(22, syncMessage.getKey(), null));
            }
        }
    }

    private void handleConsistentCheckReq(Address[] addressArr, String str, Serializable serializable) {
        if (!str.startsWith(SyncConstants.CONSISTENCE_CHECK_PREFIX)) {
            FineLoggerFactory.getLogger().error("[Resource] Unknown consistence check message.");
            return;
        }
        ConsistenceEntry consistenceEntry = (ConsistenceEntry) serializable;
        boolean z = false;
        ResourceEntry entry = consistenceEntry.getEntry();
        ResourceEntry resourceEntry = SyncModuleContext.getFsyncContext().getArchitecture().get(consistenceEntry.getPath());
        if (entry != null && resourceEntry != null) {
            z = entry.getDigest().equals(resourceEntry.getDigest());
            FineLoggerFactory.getLogger().info("[Resource] Request node {}'s digest is {}, and current node's digest is {}.", addressArr[0], entry.getDigest(), resourceEntry.getDigest());
        } else if (entry == null && resourceEntry == null) {
            z = true;
            FineLoggerFactory.getLogger().info("[Resource] {} is not exist at both local and remote {}", consistenceEntry.getPath(), addressArr[0]);
        }
        consistenceEntry.setConsistence(z);
        this.SYNC_CLUSTER.send(addressArr, new SyncMessage(23, str, consistenceEntry));
    }

    private void handleSupportMsg(Address[] addressArr, String str, Serializable serializable) {
        DiffElement diffElement;
        DiffElement diffElement2 = (DiffElement) serializable;
        if (ResourceIOUtils.exist(diffElement2.getPath())) {
            diffElement = new DiffElement(0, diffElement2.getPath(), ResourceIOUtils.readBytes(diffElement2.getPath()));
        } else {
            FineLoggerFactory.getLogger().warn("[Resource] {} is not exist at calibration node yet, {} will backup and remove it.", diffElement2.getPath(), addressArr[0]);
            diffElement = new DiffElement(1, diffElement2.getPath());
        }
        this.SYNC_CLUSTER.sendWithFuture(addressArr, new SyncMessage(23, str, diffElement, true));
    }

    private void processBootstrapAck(SyncMessage syncMessage, Address address) {
        Serializable data = syncMessage.getData();
        if (!(data instanceof List)) {
            FineLoggerFactory.getLogger().error("[Resource] Unknown sync request message type: {}.", data.getClass());
        } else {
            FineLoggerFactory.getLogger().info("[Resource] {} bootstrap sync files will send from {}.", Integer.valueOf(((List) data).size()), address);
        }
    }

    private List<DiffElement> diff(ResourceEntry resourceEntry) {
        return DiffUtils.getDiffElements(SyncModuleContext.getFsyncContext().getArchitecture().get(resourceEntry.getPath()), resourceEntry);
    }

    private void handleBootstrapEntryMsg(Address[] addressArr, String str, Serializable serializable) {
        if (str.startsWith(SyncConstants.BOOTSTRAP_SYNC_PREFIX)) {
            rspBootstrapMsg(addressArr, diff((ResourceEntry) serializable));
        } else {
            FineLoggerFactory.getLogger().error("[Resource] Unknown bootstrap message.");
        }
    }

    private void handleBootstrapMsg(Address[] addressArr, String str, Serializable serializable) {
        if (!str.startsWith(SyncConstants.BOOTSTRAP_SYNC_PREFIX)) {
            FineLoggerFactory.getLogger().error("[Resource] Unknown bootstrap message.");
        } else {
            rspBootstrapMsg(addressArr, SyncModuleContext.getFsyncContext().getArchitecture().diff((ResourceArchitectureProvider) serializable));
        }
    }

    private void rspBootstrapMsg(Address[] addressArr, List<DiffElement> list) {
        ArrayList arrayList = new ArrayList();
        for (DiffElement diffElement : list) {
            if (diffElement.getType() != 3) {
                arrayList.add(diffElement);
            }
        }
        rspSyncAck(addressArr, arrayList);
        if (arrayList.isEmpty()) {
            FineLoggerFactory.getLogger().info("[Resource] No differences(compares to {}) for bootstrap sync.", addressArr[0]);
            return;
        }
        FineLoggerFactory.getLogger().info("[Resource] {} different files found, sync now.", Integer.valueOf(arrayList.size()));
        if (arrayList.size() > 100) {
            ArrayList arrayList2 = new ArrayList();
            Iterator<DiffElement> it = arrayList.iterator();
            while (it.hasNext()) {
                arrayList2.add(it.next());
                if (arrayList2.size() == 100) {
                    rspBootstrapBatch(addressArr, arrayList2);
                    arrayList2.clear();
                }
            }
            if (arrayList2.size() > 0) {
                rspBootstrapBatch(addressArr, arrayList2);
            }
            FineLoggerFactory.getLogger().info("[Resource] Batch bootstrap message of {} DiffElements is send to {}.", Integer.valueOf(arrayList.size()), addressArr[0]);
        } else {
            for (DiffElement diffElement2 : arrayList) {
                rspBootstrapSingle(addressArr, diffElement2);
                FineLoggerFactory.getLogger().info("[Resource] Single bootstrap message of {}:{} is send to {}.", diffElement2.getTypeName(), diffElement2.getPath(), addressArr[0]);
            }
        }
        FsyncMessagePublisher.getInstance().alertException("Fine-Core_Resource_bootstrap_inconsistent", ClusterEngineUtils.transformAddress(addressArr[0]).getName(), ResourceSyncConfig.getInstance().getSyncBackupPath());
    }

    private void rspSyncAck(Address[] addressArr, List<DiffElement> list) {
        try {
            this.SYNC_CLUSTER.send(addressArr, new SyncMessage(21, SyncConstants.BOOTSTRAP_SYNC_ACK, (Serializable) list));
        } catch (Exception e) {
            FineLoggerFactory.getLogger().error(e.getMessage(), e);
        }
    }

    private void rspBootstrapBatch(Address[] addressArr, List<DiffElement> list) {
        this.SYNC_CLUSTER.send(addressArr, new SyncMessage(23, SyncConstants.BATCH_MSG_PREFIX + Objects.hash(list), (Serializable) list, true));
    }

    private void rspBootstrapSingle(Address[] addressArr, DiffElement diffElement) {
        this.SYNC_CLUSTER.send(addressArr, new SyncMessage(23, diffElement.getPath(), diffElement, true));
    }

    private boolean checkShouldReceive(String str) {
        return true;
    }

    public void close() {
        this.HANDLING_THREAD_POOL.shutdown();
    }

    @Override // com.fr.third.jgroups.ReceiverAdapter, com.fr.third.jgroups.MembershipListener
    public void viewAccepted(View view) {
        if (view.getMembers().size() > ResourceSyncConfig.getInstance().getNodesLimit()) {
            boolean z = false;
            Iterator<ClusterNode> it = ClusterEngineUtils.translateMembers(view.getMembers()).iterator();
            while (it.hasNext()) {
                if (it.next().getID().equals(ResourceSyncConfig.getInstance().getCalibrationNode())) {
                    z = true;
                }
            }
            if (z) {
                return;
            }
            FineLoggerFactory.getLogger().error("[Resource] Node counts exceed limit counts {}, please use file server instead of file sync.", Integer.valueOf(ResourceSyncConfig.getInstance().getNodesLimit()));
        }
    }
}
