package com.fr.process.engine.pipe;

import com.fr.concurrent.NamedThreadFactory;
import com.fr.event.Event;
import com.fr.event.Listener;
import com.fr.log.FineLoggerFactory;
import com.fr.process.engine.PipeMonitor;
import com.fr.process.engine.ProcessEventDispatcher;
import com.fr.process.engine.core.CarryMessageEvent;
import com.fr.process.engine.core.FineProcessConstants;
import com.fr.process.engine.core.FineProcessEngineEvent;
import com.fr.process.engine.core.FineProcessEventDispatcher;
import com.fr.stable.StringUtils;
import com.fr.stable.serialize.SerializationUtils;
import com.fr.third.guava.io.BaseEncoding;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:fine-core-10.0.jar:com/fr/process/engine/pipe/FineProcessPipeMonitor.class */
/**
 * Reads lines from an {@link InputStream} on a background thread and routes them:
 * <ul>
 *   <li>lines starting with {@link FineProcessConstants#SIMPLE_HEAD} are Base64-decoded and
 *       deserialized; {@link FineProcessEngineEvent} instances go to the event queue and are
 *       fired through the internal {@link ProcessEventDispatcher} by a second background
 *       thread, anything else is treated as a {@link CarryMessageEvent} whose
 *       {@code info()} text is queued for {@link #info()};</li>
 *   <li>every other non-empty line is echoed to {@code System.out}.</li>
 * </ul>
 *
 * <p>NOTE(review): the stream is presumably the output pipe of another process (judging by the
 * package name) - confirm against callers. The stream is never closed by this class.
 *
 * <p>A monitor instance is single-use: {@link #stop()} shuts the internal executor down, so
 * calling {@link #start()} again afterwards would have its tasks rejected by the executor.
 * {@link #start()} itself is a plain check-then-set on a volatile flag and is not guarded
 * against two threads starting the monitor at the same moment.
 */
public class FineProcessPipeMonitor implements PipeMonitor {
    /** Stream the producer reads from; owned by the caller. */
    private final InputStream in;
    /** Two workers: one for the producer (reader), one for the consumer (dispatcher). */
    private final ExecutorService service = Executors.newFixedThreadPool(2, new NamedThreadFactory("FineProcessPipeMonitor"));
    private final ProcessEventDispatcher processEventDispatcher = new FineProcessEventDispatcher();
    /** Deserialized engine events waiting to be fired by the consumer. */
    private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<Event>();
    /** Message texts waiting to be picked up through {@link #info()}. */
    private final BlockingQueue<String> messageQueue = new LinkedBlockingQueue<String>();
    private volatile boolean started = false;
    private final Producer producer = new Producer();
    private final Consumer consumer = new Consumer();

    /**
     * Background task that takes events off the event queue and fires them through the
     * dispatcher until it is cancelled or interrupted.
     */
    public class Consumer implements Runnable {
        volatile boolean cancel;

