/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.stream.io;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.LockSupport;
import net.openhft.chronicle.Chronicle;
import net.openhft.chronicle.ChronicleQueueBuilder;
import net.openhft.chronicle.ExcerptTailer;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.support.Exceptions;
import reactor.core.support.NamedDaemonThreadFactory;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.queue.spec.PersistentQueueSpec;
import reactor.rx.Stream;
import reactor.rx.stream.MapStream;
import reactor.rx.subscription.PushSubscription;

public class ChronicleReaderStream<K, V>
extends MapStream<K, V> {
    private final ExecutorService executor;
    protected final String name;
    protected final Codec<Buffer, K, K> keyCodec;
    protected final Codec<Buffer, V, V> valueCodec;
    protected final Chronicle chronicle;
    protected final Map<K, V> localCache = new ConcurrentHashMap();
    protected volatile int consumers = 0;
    protected static final AtomicIntegerFieldUpdater<ChronicleReaderStream> CONSUMER_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ChronicleReaderStream.class, "consumers");

    public ChronicleReaderStream(String name) throws IOException {
        this(name, ChronicleQueueBuilder.indexed((String)PersistentQueueSpec.DEFAULT_BASE_PATH, (String)name).build());
    }

    public ChronicleReaderStream(String name, Chronicle chronicle) {
        this(name, chronicle, null, null);
    }

    public ChronicleReaderStream(String name, Chronicle chronicle, Codec<Buffer, K, K> keyCodec, Codec<Buffer, V, V> valueCodec) {
        this.executor = Executors.newSingleThreadExecutor((ThreadFactory)new NamedDaemonThreadFactory(name));
        this.keyCodec = keyCodec;
        this.valueCodec = valueCodec;
        this.chronicle = chronicle;
        this.name = name;
    }

    public void subscribe(Subscriber<? super MapStream.Signal<K, V>> s) {
        CONSUMER_UPDATER.incrementAndGet(this);
        try {
            s.onSubscribe((Subscription)new ChronicleSubscription(this, s));
        }
        catch (Throwable t) {
            Exceptions.throwIfFatal((Throwable)t);
            s.onError(t);
        }
    }

    @Override
    public boolean containsKey(Object key) {
        return this.localCache.containsKey(key);
    }

    @Override
    public boolean containsValue(Object value) {
        return this.localCache.containsValue(value);
    }

    @Override
    @NotNull
    public Set<Map.Entry<K, V>> entrySet() {
        return this.localCache.entrySet();
    }

    @Override
    public boolean equals(Object o) {
        return this.localCache.equals(o);
    }

    @Override
    public V get(Object key) {
        return this.localCache.get(key);
    }

    @Override
    public int hashCode() {
        return this.localCache.hashCode();
    }

    @Override
    public boolean isEmpty() {
        return this.localCache.isEmpty();
    }

    @Override
    @NotNull
    public Set<K> keySet() {
        return this.localCache.keySet();
    }

    @Override
    public void clear() {
        throw new UnsupportedOperationException();
    }

    @Override
    public V put(K key, V value) {
        throw new UnsupportedOperationException();
    }

    @Override
    public void putAll(Map<? extends K, ? extends V> m) {
        throw new UnsupportedOperationException();
    }

    @Override
    public V remove(Object key) {
        throw new UnsupportedOperationException();
    }

    @Override
    public int size() {
        return this.localCache.size();
    }

    public Map<K, V> localCache() {
        return this.localCache;
    }

    public Chronicle chronicle() {
        return this.chronicle;
    }

    public Codec<Buffer, K, K> keyCodec() {
        return this.keyCodec;
    }

    public Codec<Buffer, V, V> valueCodec() {
        return this.valueCodec;
    }

    @Override
    @NotNull
    public Collection<V> values() {
        return this.localCache.values();
    }

    @Override
    public String toString() {
        return this.localCache.toString() + "{name=" + this.name + "}";
    }

    class ChronicleSubscription
    extends PushSubscription<MapStream.Signal<K, V>> {
        final MapStream.MutableSignal<K, V> signalContainer;

        public ChronicleSubscription(Stream<MapStream.Signal<K, V>> publisher, Subscriber<? super MapStream.Signal<K, V>> subscriber) {
            super(publisher, subscriber);
            this.signalContainer = new MapStream.MutableSignal();
        }

        @Override
        public void cancel() {
            super.cancel();
            CONSUMER_UPDATER.decrementAndGet(ChronicleReaderStream.this);
        }

        @Override
        protected void onRequest(final long n) {
            ChronicleReaderStream.this.executor.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        ExcerptTailer readExcerpt = ChronicleReaderStream.this.chronicle.createTailer();
                        long i = 0L;
                        while ((n == Long.MAX_VALUE || i < n) && ChronicleSubscription.this.terminated == 0) {
                            boolean found = readExcerpt.nextIndex();
                            if (found) {
                                ++i;
                                ChronicleSubscription.this.readExcerpt(readExcerpt);
                                ChronicleSubscription.this.signalContainer.op(MapStream.Operation.put);
                                ChronicleSubscription.this.signalContainer.key(null);
                                ChronicleSubscription.this.signalContainer.value(null);
                                continue;
                            }
                            LockSupport.parkNanos(1L);
                        }
                    }
                    catch (Throwable t) {
                        ChronicleSubscription.this.subscriber.onError(t);
                    }
                }
            });
        }

        private void readExcerpt(ExcerptTailer readExcerpt) {
            long position = readExcerpt.position();
            MapStream.Operation event = (MapStream.Operation)((Object)readExcerpt.readEnum(MapStream.Operation.class));
            if (event == null) {
                readExcerpt.position(position);
                return;
            }
            try {
                switch (event) {
                    case put: {
                        this.onExcerptPut(readExcerpt);
                        return;
                    }
                    case putAll: {
                        int count = readExcerpt.readInt();
                        for (int i = 0; i < count; ++i) {
                            this.onExcerptPut(readExcerpt);
                        }
                        return;
                    }
                    case remove: {
                        this.signalContainer.op(MapStream.Operation.remove);
                        this.signalContainer.key(this.readKey(readExcerpt));
                        break;
                    }
                    case clear: {
                        this.signalContainer.op(MapStream.Operation.clear);
                    }
                }
                this.sync(this.signalContainer);
                this.subscriber.onNext(this.signalContainer);
            }
            catch (Exception e) {
                this.subscriber.onError((Throwable)e);
            }
        }

        private void onExcerptPut(ExcerptTailer excerpt) {
            Object key = this.readKey(excerpt);
            Object value = this.readValue(excerpt);
            this.signalContainer.op(MapStream.Operation.put);
            this.signalContainer.key(key);
            this.signalContainer.value(value);
            this.sync(this.signalContainer);
            this.subscriber.onNext(this.signalContainer);
        }

        private V readValue(ExcerptTailer excerpt) {
            if (ChronicleReaderStream.this.valueCodec == null) {
                return excerpt.readObject();
            }
            int len = excerpt.readInt();
            ByteBuffer bb = ByteBuffer.allocate(len);
            excerpt.read(bb);
            bb.flip();
            return ChronicleReaderStream.this.valueCodec.decoder(null).apply((Object)new Buffer(bb));
        }

        private K readKey(ExcerptTailer excerpt) {
            if (ChronicleReaderStream.this.keyCodec == null) {
                return excerpt.readObject();
            }
            int len = excerpt.readInt();
            ByteBuffer bb = ByteBuffer.allocate(len);
            excerpt.read(bb);
            bb.flip();
            return ChronicleReaderStream.this.keyCodec.decoder(null).apply((Object)new Buffer(bb));
        }

        public void sync(MapStream.Signal<K, V> signal) {
            switch (signal.op()) {
                case put: {
                    ChronicleReaderStream.this.localCache.put(signal.key(), signal.value());
                    break;
                }
                case putAll: {
                    ChronicleReaderStream.this.localCache.put(signal.key(), signal.value());
                    break;
                }
                case remove: {
                    ChronicleReaderStream.this.localCache.remove(signal.key());
                    break;
                }
                case clear: {
                    ChronicleReaderStream.this.localCache.clear();
                }
            }
        }
    }
}

