6.3. 客户端方式使用JMS

6.3.1. 开发JMS Client的步骤

一般来说,一个JMS 应用是几个JMS 客户端交换消息,开发JMS 客户端应用由以下几步构成:

  • 用 JNDI 寻找得到ConnectionFactory 对象;

  • 用 JNDI 寻找或者Session的CreateQueue(CreateTopic)方法得到目标队列或主题对象,即Destination 对象;

  • 用ConnectionFactory 创建Connection 对象;

  • 用Connection 对象创建一个或多个JMS Session;

  • 用Session 和Destination 创建MessageProducer 和MessageConsumer;

  • 通知Connection 开始传递消息。

6.3.2. PTP模型应用

PTP 模型主要包含消息的发送和接收,下面我们举例说明:

  • 发送消息

1. 首先编辑发送节点的 config/jms.xml 文件,加上以下一段配置:

...
<connection-factory>
    <description>The default queue connection factory</description>
    <display-name>QueueConnectionFactory</display-name>
    <jndi-name>jms/QueueConnectionFactory</jndi-name>
</connection-factory>
...

(如果同名的connection-factory 已经存在,就不用再创建)

同时还要编辑消息的目的节点的 config/jms.xml 文件,加上以下一段配置:

...
<queue clustered="false">
    <queue-name>testQueue</queue-name>
    <jndi-name>testQueue</jndi-name>
</queue>
...

(如果同名的queue 已经存在,就不用再创建)

2. 启动 AMQ

由于 AMQ 目前对配置文件的读取是在启动阶段,所以需要重新启动服务器。

3. 编写发送消息客户端代码:Send.java

import javax.jms.*;
import javax.naming.*;
import java.io.*;
import java.util.*;
import java.rmi.RemoteException;
/**
 * Title:        JMS
 * Description:  JMS Test
 * Copyright:    Copyright (c) 2003
 * Company:      Apusic
 * @author Apusic
 * @version 1.0
 */

public class Send{

   String                  queueName = "testQueue";//消息的目的队列的队列名
   String                  routerName = "RouterB";//消息的目的节点的路由名
   QueueConnectionFactory  queueConnectionFactory = null;
   Queue                   queue = null;
   QueueConnection         queueConnection = null;
   QueueSession            queueSession = null;
   QueueSender             queueSender = null;
   TextMessage             message = null;
   
   public static void main(String[] args) throws Exception {

        InitialContext ic = getInitialContext();
        Send sender = new Send();
        sender.init(ic) ;
        sender.sendMessage();
        sender.close();
   }
   
   public void init(InitialContext ctx) throws Exception{
   
        queueConnectionFactory = (QueueConnectionFactory)ctx.lookup("jms/QueueConnectionFactory");
        queueConnection = queueConnectionFactory.createQueueConnection();        
   }
   
   public void sendMessage() throws  JMSException,RemoteException{
   
        queueSession = queueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
        //队列由:"队列名" + "@" + "目的节点路由名" 在网络中唯一标识,通过该方式创建的队列
        //将代表远程节点上的某个队列
        queue = queueSession.createQueue(queueName + "@" + routerName);
        queueSender = queueSession.createSender(queue);
        queueSender.setDeliveryMode(DeliveryMode.PERSISTENT);
        message = queueSession.createTextMessage();
        message.setText("The Message from testQueue");
        queueSender.send(message);
   }
   
   public void close() throws JMSException{
   
        if (queueConnection!=null)
            queueConnection.close();
   }
   
   private static InitialContext getInitialContext() throws NamingException{

        Hashtable env = new Hashtable();
        env.put(Context.INITIAL_CONTEXT_FACTORY,"com.apusic.naming.jndi.CNContextFactory");
        env.put(Context.PROVIDER_URL, "iiop://localhost:4888");
        return (new InitialContext(env));

   }
}

  • 接收消息

该部分建立在发送消息时 jms.xml 已经配置好connection-factory,queue 的前提下 在目的节点编写接收消息客户端代码:Receive.java

import javax.jms.*;
import javax.naming.*;
import java.io.*;
import java.util.*;
import java.rmi.RemoteException;
/**
 * Title:        JMS
 * Description:  JMS Test
 * Copyright:    Copyright (c) 2003
 * Company:      Apusic
 * @author Apusic
 * @version 1.0
 */

public class Receive{

