/*
 * Decompiled with CFR 0.152.
 */
package tigase.cluster;

import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import tigase.cluster.ClusterController;
import tigase.cluster.ClusteredComponent;
import tigase.disco.ServiceEntity;
import tigase.disco.ServiceIdentity;
import tigase.disco.XMPPService;
import tigase.net.ConnectionType;
import tigase.net.SocketType;
import tigase.server.ConnectionManager;
import tigase.server.Packet;
import tigase.server.ServiceChecker;
import tigase.stats.StatisticsList;
import tigase.util.Algorithms;
import tigase.util.DNSResolver;
import tigase.util.JIDUtils;
import tigase.util.TimeUtils;
import tigase.xml.Element;
import tigase.xmpp.Authorization;
import tigase.xmpp.PacketErrorTypeException;
import tigase.xmpp.XMPPIOService;

/**
 * Connection manager used for links between cluster nodes.
 *
 * <p>For every configured cluster node it either opens an outgoing connection
 * ({@code ConnectionType.connect}) or waits for an incoming one
 * ({@code ConnectionType.accept}). Each link is authenticated with a
 * {@code <handshake>} element carrying a digest computed from the stream id
 * and a shared secret. Once the handshake succeeds, regex routings for the
 * remote host are registered, a service-discovery item is added and the
 * {@link ClusterController} is notified.
 *
 * <p>The class also keeps simple disconnect counters (total, per hour of the
 * day, per minute of the hour) and aggregates I/O statistics of all links.
 */
