/*
 * Decompiled with CFR 0.152.
 */
package org.apache.arrow.vector.file;

import java.io.IOException;
import java.nio.channels.Channel;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.NullableTinyIntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.file.ReadChannel;
import org.apache.arrow.vector.schema.ArrowMessage;
import org.apache.arrow.vector.stream.ArrowStreamReader;
import org.apache.arrow.vector.stream.ArrowStreamWriter;
import org.apache.arrow.vector.stream.MessageSerializerTest;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.Assert;
import org.junit.Test;

public class TestArrowStreamPipe {
    Schema schema = MessageSerializerTest.testSchema();
    BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);

    @Test
    public void pipeTest() throws IOException, InterruptedException {
        int NUM_BATCHES = 10;
        Pipe pipe = Pipe.open();
        WriterThread writer = new WriterThread(NUM_BATCHES, pipe.sink());
        ReaderThread reader = new ReaderThread(pipe.source());
        writer.start();
        reader.start();
        reader.join();
        writer.join();
        Assert.assertEquals((long)NUM_BATCHES, (long)reader.getBatchesRead());
        Assert.assertEquals((long)writer.bytesWritten(), (long)reader.bytesRead());
    }

    private final class ReaderThread
    extends Thread {
        private int batchesRead = 0;
        private final ArrowStreamReader reader;
        private final BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
        private boolean done = false;

        public ReaderThread(ReadableByteChannel sourceChannel) throws IOException {
            this.reader = new ArrowStreamReader(sourceChannel, this.alloc){

                protected ArrowMessage readMessage(ReadChannel in, BufferAllocator allocator) throws IOException {
                    ArrowMessage message = super.readMessage(in, allocator);
                    if (message == null) {
                        ReaderThread.this.done = true;
                    } else {
                        ReaderThread.this.batchesRead++;
                    }
                    return message;
                }

                public boolean loadNextBatch() throws IOException {
                    if (!super.loadNextBatch()) {
                        return false;
                    }
                    if (!ReaderThread.this.done) {
                        VectorSchemaRoot root = this.getVectorSchemaRoot();
                        Assert.assertEquals((long)16L, (long)root.getRowCount());
                        NullableTinyIntVector vector = (NullableTinyIntVector)root.getFieldVectors().get(0);
                        Assert.assertEquals((long)((byte)(ReaderThread.this.batchesRead - 1)), (long)vector.getAccessor().get(0));
                        for (int i = 1; i < 16; ++i) {
                            if (i < 8) {
                                Assert.assertEquals((long)((byte)(i + 1)), (long)vector.getAccessor().get(i));
                                continue;
                            }
                            Assert.assertTrue((boolean)vector.getAccessor().isNull(i));
                        }
                    }
                    return true;
                }
            };
        }

        @Override
        public void run() {
            try {
                Assert.assertEquals((Object)TestArrowStreamPipe.this.schema, (Object)this.reader.getVectorSchemaRoot().getSchema());
                Assert.assertTrue((String)((Field)this.reader.getVectorSchemaRoot().getSchema().getFields().get(0)).getTypeLayout().getVectorTypes().toString(), (((Field)this.reader.getVectorSchemaRoot().getSchema().getFields().get(0)).getTypeLayout().getVectors().size() > 0 ? 1 : 0) != 0);
                while (!this.done) {
                    Assert.assertTrue((boolean)this.reader.loadNextBatch());
                }
            }
            catch (IOException e) {
                e.printStackTrace();
                Assert.fail((String)e.toString());
            }
        }

        public int getBatchesRead() {
            return this.batchesRead;
        }

        public long bytesRead() {
            return this.reader.bytesRead();
        }
    }

    private final class WriterThread
    extends Thread {
        private final int numBatches;
        private final ArrowStreamWriter writer;
        private final VectorSchemaRoot root;

        public WriterThread(int numBatches, WritableByteChannel sinkChannel) throws IOException {
            this.numBatches = numBatches;
            BufferAllocator allocator = TestArrowStreamPipe.this.alloc.newChildAllocator("writer thread", 0L, Integer.MAX_VALUE);
            this.root = VectorSchemaRoot.create((Schema)TestArrowStreamPipe.this.schema, (BufferAllocator)allocator);
            this.writer = new ArrowStreamWriter(this.root, null, sinkChannel);
        }

        @Override
        public void run() {
            try {
                this.writer.start();
                for (int j = 0; j < this.numBatches; ++j) {
                    ((FieldVector)this.root.getFieldVectors().get(0)).allocateNew();
                    NullableTinyIntVector.Mutator mutator = (NullableTinyIntVector.Mutator)((FieldVector)this.root.getFieldVectors().get(0)).getMutator();
                    mutator.set(0, j);
                    for (int i = 1; i < 16; ++i) {
                        mutator.set(i, i < 8 ? 1 : 0, (byte)(i + 1));
                    }
                    mutator.setValueCount(16);
                    this.root.setRowCount(16);
                    this.writer.writeBatch();
                }
                this.writer.close();
                this.root.close();
            }
            catch (IOException e) {
                e.printStackTrace();
                Assert.fail((String)e.toString());
            }
        }

        public long bytesWritten() {
            return this.writer.bytesWritten();
        }
    }
}

