package rx.operators;

import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Assert;
import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.observables.GroupedObservable;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action1;
import rx.util.functions.Func1;
import rx.util.functions.Functions;

/* loaded from: input_file:rx/operators/OperationGroupBy.class */
public final class OperationGroupBy {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:rx/operators/OperationGroupBy$GroupBy.class */
    /**
     * Subscription function behind {@code groupBy}: subscribes to a stream of key/value pairs and routes
     * every value to the {@link GroupedSubject} registered for its key. The first time a key is seen a
     * new group is created and emitted to the downstream observer.
     *
     * @param <K> type of the grouping key
     * @param <V> type of the values delivered to each group
     */
    public static class GroupBy<K, V> implements Observable.OnSubscribeFunc<GroupedObservable<K, V>> {
        /** Upstream sequence of already-computed key/value pairs. */
        private final Observable<KeyValue<K, V>> source;
        /** One subject per key seen so far. */
        private final ConcurrentHashMap<K, GroupedSubject<K, V>> groupedObservables;
        /** Holds the subscription to {@link #source} once {@link #onSubscribe} has created it. */
        private final SafeObservableSubscription actualParentSubscription;
        /** Number of group-level subscriptions that are currently active. */
        private final AtomicInteger numGroupSubscriptions;
        /** Set once the downstream observer has unsubscribed; prevents creation of further groups. */
        private final AtomicBoolean unsubscribeRequested;

        private GroupBy(Observable<KeyValue<K, V>> observable) {
            this.groupedObservables = new ConcurrentHashMap<>();
            this.actualParentSubscription = new SafeObservableSubscription();
            this.numGroupSubscriptions = new AtomicInteger();
            this.unsubscribeRequested = new AtomicBoolean(false);
            this.source = observable;
        }

        /**
         * Subscribes to the source and starts dispatching its pairs into groups.
         *
         * @param observer downstream observer that receives each newly created group
         * @return a subscription that stops the creation of new groups and, when no group currently has
         *         a subscriber, also unsubscribes from the source
         */
        @Override
        public Subscription onSubscribe(final Observer<? super GroupedObservable<K, V>> observer) {
            this.actualParentSubscription.wrap(this.source.subscribe(new Observer<KeyValue<K, V>>() {
                @Override
                public void onCompleted() {
                    // Terminate every group first, then the stream of groups itself.
                    for (GroupedSubject<K, V> group : GroupBy.this.groupedObservables.values()) {
                        group.onCompleted();
                    }
                    observer.onCompleted();
                }

                @Override
                public void onError(Throwable th) {
                    // The failure is forwarded to every group as well as to the outer observer.
                    for (GroupedSubject<K, V> group : GroupBy.this.groupedObservables.values()) {
                        group.onError(th);
                    }
                    observer.onError(th);
                }

                @Override
                public void onNext(KeyValue<K, V> keyValue) {
                    // Note: a null key is rejected by ConcurrentHashMap with a NullPointerException.
                    GroupedSubject<K, V> group = GroupBy.this.groupedObservables.get(keyValue.key);
                    if (group == null) {
                        if (GroupBy.this.unsubscribeRequested.get()) {
                            // Downstream is gone: values for unseen keys are dropped.
                            return;
                        }
                        // The group must be bound to the enclosing GroupBy, not to this observer.
                        GroupedSubject<K, V> created = GroupedSubject.create(keyValue.key, GroupBy.this);
                        GroupedSubject<K, V> existing = GroupBy.this.groupedObservables.putIfAbsent(keyValue.key, created);
                        if (existing == null) {
                            // This call won the registration, so it is the one that announces the group.
                            group = created;
                            observer.onNext(group);
                        } else {
                            group = existing;
                        }
                    }
                    group.onNext(keyValue.value);
                }
            }));
            return new Subscription() {
                @Override
                public void unsubscribe() {
                    // Always record the request so that onNext stops creating and emitting groups,
                    // even while existing groups still have subscribers.
                    GroupBy.this.unsubscribeRequested.set(true);
                    if (GroupBy.this.numGroupSubscriptions.get() == 0) {
                        // No group is being consumed, so the source can be released right away;
                        // otherwise unsubscribeKey releases it when the last group subscriber leaves.
                        GroupBy.this.actualParentSubscription.unsubscribe();
                    }
                }
            };
        }

