/*
 * Decompiled with CFR 0.152.
 */
package org.dacframe.broker;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.log4j.Appender;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.dacframe.Agent;
import org.dacframe.AgentBroker;
import org.dacframe.DACException;
import org.dacframe.broker.ActiveMQEmbeddedBroker;
import org.dacframe.cs.CacheServiceHM;
import org.dacframe.worker.WorkerSingleThreaded;

public class ActiveMQBroker
implements AgentBroker {
    private static Logger log = Logger.getLogger(ActiveMQBroker.class);
    private static final String AGENTS_QUEUE = "AB.AGENTS";
    private static final String RESULTS_QUEUE = "AB.RESULTS";
    private static final String RECIPIENT_PROPERTY_NAME = "identString";
    private Connection agentProducerConnection;
    private Connection agentConsumerConnection;
    private Connection resultProducerConnection;
    private Connection resultConsumerConnection;
    private Session agentProducerSession;
    private Session agentConsumerSession;
    private Session resultProducerSession;
    private Session resultConsumerSession;
    private MessageProducer agentProducer;
    private MessageProducer resultProducer;
    private MessageConsumer agentConsumer;
    private MessageConsumer resultConsumer;
    private String brokerURL;

    public ActiveMQBroker(String brokerURL) throws DACException {
        try {
            this.brokerURL = brokerURL;
            this.initJMS();
        }
        catch (JMSException e) {
            throw new DACException(e);
        }
    }

    @Override
    public void commit() throws DACException {
        try {
            this.resultProducerSession.commit();
            this.resultConsumerSession.commit();
            this.agentProducerSession.commit();
            this.agentConsumerSession.commit();
        }
        catch (JMSException e) {
            throw new DACException(e);
        }
    }

    @Override
    public Agent receiveAgent() throws DACException {
        try {
            Message msg = this.agentConsumer.receive();
            if (!(msg instanceof ObjectMessage)) {
                throw new DACException("Received bad agent message " + msg);
            }
            ObjectMessage omsg = (ObjectMessage)msg;
            return (Agent)omsg.getObject();
        }
        catch (JMSException e) {
            throw new DACException(e);
        }
    }

    @Override
    public void rollback() throws DACException {
        try {
            this.resultProducerSession.rollback();
            this.resultConsumerSession.rollback();
            this.agentProducerSession.rollback();
            this.agentConsumerSession.rollback();
        }
        catch (JMSException e) {
            throw new DACException(e);
        }
    }

    @Override
    public void startTransaction() {
    }

    @Override
    public void putResult(String addresseeIdentString, Object result) throws DACException {
        log.debug((Object)("Putting result for agent " + addresseeIdentString + "..."));
        if (!(result instanceof Serializable)) {
            throw new IllegalArgumentException("Results must be serializable!");
        }
        try {
            ObjectMessage msg = this.resultProducerSession.createObjectMessage((Serializable)result);
            msg.setStringProperty(RECIPIENT_PROPERTY_NAME, addresseeIdentString);
            if (result instanceof String) {
                msg.setStringProperty("result", (String)result);
            }
            this.resultProducer.send((Message)msg);
            log.debug((Object)"Result put");
        }
        catch (JMSException e) {
            throw new DACException(e);
        }
    }

    private List<Object> receiveResultsImpl(String recipientIdentString, int requiredResults) throws DACException {
        ArrayList<Object> res;
        block6: {
            if (recipientIdentString == null) {
                throw new IllegalArgumentException("recipientIdentString cannot be null");
            }
            log.debug((Object)("Receiving agent results for agent " + recipientIdentString + "..."));
            res = new ArrayList<Object>();
            this.initResultConsumer(recipientIdentString);
            try {
                while (true) {
                    Message msg;
                    if ((msg = this.resultConsumer.receiveNoWait()) == null) {
                        if (requiredResults != -1) continue;
                        log.debug((Object)"No message received, returning what I've collected so far");
                        break block6;
                    }
                    ObjectMessage omsg = (ObjectMessage)msg;
                    log.debug((Object)("Consumed result message " + msg));
                    if (msg.propertyExists("result")) {
                        log.info((Object)("result: " + msg.getStringProperty("result")));
                    }
                    res.add(omsg.getObject());
                    if (requiredResults > 0 && res.size() == requiredResults) break;
                }
                log.debug((Object)"Consumed all needed results, returning");
            }
            catch (JMSException e) {
                throw new DACException(e);
            }
        }
        this.closeResultConsumer();
        return res;
    }

    @Override
    public List<Object> receiveAgentResults(String recipientIdentString) throws DACException {
        return this.receiveResultsImpl(recipientIdentString, -1);
    }

    @Override
    public List<Object> receiveAgentResults(String recipientIdentString, int requiredResults) throws DACException {
        return this.receiveResultsImpl(recipientIdentString, requiredResults);
    }

    @Override
    public ExecutorService getExecutorService() throws DACException {
        throw new RuntimeException("NOT IMPLEMENTED!");
    }

    @Override
    public void sendAgent(Agent a, int priority) throws DACException {
        this.sendAgent(a);
    }

    @Override
    public void sendAgent(Agent a) throws DACException {
        log.debug((Object)("Sending agent " + a + "..."));
        try {
            ObjectMessage msg = this.agentProducerSession.createObjectMessage((Serializable)a);
            this.agentProducer.send((Message)msg);
            log.debug((Object)"Agent sent");
        }
        catch (JMSException e) {
            throw new DACException(e);
        }
    }

    public void cleanup() throws DACException {
        try {
            this.agentProducer.close();
            this.agentConsumer.close();
            this.resultProducer.close();
            this.closeResultConsumer();
            this.resultProducerSession.close();
            this.resultConsumerSession.close();
            this.agentProducerSession.close();
            this.agentConsumerSession.close();
            this.resultProducerConnection.close();
            this.resultConsumerConnection.close();
            this.agentProducerConnection.close();
            this.agentConsumerConnection.close();
        }
        catch (JMSException e) {
            throw new DACException(e);
        }
    }

    public List<ObjectMessage> browseAllResults() throws DACException {
        try {
            ArrayList<ObjectMessage> ret = new ArrayList<ObjectMessage>();
            QueueBrowser qb = this.resultProducerSession.createBrowser((Queue)new ActiveMQQueue(RESULTS_QUEUE));
            Enumeration e = qb.getEnumeration();
            while (e.hasMoreElements()) {
                ret.add((ObjectMessage)e.nextElement());
            }
            return ret;
        }
        catch (JMSException ex) {
            throw new DACException(ex);
        }
    }

    private void initJMS() throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, this.brokerURL);
        String urlForLazyInitialization = null;
        try {
            this.agentProducerConnection = connectionFactory.createConnection();
        }
        catch (JMSException jmsException) {
            urlForLazyInitialization = this.brokerURL;
            ConsoleAppender appender = new ConsoleAppender((Layout)new PatternLayout());
            log.addAppender((Appender)appender);
            log.info((Object)"Unable to connect to JMS broker - schedule lazy initialization");
            log.removeAppender((Appender)appender);
            try {
                new ActiveMQEmbeddedBroker(this.brokerURL).start();
            }
            catch (Exception e) {
                log.error((Object)"Unable to start Default Environment.", (Throwable)e);
                throw new JMSException("Unable to start Default Environment.");
            }
        }
        if (this.agentProducerConnection == null) {
            this.agentProducerConnection = connectionFactory.createConnection();
        }
        this.agentConsumerConnection = connectionFactory.createConnection();
        this.resultProducerConnection = connectionFactory.createConnection();
        this.resultConsumerConnection = connectionFactory.createConnection();
        this.agentProducerSession = this.agentProducerConnection.createSession(true, 0);
        this.agentConsumerSession = this.agentConsumerConnection.createSession(true, 0);
        this.resultProducerSession = this.resultProducerConnection.createSession(true, 0);
        this.resultConsumerSession = this.resultConsumerConnection.createSession(true, 0);
        this.agentProducer = this.agentProducerSession.createProducer((Destination)new ActiveMQQueue(AGENTS_QUEUE));
        this.resultProducer = this.resultProducerSession.createProducer((Destination)new ActiveMQQueue(RESULTS_QUEUE));
        this.agentConsumer = this.agentConsumerSession.createConsumer((Destination)new ActiveMQQueue("AB.AGENTS?consumer.prefetchSize=0"));
        this.agentProducerConnection.start();
        this.agentConsumerConnection.start();
        this.resultProducerConnection.start();
        this.resultConsumerConnection.start();
        if (urlForLazyInitialization != null) {
            WorkerSingleThreaded worker = new WorkerSingleThreaded();
            try {
                worker.setAgentBroker(new ActiveMQBroker(this.brokerURL));
                worker.setCacheService(new CacheServiceHM());
                new Thread(worker).start();
            }
            catch (DACException e) {
                log.error((Object)"Error while creating Worker for Default Environment: ", (Throwable)e);
            }
        }
    }

    private void closeResultConsumer() {
        if (this.resultConsumer != null) {
            try {
                this.resultConsumer.close();
            }
            catch (JMSException e) {
                log.error((Object)e, null);
            }
        }
    }

    private void initResultConsumer(String addresseeIdentString) throws DACException {
        String selector = String.format("%s = '%s'", RECIPIENT_PROPERTY_NAME, addresseeIdentString);
        log.debug((Object)("Using message selector: " + selector));
        this.closeResultConsumer();
        try {
            this.resultConsumer = this.resultConsumerSession.createConsumer((Destination)new ActiveMQQueue("AB.RESULTS?consumer.prefetchSize=0"), selector);
        }
        catch (JMSException e) {
            throw new DACException(e);
        }
    }
}