   String                 queueName = "testQueue";
   QueueConnectionFactory queueConnectionFactory = null;
   Queue                  queue = null;
   QueueConnection        queueConnection = null;
   QueueSession           queueSession = null;
   QueueReceiver          queueReceiver = null;
   TextMessage            message = null;

   public static void main(String[] args) throws Exception {

        InitialContext ic = getInitialContext();
        Receive receiver = new Receive();
        receiver.init(ic) ;
        receiver.TBreceiveMessage();//你可以在此处调用YBreceiveMessage
        receiver.close();
   }
   
   public void init(InitialContext ctx) throws Exception{
   
        queueConnectionFactory = (QueueConnectionFactory)ctx.lookup("jms/QueueConnectionFactory");
        queueConnection = queueConnectionFactory.createQueueConnection();
        //采用以下的lookup方法找回来的队列是本地节点的队列
        queue = (Queue) ctx.lookup(queueName);
        //假如想收取远程节点的队列上的消息时,采用以下的createQueue方法创建一个远程的队列。
        //队列由:"队列名" + "@" + "目的节点路由名" 在网络中唯一标识。
        //通过该方式创建的队列将代表远程节点上的某个队列,收取消息时将直接收取该节点的该队列上的消息。
        //queue = queueSession.createQueue(queueName + "@" + routerName);
   }
   
   public void TBreceiveMessage() throws NamingException, JMSException,RemoteException{
        queueSession = queueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
        queueReceiver = queueSession.createReceiver(queue);
        queueConnection.start();
        for (;;) {
            message = (TextMessage) queueReceiver.receive();
            System.out.println("Reading message: " + message.getText());
            if (message.getText().equals("quit"))
                break;
        }
   }
   
   public void YBreceiveMessage() throws NamingException, JMSException,RemoteException,IOException{
        
        queueSession = queueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
        queueReceiver = queueSession.createReceiver(queue);
        //register my textListener which comes from MessageListener
        TextMessageListener textListener = new TextMessageListener();
        queueReceiver.setMessageListener(textListener);
        queueConnection.start();

        System.out.println("To end program, enter Q or q, then ");
        InputStreamReader reader = new InputStreamReader(System.in);
        char answer = '\0';

        while (!((answer == 'q') || (answer == 'Q')))
            answer = (char)reader.read();
   }

   public void close() throws JMSException{
   
       if (queueReceiver!=null)
           queueReceiver.close();
       if (queueSession!=null)
           queueSession.close();
       if (queueConnection!=null)
           queueConnection.close();
   }
   
   private static InitialContext getInitialContext() throws NamingException{

        Hashtable env = new Hashtable();
        env.put(Context.INITIAL_CONTEXT_FACTORY,
               "com.apusic.naming.jndi.CNContextFactory");
        env.put(Context.PROVIDER_URL, "iiop://localhost:4888");
        return (new InitialContext(env));

   }
}

其中异步接收时使用的TextMessageListener代码如下:

import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.JMSException;

/**
 * Title:        JMS
 * Description:  JMS Test
 * Copyright:    Copyright (c) 2003
 * Company:      Apusic
 * @author Apusic
 * @version 1.0
 */

public class TextMessageListener implements MessageListener {

    public TextMessageListener() {}

    public void onMessage(Message m) {
        
        TextMessage msg = (TextMessage) m;
        try {
            System.out.println("Async reading message: " + msg.getText() +
                " (priority=" + msg.getJMSPriority() + ")");
        } catch (JMSException e) {
            System.out.println("Exception in onMessage(): " + e.toString());
        }
    }
}

最后,编译好程序,运行程序:打开两个命令行窗口,首先运行Receive 然后运行Send。

如果是异步接收消息,在运行Receive 的窗口中可以看到输出:

 
To end program, enter Q or q, then
Async reading message: The Second Message from testQueue (priority=4)

如果是同步接收消息,在运行Receive 的窗口中可以看到输出:

 
Reading message: The Message from testQueue

6.3.3. PUB/SUB模型应用

Pub/Sub 模型主要包含消息的发布和订阅,下面我们分别举例说明:

  • 发布消息

1. 先编辑 config/jms.xml 文件,加上以下一段配置:

...
<connection-factory>
    <description>The default topic connection factory</description>
    <display-name>TopicConnectionFactory</display-name>
    <jndi-name>jms/TopicConnectionFactory</jndi-name>
</connection-factory>
...

(如果同名的connection-factory 已经存在,就不用再创建)

...
<queue clustered="false">
    <queue-name>testTopic</queue-name>
    <jndi-name>testTopic</jndi-name>
