/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.network;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.net.ssl.SSLEngine;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.memory.SimpleMemoryPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.EchoServer;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.network.SelectorTest;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.network.SslChannelBuilder;
import org.apache.kafka.common.network.SslSender;
import org.apache.kafka.common.network.SslTransportLayer;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.ssl.SslFactory;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestSslUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SslSelectorTest
extends SelectorTest {
    private Map<String, Object> sslClientConfigs;

    @Override
    @Before
    public void setUp() throws Exception {
        File trustStoreFile = File.createTempFile("truststore", ".jks");
        Map<String, Object> sslServerConfigs = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server");
        this.server = new EchoServer(SecurityProtocol.SSL, sslServerConfigs);
        this.server.start();
        this.time = new MockTime();
        this.sslClientConfigs = TestSslUtils.createSslConfig(false, false, Mode.CLIENT, trustStoreFile, "client");
        this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
        this.channelBuilder.configure(this.sslClientConfigs);
        this.metrics = new Metrics();
        this.selector = new Selector(5000L, this.metrics, this.time, "MetricGroup", this.channelBuilder, new LogContext());
    }

    @Override
    @After
    public void tearDown() throws Exception {
        this.selector.close();
        this.server.close();
        this.metrics.close();
    }

    @Override
    public SecurityProtocol securityProtocol() {
        return SecurityProtocol.PLAINTEXT;
    }

    @Test
    public void testDisconnectWithIntermediateBufferedBytes() throws Exception {
        int requestSize = 102400;
        String node = "0";
        String request = TestUtils.randomString(requestSize);
        this.selector.close();
        this.channelBuilder = new TestSslChannelBuilder(Mode.CLIENT);
        this.channelBuilder.configure(this.sslClientConfigs);
        this.selector = new Selector(5000L, this.metrics, this.time, "MetricGroup", this.channelBuilder, new LogContext());
        this.connect("0", new InetSocketAddress("localhost", this.server.port));
        this.selector.send((Send)this.createSend("0", request));
        TestUtils.waitForCondition(new TestCondition(){

            @Override
            public boolean conditionMet() {
                try {
                    SslSelectorTest.this.selector.poll(0L);
                    return SslSelectorTest.this.selector.channel("0").hasBytesBuffered();
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }, 2000L, "Failed to reach socket state with bytes buffered");
        this.selector.close("0");
        this.verifySelectorEmpty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRenegotiation() throws Exception {
        SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT){

            protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, String host) throws IOException {
                SocketChannel socketChannel = (SocketChannel)key.channel();
                SslTransportLayer transportLayer = new SslTransportLayer(id, key, sslFactory.createSslEngine(host, socketChannel.socket().getPort()), true);
                transportLayer.startHandshake();
                return transportLayer;
            }
        };
        channelBuilder.configure(this.sslClientConfigs);
        try (Selector selector = new Selector(5000L, this.metrics, this.time, "MetricGroup2", (ChannelBuilder)channelBuilder, new LogContext());){
            int reqs = 500;
            String node = "0";
            InetSocketAddress addr = new InetSocketAddress("localhost", this.server.port);
            selector.connect(node, addr, 4096, 4096);
            int requests = 0;
            int responses = 0;
            int renegotiates = 0;
            while (!selector.isChannelReady(node)) {
                selector.poll(1000L);
            }
            selector.send((Send)this.createSend(node, node + "-" + 0));
            ++requests;
            while (responses < reqs) {
                selector.poll(0L);
                if (responses >= 100 && renegotiates == 0) {
                    ++renegotiates;
                    this.server.renegotiate();
                }
                Assert.assertEquals((String)"No disconnects should have occurred.", (long)0L, (long)selector.disconnected().size());
                for (NetworkReceive receive : selector.completedReceives()) {
                    String[] pieces = this.asString(receive).split("-");
                    Assert.assertEquals((String)"Should be in the form 'conn-counter'", (long)2L, (long)pieces.length);
                    Assert.assertEquals((String)"Check the source", (Object)receive.source(), (Object)pieces[0]);
                    Assert.assertEquals((String)"Check that the receive has kindly been rewound", (long)0L, (long)receive.payload().position());
                    Assert.assertEquals((String)"Check the request counter", (long)responses, (long)Integer.parseInt(pieces[1]));
                    ++responses;
                }
                for (int i = 0; i < selector.completedSends().size() && requests < reqs && selector.isChannelReady(node); ++i, ++requests) {
                    selector.send((Send)this.createSend(node, node + "-" + requests));
                }
            }
        }
    }

    @Test
    public void testDisabledRenegotiation() throws Exception {
        String node = "0";
        InetSocketAddress addr = new InetSocketAddress("localhost", this.server.port);
        this.selector.connect(node, addr, 4096, 4096);
        while (!this.selector.isChannelReady(node)) {
            this.selector.poll(1000L);
        }
        this.selector.send((Send)this.createSend(node, node + "-" + 0));
        this.selector.poll(0L);
        this.server.renegotiate();
        this.selector.send((Send)this.createSend(node, node + "-" + 1));
        long expiryTime = System.currentTimeMillis() + 2000L;
        ArrayList disconnected = new ArrayList();
        while (!disconnected.contains(node) && System.currentTimeMillis() < expiryTime) {
            this.selector.poll(10L);
            disconnected.addAll(this.selector.disconnected().keySet());
        }
        Assert.assertTrue((String)"Renegotiation should cause disconnection", (boolean)disconnected.contains(node));
    }

    @Override
    public void testMuteOnOOM() throws Exception {
        this.selector.close();
        SimpleMemoryPool pool = new SimpleMemoryPool(900L, 900, false, null);
        File trustStoreFile = File.createTempFile("truststore", ".jks");
        Map<String, Object> sslServerConfigs = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server");
        this.channelBuilder = new SslChannelBuilder(Mode.SERVER);
        this.channelBuilder.configure(sslServerConfigs);
        this.selector = new Selector(-1, 5000L, this.metrics, this.time, "MetricGroup", new HashMap(), true, false, this.channelBuilder, (MemoryPool)pool, new LogContext());
        try (ServerSocketChannel ss = ServerSocketChannel.open();){
            List completed;
            ss.bind(new InetSocketAddress(0));
            InetSocketAddress serverAddress = (InetSocketAddress)ss.getLocalAddress();
            SslSender sender1 = this.createSender(serverAddress, this.randomPayload(900));
            SslSender sender2 = this.createSender(serverAddress, this.randomPayload(900));
            sender1.start();
            sender2.start();
            SocketChannel channelX = ss.accept();
            channelX.configureBlocking(false);
            SocketChannel channelY = ss.accept();
            channelY.configureBlocking(false);
            this.selector.register("clientX", channelX);
            this.selector.register("clientY", channelY);
            boolean handshaked = false;
            NetworkReceive firstReceive = null;
            long deadline = System.currentTimeMillis() + 5000L;
            while (System.currentTimeMillis() < deadline) {
                this.selector.poll(10L);
                completed = this.selector.completedReceives();
                if (firstReceive == null) {
                    if (!completed.isEmpty()) {
                        Assert.assertEquals((String)"expecting a single request", (long)1L, (long)completed.size());
                        firstReceive = (NetworkReceive)completed.get(0);
                        Assert.assertTrue((boolean)this.selector.isMadeReadProgressLastPoll());
                        Assert.assertEquals((long)0L, (long)pool.availableMemory());
                    }
                } else {
                    Assert.assertTrue((String)"only expecting single request", (boolean)completed.isEmpty());
                }
                if (!(handshaked = sender1.waitForHandshake(1L) && sender2.waitForHandshake(1L)) || firstReceive == null || !this.selector.isOutOfMemory()) continue;
                break;
            }
            Assert.assertTrue((String)"could not initiate connections within timeout", (boolean)handshaked);
            this.selector.poll(10L);
            Assert.assertTrue((boolean)this.selector.completedReceives().isEmpty());
            Assert.assertEquals((long)0L, (long)pool.availableMemory());
            Assert.assertNotNull((String)"First receive not complete", firstReceive);
            Assert.assertTrue((String)"Selector not out of memory", (boolean)this.selector.isOutOfMemory());
            firstReceive.close();
            Assert.assertEquals((long)900L, (long)pool.availableMemory());
            completed = Collections.emptyList();
            deadline = System.currentTimeMillis() + 5000L;
            while (System.currentTimeMillis() < deadline && completed.isEmpty()) {
                this.selector.poll(1000L);
                completed = this.selector.completedReceives();
            }
            Assert.assertEquals((String)"could not read remaining request within timeout", (long)1L, (long)completed.size());
            Assert.assertEquals((long)0L, (long)pool.availableMemory());
            Assert.assertFalse((boolean)this.selector.isOutOfMemory());
        }
    }

    @Override
    protected void connect(String node, InetSocketAddress serverAddr) throws IOException {
        this.blockingConnect(node, serverAddr);
    }

    private SslSender createSender(InetSocketAddress serverAddress, byte[] payload) {
        return new SslSender(serverAddress, payload);
    }

    private static class TestSslChannelBuilder
    extends SslChannelBuilder {
        public TestSslChannelBuilder(Mode mode) {
            super(mode);
        }

        protected SslTransportLayer buildTransportLayer(SslFactory sslFactory, String id, SelectionKey key, String host) throws IOException {
            SocketChannel socketChannel = (SocketChannel)key.channel();
            SSLEngine sslEngine = sslFactory.createSslEngine(host, socketChannel.socket().getPort());
            TestSslTransportLayer transportLayer = new TestSslTransportLayer(id, key, sslEngine);
            transportLayer.startHandshake();
            return transportLayer;
        }

        class TestSslTransportLayer
        extends SslTransportLayer {
            boolean muteSocket;

            public TestSslTransportLayer(String channelId, SelectionKey key, SSLEngine sslEngine) throws IOException {
                super(channelId, key, sslEngine, false);
                this.muteSocket = false;
            }

            protected int readFromSocketChannel() throws IOException {
                if (this.muteSocket) {
                    this.muteSocket = false;
                    return 0;
                }
                this.muteSocket = true;
                return super.readFromSocketChannel();
            }
        }
    }
}

