2013. június 28., péntek

CDI event + JMS - szinkron és aszinkron események


Kétféle eseményt különböztetünk meg aszerint, milyen időben (pontosabb Threadben) futhat le:
  • szinkron (a hívó threadben fut),
  • aszinkron (nem a hívó threadben fut).
A CDI eseménykezelése segítségével valósítjuk meg mindkét fajtát.

Szinkron esemény

CDI event fire -> CDI event handle with @Observe
class Producer{
   
    @Inject @InForeground
    Event<MyEvent> myEventToBeFiredInSync;

...
    void myMethod(){
        ...
        //do the business
        myEventToBeFiredInSync.fire(new MyEvent());
        ...
    }
}

class Consumer{

...
    void consumeAndHandleEvent(@Observe @InForeground MyEvent myEvent){
        ...
        //do the business
        workWith(myEvent);
        ...
    }
}
Például:
TransactionEvent -> automatikus history és üzenetkezelés az ügylethez

Aszinkron esemény

Ez JMS messaging segítségével van megvalósítva. A keletkezés helyén azonban lecsatoljuk a JMS üzenetet generáló logikát az üzleti részről, és CDI esemény segítségével pukkan el az aszinkron üzenet is.
CDI event fire -> JMS Producer @Observe and send message -> JMS consumes message and CDI event fire -> CDI event handle with @Observe
class Producer{

    @Inject @InBackground
    Event<MyEvent> myEventToBeFiredAsync;

...
    void myMethod(){
        ...
        //do the business
        myEventToBeFiredInSync.fire(new MyEvent());
        ...
    }
}

class Consumer{

...
    void consumeAndHandleEvent(@Observe @InForeground MyEvent myEvent){
        ...
        //do the business
        workWith(myEvent);
        ...
    }
}
A Consumer nem tudja megenni az eseményt, hiszen az @InForeground eventet vár. Itt jön a JMS:
class JMSMessageProducer{

...
    @Inject
    private QueueSession session;

    @Inject
    private Queue queue;

    public void produceJMSMessage(@Observes @InBackground MyEvent myEvent) {

        try {
            QueueSender sender = session.createSender(queue);
            Message message = session.createMessage();
            message.setObjectProperty("myEvent", myEvent);
            sender.send(message);
        } catch (JMSException e) {
            logger.error("Error sending message to queue", e);
        }
    }
}

@FancyAnnotations
class ConsumerMDB{

...
    @Inject @InForeground
    Event<MyEvent> myEventToBeFiredInSync;
    

    @Override
    public void onMessage(Message rcvMessage) {
        try {
            MyEvent myEvent = (MyEvent)rcvMessage.getObjectProperty("myEvent");
            myEventToBeFiredInSync.fire(myEvent);
        } catch (JMSException e) {
            logger.error("Incorrect message format.", e);
        }
    }
}
Tehát gyakorlatilag visszatranszformálja a JMS üzenetet CDI eseménnyé.
Az előny, hogy az üzleti kódban kizárólag CDI eseményeket találunk, és a szinkron-aszinkron események között egy annotáció lecserélésével tudunk váltani, akár programmatikus módon.
Például:
A kérelem benyújtása után aszinkron módon készítjük fel az ügyletet az alapítvány oldali befogadásra.

JMS erőforrások megadása CDI segítségével

package hu.avhga.business.common.resource;

import javax.annotation.Resource;
import javax.enterprise.inject.Disposes;
import javax.enterprise.inject.Produces;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;

/**
 * JMS erőforrások előállítására és lezárására használt osztály
 *
 * @author bnemeth
 *
 */
public class JMSResources {

    @Resource(mappedName = "java:/JmsXA")
    private QueueConnectionFactory connectionFactory;

    @Produces @AdmissionQueue @Resource(mappedName = "java:jboss/exported/jms/queue/transactionAdmissionMDBQueue")
    private Queue admissionQueue;

    @Produces @AssessmentQueue
    @Resource(mappedName = "java:jboss/exported/jms/queue/transactionAssessmentMDBQueue")
    private Queue assessmentQueue;

    @Produces @AssessmentQueue
    public QueueSession createAssessmentSession(@AssessmentQueue QueueConnection conn) throws JMSException {
        return conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    @Produces @AssessmentQueue
    public QueueConnection createAssessmentConnection() throws JMSException {
        return connectionFactory.createQueueConnection();
    }

    public void closeAssessmentSession(@Disposes @AssessmentQueue QueueConnection conn) throws JMSException {
        conn.close();
    }

    public void closeAssessmentSession(@Disposes @AssessmentQueue QueueSession session) throws JMSException {
        session.close();
    }

    @Produces @AdmissionQueue
    public QueueConnection createOrderConnection() throws JMSException {
        return connectionFactory.createQueueConnection();
    }

    @Produces @AdmissionQueue
    public QueueSession createOrderSession(@AdmissionQueue QueueConnection conn) throws JMSException {
        return conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    public void closeOrderSession(@Disposes @AdmissionQueue QueueConnection conn) throws JMSException {
        conn.close();
    }

    public void closeOrderSession(@Disposes @AdmissionQueue QueueSession session) throws JMSException {
        session.close();
    }
}

A fenti osztály lehetővé teszi megadott Queue és QueueSession injektálását, továbbá nem kell explicit módon lezárni sem az erőforrásokat, erre való a @Disposes annotáció.

Enter labels to add to this page:
Please wait 
Looking for a label? Just start typing.