这部分主要描述在J2EE 应用中使用MQ的场景和用法。主要包括:
使用 Session 和 Entity Bean来发送和同步接收消息
使用 message-driven beans 异步的接收信息
管理分布式事务
使用普通的JMS API(在普通应用和Web应用中)
J2EE应用可以使用 Session和Entity Bean 来发送消息,同步接收消息
Session bean使用JMS API
编写 client
package client;
import javax.ejb.EJBHome;
import javax.naming.*;
import javax.rmi.PortableRemoteObject;
import javax.jms.*;
import sb.*;
/**
* The MyAppClient class is the client program for this J2EE
* application. It obtains a reference to the home interface
* of the Publisher enterprise bean and creates an instance of
* the bean. After calling the publisher's publishNews method
* twice, it removes the bean.
*/
public class MyAppClient {
public static void main(String[] args) {
MyAppClient client = new MyAppClient();
client.doTest();
System.exit(0);
}
public void doTest() {
try {
Context ic = new InitialContext();
System.out.println("Looking up EJB reference");
java.lang.Object objref =
ic.lookup("java:comp/env/ejb/remote/Publisher");
System.err.println("Looked up home");
PublisherHome pubHome =
(PublisherHome) PortableRemoteObject.narrow(objref,
PublisherHome.class);
System.err.println("Narrowed home");
/*
* Create bean instance, invoke business method
* twice, and remove bean instance.
*/
PublisherRemote phr = pubHome.create();
System.err.println("Got the EJB");
phr.publishNews();
phr.publishNews();
phr.remove();
System.out.println("To view the bean output,");
System.out.println(
" check <install_dir>/domains/domain1/logs/server.log.");
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
编写发送消息的 Session Bean
对于 JNDI的名字需要按照实际的名字进行对应
编写Home接口
package sb;
import java.rmi.RemoteException;
import javax.ejb.EJBHome;
import javax.ejb.CreateException;
/**
* Home interface for Publisher enterprise bean.
*/
public interface PublisherHome extends EJBHome {
PublisherRemote create() throws RemoteException, CreateException;
}
编写remote接口
package sb;
import javax.ejb.*;
import java.rmi.RemoteException;
/**
* Remote interface for Publisher enterprise bean. Declares one
* business method.
*/
public interface PublisherRemote extends EJBObject {
void publishNews() throws RemoteException;
}
编写EJB object
package sb;
import java.rmi.RemoteException;
import java.util.*;
import java.util.logging.*;
import javax.ejb.*;
import javax.naming.*;
import javax.jms.*;
/**
* Bean class for Publisher enterprise bean. Defines publishNews
* business method as well as required methods for a stateless
* session bean.
*/
public class PublisherBean implements SessionBean {
final static String[] messageTypes =
{
"Nation/World", "Metro/Region", "Business", "Sports", "Living/Arts",
"Opinion"
};
static final Logger logger = Logger.getLogger("PublisherBean");
SessionContext sc = null;
Connection connection = null;
Topic topic = null;
public PublisherBean() {
logger.info("In PublisherBean() (constructor)");
}
/**
* Sets the associated session context. The container calls
* this method after the instance creation.
*
* @param sc the context to set
*/
public void setSessionContext(SessionContext sc) {
this.sc = sc;
}
/**
* Instantiates the enterprise bean. Creates the
* connection and looks up the topic.
*/
public void ejbCreate() {
Context context = null;
ConnectionFactory connectionFactory = null;
logger.info("In PublisherBean.ejbCreate()");
try {
context = new InitialContext();
topic = (Topic) context.lookup("java:comp/env/jms/TopicName");
// Create a connection
connectionFactory =
(ConnectionFactory) context.lookup(
"java:comp/env/jms/MyConnectionFactory");
connection = connectionFactory.createConnection();
} catch (Throwable t) {
// JMSException or NamingException could be thrown
logger.severe("PublisherBean.ejbCreate:" + "Exception: " +
t.toString());
}
}
/**
* Chooses a message type by using the random number
* generator found in java.util. Called by publishNews().
*
* @return the String representing the message type
*/
private String chooseType() {
int whichMsg;
Random rgen = new Random();
whichMsg = rgen.nextInt(messageTypes.length);
return messageTypes[whichMsg];
}
/**
* Creates session, publisher, and message. Publishes
* messages after setting their NewsType property and using
* the property value as the message text. Messages are
* received by MessageBean, a message-driven bean that uses a
* message selector to retrieve messages whose NewsType
* property has certain values.
*/
public void publishNews() {
Session session = null;
MessageProducer publisher = null;
TextMessage message = null;
int numMsgs = messageTypes.length * 3;
String messageType = null;
try {
session = connection.createSession(true, 0);
publisher = session.createProducer(topic);
message = session.createTextMessage();
for (int i = 0; i < numMsgs; i++) {
messageType = chooseType();
message.setStringProperty("NewsType", messageType);
message.setText("Item " + i + ": " + messageType);
logger.info("PUBLISHER: Setting " + "message text to: " +
message.getText());
publisher.send(message);
}
} catch (Throwable t) {
// JMSException could be thrown
logger.severe("PublisherBean.publishNews: " + "Exception: " +
t.toString());
sc.setRollbackOnly();
} finally {
if (session != null) {
try {
session.close();
} catch (JMSException e) {
}
}
}
}
/**
* Closes the connection.
*/
public void ejbRemove() throws RemoteException {
System.out.println("In PublisherBean.ejbRemove()");
if (connection != null) {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void ejbActivate() {
}
public void ejbPassivate() {
}
}
编写接收消息的Message-Driven Bean
package mdb;
import javax.ejb.*;
import javax.naming.*;
import javax.jms.*;
import java.util.logging.*;
/**
* The MessageBean class is a message-driven bean. It implements
* the javax.ejb.MessageDrivenBean and javax.jms.MessageListener
* interfaces. It is defined as public (but not final or
* abstract). It defines a constructor and the methods
* setMessageDrivenContext, ejbCreate, onMessage, and
* ejbRemove.
*/
public class MessageBean implements MessageDrivenBean, MessageListener {
static final Logger logger = Logger.getLogger("MessageBean");
private transient MessageDrivenContext mdc = null;
private Context context;
/**
* Constructor, which is public and takes no arguments.
*/
public MessageBean() {
logger.info("In MessageBean.MessageBean()");
}
/**
* setMessageDrivenContext method, declared as public (but
* not final or static), with a return type of void, and
* with one argument of type javax.ejb.MessageDrivenContext.
*
* @param mdc the context to set
*/
public void setMessageDrivenContext(MessageDrivenContext mdc) {
logger.info("In MessageBean.setMessageDrivenContext()");
this.mdc = mdc;
}
/**
* ejbCreate method, declared as public (but not final or
* static), with a return type of void, and with no
* arguments.
*/
public void ejbCreate() {
logger.info("In MessageBean.ejbCreate()");
}
/**
* onMessage method, declared as public (but not final or
* static), with a return type of void, and with one argument
* of type javax.jms.Message.
*
* Casts the incoming Message to a TextMessage and displays
* the text.
*
* @param inMessage the incoming message
*/
public void onMessage(Message inMessage) {
TextMessage msg = null;
try {
if (inMessage instanceof TextMessage) {
msg = (TextMessage) inMessage;
logger.info("MESSAGE BEAN: Message received: " + msg.getText());
} else {
logger.warning("Message of wrong type: " +
inMessage.getClass().getName());
}
} catch (JMSException e) {
logger.severe("MessageBean.onMessage: JMSException: " +
e.toString());
e.printStackTrace();
mdc.setRollbackOnly();
} catch (Throwable te) {
logger.severe("MessageBean.onMessage: Exception: " + te.toString());
te.printStackTrace();
}
}
/**
* ejbRemove method, declared as public (but not final or
* static), with a return type of void, and with no
* arguments.
*/
public void ejbRemove() {
logger.info("In MessageBean.remove()");
}
}
由于同步接收消息需要客户端等待,堵塞服务器资源。为了解决这种方式带来的弊端,采用异步方式接收是一种不错的选择。
J2EE应用服务器有一种特殊的EJB,就是Message-Driver Bean,它可以异步地处理JMS消息,普通的EJB只能同步的接收消息。
Message-Driver Bean是一个消息监听器,可以从队列或持久注册者可靠地接收消息。消息发送方可以是 J2EE 组件,ejb ,Web组件或者其它客户端。
Message-Driver Bean有onMessage方法,供消息到达时被调用。
Message-Driver Bean和一般的消息监听器的区别是:
Message-Driver Bean由容器来管理和执行
Message-Driver Bean在部署时确定例如 Destination , ConnectionFactory等等,而普通的用法是需要用代码来实现
Message-Driver Bean 注册消息监听器不能用 setMessageListener 方法。
指定消息的应答模式
假如 AMQ 和Apusic 应用服务器采用 resource adapter 整合,可以直接指定connection factory, durable subscription, message selector, acknowledgment mode等等。
Message-Driver Bean必须实现一些固定的接口和方法
必须实现 javax.ejb.MessageDrivenBean 和 javax.jms.MessageListener 接口
ejbCreate() 方法,建立 JMS API connection
ejbRemove() 方法,关闭 JMS API connection
setMessageDrivenContext(MessageDrivenContext mdc)方法,MessageDrivenContext提供一些附加的方法用来做事务管理相关的事情。
Message-Driver Bean和其它EJB不同的是它没有home和remote接口。
从生命周期来看,Message-Driver Bean和Stateless Session Bean有些相似,和客户端没有状态关联。
Message-Driver Bean可以作为池的方式以便加强并行执行的效率,但是这种方式带来的是消息顺序处理需要应用程序自己考虑。
Message-Driver Bean调用过程
首先调用 setMessageDrivenContext 方法,把上下文传给该实例
调用 ejbCreate 方法。
J2EE应用程序通常使用分布式事务来确保和外部资源的整合。
J2EE应用程序可以包括把数据库更新和消息发送作为一个事务单元,来保证它们发生的原子性。可以把对多个资源的访问包括在一个事务当中。例如:可以在Servlet中调用EJB来更新数据库,然后调用JMS来进行消息发送,然后调用JCA的模块,以上的动作包含在一个事务当中。
但是,一个事务不能包括同一个信息的收发,即一个事务中发送一个信息,同时接收这个信息。这在消息服务器的角度是不可行的。
EJB容器的事务有容器管理和Bean管理两种模式
当使用容器管理方式时:
当异常发生时,需要调用MessageDrivenContext 的 setRollbackOnly()法:该方法标记当前的事务是要Rollback
MessageDrivenContext 的 getRollbackOnly()法可以获得当前事务是否是Rollback状态。
消息应答模式一般不需要指定。
当使用Bean管理模式时:
事务的范围在 UserTransaction.begin 和 UserTransaction.commit 之内,创建Session的动作Connection.createSession需要在事务范围之内。
当 Message-Driver Bean使用Bean管理模式时,消息应答部分在事务范围外的。
当onMessage抛出 RuntimeException时,容器未对应答进行处理,消息服务器会重新发送信息。