/*
 * Decompiled with CFR 0.152.
 */
package tigase.net;

import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Comparator;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import tigase.net.IOService;

/**
 * Pool of selector threads that watch {@link IOService} socket channels for
 * I/O readiness and hand ready services to a shared worker pool.
 *
 * <p>{@link #getInstance()} lazily creates one {@code SocketReadThread} (and
 * one {@link Selector}) per available processor. Services passed to
 * {@link #addSocketService(IOService)} are distributed over these selector
 * threads by hash code. When a key becomes ready it is cancelled and the
 * service is submitted to the shared {@link CompletionService}; a
 * {@link ResultsListener} re-registers the service once its task completes and
 * the service still reports being connected.
 */
public class SocketReadThread implements Runnable {
    private static final Logger log = Logger.getLogger("tigase.net.SocketReadThread");
    /** Default number of worker threads created per processor. */
    public static final int DEF_MAX_THREADS_PER_CPU = 5;
    /** Number of consecutive empty selections tolerated before the selector is recreated. */
    private static final int MAX_EMPTY_SELECTIONS = 10;
    private static SocketReadThread[] socketReadThread = null;
    private static int cpus = Runtime.getRuntime().availableProcessors();
    private boolean stopping = false;
    private int empty_selections = 0;
    /** Services waiting to be registered with this thread's selector. */
    private final ConcurrentSkipListSet<IOService> waiting = new ConcurrentSkipListSet<IOService>(new IOServiceComparator());
    /** Services whose keys fired in the current loop iteration and await submission. */
    private ConcurrentSkipListSet<IOService> forCompletion = new ConcurrentSkipListSet<IOService>(new IOServiceComparator());
    // volatile: replaced by the selector thread in recreateSelector() while other
    // threads read it in addSocketServicePriv() and removeSocketService().
    private volatile Selector clientsSel = null;
    private static ThreadPoolExecutor executor = null;
    private static CompletionService<IOService> completionService = null;
    private static int threadNo = 0;
    private static final int READ_ONLY = SelectionKey.OP_READ;
    private static final int READ_WRITE = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

    /**
     * Opens this thread's selector and starts a {@link ResultsListener}.
     * If the selector cannot be opened the failure is logged and the instance
     * is marked as stopping, so {@link #run()} exits immediately.
     */
    private SocketReadThread() {
        try {
            this.clientsSel = Selector.open();
        } catch (Exception e) {
            log.log(Level.SEVERE, "Server I/O error, can't continue my work.", e);
            this.stopping = true;
        }
        new ResultsListener().start();
    }

    /**
     * Returns the first selector thread of the pool, creating the worker
     * executor and one selector thread per processor on first use.
     *
     * <p>Synchronized so that concurrent first calls cannot build two pools or
     * observe a partially populated array.
     *
     * @return the pool member at index 0
     */
    public static synchronized SocketReadThread getInstance() {
        if (socketReadThread == null) {
            int nThreads = cpus * DEF_MAX_THREADS_PER_CPU;
            executor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
            completionService = new ExecutorCompletionService<IOService>(executor);
            socketReadThread = new SocketReadThread[cpus];
            for (int i = 0; i < socketReadThread.length; ++i) {
                socketReadThread[i] = new SocketReadThread();
                Thread thrd = new Thread(socketReadThread[i]);
                thrd.setName("SocketReadThread_CPU-" + i);
                thrd.start();
            }
            log.warning("" + socketReadThread.length + " SocketReadThreads started.");
        }
        return socketReadThread[0];
    }

    /**
     * Sets the worker pool size to {@code threads} per processor.
     *
     * @param threads number of worker threads per processor
     */
    public void setMaxThreadPerCPU(int threads) {
        this.setMaxThread(threads * cpus);
    }

    /**
     * Sets both the core and the maximum size of the shared worker pool.
     *
     * @param threads new fixed pool size
     */
    public void setMaxThread(int threads) {
        // ThreadPoolExecutor requires core <= max at every step, so the order of
        // the two setters depends on whether the pool grows or shrinks.
        if (threads > executor.getMaximumPoolSize()) {
            executor.setMaximumPoolSize(threads);
            executor.setCorePoolSize(threads);
        } else {
            executor.setCorePoolSize(threads);
            executor.setMaximumPoolSize(threads);
        }
    }

    /**
     * Queues the service for registration on the selector thread chosen by the
     * service's hash code.
     *
     * @param s service to watch for I/O readiness
     */
    public void addSocketService(IOService s) {
        // hashCode() may be negative and Java's % keeps the sign of the dividend;
        // clear the sign bit so the index is always within the array.
        int idx = (s.hashCode() & Integer.MAX_VALUE) % socketReadThread.length;
        socketReadThread[idx].addSocketServicePriv(s);
    }

