/*
 * Decompiled with CFR 0.152.
 */
package tigase.net;

import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Comparator;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import tigase.net.IOService;

public class SocketThread
implements Runnable {
    /** Logger named after this class; used by the selector threads and the results listeners. */
    private static final Logger log = Logger.getLogger(SocketThread.class.getName());
    /** Default per-CPU thread multiplier (presumably the 8 used when the static initializer sizes the pool). */
    public static final int DEF_MAX_THREADS_PER_CPU = 8;
    /** Number of consecutive empty selections tolerated (presumably the threshold after which run() recreates the selector). */
    private static final int MAX_EMPTY_SELECTIONS = 10;
    /** Selector threads created with {@code reading == true}; populated by the static initializer. */
    private static SocketThread[] socketReadThread = null;
    /** Selector threads created with {@code writing == true}; populated by the static initializer. */
    private static SocketThread[] socketWriteThread = null;
    /** Processor count sampled once when the class is loaded. */
    private static int cpus = Runtime.getRuntime().availableProcessors();
    /** Worker pool behind {@link #completionService}; resized through setMaxThread. */
    private static ThreadPoolExecutor executor = null;
    /** Receives the services submitted by run(); the ResultsListener threads take completed results from it. */
    private static CompletionService<IOService<?>> completionService = null;
    /** Selector owned by this thread; recreateSelector() may replace it with a fresh instance. */
    private Selector clientsSel = null;
    /** Counts consecutive wake-ups with no selected keys and nothing waiting; reset on useful work or selector recreation. */
    private int empty_selections = 0;
    /** When true, addAllWaiting() registers channels for read interest. */
    private boolean reading = false;
    /** When true, addAllWaiting() registers channels for write interest. */
    private boolean writing = false;
    /** Services queued by addSocketServicePriv() until this thread registers them with the selector; ordered by unique id. */
    private ConcurrentSkipListSet<IOService<?>> waiting = new ConcurrentSkipListSet(new IOServiceComparator());
    /** Loop-exit flag checked by run(); in the visible code it is only set when the selector cannot be opened. */
    private boolean stopping = false;
    /** Services whose keys were selected and that are about to be submitted to the completion service; ordered by unique id. */
    private ConcurrentSkipListSet<IOService<?>> forCompletion = new ConcurrentSkipListSet(new IOServiceComparator());

    /**
     * Opens the selector for this thread and starts a companion
     * {@code ResultsListener} thread.
     *
     * <p>If the selector cannot be opened the failure is logged and the
     * instance is marked as stopping, so {@link #run()} exits immediately.
     * The listener thread is started in either case.
     *
     * @param name suffix used for the listener thread name
     */
    private SocketThread(String name) {
        try {
            clientsSel = Selector.open();
        } catch (Exception openFailure) {
            log.log(Level.SEVERE, "Server I/O error, can't continue my work.", openFailure);
            stopping = true;
        }
        ResultsListener listener = new ResultsListener("ResultsListener-" + name);
        listener.start();
    }

    /**
     * Queues a service on the read and/or write selector thread chosen by
     * its hash code.
     *
     * <p>The thread index is {@code Math.abs(hashCode % poolSize)}. For
     * non-negative hash codes this equals plain {@code hashCode % poolSize},
     * so the thread chosen is unchanged; for negative hash codes it yields a
     * valid index instead of an {@link ArrayIndexOutOfBoundsException}.
     * The remainder is strictly smaller in magnitude than the pool size, so
     * {@code Math.abs} cannot overflow here.
     *
     * @param s service to queue; it is added to a read thread when
     *          {@code waitingToRead()} is true and to a write thread when
     *          {@code waitingToSend()} is true
     */
    public static void addSocketService(IOService<?> s) {
        if (s.waitingToRead()) {
            int readIdx = Math.abs(s.hashCode() % socketReadThread.length);
            socketReadThread[readIdx].addSocketServicePriv(s);
        }
        if (s.waitingToSend()) {
            int writeIdx = Math.abs(s.hashCode() % socketWriteThread.length);
            socketWriteThread[writeIdx].addSocketServicePriv(s);
        }
    }

    /**
     * Cancels the selection keys that bind a service's channel to its read
     * and write selector threads.
     *
     * <p>A key is cancelled only when its attachment is this very service
     * instance. The thread index is {@code Math.abs(hashCode % poolSize)},
     * which equals plain {@code hashCode % poolSize} for non-negative hash
     * codes and stays in range for negative ones. A service without a socket
     * channel has nothing registered, so the call returns without effect.
     *
     * @param s service whose registrations should be cancelled
     */
    public static void removeSocketService(IOService<Object> s) {
        SocketChannel channel = s.getSocketChannel();
        if (channel == null) {
            // No channel means no selection keys can exist for this service.
            return;
        }
        int hash = s.hashCode();
        SocketThread[][] pools = {socketReadThread, socketWriteThread};
        for (SocketThread[] pool : pools) {
            Selector selector = pool[Math.abs(hash % pool.length)].clientsSel;
            SelectionKey key = channel.keyFor(selector);
            if (key != null && key.attachment() == s) {
                key.cancel();
            }
        }
    }

    /**
     * Places a service in this thread's waiting set and wakes the selector
     * so the run loop can register it.
     *
     * @param s service to hand over to this selector thread
     */
    public void addSocketServicePriv(IOService<?> s) {
        if (log.isLoggable(Level.FINEST)) {
            log.log(Level.FINEST, "Adding to waiting: {0}", s);
        }
        waiting.add(s);
        // Unblocks a pending select() so the new entry is picked up promptly.
        clientsSel.wakeup();
    }

    /**
     * Selector loop: waits for ready keys, hands the attached services to
     * the completion service and registers newly queued services.
     *
     * <p>Each pass:
     * <ol>
     *   <li>blocks in {@code select()};</li>
     *   <li>if nothing was selected and nothing is waiting, counts an empty
     *       selection and recreates the selector once the count exceeds
     *       {@code MAX_EMPTY_SELECTIONS};</li>
     *   <li>otherwise cancels every selected key, queues its service in
     *       {@code forCompletion} and calls {@code selectNow()};</li>
     *   <li>registers everything in {@code waiting} and submits everything
     *       in {@code forCompletion}.</li>
     * </ol>
     * Any exception escaping a pass is logged and followed by an attempt to
     * recreate the selector. The loop ends when {@code stopping} is true.
     */
    @Override
    public void run() {
        while (!this.stopping) {
            try {
                this.clientsSel.select();
                if (log.isLoggable(Level.FINEST)) {
                    log.log(Level.FINEST, "Selector AWAKE: {0}", this.clientsSel);
                }
                Set<SelectionKey> selected = this.clientsSel.selectedKeys();
                // isEmpty() is constant-time on ConcurrentSkipListSet, size() is not.
                if (selected.isEmpty() && this.waiting.isEmpty()) {
                    if (log.isLoggable(Level.FINEST)) {
                        log.finest("Selected keys = 0!!! a bug again?");
                    }
                    if (++this.empty_selections > MAX_EMPTY_SELECTIONS) {
                        this.recreateSelector();
                    }
                } else {
                    this.empty_selections = 0;
                    for (SelectionKey sk : selected) {
                        this.dispatchSelectedKey(sk);
                    }
                    // Called right after the keys above were cancelled, before new registrations.
                    this.clientsSel.selectNow();
                }
                this.addAllWaiting();
                IOService<?> serv;
                while ((serv = this.forCompletion.pollFirst()) != null) {
                    completionService.submit(serv);
                }
            }
            catch (CancelledKeyException brokene) {
                log.log(Level.WARNING, "Ups, broken JDK, Apple? ", brokene);
                this.recoverSelector();
            }
            catch (IOException ioe) {
                log.log(Level.WARNING, "Problem with the network connection: ", ioe);
                this.recoverSelector();
            }
            catch (Exception exe) {
                log.log(Level.SEVERE, "Server I/O error: ", exe);
                this.recoverSelector();
            }
        }
    }

    /**
     * Cancels one selected key and queues its attached service for
     * execution; if the key turns out to be cancelled already, the service
     * is force-stopped instead.
     */
    private void dispatchSelectedKey(SelectionKey sk) {
        IOService<?> s = (IOService<?>) sk.attachment();
        try {
            if (log.isLoggable(Level.FINEST)) {
                StringBuilder sb = new StringBuilder("AWAKEN: " + s.getUniqueId());
                if (sk.isWritable()) {
                    sb.append(", ready for WRITING");
                }
                if (sk.isReadable()) {
                    sb.append(", ready for READING");
                }
                sb.append(", readyOps() = ").append(sk.readyOps());
                log.finest(sb.toString());
            }
            sk.cancel();
            this.forCompletion.add(s);
        }
        catch (CancelledKeyException e) {
            if (log.isLoggable(Level.FINEST)) {
                log.log(Level.FINEST, "CancelledKeyException, stopping the connection: {0}", s.getUniqueId());
            }
            try {
                s.forceStop();
            }
            catch (Exception ex2) {
                if (log.isLoggable(Level.WARNING)) {
                    // Report the forceStop failure itself, not the CancelledKeyException that led here.
                    log.log(Level.WARNING, "got exception during forceStop: " + s.getUniqueId(), ex2);
                }
            }
        }
    }

    /** Tries to recreate the selector after a failed pass and logs, rather than propagates, any failure. */
    private void recoverSelector() {
        try {
            this.recreateSelector();
        }
        catch (Exception e) {
            log.log(Level.SEVERE, "Serious problem, can't recreate selector: ", e);
        }
    }

    /**
     * Resizes the shared worker pool to a fixed number of threads.
     *
     * <p>The two limits are updated in an order that keeps
     * {@code corePoolSize <= maximumPoolSize} at every step. Since Java 9
     * {@code ThreadPoolExecutor.setCorePoolSize} rejects a core size above
     * the current maximum, so when growing the maximum must be raised first;
     * when shrinking the core size must be lowered first.
     *
     * @param threads new core and maximum pool size
     * @throws IllegalArgumentException if {@code threads} is not accepted by
     *         the executor (for example zero or negative)
     */
    public void setMaxThread(int threads) {
        if (threads > executor.getMaximumPoolSize()) {
            executor.setMaximumPoolSize(threads);
            executor.setCorePoolSize(threads);
        } else {
            executor.setCorePoolSize(threads);
            executor.setMaximumPoolSize(threads);
        }
    }

    /**
     * Resizes the shared worker pool to {@code threads} workers per
     * processor, using the processor count sampled at class load.
     *
     * @param threads number of worker threads per processor
     */
    public void setMaxThreadPerCPU(int threads) {
        final int poolSize = threads * cpus;
        setMaxThread(poolSize);
    }

    /**
     * Drains the {@code waiting} set, registering each connected channel
     * with this thread's selector.
     *
     * <p>A connected channel is registered for read interest when
     * {@code reading} is set and for write interest when {@code writing} is
     * set, with the service as the key attachment. A service whose channel
     * is not connected, or whose registration throws, is force-stopped;
     * failures of the stop itself are only logged.
     *
     * @throws IOException declared by the signature; exceptions raised while
     *         handling an individual service are caught inside the loop
     */
    private void addAllWaiting() throws IOException {
        if (log.isLoggable(Level.FINEST)) {
            log.log(Level.FINEST, "waiting.size(): {0}", waiting.size());
        }
        for (IOService<?> pending = waiting.pollFirst(); pending != null; pending = waiting.pollFirst()) {
            SocketChannel channel = pending.getSocketChannel();
            try {
                if (channel.isConnected()) {
                    if (reading) {
                        channel.register(clientsSel, SelectionKey.OP_READ, pending);
                        if (log.isLoggable(Level.FINEST)) {
                            log.log(Level.FINEST, "ADDED OP_READ: {0}", pending.getUniqueId());
                        }
                    }
                    if (writing) {
                        channel.register(clientsSel, SelectionKey.OP_WRITE, pending);
                        if (log.isLoggable(Level.FINEST)) {
                            log.log(Level.FINEST, "ADDED OP_WRITE: {0}", pending.getUniqueId());
                        }
                    }
                } else {
                    if (log.isLoggable(Level.FINEST)) {
                        log.log(Level.FINEST, "Socket not connected: {0}", pending.getUniqueId());
                    }
                    try {
                        if (log.isLoggable(Level.FINER)) {
                            log.log(Level.FINER, "Forcing stopping the service: {0}", pending.getUniqueId());
                        }
                        pending.forceStop();
                    } catch (Exception stopFailure) {
                        if (log.isLoggable(Level.FINEST)) {
                            log.log(Level.FINEST, "Exception while stopping service: " + pending.getUniqueId(), stopFailure);
                        }
                    }
                }
            } catch (Exception failure) {
                // Covers a null channel as well as registration errors.
                if (log.isLoggable(Level.FINER)) {
                    log.log(Level.FINER, "Forcing stopping the service: " + pending.getUniqueId(), failure);
                }
                try {
                    pending.forceStop();
                } catch (Exception stopFailure) {
                    if (log.isLoggable(Level.FINEST)) {
                        log.log(Level.FINEST, "Exception while stopping service: " + pending.getUniqueId(), stopFailure);
                    }
                }
            }
        }
    }

    /**
     * Repairs the selector after repeated empty selections or an I/O
     * failure.
     *
     * <p>First, every registered key whose channel is missing or
     * disconnected is cancelled and its service force-stopped. If at least
     * one key was cancelled this way, a {@code selectNow()} follows and the
     * existing selector is kept. Otherwise a new selector is opened, all
     * keys of the old one are cancelled, their services are put back into
     * {@code waiting} for re-registration, and the old selector is closed.
     *
     * @throws IOException if selecting, opening the new selector or closing
     *         the old one fails
     */
    private synchronized void recreateSelector() throws IOException {
        if (log.isLoggable(Level.FINEST)) {
            log.log(Level.FINEST, "Recreating selector, opened channels: {0}", this.clientsSel.keys().size());
        }
        this.empty_selections = 0;
        boolean cancelled = false;
        for (SelectionKey sk : this.clientsSel.keys()) {
            IOService<?> serv = (IOService<?>) sk.attachment();
            SocketChannel sc = serv.getSocketChannel();
            if (sc != null && sc.isConnected()) {
                continue;
            }
            cancelled = true;
            sk.cancel();
            try {
                log.log(Level.INFO, "Forcing stopping the service: {0}", serv.getUniqueId());
                serv.forceStop();
            }
            catch (Exception e) {
                // Stopping is best effort, but the failure should still be traceable.
                if (log.isLoggable(Level.FINEST)) {
                    log.log(Level.FINEST, "Exception while stopping service during selector recreation", e);
                }
            }
        }
        if (cancelled) {
            this.clientsSel.selectNow();
        } else {
            Selector tempSel = this.clientsSel;
            this.clientsSel = Selector.open();
            for (SelectionKey sk : tempSel.keys()) {
                IOService<?> serv = (IOService<?>) sk.attachment();
                sk.cancel();
                this.waiting.add(serv);
            }
            tempSel.close();
        }
    }

    /*
     * Class initialization: creates the shared worker pool and completion
     * service, then builds and starts one set of read selector threads and
     * one set of write selector threads. Pool size and both thread counts
     * are cpus * DEF_MAX_THREADS_PER_CPU / 2 + 1.
     */
    static {
        if (socketReadThread == null) {
            final int nThreads = cpus * DEF_MAX_THREADS_PER_CPU / 2 + 1;
            executor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
            completionService = new ExecutorCompletionService<IOService<?>>(executor);
            socketReadThread = new SocketThread[nThreads];
            socketWriteThread = new SocketThread[nThreads];

            for (int r = 0; r < socketReadThread.length; ++r) {
                String readerName = "socketReadThread-" + r;
                socketReadThread[r] = new SocketThread(readerName);
                // The mode flag is set before the thread starts.
                socketReadThread[r].reading = true;
                Thread reader = new Thread(socketReadThread[r]);
                reader.setName(readerName);
                reader.start();
            }
            log.log(Level.WARNING, "{0} socketReadThreads started.", socketReadThread.length);

            for (int w = 0; w < socketWriteThread.length; ++w) {
                String writerName = "socketWriteThread-" + w;
                socketWriteThread[w] = new SocketThread(writerName);
                socketWriteThread[w].writing = true;
                Thread writer = new Thread(socketWriteThread[w]);
                writer.setName(writerName);
                writer.start();
            }
            log.log(Level.WARNING, "{0} socketWriteThreads started.", socketWriteThread.length);
        }
    }

    /**
     * Thread that takes finished tasks from the shared completion service
     * and puts services that are still connected back on the selector
     * threads.
     */
    protected class ResultsListener
    extends Thread {
        /**
         * @param name name given to this listener thread
         */
        public ResultsListener(String name) {
            this.setName(name);
        }

        /**
         * Loops forever: waits for the next completed task, ignores a null
         * result, re-queues a connected service through
         * {@code addSocketService} and only logs a disconnected one. Every
         * exception, including interruption, is logged and the loop goes on.
         */
        @Override
        public void run() {
            for (;;) {
                try {
                    IOService<?> finished = completionService.take().get();
                    if (finished == null) {
                        continue;
                    }
                    if (finished.isConnected()) {
                        if (log.isLoggable(Level.FINEST)) {
                            log.log(Level.FINEST, "COMPLETED: {0}", finished.getUniqueId());
                        }
                        SocketThread.addSocketService(finished);
                    } else if (log.isLoggable(Level.FINEST)) {
                        log.log(Level.FINEST, "REMOVED: {0}", finished.getUniqueId());
                    }
                }
                catch (ExecutionException e) {
                    log.log(Level.WARNING, "Protocol execution exception.", e);
                }
                catch (InterruptedException e) {
                    log.log(Level.WARNING, "Protocol execution interrupted.", e);
                }
                catch (Exception e) {
                    log.log(Level.WARNING, "Protocol execution unknown exception.", e);
                }
            }
        }
    }

    private class IOServiceComparator
    implements Comparator<IOService<?>> {
        private IOServiceComparator() {
        }

        @Override
        public int compare(IOService<?> o1, IOService<?> o2) {
            return o1.getUniqueId().compareTo(o2.getUniqueId());
        }
    }
}

