package com.fr.swift.adaptor.log;

import com.fr.swift.adaptor.log.filter.FilterResultSet;
import com.fr.swift.context.SwiftContext;
import com.fr.swift.db.Database;
import com.fr.swift.db.Table;
import com.fr.swift.db.impl.SwiftDatabase;
import com.fr.swift.executor.task.TaskRouter;
import com.fr.swift.log.SwiftLoggers;
import com.fr.swift.service.RealtimeService;
import com.fr.swift.service.SwiftCoreService;
import com.fr.swift.source.SourceKey;
import com.fr.swift.util.JpaAdaptor;
import com.fr.swift.util.concurrent.PoolThreadFactory;
import com.fr.swift.util.concurrent.SwiftExecutors;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:fine-swift-log-adaptor-10.0.jar:com/fr/swift/adaptor/log/BufferedInsert.class */
public class BufferedInsert implements Runnable, SwiftCoreService {
    private BlockingQueue<List<Object>> dataQueue = new ArrayBlockingQueue(1000);
    private ExecutorService executorService;
    private RealtimeService realtimeService;
    private Database db;

    @Override // com.fr.swift.service.SwiftCoreService
    public void start() {
        this.db = SwiftDatabase.getInstance();
        this.realtimeService = (RealtimeService) SwiftContext.get().getBean("swiftRealtimeService", RealtimeService.class);
        this.executorService = SwiftExecutors.newSingleThreadExecutor(new PoolThreadFactory(getClass()));
        this.executorService.execute(this);
    }

    @Override // com.fr.swift.service.SwiftCoreService
    public void stop() {
        this.dataQueue.clear();
        this.executorService.shutdownNow();
        this.executorService = null;
        this.db = null;
        this.realtimeService = null;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            try {
                List<Object> take = this.dataQueue.take();
                Class<?> cls = take.get(0).getClass();
                Table table = this.db.getTable(new SourceKey(JpaAdaptor.getTableName(cls)));
                FilterResultSet filterResultSet = new FilterResultSet(new LogRowSet(table.getMeta(), take, cls));
                if (TaskRouter.getInstance().getIdleTasks().size() <= 3000) {
                    this.realtimeService.insert(table.getSourceKey(), filterResultSet);
                    SwiftLoggers.getLogger().debug("Take {} data size: {}", table.getSourceKey(), Integer.valueOf(take.size()));
                } else {
                    SwiftLoggers.getLogger().debug("TaskRouter oversized and reject data size: {}", Integer.valueOf(take.size()));
                }
                MemoryState.addAndGetUsedMemorySize(-(MemoryState.getObjectSize(take.get(0).getClass()).longValue() * take.size()));
            } catch (InterruptedException e) {
                return;
            } catch (Exception e2) {
                SwiftLoggers.getLogger().error(e2);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void submit(Class<?> cls, List<Object> list) {
        if (this.dataQueue.offer(list)) {
            return;
        }
        SwiftLoggers.getLogger().warn("swift BufferedInsert rejected {} {}", Integer.valueOf(list.size()), cls.getSimpleName());
    }
}
