/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.security.authenticator;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import javax.security.auth.Subject;
import javax.security.auth.login.Configuration;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.network.CertStores;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.Mode;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.NetworkTestUtils;
import org.apache.kafka.common.network.NioEchoServer;
import org.apache.kafka.common.network.SaslChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.requests.SaslAuthenticateRequest;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator;
import org.apache.kafka.common.security.authenticator.TestJaasConfig;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.ScramCredentialUtils;
import org.apache.kafka.common.security.scram.ScramFormatter;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.ScramMechanism;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SaslAuthenticatorTest {
    private static final int BUFFER_SIZE = 4096;
    private NioEchoServer server;
    private Selector selector;
    private ChannelBuilder channelBuilder;
    private CertStores serverCertStores;
    private CertStores clientCertStores;
    private Map<String, Object> saslClientConfigs;
    private Map<String, Object> saslServerConfigs;
    private CredentialCache credentialCache;
    private int nextCorrelationId;

    @Before
    public void setup() throws Exception {
        this.serverCertStores = new CertStores(true, "localhost");
        this.clientCertStores = new CertStores(false, "localhost");
        this.saslServerConfigs = this.serverCertStores.getTrustingConfig(this.clientCertStores);
        this.saslClientConfigs = this.clientCertStores.getTrustingConfig(this.serverCertStores);
        this.credentialCache = new CredentialCache();
    }

    @After
    public void teardown() throws Exception {
        if (this.server != null) {
            this.server.close();
        }
        if (this.selector != null) {
            this.selector.close();
        }
    }

    @Test
    public void testValidSaslPlainOverSsl() throws Exception {
        String node = "0";
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = this.createEchoServer(securityProtocol);
        this.createAndCheckClientConnection(securityProtocol, node);
        this.server.verifyAuthenticationMetrics(1, 0);
    }

    @Test
    public void testValidSaslPlainOverPlaintext() throws Exception {
        String node = "0";
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = this.createEchoServer(securityProtocol);
        this.createAndCheckClientConnection(securityProtocol, node);
        this.server.verifyAuthenticationMetrics(1, 0);
    }

    @Test
    public void testInvalidPasswordSaslPlain() throws Exception {
        String node = "0";
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        TestJaasConfig jaasConfig = this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        jaasConfig.setClientOptions("PLAIN", "myuser", "invalidpassword");
        this.server = this.createEchoServer(securityProtocol);
        this.createAndCheckClientAuthenticationFailure(securityProtocol, node, "PLAIN", "Authentication failed: Invalid username or password");
        this.server.verifyAuthenticationMetrics(0, 1);
    }

    @Test
    public void testInvalidUsernameSaslPlain() throws Exception {
        String node = "0";
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        TestJaasConfig jaasConfig = this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        jaasConfig.setClientOptions("PLAIN", "invaliduser", "mypassword");
        this.server = this.createEchoServer(securityProtocol);
        this.createAndCheckClientAuthenticationFailure(securityProtocol, node, "PLAIN", "Authentication failed: Invalid username or password");
        this.server.verifyAuthenticationMetrics(0, 1);
    }

    @Test
    public void testMissingUsernameSaslPlain() throws Exception {
        String node = "0";
        TestJaasConfig jaasConfig = this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        jaasConfig.setClientOptions("PLAIN", null, "mypassword");
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.server = this.createEchoServer(securityProtocol);
        this.createSelector(securityProtocol, this.saslClientConfigs);
        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", this.server.port());
        try {
            this.selector.connect(node, addr, 4096, 4096);
            Assert.fail((String)"SASL/PLAIN channel created without username");
        }
        catch (IOException e) {
            Assert.assertTrue((String)"Channels not closed", (boolean)this.selector.channels().isEmpty());
            for (SelectionKey key : this.selector.keys()) {
                Assert.assertFalse((String)"Key not cancelled", (boolean)key.isValid());
            }
        }
    }

    @Test
    public void testMissingPasswordSaslPlain() throws Exception {
        String node = "0";
        TestJaasConfig jaasConfig = this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        jaasConfig.setClientOptions("PLAIN", "myuser", null);
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.server = this.createEchoServer(securityProtocol);
        this.createSelector(securityProtocol, this.saslClientConfigs);
        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", this.server.port());
        try {
            this.selector.connect(node, addr, 4096, 4096);
            Assert.fail((String)"SASL/PLAIN channel created without password");
        }
        catch (IOException iOException) {
            // empty catch block
        }
    }

    @Test
    public void testMechanismPluggability() throws Exception {
        String node = "0";
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.configureMechanisms("DIGEST-MD5", Arrays.asList("DIGEST-MD5"));
        this.server = this.createEchoServer(securityProtocol);
        this.createAndCheckClientConnection(securityProtocol, node);
    }

    @Test
    public void testMultipleServerMechanisms() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.configureMechanisms("DIGEST-MD5", Arrays.asList("DIGEST-MD5", "PLAIN", "SCRAM-SHA-256"));
        this.server = this.createEchoServer(securityProtocol);
        this.updateScramCredentialCache("myuser", "mypassword");
        String node1 = "1";
        this.saslClientConfigs.put("sasl.mechanism", "PLAIN");
        this.createAndCheckClientConnection(securityProtocol, node1);
        String node2 = "2";
        this.saslClientConfigs.put("sasl.mechanism", "DIGEST-MD5");
        this.createSelector(securityProtocol, this.saslClientConfigs);
        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", this.server.port());
        this.selector.connect(node2, addr, 4096, 4096);
        NetworkTestUtils.checkClientConnection(this.selector, node2, 100, 10);
        String node3 = "3";
        this.saslClientConfigs.put("sasl.mechanism", "SCRAM-SHA-256");
        this.createSelector(securityProtocol, this.saslClientConfigs);
        this.selector.connect(node3, new InetSocketAddress("127.0.0.1", this.server.port()), 4096, 4096);
        NetworkTestUtils.checkClientConnection(this.selector, node3, 100, 10);
    }

    @Test
    public void testValidSaslScramSha256() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
        this.server = this.createEchoServer(securityProtocol);
        this.updateScramCredentialCache("myuser", "mypassword");
        this.createAndCheckClientConnection(securityProtocol, "0");
        this.server.verifyAuthenticationMetrics(1, 0);
    }

    @Test
    public void testValidSaslScramMechanisms() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.configureMechanisms("SCRAM-SHA-256", new ArrayList<String>(ScramMechanism.mechanismNames()));
        this.server = this.createEchoServer(securityProtocol);
        this.updateScramCredentialCache("myuser", "mypassword");
        for (String mechanism : ScramMechanism.mechanismNames()) {
            this.saslClientConfigs.put("sasl.mechanism", mechanism);
            this.createAndCheckClientConnection(securityProtocol, "node-" + mechanism);
        }
    }

    @Test
    public void testInvalidPasswordSaslScram() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        TestJaasConfig jaasConfig = this.configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
        HashMap<String, Object> options = new HashMap<String, Object>();
        options.put("username", "myuser");
        options.put("password", "invalidpassword");
        jaasConfig.createOrUpdateEntry("KafkaClient", ScramLoginModule.class.getName(), options);
        String node = "0";
        this.server = this.createEchoServer(securityProtocol);
        this.updateScramCredentialCache("myuser", "mypassword");
        this.createAndCheckClientAuthenticationFailure(securityProtocol, node, "SCRAM-SHA-256", null);
        this.server.verifyAuthenticationMetrics(0, 1);
    }

    @Test
    public void testUnknownUserSaslScram() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        TestJaasConfig jaasConfig = this.configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
        HashMap<String, Object> options = new HashMap<String, Object>();
        options.put("username", "unknownUser");
        options.put("password", "mypassword");
        jaasConfig.createOrUpdateEntry("KafkaClient", ScramLoginModule.class.getName(), options);
        String node = "0";
        this.server = this.createEchoServer(securityProtocol);
        this.updateScramCredentialCache("myuser", "mypassword");
        this.createAndCheckClientAuthenticationFailure(securityProtocol, node, "SCRAM-SHA-256", null);
        this.server.verifyAuthenticationMetrics(0, 1);
    }

    @Test
    public void testUserCredentialsUnavailableForScramMechanism() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.configureMechanisms("SCRAM-SHA-256", new ArrayList<String>(ScramMechanism.mechanismNames()));
        this.server = this.createEchoServer(securityProtocol);
        this.updateScramCredentialCache("myuser", "mypassword");
        this.server.credentialCache().cache(ScramMechanism.SCRAM_SHA_256.mechanismName(), ScramCredential.class).remove("myuser");
        String node = "1";
        this.saslClientConfigs.put("sasl.mechanism", "SCRAM-SHA-256");
        this.createAndCheckClientAuthenticationFailure(securityProtocol, node, "SCRAM-SHA-256", null);
        this.server.verifyAuthenticationMetrics(0, 1);
        this.saslClientConfigs.put("sasl.mechanism", "SCRAM-SHA-512");
        this.createAndCheckClientConnection(securityProtocol, "2");
        this.server.verifyAuthenticationMetrics(1, 1);
    }

    @Test
    public void testScramUsernameWithSpecialCharacters() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        String username = "special user= test,scram";
        String password = username + "-password";
        TestJaasConfig jaasConfig = this.configureMechanisms("SCRAM-SHA-256", Arrays.asList("SCRAM-SHA-256"));
        HashMap<String, Object> options = new HashMap<String, Object>();
        options.put("username", username);
        options.put("password", password);
        jaasConfig.createOrUpdateEntry("KafkaClient", ScramLoginModule.class.getName(), options);
        this.server = this.createEchoServer(securityProtocol);
        this.updateScramCredentialCache(username, password);
        this.createAndCheckClientConnection(securityProtocol, "0");
    }

    @Test
    public void testUnauthenticatedApiVersionsRequestOverPlaintextHandshakeVersion0() throws Exception {
        this.testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_PLAINTEXT, (short)0);
    }

    @Test
    public void testUnauthenticatedApiVersionsRequestOverPlaintextHandshakeVersion1() throws Exception {
        this.testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_PLAINTEXT, (short)1);
    }

    @Test
    public void testUnauthenticatedApiVersionsRequestOverSslHandshakeVersion0() throws Exception {
        this.testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_SSL, (short)0);
    }

    @Test
    public void testUnauthenticatedApiVersionsRequestOverSslHandshakeVersion1() throws Exception {
        this.testUnauthenticatedApiVersionsRequest(SecurityProtocol.SASL_PLAINTEXT, (short)1);
    }

    @Test
    public void testApiVersionsRequestWithUnsupportedVersion() throws Exception {
        short handshakeVersion = ApiKeys.SASL_HANDSHAKE.latestVersion();
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = this.createEchoServer(securityProtocol);
        String node = "1";
        this.createClientConnection(SecurityProtocol.PLAINTEXT, node);
        RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS, Short.MAX_VALUE, "someclient", 1);
        ApiVersionsRequest request = (ApiVersionsRequest)new ApiVersionsRequest.Builder().build();
        this.selector.send(request.toSend(node, header));
        ByteBuffer responseBuffer = this.waitForResponse();
        ResponseHeader.parse((ByteBuffer)responseBuffer);
        ApiVersionsResponse response = ApiVersionsResponse.parse((ByteBuffer)responseBuffer, (short)0);
        Assert.assertEquals((Object)Errors.UNSUPPORTED_VERSION, (Object)response.error());
        this.sendVersionRequestReceiveResponse(node);
        this.sendHandshakeRequestReceiveResponse(node, handshakeVersion);
        this.authenticateUsingSaslPlainAndCheckConnection(node, handshakeVersion > 0);
    }

    @Test
    public void testSaslHandshakeRequestWithUnsupportedVersion() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = this.createEchoServer(securityProtocol);
        String node1 = "invalid1";
        this.createClientConnection(SecurityProtocol.PLAINTEXT, node1);
        SaslHandshakeRequest request = new SaslHandshakeRequest("PLAIN");
        RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE, Short.MAX_VALUE, "someclient", 2);
        this.selector.send(request.toSend(node1, header));
        NetworkTestUtils.waitForChannelClose(this.selector, node1, ChannelState.READY.state());
        this.selector.close();
        this.createAndCheckClientConnection(securityProtocol, "good1");
    }

    @Test
    public void testInvalidSaslPacket() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = this.createEchoServer(securityProtocol);
        String node1 = "invalid1";
        this.createClientConnection(SecurityProtocol.PLAINTEXT, node1);
        this.sendHandshakeRequestReceiveResponse(node1, (short)1);
        Random random = new Random();
        byte[] bytes = new byte[1024];
        random.nextBytes(bytes);
        this.selector.send((Send)new NetworkSend(node1, ByteBuffer.wrap(bytes)));
        NetworkTestUtils.waitForChannelClose(this.selector, node1, ChannelState.READY.state());
        this.selector.close();
        this.createAndCheckClientConnection(securityProtocol, "good1");
        String node2 = "invalid2";
        this.createClientConnection(SecurityProtocol.PLAINTEXT, node2);
        random.nextBytes(bytes);
        this.selector.send((Send)new NetworkSend(node2, ByteBuffer.wrap(bytes)));
        NetworkTestUtils.waitForChannelClose(this.selector, node2, ChannelState.READY.state());
        this.selector.close();
        this.createAndCheckClientConnection(securityProtocol, "good2");
    }

    @Test
    public void testInvalidApiVersionsRequestSequence() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = this.createEchoServer(securityProtocol);
        String node1 = "invalid1";
        this.createClientConnection(SecurityProtocol.PLAINTEXT, node1);
        this.sendHandshakeRequestReceiveResponse(node1, (short)1);
        ApiVersionsRequest request = this.createApiVersionsRequestV0();
        RequestHeader versionsHeader = new RequestHeader(ApiKeys.API_VERSIONS, request.version(), "someclient", 2);
        this.selector.send(request.toSend(node1, versionsHeader));
        NetworkTestUtils.waitForChannelClose(this.selector, node1, ChannelState.READY.state());
        this.selector.close();
        this.createAndCheckClientConnection(securityProtocol, "good1");
    }

    @Test
    public void testPacketSizeTooBig() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = this.createEchoServer(securityProtocol);
        String node1 = "invalid1";
        this.createClientConnection(SecurityProtocol.PLAINTEXT, node1);
        this.sendHandshakeRequestReceiveResponse(node1, (short)1);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.putInt(Integer.MAX_VALUE);
        buffer.put(new byte[buffer.capacity() - 4]);
        buffer.rewind();
        this.selector.send((Send)new NetworkSend(node1, buffer));
        NetworkTestUtils.waitForChannelClose(this.selector, node1, ChannelState.READY.state());
        this.selector.close();
        this.createAndCheckClientConnection(securityProtocol, "good1");
        String node2 = "invalid2";
        this.createClientConnection(SecurityProtocol.PLAINTEXT, node2);
        buffer.clear();
        buffer.putInt(Integer.MAX_VALUE);
        buffer.put(new byte[buffer.capacity() - 4]);
        buffer.rewind();
        this.selector.send((Send)new NetworkSend(node2, buffer));
        NetworkTestUtils.waitForChannelClose(this.selector, node2, ChannelState.READY.state());
        this.selector.close();
        this.createAndCheckClientConnection(securityProtocol, "good2");
    }

    @Test
    public void testDisallowedKafkaRequestsBeforeAuthentication() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = this.createEchoServer(securityProtocol);
        String node1 = "invalid1";
        this.createClientConnection(SecurityProtocol.PLAINTEXT, node1);
        MetadataRequest metadataRequest1 = (MetadataRequest)new MetadataRequest.Builder(Collections.singletonList("sometopic"), true).build();
        RequestHeader metadataRequestHeader1 = new RequestHeader(ApiKeys.METADATA, metadataRequest1.version(), "someclient", 1);
        this.selector.send(metadataRequest1.toSend(node1, metadataRequestHeader1));
        NetworkTestUtils.waitForChannelClose(this.selector, node1, ChannelState.READY.state());
        this.selector.close();
        this.createAndCheckClientConnection(securityProtocol, "good1");
        String node2 = "invalid2";
        this.createClientConnection(SecurityProtocol.PLAINTEXT, node2);
        this.sendHandshakeRequestReceiveResponse(node2, (short)1);
        MetadataRequest metadataRequest2 = (MetadataRequest)new MetadataRequest.Builder(Collections.singletonList("sometopic"), true).build();
        RequestHeader metadataRequestHeader2 = new RequestHeader(ApiKeys.METADATA, metadataRequest2.version(), "someclient", 2);
        this.selector.send(metadataRequest2.toSend(node2, metadataRequestHeader2));
        NetworkTestUtils.waitForChannelClose(this.selector, node2, ChannelState.READY.state());
        this.selector.close();
        this.createAndCheckClientConnection(securityProtocol, "good2");
    }

    @Test
    public void testInvalidLoginModule() throws Exception {
        TestJaasConfig jaasConfig = this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        jaasConfig.createOrUpdateEntry("KafkaClient", "InvalidLoginModule", TestJaasConfig.defaultClientOptions());
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.server = this.createEchoServer(securityProtocol);
        try {
            this.createSelector(securityProtocol, this.saslClientConfigs);
            Assert.fail((String)"SASL/PLAIN channel created without valid login module");
        }
        catch (KafkaException kafkaException) {
            // empty catch block
        }
    }

    @Test
    public void testDisabledMechanism() throws Exception {
        String node = "0";
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.configureMechanisms("PLAIN", Arrays.asList("DIGEST-MD5"));
        this.server = this.createEchoServer(securityProtocol);
        this.createAndCheckClientConnectionFailure(securityProtocol, node);
        this.server.verifyAuthenticationMetrics(0, 1);
    }

    @Test
    public void testInvalidMechanism() throws Exception {
        String node = "0";
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.saslClientConfigs.put("sasl.mechanism", "INVALID");
        this.server = this.createEchoServer(securityProtocol);
        this.createAndCheckClientConnectionFailure(securityProtocol, node);
        this.server.verifyAuthenticationMetrics(0, 1);
    }

    @Test
    public void testDynamicJaasConfiguration() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
        this.saslClientConfigs.put("sasl.mechanism", "PLAIN");
        this.saslServerConfigs.put("sasl.enabled.mechanisms", Arrays.asList("PLAIN"));
        HashMap<String, Object> serverOptions = new HashMap<String, Object>();
        serverOptions.put("user_user1", "user1-secret");
        serverOptions.put("user_user2", "user2-secret");
        TestJaasConfig staticJaasConfig = new TestJaasConfig();
        staticJaasConfig.createOrUpdateEntry("KafkaServer", PlainLoginModule.class.getName(), serverOptions);
        staticJaasConfig.setClientOptions("PLAIN", "user1", "invalidpassword");
        Configuration.setConfiguration(staticJaasConfig);
        this.server = this.createEchoServer(securityProtocol);
        this.createAndCheckClientConnectionFailure(securityProtocol, "1");
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "user1", "user1-secret"));
        this.createAndCheckClientConnection(securityProtocol, "2");
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "user1", "user2-secret"));
        this.createAndCheckClientConnectionFailure(securityProtocol, "3");
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "user2", "user2-secret"));
        this.createAndCheckClientConnection(securityProtocol, "4");
        String module1 = TestJaasConfig.jaasConfigProperty("PLAIN", "user1", "user1-secret").value();
        String module2 = TestJaasConfig.jaasConfigProperty("PLAIN", "user2", "user2-secret").value();
        this.saslClientConfigs.put("sasl.jaas.config", new Password(module1 + " " + module2));
        try {
            this.createClientConnection(securityProtocol, "1");
            Assert.fail((String)"Connection created with multiple login modules in sasl.jaas.config");
        }
        catch (IllegalArgumentException illegalArgumentException) {
            // empty catch block
        }
    }

    @Test
    public void testJaasConfigurationForListener() throws Exception {
        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
        this.saslClientConfigs.put("sasl.mechanism", "PLAIN");
        this.saslServerConfigs.put("sasl.enabled.mechanisms", Arrays.asList("PLAIN"));
        TestJaasConfig staticJaasConfig = new TestJaasConfig();
        HashMap<String, Object> globalServerOptions = new HashMap<String, Object>();
        globalServerOptions.put("user_global1", "gsecret1");
        globalServerOptions.put("user_global2", "gsecret2");
        staticJaasConfig.createOrUpdateEntry("KafkaServer", PlainLoginModule.class.getName(), globalServerOptions);
        HashMap<String, Object> clientListenerServerOptions = new HashMap<String, Object>();
        clientListenerServerOptions.put("user_client1", "csecret1");
        clientListenerServerOptions.put("user_client2", "csecret2");
        String clientJaasEntryName = "client.KafkaServer";
        staticJaasConfig.createOrUpdateEntry(clientJaasEntryName, PlainLoginModule.class.getName(), clientListenerServerOptions);
        Configuration.setConfiguration(staticJaasConfig);
        this.server = this.createEchoServer(new ListenerName("client"), securityProtocol);
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "client1", "csecret1"));
        this.createAndCheckClientConnection(securityProtocol, "1");
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "global1", "gsecret1"));
        this.createAndCheckClientConnectionFailure(securityProtocol, "2");
        this.server.close();
        this.server = this.createEchoServer(new ListenerName("other"), securityProtocol);
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "global1", "gsecret1"));
        this.createAndCheckClientConnection(securityProtocol, "3");
        this.saslClientConfigs.put("sasl.jaas.config", TestJaasConfig.jaasConfigProperty("PLAIN", "client1", "csecret1"));
        this.createAndCheckClientConnectionFailure(securityProtocol, "4");
    }

    @Test
    public void oldSaslPlainPlaintextServerWithoutSaslAuthenticateHeader() throws Exception {
        this.verifySaslAuthenticateHeaderInterop(false, true, SecurityProtocol.SASL_PLAINTEXT, "PLAIN");
    }

    @Test
    public void oldSaslPlainPlaintextClientWithoutSaslAuthenticateHeader() throws Exception {
        this.verifySaslAuthenticateHeaderInterop(true, false, SecurityProtocol.SASL_PLAINTEXT, "PLAIN");
    }

    @Test
    public void oldSaslScramPlaintextServerWithoutSaslAuthenticateHeader() throws Exception {
        this.verifySaslAuthenticateHeaderInterop(false, true, SecurityProtocol.SASL_PLAINTEXT, "SCRAM-SHA-256");
    }

    @Test
    public void oldSaslScramPlaintextClientWithoutSaslAuthenticateHeader() throws Exception {
        this.verifySaslAuthenticateHeaderInterop(true, false, SecurityProtocol.SASL_PLAINTEXT, "SCRAM-SHA-256");
    }

    @Test
    public void oldSaslPlainSslServerWithoutSaslAuthenticateHeader() throws Exception {
        this.verifySaslAuthenticateHeaderInterop(false, true, SecurityProtocol.SASL_SSL, "PLAIN");
    }

    @Test
    public void oldSaslPlainSslClientWithoutSaslAuthenticateHeader() throws Exception {
        this.verifySaslAuthenticateHeaderInterop(true, false, SecurityProtocol.SASL_SSL, "PLAIN");
    }

    @Test
    public void oldSaslScramSslServerWithoutSaslAuthenticateHeader() throws Exception {
        this.verifySaslAuthenticateHeaderInterop(false, true, SecurityProtocol.SASL_SSL, "SCRAM-SHA-512");
    }

    @Test
    public void oldSaslScramSslClientWithoutSaslAuthenticateHeader() throws Exception {
        this.verifySaslAuthenticateHeaderInterop(true, false, SecurityProtocol.SASL_SSL, "SCRAM-SHA-512");
    }

    @Test
    public void oldSaslPlainPlaintextServerWithoutSaslAuthenticateHeaderFailure() throws Exception {
        this.verifySaslAuthenticateHeaderInteropWithFailure(false, true, SecurityProtocol.SASL_PLAINTEXT, "PLAIN");
    }

    @Test
    public void oldSaslPlainPlaintextClientWithoutSaslAuthenticateHeaderFailure() throws Exception {
        this.verifySaslAuthenticateHeaderInteropWithFailure(true, false, SecurityProtocol.SASL_PLAINTEXT, "PLAIN");
    }

    @Test
    public void oldSaslScramPlaintextServerWithoutSaslAuthenticateHeaderFailure() throws Exception {
        this.verifySaslAuthenticateHeaderInteropWithFailure(false, true, SecurityProtocol.SASL_PLAINTEXT, "SCRAM-SHA-256");
    }

    @Test
    public void oldSaslScramPlaintextClientWithoutSaslAuthenticateHeaderFailure() throws Exception {
        this.verifySaslAuthenticateHeaderInteropWithFailure(true, false, SecurityProtocol.SASL_PLAINTEXT, "SCRAM-SHA-256");
    }

    @Test
    public void oldSaslPlainSslServerWithoutSaslAuthenticateHeaderFailure() throws Exception {
        this.verifySaslAuthenticateHeaderInteropWithFailure(false, true, SecurityProtocol.SASL_SSL, "PLAIN");
    }

    @Test
    public void oldSaslPlainSslClientWithoutSaslAuthenticateHeaderFailure() throws Exception {
        this.verifySaslAuthenticateHeaderInteropWithFailure(true, false, SecurityProtocol.SASL_SSL, "PLAIN");
    }

    @Test
    public void oldSaslScramSslServerWithoutSaslAuthenticateHeaderFailure() throws Exception {
        this.verifySaslAuthenticateHeaderInteropWithFailure(false, true, SecurityProtocol.SASL_SSL, "SCRAM-SHA-512");
    }

    @Test
    public void oldSaslScramSslClientWithoutSaslAuthenticateHeaderFailure() throws Exception {
        this.verifySaslAuthenticateHeaderInteropWithFailure(true, false, SecurityProtocol.SASL_SSL, "SCRAM-SHA-512");
    }

    private void verifySaslAuthenticateHeaderInterop(boolean enableHeaderOnServer, boolean enableHeaderOnClient, SecurityProtocol securityProtocol, String saslMechanism) throws Exception {
        this.configureMechanisms(saslMechanism, Arrays.asList(saslMechanism));
        this.createServer(securityProtocol, saslMechanism, enableHeaderOnServer);
        String node = "0";
        this.createClientConnection(securityProtocol, saslMechanism, node, enableHeaderOnClient);
        NetworkTestUtils.checkClientConnection(this.selector, "0", 100, 10);
    }

    private void verifySaslAuthenticateHeaderInteropWithFailure(boolean enableHeaderOnServer, boolean enableHeaderOnClient, SecurityProtocol securityProtocol, String saslMechanism) throws Exception {
        TestJaasConfig jaasConfig = this.configureMechanisms(saslMechanism, Arrays.asList(saslMechanism));
        jaasConfig.setClientOptions(saslMechanism, "myuser", "invalidpassword");
        this.createServer(securityProtocol, saslMechanism, enableHeaderOnServer);
        String node = "0";
        this.createClientConnection(securityProtocol, saslMechanism, node, enableHeaderOnClient);
        NetworkTestUtils.waitForChannelClose(this.selector, node, ChannelState.AUTHENTICATE.state());
    }

    private void createServer(SecurityProtocol securityProtocol, String saslMechanism, boolean enableSaslAuthenticateHeader) throws Exception {
        this.server = enableSaslAuthenticateHeader ? this.createEchoServer(securityProtocol) : this.startServerWithoutSaslAuthenticateHeader(securityProtocol, saslMechanism);
        this.updateScramCredentialCache("myuser", "mypassword");
    }

    private void createClientConnection(SecurityProtocol securityProtocol, String saslMechanism, String node, boolean enableSaslAuthenticateHeader) throws Exception {
        if (enableSaslAuthenticateHeader) {
            this.createClientConnection(securityProtocol, node);
        } else {
            this.createClientConnectionWithoutSaslAuthenticateHeader(securityProtocol, saslMechanism, node);
        }
    }

    private NioEchoServer startServerWithoutSaslAuthenticateHeader(final SecurityProtocol securityProtocol, String saslMechanism) throws Exception {
        final ListenerName listenerName = ListenerName.forSecurityProtocol((SecurityProtocol)securityProtocol);
        Map configs = Collections.emptyMap();
        final JaasContext jaasContext = JaasContext.load((JaasContext.Type)JaasContext.Type.SERVER, (ListenerName)listenerName, configs);
        boolean isScram = ScramMechanism.isScram((String)saslMechanism);
        if (isScram) {
            ScramCredentialUtils.createCache((CredentialCache)this.credentialCache, Arrays.asList(saslMechanism));
        }
        SaslChannelBuilder serverChannelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContext, securityProtocol, listenerName, saslMechanism, true, this.credentialCache){

            protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> configs, String id, TransportLayer transportLayer, Subject subject) throws IOException {
                return new SaslServerAuthenticator(configs, id, jaasContext, subject, null, SaslAuthenticatorTest.this.credentialCache, listenerName, securityProtocol, transportLayer){

                    protected ApiVersionsResponse apiVersionsResponse() {
                        ArrayList apiVersions = new ArrayList(ApiVersionsResponse.defaultApiVersionsResponse().apiVersions());
                        Iterator it = apiVersions.iterator();
                        while (it.hasNext()) {
                            ApiVersionsResponse.ApiVersion apiVersion = (ApiVersionsResponse.ApiVersion)it.next();
                            if (apiVersion.apiKey != ApiKeys.SASL_AUTHENTICATE.id) continue;
                            it.remove();
                            break;
                        }
                        return new ApiVersionsResponse(0, Errors.NONE, apiVersions);
                    }

                    protected void enableKafkaSaslAuthenticateHeaders(boolean flag) {
                    }
                };
            }
        };
        serverChannelBuilder.configure(this.saslServerConfigs);
        this.server = new NioEchoServer(listenerName, securityProtocol, new TestSecurityConfig(this.saslServerConfigs), "localhost", (ChannelBuilder)serverChannelBuilder, this.credentialCache);
        this.server.start();
        return this.server;
    }

    private void createClientConnectionWithoutSaslAuthenticateHeader(SecurityProtocol securityProtocol, final String saslMechanism, String node) throws Exception {
        ListenerName listenerName = ListenerName.forSecurityProtocol((SecurityProtocol)securityProtocol);
        Map configs = Collections.emptyMap();
        JaasContext jaasContext = JaasContext.load((JaasContext.Type)JaasContext.Type.CLIENT, null, configs);
        SaslChannelBuilder clientChannelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContext, securityProtocol, listenerName, saslMechanism, true, null){

            protected SaslClientAuthenticator buildClientAuthenticator(Map<String, ?> configs, String id, String serverHost, String servicePrincipal, TransportLayer transportLayer, Subject subject) throws IOException {
                return new SaslClientAuthenticator(configs, id, subject, servicePrincipal, serverHost, saslMechanism, true, transportLayer){

                    protected SaslHandshakeRequest createSaslHandshakeRequest(short version) {
                        return new SaslHandshakeRequest.Builder(saslMechanism).build((short)0);
                    }

                    protected void saslAuthenticateVersion(short version) {
                    }
                };
            }
        };
        clientChannelBuilder.configure(this.saslClientConfigs);
        this.selector = NetworkTestUtils.createSelector((ChannelBuilder)clientChannelBuilder);
        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", this.server.port());
        this.selector.connect(node, addr, 4096, 4096);
    }

    private void testUnauthenticatedApiVersionsRequest(SecurityProtocol securityProtocol, short saslHandshakeVersion) throws Exception {
        SecurityProtocol clientProtocol;
        this.configureMechanisms("PLAIN", Arrays.asList("PLAIN"));
        this.server = this.createEchoServer(securityProtocol);
        String node = "1";
        switch (securityProtocol) {
            case SASL_PLAINTEXT: {
                clientProtocol = SecurityProtocol.PLAINTEXT;
                break;
            }
            case SASL_SSL: {
                clientProtocol = SecurityProtocol.SSL;
                break;
            }
            default: {
                throw new IllegalArgumentException("Server protocol " + securityProtocol + " is not SASL");
            }
        }
        this.createClientConnection(clientProtocol, node);
        NetworkTestUtils.waitForChannelReady(this.selector, node);
        ApiVersionsResponse versionsResponse = this.sendVersionRequestReceiveResponse(node);
        Assert.assertEquals((long)ApiKeys.SASL_HANDSHAKE.oldestVersion(), (long)versionsResponse.apiVersion((short)ApiKeys.SASL_HANDSHAKE.id).minVersion);
        Assert.assertEquals((long)ApiKeys.SASL_HANDSHAKE.latestVersion(), (long)versionsResponse.apiVersion((short)ApiKeys.SASL_HANDSHAKE.id).maxVersion);
        Assert.assertEquals((long)ApiKeys.SASL_AUTHENTICATE.oldestVersion(), (long)versionsResponse.apiVersion((short)ApiKeys.SASL_AUTHENTICATE.id).minVersion);
        Assert.assertEquals((long)ApiKeys.SASL_AUTHENTICATE.latestVersion(), (long)versionsResponse.apiVersion((short)ApiKeys.SASL_AUTHENTICATE.id).maxVersion);
        SaslHandshakeResponse handshakeResponse = this.sendHandshakeRequestReceiveResponse(node, saslHandshakeVersion);
        Assert.assertEquals(Collections.singletonList("PLAIN"), (Object)handshakeResponse.enabledMechanisms());
        this.authenticateUsingSaslPlainAndCheckConnection(node, saslHandshakeVersion > 0);
    }

    private void authenticateUsingSaslPlainAndCheckConnection(String node, boolean enableSaslAuthenticateHeader) throws Exception {
        String authString = "\u0000myuser\u0000mypassword";
        ByteBuffer authBuf = ByteBuffer.wrap(authString.getBytes("UTF-8"));
        if (enableSaslAuthenticateHeader) {
            SaslAuthenticateRequest request = (SaslAuthenticateRequest)new SaslAuthenticateRequest.Builder(authBuf).build();
            this.sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_AUTHENTICATE, (AbstractRequest)request);
        } else {
            this.selector.send((Send)new NetworkSend(node, authBuf));
            this.waitForResponse();
        }
        NetworkTestUtils.checkClientConnection(this.selector, node, 100, 10);
    }

    private TestJaasConfig configureMechanisms(String clientMechanism, List<String> serverMechanisms) {
        this.saslClientConfigs.put("sasl.mechanism", clientMechanism);
        this.saslServerConfigs.put("sasl.enabled.mechanisms", serverMechanisms);
        return TestJaasConfig.createConfiguration(clientMechanism, serverMechanisms);
    }

    private void createSelector(SecurityProtocol securityProtocol, Map<String, Object> clientConfigs) {
        if (this.selector != null) {
            this.selector.close();
            this.selector = null;
        }
        String saslMechanism = (String)this.saslClientConfigs.get("sasl.mechanism");
        this.channelBuilder = ChannelBuilders.clientChannelBuilder((SecurityProtocol)securityProtocol, (JaasContext.Type)JaasContext.Type.CLIENT, (AbstractConfig)new TestSecurityConfig(clientConfigs), null, (String)saslMechanism, (boolean)true);
        this.selector = NetworkTestUtils.createSelector(this.channelBuilder);
    }

    private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) throws Exception {
        return this.createEchoServer(ListenerName.forSecurityProtocol((SecurityProtocol)securityProtocol), securityProtocol);
    }

    private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception {
        return NetworkTestUtils.createEchoServer(listenerName, securityProtocol, new TestSecurityConfig(this.saslServerConfigs), this.credentialCache);
    }

    private void createClientConnection(SecurityProtocol securityProtocol, String node) throws Exception {
        this.createSelector(securityProtocol, this.saslClientConfigs);
        InetSocketAddress addr = new InetSocketAddress("127.0.0.1", this.server.port());
        this.selector.connect(node, addr, 4096, 4096);
    }

    private void createAndCheckClientConnection(SecurityProtocol securityProtocol, String node) throws Exception {
        this.createClientConnection(securityProtocol, node);
        NetworkTestUtils.checkClientConnection(this.selector, node, 100, 10);
        this.selector.close();
        this.selector = null;
    }

    private void createAndCheckClientAuthenticationFailure(SecurityProtocol securityProtocol, String node, String mechanism, String expectedErrorMessage) throws Exception {
        ChannelState finalState = this.createAndCheckClientConnectionFailure(securityProtocol, node);
        AuthenticationException exception = finalState.exception();
        Assert.assertTrue((String)("Invalid exception class " + exception.getClass()), (boolean)(exception instanceof SaslAuthenticationException));
        if (expectedErrorMessage == null) {
            expectedErrorMessage = "Authentication failed due to invalid credentials with SASL mechanism " + mechanism;
        }
        Assert.assertEquals((Object)expectedErrorMessage, (Object)exception.getMessage());
    }

    private ChannelState createAndCheckClientConnectionFailure(SecurityProtocol securityProtocol, String node) throws Exception {
        this.createClientConnection(securityProtocol, node);
        ChannelState finalState = NetworkTestUtils.waitForChannelClose(this.selector, node, ChannelState.State.AUTHENTICATION_FAILED);
        this.selector.close();
        this.selector = null;
        return finalState;
    }

    private AbstractResponse sendKafkaRequestReceiveResponse(String node, ApiKeys apiKey, AbstractRequest request) throws IOException {
        RequestHeader header = new RequestHeader(apiKey, request.version(), "someclient", this.nextCorrelationId++);
        Send send = request.toSend(node, header);
        this.selector.send(send);
        ByteBuffer responseBuffer = this.waitForResponse();
        return NetworkClient.parseResponse((ByteBuffer)responseBuffer, (RequestHeader)header);
    }

    private SaslHandshakeResponse sendHandshakeRequestReceiveResponse(String node, short version) throws Exception {
        SaslHandshakeRequest handshakeRequest = new SaslHandshakeRequest.Builder("PLAIN").build(version);
        SaslHandshakeResponse response = (SaslHandshakeResponse)this.sendKafkaRequestReceiveResponse(node, ApiKeys.SASL_HANDSHAKE, (AbstractRequest)handshakeRequest);
        Assert.assertEquals((Object)Errors.NONE, (Object)response.error());
        return response;
    }

    private ApiVersionsResponse sendVersionRequestReceiveResponse(String node) throws Exception {
        ApiVersionsRequest handshakeRequest = this.createApiVersionsRequestV0();
        ApiVersionsResponse response = (ApiVersionsResponse)this.sendKafkaRequestReceiveResponse(node, ApiKeys.API_VERSIONS, (AbstractRequest)handshakeRequest);
        Assert.assertEquals((Object)Errors.NONE, (Object)response.error());
        return response;
    }

    private ByteBuffer waitForResponse() throws IOException {
        int waitSeconds = 10;
        do {
            this.selector.poll(1000L);
        } while (this.selector.completedReceives().isEmpty() && waitSeconds-- > 0);
        Assert.assertEquals((long)1L, (long)this.selector.completedReceives().size());
        return ((NetworkReceive)this.selector.completedReceives().get(0)).payload();
    }

    private void updateScramCredentialCache(String username, String password) throws NoSuchAlgorithmException {
        for (String mechanism : (List)this.saslServerConfigs.get("sasl.enabled.mechanisms")) {
            ScramMechanism scramMechanism = ScramMechanism.forMechanismName((String)mechanism);
            if (scramMechanism == null) continue;
            ScramFormatter formatter = new ScramFormatter(scramMechanism);
            ScramCredential credential = formatter.generateCredential(password, 4096);
            this.credentialCache.cache(scramMechanism.mechanismName(), ScramCredential.class).put(username, (Object)credential);
        }
    }

    private ApiVersionsRequest createApiVersionsRequestV0() {
        return (ApiVersionsRequest)new ApiVersionsRequest.Builder(0).build();
    }
}

