/*
 * Decompiled with CFR 0.152.
 */
package tigase.cluster;

import java.lang.reflect.Type;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.script.Bindings;
import tigase.cluster.ClusterConnection;
import tigase.cluster.api.ClusterCommandException;
import tigase.cluster.api.ClusterConnectionHandler;
import tigase.cluster.api.ClusterConnectionSelectorIfc;
import tigase.cluster.api.ClusterControllerIfc;
import tigase.cluster.api.ClusterElement;
import tigase.cluster.api.ClusteredComponentIfc;
import tigase.cluster.api.CommandListener;
import tigase.cluster.api.CommandListenerAbstract;
import tigase.cluster.repo.ClusterRepoItem;
import tigase.cluster.repo.ClusterRepoItemEvent;
import tigase.db.DBInitException;
import tigase.db.DataSource;
import tigase.db.DataSourceHelper;
import tigase.db.TigaseDBException;
import tigase.db.beans.DataSourceBean;
import tigase.db.comp.AbstractSDComponentRepositoryBean;
import tigase.db.comp.ComponentRepository;
import tigase.db.comp.ComponentRepositoryDataSourceAware;
import tigase.db.comp.RepositoryChangeListenerIfc;
import tigase.eventbus.EventBus;
import tigase.eventbus.EventBusEvent;
import tigase.eventbus.EventListener;
import tigase.kernel.beans.Bean;
import tigase.kernel.beans.Inject;
import tigase.kernel.beans.config.ConfigField;
import tigase.kernel.beans.selector.ClusterModeRequired;
import tigase.kernel.beans.selector.ConfigType;
import tigase.kernel.beans.selector.ConfigTypeEnum;
import tigase.kernel.core.Kernel;
import tigase.net.ConnectionType;
import tigase.net.SocketType;
import tigase.server.ConnectionManager;
import tigase.server.Packet;
import tigase.server.ServiceChecker;
import tigase.stats.MaxDailyCounterQueue;
import tigase.stats.StatisticsList;
import tigase.sys.TigaseRuntime;
import tigase.util.Algorithms;
import tigase.util.common.TimerTask;
import tigase.util.datetime.TimeUtils;
import tigase.util.reflection.ReflectionHelper;
import tigase.util.stringprep.TigaseStringprepException;
import tigase.xml.Element;
import tigase.xmpp.Authorization;
import tigase.xmpp.PacketErrorTypeException;
import tigase.xmpp.XMPPIOService;
import tigase.xmpp.jid.JID;

