package org.apache.accumulo.tracer;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.ServerSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.AgeOffFilter;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.fate.zookeeper.ZooUtil;
import org.apache.accumulo.server.Accumulo;
import org.apache.accumulo.server.ServerOpts;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.accumulo.server.util.time.SimpleTimer;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader;
import org.apache.accumulo.tracer.thrift.RemoteSpan;
import org.apache.accumulo.tracer.thrift.SpanReceiver;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TByteArrayOutputStream;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/accumulo/tracer/TraceServer.class */
public class TraceServer implements Watcher {
    private static final Logger log = LoggerFactory.getLogger(TraceServer.class);
    private final ServerConfigurationFactory serverConfiguration;
    private final TServer server;
    private final AtomicReference<BatchWriter> writer;
    private final Connector connector;
    final String table;
    private static final int BATCH_WRITER_MAX_LATENCY = 5;
    private static final long SCHEDULE_PERIOD = 1000;
    private static final long SCHEDULE_DELAY = 1000;

    /* loaded from: input_file:org/apache/accumulo/tracer/TraceServer$ByteArrayTransport.class */
    /**
     * Write-only, in-memory {@link TTransport}: everything written to it is appended to a
     * {@link TByteArrayOutputStream}, and nothing can ever be read back through the transport API.
     *
     * <p>It always reports itself as open; {@link #open()} and {@link #close()} do nothing.
     */
    static class ByteArrayTransport extends TTransport {
        /** Accumulates every byte passed to {@link #write(byte[], int, int)}. */
        TByteArrayOutputStream out;

        ByteArrayTransport() {
            out = new TByteArrayOutputStream();
        }

        /** @return always {@code true}; there is no underlying connection to be closed. */
        public boolean isOpen() {
            return true;
        }

        /** No-op: there is nothing to open. */
        public void open() throws TTransportException {
        }

        /** No-op: the captured bytes stay available after closing. */
        public void close() {
        }

        /**
         * Reading is not supported by this transport.
         *
         * @return always {@code 0}; the destination array is left untouched
         */
        public int read(byte[] bArr, int i, int i2) {
            return 0;
        }

        /**
         * Appends {@code i2} bytes of {@code bArr}, starting at offset {@code i}, to the buffer.
         */
        public void write(byte[] bArr, int i, int i2) throws TTransportException {
            out.write(bArr, i, i2);
        }

        /**
         * @return whatever {@code TByteArrayOutputStream.get()} exposes for the captured data;
         *         callers in this file always pair it with {@link #len()} — presumably because the
         *         array may be larger than the valid content (confirm against the Thrift API)
         */
        public byte[] get() {
            return out.get();
        }

        /** @return the length reported by the underlying {@code TByteArrayOutputStream}. */
        public int len() {
            return out.len();
        }
    }

    /* loaded from: input_file:org/apache/accumulo/tracer/TraceServer$Receiver.class */
    /**
     * Thrift handler that turns each received {@link RemoteSpan} into mutations against the trace
     * table and hands them to the enclosing server's current {@link BatchWriter}.
     */
    class Receiver implements SpanReceiver.Iface {
        Receiver() {
        }

