package com.fr.io.sync.cluster;

import com.fr.cluster.ClusterBridge;
import com.fr.cluster.core.ClusterNode;
import com.fr.cluster.engine.base.ClusterEngineUtils;
import com.fr.cluster.engine.core.jchannel.MachineMarker;
import com.fr.concurrent.NamedThreadFactory;
import com.fr.io.base.msg.FsyncMessagePublisher;
import com.fr.io.sync.config.ResourceSyncConfig;
import com.fr.log.FineLoggerFactory;
import com.fr.stable.ArrayUtils;
import com.fr.stable.Filter;
import com.fr.third.jgroups.Address;
import com.fr.third.jgroups.JChannel;
import com.fr.third.jgroups.Message;
import com.fr.third.jgroups.util.Util;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:fine-core-10.0.jar:com/fr/io/sync/cluster/ResourceSyncCluster.class */
public class ResourceSyncCluster {
    private static final int SENDING_ENTITY_INIT_SIZE = 500;
    private final JChannel J_CHANNEL;
    private final Map<String, SyncEntity> sendingSyncEntities = new ConcurrentHashMap(500);
    private final ExecutorService SEND_POOL = Executors.newFixedThreadPool(ResourceSyncConfig.getInstance().getSendPoolSize(), new NamedThreadFactory("ResourceSyncCluster_Sender"));
    private final ScheduledExecutorService SCHEDULER = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResourceSyncCluster_Checker"));
    private volatile boolean inited;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:fine-core-10.0.jar:com/fr/io/sync/cluster/ResourceSyncCluster$ReceivedCheckList.class */
    public static class ReceivedCheckList {
        private final Address[] dest;
        private final Map<Address, Boolean> checkMap = new ConcurrentHashMap();

        ReceivedCheckList(Address[] addressArr) {
            this.dest = addressArr;
            init();
        }

        void init() {
            if (ArrayUtils.isNotEmpty(this.dest)) {
                for (Address address : this.dest) {
                    if (address != null) {
                        this.checkMap.put(address, Boolean.FALSE);
                    }
                }
            }
        }

        void check(Address address) {
            if (address == null || !this.checkMap.containsKey(address) || this.checkMap.get(address).booleanValue()) {
                return;
            }
            this.checkMap.put(address, Boolean.TRUE);
        }

        boolean allChecked() {
            boolean z = true;
            Iterator<Boolean> it = this.checkMap.values().iterator();
            while (it.hasNext()) {
                z &= it.next().booleanValue();
            }
            return z;
        }

        Address[] getUnchecked() {
            HashSet hashSet = new HashSet();
            for (Map.Entry<Address, Boolean> entry : this.checkMap.entrySet()) {
                if (!entry.getValue().booleanValue()) {
                    hashSet.add(entry.getKey());
                }
            }
            return (Address[]) hashSet.toArray(new Address[0]);
        }