public class ClusterConnectionManager
extends ConnectionManager<XMPPIOService>
implements XMPPService,
ClusteredComponent {
    private static final Logger log = Logger.getLogger("tigase.cluster.ClusterConnectionManager");

    /** Ports used for cluster links; the first entry is used for outgoing connections. */
    public int[] PORTS = new int[]{5277};
    public static final String SECRET_PROP_KEY = "secret";
    /** Shared secret placed in the port properties and used to compute the handshake digest. */
    public String SECRET_PROP_VAL = "someSecret";
    public static final String PORT_LOCAL_HOST_PROP_KEY = "local-host";
    /** Session-data key under which the regex routings of a link are stored. */
    public static final String PORT_ROUTING_TABLE_PROP_KEY = "routing-table";
    /** Interfaces used for accepting ports. */
    public String[] PORT_IFC_PROP_VAL = new String[]{"*"};
    public static final String RETURN_SERVICE_DISCO_KEY = "service-disco";
    public static final boolean RETURN_SERVICE_DISCO_VAL = true;
    public static final String IDENTITY_TYPE_KEY = "identity-type";
    public static final String IDENTITY_TYPE_VAL = "generic";
    public static final String CONNECT_ALL_PAR = "--cluster-connect-all";
    public static final String CONNECT_ALL_PROP_KEY = "connect-all";
    public static final String CLUSTER_CONTR_ID_PROP_KEY = "cluster-controller-id";
    public static final boolean CONNECT_ALL_PROP_VAL = false;
    public static final String COMPRESS_STREAM_PROP_KEY = "compress-stream";
    public static final boolean COMPRESS_STREAM_PROP_VAL = true;
    public static final String XMLNS = "tigase:cluster";

    /** Set through {@link #setClusterController(ClusterController)}; null until then. */
    private ClusterController clusterController = null;
    private IOServiceStatisticsGetter ioStatsGetter = new IOServiceStatisticsGetter();
    /** Created in {@link #setProperties(Map)}. */
    private ServiceEntity serviceEntity = null;
    private String identity_type = "generic";
    private boolean connect_all = false;
    private boolean compress_stream = true;
    private String cluster_controller_id = null;
    /** Number of configured cluster nodes; feeds {@link #processingThreads()}. */
    private int nodesNo = 0;
    private long totalNodeDisconnects = 0L;
    /** Disconnect counters indexed by hour of the day. */
    private long[] lastDay = new long[24];
    /** Disconnect counters indexed by minute of the hour. */
    private long[] lastHour = new long[60];
    /** Hour slot most recently written in {@link #lastDay}. */
    private int lastDayIdx = 0;
    /** Minute slot most recently written in {@link #lastHour}. */
    private int lastHourIdx = 0;

    /**
     * Returns the hash of the packet's element-level "to" address when it is
     * present, otherwise the hash of the packet-level "to" address.
     *
     * @param packet packet to compute the hash for
     * @return hash code derived from the destination address
     */
    @Override
    public int hashCodeForPacket(Packet packet) {
        if (packet.getElemTo() != null) {
            return packet.getElemTo().hashCode();
        }
        return packet.getTo().hashCode();
    }

    /**
     * @return the larger of the number of available processors and the number
     *         of configured cluster nodes
     */
    @Override
    public int processingThreads() {
        return Math.max(Runtime.getRuntime().availableProcessors(), this.nodesNo);
    }

    /**
     * Handles an outgoing packet. Packets addressed to this component itself
     * are answered with a "feature not implemented" error; everything else is
     * wrapped with {@code packRouted()} and written to a cluster socket.
     *
     * @param packet packet to process
     */
    @Override
    public void processPacket(Packet packet) {
        if (log.isLoggable(Level.FINEST)) {
            log.finest("Processing packet: " + packet.toString());
        }
        if (packet.getElemTo() != null && packet.getElemTo().equals(this.getComponentId())) {
            try {
                this.addOutPacket(Authorization.FEATURE_NOT_IMPLEMENTED.getResponseMessage(packet, "Not implemented", true));
            }
            catch (PacketErrorTypeException e) {
                log.warning("Packet processing exception: " + e);
            }
            return;
        }
        this.writePacketToSocket(packet.packRouted());
    }

    /** Delegates directly to the superclass implementation. */
    @Override
    protected boolean writePacketToSocket(Packet p) {
        return super.writePacketToSocket(p);
    }

    /**
     * Drains all packets received on the given service. {@code handshake}
     * elements are passed to the handshake logic; any other packet is
     * unwrapped if it is routed and then forwarded with {@code addOutPacket}.
     *
     * <p>NOTE(review): non-handshake packets are forwarded without checking
     * whether the handshake on this connection has completed - confirm that
     * this is enforced elsewhere.
     *
     * @param serv service with pending received packets
     * @return always {@code null}
     */
    @Override
    public Queue<Packet> processSocketData(XMPPIOService serv) {
        Packet p = null;
        while ((p = serv.getReceivedPackets().poll()) != null) {
            if (log.isLoggable(Level.FINEST)) {
                log.finest("Processing socket data: " + p.getStringData());
            }
            if (p.getElemName().equals("handshake")) {
                this.processHandshake(p, serv);
                continue;
            }
            if (p.isRouted()) {
                p = p.unpackRouted();
            }
            this.addOutPacket(p);
        }
        return null;
    }

    /**
     * Processes a {@code handshake} element.
     *
     * <p>On a connecting link an empty handshake is the remote side's
     * confirmation; anything with character data is logged as incorrect. On
     * an accepting link the received digest is compared with the digest
     * computed locally from the session id and the secret; on a match an
     * empty handshake is sent back, otherwise the service is stopped.
     *
     * @param p    the handshake packet
     * @param serv service the packet arrived on
     */
    private void processHandshake(Packet p, XMPPIOService serv) {
        switch (serv.connectionType()) {
            case connect: {
                String data = p.getElemCData();
                if (data == null) {
                    this.serviceConnected(serv);
                    break;
                }
                log.warning("Incorrect packet received: " + p.getStringData());
                break;
            }
            case accept: {
                String digest = p.getElemCData();
                String id = (String)serv.getSessionData().get("sessionID");
                String secret = (String)serv.getSessionData().get(SECRET_PROP_KEY);
                try {
                    String loc_digest = Algorithms.hexDigest(id, secret, "SHA");
                    if (log.isLoggable(Level.FINEST)) {
                        // The shared secret is deliberately not logged.
                        log.finest("Calculating digest: id=" + id + ", digest=" + loc_digest);
                    }
                    if (digest != null && digest.equals(loc_digest)) {
                        Packet resp = new Packet(new Element("handshake"));
                        this.writePacketToSocket(serv, resp);
                        this.serviceConnected(serv);
                    } else {
                        log.warning("Handshaking password doesn't match, disconnecting...");
                        serv.stop();
                    }
                }
                catch (Exception e) {
                    log.log(Level.SEVERE, "Handshaking error.", e);
                }
                break;
            }
        }
    }

    /**
     * Called once the handshake on a link has completed: registers the
     * link's regex routings, adds a service-discovery item for the remote
     * host and notifies the cluster controller.
     *
     * @param serv the service whose handshake completed
     */
    protected void serviceConnected(XMPPIOService serv) {
        String[] routings = (String[])serv.getSessionData().get(PORT_ROUTING_TABLE_PROP_KEY);
        // Same guard as in serviceStopped: the routing table may not be set yet.
        if (routings != null) {
            this.updateRoutings(routings, true);
        }
        String addr = (String)serv.getSessionData().get("remote-host");
        log.info("Connected to: " + addr);
        this.updateServiceDiscovery(addr, "tigase:cluster connected");
        if (this.clusterController != null) {
            this.clusterController.nodeConnected(addr);
        } else {
            log.warning("Cluster controller not set, can not notify about connected node: " + addr);
        }
    }

    /**
     * Applies configuration and schedules outgoing connections to cluster
     * nodes. Missing {@code identity-type}, {@code compress-stream} or
     * {@code connect-all} entries leave the current values untouched.
     *
     * <p>A node is skipped when it is the local host, or - unless
     * {@code connect-all} is set - when its host name hash is not greater
     * than the local host name hash.
     *
     * @param props component properties
     */
    @Override
    public void setProperties(Map<String, Object> props) {
        super.setProperties(props);
        Object identityType = props.get(IDENTITY_TYPE_KEY);
        if (identityType != null) {
            this.identity_type = (String)identityType;
        }
        Object compress = props.get(COMPRESS_STREAM_PROP_KEY);
        if (compress != null) {
            this.compress_stream = (Boolean)compress;
        }
        this.serviceEntity = new ServiceEntity("tigase:cluster " + this.getName(), null, XMLNS);
        this.serviceEntity.addIdentities(new ServiceIdentity("component", this.identity_type, "tigase:cluster " + this.getName()));
        Object connectAll = props.get(CONNECT_ALL_PROP_KEY);
        if (connectAll != null) {
            this.connect_all = (Boolean)connectAll;
        }
        this.cluster_controller_id = (String)props.get(CLUSTER_CONTR_ID_PROP_KEY);
        this.connectionDelay = 5000L;
        String[] cl_nodes = (String[])props.get("cluster-nodes");
        if (cl_nodes != null) {
            this.nodesNo = cl_nodes.length;
            String localHost = this.getDefHostName();
            for (String node : cl_nodes) {
                String host = JIDUtils.getNodeHost(node);
                log.config("Found cluster node host: " + host);
                if (host.equals(localHost)) {
                    continue;
                }
                if (!this.connect_all && host.hashCode() <= localHost.hashCode()) {
                    continue;
                }
                log.config("Trying to connect to cluster node: " + host);
                LinkedHashMap<String, Object> port_props = new LinkedHashMap<String, Object>();
                port_props.put(SECRET_PROP_KEY, this.SECRET_PROP_VAL);
                port_props.put(PORT_LOCAL_HOST_PROP_KEY, localHost);
                port_props.put("type", (Object)ConnectionType.connect);
                port_props.put("socket", (Object)SocketType.plain);
                port_props.put("remote-host", host);
                port_props.put("ifc", new String[]{host});
                port_props.put("max-reconnects", 99999999);
                port_props.put("port-no", this.PORTS[0]);
                this.addWaitingTask(port_props);
            }
        }
    }

    /**
     * Builds the default properties. The node list comes from the
     * {@code --cluster-nodes} parameter (comma separated) when present,
     * otherwise it contains only the local host name.
     *
     * @param params start-up parameters
     * @return default property map
     */
    @Override
    public Map<String, Object> getDefaults(Map<String, Object> params) {
        Map<String, Object> props = super.getDefaults(params);
        props.put(RETURN_SERVICE_DISCO_KEY, true);
        props.put(IDENTITY_TYPE_KEY, IDENTITY_TYPE_VAL);
        // Only the literal string "true" enables connect-all.
        boolean connectAll = "true".equals(params.get(CONNECT_ALL_PAR));
        props.put(CONNECT_ALL_PROP_KEY, connectAll);
        if (params.get("--cluster-nodes") != null) {
            String[] cl_nodes = ((String)params.get("--cluster-nodes")).split(",");
            for (int i = 0; i < cl_nodes.length; ++i) {
                cl_nodes[i] = JIDUtils.getNodeHost(cl_nodes[i]);
            }
            this.nodesNo = cl_nodes.length;
            props.put("cluster-nodes", cl_nodes);
        } else {
            props.put("cluster-nodes", new String[]{this.getDefHostName()});
        }
        props.put(CLUSTER_CONTR_ID_PROP_KEY, "cluster-contr@" + this.getDefHostName());
        props.put(COMPRESS_STREAM_PROP_KEY, true);
        return props;
    }

    /**
     * @param port port number (not used by this implementation)
     * @return properties of an accepting, plain-socket port
     */
    @Override
    protected Map<String, Object> getParamsForPort(int port) {
        LinkedHashMap<String, Object> defs = new LinkedHashMap<String, Object>();
        defs.put(SECRET_PROP_KEY, this.SECRET_PROP_VAL);
        defs.put("type", (Object)ConnectionType.accept);
        defs.put("socket", (Object)SocketType.plain);
        defs.put("ifc", this.PORT_IFC_PROP_VAL);
        return defs;
    }

    /** @return the {@link #PORTS} array */
    @Override
    protected int[] getDefPlainPorts() {
        return this.PORTS;
    }

    /** @return the remote address of the given service */
    @Override
    protected String getUniqueId(XMPPIOService serv) {
        return serv.getRemoteAddress();
    }

    /**
     * Cleans up after a closed link when the superclass reports the stop as
     * handled: removes routings, re-schedules outgoing connections, updates
     * service discovery, notifies the cluster controller and updates the
     * disconnect counters.
     *
     * @param service the stopped service
     * @return the result of the superclass call
     */
    @Override
    public boolean serviceStopped(XMPPIOService service) {
        boolean result = super.serviceStopped(service);
        if (result) {
            ConcurrentMap<String, Object> sessionData = service.getSessionData();
            String[] routings = (String[])sessionData.get(PORT_ROUTING_TABLE_PROP_KEY);
            if (routings != null) {
                this.updateRoutings(routings, false);
            }
            if (service.connectionType() == ConnectionType.connect) {
                // Outgoing links are queued again so that they get re-established.
                this.addWaitingTask(sessionData);
            }
            String addr = (String)sessionData.get("remote-host");
            log.info("Disonnected from: " + addr);
            this.updateServiceDiscovery(addr, "tigase:cluster disconnected");
            if (this.clusterController != null) {
                this.clusterController.nodeDisconnected(addr);
            } else {
                log.warning("Cluster controller not set, can not notify about disconnected node: " + addr);
            }
            ++this.totalNodeDisconnects;
            int hour = TimeUtils.getHourNow();
            if (this.lastDayIdx != hour) {
                // First disconnect in a new hour slot: reset the stale counter.
                this.lastDayIdx = hour;
                this.lastDay[hour] = 0L;
            }
            ++this.lastDay[hour];
            int minute = TimeUtils.getMinuteNow();
            if (this.lastHourIdx != minute) {
                // First disconnect in a new minute slot: reset the stale counter.
                this.lastHourIdx = minute;
                this.lastHour[minute] = 0L;
            }
            ++this.lastHour[minute];
        }
        return result;
    }

    /**
     * Resolves the host part of the packet's "to" address to an IP address.
     * Falls back to the plain host name when resolution fails.
     *
     * @param packet packet to find the service id for
     * @return IP address of the destination host, or the host name itself
     */
    @Override
    protected String getServiceId(Packet packet) {
        String host = JIDUtils.getNodeHost(packet.getTo());
        try {
            return DNSResolver.getHostIP(host);
        }
        catch (UnknownHostException e) {
            log.warning("Uknown host exception for address: " + host);
            return host;
        }
    }

    /**
     * Called when a new link is opened. Optionally starts stream compression
     * and, for outgoing links, stores the routing table in the session data
     * and sends the opening stream element.
     *
     * @param serv the started service
     */
    @Override
    public void serviceStarted(XMPPIOService serv) {
        super.serviceStarted(serv);
        log.info("cluster connection opened: " + serv.getRemoteAddress() + ", type: " + serv.connectionType().toString() + ", id=" + serv.getUniqueId());
        if (this.compress_stream) {
            log.info("Starting stream compression for: " + serv.getUniqueId());
            serv.startZLib(9);
        }
        if (serv.connectionType() == ConnectionType.connect) {
            String remote_host = (String)serv.getSessionData().get("remote-host");
            serv.getSessionData().put("hostname-key", remote_host);
            serv.getSessionData().put(PORT_ROUTING_TABLE_PROP_KEY, new String[]{remote_host, ".*@" + remote_host, ".*\\." + remote_host});
            String data = "<stream:stream xmlns='tigase:cluster' xmlns:stream='http://etherx.jabber.org/streams' from='" + this.getDefHostName() + "'" + " to='" + remote_host + "'" + ">";
            log.info("cid: " + (String)serv.getSessionData().get("cid") + ", sending: " + data);
            serv.xmppStreamOpen(data);
        }
    }

    /**
     * Reacts to a stream-open element from the remote side.
     *
     * <p>On an outgoing link the stream {@code id} is stored and a
     * {@code <handshake>} with the digest of id and secret is returned. On an
     * incoming link the remote host is taken from the {@code from}
     * attribute, session data is populated and a stream-open response with a
     * freshly generated id is returned.
     *
     * <p>If the required attribute ({@code id} or {@code from}) is missing,
     * the service is stopped and {@code null} is returned; a null value can
     * not be stored in the session data if the map rejects nulls.
     *
     * @param service service the stream was opened on
     * @param attribs attributes of the stream-open element
     * @return data to send back, or {@code null} when there is nothing to send
     */
    @Override
    public String xmppStreamOpened(XMPPIOService service, Map<String, String> attribs) {
        log.info("Stream opened: " + attribs.toString());
        switch (service.connectionType()) {
            case connect: {
                String id = attribs.get("id");
                if (id == null) {
                    log.warning("Stream open without 'id' attribute received, disconnecting...");
                    service.stop();
                    return null;
                }
                service.getSessionData().put("sessionID", id);
                String secret = (String)service.getSessionData().get(SECRET_PROP_KEY);
                try {
                    String digest = Algorithms.hexDigest(id, secret, "SHA");
                    if (log.isLoggable(Level.FINEST)) {
                        // The shared secret is deliberately not logged.
                        log.finest("Calculating digest: id=" + id + ", digest=" + digest);
                    }
                    return "<handshake>" + digest + "</handshake>";
                }
                catch (NoSuchAlgorithmException e) {
                    log.log(Level.SEVERE, "Can not generate digest for pass phrase.", e);
                    return null;
                }
            }
            case accept: {
                String remote_host = attribs.get("from");
                if (remote_host == null) {
                    log.warning("Stream open without 'from' attribute received, disconnecting...");
                    service.stop();
                    return null;
                }
                service.getSessionData().put("hostname-key", remote_host);
                service.getSessionData().put("remote-host", remote_host);
                service.getSessionData().put(PORT_ROUTING_TABLE_PROP_KEY, new String[]{remote_host, ".*@" + remote_host, ".*\\." + remote_host});
                String id = UUID.randomUUID().toString();
                service.getSessionData().put("sessionID", id);
                return "<stream:stream xmlns='tigase:cluster' xmlns:stream='http://etherx.jabber.org/streams' from='" + this.getDefHostName() + "'" + " to='" + remote_host + "'" + " id='" + id + "'" + ">";
            }
        }
        return null;
    }

    /** Only logs that the stream was closed. */
    @Override
    public void xmppStreamClosed(XMPPIOService serv) {
        log.info("Stream closed.");
    }

    /**
     * @return the maximum inactivity time for a link; presumably in
     *         milliseconds, which would make this 1000 days - TODO confirm unit
     */
    @Override
    protected long getMaxInactiveTime() {
        return 86400000000L;
    }

    /**
     * Adds or removes the given regex routings one by one. A failure on one
     * entry is logged and does not prevent processing of the remaining ones.
     *
     * @param routings routing expressions, must not be null
     * @param add      {@code true} to add the routings, {@code false} to remove them
     */
    private void updateRoutings(String[] routings, boolean add) {
        if (add) {
            for (String route : routings) {
                try {
                    this.addRegexRouting(route);
                }
                catch (Exception e) {
                    log.warning("Can not add regex routing '" + route + "' : " + e);
                }
            }
        } else {
            for (String route : routings) {
                try {
                    this.removeRegexRouting(route);
                }
                catch (Exception e) {
                    log.warning("Can not remove regex routing '" + route + "' : " + e);
                }
            }
        }
    }

    /**
     * Adds a service-discovery item for the given JID to this component's
     * service entity.
     *
     * @param jid  JID of the item
     * @param name descriptive name of the item
     */
    private void updateServiceDiscovery(String jid, String name) {
        ServiceEntity item = new ServiceEntity(jid, null, name);
        log.info("Modifing service-discovery info: " + item.toString());
        this.serviceEntity.addItems(item);
    }

    /**
     * @return disco info of this component when the nick part of {@code jid}
     *         equals the component name, otherwise {@code null}
     */
    @Override
    public Element getDiscoInfo(String node, String jid) {
        if (jid != null && this.getName().equals(JIDUtils.getNodeNick(jid))) {
            return this.serviceEntity.getDiscoInfo(node);
        }
        return null;
    }

    /** @return always {@code null} */
    @Override
    public List<Element> getDiscoFeatures() {
        return null;
    }

    /**
     * @return the component's own disco items when the nick part of
     *         {@code jid} equals the component name; otherwise a single item
     *         describing this component under the given domain
     */
    @Override
    public List<Element> getDiscoItems(String node, String jid) {
        if (this.getName().equals(JIDUtils.getNodeNick(jid))) {
            return this.serviceEntity.getDiscoItems(node, null);
        }
        return Arrays.asList(this.serviceEntity.getDiscoItem(null, JIDUtils.getNodeID(this.getName(), jid)));
    }

    /** @return a new {@link XMPPIOService} instance */
    @Override
    protected XMPPIOService getXMPPIOServiceInstance() {
        return new XMPPIOService();
    }

    /**
     * Adds disconnect counters and aggregated I/O statistics of all services
     * to the given list.
     *
     * @param list statistics list to fill
     */
    @Override
    public void getStatistics(StatisticsList list) {
        super.getStatistics(list);
        list.add(this.getName(), "Total disconnects", this.totalNodeDisconnects, Level.FINE);
        list.add(this.getName(), "Last day disconnects", Arrays.toString(this.lastDay), Level.FINE);
        list.add(this.getName(), "Last hour disconnects", Arrays.toString(this.lastHour), Level.FINE);
        this.ioStatsGetter.reset();
        this.doForAllServices(this.ioStatsGetter);
        list.add(this.getName(), "Average compression ratio", this.ioStatsGetter.getAverageCompressionRatio(), Level.FINE);
        list.add(this.getName(), "Average decompression ratio", this.ioStatsGetter.getAverageDecompressionRatio(), Level.FINE);
        list.add(this.getName(), "Bytes sent", this.ioStatsGetter.getBytesSent(), Level.FINE);
        list.add(this.getName(), "Bytes received", this.ioStatsGetter.getBytesReceived(), Level.FINE);
        list.add(this.getName(), "Waiting to send", this.ioStatsGetter.getWaitingToSend(), Level.FINE);
    }

    /** @return ten times the given default queue size */
    @Override
    protected Integer getMaxQueueSize(int def) {
        return def * 10;
    }

    /** @return always {@code true} */
    @Override
    protected boolean isHighThroughput() {
        return true;
    }

    /** Stores the controller that is notified about node connects and disconnects. */
    @Override
    public void setClusterController(ClusterController cl_controller) {
        this.clusterController = cl_controller;
    }

    /** No-op in this component. */
    @Override
    public void nodeConnected(String node) {
    }

    /** No-op in this component. */
    @Override
    public void nodeDisconnected(String node) {
    }

    /**
     * Accumulates I/O statistics over all services it is applied to. Call
     * {@link #reset()} before each pass.
     */
    private class IOServiceStatisticsGetter
    implements ServiceChecker {
        private float compressionRatio = 0.0f;
        private float decompressionRatio = 0.0f;
        /** Number of services visited since the last reset. */
        private int counter = 0;
        private long bytesReceived = 0L;
        private long bytesSent = 0L;
        private int clIOQueue = 0;
        private StatisticsList list = new StatisticsList(Level.ALL);

        private IOServiceStatisticsGetter() {
        }

        /**
         * Reads the statistics of one service and adds them to the running sums.
         *
         * @param service service to inspect
         */
        @Override
        public void check(XMPPIOService service) {
            service.getStatistics(this.list);
            this.compressionRatio += this.list.getValue("zlibio", "Average compression rate", -1.0f);
            this.decompressionRatio += this.list.getValue("zlibio", "Average decompression rate", -1.0f);
            ++this.counter;
            this.bytesReceived += this.list.getValue("socketio", "Bytes received", -1L);
            this.bytesSent += this.list.getValue("socketio", "Bytes sent", -1L);
            this.clIOQueue += service.waitingToSendSize();
        }

        /** @return mean compression ratio, or 0 when no service was visited */
        public float getAverageCompressionRatio() {
            // Avoid 0/0 = NaN when there are no connected services.
            if (this.counter == 0) {
                return 0.0f;
            }
            return this.compressionRatio / (float)this.counter;
        }

        /** @return mean decompression ratio, or 0 when no service was visited */
        public float getAverageDecompressionRatio() {
            // Avoid 0/0 = NaN when there are no connected services.
            if (this.counter == 0) {
                return 0.0f;
            }
            return this.decompressionRatio / (float)this.counter;
        }

        public long getBytesSent() {
            return this.bytesSent;
        }

        public long getBytesReceived() {
            return this.bytesReceived;
        }

        public int getWaitingToSend() {
            return this.clIOQueue;
        }

        /** Clears all accumulated values. */
        public void reset() {
            this.bytesReceived = 0L;
            this.bytesSent = 0L;
            this.compressionRatio = 0.0f;
            this.decompressionRatio = 0.0f;
            this.counter = 0;
            this.clIOQueue = 0;
        }
    }
}

