ConnectionFactory
ConnectionFactory是在jms.xml文件事先定义好的,用来创建Connection的工厂
Context ctx = new InitialContext();
ConnectionFactory connectionFactory = (ConnectionFactory)
ctx.lookup("JNDI_NAME ");
如果应用程序与服务器不在同一个虚拟机时
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY,"com.apusic.naming.jndi.CNContextFactory");
env.put(Context.PROVIDER_URL, "iiop://hostname:4888"); //取决于地址和端口
new InitialContext(env);
ConnectionFactory connectionFactory = (ConnectionFactory)
ctx.lookup("JNDI_NAME");
Destinations
Destinations是指消息发送客户端的消息目标和消息接收客户端的消息来源,它也是预先在jms.xml定义好的。
对于PTP类型,Destinations对应的类型为Queue,对于Pub/Sub 类型,Destinations对应的类型为Topic。
获取方式:
Destination myDest = (Destination) ctx.lookup("testTopic");
Queue myQueue = (Queue) ctx.lookup("testQueue");
或者:
//通过下面这种方式创建出来的Queue,代表一个远程 AMQ 服务器上的队列,@符号之前
//是队列的名称,@符号之后是服务器路由名。发送消息到该队列时,会将消息路由到@符号之后
//指定的服务器,并将消息放进@符号之前指定的队列。
Queue messageQueue = queueSession.createQueue("testQueue@ServerRouterName");
Connections
为客户端和消息服务器建立的一个连接,它可以创建Session,并且可以创建多个Session(稍后介绍)。
获取方法:
Connection connection = connectionFactory.createConnection();
当使用完连接的时候,你需要关闭连接,否则该连接会一直保持。关闭连接的时候它同时会关闭该连接产生的 Session。
connection.close();
连接创建好后,一般先调用 start() 方法。
Sessions
Sessions是单线程的,用来创建消息发送者和消息接受者的上下文。Sessions 同时还可以执行 Message Listeners
获取方法:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
三种应答模式的差别,请参照 AMQ 相关概念中的应答模式。
Message Producers
message producer是由Session创建的一个对象,它是用来向 Destinations 发送信息的。
MessageProducer producer = session.createProducer(myDest); MessageProducer producer = session.createProducer(myTopic); 或者: QueueSender sender = queueSession.createSender(messageQueue);
也可以创建时不指定Destinations ,发送时再指定。
创建好后,可以继续创建消息,然后调用发送方法发送消息。
TextMessage message = session.createTextMessage(); message.setText(msg_text); producer.send(message);
当发送时指定 Destinations ,你的使用方法如下:
MessageProducer anon_prod = session.createProducer(null); anon_prod.send(myQueue, message);
Message Consumers
message consumer是指从 destination 接收消息的对象
MessageConsumer consumer = session.createConsumer(myQueue); MessageConsumer consumer = session.createConsumer(myTopic);
你还能使用 Session.createDurableSubscriber 创建一个持久的Topic注册对象。
它和普通的Topic注册对象的区别是不管当前是否在线,均能收到对应Destination的全部信息
String subName = "MySub"; MessageConsumer topicSubscriber = session.createDurableSubscriber(myTopic, subName);
停止持久的Topic注册对象使用以下方法。
topicSubscriber.close();
session.unsubscribe("MySub");
使用Consumer前应该先调用 connection.start() 方法
connection.start(); Message m = consumer.receive(); //或者 connection.start(); Message m = consumer.receive(3000); // 3秒后超时
Message Listeners
这是为异步调用设计的一种方式。
它是把 消息监听器 注册到consumer中,然后消息监听器实现一个onMesseag()方法,当消息到达是,会调用消息监听器的onMesseag()方法。
Listener myListener = new Listener(); consumer.setMessageListener(myListener);
Message Selectors
AMQ 允许消息选择性的接收,语法符合SQL92。
例如:客户只收取消息属性 NewsType 为 Sports和 Opinion的消息。
NewsType = 'Sports' OR NewsType = 'Opinion'
然后再创建 Consumer时指定 Selectors。
例如
createConsumer(Destination destination, String messageSelector) createConsumer(Destination destination, String messageSelector, boolean NoLocal) createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
把其中的messageSelector参数变成NewsType = 'Sports' OR NewsType = 'Opinion'即可。