6.2. JMS编程模型

JMS编程模型

图 6.1. JMS编程模型


ConnectionFactory是在jms.xml文件事先定义好的,用来创建Connection的工厂

Context ctx = new InitialContext(); 
ConnectionFactory connectionFactory = (ConnectionFactory)
ctx.lookup("JNDI_NAME ");

如果应用程序与服务器不在同一个虚拟机时

Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY,"com.apusic.naming.jndi.CNContextFactory");
env.put(Context.PROVIDER_URL, "iiop://hostname:4888"); //取决于地址和端口
new InitialContext(env);
ConnectionFactory connectionFactory = (ConnectionFactory)
ctx.lookup("JNDI_NAME");

Destinations是指消息发送客户端的消息目标和消息接收客户端的消息来源,它也是预先在jms.xml定义好的。

对于PTP类型,Destinations对应的类型为Queue,对于Pub/Sub 类型,Destinations对应的类型为Topic。

获取方式:

Destination myDest = (Destination) ctx.lookup("testTopic"); 
Queue myQueue = (Queue) ctx.lookup("testQueue");
或者:
//通过下面这种方式创建出来的Queue,代表一个远程 AMQ 服务器上的队列,@符号之前
//是队列的名称,@符号之后是服务器路由名。发送消息到该队列时,会将消息路由到@符号之后
//指定的服务器,并将消息放进@符号之前指定的队列。
Queue messageQueue = queueSession.createQueue("testQueue@ServerRouterName");

为客户端和消息服务器建立的一个连接,它可以创建Session,并且可以创建多个Session(稍后介绍)。

获取方法:

Connection connection = connectionFactory.createConnection();

当使用完连接的时候,你需要关闭连接,否则该连接会一直保持。关闭连接的时候它同时会关闭该连接产生的 Session。

connection.close();

连接创建好后,一般先调用 start() 方法。

Sessions是单线程的,用来创建消息发送者和消息接受者的上下文。Sessions 同时还可以执行 Message Listeners

获取方法:

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);

三种应答模式的差别,请参照 AMQ 相关概念中的应答模式。

message producer是由Session创建的一个对象,它是用来向 Destinations 发送信息的。

MessageProducer producer = session.createProducer(myDest); 
MessageProducer producer = session.createProducer(myTopic); 
或者:
QueueSender sender = queueSession.createSender(messageQueue);

也可以创建时不指定Destinations ,发送时再指定。

创建好后,可以继续创建消息,然后调用发送方法发送消息。

TextMessage message = session.createTextMessage();
message.setText(msg_text);
producer.send(message); 

当发送时指定 Destinations ,你的使用方法如下:

MessageProducer anon_prod = session.createProducer(null); 
anon_prod.send(myQueue, message); 

message consumer是指从 destination 接收消息的对象

MessageConsumer consumer = session.createConsumer(myQueue); 
MessageConsumer consumer = session.createConsumer(myTopic);

你还能使用 Session.createDurableSubscriber 创建一个持久的Topic注册对象。

它和普通的Topic注册对象的区别是不管当前是否在线,均能收到对应Destination的全部信息

String subName = "MySub";
MessageConsumer topicSubscriber = session.createDurableSubscriber(myTopic, subName); 

停止持久的Topic注册对象使用以下方法。

topicSubscriber.close();
session.unsubscribe("MySub");

使用Consumer前应该先调用 connection.start() 方法

connection.start();
Message m = consumer.receive(); 
//或者
connection.start();
Message m = consumer.receive(3000); // 3秒后超时

这是为异步调用设计的一种方式。

它是把 消息监听器 注册到consumer中,然后消息监听器实现一个onMesseag()方法,当消息到达是,会调用消息监听器的onMesseag()方法。

Listener myListener = new Listener();
consumer.setMessageListener(myListener);

AMQ 允许消息选择性的接收,语法符合SQL92。

例如:客户只收取消息属性 NewsType 为 Sports和 Opinion的消息。

NewsType = 'Sports' OR NewsType = 'Opinion'

然后再创建 Consumer时指定 Selectors。

例如

createConsumer(Destination destination, String messageSelector) 
createConsumer(Destination destination, String messageSelector, boolean NoLocal) 
createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) 

把其中的messageSelector参数变成NewsType = 'Sports' OR NewsType = 'Opinion'即可。