    /**
     * Queues the service for registration on this thread's selector and wakes
     * the selector so the registration is picked up.
     *
     * @param s service to watch for I/O readiness
     */
    public void addSocketServicePriv(IOService s) {
        this.waiting.add(s);
        this.clientsSel.wakeup();
    }

    /**
     * Cancels the selection key of the given service, if one is registered
     * with the service as its attachment.
     *
     * <p>Because {@link #addSocketService(IOService)} may have placed the
     * service on any pool member, every member's selector is checked rather
     * than only this instance's. A service without a channel is ignored.
     *
     * @param s service to stop watching
     */
    public void removeSocketService(IOService s) {
        SocketChannel sc = s.getSocketChannel();
        if (sc == null) {
            return;
        }
        SocketReadThread[] pool = socketReadThread;
        if (pool == null) {
            cancelKey(sc, s, this.clientsSel);
            return;
        }
        for (SocketReadThread member : pool) {
            if (member != null) {
                cancelKey(sc, s, member.clientsSel);
            }
        }
    }

    /** Cancels the key of {@code sc} on {@code sel} when it is attached to {@code s}. */
    private static void cancelKey(SocketChannel sc, IOService s, Selector sel) {
        if (sel == null) {
            return;
        }
        SelectionKey key = sc.keyFor(sel);
        if (key != null && key.attachment() == s) {
            key.cancel();
        }
    }

    /** Calls {@code forceStop()} on the service, logging (at FINEST) instead of propagating failures. */
    private void forceStopQuietly(IOService s) {
        try {
            s.forceStop();
        } catch (Exception e) {
            if (log.isLoggable(Level.FINEST)) {
                log.log(Level.FINEST, "Exception while stopping service: " + s.getUniqueId(), e);
            }
        }
    }

    /**
     * Drains the waiting set, registering each connected service with this
     * thread's selector (for read, plus write when the service has data
     * waiting to be sent). Services that are not connected, or whose
     * registration fails, are force-stopped.
     */
    private void addAllWaiting() throws IOException {
        IOService s;
        while ((s = this.waiting.pollFirst()) != null) {
            SocketChannel sc = s.getSocketChannel();
            try {
                if (sc.isConnected()) {
                    int ops = READ_ONLY;
                    if (log.isLoggable(Level.FINEST)) {
                        log.finest("ADDED OP_READ: " + s.getUniqueId());
                    }
                    if (s.waitingToSend()) {
                        ops = READ_WRITE;
                        if (log.isLoggable(Level.FINEST)) {
                            log.finest("ADDED OP_WRITE: " + s.getUniqueId());
                        }
                    }
                    sc.register(this.clientsSel, ops, s);
                } else {
                    if (log.isLoggable(Level.FINEST)) {
                        log.finest("Socket not connected: " + s.getUniqueId());
                    }
                    if (log.isLoggable(Level.FINER)) {
                        log.finer("Forcing stopping the service: " + s.getUniqueId());
                    }
                    forceStopQuietly(s);
                }
            } catch (Exception e) {
                // Covers a null channel as well as registration failures.
                if (log.isLoggable(Level.FINER)) {
                    log.log(Level.FINER, "Forcing stopping the service: " + s.getUniqueId(), e);
                }
                forceStopQuietly(s);
            }
        }
    }

    /**
     * Recovers from a misbehaving selector. Keys whose channel is missing or
     * no longer connected are cancelled and their services force-stopped; if
     * any were found a {@code selectNow()} flushes the cancellations.
     * Otherwise a fresh selector is opened, every service from the old one is
     * put back on the waiting set, and the old selector is closed.
     */
    private synchronized void recreateSelector() throws IOException {
        if (log.isLoggable(Level.FINEST)) {
            log.finest("Recreating selector, opened channels: " + this.clientsSel.keys().size());
        }
        this.empty_selections = 0;
        boolean cancelled = false;
        for (SelectionKey sk : this.clientsSel.keys()) {
            IOService serv = (IOService) sk.attachment();
            SocketChannel sc = serv.getSocketChannel();
            if (sc != null && sc.isConnected()) {
                continue;
            }
            cancelled = true;
            sk.cancel();
            try {
                log.info("Forcing stopping the service: " + serv.getUniqueId());
                serv.forceStop();
            } catch (Exception e) {
                // Best effort: the key is already cancelled, keep cleaning up the rest.
                if (log.isLoggable(Level.FINEST)) {
                    log.log(Level.FINEST, "Exception while stopping service during selector recreation", e);
                }
            }
        }
        if (cancelled) {
            this.clientsSel.selectNow();
        } else {
            Selector oldSel = this.clientsSel;
            this.clientsSel = Selector.open();
            for (SelectionKey sk : oldSel.keys()) {
                IOService serv = (IOService) sk.attachment();
                sk.cancel();
                this.waiting.add(serv);
            }
            oldSel.close();
        }
    }

