package com.fr.swift.service.proxy.handler;

import com.fr.swift.basics.AsyncRpcCallback;
import com.fr.swift.basics.RpcFuture;
import com.fr.swift.basics.base.SwiftInvocation;
import com.fr.swift.basics.base.selector.UrlSelector;
import com.fr.swift.config.entity.SwiftSegmentLocationEntity;
import com.fr.swift.config.service.SwiftSegmentLocationService;
import com.fr.swift.context.SwiftContext;
import com.fr.swift.local.LocalUrl;
import com.fr.swift.log.SwiftLoggers;
import com.fr.swift.property.SwiftProperty;
import com.fr.swift.proxy.InvokerCreator;
import com.fr.swift.proxy.URL;
import com.fr.swift.proxy.annotation.RegisteredHandler;
import com.fr.swift.proxy.annotation.Target;
import com.fr.swift.proxy.handler.BaseProcessHandler;
import com.fr.swift.proxy.handler.QueryableProcessHandler;
import com.fr.swift.query.info.bean.query.QueryInfoBean;
import com.fr.swift.query.query.QueryBean;
import com.fr.swift.query.query.QueryType;
import com.fr.swift.query.result.SerializedQueryResultMerger;
import com.fr.swift.result.serialize.BaseSerializedQueryResultSet;
import com.fr.swift.segment.container.Segment2RepairContainer;
import com.fr.swift.service.proxy.handler.resultset.MergedResultSet;
import com.fr.swift.source.SourceKey;
import com.fr.swift.source.SwiftResultSet;
import com.fr.swift.structure.Pair;
import com.fr.swift.util.JsonBuilder;
import com.fr.third.springframework.context.annotation.Scope;
import com.fr.third.springframework.stereotype.Service;
import java.lang.reflect.Method;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * {@link QueryableProcessHandler} that fans a JSON-serialized query out to every target
 * produced by {@link #processUrl(Target[], Object...)}, collects the per-target result sets
 * through asynchronous callbacks and merges them into a single {@link MergedResultSet}.
 */
@Scope("prototype")
@Service
@RegisteredHandler(QueryableProcessHandler.class)
class SwiftQueryableProcessHandler extends BaseProcessHandler implements QueryableProcessHandler {
    /**
     * Segment ids containing this marker are handed to every node that reports them instead of
     * being de-duplicated across nodes (presumably in-memory segments - TODO confirm).
     */
    private static final String MEMORY_SEGMENT_MARKER = "@MEMORY@";

    /** Upper bound, in minutes, on how long {@code processResult} waits for all remote callbacks. */
    private static final long REMOTE_TIMEOUT_MINUTES = 3L;

    /**
     * Segment location lookup, resolved from {@link SwiftContext} when this class is initialized.
     * NOTE(review): public and non-final; left that way in case other code reassigns it.
     */
    public static SwiftSegmentLocationService LOCATION_SERVICE = (SwiftSegmentLocationService) SwiftContext.get().getBean(SwiftSegmentLocationService.class);

    public SwiftQueryableProcessHandler(InvokerCreator invokerCreator) {
        super(invokerCreator);
    }

    /**
     * Runs the query carried in {@code objArr[0]} (a JSON string) on every target node and merges
     * the answers.
     *
     * <p>While dispatching, the query type is temporarily rewritten to its node-local variant
     * ({@code GROUP}/{@code CROSS_GROUP} become {@code LOCAL_GROUP_PART}, everything else becomes
     * {@code LOCAL_DETAIL}); the original type is restored before merging.
     *
     * <p>The call waits at most {@value #REMOTE_TIMEOUT_MINUTES} minutes for all nodes. On timeout
     * the results that did arrive are still merged (best effort), and the timeout is logged.
     *
     * @param method    the proxied service method being invoked
     * @param targetArr targets forwarded to {@link #processUrl(Target[], Object...)}
     * @param objArr    invocation arguments; element 0 must be the query JSON string
     * @return the merged result set, or {@code null} if merging failed with an {@link SQLException}
     * @throws Throwable any error raised while parsing, dispatching or waiting
     */
    @Override // com.fr.swift.proxy.handler.BaseProcessHandler, com.fr.swift.proxy.ProcessHandler
    @SuppressWarnings({"unchecked", "rawtypes"}) // the element type expected by the merger is not visible here
    public SwiftResultSet processResult(final Method method, Target[] targetArr, Object... objArr) throws Throwable {
        final Class<?> proxyClass = method.getDeclaringClass();
        Class<?>[] parameterTypes = method.getParameterTypes();
        String methodName = method.getName();

        QueryBean queryBean = (QueryBean) JsonBuilder.readValue((String) objArr[0], QueryInfoBean.class);
        SourceKey sourceKey = new SourceKey(queryBean.getTableName());
        QueryType originalType = queryBean.getQueryType();
        switch (originalType) {
            case GROUP:
            case CROSS_GROUP:
                queryBean.setQueryType(QueryType.LOCAL_GROUP_PART);
                break;
            default:
                queryBean.setQueryType(QueryType.LOCAL_DETAIL);
                break;
        }

        // processUrl serializes the bean (with the node-local type) once per target.
        List<Pair<URL, String>> targets = processUrl(targetArr, queryBean, sourceKey);
        if (targets.isEmpty()) {
            queryBean.setQueryType(originalType);
            return new MergedResultSet(SerializedQueryResultMerger.merger(new ArrayList(), queryBean), queryBean);
        }

        final List results = new CopyOnWriteArrayList();
        final CountDownLatch pending = new CountDownLatch(targets.size());
        for (Pair<URL, String> target : targets) {
            RpcFuture future = (RpcFuture) invoke(this.invokerCreator.createAsyncInvoker(proxyClass, target.getKey()),
                    proxyClass, method, methodName, parameterTypes, target.getValue());
            future.addCallback(collectingCallback(proxyClass, method, target, results, pending));
        }

        // await returns false on timeout; surface that instead of silently merging partial data.
        if (!pending.await(REMOTE_TIMEOUT_MINUTES, TimeUnit.MINUTES)) {
            SwiftLoggers.getLogger().error("Remote query timed out after " + REMOTE_TIMEOUT_MINUTES + " minutes: "
                    + pending.getCount() + " of " + targets.size() + " nodes did not respond, merging partial results");
        }
        queryBean.setQueryType(originalType);
        // Merge a snapshot so that callbacks arriving after a timeout cannot change the list mid-merge.
        return (SwiftResultSet) mergeResult(new ArrayList(results), queryBean);
    }

    /**
     * Builds the callback attached to one target's {@link RpcFuture}. On success the result set is
     * given a sync invoker that re-issues the same invocation against the same target, and is then
     * appended to {@code results}; on failure the error is logged and passed to
     * {@link Segment2RepairContainer}. The latch is counted down exactly once on every path.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private AsyncRpcCallback collectingCallback(final Class<?> proxyClass, final Method method,
                                                final Pair<URL, String> target, final List results,
                                                final CountDownLatch pending) {
        return new AsyncRpcCallback() {
            @Override // com.fr.swift.basics.AsyncRpcCallback
            public void success(Object result) {
                try {
                    BaseSerializedQueryResultSet resultSet = (BaseSerializedQueryResultSet) result;
                    resultSet.setInvoker(new BaseSerializedQueryResultSet.SyncInvoker() {
                        @Override // com.fr.swift.result.serialize.BaseSerializedQueryResultSet.SyncInvoker
                        public <D> BaseSerializedQueryResultSet<D> invoke() {
                            try {
                                return (BaseSerializedQueryResultSet) SwiftQueryableProcessHandler.this.invokerCreator
                                        .createSyncInvoker(proxyClass, target.getKey())
                                        .invoke(new SwiftInvocation(method, new Object[]{target.getValue()}))
                                        .recreate();
                            } catch (Throwable t) {
                                throw new RuntimeException(t);
                            }
                        }
                    });
                    results.add(resultSet);
                } finally {
                    // Always release the waiter, even if the payload was not the expected type.
                    pending.countDown();
                }
            }

            @Override // com.fr.swift.basics.AsyncRpcCallback
            public void fail(Exception exc) {
                try {
                    SwiftLoggers.getLogger().error("Remote invoke error:", exc);
                    Segment2RepairContainer.INSTANCE.addAfterParseException(exc);
                } finally {
                    pending.countDown();
                }
            }
        };
    }

    /**
     * Merges per-node result sets into one {@link MergedResultSet}.
     *
     * @param list   result sets collected from the nodes
     * @param objArr element 0 must be the {@link QueryBean} the results belong to
     * @return the merged result set, or {@code null} when the merger throws an {@link SQLException}
     */
    @Override // com.fr.swift.proxy.handler.BaseProcessHandler
    protected Object mergeResult(List list, Object... objArr) {
        QueryBean queryBean = (QueryBean) objArr[0];
        try {
            return new MergedResultSet(SerializedQueryResultMerger.merger(list, queryBean), queryBean);
        } catch (SQLException e) {
            // Pass the exception itself so the stack trace is kept in the log.
            SwiftLoggers.getLogger().error("Query error: when merge results from nodes", e);
            return null;
        }
    }

    /**
     * Decides which node receives which segments and serializes one query per node.
     *
     * <p>Outside cluster mode a single {@link LocalUrl} target is returned carrying all segments
     * registered for this node's cluster id. In cluster mode each online node (after
     * {@code urlFilter}) gets the segments it hosts that no earlier node in the iteration has
     * already been assigned; segment ids containing {@value #MEMORY_SEGMENT_MARKER} are exempt
     * from that de-duplication. Nodes left without segments are skipped.
     *
     * <p>Side effect: {@code queryBean.setQuerySegments} is called for each target, so the bean
     * ends up holding the segment set of the last target produced.
     *
     * @param targetArr not used by this implementation
     * @param objArr    element 0 is the {@link QueryBean}, element 1 the table's {@link SourceKey}
     * @return (url, query JSON) pairs, possibly empty when no segments are found
     * @throws Exception if a lookup or the JSON serialization fails
     */
    @Override // com.fr.swift.proxy.handler.AbstractProcessHandler
    public List<Pair<URL, String>> processUrl(Target[] targetArr, Object... objArr) throws Exception {
        QueryBean queryBean = (QueryBean) objArr[0];
        SourceKey sourceKey = (SourceKey) objArr[1];
        SwiftProperty property = (SwiftProperty) SwiftContext.get().getBean(SwiftProperty.class);

        if (!property.isCluster()) {
            HashSet<String> localSegments = new HashSet<String>();
            for (SwiftSegmentLocationEntity location : LOCATION_SERVICE.getTableSegsByClusterId(sourceKey, property.getClusterId())) {
                localSegments.add(location.getSegmentId());
            }
            if (localSegments.isEmpty()) {
                return new ArrayList<Pair<URL, String>>();
            }
            queryBean.setQuerySegments(localSegments);
            return Collections.singletonList(new Pair<URL, String>(new LocalUrl(), JsonBuilder.writeJsonString(queryBean)));
        }

        List<Pair<URL, String>> targets = new ArrayList<Pair<URL, String>>();
        Collection<String> onlineNodes = this.invokerCreator.urlFilter(property.getOnlineNodes());
        HashSet<String> assignedSegments = new HashSet<String>();
        for (String clusterId : onlineNodes) {
            HashSet<String> nodeSegments = new HashSet<String>();
            for (SwiftSegmentLocationEntity location : LOCATION_SERVICE.getTableSegsByClusterId(sourceKey, clusterId)) {
                String segmentId = location.getSegmentId();
                // Marked segments always go to the node; others only if Set.add reports a first claim.
                if (segmentId.contains(MEMORY_SEGMENT_MARKER) || assignedSegments.add(segmentId)) {
                    nodeSegments.add(segmentId);
                }
            }
            if (!nodeSegments.isEmpty()) {
                queryBean.setQuerySegments(nodeSegments);
                targets.add(new Pair<URL, String>(UrlSelector.getInstance().getFactory().getURL(clusterId), JsonBuilder.writeJsonString(queryBean)));
            }
        }
        return targets;
    }
}