        private Consumer() {
            this.cancel = false;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!this.cancel) {
                try {
                    FineProcessPipeMonitor.this.processEventDispatcher.fire(FineProcessPipeMonitor.this.eventQueue.take());
                } catch (InterruptedException e) {
                    // The only expected interrupt is the executor shutting down. Restore the
                    // flag and leave: looping again would make take() throw immediately.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /** Asks the loop to end; takes effect the next time the flag is checked. */
        public void stop() {
            this.cancel = true;
        }
    }

    /**
     * Background task that reads the monitored stream until end-of-stream or an I/O failure.
     */
    public class Producer implements Runnable {
        volatile boolean cancel;

        private Producer() {
            this.cancel = false;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (this.cancel) {
                return;
            }
            try {
                // readEvent only returns at end-of-stream. Calling it again would return
                // immediately and spin, so the task ends here.
                FineProcessPipeMonitor.this.readEvent(FineProcessPipeMonitor.this.in);
            } catch (IOException e) {
                // A failed stream is not retried; retrying in a loop only floods the log.
                FineLoggerFactory.getLogger().error(e.getMessage(), e);
            }
        }

        /**
         * Prevents the task from starting to read if it has not begun yet. A read that is
         * already in progress is not aborted by this flag.
         */
        public void stop() {
            this.cancel = true;
        }
    }

    /**
     * @param inputStream stream to monitor; it is read line by line once {@link #start()} is called
     */
    public FineProcessPipeMonitor(InputStream inputStream) {
        this.in = inputStream;
    }

    /** Starts the reader and dispatcher tasks; does nothing if already started. */
    @Override // com.fr.process.engine.PipeMonitor
    public void start() {
        if (isStarted()) {
            return;
        }
        this.started = true;
        collectEvent();
        autoFire();
    }

    /** Submits the reader task. */
    private void collectEvent() {
        this.service.submit(this.producer);
    }

    /**
     * Reads {@code inputStream} line by line until end-of-stream, routing each line as
     * described on the class. Characters are decoded with the platform default charset.
     *
     * @param inputStream stream to read
     * @throws IOException if reading a line fails
     */
    public void readEvent(InputStream inputStream) throws IOException {
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
        String readLine;
        while ((readLine = bufferedReader.readLine()) != null) {
            if (readLine.length() > FineProcessConstants.HEAD_ARRAY.length && readLine.startsWith(FineProcessConstants.SIMPLE_HEAD)) {
                byte[] payload;
                try {
                    payload = BaseEncoding.base64().decode(readLine.substring(FineProcessConstants.SIMPLE_HEAD.length()));
                } catch (IllegalArgumentException e) {
                    // One malformed payload must not end the reader; log it and move on.
                    FineLoggerFactory.getLogger().error(e.getMessage(), e);
                    continue;
                }
                addEventObject(payload);
            } else if (StringUtils.isNotEmpty(readLine)) {
                System.out.println(readLine);
            }
        }
    }

    /**
     * Deserializes {@code bArr} and queues the result: engine events go to the event queue,
     * everything else is cast to {@link CarryMessageEvent} and its {@code info()} text goes to
     * the message queue. Failures (including an unexpected payload type) are logged and the
     * payload is dropped.
     */
    private void addEventObject(byte[] bArr) {
        try {
            Object deserialize = SerializationUtils.deserialize(bArr);
            if (deserialize instanceof FineProcessEngineEvent) {
                this.eventQueue.put((Event) deserialize);
            } else {
                this.messageQueue.put(((CarryMessageEvent) deserialize).info());
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the calling thread instead of swallowing it.
            Thread.currentThread().interrupt();
            FineLoggerFactory.getLogger().error(e.getMessage(), e);
        } catch (Exception e) {
            FineLoggerFactory.getLogger().error(e.getMessage(), e);
        }
    }

    /** Submits the dispatcher task. */
    private void autoFire() {
        this.service.submit(this.consumer);
    }

    /**
     * Stops the monitor and shuts the executor down. Events still waiting in the queue are
     * not fired. A reader blocked inside a stream read is not woken by this call; it ends
     * when the stream reaches end-of-stream or fails.
     */
    @Override // com.fr.process.engine.PipeMonitor
    public void stop() {
        this.started = false;
        this.producer.stop();
        this.consumer.stop();
        // shutdownNow() rather than shutdown(): the consumer is normally parked in take() and
        // only an interrupt lets it observe the cancel flag and finish.
        this.service.shutdownNow();
    }

    @Override // com.fr.process.engine.PipeMonitor
    public boolean isStarted() {
        return this.started;
    }

    /** Registers {@code listener} for {@code event} on the internal dispatcher. */
    @Override // com.fr.process.engine.EventListen
    public <T> void listen(Event<T> event, Listener<T> listener) {
        this.processEventDispatcher.listen(event, listener);
    }

    /**
     * Waits up to two seconds for the next queued message text.
     *
     * @return the next message, {@code null} if none arrived within the timeout, or an empty
     *         string if the calling thread was interrupted while waiting (the interrupt flag
     *         is restored in that case)
     */
    @Override // com.fr.process.engine.MessageInfo
    public String info() {
        try {
            return this.messageQueue.poll(2L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "";
        }
    }
}
