/*
 * Decompiled with CFR 0.152.
 */
package org.apache.fluo.recipes.core.export;

import com.google.common.collect.Iterators;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.fluo.api.observer.Observer;
import org.apache.fluo.recipes.core.export.ExportBucket;
import org.apache.fluo.recipes.core.export.ExportEntry;
import org.apache.fluo.recipes.core.export.ExportQueue;
import org.apache.fluo.recipes.core.export.Exporter;
import org.apache.fluo.recipes.core.export.SequencedExport;
import org.apache.fluo.recipes.core.serialization.SimpleSerializer;

public class ExportObserver<K, V>
extends AbstractObserver {
    private String queueId;
    private Class<K> keyType;
    private Class<V> valType;
    SimpleSerializer serializer;
    private Exporter<K, V> exporter;
    private long memLimit;

    protected String getQueueId() {
        return this.queueId;
    }

    SimpleSerializer getSerializer() {
        return this.serializer;
    }

    public void init(final Observer.Context context) throws Exception {
        this.queueId = context.getObserverConfiguration().getString("queueId");
        final ExportQueue.Options opts = new ExportQueue.Options(this.queueId, context.getAppConfiguration());
        this.keyType = ((Object)((Object)this)).getClass().getClassLoader().loadClass(opts.keyType);
        this.valType = ((Object)((Object)this)).getClass().getClassLoader().loadClass(opts.valueType);
        this.exporter = ((Object)((Object)this)).getClass().getClassLoader().loadClass(opts.exporterType).asSubclass(Exporter.class).newInstance();
        this.serializer = SimpleSerializer.getInstance(context.getAppConfiguration());
        this.memLimit = opts.getBufferSize();
        this.exporter.init(new Exporter.Context(){

            @Override
            public String getQueueId() {
                return ExportObserver.this.queueId;
            }

            @Override
            public SimpleConfiguration getExporterConfiguration() {
                return opts.getExporterConfiguration();
            }

            @Override
            public Observer.Context getObserverContext() {
                return context;
            }
        });
    }

    public Observer.ObservedColumn getObservedColumn() {
        return new Observer.ObservedColumn(ExportBucket.newNotificationColumn(this.queueId), Observer.NotificationType.WEAK);
    }

    public void process(TransactionBase tx, Bytes row, Column column) throws Exception {
        ExportBucket bucket = new ExportBucket(tx, row);
        Bytes continueRow = bucket.getContinueRow();
        Iterator<ExportEntry> input = bucket.getExportIterator(continueRow);
        MemLimitIterator memLimitIter = new MemLimitIterator(input, this.memLimit, 8 + this.queueId.length());
        Iterator exportIterator = Iterators.transform((Iterator)memLimitIter, ee -> new SequencedExport<K, V>(this.serializer.deserialize(ee.key, this.keyType), this.serializer.deserialize(ee.value, this.valType), ee.seq));
        exportIterator = Iterators.consumingIterator((Iterator)exportIterator);
        this.exporter.processExports(exportIterator);
        if (input.hasNext() || continueRow != null) {
            bucket.notifyExportObserver();
        }
        if (input.hasNext() && !memLimitIter.hasNext()) {
            bucket.setContinueRow(input.next());
            continueRow = null;
        }
        if (continueRow != null) {
            bucket.clearContinueRow();
        }
    }

    private static class MemLimitIterator
    implements Iterator<ExportEntry> {
        private long memConsumed = 0L;
        private long memLimit;
        private int extraPerKey;
        private Iterator<ExportEntry> source;

        public MemLimitIterator(Iterator<ExportEntry> input, long limit, int extraPerKey) {
            this.source = input;
            this.memLimit = limit;
            this.extraPerKey = extraPerKey;
        }

        @Override
        public boolean hasNext() {
            return this.memConsumed < this.memLimit && this.source.hasNext();
        }

        @Override
        public ExportEntry next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            ExportEntry ee = this.source.next();
            this.memConsumed += (long)(ee.key.length + this.extraPerKey + ee.value.length);
            return ee;
        }

        @Override
        public void remove() {
            this.source.remove();
        }
    }
}

