/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.network.shuffle;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.Timer;
import io.netty.channel.Channel;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.OpenBlocks;
import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
import org.apache.spark.network.shuffle.protocol.StreamHandle;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark_project.guava.annotations.VisibleForTesting;
import org.spark_project.guava.collect.Lists;

/**
 * RPC handler that understands two kinds of {@link BlockTransferMessage}:
 * <ul>
 *   <li>{@link OpenBlocks} - looks up every requested block through the
 *       {@link ExternalShuffleBlockResolver}, registers the resulting buffers as a stream with
 *       the {@link OneForOneStreamManager}, and replies with a {@link StreamHandle};</li>
 *   <li>{@link RegisterExecutor} - records the executor's shuffle info with the resolver and
 *       replies with an empty buffer.</li>
 * </ul>
 * Any other message type is rejected with an {@link UnsupportedOperationException}.
 *
 * <p>Per-request latency timers, a transferred-bytes meter and a registered-executor gauge are
 * exposed through {@link #getAllMetrics()}.
 */
public class ExternalShuffleBlockHandler extends RpcHandler {
    private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockHandler.class);

    /** Resolver used to look up block data and to track registered executors. */
    @VisibleForTesting
    final ExternalShuffleBlockResolver blockManager;

    /** Stream manager with which the buffers of each OpenBlocks request are registered. */
    private final OneForOneStreamManager streamManager;

    /** Metrics updated by {@link #handleMessage}; the gauge inside reads {@link #blockManager} lazily. */
    private final ShuffleMetrics metrics = new ShuffleMetrics();

    /**
     * Creates a handler backed by a fresh {@link OneForOneStreamManager} and a new
     * {@link ExternalShuffleBlockResolver} built from the given arguments.
     *
     * @param conf transport configuration handed to the resolver
     * @param registeredExecutorFile file handed to the resolver
     * @throws IOException declared by this constructor; presumably raised while the resolver
     *         is being constructed (its constructor is not visible here)
     */
    public ExternalShuffleBlockHandler(TransportConf conf, File registeredExecutorFile) throws IOException {
        this(new OneForOneStreamManager(), new ExternalShuffleBlockResolver(conf, registeredExecutorFile));
    }

    /**
     * Creates a handler from explicitly supplied collaborators.
     *
     * @param streamManager stream manager used to register block streams
     * @param blockManager resolver used to look up blocks and register executors
     */
    @VisibleForTesting
    public ExternalShuffleBlockHandler(OneForOneStreamManager streamManager, ExternalShuffleBlockResolver blockManager) {
        this.streamManager = streamManager;
        this.blockManager = blockManager;
    }

    /**
     * Decodes the raw RPC payload into a {@link BlockTransferMessage} and dispatches it to
     * {@link #handleMessage}.
     *
     * @param client client that sent the message
     * @param message encoded {@link BlockTransferMessage}
     * @param callback callback that receives the response
     */
    @Override
    public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) {
        BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message);
        handleMessage(msgObj, client, callback);
    }

    /**
     * Dispatches a decoded message to the matching handler, timing the whole request with the
     * corresponding latency timer. The timer is stopped even when handling throws.
     *
     * @param msgObj decoded message; must be an {@link OpenBlocks} or a {@link RegisterExecutor}
     * @param client client that sent the message
     * @param callback callback that receives the response on success
     * @throws UnsupportedOperationException if {@code msgObj} is of any other type
     * @throws SecurityException if the client's id is non-null and differs from the message's app id
     */
    // NOTE(review): the decompiler that produced this file reported removing a try/catch from
    // this method, so exception handling may differ from the original source - confirm.
    protected void handleMessage(BlockTransferMessage msgObj, TransportClient client, RpcResponseCallback callback) {
        if (msgObj instanceof OpenBlocks) {
            Timer.Context responseDelayContext = metrics.openBlockRequestLatencyMillis.time();
            try {
                handleOpenBlocks((OpenBlocks) msgObj, client, callback);
            } finally {
                responseDelayContext.stop();
            }
        } else if (msgObj instanceof RegisterExecutor) {
            Timer.Context responseDelayContext = metrics.registerExecutorRequestLatencyMillis.time();
            try {
                handleRegisterExecutor((RegisterExecutor) msgObj, client, callback);
            } finally {
                responseDelayContext.stop();
            }
        } else {
            throw new UnsupportedOperationException("Unexpected message: " + msgObj);
        }
    }

    /**
     * Serves an {@link OpenBlocks} request: authorizes the client, resolves every block id,
     * registers the buffers as one stream and answers with the stream's {@link StreamHandle}.
     */
    private void handleOpenBlocks(OpenBlocks msg, TransportClient client, RpcResponseCallback callback) {
        checkAuth(client, msg.appId);

        ArrayList<ManagedBuffer> blocks = new ArrayList<ManagedBuffer>(msg.blockIds.length);
        long totalBlockSize = 0L;
        for (String blockId : msg.blockIds) {
            ManagedBuffer block = blockManager.getBlockData(msg.appId, msg.execId, blockId);
            // A null buffer adds nothing to the byte count but is still added to the stream,
            // so the stream always has one entry per requested block id.
            if (block != null) {
                totalBlockSize += block.size();
            }
            blocks.add(block);
        }

        long streamId = streamManager.registerStream(client.getClientId(), blocks.iterator());
        if (logger.isTraceEnabled()) {
            logger.trace("Registered streamId {} with {} buffers for client {} from host {}",
                streamId,
                msg.blockIds.length,
                client.getClientId(),
                NettyUtils.getRemoteAddress(client.getChannel()));
        }
        callback.onSuccess(new StreamHandle(streamId, msg.blockIds.length).toByteBuffer());
        // Marked only after the success callback returned normally.
        metrics.blockTransferRateBytes.mark(totalBlockSize);
    }

    /**
     * Serves a {@link RegisterExecutor} request: authorizes the client, registers the executor
     * with the resolver and answers with an empty buffer.
     */
    private void handleRegisterExecutor(RegisterExecutor msg, TransportClient client, RpcResponseCallback callback) {
        checkAuth(client, msg.appId);
        blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo);
        callback.onSuccess(ByteBuffer.wrap(new byte[0]));
    }

    /**
     * Returns the metric set maintained by this handler.
     *
     * @return the handler's metrics
     */
    public MetricSet getAllMetrics() {
        return metrics;
    }

    /**
     * Returns the stream manager that holds the streams registered by this handler.
     *
     * @return the stream manager passed to (or created by) the constructor
     */
    @Override
    public StreamManager getStreamManager() {
        return streamManager;
    }

    /**
     * Forwards an application-removed notification to the block resolver.
     *
     * @param appId id of the removed application
     * @param cleanupLocalDirs flag passed through to the resolver unchanged
     */
    public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
        blockManager.applicationRemoved(appId, cleanupLocalDirs);
    }

    /**
     * Registers an executor with the block resolver directly, without going through an RPC
     * message (no authorization check is performed on this path).
     *
     * @param appExecId application id / executor id pair identifying the executor
     * @param executorInfo shuffle info to associate with the executor
     */
    public void reregisterExecutor(ExternalShuffleBlockResolver.AppExecId appExecId, ExecutorShuffleInfo executorInfo) {
        blockManager.registerExecutor(appExecId.appId, appExecId.execId, executorInfo);
    }

    /** Closes the underlying block resolver. */
    public void close() {
        blockManager.close();
    }

    /**
     * Rejects the request when the client carries a non-null client id that is not equal to
     * the application id named in the message. Clients with a null id are always accepted.
     *
     * @param client client whose id is checked
     * @param appId application id taken from the incoming message
     * @throws SecurityException if the client id is non-null and differs from {@code appId}
     */
    private void checkAuth(TransportClient client, String appId) {
        // Read the id once so the comparison and the error message use the same value.
        String clientId = client.getClientId();
        if (clientId != null && !clientId.equals(appId)) {
            throw new SecurityException(String.format("Client for %s not authorized for application %s.", clientId, appId));
        }
    }

    /**
     * Metric set of this handler: two request timers, one meter fed with block sizes, and a
     * gauge reporting the resolver's registered-executor count. It is an inner (non-static)
     * class because the gauge reads the enclosing handler's {@code blockManager}.
     */
    private class ShuffleMetrics
    implements MetricSet {
        /** Name-to-metric map returned by {@link #getMetrics()}. */
        private final Map<String, Metric> allMetrics;
        /** Times each OpenBlocks request from dispatch until handling finishes. */
        private final Timer openBlockRequestLatencyMillis = new Timer();
        /** Times each RegisterExecutor request from dispatch until handling finishes. */
        private final Timer registerExecutorRequestLatencyMillis = new Timer();
        /** Marked with the summed size of the blocks of each successful OpenBlocks request. */
        private final Meter blockTransferRateBytes = new Meter();

        private ShuffleMetrics() {
            allMetrics = new HashMap<String, Metric>();
            allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis);
            allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis);
            allMetrics.put("blockTransferRateBytes", blockTransferRateBytes);
            allMetrics.put("registeredExecutorsSize", new Gauge<Integer>() {

                // Evaluated on demand, so blockManager is only read after the enclosing
                // handler's constructor has assigned it.
                @Override
                public Integer getValue() {
                    return blockManager.getRegisteredExecutorsSize();
                }
            });
        }

        /**
         * Returns the backing name-to-metric map (not a copy).
         *
         * @return all metrics of this set keyed by name
         */
        @Override
        public Map<String, Metric> getMetrics() {
            return allMetrics;
        }
    }
}