        /**
         * Records one span.
         *
         * <p>Up to three mutations are produced:
         * <ul>
         * <li>row {@code <traceId hex>}, column {@code span : <parentId hex>:<spanId hex>}, holding the
         * span serialized with {@link TCompactProtocol};</li>
         * <li>row {@code idx:<svc>:<start hex>}, column {@code <description> : <sender>}, holding
         * {@code <traceId hex>:<duration hex>};</li>
         * <li>only when the parent id equals the sentinel 477902: row {@code start:<start hex>},
         * column {@code id : <traceId hex>}, holding the serialized span again.</li>
         * </ul>
         *
         * <p>If no writer is currently installed, or the writer rejects the mutations, the span is
         * dropped and a warning is logged; such failures are not propagated to the caller.
         *
         * @param remoteSpan the span sent by a traced process
         * @throws TException if serializing the span fails
         */
        @Override // org.apache.accumulo.tracer.thrift.SpanReceiver.Iface
        public void span(RemoteSpan remoteSpan) throws TException {
            final String traceIdHex = Long.toHexString(remoteSpan.traceId);
            final String startHex = Long.toHexString(remoteSpan.start);

            final Mutation spanMutation = new Mutation(new Text(traceIdHex));
            final Mutation indexMutation = new Mutation(new Text("idx:" + remoteSpan.svc + ":" + startHex));

            final String durationHex = Long.toHexString(remoteSpan.stop - remoteSpan.start);
            final byte[] indexValue = (traceIdHex + ":" + durationHex).getBytes(StandardCharsets.UTF_8);
            indexMutation.put(new Text(remoteSpan.description), new Text(remoteSpan.sender), new Value(indexValue));

            // Serialize the whole span once; the same bytes are reused for the "start:" row below.
            final ByteArrayTransport serialized = new ByteArrayTransport();
            remoteSpan.write(new TCompactProtocol(serialized));

            // NOTE(review): 477902 (0x74ACE) is treated as the "no parent" marker — presumably the
            // tracing library's root-span id; confirm against the library's constant.
            final boolean isRoot = remoteSpan.parentId == 477902;
            final String parentHex = isRoot ? "" : Long.toHexString(remoteSpan.parentId);
            TraceServer.put(spanMutation, "span", parentHex + ":" + Long.toHexString(remoteSpan.spanId), serialized.get(), serialized.len());

            Mutation startMutation = null;
            if (isRoot) {
                startMutation = new Mutation(new Text("start:" + startHex));
                TraceServer.put(startMutation, "id", traceIdHex, serialized.get(), serialized.len());
            }

            try {
                final BatchWriter current = TraceServer.this.writer.get();
                if (current == null) {
                    TraceServer.log.warn("writer is not ready; discarding span.");
                    return;
                }
                current.addMutation(spanMutation);
                current.addMutation(indexMutation);
                if (startMutation != null) {
                    current.addMutation(startMutation);
                }
            } catch (MutationsRejectedException rejected) {
                TraceServer.log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG for span information and stacktrace. cause: " + rejected);
                // Guarded so the mutation is only rendered to a string when it will actually be logged.
                if (TraceServer.log.isDebugEnabled()) {
                    TraceServer.log.debug("discarded span due to rejection of mutation: " + spanMutation, rejected);
                }
            } catch (RuntimeException failure) {
                TraceServer.log.warn("Unable to write mutation to table; discarding span. set log level to DEBUG for stacktrace. cause: " + failure);
                TraceServer.log.debug("unable to write mutation to table due to exception.", failure);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Adds one entry to {@code mutation}, using only the first {@code i} bytes of {@code bArr} as
     * the value.
     *
     * @param mutation the mutation to add the entry to
     * @param str text passed as the first column argument of {@code Mutation.put} (the column family)
     * @param str2 text passed as the second column argument of {@code Mutation.put} (the column qualifier)
     * @param bArr buffer holding the value bytes, starting at index 0
     * @param i number of valid bytes in {@code bArr}
     */
    public static void put(Mutation mutation, String str, String str2, byte[] bArr, int i) {
        final Text family = new Text(str);
        final Text qualifier = new Text(str2);
        final Value value = new Value(bArr, 0, i);
        mutation.put(family, qualifier, value);
    }

    /**
     * Creates a trace server: connects to Accumulo as the configured trace user, makes sure the
     * trace table exists and is configured, binds the Thrift listening socket, registers the
     * server in ZooKeeper and opens the initial {@link BatchWriter}.
     *
     * <p>Only the "connect and prepare the trace table" step is retried (once per second, forever)
     * when it fails with a {@link RuntimeException}. Socket binding, ZooKeeper registration and
     * writer creation run exactly once afterwards, so a failure there surfaces to the caller
     * instead of repeatedly re-binding sockets and registering additional ephemeral nodes.
     *
     * @param serverConfigurationFactory source of the instance and its configuration
     * @param str host name or address the Thrift server socket is bound to
     * @throws Exception if a checked exception occurs while connecting, preparing the table,
     *         binding the socket, registering in ZooKeeper or creating the batch writer
     */
    public TraceServer(ServerConfigurationFactory serverConfigurationFactory, String str) throws Exception {
        this.serverConfiguration = serverConfigurationFactory;
        log.info("Version 1.7.2");
        log.info("Instance " + serverConfigurationFactory.getInstance().getInstanceID());
        AccumuloConfiguration configuration = serverConfigurationFactory.getConfiguration();
        this.table = configuration.get(Property.TRACE_TABLE);

        Connector readyConnector = null;
        while (true) {
            try {
                String principal = configuration.get(Property.TRACE_USER);
                if (configuration.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
                    principal = SecurityUtil.getServerPrincipal(principal);
                }
                AuthenticationToken token = createTraceToken(configuration);

                readyConnector = serverConfigurationFactory.getInstance().getConnector(principal, token);
                if (!readyConnector.tableOperations().exists(this.table)) {
                    readyConnector.tableOperations().create(this.table);
                    IteratorSetting ageOff = new IteratorSetting(10, "ageoff", AgeOffFilter.class.getName());
                    // 604,800,000 ms = 7 days.
                    AgeOffFilter.setTTL(ageOff, 604800000L);
                    readyConnector.tableOperations().attachIterator(this.table, ageOff);
                }
                readyConnector.tableOperations().setProperty(this.table, Property.TABLE_FORMATTER_CLASS.getKey(), TraceFormatter.class.getName());
                break;
            } catch (RuntimeException e) {
                log.info("Waiting to checking/create the trace table.", e);
                UtilWaitThread.sleep(1000L);
            }
        }
        // Final fields are assigned exactly once, outside the retry loop.
        this.connector = readyConnector;

        int port = configuration.getPort(Property.TRACE_PORT);
        ServerSocket socket = ServerSocketChannel.open().socket();
        socket.setReuseAddress(true);
        socket.bind(new InetSocketAddress(str, port));
        TThreadPoolServer.Args args = new TThreadPoolServer.Args(new TServerSocket(socket));
        args.processor(new SpanReceiver.Processor(new Receiver()));
        this.server = new TThreadPoolServer(args);

        // Advertise the address actually bound (the local port is read back from the socket).
        registerInZooKeeper(socket.getInetAddress().getHostAddress() + ":" + socket.getLocalPort(), configuration.get(Property.TRACE_ZK_PATH));
        this.writer = new AtomicReference<>(this.connector.createBatchWriter(this.table, new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS)));
    }

    /**
     * Builds the authentication token for the trace user.
     *
     * <p>When the token type is left at its default and no token properties are configured, a
     * {@link PasswordToken} is built from {@code Property.TRACE_PASSWORD}. Otherwise the configured
     * token class is loaded, instantiated and initialized with the token properties, with the
     * property prefix stripped from each key.
     */
    private static AuthenticationToken createTraceToken(AccumuloConfiguration configuration) throws Exception {
        String tokenType = configuration.get(Property.TRACE_TOKEN_TYPE);
        boolean defaultTokenType = tokenType.equals(Property.TRACE_TOKEN_TYPE.getDefaultValue());
        Map<String, String> tokenProperties = configuration.getAllPropertiesWithPrefix(Property.TRACE_TOKEN_PROPERTY_PREFIX);

        if (tokenProperties.isEmpty() && defaultTokenType) {
            return new PasswordToken(configuration.get(Property.TRACE_PASSWORD).getBytes(StandardCharsets.UTF_8));
        }

        AuthenticationToken.Properties properties = new AuthenticationToken.Properties();
        AuthenticationToken token = AccumuloVFSClassLoader.getClassLoader().loadClass(tokenType).asSubclass(AuthenticationToken.class).newInstance();
        int prefixLength = Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey().length();
        for (Map.Entry<String, String> entry : tokenProperties.entrySet()) {
            properties.put(entry.getKey().substring(prefixLength), entry.getValue());
        }
        token.init(properties);
        return token;
    }

    /**
     * Schedules a recurring {@link #flush()} on the shared {@link SimpleTimer} and then serves
     * Thrift requests on the calling thread via {@code server.serve()}.
     *
     * @throws Exception declared for callers; nothing in this method catches failures
     */
    public void run() throws Exception {
        final SimpleTimer timer = SimpleTimer.getInstance(this.serverConfiguration.getConfiguration());
        final Runnable periodicFlush = new Runnable() { // from class: org.apache.accumulo.tracer.TraceServer.1
            @Override // java.lang.Runnable
            public void run() {
                TraceServer.this.flush();
            }
        };
        timer.schedule(periodicFlush, SCHEDULE_DELAY, SCHEDULE_PERIOD);
        this.server.serve();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Flushes the current batch writer, or tries to create one if none is installed.
     *
     * <p>With no writer present, a new one is only attempted when the trace table exists. Any
     * {@link MutationsRejectedException} or {@link RuntimeException} raised along the way is
     * logged and answered by replacing the writer through {@link #resetWriter()}.
     */
    public void flush() {
        try {
            final BatchWriter current = this.writer.get();
            if (current == null) {
                // No writer yet: only worth creating one once the table is there.
                if (this.connector.tableOperations().exists(this.table)) {
                    resetWriter();
                }
                return;
            }
            current.flush();
        } catch (MutationsRejectedException | RuntimeException failure) {
            log.warn("Problem flushing traces, resetting writer. Set log level to DEBUG to see stacktrace. cause: " + failure);
            log.debug("flushing traces failed due to exception", failure);
            resetWriter();
        }
    }

    /**
     * Replaces the shared batch writer with a freshly created one and closes the previous writer.
     *
     * <p>If creating the new writer fails with an {@link Exception}, the failure is logged and
     * {@code null} is installed instead, so spans are discarded until a later {@link #flush()}
     * manages to create a writer. The swap and the close of the previous writer happen exactly
     * once, in a {@code finally} block, whatever the outcome of the creation attempt; problems
     * closing the previous writer are logged and otherwise ignored.
     */
    private void resetWriter() {
        BatchWriter replacement = null;
        try {
            replacement = this.connector.createBatchWriter(this.table, new BatchWriterConfig().setMaxLatency(BATCH_WRITER_MAX_LATENCY, TimeUnit.SECONDS));
        } catch (Exception e) {
            log.warn("Unable to create a batch writer, will retry. Set log level to DEBUG to see stacktrace. cause: " + e);
            log.debug("batch writer creation failed with exception.", e);
        } finally {
            // Trade in the new writer (possibly null) for the one that has to be closed.
            BatchWriter previous = this.writer.getAndSet(replacement);
            if (previous != null) {
                try {
                    previous.close();
                } catch (Exception e) {
                    log.warn("Problem closing batch writer. Set log level to DEBUG to see stacktrace. cause: " + e);
                    log.debug("batch writer close failed with exception", e);
                }
            }
        }
    }

    /**
     * Advertises this server in ZooKeeper and watches the created node.
     *
     * <p>The parent node {@code str2} is created with empty data (skipped if it already exists),
     * then an ephemeral sequential child named {@code trace-…} is created whose data is the UTF-8
     * encoding of {@code str}. This server is installed as the watcher on that child.
     *
     * @param str the address string stored in the node
     * @param str2 ZooKeeper path under which the node is created
     * @throws Exception if any ZooKeeper operation fails
     */
    private void registerInZooKeeper(String str, String str2) throws Exception {
        final ZooReaderWriter zoo = ZooReaderWriter.getInstance();
        zoo.putPersistentData(str2, new byte[0], ZooUtil.NodeExistsPolicy.SKIP);
        log.info("Registering tracer " + str + " at " + str2);
        final byte[] address = str.getBytes(StandardCharsets.UTF_8);
        final String registeredPath = zoo.putEphemeralSequential(str2 + "/trace-", address);
        zoo.exists(registeredPath, this);
    }

    /**
     * Performs a Kerberos login for the trace user when a keytab and a principal are configured.
     *
     * <p>The keytab is taken from the {@code keytab} entry under the trace token property prefix,
     * falling back to {@code Property.GENERAL_KERBEROS_KEYTAB}. The principal is
     * {@code Property.TRACE_USER}. If either is missing or empty, the method returns without
     * doing anything.
     *
     * @param accumuloConfiguration configuration to read the keytab and principal from
     * @throws RuntimeException if the login is rejected, or if obtaining the login user afterwards
     *         fails with an {@link IOException} (which is logged first)
     */
    private static void loginTracer(AccumuloConfiguration accumuloConfiguration) {
        final Map<?, ?> tokenProperties = accumuloConfiguration.getAllPropertiesWithPrefix(Property.TRACE_TOKEN_PROPERTY_PREFIX);
        String keytab = (String) tokenProperties.get(Property.TRACE_TOKEN_PROPERTY_PREFIX.getKey() + "keytab");
        if (keytab == null || keytab.isEmpty()) {
            keytab = accumuloConfiguration.getPath(Property.GENERAL_KERBEROS_KEYTAB);
        }
        if (keytab == null || keytab.isEmpty()) {
            return;
        }

        final String principal = accumuloConfiguration.get(Property.TRACE_USER);
        if (principal == null || principal.isEmpty()) {
            return;
        }

        log.info("Attempting to login as {} with {}", principal, keytab);
        final boolean loggedIn = SecurityUtil.login(principal, keytab);
        if (loggedIn) {
            try {
                // NOTE(review): per the error message below, this call is relied on to get ticket
                // renewal going — confirm against the Hadoop UserGroupInformation documentation.
                UserGroupInformation.getLoginUser();
                return;
            } catch (IOException e) {
                log.error("Error starting up renewal thread. This shouldn't be happening.", e);
            }
        }
        throw new RuntimeException("Failed to perform Kerberos login for " + principal + " using  " + keytab);
    }

    /**
     * Entry point of the tracer process: logs in, parses the command line, initializes logging
     * and the server environment, then runs a {@link TraceServer} until it stops.
     *
     * <p>Whether the server stops normally or with an error, "tracer stopping" is logged and the
     * shared ZooKeeper session is closed exactly once before this method exits.
     *
     * @param strArr command-line arguments, handed to {@link ServerOpts}
     * @throws Exception if startup or serving fails
     */
    public static void main(String[] strArr) throws Exception {
        loginTracer(SiteConfiguration.getInstance());
        ServerOpts serverOpts = new ServerOpts();
        serverOpts.parseArgs("tracer", strArr, new Object[0]);
        Accumulo.setupLogging("tracer");
        ServerConfigurationFactory serverConfigurationFactory = new ServerConfigurationFactory(HdfsZooInstance.getInstance());
        Accumulo.init(VolumeManagerImpl.get(), serverConfigurationFactory, "tracer");
        try {
            new TraceServer(serverConfigurationFactory, serverOpts.getAddress()).run();
        } finally {
            // Single cleanup path, so a failure while closing cannot trigger a second close.
            log.info("tracer stopping");
            ZooReaderWriter.getInstance().getZooKeeper().close();
        }
    }

    /**
     * ZooKeeper watcher callback for this server's registration node.
     *
     * <p>The Thrift server is stopped when the session state is {@code Expired} or the event type
     * is {@code NodeDeleted}. Independently of that, whenever the event carries a path, the watch
     * is re-installed on it; if the node no longer exists or re-installing fails, the server is
     * stopped as well.
     *
     * @param watchedEvent the event delivered by ZooKeeper
     */
    public void process(WatchedEvent watchedEvent) {
        final String path = watchedEvent.getPath();
        final Watcher.Event.EventType type = watchedEvent.getType();
        final Watcher.Event.KeeperState state = watchedEvent.getState();
        log.debug("event " + path + " " + type + " " + state);

        if (state == Watcher.Event.KeeperState.Expired) {
            log.warn("Trace server lost zookeeper registration at " + path);
            this.server.stop();
        } else if (type == Watcher.Event.EventType.NodeDeleted) {
            log.warn("Trace server zookeeper entry lost " + path);
            this.server.stop();
        }

        if (path == null) {
            return;
        }

        // Watches fire once, so the watch has to be set again after every event.
        boolean watchReset = false;
        try {
            watchReset = ZooReaderWriter.getInstance().exists(path, this);
        } catch (Exception e) {
            log.error("{}", e.getMessage(), e);
        }
        if (!watchReset) {
            log.warn("Trace server unable to reset watch on zookeeper registration");
            this.server.stop();
        }
    }
}
