/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.source;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.security.KeyStore;
import java.security.PrivilegedAction;
import java.security.Security;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLServerSocket;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.auth.FlumeAuthenticationUtil;
import org.apache.flume.auth.FlumeAuthenticator;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.thrift.Status;
import org.apache.flume.thrift.ThriftFlumeEvent;
import org.apache.flume.thrift.ThriftSourceProtocol;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.AbstractNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TSSLTransportFactory;
import org.apache.thrift.transport.TSaslServerTransport;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Event-driven Flume source that exposes a Thrift endpoint implementing
 * {@code ThriftSourceProtocol} and forwards every received event to the
 * source's channel processor.
 *
 * <p>The listener can optionally be wrapped in SSL and/or authenticated with
 * Kerberos (SASL/GSSAPI); both are switched on through configuration keys read
 * in {@link #configure(Context)}.
 */
public class ThriftSource
extends AbstractSource
implements Configurable,
EventDrivenSource {
    public static final Logger logger = LoggerFactory.getLogger(ThriftSource.class);
    /** Configuration key: maximum number of worker threads (non-positive values are treated as unbounded). */
    public static final String CONFIG_THREADS = "threads";
    /** Configuration key: host name or address to bind to (mandatory). */
    public static final String CONFIG_BIND = "bind";
    /** Configuration key: TCP port to listen on (mandatory). */
    public static final String CONFIG_PORT = "port";
    /** Configuration key: Thrift wire protocol, either {@link #BINARY_PROTOCOL} or {@link #COMPACT_PROTOCOL}. */
    public static final String CONFIG_PROTOCOL = "protocol";
    /** Value of {@link #CONFIG_PROTOCOL} selecting {@code TBinaryProtocol}. */
    public static final String BINARY_PROTOCOL = "binary";
    /** Value of {@link #CONFIG_PROTOCOL} selecting {@code TCompactProtocol}; used when no protocol is configured. */
    public static final String COMPACT_PROTOCOL = "compact";
    // SSL-related configuration keys.
    private static final String SSL_KEY = "ssl";
    private static final String KEYSTORE_KEY = "keystore";
    private static final String KEYSTORE_PASSWORD_KEY = "keystore-password";
    private static final String KEYSTORE_TYPE_KEY = "keystore-type";
    private static final String EXCLUDE_PROTOCOLS = "exclude-protocols";
    // Kerberos-related configuration keys.
    private static final String KERBEROS_KEY = "kerberos";
    private static final String AGENT_PRINCIPAL = "agent-principal";
    private static final String AGENT_KEYTAB = "agent-keytab";
    // Listen port and bind address; both are required by configure().
    private Integer port;
    private String bindAddress;
    // Worker-thread limit; configure() replaces non-positive values with Integer.MAX_VALUE.
    private int maxThreads = 0;
    private SourceCounter sourceCounter;
    // The Thrift server created in start() and the single-thread executor that runs its serve() loop.
    private TServer server;
    private ExecutorService servingExecutor;
    private String protocol;
    // Keystore settings, only read when SSL is enabled.
    private String keystore;
    private String keystorePassword;
    private String keystoreType;
    // SSL/TLS protocol names removed from the SSL server socket's enabled protocols.
    private final List<String> excludeProtocols = new LinkedList<String>();
    private boolean enableSsl = false;
    private boolean enableKerberos = false;
    // Agent principal used for the SASL server definition when Kerberos is enabled.
    private String principal;
    // Authenticator obtained in configure(); the server's serve() loop is run through it.
    private FlumeAuthenticator flumeAuth;

    /**
     * Reads and validates this source's settings from the given context.
     *
     * <p>{@code port} and {@code bind} are mandatory. {@code threads} is
     * optional; a missing, non-positive or unparsable value means "no limit"
     * and is stored as {@link Integer#MAX_VALUE}. {@code protocol} defaults to
     * {@code compact}. When {@code ssl} is true the keystore settings are
     * required and the keystore is loaded once to verify it can be opened.
     * When {@code kerberos} is true the agent must already be authenticated.
     *
     * @param context configuration for this source
     * @throws NullPointerException if a mandatory setting is missing
     * @throws IllegalArgumentException if the protocol is neither binary nor compact
     * @throws FlumeException if the keystore cannot be loaded or Kerberos
     *         authentication failed
     */
    @Override
    public void configure(Context context) {
        logger.info("Configuring thrift source.");
        this.port = context.getInteger(CONFIG_PORT);
        Preconditions.checkNotNull(this.port, "Port must be specified for Thrift Source.");
        this.bindAddress = context.getString(CONFIG_BIND);
        Preconditions.checkNotNull(this.bindAddress, "Bind address must be specified for Thrift Source.");

        try {
            int configuredThreads = context.getInteger(CONFIG_THREADS, Integer.valueOf(0));
            this.maxThreads = configuredThreads <= 0 ? Integer.MAX_VALUE : configuredThreads;
        } catch (NumberFormatException e) {
            logger.warn("Thrift source's \"threads\" property must specify an integer value: {}", context.getString(CONFIG_THREADS));
            // Treat an unparsable value like an absent one instead of keeping a
            // stale (possibly zero) limit that the thread-pool server rejects.
            this.maxThreads = Integer.MAX_VALUE;
        }

        if (this.sourceCounter == null) {
            this.sourceCounter = new SourceCounter(this.getName());
        }

        this.protocol = context.getString(CONFIG_PROTOCOL);
        if (this.protocol == null) {
            this.protocol = COMPACT_PROTOCOL;
        }
        Preconditions.checkArgument(
                this.protocol.equalsIgnoreCase(BINARY_PROTOCOL) || this.protocol.equalsIgnoreCase(COMPACT_PROTOCOL),
                "binary or compact are the only valid Thrift protocol types to choose from.");

        // Start from a clean list so that re-configuring does not accumulate
        // entries from a previous configuration.
        this.excludeProtocols.clear();
        this.enableSsl = context.getBoolean(SSL_KEY, Boolean.valueOf(false));
        if (this.enableSsl) {
            this.keystore = context.getString(KEYSTORE_KEY);
            this.keystorePassword = context.getString(KEYSTORE_PASSWORD_KEY);
            this.keystoreType = context.getString(KEYSTORE_TYPE_KEY, "JKS");

            String excludeProtocolsStr = context.getString(EXCLUDE_PROTOCOLS);
            if (excludeProtocolsStr != null) {
                this.excludeProtocols.addAll(Arrays.asList(excludeProtocolsStr.split(" ")));
            }
            // SSLv3 is always excluded, whatever the user configured.
            if (!this.excludeProtocols.contains("SSLv3")) {
                this.excludeProtocols.add("SSLv3");
            }

            Preconditions.checkNotNull(this.keystore, "keystore must be specified when SSL is enabled");
            Preconditions.checkNotNull(this.keystorePassword, "keystore-password must be specified when SSL is enabled");

            // Load the keystore only to validate it; the loaded instance is discarded.
            try {
                KeyStore ks = KeyStore.getInstance(this.keystoreType);
                FileInputStream keystoreStream = new FileInputStream(this.keystore);
                try {
                    ks.load(keystoreStream, this.keystorePassword.toCharArray());
                } finally {
                    keystoreStream.close();
                }
            } catch (Exception ex) {
                throw new FlumeException("Thrift source configured with invalid keystore: " + this.keystore, ex);
            }
        }

        this.principal = context.getString(AGENT_PRINCIPAL);
        String keytab = context.getString(AGENT_KEYTAB);
        this.enableKerberos = context.getBoolean(KERBEROS_KEY, Boolean.valueOf(false));
        this.flumeAuth = FlumeAuthenticationUtil.getAuthenticator(this.principal, keytab);
        if (this.enableKerberos) {
            if (!this.flumeAuth.isAuthenticated()) {
                throw new FlumeException("Authentication failed in Kerberos mode for principal " + this.principal + " keytab " + keytab);
            }
            this.flumeAuth.startCredentialRefresher();
        }
    }

    /**
     * Creates the Thrift server, runs its serve loop on a dedicated thread and
     * waits until the server reports that it is serving.
     *
     * <p>The threaded-selector server is preferred; the thread-pool server is
     * used when the former is unavailable. The wait polls once per second and
     * gives up after ten seconds. If the server does not come up, or the wait
     * is interrupted, the serving thread and the server are shut down again so
     * that a later retry does not find the port still bound.
     *
     * @throws FlumeException if the server cannot be created, does not start
     *         within the timeout, or the calling thread is interrupted
     */
    @Override
    public void start() {
        logger.info("Starting thrift source");
        this.server = this.getTThreadedSelectorServer();
        if (this.server == null) {
            this.server = this.getTThreadPoolServer();
        }
        this.servingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("Flume Thrift Source I/O Boss").build());
        this.servingExecutor.submit(new Runnable(){

            @Override
            public void run() {
                // Run the blocking serve loop through the authenticator.
                ThriftSource.this.flumeAuth.execute(new PrivilegedAction<Object>(){

                    @Override
                    public Object run() {
                        ThriftSource.this.server.serve();
                        return null;
                    }
                });
            }
        });

        boolean started = false;
        try {
            long timeAfterStart = System.currentTimeMillis();
            while (!this.server.isServing()) {
                if (System.currentTimeMillis() - timeAfterStart >= 10000L) {
                    throw new FlumeException("Thrift server failed to start!");
                }
                TimeUnit.MILLISECONDS.sleep(1000L);
            }
            started = true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new FlumeException("Interrupted while waiting for Thrift server to start.", e);
        } finally {
            if (!started) {
                // Cancel the serve task first so it cannot begin after the
                // server has been told to stop, then release the listener.
                this.servingExecutor.shutdownNow();
                try {
                    this.server.stop();
                } catch (RuntimeException stopFailure) {
                    logger.warn("Failed to stop Thrift server after unsuccessful start.", stopFailure);
                }
            }
        }

        this.sourceCounter.start();
        logger.info("Started Thrift source.");
        super.start();
    }

    /**
     * Returns the key manager algorithm to use for the SSL keystore.
     *
     * @return the value of the {@code ssl.KeyManagerFactory.algorithm}
     *         security property, or the {@link KeyManagerFactory} default when
     *         that property is not set
     */
    private String getkeyManagerAlgorithm() {
        String configured = Security.getProperty("ssl.KeyManagerFactory.algorithm");
        if (configured == null) {
            return KeyManagerFactory.getDefaultAlgorithm();
        }
        return configured;
    }

    /**
     * Builds the SSL server transport bound to the configured address and
     * port, using the configured keystore.
     *
     * <p>When the underlying socket is an {@link SSLServerSocket}, every
     * protocol listed in {@code excludeProtocols} is removed from its enabled
     * protocols.
     *
     * @return the SSL server transport
     * @throws FlumeException wrapping any failure while creating or adjusting
     *         the socket
     */
    private TServerTransport getSSLServerTransport() {
        try {
            TSSLTransportFactory.TSSLTransportParameters sslParams = new TSSLTransportFactory.TSSLTransportParameters();
            sslParams.setKeyStore(this.keystore, this.keystorePassword, this.getkeyManagerAlgorithm(), this.keystoreType);

            int listenPort = this.port.intValue();
            InetAddress listenAddress = InetAddress.getByName(this.bindAddress);
            TServerSocket sslTransport = TSSLTransportFactory.getServerSocket(listenPort, 120000, listenAddress, sslParams);

            ServerSocket rawSocket = sslTransport.getServerSocket();
            if (!(rawSocket instanceof SSLServerSocket)) {
                return sslTransport;
            }

            // Keep only the protocols that were not explicitly excluded.
            SSLServerSocket sslSocket = (SSLServerSocket)rawSocket;
            List<String> allowed = new ArrayList<String>();
            for (String candidate : sslSocket.getEnabledProtocols()) {
                if (!this.excludeProtocols.contains(candidate)) {
                    allowed.add(candidate);
                }
            }
            sslSocket.setEnabledProtocols(allowed.toArray(new String[0]));
            return sslTransport;
        } catch (Throwable cause) {
            throw new FlumeException("Cannot start Thrift source.", cause);
        }
    }

    /**
     * Builds the plain (non-SSL) blocking server transport for the configured
     * bind address and port.
     *
     * @return the server transport
     * @throws FlumeException wrapping any failure while creating the socket
     */
    private TServerTransport getTServerTransport() {
        try {
            InetSocketAddress endpoint = new InetSocketAddress(this.bindAddress, this.port.intValue());
            return new TServerSocket(endpoint);
        } catch (Throwable cause) {
            throw new FlumeException("Cannot start Thrift source.", cause);
        }
    }

    /**
     * Selects the Thrift protocol factory matching the configured protocol.
     *
     * <p>The comparison is case-insensitive, the same as the validation done
     * in {@code configure}; otherwise a value such as {@code "BINARY"} would
     * pass validation and then silently fall through to the compact protocol.
     *
     * @return a binary protocol factory when the protocol is {@code binary},
     *         a compact protocol factory otherwise
     */
    private TProtocolFactory getProtocolFactory() {
        if (BINARY_PROTOCOL.equalsIgnoreCase(this.protocol)) {
            logger.info("Using TBinaryProtocol");
            return new TBinaryProtocol.Factory();
        }
        logger.info("Using TCompactProtocol");
        return new TCompactProtocol.Factory();
    }

    /**
     * Tries to create a {@code TThreadedSelectorServer} through reflection.
     *
     * <p>The class is looked up by name, so a Thrift library that does not
     * ship it makes this method return {@code null}. The selector server is
     * also skipped when SSL or Kerberos is enabled.
     *
     * <p>Worker threads come from a cached pool when no limit is configured
     * (a non-positive value or {@link Integer#MAX_VALUE}) and from a fixed
     * pool otherwise. A fixed pool of {@code Integer.MAX_VALUE} threads would
     * start a new thread for every task and never release any.
     *
     * @return the server, or {@code null} when the selector server cannot or
     *         must not be used
     * @throws FlumeException wrapping any other failure; the socket and the
     *         worker pool created so far are released first
     */
    private TServer getTThreadedSelectorServer() {
        if (this.enableSsl || this.enableKerberos) {
            return null;
        }
        TNonblockingServerSocket serverTransport = null;
        ExecutorService sourceService = null;
        try {
            Class<?> serverClass = Class.forName("org.apache.thrift.server.TThreadedSelectorServer");
            Class<?> argsClass = Class.forName("org.apache.thrift.server.TThreadedSelectorServer$Args");
            serverTransport = new TNonblockingServerSocket(new InetSocketAddress(this.bindAddress, this.port.intValue()));
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Flume Thrift IPC Thread %d").build();
            boolean unbounded = this.maxThreads <= 0 || this.maxThreads == Integer.MAX_VALUE;
            sourceService = unbounded
                    ? Executors.newCachedThreadPool(threadFactory)
                    : Executors.newFixedThreadPool(this.maxThreads, threadFactory);
            AbstractNonblockingServer.AbstractNonblockingServerArgs args = (AbstractNonblockingServer.AbstractNonblockingServerArgs)argsClass.getConstructor(TNonblockingServerTransport.class).newInstance(serverTransport);
            Method m = argsClass.getDeclaredMethod("executorService", ExecutorService.class);
            m.invoke((Object)args, sourceService);
            this.populateServerParams((TServer.AbstractServerArgs)args);
            return (TServer)serverClass.getConstructor(argsClass).newInstance(args);
        } catch (ClassNotFoundException e) {
            // Selector server not available in this Thrift version; nothing
            // has been allocated yet because the lookups come first.
            return null;
        } catch (Throwable ex) {
            if (sourceService != null) {
                sourceService.shutdownNow();
            }
            if (serverTransport != null) {
                serverTransport.close();
            }
            throw new FlumeException("Cannot start Thrift Source.", ex);
        }
    }

    /**
     * Creates a blocking {@code TThreadPoolServer} on the SSL or plain
     * transport, depending on whether SSL is enabled.
     *
     * <p>The configured thread limit is applied only when it is positive;
     * otherwise Thrift's own default maximum is kept. When the limit is lower
     * than the arguments' minimum worker count, the minimum is lowered to
     * match. A thread pool cannot be built with a maximum below its core size.
     *
     * @return the configured server
     * @throws FlumeException if the transport cannot be created
     */
    private TServer getTThreadPoolServer() {
        TServerTransport serverTransport = this.enableSsl ? this.getSSLServerTransport() : this.getTServerTransport();
        try {
            TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
            if (this.maxThreads > 0) {
                serverArgs.maxWorkerThreads(this.maxThreads);
                if (this.maxThreads < serverArgs.minWorkerThreads) {
                    serverArgs.minWorkerThreads(this.maxThreads);
                }
            }
            this.populateServerParams((TServer.AbstractServerArgs)serverArgs);
            return new TThreadPoolServer(serverArgs);
        } catch (RuntimeException e) {
            // Do not leave the listening socket bound when the server itself
            // could not be assembled.
            serverTransport.close();
            throw e;
        }
    }

    /**
     * Fills in the settings shared by both server flavours: protocol factory,
     * transport factory and the processor that dispatches to the handler.
     *
     * <p>The transport factory is the SASL one when Kerberos is enabled and
     * the fast framed one otherwise.
     *
     * @param args server arguments to populate
     */
    private void populateServerParams(TServer.AbstractServerArgs args) {
        args.protocolFactory(this.getProtocolFactory());

        TTransportFactory transportFactory = this.enableKerberos
                ? this.getSASLTransportFactory()
                : new TFastFramedTransport.Factory();
        args.transportFactory(transportFactory);

        ThriftSourceProtocol.Iface handler = new ThriftSourceHandler();
        TProcessor processor = new ThriftSourceProtocol.Processor(handler);
        args.processor(processor);
    }

    /**
     * Builds the SASL transport factory used in Kerberos mode.
     *
     * <p>The agent principal is split into its parts; the first two are passed
     * to the GSSAPI server definition, together with a QOP of {@code auth}.
     *
     * @return a SASL server transport factory with a GSSAPI definition
     * @throws FlumeException if the principal name cannot be resolved
     */
    private TTransportFactory getSASLTransportFactory() {
        final String[] principalParts;
        try {
            principalParts = FlumeAuthenticationUtil.splitKerberosName(this.principal);
        } catch (IOException e) {
            throw new FlumeException("Error while trying to resolve Principal name - " + this.principal, e);
        }

        Map<String, String> saslOptions = new HashMap<String, String>();
        saslOptions.put("javax.security.sasl.qop", "auth");

        TSaslServerTransport.Factory factory = new TSaslServerTransport.Factory();
        factory.addServerDefinition(
                "GSSAPI",
                principalParts[0],
                principalParts[1],
                saslOptions,
                FlumeAuthenticationUtil.getSaslGssCallbackHandler());
        return factory;
    }

    /**
     * Stops the Thrift server and its serving thread, then the source counter
     * and the source itself.
     *
     * <p>The serving executor gets five seconds to finish before it is shut
     * down forcibly. The counter and the superclass are stopped in a
     * {@code finally} block so that a failure or interrupt during server
     * shutdown does not leave the source half-stopped.
     *
     * @throws FlumeException if the calling thread is interrupted while
     *         waiting for the serving thread; the interrupt flag is restored
     *         and the original exception is attached as the cause
     */
    @Override
    public void stop() {
        try {
            if (this.server != null && this.server.isServing()) {
                this.server.stop();
            }
            if (this.servingExecutor != null) {
                this.servingExecutor.shutdown();
                try {
                    if (!this.servingExecutor.awaitTermination(5L, TimeUnit.SECONDS)) {
                        this.servingExecutor.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    // Stop waiting, but still tear the serving thread down and
                    // let the caller observe the interrupt.
                    this.servingExecutor.shutdownNow();
                    Thread.currentThread().interrupt();
                    throw new FlumeException("Interrupted while waiting for server to be shutdown.", e);
                }
            }
        } finally {
            // The counter is created in configure(); guard against stop()
            // being called on a source that was never configured.
            if (this.sourceCounter != null) {
                this.sourceCounter.stop();
            }
            super.stop();
        }
    }

    private class ThriftSourceHandler
    implements ThriftSourceProtocol.Iface {
        private ThriftSourceHandler() {
        }

        public Status append(ThriftFlumeEvent event) throws TException {
            Event flumeEvent = EventBuilder.withBody((byte[])event.getBody(), (Map)event.getHeaders());
            ThriftSource.this.sourceCounter.incrementAppendReceivedCount();
            ThriftSource.this.sourceCounter.incrementEventReceivedCount();
            try {
                ThriftSource.this.getChannelProcessor().processEvent(flumeEvent);
            }
            catch (ChannelException ex) {
                logger.warn("Thrift source " + ThriftSource.this.getName() + " could not append events " + "to the channel.", (Throwable)ex);
                return Status.FAILED;
            }
            ThriftSource.this.sourceCounter.incrementAppendAcceptedCount();
            ThriftSource.this.sourceCounter.incrementEventAcceptedCount();
            return Status.OK;
        }

        public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
            ThriftSource.this.sourceCounter.incrementAppendBatchReceivedCount();
            ThriftSource.this.sourceCounter.addToEventReceivedCount(events.size());
            ArrayList flumeEvents = Lists.newArrayList();
            for (ThriftFlumeEvent event : events) {
                flumeEvents.add(EventBuilder.withBody((byte[])event.getBody(), (Map)event.getHeaders()));
            }
            try {
                ThriftSource.this.getChannelProcessor().processEventBatch(flumeEvents);
            }
            catch (ChannelException ex) {
                logger.warn("Thrift source %s could not append events to the channel.", (Object)ThriftSource.this.getName());
                return Status.FAILED;
            }
            ThriftSource.this.sourceCounter.incrementAppendBatchAcceptedCount();
            ThriftSource.this.sourceCounter.addToEventAcceptedCount(events.size());
            return Status.OK;
        }
    }
}

