/*
 * Decompiled with CFR 0.152.
 */
package org.apache.fluo.recipes.core.export;

import com.google.common.base.Preconditions;
import com.google.common.hash.Hashing;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.regex.Pattern;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.config.SimpleConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.recipes.core.common.RowRange;
import org.apache.fluo.recipes.core.common.TableOptimizations;
import org.apache.fluo.recipes.core.common.TransientRegistry;
import org.apache.fluo.recipes.core.export.Export;
import org.apache.fluo.recipes.core.export.ExportBucket;
import org.apache.fluo.recipes.core.export.ExportObserver;
import org.apache.fluo.recipes.core.export.Exporter;
import org.apache.fluo.recipes.core.serialization.SimpleSerializer;

public class ExportQueue<K, V> {
    private static final String RANGE_BEGIN = "#";
    private static final String RANGE_END = ":~";
    private int numBuckets;
    private SimpleSerializer serializer;
    private String queueId;

    ExportQueue(Options opts, SimpleSerializer serializer) throws Exception {
        this.queueId = opts.queueId;
        this.numBuckets = opts.numBuckets;
        this.serializer = serializer;
    }

    public void add(TransactionBase tx, K key, V value) {
        this.addAll(tx, Collections.singleton(new Export<K, V>(key, value)).iterator());
    }

    public void addAll(TransactionBase tx, Iterator<Export<K, V>> exports) {
        HashSet<Integer> bucketsNotified = new HashSet<Integer>();
        while (exports.hasNext()) {
            Export<K, V> export = exports.next();
            byte[] k = this.serializer.serialize(export.getKey());
            byte[] v = this.serializer.serialize(export.getValue());
            int hash = Hashing.murmur3_32().hashBytes(k).asInt();
            int bucketId = Math.abs(hash % this.numBuckets);
            ExportBucket bucket = new ExportBucket(tx, this.queueId, bucketId, this.numBuckets);
            bucket.add(tx.getStartTimestamp(), k, v);
            if (bucketsNotified.contains(bucketId)) continue;
            bucket.notifyExportObserver();
            bucketsNotified.add(bucketId);
        }
    }