</queue>
...

(如果同名的topic 已经存在,就不用再创建)

2. 启动 AMQ

由于 AMQ 目前对配置文件的读取是在启动阶段,所以需要重新启动服务器。

3. 编写发布消息客户端代码:Published.java

import javax.jms.*;
import javax.naming.*;
import java.io.*;
import java.util.*;
import java.rmi.RemoteException;
/**
 * Title:        JMS
 * Description:  JMS Test
 * Copyright:    Copyright (c) 2003
 * Company:      Apusic
 * @author Apusic
 * @version 1.0
 */

public class Published{

    String                       topicName = "myTopic";
    TopicConnectionFactory       topicConnectionFactory = null;
    Topic                        topic = null;
    TopicConnection              topicConnection = null;
    TopicSession                 topicSession = null;
    TopicPublisher               topicPublisher = null;
    String                       msgText = null;
    TextMessage                  message = null;

    public static void main(String[] args) throws Exception {

        InitialContext ic = getInitialContext();
        Published publisher = new Published();
        publisher.init(ic) ;
        publisher.publish();
        publisher.close();
    }
    
    public void init(InitialContext ctx) throws Exception{
    
        topicConnectionFactory = (TopicConnectionFactory)ctx.lookup("jms/TopicConnectionFactory");
        topicConnection = topicConnectionFactory.createTopicConnection();
        topic = (Topic) ctx.lookup(topicName);
    }
    
    public void publish() throws NamingException, JMSException,RemoteException{
    
        topicSession = topicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
        topicPublisher = topicSession.createPublisher(topic);
        message = topicSession.createTextMessage();
        msgText = "This is the published message";
        message.setText(msgText);
        topicPublisher.publish(message);
    }
    
    public void close() throws JMSException{
    
        if (topicConnection!=null)
            topicConnection.close();
    }
    
    private static InitialContext getInitialContext() throws NamingException{
        Hashtable env = new Hashtable();
        env.put(Context.INITIAL_CONTEXT_FACTORY,"com.apusic.naming.jndi.CNContextFactory");
        env.put(Context.PROVIDER_URL, "iiop://localhost:4888");
        return (new InitialContext(env));
    }
}

  • 订阅消息

import javax.jms.*;
import javax.naming.*;
import java.io.*;
import java.util.*;
import java.rmi.RemoteException;
/**
 * Title:        JMS
 * Description:  JMS Test
 * Copyright:    Copyright (c) 2003
 * Company:      Apusic
 * @author Apusic
 * @version 1.0
 */

public class Subscriber{

    String                       topicName = "myTopic";
    TopicConnectionFactory       topicConnectionFactory = null;
    TopicConnection              topicConnection = null;
    Topic                        topic = null;
    TopicSession                 topicSession = null;
    TopicSubscriber              topicSubscriber = null;
    TextMessage                  message = null;
    String id = "durable";

    public static void main(String[] args) throws Exception {
        InitialContext ic = getInitialContext();
        Subscriber subscriber = new Subscriber();
        subscriber.init(ic) ;
        subscriber.subscribe();
        subscriber.close();
    }
    
    public void init(InitialContext ctx) throws Exception{
        topicConnectionFactory = (TopicConnectionFactory)ctx.lookup("jms/TopicConnectionFactory");
        topicConnection = topicConnectionFactory.createTopicConnection();
        topicConnection.setClientID(id) ;
        topic = (Topic) ctx.lookup(topicName);
    }
    
    public void subscribe() throws NamingException, JMSException,RemoteException{
        topicSession = topicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
        topicSubscriber = topicSession.createDurableSubscriber(topic,id);
        topicConnection.start();
        message = (TextMessage) topicSubscriber.receive();
        System.out.println("SUBSCRIBER THREAD: Reading message: " + message.getText());
    }

    public void close() throws JMSException{
        if (topicConnection!=null)
            topicConnection.close();
    }
    
    private static InitialContext getInitialContext() throws NamingException{
        Hashtable env = new Hashtable();
        env.put(Context.INITIAL_CONTEXT_FACTORY,"com.apusic.naming.jndi.CNContextFactory");
        env.put(Context.PROVIDER_URL, "iiop://localhost:4888");
        return (new InitialContext(env));
    }
}

最后,运行程序:打开两个命令行窗口,首先运行Subscriber,然后运行Published,在运行Subscriber的窗口中可以看到输出:

SUBSCRIBER THREAD: Reading message: This is the published message。