/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.network;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.NioEchoServer;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;

/**
 * Static helpers shared by network-layer tests: starting an echo server, building a
 * {@link Selector}, and driving/validating a client connection through that selector.
 */
public class NetworkTestUtils {
    /**
     * Creates a {@link NioEchoServer} bound to host {@code "localhost"}, starts it and returns it.
     *
     * @param listenerName     listener name handed to the server
     * @param securityProtocol security protocol handed to the server
     * @param serverConfig     configuration handed to the server
     * @return the already-started server
     * @throws Exception if constructing or starting the server fails
     */
    public static NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol, AbstractConfig serverConfig) throws Exception {
        // NOTE(review): the last constructor argument is deliberately null; its meaning is
        // defined by NioEchoServer and is not visible from this file.
        NioEchoServer server = new NioEchoServer(listenerName, securityProtocol, serverConfig, "localhost", null);
        server.start();
        return server;
    }

    /**
     * Creates a {@link Selector} backed by fresh {@link Metrics} and a {@link MockTime} clock,
     * using the metric group prefix {@code "MetricGroup"}.
     *
     * @param channelBuilder channel builder the selector uses for its connections
     * @return a new selector
     */
    public static Selector createSelector(ChannelBuilder channelBuilder) {
        // NOTE(review): 5000L is presumably the selector's idle timeout in ms - confirm against Selector.
        return new Selector(5000L, new Metrics(), new MockTime(), "MetricGroup", channelBuilder);
    }

    /**
     * Waits for the channel to {@code node} to become ready, then sends messages of the form
     * {@code <prefix>-<n>} (n = 0, 1, ...) and asserts that the n-th received payload equals
     * the n-th message sent, until {@code messageCount} responses have been checked.
     *
     * <p>The first message is always sent up front; after each poll, one further message is
     * sent per completed send while fewer than {@code messageCount} requests have been issued
     * and the channel is still ready. Any disconnect reported by the selector fails the check.
     *
     * @param selector       selector that owns the connection
     * @param node           id of the connection to exercise
     * @param minMessageSize length of the random prefix; every message is slightly longer
     * @param messageCount   number of responses to wait for and validate
     * @throws Exception if polling or sending fails
     */
    public static void checkClientConnection(Selector selector, String node, int minMessageSize, int messageCount) throws Exception {
        waitForChannelReady(selector, node);
        String prefix = TestUtils.randomString(minMessageSize);
        int requests = 0;
        int responses = 0;
        // Encode/decode with an explicit charset so the bytes on the wire do not depend on
        // the JVM's platform default.
        selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + "-0").getBytes(StandardCharsets.UTF_8))));
        requests++;
        // NOTE(review): this loop has no deadline; if a response never arrives it spins on
        // poll(0) until the surrounding test times out.
        while (responses < messageCount) {
            selector.poll(0L);
            Assert.assertEquals("No disconnects should have occurred.", 0L, selector.disconnected().size());

            for (NetworkReceive receive : selector.completedReceives()) {
                String received = new String(Utils.toArray(receive.payload()), StandardCharsets.UTF_8);
                Assert.assertEquals(prefix + "-" + responses, received);
                responses++;
            }

            // Issue one new request per send completed in this poll, bounded by messageCount.
            for (int i = 0; i < selector.completedSends().size() && requests < messageCount && selector.isChannelReady(node); i++, requests++) {
                selector.send(new NetworkSend(node, ByteBuffer.wrap((prefix + "-" + requests).getBytes(StandardCharsets.UTF_8))));
            }
        }
    }

    /**
     * Polls the selector (up to 30 times, 1000 ms poll timeout each) until the channel to
     * {@code node} reports ready, then asserts that it is ready.
     *
     * @param selector selector that owns the connection
     * @param node     id of the connection to wait for
     * @throws IOException if polling fails
     */
    public static void waitForChannelReady(Selector selector, String node) throws IOException {
        int secondsLeft = 30;
        while (!selector.isChannelReady(node) && secondsLeft-- > 0) {
            selector.poll(1000L);
        }
        Assert.assertTrue("Channel for node " + node + " did not become ready", selector.isChannelReady(node));
    }

    /**
     * Polls the selector (up to 30 times, 1000 ms poll timeout each) until it no longer has a
     * channel for {@code node}, then asserts that the channel is gone and that the state the
     * selector recorded for the disconnected node equals {@code channelState}.
     *
     * @param selector     selector that owns the connection
     * @param node         id of the connection expected to close
     * @param channelState expected entry in {@code selector.disconnected()} for {@code node}
     * @throws IOException if polling fails
     */
    public static void waitForChannelClose(Selector selector, String node, ChannelState channelState) throws IOException {
        boolean closed = false;
        // Always poll at least once before checking, and stop as soon as the channel is gone.
        for (int i = 0; i < 30 && !closed; i++) {
            selector.poll(1000L);
            closed = selector.channel(node) == null;
        }
        Assert.assertTrue("Channel was not closed by timeout", closed);
        Assert.assertEquals(channelState, selector.disconnected().get(node));
    }
}