    public static <K2, V2> ExportQueue<K2, V2> getInstance(String exportQueueId, SimpleConfiguration appConfig) {
        Options opts = new Options(exportQueueId, appConfig);
        try {
            return new ExportQueue(opts, SimpleSerializer.getInstance(appConfig));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void configure(FluoConfiguration fluoConfig, Options opts) {
        SimpleConfiguration appConfig = fluoConfig.getAppConfiguration();
        opts.save(appConfig);
        fluoConfig.addObserver(new ObserverSpecification(ExportObserver.class.getName(), Collections.singletonMap("queueId", opts.queueId)));
        Bytes exportRangeStart = Bytes.of((String)(opts.queueId + RANGE_BEGIN));
        Bytes exportRangeStop = Bytes.of((String)(opts.queueId + RANGE_END));
        new TransientRegistry(fluoConfig.getAppConfiguration()).addTransientRange("exportQueue." + opts.queueId, new RowRange(exportRangeStart, exportRangeStop));
        TableOptimizations.registerOptimization(appConfig, opts.queueId, Optimizer.class);
    }

    public static class Options {
        private static final String PREFIX = "recipes.exportQueue.";
        static final long DEFAULT_BUFFER_SIZE = 0x100000L;
        static final int DEFAULT_BUCKETS_PER_TABLET = 10;
        int numBuckets;
        Integer bucketsPerTablet = null;
        Long bufferSize;
        String keyType;
        String valueType;
        String exporterType;
        String queueId;
        SimpleConfiguration exporterConfig;

        Options(String queueId, SimpleConfiguration appConfig) {
            this.queueId = queueId;
            this.numBuckets = appConfig.getInt(PREFIX + queueId + ".buckets");
            this.exporterType = appConfig.getString(PREFIX + queueId + ".exporter");
            this.keyType = appConfig.getString(PREFIX + queueId + ".key");
            this.valueType = appConfig.getString(PREFIX + queueId + ".val");
            this.bufferSize = appConfig.getLong(PREFIX + queueId + ".bufferSize", 0x100000L);
            this.bucketsPerTablet = appConfig.getInt(PREFIX + queueId + ".bucketsPerTablet", 10);
            this.exporterConfig = appConfig.subset(PREFIX + queueId + ".exporterCfg");
        }

        public Options(String queueId, String exporterType, String keyType, String valueType, int buckets) {
            Preconditions.checkArgument((buckets > 0 ? 1 : 0) != 0);
            this.queueId = queueId;
            this.numBuckets = buckets;
            this.exporterType = exporterType;
            this.keyType = keyType;
            this.valueType = valueType;
        }

        public <K, V> Options(String queueId, Class<? extends Exporter<K, V>> exporter, Class<K> keyType, Class<V> valueType, int buckets) {
            this(queueId, exporter.getName(), keyType.getName(), valueType.getName(), buckets);
        }

        public Options setBufferSize(long bufferSize) {
            Preconditions.checkArgument((bufferSize > 0L ? 1 : 0) != 0, (Object)"Buffer size must be positive");
            this.bufferSize = bufferSize;
            return this;
        }

        long getBufferSize() {
            if (this.bufferSize == null) {
                return 0x100000L;
            }
            return this.bufferSize;
        }

        public Options setBucketsPerTablet(int bucketsPerTablet) {
            Preconditions.checkArgument((bucketsPerTablet > 0 ? 1 : 0) != 0, (Object)("bucketsPerTablet is <= 0 : " + bucketsPerTablet));
            this.bucketsPerTablet = bucketsPerTablet;
            return this;
        }

        int getBucketsPerTablet() {
            if (this.bucketsPerTablet == null) {
                return 10;
            }
            return this.bucketsPerTablet;
        }

        public Options setExporterConfiguration(SimpleConfiguration config) {
            Objects.requireNonNull(config);
            this.exporterConfig = config;
            return this;
        }

        public SimpleConfiguration getExporterConfiguration() {
            if (this.exporterConfig == null) {
                return new SimpleConfiguration();
            }
            return this.exporterConfig;
        }

        public String getQueueId() {
            return this.queueId;
        }

        void save(SimpleConfiguration appConfig) {
            appConfig.setProperty(PREFIX + this.queueId + ".buckets", this.numBuckets + "");
            appConfig.setProperty(PREFIX + this.queueId + ".exporter", this.exporterType + "");
            appConfig.setProperty(PREFIX + this.queueId + ".key", this.keyType);
            appConfig.setProperty(PREFIX + this.queueId + ".val", this.valueType);
            if (this.bufferSize != null) {
                appConfig.setProperty(PREFIX + this.queueId + ".bufferSize", this.bufferSize);
            }
            if (this.bucketsPerTablet != null) {
                appConfig.setProperty(PREFIX + this.queueId + ".bucketsPerTablet", this.bucketsPerTablet);
            }
            if (this.exporterConfig != null) {
                Iterator keys = this.exporterConfig.getKeys();
                while (keys.hasNext()) {
                    String key = (String)keys.next();
                    appConfig.setProperty(PREFIX + this.queueId + ".exporterCfg." + key, this.exporterConfig.getRawString(key));
                }
            }
        }
    }

    public static class Optimizer
    implements TableOptimizations.TableOptimizationsFactory {
        @Override
        public TableOptimizations getTableOptimizations(String queueId, SimpleConfiguration appConfig) {
            Options opts = new Options(queueId, appConfig);
            ArrayList<Bytes> splits = new ArrayList<Bytes>();
            Bytes exportRangeStart = Bytes.of((String)(opts.queueId + ExportQueue.RANGE_BEGIN));
            Bytes exportRangeStop = Bytes.of((String)(opts.queueId + ExportQueue.RANGE_END));
            splits.add(exportRangeStart);
            splits.add(exportRangeStop);
            ArrayList<Bytes> exportSplits = new ArrayList<Bytes>();
            for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) {
                exportSplits.add(ExportBucket.generateBucketRow(opts.queueId, i, opts.numBuckets));
            }
            Collections.sort(exportSplits);
            splits.addAll(exportSplits);
            TableOptimizations tableOptim = new TableOptimizations();
            tableOptim.setSplits(splits);
            tableOptim.setTabletGroupingRegex(Pattern.quote(queueId + ":"));
            return tableOptim;
        }
    }
}

