/*
 * Decompiled with CFR 0.152.
 */
package tigase.net;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import tigase.net.IOService;

public class SocketReadThread
implements Runnable {
    private static final Logger log = Logger.getLogger("tigase.net.SocketReadThread");
    public static final int DEF_MAX_THREADS_PER_CPU = 5;
    private static SocketReadThread socketReadThread = null;
    private boolean stopping = false;
    private final ConcurrentLinkedQueue<IOService> waiting = new ConcurrentLinkedQueue();
    private final ConcurrentLinkedQueue<IOService> for_removal = new ConcurrentLinkedQueue();
    private Selector clientsSel = null;
    private ThreadPoolExecutor executor = null;
    private CompletionService<IOService> completionService = null;

    private SocketReadThread() {
        try {
            this.clientsSel = Selector.open();
            int cpus = Runtime.getRuntime().availableProcessors();
            int nThreads = cpus * 5;
            this.executor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
            this.completionService = new ExecutorCompletionService<IOService>(this.executor);
        }
        catch (Exception e) {
            log.log(Level.SEVERE, "Server I/O error, can't continue my work.", e);
            this.stopping = true;
        }
        new ResultsListener().start();
    }

    public static SocketReadThread getInstance() {
        if (socketReadThread == null) {
            socketReadThread = new SocketReadThread();
            Thread thrd = new Thread(socketReadThread);
            thrd.setName("SocketReadThread");
            thrd.start();
            log.fine("SocketReadThread started.");
        }
        return socketReadThread;
    }

    public void setMaxThreadPerCPU(int threads) {
        int cpus = Runtime.getRuntime().availableProcessors();
        this.setMaxThread(threads * cpus);
    }

    public void setMaxThread(int threads) {
        this.executor.setCorePoolSize(threads);
        this.executor.setMaximumPoolSize(threads);
    }

    public void addSocketService(IOService s) {
        this.waiting.offer(s);
        this.clientsSel.wakeup();
    }

    public void removeSocketService(IOService s) {
        SelectionKey key = s.getSocketChannel().keyFor(this.clientsSel);
        if (key != null && key.attachment() == s) {
            key.cancel();
        }
    }

    private void addAllWaiting() throws IOException {
        IOService s = null;
        while ((s = this.waiting.poll()) != null) {
            SocketChannel sc = s.getSocketChannel();
            try {
                sc.register(this.clientsSel, 1, s);
                log.finest("ADDED: " + s.getUniqueId());
            }
            catch (Exception e) {
                log.finest("ERROR adding channel for: " + s.getUniqueId() + ", exception: " + e);
            }
        }
    }

    @Override
    public void run() {
        while (!this.stopping) {
            try {
                this.clientsSel.select();
                Iterator<SelectionKey> i = this.clientsSel.selectedKeys().iterator();
                while (i.hasNext()) {
                    SelectionKey sk = i.next();
                    i.remove();
                    IOService s = (IOService)sk.attachment();
                    sk.cancel();
                    log.finest("AWAKEN: " + s.getUniqueId());
                    this.completionService.submit(s);
                }
                this.clientsSel.selectNow();
                this.addAllWaiting();
            }
            catch (Exception e) {
                log.log(Level.SEVERE, "Server I/O error, can't continue my work.", e);
                this.stopping = true;
            }
        }
        System.err.println("ClientThread stopped!");
        System.exit(2);
    }

    protected class ResultsListener
    extends Thread {
        public ResultsListener() {
            this.setName("SocketReadThread$ResultsListener");
        }

        @Override
        public void run() {
            while (true) {
                try {
                    while (true) {
                        IOService service;
                        if ((service = (IOService)SocketReadThread.this.completionService.take().get()).isConnected()) {
                            log.finest("COMPLETED: " + service.getUniqueId());
                            SocketReadThread.this.addSocketService(service);
                            continue;
                        }
                        log.finest("REMOVED: " + service.getUniqueId());
                    }
                }
                catch (ExecutionException e) {
                    log.log(Level.WARNING, "Protocol execution exception.", e);
                    continue;
                }
                catch (InterruptedException e) {
                    log.log(Level.WARNING, "Protocol execution interrupted.", e);
                    continue;
                }
                catch (Exception e) {
                    log.log(Level.WARNING, "Protocol execution unknown exception.", e);
                    continue;
                }
                break;
            }
        }
    }
}

