/*
 * Decompiled with CFR 0.152.
 */
package tigase.net;

import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import tigase.net.IOService;

/**
 * Singleton selector loop that watches registered {@link IOService} channels
 * for I/O readiness and hands ready services over to a worker thread pool.
 *
 * <p>Life cycle of a service as implemented here:</p>
 * <ol>
 *   <li>{@link #addSocketService(IOService)} puts the service on a waiting
 *       queue and wakes the selector up.</li>
 *   <li>The selector thread ({@link #run()}) registers every waiting service
 *       for {@code OP_READ}, plus {@code OP_WRITE} when the service reports
 *       {@code waitingToSend()}.</li>
 *   <li>When a key becomes ready it is cancelled and the service is submitted
 *       to the completion service for execution on the pool.</li>
 *   <li>{@link ResultsListener} collects finished services and queues the ones
 *       that are still connected again (back to step 1).</li>
 * </ol>
 *
 * <p>Thread-safety: the waiting queue is a concurrent queue, the flags and the
 * selector reference are {@code volatile}, and selector replacement as well as
 * {@link #addSocketService(IOService)} are guarded by the instance monitor.</p>
 */
public class SocketReadThread
implements Runnable {
    private static final Logger log = Logger.getLogger("tigase.net.SocketReadThread");

    /** Default number of worker threads created per available CPU. */
    public static final int DEF_MAX_THREADS_PER_CPU = 5;

    /**
     * Number of consecutive selections that may return no keys (without an
     * explicit wakeup) before the selector is considered broken and recreated.
     */
    private static final int MAX_EMPTY_SELECTIONS = 10;

    /** Interest set used for services with nothing queued to send. */
    private static final int READ_ONLY = SelectionKey.OP_READ;

    /** Interest set used for services that still have data waiting to be sent. */
    private static final int READ_WRITE = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

    /** Lazily created singleton; guarded by the class monitor in {@link #getInstance()}. */
    private static SocketReadThread socketReadThread = null;

    /** Set when initialisation failed; makes {@link #run()} terminate. Read by other threads. */
    private volatile boolean stopping = false;

    /** Set by {@link #addSocketService(IOService)} so that an intentional wakeup is not counted as an empty selection. */
    private volatile boolean wakeup_called = false;

    /** Consecutive empty selections seen so far; touched by the selector thread only. */
    private int empty_selections = 0;

    /** Services waiting to be registered with the selector by the selector thread. */
    private final ConcurrentLinkedQueue<IOService> waiting = new ConcurrentLinkedQueue<IOService>();

    /** Current selector; replaced by {@link #recreateSelector()}, hence volatile. */
    private volatile Selector clientsSel = null;

    private ThreadPoolExecutor executor = null;
    private CompletionService<IOService> completionService = null;

    /**
     * Opens the selector, creates the worker pool sized
     * {@code CPUs * DEF_MAX_THREADS_PER_CPU} and starts the results listener.
     * On failure the instance is marked as stopping and no listener is started.
     */
    private SocketReadThread() {
        try {
            this.clientsSel = Selector.open();
            int nThreads = Runtime.getRuntime().availableProcessors() * DEF_MAX_THREADS_PER_CPU;
            this.executor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
            this.completionService = new ExecutorCompletionService<IOService>(this.executor);
        }
        catch (Exception e) {
            log.log(Level.SEVERE, "Server I/O error, can't continue my work.", e);
            this.stopping = true;
        }
        // Without a completion service the listener could only spin on
        // NullPointerExceptions, so start it only after a successful set-up.
        if (!this.stopping) {
            new ResultsListener().start();
        }
    }

    /**
     * Returns the shared instance, creating it and starting its selector
     * thread on first use.
     *
     * <p>The method is synchronized so that concurrent first calls cannot
     * create two instances (and two selector threads).</p>
     *
     * @return the singleton {@code SocketReadThread}
     */
    public static synchronized SocketReadThread getInstance() {
        if (socketReadThread == null) {
            socketReadThread = new SocketReadThread();
            Thread thrd = new Thread(socketReadThread);
            thrd.setName("SocketReadThread");
            thrd.start();
            log.fine("SocketReadThread started.");
        }
        return socketReadThread;
    }

    /**
     * Resizes the worker pool to {@code threads} workers per available CPU.
     *
     * @param threads number of worker threads per CPU
     */
    public void setMaxThreadPerCPU(int threads) {
        int cpus = Runtime.getRuntime().availableProcessors();
        this.setMaxThread(threads * cpus);
    }

    /**
     * Sets both the core and the maximum size of the worker pool.
     *
     * @param threads new fixed size of the worker pool
     * @throws IllegalArgumentException if {@code threads} is not accepted by
     *         the underlying {@link ThreadPoolExecutor}
     */
    public void setMaxThread(int threads) {
        // The executor rejects any intermediate state where core > maximum
        // (setCorePoolSize throws on JDK 9+, setMaximumPoolSize always did),
        // so the order of the two calls depends on the resize direction.
        if (threads > this.executor.getMaximumPoolSize()) {
            this.executor.setMaximumPoolSize(threads);
            this.executor.setCorePoolSize(threads);
        } else {
            this.executor.setCorePoolSize(threads);
            this.executor.setMaximumPoolSize(threads);
        }
    }

    /**
     * Queues a service for registration with the selector and wakes the
     * selector thread up so that the registration happens promptly.
     *
     * @param s service whose channel should be watched
     */
    public synchronized void addSocketService(IOService s) {
        this.waiting.offer(s);
        this.wakeup_called = true;
        this.clientsSel.wakeup();
    }

    /**
     * Cancels the selection key of the given service on the current selector,
     * provided the key exists and is attached to exactly this service.
     *
     * @param s service to stop watching
     */
    public void removeSocketService(IOService s) {
        SelectionKey key = s.getSocketChannel().keyFor(this.clientsSel);
        if (key != null && key.attachment() == s) {
            key.cancel();
        }
    }

    /**
     * Registers the channel of {@code serv} with the current selector, using
     * {@link #READ_WRITE} when the service has data waiting to be sent and
     * {@link #READ_ONLY} otherwise. Any failure is logged and swallowed so a
     * single bad service cannot disturb the selector loop.
     *
     * @param serv        service to register; it becomes the key attachment
     * @param errorPrefix log message prefix used when registration fails
     */
    private void registerService(IOService serv, String errorPrefix) {
        try {
            boolean finest = log.isLoggable(Level.FINEST);
            int ops = READ_ONLY;
            if (finest) {
                log.finest("ADDED OP_READ: " + serv.getUniqueId());
            }
            if (serv.waitingToSend()) {
                ops = READ_WRITE;
                if (finest) {
                    log.finest("ADDED OP_WRITE: " + serv.getUniqueId());
                }
            }
            serv.getSocketChannel().register(this.clientsSel, ops, serv);
        }
        catch (Exception e) {
            log.finest(errorPrefix + serv.getUniqueId() + ", exception: " + e);
        }
    }

    /**
     * Drains the waiting queue, registering every queued service with the
     * current selector. Called from the selector thread only.
     */
    private void addAllWaiting() {
        IOService s;
        while ((s = this.waiting.poll()) != null) {
            this.registerService(s, "ERROR adding channel for: ");
        }
    }

    /**
     * Replaces the current selector with a freshly opened one.
     *
     * <p>Every key of the old selector is cancelled. Services that are still
     * connected are registered with the new selector; the others are
     * force-stopped. The old selector is closed even if the migration loop
     * fails part-way.</p>
     *
     * @throws IOException if the new selector cannot be opened or the old one
     *         cannot be closed
     */
    private synchronized void recreateSelector() throws IOException {
        Selector oldSel = this.clientsSel;
        log.finest("Recreating selector, opened channels: " + oldSel.keys().size());
        this.empty_selections = 0;
        this.clientsSel = Selector.open();
        try {
            for (SelectionKey sk : oldSel.keys()) {
                IOService serv = (IOService)sk.attachment();
                sk.cancel();
                if (serv.isConnected()) {
                    this.registerService(serv, "ERROR re-adding channel for: ");
                    continue;
                }
                try {
                    log.info("Forcing stopping the service: " + serv.getUniqueId());
                    serv.forceStop();
                }
                catch (Exception e) {
                    // Best effort: the service is being dropped anyway.
                    log.log(Level.FINEST, "Exception while force-stopping a disconnected service.", e);
                }
            }
        }
        finally {
            oldSel.close();
        }
    }

    /**
     * Tries to recreate the selector after a failure in the selector loop and
     * logs (instead of propagating) any problem doing so.
     */
    private void recoverSelector() {
        try {
            this.recreateSelector();
        }
        catch (Exception e) {
            log.log(Level.SEVERE, "Serious problem, can't recreate selector: ", e);
        }
    }

    /**
     * Cancels every currently selected key and submits its attached service to
     * the worker pool. A service whose key turns out to be already cancelled
     * is force-stopped instead.
     */
    private void dispatchSelectedKeys() {
        for (SelectionKey sk : this.clientsSel.selectedKeys()) {
            IOService s = (IOService)sk.attachment();
            try {
                if (log.isLoggable(Level.FINEST)) {
                    StringBuilder sb = new StringBuilder("AWAKEN: " + s.getUniqueId());
                    if (sk.isWritable()) {
                        sb.append(", ready for WRITING");
                    }
                    if (sk.isReadable()) {
                        sb.append(", ready for READING");
                    }
                    sb.append(", readyOps() = " + sk.readyOps());
                    log.finest(sb.toString());
                }
                // The key is cancelled while the service is being processed;
                // the service is registered again once its task completes.
                sk.cancel();
                this.completionService.submit(s);
            }
            catch (CancelledKeyException e) {
                log.finest("CancelledKeyException, stopping the connection: " + s.getUniqueId());
                try {
                    s.forceStop();
                }
                catch (Exception ex2) {
                    // Best effort: the connection is being dropped anyway.
                    log.log(Level.FINEST, "Exception while force-stopping the connection.", ex2);
                }
            }
        }
    }

    /**
     * Selector loop: waits for ready channels, dispatches them to the worker
     * pool and registers newly queued services. After more than
     * {@link #MAX_EMPTY_SELECTIONS} consecutive selections that returned no
     * keys without an explicit wakeup, or after any exception, the selector is
     * recreated.
     */
    @Override
    public void run() {
        while (!this.stopping) {
            try {
                this.wakeup_called = false;
                int selectedKeys = this.clientsSel.select();
                if (selectedKeys == 0 && !this.wakeup_called) {
                    log.finest("Selected keys = 0!!! a bug again?");
                    if (++this.empty_selections > MAX_EMPTY_SELECTIONS) {
                        this.recreateSelector();
                    }
                } else {
                    this.empty_selections = 0;
                    if (selectedKeys > 0) {
                        this.dispatchSelectedKeys();
                    }
                    // Flushes the keys cancelled above so the same channels
                    // can be registered again by addAllWaiting().
                    this.clientsSel.selectNow();
                }
                this.addAllWaiting();
            }
            catch (CancelledKeyException brokene) {
                log.log(Level.WARNING, "Ups, broken JDK, Apple? ", brokene);
                this.recoverSelector();
            }
            catch (IOException ioe) {
                log.log(Level.WARNING, "Problem with the network connection: ", ioe);
                this.recoverSelector();
            }
            catch (Exception exe) {
                log.log(Level.SEVERE, "Server I/O error: ", exe);
                this.recoverSelector();
            }
        }
    }

    /**
     * Background thread that collects services whose processing task has
     * finished and queues the still-connected ones for selection again.
     */
    protected class ResultsListener
    extends Thread {
        public ResultsListener() {
            this.setName("SocketReadThread$ResultsListener");
        }

        /**
         * Takes completed tasks one by one until the thread is interrupted.
         * Task failures are logged and do not stop the listener.
         */
        @Override
        public void run() {
            while (true) {
                try {
                    IOService service = SocketReadThread.this.completionService.take().get();
                    if (service.isConnected()) {
                        log.finest("COMPLETED: " + service.getUniqueId());
                        SocketReadThread.this.addSocketService(service);
                    } else {
                        log.finest("REMOVED: " + service.getUniqueId());
                    }
                }
                catch (ExecutionException e) {
                    log.log(Level.WARNING, "Protocol execution exception.", e);
                }
                catch (InterruptedException e) {
                    log.log(Level.WARNING, "Protocol execution interrupted.", e);
                    // Interruption is a request to stop: restore the flag for
                    // whoever inspects it and leave the loop.
                    Thread.currentThread().interrupt();
                    return;
                }
                catch (Exception e) {
                    log.log(Level.WARNING, "Protocol execution unknown exception.", e);
                }
            }
        }
    }
}

