一般来说,一个JMS 应用是几个JMS 客户端交换消息,开发JMS 客户端应用由以下几步构成:
用 JNDI 寻找得到ConnectionFactory 对象;
用 JNDI 寻找或者Session的CreateQueue(CreateTopic)方法得到目标队列或主题对象,即Destination 对象;
用ConnectionFactory 创建Connection 对象;
用Connection 对象创建一个或多个JMS Session;
用Session 和Destination 创建MessageProducer 和MessageConsumer;
通知Connection 开始传递消息。
PTP 模型主要包含消息的发送和接收,下面我们举例说明:
发送消息
1. 首先编辑发送节点的 config/jms.xml 文件,加上以下一段配置:
...
<connection-factory>
<description>The default queue connection factory</description>
<display-name>QueueConnectionFactory</display-name>
<jndi-name>jms/QueueConnectionFactory</jndi-name>
</connection-factory>
...
(如果同名的connection-factory 已经存在,就不用再创建)
同时还要编辑消息的目的节点的 config/jms.xml 文件,加上以下一段配置:
...
<queue clustered="false">
<queue-name>testQueue</queue-name>
<jndi-name>testQueue</jndi-name>
</queue>
...
(如果同名的queue 已经存在,就不用再创建)
2. 启动 AMQ
由于 AMQ 目前对配置文件的读取是在启动阶段,所以需要重新启动服务器。
3. 编写发送消息客户端代码:Send.java
import javax.jms.*;
import javax.naming.*;
import java.io.*;
import java.util.*;
import java.rmi.RemoteException;
/**
* Title: JMS
* Description: JMS Test
* Copyright: Copyright (c) 2003
* Company: Apusic
* @author Apusic
* @version 1.0
*/
public class Send{
String queueName = "testQueue";//消息的目的队列的队列名
String routerName = "RouterB";//消息的目的节点的路由名
QueueConnectionFactory queueConnectionFactory = null;
Queue queue = null;
QueueConnection queueConnection = null;
QueueSession queueSession = null;
QueueSender queueSender = null;
TextMessage message = null;
public static void main(String[] args) throws Exception {
InitialContext ic = getInitialContext();
Send sender = new Send();
sender.init(ic) ;
sender.sendMessage();
sender.close();
}
public void init(InitialContext ctx) throws Exception{
queueConnectionFactory = (QueueConnectionFactory)ctx.lookup("jms/QueueConnectionFactory");
queueConnection = queueConnectionFactory.createQueueConnection();
}
public void sendMessage() throws JMSException,RemoteException{
queueSession = queueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
//队列由:"队列名" + "@" + "目的节点路由名" 在网络中唯一标识,通过该方式创建的队列
//将代表远程节点上的某个队列
queue = queueSession.createQueue(queueName + "@" + routerName);
queueSender = queueSession.createSender(queue);
queueSender.setDeliveryMode(DeliveryMode.PERSISTENT);
message = queueSession.createTextMessage();
message.setText("The Message from testQueue");
queueSender.send(message);
}
public void close() throws JMSException{
if (queueConnection!=null)
queueConnection.close();
}
private static InitialContext getInitialContext() throws NamingException{
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY,"com.apusic.naming.jndi.CNContextFactory");
env.put(Context.PROVIDER_URL, "iiop://localhost:4888");
return (new InitialContext(env));
}
}
接收消息
该部分建立在发送消息时 jms.xml 已经配置好connection-factory,queue 的前提下 在目的节点编写接收消息客户端代码:Receive.java
import javax.jms.*;
import javax.naming.*;
import java.io.*;
import java.util.*;
import java.rmi.RemoteException;
/**
* Title: JMS
* Description: JMS Test
* Copyright: Copyright (c) 2003
* Company: Apusic
* @author Apusic
* @version 1.0
*/
public class Receive{
String queueName = "testQueue";
QueueConnectionFactory queueConnectionFactory = null;
Queue queue = null;
QueueConnection queueConnection = null;
QueueSession queueSession = null;
QueueReceiver queueReceiver = null;
TextMessage message = null;
public static void main(String[] args) throws Exception {
InitialContext ic = getInitialContext();
Receive receiver = new Receive();
receiver.init(ic) ;
receiver.TBreceiveMessage();//你可以在此处调用YBreceiveMessage
receiver.close();
}
public void init(InitialContext ctx) throws Exception{
queueConnectionFactory = (QueueConnectionFactory)ctx.lookup("jms/QueueConnectionFactory");
queueConnection = queueConnectionFactory.createQueueConnection();
//采用以下的lookup方法找回来的队列是本地节点的队列
queue = (Queue) ctx.lookup(queueName);
//假如想收取远程节点的队列上的消息时,采用以下的createQueue方法创建一个远程的队列。
//队列由:"队列名" + "@" + "目的节点路由名" 在网络中唯一标识。
//通过该方式创建的队列将代表远程节点上的某个队列,收取消息时将直接收取该节点的该队列上的消息。
//queue = queueSession.createQueue(queueName + "@" + routerName);
}
public void TBreceiveMessage() throws NamingException, JMSException,RemoteException{
queueSession = queueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
queueReceiver = queueSession.createReceiver(queue);
queueConnection.start();
for (;;) {
message = (TextMessage) queueReceiver.receive();
System.out.println("Reading message: " + message.getText());
if (message.getText().equals("quit"))
break;
}
}
public void YBreceiveMessage() throws NamingException, JMSException,RemoteException,IOException{
queueSession = queueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
queueReceiver = queueSession.createReceiver(queue);
//register my textListener which comes from MessageListener
TextMessageListener textListener = new TextMessageListener();
queueReceiver.setMessageListener(textListener);
queueConnection.start();
System.out.println("To end program, enter Q or q, then ");
InputStreamReader reader = new InputStreamReader(System.in);
char answer = '\0';
while (!((answer == 'q') || (answer == 'Q')))
answer = (char)reader.read();
}
public void close() throws JMSException{
if (queueReceiver!=null)
queueReceiver.close();
if (queueSession!=null)
queueSession.close();
if (queueConnection!=null)
queueConnection.close();
}
private static InitialContext getInitialContext() throws NamingException{
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY,
"com.apusic.naming.jndi.CNContextFactory");
env.put(Context.PROVIDER_URL, "iiop://localhost:4888");
return (new InitialContext(env));
}
}
其中异步接收时使用的TextMessageListener代码如下:
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.JMSException;
/**
* Title: JMS
* Description: JMS Test
* Copyright: Copyright (c) 2003
* Company: Apusic
* @author Apusic
* @version 1.0
*/
public class TextMessageListener implements MessageListener {
public TextMessageListener() {}
public void onMessage(Message m) {
TextMessage msg = (TextMessage) m;
try {
System.out.println("Async reading message: " + msg.getText() +
" (priority=" + msg.getJMSPriority() + ")");
} catch (JMSException e) {
System.out.println("Exception in onMessage(): " + e.toString());
}
}
}
最后,编译好程序,运行程序:打开两个命令行窗口,首先运行Receive 然后运行Send。
如果是异步接收消息,在运行Receive 的窗口中可以看到输出:
To end program, enter Q or q, then Async reading message: The Second Message from testQueue (priority=4)
如果是同步接收消息,在运行Receive 的窗口中可以看到输出:
Reading message: The Message from testQueue
Pub/Sub 模型主要包含消息的发布和订阅,下面我们分别举例说明:
发布消息
1. 先编辑 config/jms.xml 文件,加上以下一段配置:
...
<connection-factory>
<description>The default topic connection factory</description>
<display-name>TopicConnectionFactory</display-name>
<jndi-name>jms/TopicConnectionFactory</jndi-name>
</connection-factory>
...
(如果同名的connection-factory 已经存在,就不用再创建)
...
<queue clustered="false">
<queue-name>testTopic</queue-name>
<jndi-name>testTopic</jndi-name>
</queue>
...
(如果同名的topic 已经存在,就不用再创建)
2. 启动 AMQ
由于 AMQ 目前对配置文件的读取是在启动阶段,所以需要重新启动服务器。
3. 编写发布消息客户端代码:Published.java
import javax.jms.*;
import javax.naming.*;
import java.io.*;
import java.util.*;
import java.rmi.RemoteException;
/**
* Title: JMS
* Description: JMS Test
* Copyright: Copyright (c) 2003
* Company: Apusic
* @author Apusic
* @version 1.0
*/
public class Published{
String topicName = "myTopic";
TopicConnectionFactory topicConnectionFactory = null;
Topic topic = null;
TopicConnection topicConnection = null;
TopicSession topicSession = null;
TopicPublisher topicPublisher = null;
String msgText = null;
TextMessage message = null;
public static void main(String[] args) throws Exception {
InitialContext ic = getInitialContext();
Published publisher = new Published();
publisher.init(ic) ;
publisher.publish();
publisher.close();
}
public void init(InitialContext ctx) throws Exception{
topicConnectionFactory = (TopicConnectionFactory)ctx.lookup("jms/TopicConnectionFactory");
topicConnection = topicConnectionFactory.createTopicConnection();
topic = (Topic) ctx.lookup(topicName);
}
public void publish() throws NamingException, JMSException,RemoteException{
topicSession = topicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
topicPublisher = topicSession.createPublisher(topic);
message = topicSession.createTextMessage();
msgText = "This is the published message";
message.setText(msgText);
topicPublisher.publish(message);
}
public void close() throws JMSException{
if (topicConnection!=null)
topicConnection.close();
}
private static InitialContext getInitialContext() throws NamingException{
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY,"com.apusic.naming.jndi.CNContextFactory");
env.put(Context.PROVIDER_URL, "iiop://localhost:4888");
return (new InitialContext(env));
}
}
订阅消息
import javax.jms.*;
import javax.naming.*;
import java.io.*;
import java.util.*;
import java.rmi.RemoteException;
/**
* Title: JMS
* Description: JMS Test
* Copyright: Copyright (c) 2003
* Company: Apusic
* @author Apusic
* @version 1.0
*/
public class Subscriber{
String topicName = "myTopic";
TopicConnectionFactory topicConnectionFactory = null;
TopicConnection topicConnection = null;
Topic topic = null;
TopicSession topicSession = null;
TopicSubscriber topicSubscriber = null;
TextMessage message = null;
String id = "durable";
public static void main(String[] args) throws Exception {
InitialContext ic = getInitialContext();
Subscriber subscriber = new Subscriber();
subscriber.init(ic) ;
subscriber.subscribe();
subscriber.close();
}
public void init(InitialContext ctx) throws Exception{
topicConnectionFactory = (TopicConnectionFactory)ctx.lookup("jms/TopicConnectionFactory");
topicConnection = topicConnectionFactory.createTopicConnection();
topicConnection.setClientID(id) ;
topic = (Topic) ctx.lookup(topicName);
}
public void subscribe() throws NamingException, JMSException,RemoteException{
topicSession = topicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
topicSubscriber = topicSession.createDurableSubscriber(topic,id);
topicConnection.start();
message = (TextMessage) topicSubscriber.receive();
System.out.println("SUBSCRIBER THREAD: Reading message: " + message.getText());
}
public void close() throws JMSException{
if (topicConnection!=null)
topicConnection.close();
}
private static InitialContext getInitialContext() throws NamingException{
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY,"com.apusic.naming.jndi.CNContextFactory");
env.put(Context.PROVIDER_URL, "iiop://localhost:4888");
return (new InitialContext(env));
}
}
最后,运行程序:打开两个命令行窗口,首先运行Subscriber,然后运行Published,在运行Subscriber的窗口中可以看到输出:
SUBSCRIBER THREAD: Reading message: This is the published message。