@Bean(name="cl-comp", parent=Kernel.class, active=true)
@ConfigType(value={ConfigTypeEnum.DefaultMode, ConfigTypeEnum.SessionManagerMode, ConfigTypeEnum.ConnectionManagersMode, ConfigTypeEnum.ComponentMode})
@ClusterModeRequired(active=true)
public class ClusterConnectionManager
extends ConnectionManager<XMPPIOService<Object>>
implements ClusteredComponentIfc,
RepositoryChangeListenerIfc<ClusterRepoItem>,
ClusterConnectionHandler {
    // ---- Configuration property keys, parameter names and their default values ----
    public static final String CLUSTER_CONNECTIONS_PER_NODE_PROP_KEY = "cluster-connections-per-node";
    public static final int CLUSTER_CONNECTIONS_PER_NODE_VAL = 5;
    public static final String CLUSTER_CONTR_ID_PROP_KEY = "cluster-controller-id";
    public static final String COMPRESS_STREAM_PROP_KEY = "compress-stream";
    public static final String CONNECT_ALL_PAR = "--cluster-connect-all";
    public static final String CONNECT_ALL_PROP_KEY = "connect-all";
    public static final String NON_CLUSTER_TRAFFIC_ALLOWED_PROP_KEY = "non-cluster-traffic-allowed";
    public static final boolean NON_CLUSTER_TRAFFIC_ALLOWED_PROP_VAL = true;
    public static final String IDENTITY_TYPE_KEY = "identity-type";
    public static final String IDENTITY_TYPE_VAL = "generic";
    // Session-data key under which the routing expressions of a connection are stored.
    public static final String PORT_ROUTING_TABLE_PROP_KEY = "routing-table";
    public static final String RETURN_SERVICE_DISCO_KEY = "service-disco";
    // Key of the shared secret in port properties / session data.
    public static final String SECRET_PROP_KEY = "secret";
    public static final String XMLNS = "tigase:cluster";
    public static final boolean RETURN_SERVICE_DISCO_VAL = true;
    public static final boolean CONNECT_ALL_PROP_VAL = false;
    public static final boolean COMPRESS_STREAM_PROP_VAL = false;
    public static final String EVENTBUS_REPOSITORY_NOTIFICATIONS_ENABLED_KEY = "eventbus-repository-notifications";
    public static final boolean EVENTBUS_REPOSITORY_NOTIFICATIONS_ENABLED_VALUE = false;
    private static final Logger log = Logger.getLogger(ClusterConnectionManager.class.getName());
    // NOTE: public and NOT final; the constructor copies the current value into elements_number_limit.
    public static int ELEMENTS_NUMBER_LIMIT_CLUSTER_PROP_VAL = 100000;
    // Injected by the kernel; also assigned explicitly in setClusterController().
    @Inject
    private ClusterControllerIfc clusterController = null;
    // Listener for ClusterInitializedEvent; created in start() and cleared in stop().
    private EventListener<ClusterInitializedEvent> clusterEventHandler = null;
    // When true, serviceStarted() enables zlib compression on every new cluster connection.
    @ConfigField(desc="Compress stream", alias="compress-stream")
    private boolean compress_stream = false;
    @ConfigField(desc="Connect to all nodes", alias="connect-all")
    private boolean connect_all = false;
    // Strategy used by writePacketToSocket() to pick one connection out of a node's pool.
    @Inject
    private ClusterConnectionSelectorIfc connectionSelector = null;
    // Open connections grouped per remote node; the key is the "remote-host" session value.
    private Map<String, ClusterConnection> connectionsPool = new ConcurrentSkipListMap<String, ClusterConnection>();
    @Inject
    private DataSourceBean dataSourceBean = null;
    @Inject
    private EventBus eventBus = null;
    // Value returned by getDiscoCategoryType().
    private String identity_type = "generic";
    // Set once the initial cluster connection phase is considered complete (see serviceConnected()/start()).
    private boolean initialClusterConnectedDone = false;
    // Reused collector of per-service I/O statistics; reset on every getStatistics() call.
    private IOServiceStatisticsGetter ioStatsGetter = new IOServiceStatisticsGetter();
    // Disconnect counters indexed by hour of day (24 slots); updated in serviceStopped().
    private long[] lastDay = new long[24];
    // Hour index for which lastDay/lastHour were last reset.
    private int lastDayIdx = 0;
    // Disconnect counters indexed by minute of hour (60 slots); updated in serviceStopped().
    private long[] lastHour = new long[60];
    private int lastHourIdx = 0;
    // Samples of the connected-node count; constructed with 31 (presumably days kept - confirm).
    private MaxDailyCounterQueue<Integer> maxNodes = new MaxDailyCounterQueue(31);
    // Cached result of maxNodes.getMaxValueInRange(7); -1 when no value is available.
    private int maxNodesWithinLastWeek = 0;
    // Used to size the processing thread pools; not modified in the visible part of this file.
    private int nodesNo = 0;
    @ConfigField(desc="Allow non cluster traffic over cluster connection", alias="non-cluster-traffic-allowed")
    private boolean nonClusterTrafficAllowed = true;
    // Number of outgoing connections itemAdded() schedules for every remote node.
    @ConfigField(desc="Number of connections to open per node", alias="connections-per-node")
    private int per_node_conns = 5;
    // Repository of cluster nodes; setRepo() (un)registers this manager as its change listener.
    @Inject
    private ComponentRepository<ClusterRepoItem> repo = null;
    // Periodic task that reloads the cluster repository; scheduled from serviceStarted().
    private final TimerTask repoReloadTimerTask = new TimerTask(){

        @Override
        public void run() {
            try {
                // The repository may not have been injected yet.
                if (ClusterConnectionManager.this.repo != null) {
                    ClusterConnectionManager.this.repo.reload();
                }
            }
            catch (TigaseDBException ex) {
                // Best effort: a failed reload is only logged, the task stays scheduled.
                log.log(Level.WARNING, "Items reloading failed", ex);
            }
        }
    };
    // Cluster command listener registered with the controller in setClusterController();
    // SendPacket is declared outside the visible part of this file.
    private CommandListener sendPacket = new SendPacket("deliver-cluster-packet-cmd");
    // Reported by getStatistics(); not modified in the visible part of this file.
    private long servConnectedTimeouts = 0L;
    // Total number of stopped cluster connections counted by serviceStopped().
    private long totalNodeDisconnects = 0L;
    public ClusterConnectionManager() {
        this.serviceConnectedTimeout = 10;
        this.elements_number_limit = ELEMENTS_NUMBER_LIMIT_CLUSTER_PROP_VAL;
        if (this.getDefHostName().toString().equalsIgnoreCase("localhost")) {
            TigaseRuntime.getTigaseRuntime().shutdownTigase(new String[]{"ERROR! Tigase is running in Clustered Mode yet the hostname", "of the machine was resolved to *localhost* which will cause", "malfunctioning of Tigase in clustered environment!", "", "To prevent further issues with the clustering Tigase will be shutdown.", "", "Please make sure that FQDN hostname of the machine is set correctly", "and restart the server."});
        }
        this.connectionDelay = 5000L;
        this.watchdogPingType = ConnectionManager.WATCHDOG_PING_TYPE.XMPP;
        this.watchdogDelay = 30000L;
        this.watchdogTimeout = -1000L;
    }

    /**
     * Enables the service-connected timeout for every cluster connection.
     *
     * @param service the connection being checked (not inspected)
     * @return always {@code true}
     */
    @Override
    protected boolean enableServiceConnectedTimeout(XMPPIOService<Object> service) {
        return true;
    }

    /**
     * Computes the hash used to distribute a packet between processing threads.
     * <p>
     * For {@code cluster} packets the hash is taken, in order of preference,
     * from the {@code userId} method parameter, the {@code to} attribute of the
     * first data packet, or its {@code from} attribute. Any other packet (or a
     * cluster packet without those values) is hashed by its stanza destination,
     * then its packet destination, and finally its string form.
     *
     * @param packet packet to compute the hash for
     * @return hash code selecting the processing thread
     */
    @Override
    public int hashCodeForPacket(Packet packet) {
        // equals() instead of ==: reference comparison is only correct while the
        // element name happens to be an interned String.
        if ("cluster".equals(packet.getElemName())) {
            ClusterElement clel = new ClusterElement(packet.getElement());
            String userId = clel.getMethodParam("userId");
            if (userId != null) {
                return userId.hashCode();
            }
            Queue<Element> children = clel.getDataPackets();
            if (children != null && !children.isEmpty()) {
                Element child = children.peek();
                String stanzaAdd = child.getAttributeStaticStr("to");
                if (stanzaAdd == null) {
                    stanzaAdd = child.getAttributeStaticStr("from");
                }
                if (stanzaAdd != null) {
                    return stanzaAdd.hashCode();
                }
                log.log(Level.FINE, "No stanzaTo or from for cluster packet: {0}", packet);
            }
        }
        if (packet.getStanzaTo() != null) {
            return packet.getStanzaTo().hashCode();
        }
        if (packet.getTo() != null) {
            return packet.getTo().hashCode();
        }
        return packet.toString().hashCode();
    }

    /**
     * Exposes this manager and its repository to scripts, in addition to the
     * bindings registered by the superclass.
     *
     * @param binds script bindings to populate; receives {@code clusterCM}
     *              (this instance) and {@code comp_repo} (the cluster repository)
     */
    @Override
    public void initBindings(Bindings binds) {
        super.initBindings(binds);
        binds.put("clusterCM", this);
        binds.put("comp_repo", this.repo);
    }

    /**
     * Reacts to a node appearing in the cluster repository.
     * <p>
     * Unless the item's host name resolves to a wildcard or loopback address, or
     * to an address bound to a local network interface, {@code per_node_conns}
     * outgoing connection tasks are queued and an {@code ADDED} event is sent.
     * Items whose host name cannot be resolved are logged and skipped.
     *
     * @param repoItem the repository item that was added
     */
    @Override
    public void itemAdded(ClusterRepoItem repoItem) {
        log.log(Level.INFO, "Loaded repoItem: {0}", repoItem.toString());
        final String host = repoItem.getHostname();

        boolean remoteNode = false;
        try {
            InetAddress resolved = InetAddress.getByName(host);
            boolean pointsAtThisMachine = resolved.isAnyLocalAddress()
                    || resolved.isLoopbackAddress()
                    || NetworkInterface.getByInetAddress(resolved) != null;
            remoteNode = !pointsAtThisMachine;
            if (pointsAtThisMachine && log.isLoggable(Level.CONFIG)) {
                log.log(Level.CONFIG, "ClusterRepoItem of local machine, skipping connection attempt: {0}", repoItem);
            }
        }
        catch (SocketException | UnknownHostException ex) {
            log.log(Level.WARNING, "Incorrect ClusterRepoItem, skipping connection attempt: " + repoItem, ex);
        }

        if (!remoteNode) {
            return;
        }

        // Every connection gets its own property map.
        for (int attempt = 0; attempt < this.per_node_conns; attempt++) {
            log.log(Level.CONFIG, "Trying to connect to cluster node: {0}", host);
            Map<String, Object> connectProps = new LinkedHashMap<String, Object>(12);
            connectProps.put(SECRET_PROP_KEY, repoItem.getPassword());
            connectProps.put("local-host", this.getDefHostName());
            connectProps.put("type", ConnectionType.connect);
            connectProps.put("socket", SocketType.plain);
            connectProps.put("remote-host", host);
            connectProps.put("ifc", new String[]{host});
            connectProps.put("max-reconnects", 99999999);
            connectProps.put("port-no", repoItem.getPortNo());
            this.addWaitingTask(connectProps);
        }
        this.sendEvent(REPO_ITEM_UPDATE_TYPE.ADDED, repoItem);
    }

    /**
     * Reacts to a node being removed from the cluster repository: every pooled
     * connection to that host is removed from the pool and stopped, then a
     * {@code REMOVED} event is sent.
     *
     * @param item the repository item that was removed
     */
    @Override
    public void itemRemoved(ClusterRepoItem item) {
        ClusterConnection pooled = this.connectionsPool.get(item.getHostname());
        boolean hasOpenConnections = pooled != null && pooled.size() > 0;
        if (hasOpenConnections) {
            for (XMPPIOService<Object> openService : pooled.getConnections()) {
                pooled.removeConn(openService);
                openService.stop();
            }
        }
        this.sendEvent(REPO_ITEM_UPDATE_TYPE.REMOVED, item);
    }

    /**
     * Reacts to a changed repository item by sending an {@code UPDATED} event;
     * existing connections are not touched here.
     *
     * @param item the repository item that was updated
     */
    @Override
    public void itemUpdated(ClusterRepoItem item) {
        this.sendEvent(REPO_ITEM_UPDATE_TYPE.UPDATED, item);
    }

    /**
     * Records the current node count (local node included) after a node has
     * connected and refreshes the cached seven-day maximum.
     *
     * @param node identifier of the node that connected
     */
    @Override
    public void nodeConnected(String node) {
        super.nodeConnected(node);
        int nodesWithLocal = this.getNodesConnectedWithLocal().size();
        this.maxNodes.add(nodesWithLocal);
        // -1 marks "no value available".
        this.maxNodesWithinLastWeek = this.maxNodes.getMaxValueInRange(7).orElse(-1);
    }

    /**
     * Hourly maintenance: samples the current node count (local node included)
     * and refreshes the cached seven-day maximum.
     */
    @Override
    public synchronized void everyHour() {
        super.everyHour();
        int nodesWithLocal = this.getNodesConnectedWithLocal().size();
        this.maxNodes.add(nodesWithLocal);
        // -1 marks "no value available".
        this.maxNodesWithinLastWeek = this.maxNodes.getMaxValueInRange(7).orElse(-1);
    }

    /**
     * Records the current node count (local node included) after a node has
     * disconnected and refreshes the cached seven-day maximum.
     *
     * @param node identifier of the node that disconnected
     */
    @Override
    public void nodeDisconnected(String node) {
        super.nodeDisconnected(node);
        int nodesWithLocal = this.getNodesConnectedWithLocal().size();
        this.maxNodes.add(nodesWithLocal);
        // -1 marks "no value available".
        this.maxNodesWithinLastWeek = this.maxNodes.getMaxValueInRange(7).orElse(-1);
    }

    /**
     * Number of threads used for incoming packet processing.
     *
     * @return eight times the larger of the available processor count and {@code nodesNo}
     */
    @Override
    public int processingInThreads() {
        int cpuCount = Runtime.getRuntime().availableProcessors();
        int base = cpuCount >= this.nodesNo ? cpuCount : this.nodesNo;
        return base * 8;
    }

    /**
     * Number of threads used for outgoing packet processing.
     *
     * @return eight times the larger of the available processor count and {@code nodesNo}
     */
    @Override
    public int processingOutThreads() {
        int cpuCount = Runtime.getRuntime().availableProcessors();
        int base = cpuCount >= this.nodesNo ? cpuCount : this.nodesNo;
        return base * 8;
    }

    /**
     * Dispatches a packet received from a cluster connection.
     * <p>
     * {@code cluster} elements are handed to the cluster controller; anything
     * else is passed to the superclass for regular processing.
     *
     * @param packet packet taken from the outgoing queue
     */
    @Override
    public void processOutPacket(Packet packet) {
        // equals() instead of ==: reference comparison is only correct while the
        // element name happens to be an interned String.
        if ("cluster".equals(packet.getElemName())) {
            this.clusterController.handleClusterPacket(packet.getElement());
            return;
        }
        if (log.isLoggable(Level.FINER)) {
            log.log(Level.FINER, "Unexpected packet on cluster connection: {0}", packet);
        }
        super.processOutPacket(packet);
    }

    /**
     * Sends a packet to another cluster node.
     * <p>
     * Packets addressed to this component itself are answered with a
     * {@code feature-not-implemented} error. {@code cluster} and {@code route}
     * elements are written to a cluster connection as they are; other packets
     * are wrapped with {@code packRouted()} when non-cluster traffic is
     * allowed, and dropped (with a FINER log entry) otherwise.
     *
     * @param packet packet to deliver
     */
    @Override
    public void processPacket(Packet packet) {
        if (log.isLoggable(Level.FINEST)) {
            log.log(Level.FINEST, "Processing packet: {0}", packet);
        }
        if (packet.getStanzaTo() != null && packet.getStanzaTo().equals((Object)this.getComponentId())) {
            try {
                this.addOutPacket(Authorization.FEATURE_NOT_IMPLEMENTED.getResponseMessage(packet, "Not implemented", true));
            }
            catch (PacketErrorTypeException e) {
                // The Throwable overload of log() does not substitute {0}, so the
                // packet is concatenated into the message instead.
                log.log(Level.WARNING, "Packet processing exception for packet: " + packet, e);
            }
            return;
        }
        // equals() instead of ==: reference comparison is only correct while the
        // element name happens to be an interned String.
        String elemName = packet.getElemName();
        if ("cluster".equals(elemName) || "route".equals(elemName)) {
            this.writePacketToSocket(packet);
        } else if (this.nonClusterTrafficAllowed) {
            this.writePacketToSocket(packet.packRouted());
        } else if (log.isLoggable(Level.FINER)) {
            log.log(Level.FINER, "Unexpected packet for the cluster connetcion: {0}", packet);
        }
    }

    /**
     * Drains the packets received on a cluster connection.
     * <ul>
     * <li>{@code handshake} elements are passed to {@code processHandshake}.</li>
     * <li>XMPP pings addressed to the local domain and sent from the
     *     connection's remote host only refresh the connection's
     *     {@code lastConnectivityCheck} timestamp.</li>
     * <li>Everything else is unwrapped if routed and queued with
     *     {@code addOutPacket}.</li>
     * </ul>
     * If unwrapping fails with a stringprep error the packet is dropped and the
     * method returns immediately; packets still queued stay in the service's
     * received queue.
     *
     * @param serv connection whose received packets are processed
     * @return always {@code null}
     */
    @Override
    public Queue<Packet> processSocketData(XMPPIOService<Object> serv) {
        Packet p;
        while ((p = serv.getReceivedPackets().poll()) != null) {
            if (log.isLoggable(Level.FINEST)) {
                log.log(Level.FINEST, "Processing socket data: {0}", p);
            }
            if (p.getElemName().equals("handshake")) {
                this.processHandshake(p, serv);
                continue;
            }
            // equals() instead of ==: reference comparison is only correct while the
            // attribute value happens to be an interned String.
            if ("urn:xmpp:ping".equals(p.getAttributeStaticStr(new String[]{"iq", "ping"}, "xmlns"))
                    && p.getStanzaTo() != null
                    && this.getDefHostName().getDomain().equals(p.getStanzaTo().getDomain())
                    && p.getStanzaFrom() != null
                    && p.getStanzaFrom().getDomain().equals(serv.getSessionData().get("remote-host"))) {
                if (log.isLoggable(Level.FINEST)) {
                    log.log(Level.FINEST, "Received XMPP ping [{0}]", serv);
                }
                serv.getSessionData().put("lastConnectivityCheck", System.currentTimeMillis());
                continue;
            }
            Packet result = p;
            if (p.isRouted()) {
                try {
                    result = p.unpackRouted();
                }
                catch (TigaseStringprepException ex) {
                    log.log(Level.WARNING, "Packet stringprep addressing problem, dropping packet: {0}", p);
                    return null;
                }
            }
            this.addOutPacket(result);
        }
        return null;
    }

    /**
     * Tries to redeliver a packet that could not be written to a cluster
     * connection by queueing it again with {@code addPacket}.
     * <p>
     * A {@link NullPointerException} raised while re-queueing is treated as
     * "connection is broken" and only logged (best effort).
     *
     * @param packet       the packet that was not delivered
     * @param stamp        not used by this implementation
     * @param errorMessage not used by this implementation
     * @return always {@code true}
     */
    @Override
    public boolean processUndeliveredPacket(Packet packet, Long stamp, String errorMessage) {
        try {
            this.addPacket(packet);
        }
        catch (NullPointerException ex) {
            // The {0} placeholder was missing, so the packet never appeared in the log.
            log.log(Level.WARNING, "could not redeliver cluster packet on broken cluster connection: {0}", packet.toString());
        }
        return true;
    }

    /**
     * Callback for a failed reconnection; intentionally does nothing here.
     *
     * @param port_props properties of the connection that failed (ignored)
     */
    @Override
    public void reconnectionFailed(Map<String, Object> port_props) {
    }

    /**
     * Number of scheduler threads requested by this component.
     *
     * @return always {@code 4}
     */
    @Override
    public int schedulerThreads() {
        return 4;
    }

    /**
     * Called when a cluster connection socket has been opened.
     * <p>
     * Schedules the periodic repository reload if it is not scheduled yet,
     * optionally turns on zlib compression and, for outgoing
     * ({@code connect}) connections, stores the routing data in the session
     * and opens the XMPP stream towards the remote host.
     *
     * @param serv the connection that was started
     */
    @Override
    public void serviceStarted(XMPPIOService<Object> serv) {
        if (!this.repoReloadTimerTask.isScheduled()) {
            this.addTimerTaskWithTimeout(this.repoReloadTimerTask, 0L, 15000L);
        }
        super.serviceStarted(serv);

        Object[] openedDetails = {serv.getRemoteAddress(), serv.connectionType().toString(), serv.getUniqueId()};
        log.log(Level.INFO, "Cluster connection opened: {0}, type: {1}, id={2}", openedDetails);

        if (this.compress_stream) {
            log.log(Level.INFO, "Starting stream compression for: {0}", serv.getUniqueId());
            serv.startZLib(9);
        }

        // Only the initiating side opens the stream; accepted connections wait for the peer.
        if (serv.connectionType() == ConnectionType.connect) {
            String remoteHost = (String)serv.getSessionData().get("remote-host");
            serv.getSessionData().put("hostname-key", this.getDefHostName().toString());
            String[] routings = {remoteHost, ".*@" + remoteHost, ".*\\." + remoteHost};
            serv.getSessionData().put(PORT_ROUTING_TABLE_PROP_KEY, routings);
            String streamOpen = "<stream:stream xmlns='tigase:cluster' xmlns:stream='http://etherx.jabber.org/streams' from='" + this.getDefHostName() + "' to='" + remoteHost + "'>";
            log.log(Level.INFO, "cid: {0}, sending: {1}", new Object[]{(String)serv.getSessionData().get("cid"), streamOpen});
            serv.xmppStreamOpen(streamOpen);
        }
    }

    /**
     * Called when a cluster connection has been closed.
     * <p>
     * Removes the connection from its node's pool. When the last connection to
     * the node disappears, the node's routings are removed, its service
     * discovery item is updated and the cluster controller is notified.
     * Outgoing connections to nodes still present in the repository are queued
     * for reconnection, and the disconnect counters are updated.
     * <p>
     * Decompiler note (CFR): the {@code synchronized} block was reconstructed
     * from bytecode ("Removed try catching itself - possible behaviour change").
     *
     * @param service the connection that was stopped
     * @return the value returned by the superclass; the logic above only runs
     *         when it is {@code true}
     */
    @Override
    public boolean serviceStopped(XMPPIOService<Object> service) {
        boolean result = super.serviceStopped(service);
        if (!result) {
            return result;
        }

        ConcurrentMap<String, Object> sessionData = service.getSessionData();
        String[] routings = (String[])sessionData.get(PORT_ROUTING_TABLE_PROP_KEY);
        String addr = (String)sessionData.get("remote-host");
        // computeIfAbsent instead of get()+put(): two threads handling the same
        // host can no longer replace each other's pool entry.
        ClusterConnection conns = this.connectionsPool.computeIfAbsent(addr, host -> new ClusterConnection(host));

        synchronized (conns) {
            int size = conns.size();
            conns.removeConn(service);
            if (log.isLoggable(Level.FINEST)) {
                log.log(Level.FINEST, "serviceStopped: result={0} / size={1} / connPool={2} / serv={3} / conns={4} / connsSize: {5}, / type={6}", new Object[]{result, size, this.connectionsPool, service, conns, conns.size(), service.connectionType()});
            }
            // Only the transition "some connections -> none" counts as a node disconnect.
            if (size != 0 && conns.size() == 0) {
                if (routings != null) {
                    this.updateRoutings(routings, false);
                }
                log.log(Level.INFO, "Disconnected from: {0}", addr);
                this.updateServiceDiscoveryItem(addr, addr, "tigase:cluster disconnected", true);
                this.clusterController.nodeDisconnected(addr);
            }
        }

        // Reconnect only connections we initiated, and only to nodes still in the repository.
        if (service.connectionType() == ConnectionType.connect && this.repo.getItem(addr) != null) {
            this.addWaitingTask(sessionData);
        }

        ++this.totalNodeDisconnects;
        int hour = TimeUtils.getHourNow();
        if (this.lastDayIdx != hour) {
            // First disconnect seen in a new hour: restart that hour's and all per-minute counters.
            this.lastDayIdx = hour;
            this.lastDay[hour] = 0L;
            Arrays.fill(this.lastHour, 0L);
        }
        this.lastDay[hour]++;
        this.lastHour[TimeUtils.getMinuteNow()]++;
        return result;
    }

    /**
     * Replaces the cluster repository, moving this manager's change-listener
     * registration from the previous repository (if any) to the new one (if any).
     *
     * @param repo the new repository; may be {@code null}
     */
    public void setRepo(ComponentRepository<ClusterRepoItem> repo) {
        ComponentRepository<ClusterRepoItem> previous = this.repo;
        if (previous != null) {
            previous.removeRepoChangeListener(this);
        }
        this.repo = repo;
        if (repo != null) {
            repo.addRepoChangeListener(this);
        }
    }

    /**
     * Callback for a completed TLS handshake; intentionally does nothing here.
     *
     * @param service the connection on which the handshake completed (ignored)
     */
    @Override
    public void tlsHandshakeCompleted(XMPPIOService<Object> service) {
    }

    /**
     * Refreshes connection properties from the cluster repository.
     * <p>
     * If the repository knows the connection's {@code remote-host}, the secret
     * and port number are copied from the repository item; otherwise
     * {@code max-reconnects} is set to {@code 0}.
     *
     * @param port_props connection properties, modified in place
     */
    @Override
    public void updateConnectionDetails(Map<String, Object> port_props) {
        ClusterRepoItem item = this.repo.getItem((String)port_props.get("remote-host"));
        if (item == null) {
            port_props.put("max-reconnects", 0);
        } else {
            port_props.put(SECRET_PROP_KEY, item.getPassword());
            port_props.put("port-no", item.getPortNo());
        }
        if (log.isLoggable(Level.FINEST)) {
            Object[] details = {item, port_props};
            log.log(Level.FINEST, "ClusterRepoItem: {0}, port_props: {1}", details);
        }
    }

    /**
     * Callback for a closed XMPP stream; only writes an INFO log entry.
     *
     * @param serv the connection whose stream was closed (not inspected)
     */
    @Override
    public void xmppStreamClosed(XMPPIOService<Object> serv) {
        log.info("Stream closed.");
    }

    /**
     * Handles the stream header received from the peer.
     * <ul>
     * <li>{@code connect}: stores the stream id sent by the peer and answers
     *     with a {@code <handshake>} holding the SHA digest of the id and the
     *     secret of the local node's repository item.</li>
     * <li>{@code accept}: stores remote host, routing data and a freshly
     *     generated session id in the session, refreshes the connection
     *     details from the repository and answers with our stream header.</li>
     * </ul>
     * NOTE(review): in the {@code connect} case the repository item of the
     * local host is dereferenced without a null check.
     *
     * @param service the connection the stream was opened on
     * @param attribs attributes of the received stream header
     * @return data to send back to the peer, or {@code null} when there is
     *         nothing to send or the digest could not be computed
     */
    @Override
    public String[] xmppStreamOpened(XMPPIOService<Object> service, Map<String, String> attribs) {
        log.log(Level.INFO, "Stream opened: {0}, service: {1}", new Object[]{attribs, service});
        switch (service.connectionType()) {
            case connect: {
                String streamId = attribs.get("id");
                service.getSessionData().put("sessionID", streamId);
                ClusterRepoItem localItem = this.repo.getItem(this.getDefHostName().getDomain());
                String secret = localItem.getPassword();
                String digest;
                try {
                    digest = Algorithms.hexDigest(streamId, secret, "SHA");
                }
                catch (NoSuchAlgorithmException e) {
                    log.log(Level.SEVERE, "Can not generate digest for pass phrase.", e);
                    return null;
                }
                if (log.isLoggable(Level.FINEST)) {
                    log.log(Level.FINEST, "Calculating digest: id={0}, secret={1}, digest={2}, item={3}", new Object[]{streamId, secret, digest, localItem});
                }
                return new String[]{"<handshake>" + digest + "</handshake>"};
            }
            case accept: {
                String remoteHost = attribs.get("from");
                service.getSessionData().put("hostname-key", this.getDefHostName().toString());
                service.getSessionData().put("remote-host", remoteHost);
                String[] routings = {remoteHost, ".*@" + remoteHost, ".*\\." + remoteHost};
                service.getSessionData().put(PORT_ROUTING_TABLE_PROP_KEY, routings);
                String sessionId = UUID.randomUUID().toString();
                service.getSessionData().put("sessionID", sessionId);
                this.updateConnectionDetails(service.getSessionData());
                String streamHeader = "<stream:stream xmlns='tigase:cluster' xmlns:stream='http://etherx.jabber.org/streams' from='" + this.getDefHostName() + "' to='" + remoteHost + "' id='" + sessionId + "'>";
                return new String[]{streamHeader};
            }
        }
        return null;
    }

    /**
     * Service discovery category type of this component.
     *
     * @return the value of {@code identity_type} ({@code "generic"} unless changed)
     */
    @Override
    public String getDiscoCategoryType() {
        return this.identity_type;
    }

    /**
     * Service discovery description of this component.
     *
     * @return the fixed text {@code "Cluster connection manager"}
     */
    @Override
    public String getDiscoDescription() {
        return "Cluster connection manager";
    }

    /**
     * Adds cluster connection statistics to the list: disconnect counters,
     * aggregated I/O figures gathered from all services, and node-count maxima.
     *
     * @param list statistics list to append to
     */
    @Override
    public void getStatistics(StatisticsList list) {
        super.getStatistics(list);
        final String component = this.getName();

        list.add(component, "Total disconnects", this.totalNodeDisconnects, Level.FINE);
        list.add(component, "Service connected time-outs", this.servConnectedTimeouts, Level.FINE);
        list.add(component, "Last day disconnects", Arrays.toString(this.lastDay), Level.FINE);
        list.add(component, "Last hour disconnects", Arrays.toString(this.lastHour), Level.FINE);

        // The getter is shared, so it is reset before walking all services again.
        this.ioStatsGetter.reset();
        this.doForAllServices(this.ioStatsGetter);
        list.add(component, "Average compression ratio", this.ioStatsGetter.getAverageCompressionRatio(), Level.FINE);
        list.add(component, "Average decompression ratio", this.ioStatsGetter.getAverageDecompressionRatio(), Level.FINE);
        list.add(component, "Waiting to send", this.ioStatsGetter.getWaitingToSend(), Level.FINE);

        list.add(component, "Max daily cluster nodes count in last month", this.maxNodes, Level.INFO);
        list.add(component, "Max nodes count within last week", this.maxNodesWithinLastWeek, Level.INFO);

        boolean infoLevelRejected = !list.checkLevel(Level.INFO);
        if (infoLevelRejected && this.getNodesConnected().size() > 0) {
            list.add(component, "Known cluster nodes", this.getNodesConnected().size(), Level.INFO);
        }
    }

    /**
     * Stores the cluster controller and (re)registers the packet delivery
     * command listener with it. The listener is removed first so that it is
     * not registered twice.
     *
     * @param cl_controller the cluster controller to use
     */
    @Override
    public void setClusterController(ClusterControllerIfc cl_controller) {
        super.setClusterController(cl_controller);
        this.clusterController = cl_controller;
        cl_controller.removeCommandListener(this.sendPacket);
        cl_controller.setCommandListener(this.sendPacket);
    }

    /**
     * Starts the component and subscribes to {@code ClusterInitializedEvent}.
     * <p>
     * The handler is created lazily; when the event arrives it sets
     * {@code initialClusterConnectedDone} and unregisters itself.
     */
    @Override
    public void start() {
        super.start();
        if (this.clusterEventHandler == null) {
            this.clusterEventHandler = initializedEvent -> {
                if (log.isLoggable(Level.FINE)) {
                    log.log(Level.FINE, "Setting initialClusterConnectedDone to true (was: {0})", this.initialClusterConnectedDone);
                }
                this.initialClusterConnectedDone = true;
                // One notification is enough, so the handler removes itself.
                this.eventBus.removeListener(this.clusterEventHandler);
            };
        }
        this.eventBus.addListener(ClusterInitializedEvent.class, this.clusterEventHandler);
    }

    /**
     * Stops the component, unregisters the cluster-initialized listener from
     * the event bus and discards it, so that {@code start()} creates a new one.
     */
    @Override
    public void stop() {
        super.stop();
        this.eventBus.removeListener(this.clusterEventHandler);
        this.clusterEventHandler = null;
    }

    /**
     * @return the current value of the {@code initialClusterConnectedDone} flag
     */
    boolean isInitialClusterConnectedDone() {
        return this.initialClusterConnectedDone;
    }

    /**
     * Registers a successfully handshaken cluster connection.
     * <p>
     * The connection is added to its node's pool. When it is the first
     * connection to that node, the node's routings are added, its service
     * discovery item is updated and the cluster controller is notified.
     * Afterwards, if the initial cluster connection has not been announced yet
     * and the repository holds at most one item or at least
     * {@code repoSize - 1} nodes are connected, a
     * {@code ClusterInitializedEvent} is fired once.
     * <p>
     * Decompiler note (CFR): the {@code synchronized} blocks were reconstructed
     * from bytecode ("Removed try catching itself - possible behaviour change").
     *
     * @param serv the connection that completed the handshake
     */
    @Override
    protected void serviceConnected(XMPPIOService<Object> serv) {
        String[] routings = (String[])serv.getSessionData().get(PORT_ROUTING_TABLE_PROP_KEY);
        String addr = (String)serv.getSessionData().get("remote-host");
        // computeIfAbsent instead of get()+put(): connections to the same host that
        // finish their handshake concurrently can no longer replace each other's pool.
        ClusterConnection conns = this.connectionsPool.computeIfAbsent(addr, host -> new ClusterConnection(host));

        synchronized (conns) {
            int size = conns.size();
            if (log.isLoggable(Level.FINEST)) {
                log.log(Level.FINEST, "New service connected: size = {0} / connectionsPool={1} / serv={2} / conns={3}, connsSize={4}", new Object[]{size, this.connectionsPool, serv, conns, conns.size()});
            }
            serv.setUserJid((String)serv.getSessionData().get("remote-host"));
            conns.addConn(serv);
            // Only the transition "no connections -> some" counts as a node connect.
            if (size == 0 && conns.size() > 0) {
                this.updateRoutings(routings, true);
                log.log(Level.INFO, "Connected to: {0}", addr);
                this.updateServiceDiscoveryItem(addr, addr, "tigase:cluster connected", true);
                this.clusterController.nodeConnected(addr);
            }
        }

        try {
            int connectedSize = this.getNodesConnected().size();
            int repoSize = this.repo.allItems().size();
            if (log.isLoggable(Level.FINEST)) {
                log.log(Level.FINEST, "All repo nodes connected! Connected: {0}, repo size: {1}, initialClusterConnectedDone: {2}", new Object[]{connectedSize, repoSize, this.initialClusterConnectedDone});
            }
            synchronized (this) {
                // repoSize - 1: the compared count excludes one repository entry
                // (presumably the local node - confirm).
                boolean allRemoteNodesConnected = repoSize <= 1 || connectedSize >= repoSize - 1;
                if (!this.initialClusterConnectedDone && allRemoteNodesConnected) {
                    this.initialClusterConnectedDone = true;
                    this.eventBus.fire(new ClusterInitializedEvent());
                }
            }
        }
        catch (TigaseDBException e) {
            log.log(Level.WARNING, "There was an error while reading size of cluster repository", e);
        }
        super.serviceConnected(serv);
    }

    /**
     * Writes a packet to one of the connections of the node named by the
     * domain of the packet's destination address.
     *
     * @param p packet to send
     * @return the result of the superclass write, or {@code false} when the
     *         connection selector returned no connection
     */
    @Override
    protected boolean writePacketToSocket(Packet p) {
        ClusterConnection nodeConnections = this.connectionsPool.get(p.getTo().getDomain());
        XMPPIOService<Object> selected = this.connectionSelector.selectConnection(p, nodeConnections);
        if (selected == null) {
            log.log(Level.WARNING, "No cluster connection to send a packet: {0}", p);
            return false;
        }
        return super.writePacketToSocket(selected, p);
    }

    /**
     * Default plain-socket port this node listens on.
     * <p>
     * The port comes from the repository item of the local host. When no
     * repository is available, or it has no item for the local host, the
     * default port {@code 5277} is used instead of failing with a
     * {@link NullPointerException}.
     *
     * @return single-element array with the port number
     */
    @Override
    protected int[] getDefPlainPorts() {
        if (this.repo == null) {
            return new int[]{5277};
        }
        ClusterRepoItem item = this.repo.getItem(this.getDefHostName().getDomain());
        if (item == null) {
            log.log(Level.WARNING, "No cluster repository item for local host {0}, using default port 5277", this.getDefHostName());
            return new int[]{5277};
        }
        return new int[]{item.getPortNo()};
    }

    /**
     * Default traffic throttling specification for cluster connections.
     *
     * @return the fixed specification string; its syntax is defined by the
     *         superclass (not visible here)
     */
    @Override
    protected String getDefTrafficThrottling() {
        return "xmpp:25m:0:disc,bin:20000m:0:disc";
    }

    /**
     * Maximum inactivity time allowed for a cluster connection.
     *
     * @return always {@code 180000} (presumably milliseconds, i.e. 3 minutes - confirm against the superclass)
     */
    @Override
    protected long getMaxInactiveTime() {
        return 180000L;
    }

    /**
     * Queue size limit for this component.
     *
     * @param def the default queue size proposed by the caller
     * @return ten times {@code def}
     */
    @Override
    protected Integer getMaxQueueSize(int def) {
        return def * 10;
    }

    /**
     * Builds the default properties of a listening port: an accepting, plain
     * socket bound to the interfaces given by {@code PORT_IFC_PROP_VAL}.
     *
     * @param port the port number (not used when building the defaults)
     * @return a new, insertion-ordered map with the port properties
     */
    @Override
    protected Map<String, Object> getParamsForPort(int port) {
        Map<String, Object> portDefaults = new LinkedHashMap<>(10);
        portDefaults.put("type", ConnectionType.accept);
        portDefaults.put("socket", SocketType.plain);
        portDefaults.put("ifc", this.PORT_IFC_PROP_VAL);
        return portDefaults;
    }

    /**
     * Factory for the I/O service objects used for cluster connections.
     *
     * @return a new {@code XMPPIOService} instance
     */
    @Override
    protected XMPPIOService<Object> getXMPPIOServiceInstance() {
        return new XMPPIOService<Object>();
    }

    /**
     * Marks cluster connections as high-throughput.
     *
     * @return always {@code true}
     */
    @Override
    protected boolean isHighThroughput() {
        return true;
    }

    /**
     * Fires a {@code ClusterRepoItemEvent} for a repository change. Nothing
     * happens when the event bus is not available or {@code item} is {@code null}.
     *
     * @param action kind of repository change
     * @param item   the affected repository item
     */
    private void sendEvent(REPO_ITEM_UPDATE_TYPE action, ClusterRepoItem item) {
        if (this.eventBus != null && item != null) {
            this.eventBus.fire(new ClusterRepoItemEvent(item, action));
        }
    }

    /**
     * Processes a {@code <handshake>} element received on a cluster connection.
     * <p>
     * Connections whose remote host resolves to this machine, or cannot be
     * resolved at all, are stopped and the handshake is abandoned.
     * <ul>
     * <li>{@code connect}: an empty handshake is the peer's confirmation and
     *     completes the connection; a handshake with content is logged as
     *     incorrect.</li>
     * <li>{@code accept}: the received digest is compared with the SHA digest
     *     of the session id and the secret stored in the session. On a match
     *     an empty handshake is sent back and the connection is completed;
     *     otherwise the reason is logged and the connection is stopped.</li>
     * </ul>
     *
     * @param p    the handshake packet
     * @param serv the connection the packet arrived on
     */
    private void processHandshake(Packet p, XMPPIOService<Object> serv) {
        if (log.isLoggable(Level.FINEST)) {
            log.log(Level.FINEST, "Processing handshake: packet={0} / service={1} / sessionData={2}", new Object[]{p, serv, serv.getSessionData()});
        }
        String serv_addr = (String)serv.getSessionData().get("remote-host");
        try {
            InetAddress addr = InetAddress.getByName(serv_addr);
            if (addr.isAnyLocalAddress() || addr.isLoopbackAddress() || NetworkInterface.getByInetAddress(addr) != null) {
                log.log(Level.WARNING, "Cluster handshake received from this instance, terminating: {0}", serv_addr);
                serv.stop();
                return;
            }
        }
        catch (Exception ex) {
            log.log(Level.WARNING, "Cluster handshake received from this instance, terminating: " + serv_addr, ex);
            serv.stop();
            // The connection has just been stopped; do not continue the handshake
            // (previously execution fell through into the switch below).
            return;
        }
        switch (serv.connectionType()) {
            case connect: {
                String data = p.getElemCData();
                if (data == null) {
                    this.serviceConnected(serv);
                    break;
                }
                log.log(Level.WARNING, "Incorrect packet received: {0}", p);
                break;
            }
            case accept: {
                String digest = p.getElemCData();
                String id = (String)serv.getSessionData().get("sessionID");
                String secret = (String)serv.getSessionData().get(SECRET_PROP_KEY);
                try {
                    String loc_digest = Algorithms.hexDigest(id, secret, "SHA");
                    if (log.isLoggable(Level.FINEST)) {
                        log.log(Level.FINEST, "Calculating digest: secret={0}, digest={1}, loc_digest={2}, sessionData={3}", new Object[]{secret, digest, loc_digest, serv.getSessionData()});
                    }
                    if (digest != null && digest.equals(loc_digest)) {
                        Packet resp = Packet.packetInstance(new Element("handshake"), null, null);
                        this.writePacketToSocket(serv, resp);
                        this.serviceConnected(serv);
                        break;
                    }
                    if (secret == null) {
                        log.log(Level.WARNING, "Remote hostname not found in local configuration or time difference between cluster nodes is too big. Connection not accepted: {0}", serv);
                        if (log.isLoggable(Level.FINEST)) {
                            log.log(Level.FINEST, "Remote hostname not found in local configuration or time difference between cluster nodes is too big. Connection not accepted! Remote host: {0}, sessionData: {1}, repoItem: {2}, service: {3}", new Object[]{serv_addr, serv.getSessionData(), this.repo.getItem(serv_addr), serv});
                        }
                    } else {
                        log.log(Level.WARNING, "Handshaking password doesn''t match, disconnecting: {0}", serv);
                        if (log.isLoggable(Level.FINEST)) {
                            // Detailed variant is emitted at FINEST, matching its guard and
                            // the sibling branch above (it used to be logged at WARNING).
                            log.log(Level.FINEST, "Handshaking password doesn''t match, disconnecting! Remote host: {0}, sessionData: {1}, repoItem: {2}, service: {3}", new Object[]{serv_addr, serv.getSessionData(), this.repo.getItem(serv_addr), serv});
                        }
                    }
                    serv.stop();
                }
                catch (Exception e) {
                    log.log(Level.SEVERE, "Handshaking error.", e);
                }
                break;
            }
        }
    }

    /**
     * Adds or removes every regex routing in the given array.
     *
     * <p>Each entry is handled independently: a failure for one entry is logged at
     * WARNING level and the remaining entries are still processed.
     *
     * @param routings regex routings to add or remove
     * @param add      {@code true} to add the routings, {@code false} to remove them
     */
    private void updateRoutings(String[] routings, boolean add) {
        final String failureMessage = add
                ? "Can not add regex routing ''{0}'' : {1}"
                : "Can not remove regex routing ''{0}'' : {1}";
        for (String routing : routings) {
            try {
                if (add) {
                    this.addRegexRouting(routing);
                } else {
                    this.removeRegexRouting(routing);
                }
            } catch (Exception ex) {
                log.log(Level.WARNING, failureMessage, new Object[]{routing, ex});
            }
        }
    }

    /**
     * Watchdog variant that measures idle time from the {@code "lastConnectivityCheck"}
     * timestamp kept in each service's session data.
     */
    protected class Watchdog
    extends ConnectionManager.Watchdog {
        protected Watchdog() {
            super(ClusterConnectionManager.this);
        }

        /**
         * Returns the number of milliseconds elapsed since the timestamp stored under
         * {@code "lastConnectivityCheck"}.
         *
         * <p>When no timestamp is stored yet, one is written that lies exactly
         * {@code watchdogTimeout} milliseconds in the past and {@code watchdogTimeout}
         * itself is returned.
         *
         * @param service the service whose session data is inspected
         * @return elapsed milliseconds, or {@code watchdogTimeout} on the first call
         */
        @Override
        protected long getDurationSinceLastTransfer(XMPPIOService service) {
            Long lastCheck = (Long)service.getSessionData().get("lastConnectivityCheck");
            if (lastCheck != null) {
                return System.currentTimeMillis() - lastCheck;
            }
            // First look at this service: seed the timestamp one full timeout in the past.
            long backdated = System.currentTimeMillis() - ClusterConnectionManager.this.watchdogTimeout;
            service.getSessionData().put("lastConnectivityCheck", backdated);
            return ClusterConnectionManager.this.watchdogTimeout;
        }
    }

    /**
     * Cluster command listener that turns every received element into a packet and
     * hands it to the enclosing manager through {@code addPacketNB}.
     */
    private class SendPacket
    extends CommandListenerAbstract {
        private SendPacket(String name) {
            super(name, null);
        }

        /**
         * Converts each element of {@code packets} into a {@code Packet} and passes it
         * to {@code addPacketNB}. An element that fails with a
         * {@code TigaseStringprepException} is logged at WARNING level and skipped.
         *
         * @param fromNode     node the command came from (only logged here)
         * @param visitedNodes nodes already visited (only logged here)
         * @param data         command data (only logged here)
         * @param packets      elements to convert and enqueue
         */
        @Override
        public void executeCommand(JID fromNode, Set<JID> visitedNodes, Map<String, String> data, Queue<Element> packets) throws ClusterCommandException {
            if (log.isLoggable(Level.FINEST)) {
                Object[] logArgs = {fromNode, visitedNodes, data, packets};
                log.log(Level.FINEST, "Called fromNode: {0}, visitedNodes: {1}, data: {2}, packets: {3}", logArgs);
            }
            for (Element packetElement : packets) {
                try {
                    Packet packet = Packet.packetInstance(packetElement);
                    ClusterConnectionManager.this.addPacketNB(packet);
                } catch (TigaseStringprepException e) {
                    log.log(Level.WARNING, "Stringprep exception for packet: {0}", packetElement);
                }
            }
        }
    }

    /**
     * Service checker that sums I/O figures over every service it is applied to.
     *
     * <p>Each {@link #check} call adds one service's values to the running totals;
     * {@link #reset} zeroes the totals and the call counter. The statistics list itself
     * is not touched by {@link #reset}.
     */
    private class IOServiceStatisticsGetter
    implements ServiceChecker<XMPPIOService<Object>> {
        /** Sum of {@code waitingToSendSize()} over all checked services. */
        private int clIOQueue;
        /** Sum of the "Average compression rate" values read so far. */
        private float compressionRatio;
        /** Number of {@link #check} calls since the last {@link #reset}. */
        private int counter;
        /** Sum of the "Average decompression rate" values read so far. */
        private float decompressionRatio;
        /** List the services write their statistics into; shared across calls. */
        private StatisticsList list = new StatisticsList(Level.ALL);

        private IOServiceStatisticsGetter() {
        }

        /**
         * Collects the service's statistics into the shared list and adds its
         * compression rate, decompression rate and waiting-to-send size to the totals.
         *
         * @param service the service to sample
         */
        @Override
        public void check(XMPPIOService<Object> service) {
            service.getStatistics(list, true);
            // -1.0f is handed to getValue as its third argument, presumably the
            // fallback for a missing entry - TODO confirm against StatisticsList.
            compressionRatio += list.getValue("zlibio", "Average compression rate", -1.0f);
            decompressionRatio += list.getValue("zlibio", "Average decompression rate", -1.0f);
            counter++;
            clIOQueue += service.waitingToSendSize();
        }

        /** Clears all running totals and the call counter. */
        public void reset() {
            clIOQueue = 0;
            counter = 0;
            compressionRatio = 0.0f;
            decompressionRatio = 0.0f;
        }

        /**
         * @return summed compression rate divided by the number of checked services;
         *         {@code NaN} when no service was checked since the last reset
         */
        public float getAverageCompressionRatio() {
            return compressionRatio / (float)counter;
        }

        /**
         * @return summed decompression rate divided by the number of checked services;
         *         {@code NaN} when no service was checked since the last reset
         */
        public float getAverageDecompressionRatio() {
            return decompressionRatio / (float)counter;
        }

        /** @return summed waiting-to-send size of all checked services */
        public int getWaitingToSend() {
            return clIOQueue;
        }
    }

    /**
     * Repository bean for {@link ClusterRepoItem} entries, registered under the name
     * {@code clConRepositoryBean} with {@link ClusterConnectionManager} as its parent.
     */
    @Bean(name="clConRepositoryBean", parent=ClusterConnectionManager.class, active=true)
    public static class DefClConRepositoryBean
    extends AbstractSDComponentRepositoryBean<ClusterRepoItem> {
        // Accepts a class when ReflectionHelper reports that it matches
        // ComponentRepositoryDataSourceAware with the type parameters
        // ClusterRepoItem and DataSource.
        private static DataSourceHelper.Matcher matcher = clazz -> ReflectionHelper.classMatchesClassWithParameters(clazz, ComponentRepositoryDataSourceAware.class, new Type[]{ClusterRepoItem.class, DataSource.class});
        // NOTE(review): not referenced anywhere inside this class; confirm whether it
        // is still needed (e.g. for injection or reflection) before removing it.
        private ComponentRepository<ClusterRepoItem> repo = null;

        /**
         * Looks up the repository implementation class for the given data source by
         * asking {@code DataSourceHelper.getDefaultClass} for a
         * {@code ComponentRepository} class, passing the data source's resource URI
         * and this bean's matcher.
         *
         * @param dataSource data source whose resource URI drives the lookup
         * @return the class returned by {@code DataSourceHelper.getDefaultClass}
         * @throws DBInitException declared by this method; presumably propagated from
         *         the lookup - TODO confirm
         */
        @Override
        protected Class<? extends ComponentRepositoryDataSourceAware<ClusterRepoItem, DataSource>> findClassForDataSource(DataSource dataSource) throws DBInitException {
            Class<ComponentRepository> cls = DataSourceHelper.getDefaultClass(ComponentRepository.class, dataSource.getResourceUri(), matcher);
            return cls;
        }
    }

    /**
     * Event-bus event type that carries no data of its own.
     * NOTE(review): judging by the name it signals that the cluster has finished
     * initializing; the code that fires it is outside this section - confirm there.
     */
    public static class ClusterInitializedEvent
    implements EventBusEvent {
    }

    /**
     * Kinds of change, as suggested by the type name, that can happen to a repository
     * item. NOTE(review): usage sites are outside this section - confirm which events
     * or listeners consume these values.
     */
    public static enum REPO_ITEM_UPDATE_TYPE {
        // Presumably: an item was newly added.
        ADDED,
        // Presumably: an existing item was modified.
        UPDATED,
        // Presumably: an item was removed.
        REMOVED;

    }
}

