/*
 * Decompiled with CFR 0.152.
 */
package com.ning.metrics.meteo.subscribers;

import com.espertech.esper.client.EPServiceProvider;
import com.ning.metrics.meteo.subscribers.AMQSubscriberConfig;
import com.ning.metrics.meteo.subscribers.Subscriber;
import com.ning.metrics.meteo.subscribers.TopicListener;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

/**
 * {@link Subscriber} that consumes an ActiveMQ topic and forwards every message to a
 * {@link TopicListener}.
 *
 * <p>Broker protocol, host, port, credentials, topic name and the retry backoff bounds are all
 * read from the {@link AMQSubscriberConfig} passed to the constructor.
 *
 * <p>NOTE(review): the JMS resource fields are not synchronized. The {@code closed} flag lets
 * {@link #unsubscribe()} stop a retry loop running in {@link #subscribe()}, but this is a
 * best-effort signal only; confirm the intended threading model with callers.
 */
class AMQSubscriber implements Subscriber {
    private static final Logger log = Logger.getLogger(AMQSubscriber.class);

    private Connection connection;
    private Session session;
    private MessageConsumer consumer;
    /** Set by {@link #unsubscribe()}, cleared by {@link #subscribe()}; checked between connection attempts. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AMQSubscriberConfig amqConfig;
    private final MessageListener listener;

    /**
     * Creates a subscriber; no connection is opened until {@link #subscribe()} is called.
     *
     * @param amqConfig connection settings, topic name and backoff bounds
     * @param epService Esper service handed to the {@link TopicListener} together with the
     *                  configured event output name
     */
    public AMQSubscriber(AMQSubscriberConfig amqConfig, EPServiceProvider epService) {
        this.amqConfig = amqConfig;
        this.listener = new TopicListener(amqConfig.getEventOutputName(), epService);
    }

    /**
     * Connects to the broker, retrying with exponential backoff until the connection succeeds,
     * {@link #unsubscribe()} is called, or the calling thread is interrupted.
     */
    @Override
    public void subscribe() {
        this.closed.set(false);
        this.failSafeConnect(this.amqConfig.getInitialBackoffTime(), this.amqConfig.getMaxBackoffTime());
    }

    /**
     * Repeatedly tries to connect, sleeping between failed attempts. The delay starts at
     * {@code initialBackoffTime}, doubles after each failure and is capped at
     * {@code maxBackoffTime}.
     *
     * <p>Implemented as a loop rather than recursion so that a long broker outage cannot
     * exhaust the call stack.
     *
     * @param initialBackoffTime delay in milliseconds before the first retry
     * @param maxBackoffTime     upper bound in milliseconds for the retry delay
     */
    private void failSafeConnect(int initialBackoffTime, int maxBackoffTime) {
        int backoffTime = initialBackoffTime;
        while (!this.closed.get()) {
            // Drop anything left over from a previous subscription or failed attempt.
            this.releaseResources();
            try {
                log.info("Attempting to connect to ActiveMQ");
                this.connect();
                if (this.closed.get()) {
                    // unsubscribe() was requested while we were connecting: do not keep the connection.
                    this.releaseResources();
                }
                return;
            } catch (JMSException e) {
                log.warn("Unable to connect to ActiveMQ. Will retry in " + backoffTime + " ms", e);
                // connect() may have failed half-way; close whatever it managed to open.
                this.releaseResources();
            }

            try {
                Thread.sleep(backoffTime);
            } catch (InterruptedException ie) {
                // Restore the flag and stop: continuing would make every later sleep fail immediately.
                Thread.currentThread().interrupt();
                log.warn("Interrupted while waiting to reconnect to ActiveMQ. Giving up");
                return;
            }

            // Double in long arithmetic so a large backoff cannot overflow into a negative delay.
            backoffTime = (int) Math.min((long) backoffTime * 2L, (long) maxBackoffTime);
        }
    }

    /**
     * Opens a connection, a non-transacted auto-acknowledge session and a topic consumer, then
     * registers the listener on the consumer.
     *
     * @throws JMSException if any JMS call fails; resources opened before the failure stay
     *                      assigned to the fields so the caller can release them
     */
    private void connect() throws JMSException {
        String url = String.format("%s://%s:%d", this.amqConfig.getProtocol(), this.amqConfig.getHost(), this.amqConfig.getPort());
        log.info("Connecting to: " + url);
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.amqConfig.getUsername(), this.amqConfig.getPassword(), url);
        this.connection = connectionFactory.createConnection();
        this.connection.start();
        this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.consumer = this.session.createConsumer(this.session.createTopic(this.amqConfig.getTopic()));
        this.consumer.setMessageListener(this.listener);
        log.info("Connected!");
    }

    /**
     * Marks this subscriber as closed, which stops any retry loop in progress, and closes the
     * consumer, session and connection if they are open.
     */
    @Override
    public void unsubscribe() {
        this.closed.set(true);
        this.releaseResources();
    }

    /**
     * Closes the consumer, session and connection, in that order, and clears the fields.
     *
     * <p>Each resource is closed independently: a failure on one is logged and does not prevent
     * the remaining ones from being closed.
     */
    private void releaseResources() {
        if (this.consumer != null) {
            try {
                this.consumer.close();
            } catch (JMSException e) {
                log.warn("Error while closing ActiveMQ consumer", e);
            } finally {
                this.consumer = null;
            }
        }
        if (this.session != null) {
            try {
                this.session.close();
            } catch (JMSException e) {
                log.warn("Error while closing ActiveMQ session", e);
            } finally {
                this.session = null;
            }
        }
        if (this.connection != null) {
            try {
                this.connection.close();
            } catch (JMSException e) {
                log.warn("Error while closing ActiveMQ connection", e);
            } finally {
                this.connection = null;
            }
        }
    }
}

