/*
 * Decompiled with CFR 0.152.
 */
package tigase.cluster;

import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.script.Bindings;
import tigase.cluster.ClusterConnection;
import tigase.cluster.ClusterConnectionSelector;
import tigase.cluster.api.ClusterCommandException;
import tigase.cluster.api.ClusterConnectionHandler;
import tigase.cluster.api.ClusterConnectionSelectorIfc;
import tigase.cluster.api.ClusterControllerIfc;
import tigase.cluster.api.ClusterElement;
import tigase.cluster.api.ClusteredComponentIfc;
import tigase.cluster.api.CommandListener;
import tigase.cluster.api.CommandListenerAbstract;
import tigase.cluster.repo.ClConConfigRepository;
import tigase.cluster.repo.ClusterRepoItem;
import tigase.conf.ConfigurationException;
import tigase.db.RepositoryFactory;
import tigase.db.TigaseDBException;
import tigase.db.comp.ComponentRepository;
import tigase.db.comp.RepositoryChangeListenerIfc;
import tigase.disteventbus.EventBus;
import tigase.disteventbus.EventBusFactory;
import tigase.disteventbus.EventHandler;
import tigase.net.ConnectionType;
import tigase.net.SocketType;
import tigase.osgi.ModulesManagerImpl;
import tigase.server.ConnectionManager;
import tigase.server.Packet;
import tigase.server.ServiceChecker;
import tigase.stats.MaxDailyCounterQueue;
import tigase.stats.StatisticsList;
import tigase.sys.TigaseRuntime;
import tigase.util.Algorithms;
import tigase.util.TigaseStringprepException;
import tigase.util.TimeUtils;
import tigase.util.TimerTask;
import tigase.xml.Element;
import tigase.xmpp.Authorization;
import tigase.xmpp.BareJID;
import tigase.xmpp.JID;
import tigase.xmpp.PacketErrorTypeException;
import tigase.xmpp.XMPPIOService;