    /** Attempts {@link #recreateSelector()} after a failure in the select loop, logging if that fails too. */
    private void recoverSelector() {
        try {
            this.recreateSelector();
        } catch (Exception e) {
            log.log(Level.SEVERE, "Serious problem, can't recreate selector: ", e);
        }
    }

    /**
     * Cancels a ready key and queues its service for submission to the worker
     * pool. If the key turns out to be cancelled already, the service is
     * force-stopped instead.
     */
    private void queueForProcessing(SelectionKey sk) {
        IOService s = (IOService) sk.attachment();
        try {
            if (log.isLoggable(Level.FINEST)) {
                StringBuilder sb = new StringBuilder("AWAKEN: " + s.getUniqueId());
                if (sk.isWritable()) {
                    sb.append(", ready for WRITING");
                }
                if (sk.isReadable()) {
                    sb.append(", ready for READING");
                }
                sb.append(", readyOps() = " + sk.readyOps());
                log.finest(sb.toString());
            }
            sk.cancel();
            this.forCompletion.add(s);
        } catch (CancelledKeyException e) {
            if (log.isLoggable(Level.FINEST)) {
                log.finest("CancelledKeyException, stopping the connection: " + s.getUniqueId());
            }
            try {
                s.forceStop();
            } catch (Exception ex2) {
                if (log.isLoggable(Level.WARNING)) {
                    // Report the forceStop failure itself, not the CancelledKeyException.
                    log.warning("got exception during forceStop: " + ex2);
                }
            }
        }
    }

    /**
     * Selector loop: waits for ready keys, hands ready services to the worker
     * pool, and registers newly queued services. More than
     * {@link #MAX_EMPTY_SELECTIONS} consecutive wake-ups with nothing selected
     * and nothing waiting trigger {@link #recreateSelector()}, as does any
     * exception thrown inside the loop.
     */
    @Override
    public void run() {
        while (!this.stopping) {
            try {
                this.clientsSel.select();
                Set<SelectionKey> selected = this.clientsSel.selectedKeys();
                boolean nothingSelected = selected.isEmpty();
                if (nothingSelected && this.waiting.isEmpty()) {
                    if (log.isLoggable(Level.FINEST)) {
                        log.finest("Selected keys = 0!!! a bug again?");
                    }
                    if (++this.empty_selections > MAX_EMPTY_SELECTIONS) {
                        this.recreateSelector();
                    }
                } else {
                    this.empty_selections = 0;
                    if (!nothingSelected) {
                        for (SelectionKey sk : selected) {
                            this.queueForProcessing(sk);
                        }
                    }
                    // Flush the keys cancelled above so the channels can be registered again.
                    this.clientsSel.selectNow();
                }
                this.addAllWaiting();
                IOService serv;
                while ((serv = this.forCompletion.pollFirst()) != null) {
                    completionService.submit(serv);
                }
            } catch (CancelledKeyException brokene) {
                log.log(Level.WARNING, "Ups, broken JDK, Apple? ", brokene);
                this.recoverSelector();
            } catch (IOException ioe) {
                log.log(Level.WARNING, "Problem with the network connection: ", ioe);
                this.recoverSelector();
            } catch (Exception exe) {
                log.log(Level.SEVERE, "Server I/O error: ", exe);
                this.recoverSelector();
            }
        }
    }

    /** Orders services by their unique id. */
    private static class IOServiceComparator implements Comparator<IOService> {
        private IOServiceComparator() {
        }

        @Override
        public int compare(IOService o1, IOService o2) {
            return o1.getUniqueId().compareTo(o2.getUniqueId());
        }
    }

    /**
     * Thread that takes finished tasks from the shared completion service and
     * re-registers each service that is still connected. Services that are no
     * longer connected are only logged. A task that ended with an exception is
     * logged and its service is not re-registered here.
     */
    protected class ResultsListener extends Thread {
        public ResultsListener() {
            this.setName("SocketReadThread$ResultsListener");
        }

        /** Runs forever; every failure is logged and the loop carries on with the next result. */
        @Override
        public void run() {
            while (true) {
                try {
                    IOService service = completionService.take().get();
                    if (service.isConnected()) {
                        if (log.isLoggable(Level.FINEST)) {
                            log.finest("COMPLETED: " + service.getUniqueId());
                        }
                        SocketReadThread.this.addSocketService(service);
                    } else if (log.isLoggable(Level.FINEST)) {
                        log.finest("REMOVED: " + service.getUniqueId());
                    }
                } catch (ExecutionException e) {
                    log.log(Level.WARNING, "Protocol execution exception.", e);
                } catch (InterruptedException e) {
                    // The interrupt flag is deliberately not restored: this loop has no
                    // exit path, so a set flag would make every take() fail immediately.
                    log.log(Level.WARNING, "Protocol execution interrupted.", e);
                } catch (Exception e) {
                    log.log(Level.WARNING, "Protocol execution unknown exception.", e);
                }
            }
        }
    }
}