        /**
         * Records that an observer subscribed to a group.
         *
         * @param k key of the group (not used by the bookkeeping itself)
         */
        public void subscribeKey(K k) {
            this.numGroupSubscriptions.incrementAndGet();
        }

        /**
         * Records that a group subscriber left; unsubscribes from the source when it was the last one.
         *
         * @param k key of the group (not used by the bookkeeping itself)
         */
        public void unsubscribeKey(K k) {
            if (this.numGroupSubscriptions.decrementAndGet() == 0) {
                this.actualParentSubscription.unsubscribe();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:rx/operators/OperationGroupBy$GroupedSubject.class */
    /**
     * A group emitted by {@link GroupBy}: it is both the observable handed to the consumer and the
     * observer into which {@link GroupBy} pushes the values of one key.
     * <p>
     * Notifications go to whichever observer is currently stored in the shared reference. Before anyone
     * subscribes, and again after unsubscription, that is a no-op observer, so such notifications are
     * discarded. A later subscriber replaces the stored observer.
     *
     * @param <K> type of the group key
     * @param <T> type of the values in the group
     */
    public static class GroupedSubject<K, T> extends GroupedObservable<K, T> implements Observer<T> {
        private final AtomicReference<Observer<? super T>> subscribedObserver;

        /**
         * Builds a group for {@code k} whose subscribe/unsubscribe events are reported to {@code groupBy}.
         *
         * @param k       key of the group
         * @param groupBy owner that counts the group-level subscriptions
         * @return the new group, initially without a real subscriber
         */
        static <K, T> GroupedSubject<K, T> create(final K k, final GroupBy<K, T> groupBy) {
            final AtomicReference<Observer<? super T>> current =
                    new AtomicReference<Observer<? super T>>(OperationGroupBy.<T>emptyObserver());

            final Observable.OnSubscribeFunc<T> onSubscribeFunc = new Observable.OnSubscribeFunc<T>() {
                private final SafeObservableSubscription subscription = new SafeObservableSubscription();

                @Override
                public Subscription onSubscribe(Observer<? super T> observer) {
                    // Start routing to the real observer, then tell the owner a group is in use.
                    current.set(observer);
                    groupBy.subscribeKey(k);

                    final Subscription detach = new Subscription() {
                        @Override
                        public void unsubscribe() {
                            // Fall back to discarding notifications before notifying the owner.
                            current.set(OperationGroupBy.<T>emptyObserver());
                            groupBy.unsubscribeKey(k);
                        }
                    };
                    return this.subscription.wrap(detach);
                }
            };
            return new GroupedSubject<K, T>(k, onSubscribeFunc, current);
        }

        /**
         * @param k               key of the group
         * @param onSubscribeFunc function invoked when an observer subscribes to the group
         * @param atomicReference holder of the observer that currently receives the notifications
         */
        public GroupedSubject(K k, Observable.OnSubscribeFunc<T> onSubscribeFunc, AtomicReference<Observer<? super T>> atomicReference) {
            super(k, onSubscribeFunc);
            this.subscribedObserver = atomicReference;
        }

        /** Forwards completion to the currently stored observer. */
        @Override
        public void onCompleted() {
            final Observer<? super T> target = this.subscribedObserver.get();
            target.onCompleted();
        }

        /** Forwards the error to the currently stored observer. */
        @Override
        public void onError(Throwable th) {
            final Observer<? super T> target = this.subscribedObserver.get();
            target.onError(th);
        }

        /** Forwards the value to the currently stored observer. */
        @Override
        public void onNext(T t) {
            final Observer<? super T> target = this.subscribedObserver.get();
            target.onNext(t);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:rx/operators/OperationGroupBy$KeyValue.class */
    /**
     * Immutable pair of a grouping key and the value that belongs to it.
     *
     * @param <K> key type
     * @param <V> value type
     */
    public static class KeyValue<K, V> {
        /** Key that selects the group. */
        private final K key;
        /** Value delivered to that group. */
        private final V value;

        /** Stores both components exactly as given; no validation is performed. */
        private KeyValue(K key, V value) {
            this.value = value;
            this.key = key;
        }
    }

    /* loaded from: input_file:rx/operators/OperationGroupBy$UnitTest.class */
    public static class UnitTest {
        /** Key selector used by the tests: maps a string to its length. */
        final Func1<String, Integer> length = new Func1<String, Integer>() {
            @Override
            public Integer call(String text) {
                final int size = text.length();
                return size; // boxed to Integer on return
            }
        };

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:rx/operators/OperationGroupBy$UnitTest$Event.class */
        /** Minimal event used by the streaming tests: a source id plus a text message. */
        public static class Event {
            /** Identifier of the emitting source; the tests group by this value. */
            int source;
            /** Free-form text carried by the event. */
            String message;

            private Event() {
            }

            /** Renders the event as {@code Event => source: <source> message: <message>}. */
            @Override
            public String toString() {
                final StringBuilder text = new StringBuilder("Event => source: ");
                text.append(this.source);
                text.append(" message: ");
                text.append(this.message);
                return text.toString();
            }
        }

        /** Groups six words by length and checks the content and order of each of the three groups. */
        @Test
        public void testGroupBy() {
            Observable<String> words = Observable.from("one", "two", "three", "four", "five", "six");
            Observable<GroupedObservable<Integer, String>> grouped = Observable.create(OperationGroupBy.groupBy(words, this.length));
            Map<Integer, Collection<String>> byLength = toMap(grouped);

            Assert.assertEquals(3L, byLength.size());

            Object[] expectedThree = new Object[] { "one", "two", "six" };
            Object[] expectedFour = new Object[] { "four", "five" };
            Object[] expectedFive = new Object[] { "three" };
            Assert.assertArrayEquals(expectedThree, byLength.get(3).toArray());
            Assert.assertArrayEquals(expectedFour, byLength.get(4).toArray());
            Assert.assertArrayEquals(expectedFive, byLength.get(5).toArray());
        }

        /** Grouping an empty source must not produce any group. */
        @Test
        public void testEmpty() {
            Observable<String> noWords = Observable.empty();
            Observable<GroupedObservable<Integer, String>> grouped = Observable.create(OperationGroupBy.groupBy(noWords, this.length));
            Map<Integer, Collection<String>> byLength = toMap(grouped);

            Assert.assertTrue(byLength.isEmpty());
        }

        /**
         * Groups six words followed by a forced failure and flattens the groups again. All three groups
         * and all six values must be seen, and the failure must reach the final observer.
         */
        @Test
        public void testError() {
            Observable<String> words = Observable.from("one", "two", "three", "four", "five", "six");
            Observable<String> failure = Observable.error(new RuntimeException("forced failure"));
            Observable<String> wordsThenFailure = Observable.concat(words, failure);
            Observable<GroupedObservable<Integer, String>> grouped = Observable.create(OperationGroupBy.groupBy(wordsThenFailure, this.length));

            final AtomicInteger groupCounter = new AtomicInteger();
            final AtomicInteger eventCounter = new AtomicInteger();
            final AtomicReference<Throwable> error = new AtomicReference<Throwable>();

            Observable<String> flattened = grouped.mapMany(new Func1<GroupedObservable<Integer, String>, Observable<String>>() {
                @Override
                public Observable<String> call(final GroupedObservable<Integer, String> group) {
                    groupCounter.incrementAndGet();
                    return group.map(new Func1<String, String>() {
                        @Override
                        public String call(String value) {
                            return "Event => key: " + group.getKey() + " value: " + value;
                        }
                    });
                }
            });

            flattened.subscribe(new Observer<String>() {
                @Override
                public void onCompleted() {
                }

                @Override
                public void onError(Throwable th) {
                    th.printStackTrace();
                    error.set(th);
                }

                @Override
                public void onNext(String line) {
                    eventCounter.incrementAndGet();
                    System.out.println(line);
                }
            });

            Assert.assertEquals(3L, groupCounter.get());
            Assert.assertEquals(6L, eventCounter.get());
            Assert.assertNotNull(error.get());
        }

        /**
         * Drains an observable of groups into a map from group key to the values the group emitted.
         * Blocks until the outer observable has finished; each group is subscribed to as soon as it
         * is received.
         *
         * @param observable the grouped observable to collect
         * @return map holding, per key, the values in the order they were added
         */
        private static <K, V> Map<K, Collection<V>> toMap(Observable<GroupedObservable<K, V>> observable) {
            final ConcurrentHashMap<K, Collection<V>> valuesByKey = new ConcurrentHashMap<K, Collection<V>>();

            final Action1<GroupedObservable<K, V>> collectGroup = new Action1<GroupedObservable<K, V>>() {
                @Override
                public void call(final GroupedObservable<K, V> group) {
                    // Register the bucket before subscribing so the value callback always finds it.
                    valuesByKey.put(group.getKey(), new ConcurrentLinkedQueue<V>());
                    group.subscribe(new Action1<V>() {
                        @Override
                        public void call(V value) {
                            valuesByKey.get(group.getKey()).add(value);
                        }
                    });
                }
            };
            observable.toBlockingObservable().forEach(collectGroup);

            return valuesByKey;
        }

        /**
         * Emits 100 events, alternating between two sources, from a background thread; groups them by
         * source and flattens the groups again. Verifies a single subscription to the event stream,
         * two groups and 100 delivered events.
         */
        @Test
        public void testGroupedEventStream() throws Throwable {
            final AtomicInteger eventCounter = new AtomicInteger();
            final AtomicInteger subscribeCounter = new AtomicInteger();
            final AtomicInteger groupCounter = new AtomicInteger();
            final CountDownLatch latch = new CountDownLatch(1);

            Observable.create(new Observable.OnSubscribeFunc<Event>() {
                @Override
                public Subscription onSubscribe(final Observer<? super Event> observer) {
                    System.out.println("*** Subscribing to EventStream ***");
                    subscribeCounter.incrementAndGet();
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            for (int i = 0; i < 100; i++) {
                                Event event = new Event();
                                event.source = i % 2;
                                event.message = "Event-" + i;
                                observer.onNext(event);
                            }
                            observer.onCompleted();
                        }
                    }).start();
                    return Subscriptions.empty();
                }
            }).groupBy(new Func1<Event, Integer>() {
                @Override
                public Integer call(Event event) {
                    return Integer.valueOf(event.source);
                }
            }).mapMany(new Func1<GroupedObservable<Integer, Event>, Observable<String>>() {
                @Override
                public Observable<String> call(GroupedObservable<Integer, Event> group) {
                    System.out.println("GroupedObservable Key: " + group.getKey());
                    groupCounter.incrementAndGet();
                    return group.map(new Func1<Event, String>() {
                        @Override
                        public String call(Event event) {
                            return "Source: " + event.source + "  Message: " + event.message;
                        }
                    });
                }
            }).subscribe(new Observer<String>() {
                @Override
                public void onCompleted() {
                    latch.countDown();
                }

                @Override
                public void onError(Throwable th) {
                    th.printStackTrace();
                    latch.countDown();
                }

                @Override
                public void onNext(String line) {
                    System.out.println(line);
                    eventCounter.incrementAndGet();
                }
            });

            // Fail explicitly on a timeout instead of letting it surface as a confusing count mismatch.
            Assert.assertTrue("stream did not terminate within 5000 ms", latch.await(5000L, TimeUnit.MILLISECONDS));
            Assert.assertEquals(1L, subscribeCounter.get());
            Assert.assertEquals(2L, groupCounter.get());
            Assert.assertEquals(100L, eventCounter.get());
        }

        /**
         * Takes only the first group and only 20 values from it, then checks that unsubscription reaches
         * the event source: the emitting thread stops once its subscription is marked unsubscribed, so
         * roughly 39 of the 100 events (20 of the kept group plus the interleaved other ones) are sent.
         */
        @Test
        public void testUnsubscribe() throws InterruptedException {
            final AtomicInteger eventCounter = new AtomicInteger();
            final AtomicInteger subscribeCounter = new AtomicInteger();
            final AtomicInteger groupCounter = new AtomicInteger();
            final AtomicInteger sentEventCounter = new AtomicInteger();
            final CountDownLatch latch = new CountDownLatch(1);

            Observable.create(new Observable.OnSubscribeFunc<Event>() {
                @Override
                public Subscription onSubscribe(final Observer<? super Event> observer) {
                    final BooleanSubscription subscription = new BooleanSubscription();
                    System.out.println("testUnsubscribe => *** Subscribing to EventStream ***");
                    subscribeCounter.incrementAndGet();
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            // Stop emitting as soon as the subscription has been cancelled.
                            for (int i = 0; i < 100 && !subscription.isUnsubscribed(); i++) {
                                Event event = new Event();
                                event.source = i % 2;
                                event.message = "Event-" + i;
                                observer.onNext(event);
                                sentEventCounter.incrementAndGet();
                            }
                            observer.onCompleted();
                        }
                    }).start();
                    return subscription;
                }
            }).groupBy(new Func1<Event, Integer>() {
                @Override
                public Integer call(Event event) {
                    return Integer.valueOf(event.source);
                }
            }).take(1).mapMany(new Func1<GroupedObservable<Integer, Event>, Observable<String>>() {
                @Override
                public Observable<String> call(GroupedObservable<Integer, Event> group) {
                    System.out.println("testUnsubscribe => GroupedObservable Key: " + group.getKey());
                    groupCounter.incrementAndGet();
                    return group.take(20).map(new Func1<Event, String>() {
                        @Override
                        public String call(Event event) {
                            return "testUnsubscribe => Source: " + event.source + "  Message: " + event.message;
                        }
                    });
                }
            }).subscribe(new Observer<String>() {
                @Override
                public void onCompleted() {
                    latch.countDown();
                }

                @Override
                public void onError(Throwable th) {
                    th.printStackTrace();
                    latch.countDown();
                }

                @Override
                public void onNext(String line) {
                    System.out.println(line);
                    eventCounter.incrementAndGet();
                }
            });

