/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.tmq.client.system.consumer.implement;

import com.alibaba.dts.shade.com.taobao.common.fulllinkstresstesting.SplitEnvUtil;
import com.alibaba.tmq.client.TMQFactory;
import com.alibaba.tmq.client.context.ClientContext;
import com.alibaba.tmq.client.system.consumer.Consumer;
import com.alibaba.tmq.client.system.consumer.config.ConsumerConfig;
import com.alibaba.tmq.client.system.consumer.executer.ConsumerExecuter;
import com.alibaba.tmq.client.system.consumer.listener.MessageListener;
import com.alibaba.tmq.client.util.StringUtil;
import com.alibaba.tmq.common.constants.Constants;
import com.alibaba.tmq.common.domain.ConsumerKey;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class DefaultConsumer
extends ClientContext
implements Consumer,
Constants {
    private static final Log logger = LogFactory.getLog(DefaultConsumer.class);
    private final ConsumerExecuter consumerExecuter;
    private final ConsumerConfig consumerConfig;

    public DefaultConsumer(ConsumerExecuter consumerExecuter, ConsumerConfig consumerConfig) {
        this.consumerExecuter = consumerExecuter;
        this.consumerConfig = consumerConfig;
    }

    @Override
    public void start() {
        if (!SplitEnvUtil.needStartDTS()) {
            logger.error((Object)"[DefaultConsumer]: start error, because of isolation environment");
            return;
        }
        ConcurrentHashMap<ConsumerKey, MessageListener> listenerTable = this.consumerExecuter.getListenerTable();
        if (listenerTable.isEmpty()) {
            throw new RuntimeException("[DefaultConsumer]: start error, there is no any messageListener");
        }
        try {
            DefaultConsumer.initClient();
        }
        catch (Throwable e) {
            throw new RuntimeException("[DefaultConsumer]: start initClient error, clientConfig:" + clientConfig + ", consumerConfig:" + this.consumerConfig, e);
        }
        for (Map.Entry<ConsumerKey, MessageListener> entry : listenerTable.entrySet()) {
            ConsumerKey consumerKey = entry.getKey();
            try {
                clientRemoting.initConnection(this.consumerConfig.getInstanceName(), 1, this.consumerConfig.getConsumerId(), consumerKey.getTopic(), consumerKey.getTag());
                logger.warn((Object)("[DefaultConsumer]: start initConnection, clientConfig:" + clientConfig + ", consumerConfig:" + this.consumerConfig + ", consumerKey:" + consumerKey));
            }
            catch (Throwable e) {
                logger.error((Object)("[DefaultConsumer]: start initConnection error, clientConfig:" + clientConfig + ", consumerConfig:" + this.consumerConfig + ", consumerKey:" + consumerKey), e);
                throw new RuntimeException("[DefaultConsumer]: start initConnection error, clientConfig:" + clientConfig + ", consumerConfig:" + this.consumerConfig, e);
            }
        }
        this.consumerExecuter.setStart(true);
    }

    @Override
    public void subscribe(String topic, String tag, MessageListener messageListener) {
        if (StringUtil.isBlank(topic)) {
            throw new RuntimeException("[DefaultConsumer]: subscribe error, topic is empty");
        }
        if (null == messageListener) {
            throw new RuntimeException("[DefaultConsumer]: subscribe error, messageListener is null");
        }
        ConcurrentHashMap<ConsumerKey, MessageListener> listenerTable = this.consumerExecuter.getListenerTable();
        String beforeListenerTable = listenerTable.toString();
        listenerTable.put(new ConsumerKey(this.consumerConfig.getConsumerId(), topic, tag), messageListener);
        logger.warn((Object)("[DefaultConsumer]: subscribe, consumerId:" + this.consumerConfig.getConsumerId() + ", instanceName:" + this.consumerConfig.getInstanceName() + ", topic:" + topic + ", tag:" + tag + ", beforeListenerTable:" + beforeListenerTable + ", afterListenerTable" + listenerTable));
    }

    @Override
    public void shutdown() {
        this.consumerExecuter.setStart(false);
        TMQFactory.removeConsumer(this.consumerConfig.getConsumerId(), this.consumerConfig.getInstanceName());
        ConcurrentHashMap<ConsumerKey, MessageListener> listenerTable = this.consumerExecuter.getListenerTable();
        if (listenerTable.isEmpty()) {
            return;
        }
        for (Map.Entry<ConsumerKey, MessageListener> entry : listenerTable.entrySet()) {
            ConsumerKey consumerKey = entry.getKey();
            try {
                clientRemoting.removeConnection(this.consumerConfig.getInstanceName(), 1, this.consumerConfig.getConsumerId(), consumerKey.getTopic(), consumerKey.getTag());
                logger.warn((Object)("[DefaultConsumer]: shutdown removeConnection, clientConfig:" + clientConfig + ", consumerConfig:" + this.consumerConfig + ", consumerKey:" + consumerKey));
            }
            catch (Throwable e) {
                logger.error((Object)("[DefaultConsumer]: shutdown removeConnection error, clientConfig:" + clientConfig + ", consumerConfig:" + this.consumerConfig + ", consumerKey:" + consumerKey), e);
            }
        }
    }
}

