6.4. 在J2EE应用中使用Apuisc MQ

这部分主要描述在J2EE 应用中使用MQ的场景和用法。主要包括:

使用 Session 和 Entity Bean来发送和同步接收消息

使用 message-driven beans 异步的接收信息

管理分布式事务

使用普通的JMS API(在普通应用和Web应用中)

6.4.1. 使用 Session 和 Entity Bean来发送和接收消息

J2EE应用可以使用 Session和Entity Bean 来发送消息,同步接收消息

Session bean使用JMS API

EJB消息处理

图 6.2. EJB消息处理


  • 编写 client

package client;

import javax.ejb.EJBHome;
import javax.naming.*;
import javax.rmi.PortableRemoteObject;
import javax.jms.*;
import sb.*;


/**
 * The MyAppClient class is the client program for this J2EE
 * application.  It obtains a reference to the home interface
 * of the Publisher enterprise bean and creates an instance of
 * the bean.  After calling the publisher's publishNews method
 * twice, it removes the bean.
 */
public class MyAppClient {
    public static void main(String[] args) {
        MyAppClient client = new MyAppClient();
        client.doTest();
        System.exit(0);
    }

    public void doTest() {
        try {
            Context ic = new InitialContext();
            System.out.println("Looking up EJB reference");
            java.lang.Object objref =
                ic.lookup("java:comp/env/ejb/remote/Publisher");
            System.err.println("Looked up home");
            PublisherHome pubHome =
                (PublisherHome) PortableRemoteObject.narrow(objref,
                    PublisherHome.class);
            System.err.println("Narrowed home");
            /*
             * Create bean instance, invoke business method
             * twice, and remove bean instance.
             */
            PublisherRemote phr = pubHome.create();
            System.err.println("Got the EJB");
            phr.publishNews();
            phr.publishNews();
            phr.remove();
            System.out.println("To view the bean output,");
            System.out.println(
                " check <install_dir>/domains/domain1/logs/server.log.");
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

  • 编写发送消息的 Session Bean

对于 JNDI的名字需要按照实际的名字进行对应

编写Home接口

package sb;

import java.rmi.RemoteException;
import javax.ejb.EJBHome;
import javax.ejb.CreateException;

/**
 * Home interface for Publisher enterprise bean.
 */
public interface PublisherHome extends EJBHome {
    PublisherRemote create() throws RemoteException, CreateException;
}

编写remote接口

package sb;

import javax.ejb.*;
import java.rmi.RemoteException;


/**
 * Remote interface for Publisher enterprise bean. Declares one
 * business method.
 */
public interface PublisherRemote extends EJBObject {
    void publishNews() throws RemoteException;
}

编写EJB object

package sb;

import java.rmi.RemoteException;
import java.util.*;
import java.util.logging.*;
import javax.ejb.*;
import javax.naming.*;
import javax.jms.*;


/**
 * Bean class for Publisher enterprise bean. Defines publishNews
 * business method as well as required methods for a stateless
 * session bean.
 */
public class PublisherBean implements SessionBean {
    final static String[] messageTypes =
    {
        "Nation/World", "Metro/Region", "Business", "Sports", "Living/Arts",
        "Opinion"
    };
    static final Logger logger = Logger.getLogger("PublisherBean");
    SessionContext sc = null;
    Connection connection = null;
    Topic topic = null;

    public PublisherBean() {
        logger.info("In PublisherBean() (constructor)");
    }

    /**
     * Sets the associated session context. The container calls
     * this method after the instance creation.
     *
     * @param sc    the context to set
     */
    public void setSessionContext(SessionContext sc) {
        this.sc = sc;
    }

    /**
     * Instantiates the enterprise bean.  Creates the
     * connection and looks up the topic.
     */
    public void ejbCreate() {
        Context context = null;
        ConnectionFactory connectionFactory = null;

        logger.info("In PublisherBean.ejbCreate()");

        try {
            context = new InitialContext();
            topic = (Topic) context.lookup("java:comp/env/jms/TopicName");

            // Create a connection
            connectionFactory =
                (ConnectionFactory) context.lookup(
                    "java:comp/env/jms/MyConnectionFactory");
            connection = connectionFactory.createConnection();
        } catch (Throwable t) {
            // JMSException or NamingException could be thrown
            logger.severe("PublisherBean.ejbCreate:" + "Exception: " +
                t.toString());
        }
    }

    /**
     * Chooses a message type by using the random number
     * generator found in java.util.  Called by publishNews().
     *
     * @return   the String representing the message type
     */
    private String chooseType() {
        int whichMsg;
        Random rgen = new Random();

        whichMsg = rgen.nextInt(messageTypes.length);

        return messageTypes[whichMsg];
    }

    /**
     * Creates session, publisher, and message.  Publishes
     * messages after setting their NewsType property and using
     * the property value as the message text. Messages are
     * received by MessageBean, a message-driven bean that uses a
     * message selector to retrieve messages whose NewsType
     * property has certain values.
     */
    public void publishNews() {
        Session session = null;
        MessageProducer publisher = null;
        TextMessage message = null;
        int numMsgs = messageTypes.length * 3;
        String messageType = null;

        try {
            session = connection.createSession(true, 0);
            publisher = session.createProducer(topic);
            message = session.createTextMessage();

            for (int i = 0; i < numMsgs; i++) {
                messageType = chooseType();
                message.setStringProperty("NewsType", messageType);
                message.setText("Item " + i + ": " + messageType);
                logger.info("PUBLISHER: Setting " + "message text to: " +
                    message.getText());
                publisher.send(message);
            }
        } catch (Throwable t) {
            // JMSException could be thrown
            logger.severe("PublisherBean.publishNews: " + "Exception: " +
                t.toString());
            sc.setRollbackOnly();
        } finally {
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                }
            }
        }
    }

    /**
     * Closes the connection.
     */
    public void ejbRemove() throws RemoteException {
        System.out.println("In PublisherBean.ejbRemove()");

        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void ejbActivate() {
    }

    public void ejbPassivate() {
    }
}

编写接收消息的Message-Driven Bean

 
package mdb;

import javax.ejb.*;
import javax.naming.*;
import javax.jms.*;
import java.util.logging.*;

/**
 * The MessageBean class is a message-driven bean.  It implements
 * the javax.ejb.MessageDrivenBean and javax.jms.MessageListener
 * interfaces. It is defined as public (but not final or
 * abstract).  It defines a constructor and the methods
 * setMessageDrivenContext, ejbCreate, onMessage, and
 * ejbRemove.
 */
public class MessageBean implements MessageDrivenBean, MessageListener {
    static final Logger logger = Logger.getLogger("MessageBean");
    private transient MessageDrivenContext mdc = null;
    private Context context;

    /**
     * Constructor, which is public and takes no arguments.
     */
    public MessageBean() {
        logger.info("In MessageBean.MessageBean()");
    }

    /**
     * setMessageDrivenContext method, declared as public (but
     * not final or static), with a return type of void, and
     * with one argument of type javax.ejb.MessageDrivenContext.
     *
     * @param mdc    the context to set
     */
    public void setMessageDrivenContext(MessageDrivenContext mdc) {
        logger.info("In MessageBean.setMessageDrivenContext()");
        this.mdc = mdc;
    }

    /**
     * ejbCreate method, declared as public (but not final or
     * static), with a return type of void, and with no
     * arguments.
     */
    public void ejbCreate() {
        logger.info("In MessageBean.ejbCreate()");
    }

    /**
     * onMessage method, declared as public (but not final or
     * static), with a return type of void, and with one argument
     * of type javax.jms.Message.
     *
     * Casts the incoming Message to a TextMessage and displays
     * the text.
     *
     * @param inMessage    the incoming message
     */
    public void onMessage(Message inMessage) {
        TextMessage msg = null;

        try {
            if (inMessage instanceof TextMessage) {
                msg = (TextMessage) inMessage;
                logger.info("MESSAGE BEAN: Message received: " + msg.getText());
            } else {
                logger.warning("Message of wrong type: " +
                    inMessage.getClass().getName());
            }
        } catch (JMSException e) {
            logger.severe("MessageBean.onMessage: JMSException: " +
                e.toString());
            e.printStackTrace();
            mdc.setRollbackOnly();
        } catch (Throwable te) {
            logger.severe("MessageBean.onMessage: Exception: " + te.toString());
            te.printStackTrace();
        }
    }

    /**
     * ejbRemove method, declared as public (but not final or
     * static), with a return type of void, and with no
     * arguments.
     */
    public void ejbRemove() {
        logger.info("In MessageBean.remove()");
    }
}

由于同步接收消息需要客户端等待,堵塞服务器资源。为了解决这种方式带来的弊端,采用异步方式接收是一种不错的选择。

6.4.2. 使用 message-driven beans 异步接收信息

J2EE应用服务器有一种特殊的EJB,就是Message-Driver Bean,它可以异步地处理JMS消息,普通的EJB只能同步的接收消息。

Message-Driver Bean是一个消息监听器,可以从队列或持久注册者可靠地接收消息。消息发送方可以是 J2EE 组件,ejb ,Web组件或者其它客户端。

Message-Driver Bean有onMessage方法,供消息到达时被调用。

Message-Driver Bean和一般的消息监听器的区别是:

Message-Driver Bean由容器来管理和执行

Message-Driver Bean在部署时确定例如 Destination , ConnectionFactory等等,而普通的用法是需要用代码来实现

Message-Driver Bean 注册消息监听器不能用 setMessageListener 方法。

指定消息的应答模式

假如 AMQ 和Apusic 应用服务器采用 resource adapter 整合,可以直接指定connection factory, durable subscription, message selector, acknowledgment mode等等。

Message-Driver Bean必须实现一些固定的接口和方法

必须实现 javax.ejb.MessageDrivenBean 和 javax.jms.MessageListener 接口

ejbCreate() 方法,建立 JMS API connection

ejbRemove() 方法,关闭 JMS API connection

setMessageDrivenContext(MessageDrivenContext mdc)方法,MessageDrivenContext提供一些附加的方法用来做事务管理相关的事情。

Message-Driver Bean和其它EJB不同的是它没有home和remote接口。

从生命周期来看,Message-Driver Bean和Stateless Session Bean有些相似,和客户端没有状态关联。

Message-Driver Bean可以作为池的方式以便加强并行执行的效率,但是这种方式带来的是消息顺序处理需要应用程序自己考虑。

Message-Driver Bean调用过程

首先调用 setMessageDrivenContext 方法,把上下文传给该实例

调用 ejbCreate 方法。

MDB调用过程

图 6.3. MDB调用过程


6.4.3. 管理分布式事务

J2EE应用程序通常使用分布式事务来确保和外部资源的整合。

J2EE应用程序可以包括把数据库更新和消息发送作为一个事务单元,来保证它们发生的原子性。可以把对多个资源的访问包括在一个事务当中。例如:可以在Servlet中调用EJB来更新数据库,然后调用JMS来进行消息发送,然后调用JCA的模块,以上的动作包含在一个事务当中。

但是,一个事务不能包括同一个信息的收发,即一个事务中发送一个信息,同时接收这个信息。这在消息服务器的角度是不可行的。

EJB容器的事务有容器管理和Bean管理两种模式

当使用容器管理方式时:

当异常发生时,需要调用MessageDrivenContext 的 setRollbackOnly()法:该方法标记当前的事务是要Rollback

MessageDrivenContext 的 getRollbackOnly()法可以获得当前事务是否是Rollback状态。

消息应答模式一般不需要指定。

当使用Bean管理模式时:

事务的范围在 UserTransaction.begin 和 UserTransaction.commit 之内,创建Session的动作Connection.createSession需要在事务范围之内。

当 Message-Driver Bean使用Bean管理模式时,消息应答部分在事务范围外的。

当onMessage抛出 RuntimeException时,容器未对应答进行处理,消息服务器会重新发送信息。

6.4.4. 使用普通的JMS API

在J2EE应用中也可以和客户端方式相同的使用JMS。

JSP,Servlet可以直接调用 JMS API,但是在JSP,Servlet中只能使用同步地方式接收消息,发送消息没有区别。