package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
import org.apache.flink.runtime.operators.sort.SpillingThread;
import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/operators/sort/CombiningSpillingBehaviour.class */
public final class CombiningSpillingBehaviour<R> implements SpillingThread.SpillingBehaviour<R> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) CombiningSpillingBehaviour.class);
    private final GroupCombineFunction<R, R> combineFunction;
    private final TypeSerializer<R> serializer;
    private final TypeComparator<R> comparator;
    private final boolean objectReuseEnabled;
    private final Configuration udfConfig;

    /* JADX INFO: Access modifiers changed from: package-private */
    public CombiningSpillingBehaviour(GroupCombineFunction<R, R> groupCombineFunction, TypeSerializer<R> typeSerializer, TypeComparator<R> typeComparator, boolean z, Configuration configuration) {
        this.combineFunction = groupCombineFunction;
        this.serializer = typeSerializer;
        this.objectReuseEnabled = z;
        this.udfConfig = configuration;
        this.comparator = typeComparator;
    }

    @Override // org.apache.flink.runtime.operators.sort.SpillingThread.SpillingBehaviour
    public void open() {
        try {
            FunctionUtils.openFunction(this.combineFunction, this.udfConfig);
        } catch (Throwable th) {
            throw new FlinkRuntimeException("The user-defined combiner failed in its 'open()' method.", th);
        }
    }

    @Override // org.apache.flink.runtime.operators.sort.SpillingThread.SpillingBehaviour
    public void close() {
        try {
            FunctionUtils.closeFunction(this.combineFunction);
        } catch (Throwable th) {
            throw new FlinkRuntimeException("The user-defined combiner failed in its 'close()' method.", th);
        }
    }

    @Override // org.apache.flink.runtime.operators.sort.SpillingThread.SpillingBehaviour
    public void spillBuffer(CircularElement<R> circularElement, ChannelWriterOutputView channelWriterOutputView, LargeRecordHandler<R> largeRecordHandler) throws IOException {
        LOG.debug("Combining buffer {}.", Integer.valueOf(circularElement.getId()));
        InMemorySorter<R> buffer = circularElement.getBuffer();
        CombineValueIterator combineValueIterator = new CombineValueIterator(buffer, this.serializer.mo2374createInstance(), this.objectReuseEnabled);
        WriterCollector writerCollector = new WriterCollector(channelWriterOutputView, this.serializer);
        int i = 0;
        int size = buffer.size() - 1;
        while (i < size) {
            int i2 = i;
            while (i < size) {
                try {
                    if (0 != buffer.compare(i, i + 1)) {
                        break;
                    } else {
                        i++;
                    }
                } catch (Exception e) {
                    throw new IOException("An error occurred in the combiner user code.", e);
                }
            }
            if (i == i2) {
                buffer.writeToOutput(channelWriterOutputView, i2, 1);
            } else {
                combineValueIterator.set(i2, i);
                this.combineFunction.combine(combineValueIterator, writerCollector);
            }
            i++;
        }
        if (i == size) {
            buffer.writeToOutput(channelWriterOutputView, size, 1);
        }
        LOG.debug("Combined and spilled buffer {}.", Integer.valueOf(circularElement.getId()));
    }

    @Override // org.apache.flink.runtime.operators.sort.SpillingThread.SpillingBehaviour
    public void mergeRecords(MergeIterator<R> mergeIterator, ChannelWriterOutputView channelWriterOutputView) throws IOException {
        WriterCollector writerCollector = new WriterCollector(channelWriterOutputView, this.serializer);
        try {
            if (this.objectReuseEnabled) {
                ReusingKeyGroupedIterator reusingKeyGroupedIterator = new ReusingKeyGroupedIterator(mergeIterator, this.serializer, this.comparator);
                while (reusingKeyGroupedIterator.nextKey()) {
                    this.combineFunction.combine(reusingKeyGroupedIterator.getValues(), writerCollector);
                }
            } else {
                NonReusingKeyGroupedIterator nonReusingKeyGroupedIterator = new NonReusingKeyGroupedIterator(mergeIterator, this.comparator);
                while (nonReusingKeyGroupedIterator.nextKey()) {
                    this.combineFunction.combine(nonReusingKeyGroupedIterator.getValues(), writerCollector);
                }
            }
        } catch (Exception e) {
            throw new IOException("An error occurred in the combiner user code.");
        }
    }
}
