/*
 * Decompiled with CFR 0.152.
 */
package com.ning.metrics.meteo.subscribers;

import com.espertech.esper.client.EPServiceProvider;
import com.google.inject.Inject;
import com.ning.metrics.meteo.subscribers.Subscriber;
import com.ning.metrics.meteo.subscribers.UdpJsonSubscriberConfig;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import net.sf.json.JSON;
import net.sf.json.JSONObject;
import net.sf.json.JSONSerializer;
import net.sf.json.xml.XMLSerializer;
import org.apache.log4j.Logger;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.xml.sax.SAXException;

class UdpJsonSubscriber
implements Subscriber {
    private final Logger log = Logger.getLogger(UdpJsonSubscriber.class);
    private final EPServiceProvider esperSink;
    private final UdpJsonSubscriberConfig config;
    private final DatagramSocket socket;
    private final DatagramPacket packet;
    private Thread acceptThread;
    private ExecutorService handlerPool;
    boolean running = false;

    @Inject
    public UdpJsonSubscriber(UdpJsonSubscriberConfig config, EPServiceProvider esperSink) throws SocketException, UnknownHostException {
        this.config = config;
        this.esperSink = esperSink;
        this.socket = new DatagramSocket(config.getPort(), InetAddress.getByName("0.0.0.0"));
        this.packet = new DatagramPacket(new byte[config.getPacketSize()], config.getPacketSize());
        this.log.info((Object)("Created UDP socket on port " + config.getPort() + " with packet size " + config.getPacketSize()));
    }

    @Override
    public void subscribe() {
        this.running = true;
        this.handlerPool = Executors.newCachedThreadPool();
        this.acceptThread = new Thread(){

            @Override
            public void run() {
                while (UdpJsonSubscriber.this.running) {
                    try {
                        UdpJsonSubscriber.this.log.debug((Object)"Waiting on packet");
                        UdpJsonSubscriber.this.socket.receive(UdpJsonSubscriber.this.packet);
                        UdpJsonSubscriber.this.log.debug((Object)("Got packet: " + UdpJsonSubscriber.this.packet.getLength()));
                    }
                    catch (IOException e) {
                        UdpJsonSubscriber.this.log.error((Object)("Error receiving packet: " + e.getMessage()));
                    }
                    UdpJsonSubscriber.this.handlerPool.submit(new PacketHandler(UdpJsonSubscriber.this.packet.getData()));
                }
            }
        };
        this.acceptThread.setDaemon(true);
        this.acceptThread.start();
    }

    @Override
    public void unsubscribe() {
        this.log.info((Object)"Unsubscribing...");
        this.running = false;
        this.acceptThread.interrupt();
        try {
            this.acceptThread.join();
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        this.handlerPool.shutdownNow();
    }

    protected class PacketHandler
    implements Runnable {
        private final byte[] packetData;

        public PacketHandler(byte[] packetData) {
            this.packetData = packetData;
        }

        @Override
        public void run() {
            UdpJsonSubscriber.this.log.debug((Object)"Running handler...");
            XMLSerializer serializer = new XMLSerializer();
            JSONObject json = null;
            try {
                json = (JSONObject)JSONSerializer.toJSON((Object)new String(this.packetData));
            }
            catch (ClassCastException e) {
                UdpJsonSubscriber.this.log.error((Object)("Error converting packet to JSON: " + e.getMessage()));
                return;
            }
            catch (Exception e) {
                UdpJsonSubscriber.this.log.error((Object)("Got exception: " + e.getMessage()));
                return;
            }
            if (!json.has("timestamp")) {
                json.put((Object)"timestamp", (Object)new Date().getTime());
            }
            serializer.setRootName(UdpJsonSubscriber.this.config.getEventOutputName());
            String xml = serializer.write((JSON)json);
            DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
            try {
                DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
                Document document = dBuilder.parse(new ByteArrayInputStream(xml.getBytes()));
                UdpJsonSubscriber.this.esperSink.getEPRuntime().sendEvent((Node)document);
                UdpJsonSubscriber.this.log.debug((Object)("JSON event submitted: " + json));
            }
            catch (ParserConfigurationException e) {
                UdpJsonSubscriber.this.log.error((Object)("Error with parser configuration: " + e.getMessage()));
                return;
            }
            catch (SAXException e) {
                UdpJsonSubscriber.this.log.error((Object)("SAX Exception: " + e.getMessage()));
                return;
            }
            catch (IOException e) {
                UdpJsonSubscriber.this.log.error((Object)("IO Exception: " + e.getMessage()));
                e.printStackTrace();
                return;
            }
        }
    }
}

