/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KGroupedTableImpl;
import org.apache.kafka.streams.kstream.internals.KStreamAggProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KStreamForeach;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.kstream.internals.KStreamMapValues;
import org.apache.kafka.streams.kstream.internals.KTableFilter;
import org.apache.kafka.streams.kstream.internals.KTableKTableJoin;
import org.apache.kafka.streams.kstream.internals.KTableKTableJoinMerger;
import org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin;
import org.apache.kafka.streams.kstream.internals.KTableKTableOuterJoin;
import org.apache.kafka.streams.kstream.internals.KTableKTableRightJoin;
import org.apache.kafka.streams.kstream.internals.KTableMapValues;
import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KTableRepartitionMap;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.KTableStoreSupplier;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.KeyValuePrinter;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;

public class KTableImpl<K, S, V>
extends AbstractStream<K>
implements KTable<K, V> {
    private static final String FILTER_NAME = "KTABLE-FILTER-";
    private static final String FOREACH_NAME = "KTABLE-FOREACH-";
    public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
    public static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-";
    public static final String LEFTTHIS_NAME = "KTABLE-LEFTTHIS-";
    public static final String LEFTOTHER_NAME = "KTABLE-LEFTOTHER-";
    private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-";
    public static final String MERGE_NAME = "KTABLE-MERGE-";
    public static final String OUTERTHIS_NAME = "KTABLE-OUTERTHIS-";
    public static final String OUTEROTHER_NAME = "KTABLE-OUTEROTHER-";
    private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
    private static final String SELECT_NAME = "KTABLE-SELECT-";
    public static final String SOURCE_NAME = "KTABLE-SOURCE-";
    private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
    public final ProcessorSupplier<?, ?> processorSupplier;
    private final Serde<K> keySerde;
    private final Serde<V> valSerde;
    private boolean sendOldValues = false;

    public KTableImpl(KStreamBuilder topology, String name, ProcessorSupplier<?, ?> processorSupplier, Set<String> sourceNodes) {
        this(topology, name, processorSupplier, sourceNodes, null, null);
    }

    public KTableImpl(KStreamBuilder topology, String name, ProcessorSupplier<?, ?> processorSupplier, Set<String> sourceNodes, Serde<K> keySerde, Serde<V> valSerde) {
        super(topology, name, sourceNodes);
        this.processorSupplier = processorSupplier;
        this.keySerde = keySerde;
        this.valSerde = valSerde;
    }

    @Override
    public KTable<K, V> filter(Predicate<K, V> predicate) {
        String name = this.topology.newName(FILTER_NAME);
        KTableFilter<K, V> processorSupplier = new KTableFilter<K, V>(this, predicate, false);
        this.topology.addProcessor(name, processorSupplier, this.name);
        return new KTableImpl<K, S, V>(this.topology, name, processorSupplier, this.sourceNodes);
    }

    @Override
    public KTable<K, V> filterNot(Predicate<K, V> predicate) {
        String name = this.topology.newName(FILTER_NAME);
        KTableFilter<K, V> processorSupplier = new KTableFilter<K, V>(this, predicate, true);
        this.topology.addProcessor(name, processorSupplier, this.name);
        return new KTableImpl<K, S, V>(this.topology, name, processorSupplier, this.sourceNodes);
    }

    @Override
    public <V1> KTable<K, V1> mapValues(ValueMapper<V, V1> mapper) {
        String name = this.topology.newName(MAPVALUES_NAME);
        KTableMapValues processorSupplier = new KTableMapValues(this, mapper);
        this.topology.addProcessor(name, processorSupplier, this.name);
        return new KTableImpl<K, S, V>(this.topology, name, processorSupplier, this.sourceNodes);
    }

    @Override
    public void print() {
        this.print(null, null);
    }

    @Override
    public void print(Serde<K> keySerde, Serde<V> valSerde) {
        String name = this.topology.newName(PRINTING_NAME);
        this.topology.addProcessor(name, new KeyValuePrinter(keySerde, valSerde), this.name);
    }

    @Override
    public void writeAsText(String filePath) {
        this.writeAsText(filePath, null, null);
    }

    @Override
    public void writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde) {
        String name = this.topology.newName(PRINTING_NAME);
        try {
            PrintStream printStream = new PrintStream(new FileOutputStream(filePath));
            this.topology.addProcessor(name, new KeyValuePrinter(printStream, keySerde, valSerde), this.name);
        }
        catch (FileNotFoundException e) {
            String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage();
            throw new TopologyBuilderException(message);
        }
    }

    @Override
    public KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
        this.to(keySerde, valSerde, partitioner, topic);
        return this.topology.table(keySerde, valSerde, topic);
    }

    @Override
    public void foreach(final ForeachAction<K, V> action) {
        String name = this.topology.newName(FOREACH_NAME);
        KStreamForeach processorSupplier = new KStreamForeach(new ForeachAction<K, Change<V>>(){

            @Override
            public void apply(K key, Change<V> value) {
                action.apply(key, value.newValue);
            }
        });
        this.topology.addProcessor(name, processorSupplier, this.name);
    }

    @Override
    public KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic) {
        return this.through(keySerde, valSerde, null, topic);
    }

    @Override
    public KTable<K, V> through(StreamPartitioner<K, V> partitioner, String topic) {
        return this.through(null, null, partitioner, topic);
    }

    @Override
    public KTable<K, V> through(String topic) {
        return this.through(null, null, null, topic);
    }

    @Override
    public void to(String topic) {
        this.to(null, null, null, topic);
    }

    @Override
    public void to(StreamPartitioner<K, V> partitioner, String topic) {
        this.to(null, null, partitioner, topic);
    }

    @Override
    public void to(Serde<K> keySerde, Serde<V> valSerde, String topic) {
        this.toStream().to(keySerde, valSerde, null, topic);
    }

    @Override
    public void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic) {
        this.toStream().to(keySerde, valSerde, partitioner, topic);
    }

    @Override
    public KStream<K, V> toStream() {
        String name = this.topology.newName(TOSTREAM_NAME);
        this.topology.addProcessor(name, new KStreamMapValues(new ValueMapper<Change<V>, V>(){

            @Override
            public V apply(Change<V> change) {
                return change.newValue;
            }
        }), this.name);
        return new KStreamImpl(this.topology, name, this.sourceNodes);
    }

    @Override
    public <K1> KStream<K1, V> toStream(KeyValueMapper<K, V, K1> mapper) {
        return this.toStream().selectKey(mapper);
    }

    @Override
    public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
        Set<String> allSourceNodes = this.ensureJoinableWith((AbstractStream)((Object)other));
        String joinThisName = this.topology.newName(JOINTHIS_NAME);
        String joinOtherName = this.topology.newName(JOINOTHER_NAME);
        String joinMergeName = this.topology.newName(MERGE_NAME);
        KTableKTableJoin joinThis = new KTableKTableJoin(this, (KTableImpl)other, joiner);
        KTableKTableJoin joinOther = new KTableKTableJoin((KTableImpl)other, this, KTableImpl.reverseJoiner(joiner));
        KTableKTableJoinMerger<K, V> joinMerge = new KTableKTableJoinMerger<K, V>(new KTableImpl<K, S, V>(this.topology, joinThisName, joinThis, this.sourceNodes), new KTableImpl<K, S, V>(this.topology, joinOtherName, joinOther, ((KTableImpl)other).sourceNodes));
        this.topology.addProcessor(joinThisName, joinThis, this.name);
        this.topology.addProcessor(joinOtherName, joinOther, ((KTableImpl)other).name);
        this.topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
        return new KTableImpl<K, S, V>(this.topology, joinMergeName, joinMerge, allSourceNodes);
    }

    @Override
    public <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
        Set<String> allSourceNodes = this.ensureJoinableWith((AbstractStream)((Object)other));
        String joinThisName = this.topology.newName(OUTERTHIS_NAME);
        String joinOtherName = this.topology.newName(OUTEROTHER_NAME);
        String joinMergeName = this.topology.newName(MERGE_NAME);
        KTableKTableOuterJoin joinThis = new KTableKTableOuterJoin(this, (KTableImpl)other, joiner);
        KTableKTableOuterJoin joinOther = new KTableKTableOuterJoin((KTableImpl)other, this, KTableImpl.reverseJoiner(joiner));
        KTableKTableJoinMerger<K, V> joinMerge = new KTableKTableJoinMerger<K, V>(new KTableImpl<K, S, V>(this.topology, joinThisName, joinThis, this.sourceNodes), new KTableImpl<K, S, V>(this.topology, joinOtherName, joinOther, ((KTableImpl)other).sourceNodes));
        this.topology.addProcessor(joinThisName, joinThis, this.name);
        this.topology.addProcessor(joinOtherName, joinOther, ((KTableImpl)other).name);
        this.topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
        return new KTableImpl<K, S, V>(this.topology, joinMergeName, joinMerge, allSourceNodes);
    }

    @Override
    public <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) {
        Set<String> allSourceNodes = this.ensureJoinableWith((AbstractStream)((Object)other));
        String joinThisName = this.topology.newName(LEFTTHIS_NAME);
        String joinOtherName = this.topology.newName(LEFTOTHER_NAME);
        String joinMergeName = this.topology.newName(MERGE_NAME);
        KTableKTableLeftJoin joinThis = new KTableKTableLeftJoin(this, (KTableImpl)other, joiner);
        KTableKTableRightJoin joinOther = new KTableKTableRightJoin((KTableImpl)other, this, KTableImpl.reverseJoiner(joiner));
        KTableKTableJoinMerger<K, V> joinMerge = new KTableKTableJoinMerger<K, V>(new KTableImpl<K, S, V>(this.topology, joinThisName, joinThis, this.sourceNodes), new KTableImpl<K, S, V>(this.topology, joinOtherName, joinOther, ((KTableImpl)other).sourceNodes));
        this.topology.addProcessor(joinThisName, joinThis, this.name);
        this.topology.addProcessor(joinOtherName, joinOther, ((KTableImpl)other).name);
        this.topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName);
        return new KTableImpl<K, S, V>(this.topology, joinMergeName, joinMerge, allSourceNodes);
    }

    @Override
    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector, Serde<K1> keySerde, Serde<V1> valueSerde) {
        String selectName = this.topology.newName(SELECT_NAME);
        KTableRepartitionMap<K, V, K1, V1> selectSupplier = new KTableRepartitionMap<K, V, K1, V1>(this, selector);
        this.topology.addProcessor(selectName, selectSupplier, this.name);
        this.enableSendingOldValues();
        return new KGroupedTableImpl<K1, V1>(this.topology, selectName, this.name, keySerde, valueSerde);
    }

    @Override
    public <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<K, V, KeyValue<K1, V1>> selector) {
        return this.groupBy(selector, null, null);
    }

    KTableValueGetterSupplier<K, V> valueGetterSupplier() {
        if (this.processorSupplier instanceof KTableSource) {
            KTableSource source = (KTableSource)this.processorSupplier;
            this.materialize(source);
            return new KTableSourceValueGetterSupplier(source.topic);
        }
        if (this.processorSupplier instanceof KStreamAggProcessorSupplier) {
            return ((KStreamAggProcessorSupplier)this.processorSupplier).view();
        }
        return ((KTableProcessorSupplier)this.processorSupplier).view();
    }

    void enableSendingOldValues() {
        if (!this.sendOldValues) {
            if (this.processorSupplier instanceof KTableSource) {
                KTableSource source = (KTableSource)this.processorSupplier;
                this.materialize(source);
                source.enableSendingOldValues();
            } else if (this.processorSupplier instanceof KStreamAggProcessorSupplier) {
                ((KStreamAggProcessorSupplier)this.processorSupplier).enableSendingOldValues();
            } else {
                ((KTableProcessorSupplier)this.processorSupplier).enableSendingOldValues();
            }
            this.sendOldValues = true;
        }
    }

    boolean sendingOldValueEnabled() {
        return this.sendOldValues;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void materialize(KTableSource<K, ?> source) {
        KTableSource<K, ?> kTableSource = source;
        synchronized (kTableSource) {
            if (!source.isMaterialized()) {
                KTableStoreSupplier<K, V> storeSupplier = new KTableStoreSupplier<K, V>(source.topic, this.keySerde, this.valSerde, null);
                this.topology.addStateStore(storeSupplier, false, this.name);
                source.materialize();
            }
        }
    }
}