            // Fail explicitly on a timeout instead of letting it surface as a confusing count mismatch.
            Assert.assertTrue("stream did not terminate within 5000 ms", latch.await(5000L, TimeUnit.MILLISECONDS));
            Assert.assertEquals(1L, subscribeCounter.get());
            Assert.assertEquals(1L, groupCounter.get());
            Assert.assertEquals(20L, eventCounter.get());
            // The sender runs on its own thread, hence the tolerance around the expected 39 events.
            Assert.assertEquals(39.0d, sentEventCounter.get(), 10.0d);
        }
    }

    /**
     * Creates the subscription function that groups the items of {@code observable}.
     * <p>
     * Each item is first mapped to a key/value pair, and the resulting stream is handed to
     * {@link GroupBy}, which emits one {@link GroupedObservable} per distinct key.
     *
     * @param observable source sequence
     * @param func1      key selector: computes the grouping key of an item
     * @param func12     element selector: computes the value that is delivered to the group
     * @param <K>        key type
     * @param <T>        item type of the source
     * @param <R>        value type inside the groups
     * @return function to be passed to {@code Observable.create}
     */
    public static <K, T, R> Observable.OnSubscribeFunc<GroupedObservable<K, R>> groupBy(Observable<? extends T> observable, final Func1<? super T, ? extends K> func1, final Func1<? super T, ? extends R> func12) {
        final Observable<KeyValue<K, R>> keyValues = observable.map(new Func1<T, KeyValue<K, R>>() {
            @Override
            public KeyValue<K, R> call(T t) {
                // Key first, then value, matching the argument evaluation order of the constructor call.
                K key = func1.call(t);
                R value = func12.call(t);
                return new KeyValue<K, R>(key, value);
            }
        });
        return new GroupBy<K, R>(keyValues);
    }

    /**
     * Creates the subscription function that groups the items of {@code observable} by key, delivering
     * the items themselves (unchanged) to the groups.
     *
     * @param observable source sequence
     * @param func1      key selector: computes the grouping key of an item
     * @param <K>        key type
     * @param <T>        item type of the source and of the groups
     * @return function to be passed to {@code Observable.create}
     */
    public static <K, T> Observable.OnSubscribeFunc<GroupedObservable<K, T>> groupBy(Observable<? extends T> observable, Func1<? super T, ? extends K> func1) {
        // The explicit type argument pins the element selector to Func1<T, T>, so the result type does
        // not depend on how well the compiler infers through the nested generic call.
        return groupBy(observable, func1, Functions.<T>identity());
    }

    /**
     * Creates an observer that silently ignores every notification. A new instance is returned on
     * each call.
     *
     * @param <T> item type the observer accepts
     * @return a no-op observer
     */
    private static <T> Observer<T> emptyObserver() {
        final Observer<T> discarding = new Observer<T>() {
            @Override
            public void onNext(T ignoredValue) {
                // intentionally empty
            }

            @Override
            public void onError(Throwable ignoredError) {
                // intentionally empty
            }

            @Override
            public void onCompleted() {
                // intentionally empty
            }
        };
        return discarding;
    }

    /**
     * Compiler-generated style accessor that exposes the private {@link #emptyObserver()} factory;
     * it simply returns whatever that factory creates.
     *
     * @return a no-op observer (raw type, as in the generated signature)
     */
    static /* synthetic */ Observer access$800() {
        final Observer discarding = emptyObserver();
        return discarding;
    }
}