        void reset() {
            Iterator<Map.Entry<Address, Boolean>> it = this.checkMap.entrySet().iterator();
            while (it.hasNext()) {
                it.next().setValue(Boolean.FALSE);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:fine-core-10.0.jar:com/fr/io/sync/cluster/ResourceSyncCluster$SendTask.class */
    public class SendTask implements Callable<SyncEntity> {
        final Address[] ADDRESSES;
        final SyncMessage SYNC_MESSAGE;

        SendTask(Address[] addressArr, SyncMessage syncMessage) {
            this.ADDRESSES = addressArr;
            this.SYNC_MESSAGE = syncMessage;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public SyncEntity call() throws Exception {
            SyncEntity syncEntity = new SyncEntity(this.ADDRESSES, this.SYNC_MESSAGE);
            ResourceSyncCluster.this.sendingSyncEntities.put(this.SYNC_MESSAGE.getKey(), syncEntity);
            for (Address address : this.ADDRESSES) {
                if (ResourceSyncCluster.this.validAddress(address)) {
                    ResourceSyncCluster.this.sendMsgTo(address, this.SYNC_MESSAGE);
                } else {
                    syncEntity.confirm(address);
                    FineLoggerFactory.getLogger().error("[Resource] {} is not at current view, ignored.", address);
                }
            }
            return syncEntity;
        }
    }

    /* loaded from: input_file:fine-core-10.0.jar:com/fr/io/sync/cluster/ResourceSyncCluster$SendingChecker.class */
    private class SendingChecker implements Runnable {
        private SendingChecker() {
        }

        @Override // java.lang.Runnable
        public void run() {
            Iterator it = ResourceSyncCluster.this.sendingSyncEntities.entrySet().iterator();
            while (it.hasNext()) {
                SyncEntity syncEntity = (SyncEntity) ((Map.Entry) it.next()).getValue();
                synchronized (syncEntity) {
                    if (syncEntity.done()) {
                        syncEntity.notifyAll();
                        it.remove();
                    }
                    if (syncEntity.timeout()) {
                        if (syncEntity.shouldRetry()) {
                            FineLoggerFactory.getLogger().info("[Resource] Sync {} timeout, retrying...", syncEntity.getMessage());
                            ResourceSyncCluster.this.send(syncEntity.retry(), syncEntity.getMessage());
                        } else {
                            syncEntity.notifyAll();
                            it.remove();
                            FineLoggerFactory.getLogger().error("[Resource] Send {} failed after retried.", syncEntity.getMessage());
                        }
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:fine-core-10.0.jar:com/fr/io/sync/cluster/ResourceSyncCluster$SyncEntity.class */
    public static class SyncEntity implements Serializable {
        private static final long serialVersionUID = -3336965425459265677L;
        private final ReceivedCheckList CHECK_LIST;
        private final SyncMessage message;
        private final long SEND_TIMEOUT = ResourceSyncConfig.getInstance().getSendTimeout();
        private final long RETRY_INTERVAL = ResourceSyncConfig.getInstance().getRetryInterval();
        private volatile int retryCount = ResourceSyncConfig.getInstance().getRetryCount();
        private volatile long start = System.currentTimeMillis();

        SyncEntity(Address[] addressArr, SyncMessage syncMessage) {
            this.CHECK_LIST = new ReceivedCheckList(addressArr);
            this.message = syncMessage;
        }

        void confirm(Address address) {
            synchronized (this) {
                this.CHECK_LIST.check(address);
                if (done()) {
                    notifyAll();
                }
            }
        }

        boolean done() {
            boolean allChecked;
            synchronized (this) {
                allChecked = this.CHECK_LIST.allChecked();
            }
            return allChecked;
        }

        Address[] retry() {
            Address[] unchecked;
            synchronized (this) {
                FineLoggerFactory.getLogger().info("[Resource] {}ms later send will retry.", Long.valueOf(this.RETRY_INTERVAL));
                Util.sleep(this.RETRY_INTERVAL);
                reset();
                this.retryCount--;
                unchecked = this.CHECK_LIST.getUnchecked();
            }
            return unchecked;
        }

        boolean shouldRetry() {
            return this.retryCount > 0;
        }

        boolean started() {
            return this.start >= System.currentTimeMillis();
        }

        boolean timeout() {
            boolean z;
            synchronized (this) {
                z = !done() && System.currentTimeMillis() - this.start > this.SEND_TIMEOUT;
            }
            return z;
        }

        private void reset() {
            synchronized (this) {
                this.start = System.currentTimeMillis();
            }
        }

        public SyncMessage getMessage() {
            return this.message;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:fine-core-10.0.jar:com/fr/io/sync/cluster/ResourceSyncCluster$SyncFuture.class */
    public static class SyncFuture implements Future<Boolean> {
        private static final long SYNC_RETRY_TIMEOUT = 60000;
        private final SyncEntity syncEntity;

        SyncFuture(SyncEntity syncEntity) {
            this.syncEntity = syncEntity;
        }

        @Override // java.util.concurrent.Future
        public boolean cancel(boolean z) {
            throw new UnsupportedOperationException();
        }

        @Override // java.util.concurrent.Future
        public boolean isCancelled() {
            throw new UnsupportedOperationException();
        }

        @Override // java.util.concurrent.Future
        public boolean isDone() {
            return this.syncEntity.done();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Future
        public Boolean get() throws InterruptedException, ExecutionException {
            try {
                return get(60000L, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                return false;
            }
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Future
        public Boolean get(long j, @NotNull TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
            long currentTimeMillis = System.currentTimeMillis();
            long millis = timeUnit.toMillis(j);
            while (true) {
                synchronized (this.syncEntity) {
                    if (System.currentTimeMillis() - currentTimeMillis > millis) {
                        FineLoggerFactory.getLogger().error("[Resource] Timeout to retrieve sync result of {}.", this.syncEntity.getMessage());
                        return false;
                    }
                    if (this.syncEntity.timeout() && !this.syncEntity.shouldRetry()) {
                        FineLoggerFactory.getLogger().error("[Resource] Timeout to retrieve sync result of {} after retried.", this.syncEntity.getMessage());
                        FsyncMessagePublisher.getInstance().alertException("Fine-Core_Resource_sync_failed", MachineMarker.currentID());
                        return false;
                    }
                    if (this.syncEntity.done()) {
                        if (this.syncEntity.getMessage().getEvent() == 20) {
                            FineLoggerFactory.getLogger().info("[Resource] Support request {} success.", this.syncEntity.getMessage());
                        } else {
                            FineLoggerFactory.getLogger().info("[Resource] Sync {} to cluster success.", this.syncEntity.getMessage());
                        }
                        return true;
                    }
                    this.syncEntity.wait();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ResourceSyncCluster(JChannel jChannel) {
        this.J_CHANNEL = jChannel;
        this.SCHEDULER.scheduleAtFixedRate(new SendingChecker(), 0L, ResourceSyncConfig.getInstance().getSendingCheckInterval(), TimeUnit.MILLISECONDS);
        this.inited = true;
        FineLoggerFactory.getLogger().info("[Resource] ResourceSyncCluster inited!");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final Future<Boolean> sendWithFuture(Address[] addressArr, SyncMessage syncMessage) {
        if (!this.inited) {
            FineLoggerFactory.getLogger().error("[Resource] ResourceSyncCluster not inited!");
            return new NoneFuture();
        }
        if (addressArr == null) {
            addressArr = getDestinations();
        }
        if (addressArr.length != 0) {
            return replicate(addressArr, syncMessage);
        }
        if (FineLoggerFactory.getLogger().isDebugEnabled()) {
            FineLoggerFactory.getLogger().debug("[Resource] No siblings to sync future for {}.", syncMessage);
        }
        return new NoneFuture();
    }

    private Future<Boolean> replicate(Address[] addressArr, SyncMessage syncMessage) {
        try {
            return new SyncFuture((SyncEntity) this.SEND_POOL.submit(new SendTask(addressArr, syncMessage)).get());
        } catch (Exception e) {
            FineLoggerFactory.getLogger().error(e.getMessage(), e);
            return new NoneFuture();
        }
    }

    public final void send(Address[] addressArr, SyncMessage syncMessage) {
        if (syncMessage == null) {
            FineLoggerFactory.getLogger().error("[Resource] syncMessage should not be null.");
            return;
        }
        if (addressArr == null) {
            addressArr = getDestinations();
        }
        if (addressArr.length == 0) {
            FineLoggerFactory.getLogger().info("[Resource] No siblings to sync for {}", syncMessage);
            return;
        }
        for (Address address : addressArr) {
            if (validAddress(address)) {
                sendMsgTo(address, syncMessage);
            } else {
                FineLoggerFactory.getLogger().error("[Resource] {} is not at current view", address);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendMsgTo(Address address, SyncMessage syncMessage) {
        try {
            this.J_CHANNEL.send(new Message(address, this.J_CHANNEL.getAddress(), Util.objectToByteBuffer(syncMessage)));
            if (FineLoggerFactory.getLogger().isDebugEnabled()) {
                FineLoggerFactory.getLogger().debug("[Resource] {} is send to {}.", syncMessage, address);
            }
        } catch (Exception e) {
            FineLoggerFactory.getLogger().error(e.getMessage(), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void confirmReceived(SyncMessage syncMessage, Address address) {
        SyncEntity syncEntity = this.sendingSyncEntities.get(syncMessage.getKey());
        if (syncEntity != null) {
            syncEntity.confirm(address);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean validAddress(Address address) {
        return this.J_CHANNEL.getView().containsMember(address);
    }

    private Address[] getDestinations() {
        List<ClusterNode> listNodes = ClusterBridge.getView().listNodes(new Filter<ClusterNode>() { // from class: com.fr.io.sync.cluster.ResourceSyncCluster.1
            @Override // com.fr.stable.Filter
            public boolean accept(ClusterNode clusterNode) {
                return !clusterNode.equals(ClusterBridge.getView().getCurrent());
            }
        });
        ArrayList arrayList = new ArrayList();
        Iterator<ClusterNode> it = listNodes.iterator();
        while (it.hasNext()) {
            arrayList.add(ClusterEngineUtils.transformAddress(it.next(), this.J_CHANNEL.getView().getMembers()));
        }
        return (Address[]) arrayList.toArray(new Address[0]);
    }

    public void close() {
        this.J_CHANNEL.close();
        this.SEND_POOL.shutdown();
        this.SCHEDULER.shutdown();
        this.sendingSyncEntities.clear();
        this.inited = false;
        FineLoggerFactory.getLogger().info("[Resource] ResourceSyncCluster closed!");
    }
}