public class ClusterConnectionManager
extends ConnectionManager<XMPPIOService<Object>>
implements ClusteredComponentIfc,
RepositoryChangeListenerIfc<ClusterRepoItem>,
ClusterConnectionHandler {
    public static final String CLCON_REPO_CLASS_PROP_KEY = "repository-class";
    public static final String CLCON_REPO_CLASS_PROP_VAL = "tigase.cluster.repo.ClConSQLRepository";
    public static final String CLCON_REPO_CLASS_PROPERTY = "--cl-conn-repo-class";
    public static final String CLUSTER_CONNECTIONS_PER_NODE_PAR = "--cluster-connections-per-node";
    public static final String CLUSTER_CONNECTIONS_PER_NODE_PROP_KEY = "cluster-connections-per-node";
    public static final int CLUSTER_CONNECTIONS_PER_NODE_VAL = 5;
    public static final String CLUSTER_CONNECTIONS_SELECTOR_KEY = "connection-selector";
    public static final String DEF_CLUSTER_CONNECTIONS_SELECTOR_VAL = ClusterConnectionSelector.class.getCanonicalName();
    public static final String CLUSTER_CONTR_ID_PROP_KEY = "cluster-controller-id";
    public static final String CLUSTER_INITIATED_EVENT = "cluster-initiated";
    public static final String COMPRESS_STREAM_PROP_KEY = "compress-stream";
    public static final String CONNECT_ALL_PAR = "--cluster-connect-all";
    public static final String CONNECT_ALL_PROP_KEY = "connect-all";
    public static final String NON_CLUSTER_TRAFFIC_ALLOWED_PROP_KEY = "non-cluster-traffic-allowed";
    public static final boolean NON_CLUSTER_TRAFFIC_ALLOWED_PROP_VAL = true;
    public static int ELEMENTS_NUMBER_LIMIT_CLUSTER_PROP_VAL = 100000;
    public static final String IDENTITY_TYPE_KEY = "identity-type";
    public static final String IDENTITY_TYPE_VAL = "generic";
    public static final String PORT_ROUTING_TABLE_PROP_KEY = "routing-table";
    public static final String RETURN_SERVICE_DISCO_KEY = "service-disco";
    public static final String SECRET_PROP_KEY = "secret";
    public static final String XMLNS = "tigase:cluster";
    private static final Logger log = Logger.getLogger(ClusterConnectionManager.class.getName());
    public static final boolean RETURN_SERVICE_DISCO_VAL = true;
    public static final boolean CONNECT_ALL_PROP_VAL = false;
    public static final boolean COMPRESS_STREAM_PROP_VAL = false;
    private static final String SERVICE_CONNECTED_TASK_FUTURE = "service-connected-task-future";
    public static final String REPO_ITEM_EVENT_NAME = "repo-item-modified";
    public static final String EVENTBUS_REPO_ITEM_EVENT_XMLNS = "tigase:system:cluster-update";
    private EventBus eventBus = null;
    public static final String EVENTBUS_REPOSITORY_NOTIFICATIONS_ENABLED_KEY = "eventbus-repository-notifications";
    public static final boolean EVENTBUS_REPOSITORY_NOTIFICATIONS_ENABLED_VALUE = false;
    private ClusterControllerIfc clusterController = null;
    private IOServiceStatisticsGetter ioStatsGetter = new IOServiceStatisticsGetter();
    private String identity_type = "generic";
    private Map<String, ClusterConnection> connectionsPool = new ConcurrentSkipListMap<String, ClusterConnection>();
    private boolean connect_all = false;
    private boolean compress_stream = false;
    private long[] lastDay = new long[24];
    private int lastDayIdx = 0;
    private long[] lastHour = new long[60];
    private int lastHourIdx = 0;
    private MaxDailyCounterQueue<Integer> maxNodes = new MaxDailyCounterQueue(31);
    private int maxNodesWithinLastWeek = 0;
    private int nodesNo = 0;
    private int per_node_conns = 5;
    private ComponentRepository<ClusterRepoItem> repo = null;
    private long servConnectedTimeouts = 0L;
    private long totalNodeDisconnects = 0L;
    private ClusterConnectionSelectorIfc connectionSelector = null;
    private CommandListener sendPacket = new SendPacket("deliver-cluster-packet-cmd");
    private boolean initialClusterConnectedDone = false;
    private boolean nonClusterTrafficAllowed = true;
    private EventHandler clusterEventHandler = null;
    private final TimerTask repoReloadTimerTask = new TimerTask(){

        @Override
        public void run() {
            try {
                if (ClusterConnectionManager.this.repo != null) {
                    ClusterConnectionManager.this.repo.reload();
                }
            }
            catch (TigaseDBException ex) {
                log.log(Level.WARNING, "Items reloading failed", ex);
            }
        }
    };

    @Override
    public int hashCodeForPacket(Packet packet) {
        if (packet.getElemName() == "cluster") {
            ClusterElement clel = new ClusterElement(packet.getElement());
            String userId = clel.getMethodParam("userId");
            if (userId != null) {
                return userId.hashCode();
            }
            Queue<Element> children = clel.getDataPackets();
            if (children != null && children.size() > 0) {
                Element child = children.peek();
                String stanzaAdd = child.getAttributeStaticStr("to");
                if (stanzaAdd != null) {
                    return stanzaAdd.hashCode();
                }
                stanzaAdd = child.getAttributeStaticStr("from");
                if (stanzaAdd != null) {
                    return stanzaAdd.hashCode();
                }
                log.log(Level.FINE, "No stanzaTo or from for cluster packet: {0}", packet);
            }
        }
        if (packet.getStanzaTo() != null) {
            return packet.getStanzaTo().hashCode();
        }
        return packet.getTo().hashCode();
    }

    @Override
    public void initBindings(Bindings binds) {
        super.initBindings(binds);
        binds.put("clusterCM", (Object)this);
        binds.put("comp_repo", (Object)this.repo);
    }

    boolean isInitialClusterConnectedDone() {
        return this.initialClusterConnectedDone;
    }

    @Override
    public void itemAdded(ClusterRepoItem repoItem) {
        log.log(Level.INFO, "Loaded repoItem: {0}", repoItem.toString());
        String host = repoItem.getHostname();
        boolean isCorrect = false;
        try {
            InetAddress addr = InetAddress.getByName(host);
            boolean bl = isCorrect = !addr.isAnyLocalAddress() && !addr.isLoopbackAddress() && NetworkInterface.getByInetAddress(addr) == null;
            if (!isCorrect && log.isLoggable(Level.CONFIG)) {
                log.log(Level.CONFIG, "ClusterRepoItem of local machine, skipping connection attempt: {0}", repoItem);
            }
        }
        catch (SocketException | UnknownHostException ex) {
            log.log(Level.WARNING, "Incorrect ClusterRepoItem, skipping connection attempt: " + repoItem, ex);
        }
        if (isCorrect) {
            for (int i = 0; i < this.per_node_conns; ++i) {
                log.log(Level.CONFIG, "Trying to connect to cluster node: {0}", host);
                LinkedHashMap<String, Object> port_props = new LinkedHashMap<String, Object>(12);
                port_props.put(SECRET_PROP_KEY, repoItem.getPassword());
                port_props.put("local-host", this.getDefHostName());
                port_props.put("type", (Object)ConnectionType.connect);
                port_props.put("socket", (Object)SocketType.plain);
                port_props.put("remote-host", host);
                port_props.put("ifc", new String[]{host});
                port_props.put("max-reconnects", 99999999);
                port_props.put("port-no", repoItem.getPortNo());
                this.addWaitingTask(port_props);
            }
            this.sendEvent(REPO_ITEM_UPDATE_TYPE.ADDED, repoItem.getHostname(), repoItem.getSecondaryHostname());
        }
    }

    @Override
    public void itemRemoved(ClusterRepoItem item) {
        this.sendEvent(REPO_ITEM_UPDATE_TYPE.REMOVED, item.getHostname(), item.getSecondaryHostname());
    }

    @Override
    public void itemUpdated(ClusterRepoItem item) {
        this.sendEvent(REPO_ITEM_UPDATE_TYPE.UPDATED, item.getHostname(), item.getSecondaryHostname());
    }

    @Override
    public void nodeConnected(String node) {
        super.nodeConnected(node);
        this.maxNodes.add(this.getNodesConnectedWithLocal().size());
        this.maxNodesWithinLastWeek = this.maxNodes.getMaxValueInRange(7);
    }

    @Override
    public synchronized void everyHour() {
        super.everyHour();
        this.maxNodes.add(this.getNodesConnectedWithLocal().size());
        this.maxNodesWithinLastWeek = this.maxNodes.getMaxValueInRange(7);
    }

    @Override
    public void nodeDisconnected(String node) {
        super.nodeDisconnected(node);
        this.maxNodes.add(this.getNodesConnectedWithLocal().size());
        this.maxNodesWithinLastWeek = this.maxNodes.getMaxValueInRange(7);
    }

    @Override
    public int processingInThreads() {
        return Math.max(Runtime.getRuntime().availableProcessors(), this.nodesNo) * 8;
    }

    @Override
    public int processingOutThreads() {
        return Math.max(Runtime.getRuntime().availableProcessors(), this.nodesNo) * 8;
    }

    @Override
    public void processOutPacket(Packet packet) {
        if (packet.getElemName() == "cluster") {
            this.clusterController.handleClusterPacket(packet.getElement());
        } else {
            if (log.isLoggable(Level.FINER)) {
                log.log(Level.FINER, "Unexpected packet on cluster connection: {0}", packet);
            }
            super.processOutPacket(packet);
        }
    }

    @Override
    public void processPacket(Packet packet) {
        if (log.isLoggable(Level.FINEST)) {
            log.log(Level.FINEST, "Processing packet: {0}", packet);
        }
        if (packet.getStanzaTo() != null && packet.getStanzaTo().equals(this.getComponentId())) {
            try {
                this.addOutPacket(Authorization.FEATURE_NOT_IMPLEMENTED.getResponseMessage(packet, "Not implemented", true));
            }
            catch (PacketErrorTypeException e) {
                log.log(Level.WARNING, "Packet processing exception: {0}", e);
            }
            return;
        }
        if (packet.getElemName() == "cluster" || packet.getElemName() == "route") {
            this.writePacketToSocket(packet);
        } else if (this.nonClusterTrafficAllowed) {
            this.writePacketToSocket(packet.packRouted());
        } else if (log.isLoggable(Level.FINER)) {
            log.log(Level.FINER, "Unexpected packet for the cluster connetcion: {0}", packet);
        }
    }

    @Override
    public Queue<Packet> processSocketData(XMPPIOService<Object> serv) {
        Packet p = null;
        while ((p = serv.getReceivedPackets().poll()) != null) {
            if (log.isLoggable(Level.FINEST)) {
                log.log(Level.FINEST, "Processing socket data: {0}", p);
            }
            if (p.getElemName().equals("handshake")) {
                this.processHandshake(p, serv);
                continue;
            }
            if (p.getAttributeStaticStr(new String[]{"iq", "ping"}, "xmlns") == "urn:xmpp:ping" && this.getDefHostName().getDomain().equals(p.getStanzaTo().getDomain()) && p.getStanzaFrom().getDomain().equals(serv.getSessionData().get("remote-host"))) {
                if (log.isLoggable(Level.FINEST)) {
                    log.log(Level.FINEST, "{0}, received XMPP ping", serv);
                }
                serv.getSessionData().put("lastConnectivityCheck", System.currentTimeMillis());
                continue;
            }
            Packet result = p;
            if (p.isRouted()) {
                try {
                    result = p.unpackRouted();
                }
                catch (TigaseStringprepException ex) {
                    log.log(Level.WARNING, "Packet stringprep addressing problem, dropping packet: {0}", p);
                    return null;
                }
            }
            this.addOutPacket(result);
        }
        return null;
    }

    @Override
    public boolean processUndeliveredPacket(Packet packet, Long stamp, String errorMessage) {
        this.addPacket(packet);
        return true;
    }

    @Override
    public void reconnectionFailed(Map<String, Object> port_props) {
    }

    @Override
    public int schedulerThreads() {
        return 4;
    }

    @Override
    public void serviceStarted(XMPPIOService<Object> serv) {
        if (!this.repoReloadTimerTask.isScheduled()) {
            this.addTimerTaskWithTimeout(this.repoReloadTimerTask, 0L, 15000L);
        }
        ServiceConnectedTimerTask task = new ServiceConnectedTimerTask(serv);
        serv.getSessionData().put(SERVICE_CONNECTED_TASK_FUTURE, task);
        this.addTimerTask((TimerTask)task, 10L, TimeUnit.SECONDS);
        super.serviceStarted(serv);
        log.log(Level.INFO, "cluster connection opened: {0}, type: {1}, id={2}", new Object[]{serv.getRemoteAddress(), serv.connectionType().toString(), serv.getUniqueId()});
        if (this.compress_stream) {
            log.log(Level.INFO, "Starting stream compression for: {0}", serv.getUniqueId());
            serv.startZLib(9);
        }
        switch (serv.connectionType()) {
            case connect: {
                String remote_host = (String)serv.getSessionData().get("remote-host");
                serv.getSessionData().put("hostname-key", this.getDefHostName().toString());
                serv.getSessionData().put(PORT_ROUTING_TABLE_PROP_KEY, new String[]{remote_host, ".*@" + remote_host, ".*\\." + remote_host});
                String data = "<stream:stream xmlns='tigase:cluster' xmlns:stream='http://etherx.jabber.org/streams' from='" + this.getDefHostName() + "' to='" + remote_host + "'>";
                log.log(Level.INFO, "cid: {0}, sending: {1}", new Object[]{(String)serv.getSessionData().get("cid"), data});
                serv.xmppStreamOpen(data);
                break;
            }
        }
    }

    @Override
    public boolean serviceStopped(XMPPIOService<Object> service) {
        boolean result = super.serviceStopped(service);
        if (result) {
            int minute;
            ConnectionType type;
            ConcurrentMap<String, Object> sessionData = service.getSessionData();
            String[] routings = (String[])sessionData.get(PORT_ROUTING_TABLE_PROP_KEY);
            String addr = (String)sessionData.get("remote-host");
            ClusterConnection conns = this.connectionsPool.get(addr);
            if (conns == null) {
                conns = new ClusterConnection(addr);
                this.connectionsPool.put(addr, conns);
            }
            int size = conns.size();
            conns.removeConn(service);
            if (log.isLoggable(Level.FINEST)) {
                log.log(Level.FINEST, "serviceStopped: result={0} / size={1} / connPool={2} / serv={3} / conns={4} / type={5}", new Object[]{result, size, this.connectionsPool, service, conns, service.connectionType()});
            }
            if (size != 0 && conns.size() == 0) {
                if (routings != null) {
                    this.updateRoutings(routings, false);
                }
                log.log(Level.INFO, "Disonnected from: {0}", addr);
                this.updateServiceDiscoveryItem(addr, addr, "tigase:cluster disconnected", true);
                this.clusterController.nodeDisconnected(addr);
            }
            if ((type = service.connectionType()) == ConnectionType.connect) {
                this.addWaitingTask(sessionData);
            }
            ++this.totalNodeDisconnects;
            int hour = TimeUtils.getHourNow();
            if (this.lastDayIdx != hour) {
                this.lastDayIdx = hour;
                this.lastDay[hour] = 0L;
                Arrays.fill(this.lastHour, 0L);
            }
            int n = hour;
            this.lastDay[n] = this.lastDay[n] + 1L;
            int n2 = minute = TimeUtils.getMinuteNow();
            this.lastHour[n2] = this.lastHour[n2] + 1L;
        }
        return result;
    }

    @Override
    public void tlsHandshakeCompleted(XMPPIOService<Object> service) {
    }

    @Override
    public void updateConnectionDetails(Map<String, Object> port_props) {
        String host = (String)port_props.get("remote-host");
        ClusterRepoItem item = this.repo.getItem(host);
        if (item != null) {
            port_props.put(SECRET_PROP_KEY, item.getPassword());
            port_props.put("port-no", item.getPortNo());
        } else {
            port_props.put("max-reconnects", 0);
        }
        if (log.isLoggable(Level.FINEST)) {
            log.log(Level.FINEST, "ClusterRepoItem: {0}, port_props: {1}", new Object[]{item, port_props});
        }
    }

    @Override
    public void xmppStreamClosed(XMPPIOService<Object> serv) {
        log.info("Stream closed.");
    }

    @Override
    public String[] xmppStreamOpened(XMPPIOService<Object> service, Map<String, String> attribs) {
        log.log(Level.INFO, "Stream opened: {0}, service: {1}", new Object[]{attribs, service});
        switch (service.connectionType()) {
            case connect: {
                String id = attribs.get("id");
                service.getSessionData().put("sessionID", id);
                ClusterRepoItem item = this.repo.getItem(this.getDefHostName().getDomain());
                String secret = item.getPassword();
                try {
                    String digest = Algorithms.hexDigest(id, secret, "SHA");
                    if (log.isLoggable(Level.FINEST)) {
                        log.log(Level.FINEST, "Calculating digest: id={0}, secret={1}, digest={2}, item={3}", new Object[]{id, secret, digest, item});
                    }
                    return new String[]{"<handshake>" + digest + "</handshake>"};
                }
                catch (NoSuchAlgorithmException e) {
                    log.log(Level.SEVERE, "Can not generate digest for pass phrase.", e);
                    return null;
                }
            }
            case accept: {
                String remote_host = attribs.get("from");
                service.getSessionData().put("hostname-key", this.getDefHostName().toString());
                service.getSessionData().put("remote-host", remote_host);
                service.getSessionData().put(PORT_ROUTING_TABLE_PROP_KEY, new String[]{remote_host, ".*@" + remote_host, ".*\\." + remote_host});
                String id = UUID.randomUUID().toString();
                service.getSessionData().put("sessionID", id);
                this.updateConnectionDetails(service.getSessionData());
                return new String[]{"<stream:stream xmlns='tigase:cluster' xmlns:stream='http://etherx.jabber.org/streams' from='" + this.getDefHostName() + "' to='" + remote_host + "' id='" + id + "'>"};
            }
        }
        return null;
    }

    @Override
    public Map<String, Object> getDefaults(Map<String, Object> params) {
        LinkedHashMap<String, Object> defs = new LinkedHashMap<String, Object>(50);
        String repo_class = (String)params.get(CLCON_REPO_CLASS_PROPERTY);
        String repo_uri = (String)params.get("--user-db-uri");
        if (repo_class != null) {
            defs.put(CLCON_REPO_CLASS_PROP_KEY, repo_class);
        }
        try {
            Class cls = repo_class == null ? RepositoryFactory.getRepoClass(ClConConfigRepository.class, repo_uri) : ModulesManagerImpl.getInstance().forName(repo_class);
            ComponentRepository repoTmp = (ComponentRepository)cls.newInstance();
            repoTmp.getDefaults(defs, params);
            if (this.repo == null) {
                this.repo = repoTmp;
            }
        }
        catch (Exception e) {
            log.log(Level.SEVERE, "Can not instantiate items repository for class: " + repo_class, e);
        }
        Map<String, Object> props = super.getDefaults(params);
        props.putAll(defs);
        props.put(RETURN_SERVICE_DISCO_KEY, true);
        props.put(IDENTITY_TYPE_KEY, IDENTITY_TYPE_VAL);
        if (params.get(CONNECT_ALL_PAR) == null || !((String)params.get(CONNECT_ALL_PAR)).equals("true")) {
            props.put(CONNECT_ALL_PROP_KEY, false);
        } else {
            props.put(CONNECT_ALL_PROP_KEY, true);
        }
        if (params.get("--cluster-nodes") != null) {
            String[] cl_nodes = ((String)params.get("--cluster-nodes")).split(",");
            for (int i = 0; i < cl_nodes.length; ++i) {
                cl_nodes[i] = BareJID.parseJID(cl_nodes[i])[1];
            }
            this.nodesNo = cl_nodes.length;
            props.put("cluster-nodes", cl_nodes);
        } else {
            props.put("cluster-nodes", new String[]{this.getDefHostName().getDomain()});
        }
        props.put(CLUSTER_CONTR_ID_PROP_KEY, "cluster-contr@" + this.getDefHostName());
        props.put(COMPRESS_STREAM_PROP_KEY, false);
        props.put(NON_CLUSTER_TRAFFIC_ALLOWED_PROP_KEY, true);
        String conns = (String)params.get(CLUSTER_CONNECTIONS_PER_NODE_PAR);
        int conns_int = Runtime.getRuntime().availableProcessors();
        if (conns != null) {
            try {
                conns_int = Integer.parseInt(conns);
            }
            catch (NumberFormatException e) {
                conns_int = 5;
            }
        }
        props.put(CLUSTER_CONNECTIONS_PER_NODE_PROP_KEY, conns_int);
        props.put("elements-number-limit", ELEMENTS_NUMBER_LIMIT_CLUSTER_PROP_VAL);
        props.put("watchdog_ping_type", (Object)ConnectionManager.WATCHDOG_PING_TYPE.XMPP);
        props.put("watchdog_delay", 30000L);
        props.put("watchdog_timeout", -1000L);
        props.put(CLUSTER_CONNECTIONS_SELECTOR_KEY, DEF_CLUSTER_CONNECTIONS_SELECTOR_VAL);
        props.put(EVENTBUS_REPOSITORY_NOTIFICATIONS_ENABLED_KEY, false);
        if (this.getDefHostName().toString().equalsIgnoreCase("localhost")) {
            TigaseRuntime.getTigaseRuntime().shutdownTigase(new String[]{"ERROR! Tigase is running in Clustered Mode yet the hostname", "of the machine was resolved to *localhost* which will cause", "malfunctioning of Tigase in clustered environment!", "", "To prevent further issues with the clustering Tigase will be shutdown.", "", "Please make sure that FQDN hostname of the machine is set correctly", "and restart the server."});
        }
        return props;
    }

    @Override
    public String getDiscoCategoryType() {
        return this.identity_type;
    }

    @Override
    public String getDiscoDescription() {
        return "Cluster connection manager";
    }

    @Override
    public void getStatistics(StatisticsList list) {
        super.getStatistics(list);
        list.add(this.getName(), "Total disconnects", this.totalNodeDisconnects, Level.FINE);
        list.add(this.getName(), "Service connected time-outs", this.servConnectedTimeouts, Level.FINE);
        list.add(this.getName(), "Last day disconnects", Arrays.toString(this.lastDay), Level.FINE);
        list.add(this.getName(), "Last hour disconnects", Arrays.toString(this.lastHour), Level.FINE);
        this.ioStatsGetter.reset();
        this.doForAllServices(this.ioStatsGetter);
        list.add(this.getName(), "Average compression ratio", this.ioStatsGetter.getAverageCompressionRatio(), Level.FINE);
        list.add(this.getName(), "Average decompression ratio", this.ioStatsGetter.getAverageDecompressionRatio(), Level.FINE);
        list.add(this.getName(), "Waiting to send", this.ioStatsGetter.getWaitingToSend(), Level.FINE);
        list.add(this.getName(), "Max daily cluster nodes count in last month", this.maxNodes, Level.INFO);
        list.add(this.getName(), "Max nodes count within last week", this.maxNodesWithinLastWeek, Level.INFO);
        if (!list.checkLevel(Level.FINEST) && this.getNodesConnected().size() > 0) {
            list.add(this.getName(), "Known cluster nodes", this.getNodesConnected().size(), Level.INFO);
        }
    }

    @Override
    public void setClusterController(ClusterControllerIfc cl_controller) {
        super.setClusterController(cl_controller);
        this.clusterController = cl_controller;
        this.clusterController.removeCommandListener(this.sendPacket);
        this.clusterController.setCommandListener(this.sendPacket);
    }

    @Override
    public void setProperties(Map<String, Object> props) throws ConfigurationException {
        boolean eventbus_enabled;
        if (props.get(IDENTITY_TYPE_KEY) != null) {
            this.identity_type = (String)props.get(IDENTITY_TYPE_KEY);
        }
        if (props.get(COMPRESS_STREAM_PROP_KEY) != null) {
            this.compress_stream = (Boolean)props.get(COMPRESS_STREAM_PROP_KEY);
        }
        if (props.get(CONNECT_ALL_PROP_KEY) != null) {
            this.connect_all = (Boolean)props.get(CONNECT_ALL_PROP_KEY);
        }
        if (props.get(NON_CLUSTER_TRAFFIC_ALLOWED_PROP_KEY) != null) {
            this.nonClusterTrafficAllowed = (Boolean)props.get(NON_CLUSTER_TRAFFIC_ALLOWED_PROP_KEY);
        }
        if (props.get(CLUSTER_CONNECTIONS_PER_NODE_PROP_KEY) != null) {
            this.per_node_conns = (Integer)props.get(CLUSTER_CONNECTIONS_PER_NODE_PROP_KEY);
        }
        if (props.containsKey(CLUSTER_CONNECTIONS_SELECTOR_KEY)) {
            String selectorClsName = (String)props.get(CLUSTER_CONNECTIONS_SELECTOR_KEY);
            try {
                ClusterConnectionSelectorIfc tmp_selector = (ClusterConnectionSelectorIfc)ModulesManagerImpl.getInstance().forName(selectorClsName).newInstance();
                tmp_selector.setClusterConnectionHandler(this);
                tmp_selector.setProperties(props);
                this.connectionSelector = tmp_selector;
            }
            catch (ClassNotFoundException | IllegalAccessException | InstantiationException ex) {
                log.log(Level.SEVERE, "Coulnd not create instance of cluster connection selector of class " + selectorClsName, ex);
            }
        }
        this.connectionDelay = 5000L;
        if (props.size() == 1 || this.isInitializationComplete()) {
            super.setProperties(props);
            return;
        }
        String repo_class = (String)props.get(CLCON_REPO_CLASS_PROP_KEY);
        try {
            String repo_uri = (String)props.get("repo-uri");
            Class cls = repo_class == null ? RepositoryFactory.getRepoClass(ClConConfigRepository.class, repo_uri) : ModulesManagerImpl.getInstance().forName(repo_class);
            ComponentRepository repo_tmp = (ComponentRepository)cls.newInstance();
            repo_tmp.addRepoChangeListener(this);
            repo_tmp.setProperties(props);
            repo_tmp.initRepository(repo_uri, new HashMap<String, String>());
            ComponentRepository<ClusterRepoItem> old_repo = this.repo;
            this.repo = repo_tmp;
            if (old_repo != null) {
                old_repo.destroy();
            }
            this.repo.reload();
        }
        catch (Exception e) {
            log.log(Level.SEVERE, "Can not create items repository instance for class: " + repo_class, e);
        }
        if (props.get("elements-number-limit") != null) {
            this.elements_number_limit = (Integer)props.get("elements-number-limit");
        }
        if (props.get(EVENTBUS_REPOSITORY_NOTIFICATIONS_ENABLED_KEY) != null && (eventbus_enabled = ((Boolean)props.get(EVENTBUS_REPOSITORY_NOTIFICATIONS_ENABLED_KEY)).booleanValue())) {
            this.eventBus = EventBusFactory.getInstance();
        }
        super.setProperties(props);
    }

    @Override
    public void start() {
        super.start();
        if (this.clusterEventHandler == null) {
            this.clusterEventHandler = new EventHandler(){

                @Override
                public void onEvent(String name, String xmlns, Element event) {
                    if (log.isLoggable(Level.FINE)) {
                        log.log(Level.FINE, "Setting initialClusterConnectedDone to true (was: {0})", ClusterConnectionManager.this.initialClusterConnectedDone);
                    }
                    ClusterConnectionManager.this.initialClusterConnectedDone = true;
                    EventBusFactory.getInstance().removeHandler(ClusterConnectionManager.CLUSTER_INITIATED_EVENT, ClusterConnectionManager.CLUSTER_INITIATED_EVENT, this);
                }
            };
        }
        EventBusFactory.getInstance().addHandler(CLUSTER_INITIATED_EVENT, CLUSTER_INITIATED_EVENT, this.clusterEventHandler);
    }

    @Override
    public void stop() {
        super.stop();
        EventBusFactory.getInstance().removeHandler(CLUSTER_INITIATED_EVENT, CLUSTER_INITIATED_EVENT, this.clusterEventHandler);
        this.clusterEventHandler = null;
    }

    private void sendEvent(REPO_ITEM_UPDATE_TYPE action, String hostname, String secondary) {
        if (this.eventBus == null || hostname == null) {
            return;
        }
        Element event = new Element(REPO_ITEM_EVENT_NAME, new String[]{"xmlns"}, new String[]{EVENTBUS_REPO_ITEM_EVENT_XMLNS});
        event.setAttribute("local", "true");
        Element repoItem = new Element("repo-item");
        repoItem.setAttribute("action", action.name());
        repoItem.addAttribute("hostname", hostname);
        repoItem.addAttribute("secondary", null != secondary ? secondary : "");
        event.addChild(repoItem);
        if (log.isLoggable(Level.FINEST)) {
            log.log(Level.FINEST, "Sending event: " + event);
        }
        this.eventBus.fire(event);
    }

    protected void serviceConnected(XMPPIOService<Object> serv) {
        String[] routings = (String[])serv.getSessionData().get(PORT_ROUTING_TABLE_PROP_KEY);
        String addr = (String)serv.getSessionData().get("remote-host");
        ClusterConnection conns = this.connectionsPool.get(addr);
        if (conns == null) {
            conns = new ClusterConnection(addr);
            this.connectionsPool.put(addr, conns);
        }
        int size = conns.size();
        if (log.isLoggable(Level.FINEST)) {
            log.log(Level.FINEST, "New service connected: size = {0} / connectionsPool={1} / serv={2} / conns={3}", new Object[]{size, this.connectionsPool, serv, conns});
        }
        serv.setUserJid((String)serv.getSessionData().get("remote-host"));
        conns.addConn(serv);
        if (size == 0 && conns.size() > 0) {
            this.updateRoutings(routings, true);
            log.log(Level.INFO, "Connected to: {0}", addr);
            this.updateServiceDiscoveryItem(addr, addr, "tigase:cluster connected", true);
            this.clusterController.nodeConnected(addr);
        }
        try {
            int connectedSize = this.getNodesConnected().size();
            int repoSize = this.repo.allItems().size();
            if (log.isLoggable(Level.FINEST)) {
                log.log(Level.FINEST, "All repo nodes connected! Connected: {0}, repo size: {1}, initialClusterConnectedDone: {2}", new Object[]{connectedSize, repoSize, this.initialClusterConnectedDone});
            }
            if (!this.initialClusterConnectedDone && (repoSize <= 1 || repoSize > 1 && connectedSize >= repoSize - 1)) {
                this.initialClusterConnectedDone = true;
                Element event = new Element(CLUSTER_INITIATED_EVENT);
                event.setXMLNS(CLUSTER_INITIATED_EVENT);
                event.setAttribute("local", "true");
                EventBusFactory.getInstance().fire(event);
            }
        }
        catch (TigaseDBException e) {
            log.log(Level.WARNING, "There was an error while reading size of cluster repository", e);
        }
        ServiceConnectedTimerTask task = (ServiceConnectedTimerTask)serv.getSessionData().get(SERVICE_CONNECTED_TASK_FUTURE);
        if (task == null) {
            log.log(Level.WARNING, "Missing service connected timer task: {0}", serv);
        } else {
            task.cancel();
        }
    }

    @Override
    protected boolean writePacketToSocket(Packet p) {
        String ip = p.getTo().getDomain();
        ClusterConnection conns = this.connectionsPool.get(ip);
        XMPPIOService<Object> serv = this.connectionSelector.selectConnection(p, conns);
        if (serv != null) {
            return super.writePacketToSocket(serv, p);
        }
        log.log(Level.WARNING, "No cluster connection to send a packet: {0}", p);
        return false;
    }

    @Override
    protected int[] getDefPlainPorts() {
        ClusterRepoItem item = this.repo.getItem(this.getDefHostName().getDomain());
        return new int[]{item.getPortNo()};
    }

    @Override
    protected String getDefTrafficThrottling() {
        return "xmpp:25m:0:disc,bin:20000m:0:disc";
    }

    @Override
    protected long getMaxInactiveTime() {
        return 180000L;
    }

    @Override
    protected Integer getMaxQueueSize(int def) {
        return def * 10;
    }

    @Override
    protected Map<String, Object> getParamsForPort(int port) {
        LinkedHashMap<String, Object> defs = new LinkedHashMap<String, Object>(10);
        defs.put("type", (Object)ConnectionType.accept);
        defs.put("socket", (Object)SocketType.plain);
        defs.put("ifc", this.PORT_IFC_PROP_VAL);
        return defs;
    }

    @Override
    protected XMPPIOService<Object> getXMPPIOServiceInstance() {
        return new XMPPIOService<Object>();
    }

    @Override
    protected boolean isHighThroughput() {
        return true;
    }

    private void processHandshake(Packet p, XMPPIOService<Object> serv) {
        if (log.isLoggable(Level.FINEST)) {
            log.log(Level.FINEST, "Processing handshake: packet={0} / service={1} / sessionData={2}", new Object[]{p, serv, serv.getSessionData()});
        }
        String serv_addr = (String)serv.getSessionData().get("remote-host");
        try {
            InetAddress addr = InetAddress.getByName(serv_addr);
            if (addr.isAnyLocalAddress() || addr.isLoopbackAddress() || NetworkInterface.getByInetAddress(addr) != null) {
                log.log(Level.WARNING, "Cluster handshake received from this instance, terminating: {0}", serv_addr);
                serv.stop();
                return;
            }
        }
        catch (Exception ex) {
            log.log(Level.WARNING, "Cluster handshake received from this instance, terminating: " + serv_addr, ex);
            serv.stop();
        }
        switch (serv.connectionType()) {
            case connect: {
                String data = p.getElemCData();
                if (data == null) {
                    this.serviceConnected(serv);
                    break;
                }
                log.log(Level.WARNING, "Incorrect packet received: {0}", p);
                break;
            }
            case accept: {
                String digest = p.getElemCData();
                String id = (String)serv.getSessionData().get("sessionID");
                String secret = (String)serv.getSessionData().get(SECRET_PROP_KEY);
                try {
                    String loc_digest = Algorithms.hexDigest(id, secret, "SHA");
                    if (log.isLoggable(Level.FINEST)) {
                        log.log(Level.FINEST, "Calculating digest: secret={0}, digest={1}, loc_digest={2}, sessionData={3}", new Object[]{secret, digest, loc_digest, serv.getSessionData()});
                    }
                    if (digest != null && digest.equals(loc_digest)) {
                        Packet resp = Packet.packetInstance(new Element("handshake"), null, null);
                        this.writePacketToSocket(serv, resp);
                        this.serviceConnected(serv);
                        break;
                    }
                    if (secret == null) {
                        log.log(Level.WARNING, "Remote hostname not found in local configuration or time difference between cluster nodes is too big. Connection not accepted: {0}", serv);
                        if (log.isLoggable(Level.FINEST)) {
                            log.log(Level.FINEST, "Remote hostname not found in local configuration or time difference between cluster nodes is too big. Connection not accepted! Remote host: {0}, sessionData: {1}, repoItem: {2}, service: {3}", new Object[]{serv_addr, serv.getSessionData(), this.repo.getItem(serv_addr), serv});
                        }
                    } else {
                        log.log(Level.WARNING, "Handshaking password doesn''t match, disconnecting: {0}", serv);
                        if (log.isLoggable(Level.FINEST)) {
                            log.log(Level.WARNING, "Handshaking password doesn''t match, disconnecting! Remote host: {0}, sessionData: {1}, repoItem: {2}, service: {3}", new Object[]{serv_addr, serv.getSessionData(), this.repo.getItem(serv_addr), serv});
                        }
                    }
                    serv.stop();
                }
                catch (Exception e) {
                    log.log(Level.SEVERE, "Handshaking error.", e);
                }
                break;
            }
        }
    }

    private void updateRoutings(String[] routings, boolean add) {
        if (add) {
            for (String route : routings) {
                try {
                    this.addRegexRouting(route);
                }
                catch (Exception e) {
                    log.log(Level.WARNING, "Can not add regex routing ''{0}'' : {1}", new Object[]{route, e});
                }
            }
        } else {
            for (String route : routings) {
                try {
                    this.removeRegexRouting(route);
                }
                catch (Exception e) {
                    log.log(Level.WARNING, "Can not remove regex routing ''{0}'' : {1}", new Object[]{route, e});
                }
            }
        }
    }

    protected class Watchdog
    extends ConnectionManager.Watchdog {
        protected Watchdog() {
            super(ClusterConnectionManager.this);
        }

        @Override
        protected long getDurationSinceLastTransfer(XMPPIOService service) {
            Long lastTransfer = (Long)service.getSessionData().get("lastConnectivityCheck");
            if (lastTransfer == null) {
                service.getSessionData().put("lastConnectivityCheck", System.currentTimeMillis() - ClusterConnectionManager.this.watchdogTimeout);
                return ClusterConnectionManager.this.watchdogTimeout;
            }
            return System.currentTimeMillis() - lastTransfer;
        }
    }

    private class ServiceConnectedTimerTask
    extends TimerTask {
        private XMPPIOService<Object> serv = null;

        private ServiceConnectedTimerTask(XMPPIOService<Object> serv) {
            this.serv = serv;
        }

        @Override
        public void run() {
            ++ClusterConnectionManager.this.servConnectedTimeouts;
            log.log(Level.INFO, "ServiceConnectedTimer timeout expired, closing connection: {0}", this.serv);
            this.serv.forceStop();
        }
    }

    private class SendPacket
    extends CommandListenerAbstract {
        private SendPacket(String name) {
            super(name, null);
        }

        @Override
        public void executeCommand(JID fromNode, Set<JID> visitedNodes, Map<String, String> data, Queue<Element> packets) throws ClusterCommandException {
            if (log.isLoggable(Level.FINEST)) {
                log.log(Level.FINEST, "Called fromNode: {0}, visitedNodes: {1}, data: {2}, packets: {3}", new Object[]{fromNode, visitedNodes, data, packets});
            }
            for (Element element : packets) {
                try {
                    ClusterConnectionManager.this.addPacketNB(Packet.packetInstance(element));
                }
                catch (TigaseStringprepException ex) {
                    log.log(Level.WARNING, "Stringprep exception for packet: {0}", element);
                }
            }
        }
    }

    private class IOServiceStatisticsGetter
    implements ServiceChecker<XMPPIOService<Object>> {
        private int clIOQueue = 0;
        private float compressionRatio = 0.0f;
        private int counter = 0;
        private float decompressionRatio = 0.0f;
        private StatisticsList list = new StatisticsList(Level.ALL);

        private IOServiceStatisticsGetter() {
        }

        @Override
        public void check(XMPPIOService<Object> service) {
            service.getStatistics(this.list, true);
            this.compressionRatio += this.list.getValue("zlibio", "Average compression rate", -1.0f);
            this.decompressionRatio += this.list.getValue("zlibio", "Average decompression rate", -1.0f);
            ++this.counter;
            this.clIOQueue += service.waitingToSendSize();
        }

        public void reset() {
            this.clIOQueue = 0;
            this.counter = 0;
            this.compressionRatio = 0.0f;
            this.decompressionRatio = 0.0f;
        }

        public float getAverageCompressionRatio() {
            return this.compressionRatio / (float)this.counter;
        }

        public float getAverageDecompressionRatio() {
            return this.decompressionRatio / (float)this.counter;
        }

        public int getWaitingToSend() {
            return this.clIOQueue;
        }
    }

    public static enum REPO_ITEM_UPDATE_TYPE {
        ADDED,
        UPDATED,
        REMOVED;

    }
}

