/*
 * Decompiled with CFR 0.152.
 */
package kafka.common;

import java.io.Serializable;
import kafka.common.NotificationHandler;
import kafka.common.ZkNodeChangeNotificationListener;
import kafka.common.ZkNodeChangeNotificationListener$;
import kafka.security.auth.Group$;
import kafka.security.auth.Resource;
import kafka.security.auth.ResourceType;
import kafka.utils.TestUtils$;
import kafka.zk.LiteralAclChangeStore$;
import kafka.zk.LiteralAclStore$;
import kafka.zk.ZkAclChangeStore$;
import kafka.zk.ZooKeeperTestHarness;
import org.apache.kafka.common.resource.PatternType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005\u001dc\u0001\u0002\r\u001a\u0001yAQ!\n\u0001\u0005\u0002\u0019Bq!\u000b\u0001C\u0002\u0013%!\u0006\u0003\u00042\u0001\u0001\u0006Ia\u000b\u0005\ne\u0001\u0001\r\u00111A\u0005\nMB\u0011b\u000e\u0001A\u0002\u0003\u0007I\u0011\u0002\u001d\t\u0013y\u0002\u0001\u0019!A!B\u0013!\u0004\"C \u0001\u0001\u0004\u0005\r\u0011\"\u0003A\u0011-\t\t\u0002\u0001a\u0001\u0002\u0004%I!a\u0005\t\u0015\u0005]\u0001\u00011A\u0001B\u0003&\u0011\tC\u0004\u0002\u001a\u0001!\t%a\u0007\t\u000f\u0005=\u0002\u0001\"\u0011\u0002\u001c!9\u0011\u0011\b\u0001\u0005\u0002\u0005m\u0001bBA\"\u0001\u0011\u0005\u00111\u0004\u0004\u0005\u0007\u0002!A\tC\u0003&\u001d\u0011\u00051\nC\u0004M\u001d\t\u0007I\u0011B'\t\rys\u0001\u0015!\u0003O\u0011\u001dyf\u00021A\u0005\n\u0001Dq\u0001\u001a\bA\u0002\u0013%Q\r\u0003\u0004h\u001d\u0001\u0006K!\u0019\u0005\u0006Y:!\t%\u001c\u0005\u0006m:!\ta\u001e\u0005\b\u0003\u0013qA\u0011AA\u0006\u0005\u0011R6NT8eK\u000eC\u0017M\\4f\u001d>$\u0018NZ5dCRLwN\u001c'jgR,g.\u001a:UKN$(B\u0001\u000e\u001c\u0003\u0019\u0019w.\\7p]*\tA$A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001y\u0002C\u0001\u0011$\u001b\u0005\t#B\u0001\u0012\u001c\u0003\tQ8.\u0003\u0002%C\t!\"l\\8LK\u0016\u0004XM\u001d+fgRD\u0015M\u001d8fgN\fa\u0001P5oSRtD#A\u0014\u0011\u0005!\u0002Q\"A\r\u0002%\rD\u0017M\\4f\u000bb\u0004\u0018N]1uS>tWj]\u000b\u0002WA\u0011AfL\u0007\u0002[)\ta&A\u0003tG\u0006d\u0017-\u0003\u00021[\t\u0019\u0011J\u001c;\u0002'\rD\u0017M\\4f\u000bb\u0004\u0018N]1uS>tWj\u001d\u0011\u0002)9|G/\u001b4jG\u0006$\u0018n\u001c8MSN$XM\\3s+\u0005!\u0004C\u0001\u00156\u0013\t1\u0014D\u0001\u0011[W:{G-Z\"iC:<WMT8uS\u001aL7-\u0019;j_:d\u0015n\u001d;f]\u0016\u0014\u0018\u0001\u00078pi&4\u0017nY1uS>tG*[:uK:,'o\u0018\u0013fcR\u0011\u0011\b\u0010\t\u0003YiJ!aO\u0017\u0003\tUs\u0017\u000e\u001e\u0005\b{\u0015\t\t\u00111\u00015\u0003\rAH%M\u0001\u0016]>$\u0018NZ5dCRLwN\u001c'jgR,g.\u001a:!\u0003Mqw\u000e^5gS\u000e\fG/[8o\u0011\u0006tG\r\\3s+\u0005\t\u0005C\u0001\"\u0
00f\u001b\u0005\u0001!a\u0006+fgRtu\u000e^5gS\u000e\fG/[8o\u0011\u0006tG\r\\3s'\rqQ\t\u0013\t\u0003Y\u0019K!aR\u0017\u0003\r\u0005s\u0017PU3g!\tA\u0013*\u0003\u0002K3\t\u0019bj\u001c;jM&\u001c\u0017\r^5p]\"\u000bg\u000e\u001a7feR\t\u0011)\u0001\u0005nKN\u001c\u0018mZ3t+\u0005q\u0005cA(U-6\t\u0001K\u0003\u0002R%\u00069Q.\u001e;bE2,'BA*.\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0003+B\u00131\"\u0011:sCf\u0014UO\u001a4feB\u0011q\u000bX\u0007\u00021*\u0011\u0011LW\u0001\u0005CV$\bN\u0003\u0002\\7\u0005A1/Z2ve&$\u00180\u0003\u0002^1\nA!+Z:pkJ\u001cW-A\u0005nKN\u001c\u0018mZ3tA\u0005IA\u000f\u001b:poNK'0Z\u000b\u0002CB\u0019AFY\u0016\n\u0005\rl#AB(qi&|g.A\u0007uQJ|woU5{K~#S-\u001d\u000b\u0003s\u0019Dq!P\n\u0002\u0002\u0003\u0007\u0011-\u0001\u0006uQJ|woU5{K\u0002B#\u0001F5\u0011\u00051R\u0017BA6.\u0005!1x\u000e\\1uS2,\u0017a\u00059s_\u000e,7o\u001d(pi&4\u0017nY1uS>tGCA\u001do\u0011\u0015yW\u00031\u0001q\u0003Mqw\u000e^5gS\u000e\fG/[8o\u001b\u0016\u001c8/Y4f!\ra\u0013o]\u0005\u0003e6\u0012Q!\u0011:sCf\u0004\"\u0001\f;\n\u0005Ul#\u0001\u0002\"zi\u0016\f\u0001B]3dK&4X\r\u001a\u000b\u0002qB!\u00110a\u0001W\u001d\tQxP\u0004\u0002|}6\tAP\u0003\u0002~;\u00051AH]8pizJ\u0011AL\u0005\u0004\u0003\u0003i\u0013a\u00029bG.\fw-Z\u0005\u0005\u0003\u000b\t9AA\u0002TKFT1!!\u0001.\u00031\u0019X\r\u001e+ie><8+\u001b>f)\rI\u0014Q\u0002\u0005\u0007\u0003\u001f9\u0002\u0019A\u0016\u0002\u000b%tG-\u001a=\u0002/9|G/\u001b4jG\u0006$\u0018n\u001c8IC:$G.\u001a:`I\u0015\fHcA\u001d\u0002\u0016!9Q\bCA\u0001\u0002\u0004\t\u0015\u0001\u00068pi&4\u0017nY1uS>t\u0007*\u00198eY\u0016\u0014\b%A\u0003tKR,\u0006\u000fF\u0001:Q\rQ\u0011q\u0004\t\u0005\u0003C\tY#\u0004\u0002\u0002$)!\u0011QEA\u0014\u0003\u0015QWO\\5u\u0015\t\tI#A\u0002pe\u001eLA!!\f\u0002$\t1!)\u001a4pe\u0016\f\u0001\u0002^3be\u0012{wO\u001c\u0015\u0004\u0017\u0005M\u0002\u0003BA\u0011\u0003kIA!a\u000e\u0002$\t)\u0011I\u001a;fe\u00069B/Z:u!J|7-Z:t\u001d>$\u0018NZ5dCRLwN\u001c\u0015\u0004\u0019\u0005u\u0002\u0003BA\u0011\u0003\u007fIA!!\u0011\u00
02$\t!A+Z:u\u0003y!Xm\u001d;To\u0006dGn\\<t!J|7-Z:t_J,\u0005pY3qi&|g\u000eK\u0002\u000e\u0003{\u0001")
/*
 * Tests for ZkNodeChangeNotificationListener, run on top of ZooKeeperTestHarness.
 *
 * Each test wires a TestNotificationHandler into a listener that watches the literal
 * ACL change path, publishes ACL change notifications through the harness's zkClient,
 * and then polls the handler until the expected notifications have been recorded.
 */
public class ZkNodeChangeNotificationListenerTest
extends ZooKeeperTestHarness {
    /** Change-expiration value handed to every listener built by this test (milliseconds, per its name). */
    private final int changeExpirationMs;
    /** Listener under test; stays null until a test starts one, and is closed in tearDown(). */
    private ZkNodeChangeNotificationListener notificationListener;
    /** Recording handler; replaced with a fresh instance in setUp(). */
    private TestNotificationHandler notificationHandler;

    private int changeExpirationMs() {
        return this.changeExpirationMs;
    }

    private ZkNodeChangeNotificationListener notificationListener() {
        return this.notificationListener;
    }

    private void notificationListener_$eq(ZkNodeChangeNotificationListener x$1) {
        this.notificationListener = x$1;
    }

    private TestNotificationHandler notificationHandler() {
        return this.notificationHandler;
    }

    private void notificationHandler_$eq(TestNotificationHandler x$1) {
        this.notificationHandler = x$1;
    }

    /**
     * Runs the harness set-up, creates the ACL paths through zkClient and installs a
     * fresh recording handler.
     */
    @Override
    @Before
    public void setUp() {
        super.setUp();
        this.zkClient().createAclPaths();
        this.notificationHandler_$eq(new TestNotificationHandler());
    }

    /**
     * Closes the listener if a test started one, then runs the harness tear-down.
     */
    @Override
    @After
    public void tearDown() {
        ZkNodeChangeNotificationListener listener = this.notificationListener();
        if (listener != null) {
            listener.close();
        }
        super.tearDown();
    }

    /**
     * Publishes notifications one at a time, then in a burst, and checks that the
     * handler records each of them.
     */
    @Test
    public void testProcessNotification() {
        Resource notificationMessage1 = new Resource((ResourceType)Group$.MODULE$, "messageA", PatternType.LITERAL);
        Resource notificationMessage2 = new Resource((ResourceType)Group$.MODULE$, "messageB", PatternType.LITERAL);
        this.startListener();

        this.zkClient().createAclChangeNotification(notificationMessage1);
        this.awaitLastReceived(1, notificationMessage1);

        this.zkClient().createAclChangeNotification(notificationMessage2);
        this.awaitLastReceived(2, notificationMessage2);

        // Eight more notifications (message3 .. message10, inclusive) bring the total to ten.
        for (int i = 3; i <= 10; ++i) {
            this.zkClient().createAclChangeNotification(new Resource((ResourceType)Group$.MODULE$, "message" + i, PatternType.LITERAL));
        }
        TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> this.notificationHandler().received().size() == 10, (Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Expected 10 invocations of processNotifications, but there were " + this.notificationHandler().received(), TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4(), TestUtils$.MODULE$.waitUntilTrue$default$5());
    }

    /**
     * Makes the handler throw while processing its second notification and checks that
     * a third notification is still delivered afterwards.
     */
    @Test
    public void testSwallowsProcessorException() {
        this.notificationHandler().setThrowSize(2);
        this.startListener();

        this.zkClient().createAclChangeNotification(new Resource((ResourceType)Group$.MODULE$, "messageA", PatternType.LITERAL));
        this.zkClient().createAclChangeNotification(new Resource((ResourceType)Group$.MODULE$, "messageB", PatternType.LITERAL));
        this.zkClient().createAclChangeNotification(new Resource((ResourceType)Group$.MODULE$, "messageC", PatternType.LITERAL));

        // The condition waits for three recorded notifications, so the failure message reports three as well.
        TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> this.notificationHandler().received().size() == 3, (Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Expected 3 invocations of processNotifications, but there were " + this.notificationHandler().received(), TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4(), TestUtils$.MODULE$.waitUntilTrue$default$5());
    }

    /**
     * Builds a listener on the literal ACL change path that feeds the current handler,
     * stores it in the field (so tearDown() can close it) and then initialises it.
     */
    private void startListener() {
        this.notificationListener_$eq(new ZkNodeChangeNotificationListener(this.zkClient(), LiteralAclChangeStore$.MODULE$.aclChangePath(), ZkAclChangeStore$.MODULE$.SequenceNumberPrefix(), (NotificationHandler)this.notificationHandler(), (long)this.changeExpirationMs(), ZkNodeChangeNotificationListener$.MODULE$.$lessinit$greater$default$6()));
        this.notificationListener().init();
    }

    /**
     * Polls until the handler has recorded exactly {@code expectedCount} notifications
     * and the most recent one equals {@code expected}.
     *
     * @param expectedCount number of notifications the handler must have recorded
     * @param expected      resource the last recorded notification must equal
     */
    private void awaitLastReceived(int expectedCount, Resource expected) {
        TestUtils$.MODULE$.waitUntilTrue((Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> {
            Seq<Resource> received = this.notificationHandler().received();
            if (received.size() != expectedCount) {
                return false;
            }
            Object last = received.last();
            // Null-safe equality, matching the comparison the original test performed.
            return last == null ? expected == null : last.equals(expected);
        }, (Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Failed to send/process notification message in the timeout period.", TestUtils$.MODULE$.waitUntilTrue$default$3(), TestUtils$.MODULE$.waitUntilTrue$default$4(), TestUtils$.MODULE$.waitUntilTrue$default$5());
    }

    public ZkNodeChangeNotificationListenerTest() {
        this.changeExpirationMs = 1000;
    }

    /**
     * NotificationHandler that decodes and records every notification it is given and
     * can be told to throw once a chosen number of notifications has been recorded.
     */
    public class TestNotificationHandler
    implements NotificationHandler {
        // NOTE(review): this buffer is not synchronized while throwSize is volatile; if the
        // listener invokes processNotification on a different thread than the one polling
        // received(), reads may race with appends -- confirm against the listener.
        private final ArrayBuffer<Resource> messages;
        /** Recorded-message count at which processNotification throws; empty means never. */
        private volatile Option<Object> throwSize;

        private ArrayBuffer<Resource> messages() {
            return this.messages;
        }

        private Option<Object> throwSize() {
            return this.throwSize;
        }

        private void throwSize_$eq(Option<Object> x$1) {
            this.throwSize = x$1;
        }

        /**
         * Decodes the notification with the literal ACL change store and records it.
         * The message is recorded before the throw check, so a failing notification
         * still appears in {@link #received()}.
         *
         * @param notificationMessage raw notification bytes to decode
         * @throws RuntimeException when the number of recorded messages equals the configured throw size
         */
        public void processNotification(byte[] notificationMessage) {
            this.messages().$plus$eq((Object)LiteralAclStore$.MODULE$.changeStore().decode(notificationMessage));
            Object recordedCount = (Object)BoxesRunTime.boxToInteger((int)this.messages().size());
            if (this.throwSize().contains(recordedCount)) {
                throw new RuntimeException("Oh no, my processing failed!");
            }
        }

        /**
         * @return the live buffer of recorded resources (not a copy)
         */
        public Seq<Resource> received() {
            return this.messages();
        }

        /**
         * Arranges for processNotification to throw when the recorded-message count
         * reaches {@code index}.
         *
         * @param index recorded-message count that triggers the exception
         */
        public void setThrowSize(int index) {
            this.throwSize_$eq((Option<Object>)Option$.MODULE$.apply((Object)BoxesRunTime.boxToInteger((int)index)));
        }

        public /* synthetic */ ZkNodeChangeNotificationListenerTest kafka$common$ZkNodeChangeNotificationListenerTest$TestNotificationHandler$$$outer() {
            return ZkNodeChangeNotificationListenerTest.this;
        }

        public TestNotificationHandler() {
            // Outer-instance null check carried over from the decompiled output; `throw null` raises a NullPointerException.
            if (ZkNodeChangeNotificationListenerTest.this == null) {
                throw null;
            }
            this.messages = (ArrayBuffer)ArrayBuffer$.MODULE$.empty();
            this.throwSize = Option$.MODULE$.empty();
        }
    }
}

