6.5. Message Information

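A message object generally consists of three parts: the message headers, the message properties, and the message body.

Message headers: describe the identity and routing of the message, such as when it was created, how long it remains valid, its destination, and where replies should be sent.

Message properties: custom name/value pairs defined by the creator of the message; a message can carry any number of them.

Message body: holds the content of the message.

A JMS consumer can receive messages selectively, based on the contents of the headers and properties. This filtering mechanism is called message selectors.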

Figure 6.4. JMS message structure


6.5.1. Message Headers

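Message headers fall into two groups: those assigned automatically by the messaging system and those assigned by the application.

The Message interface exposes the following methods for reading and setting header values: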

public interface Message {

    // Get the Destination object, which may be a Topic or a Queue
    public Destination getJMSDestination( ) throws JMSException;
    // Set the Destination object, which may be a Topic or a Queue
    public void setJMSDestination(Destination destination) throws JMSException;
    // Get the delivery mode: persistent or non-persistent
    public int getJMSDeliveryMode( ) throws JMSException;
    // Set the delivery mode: persistent or non-persistent
    public void setJMSDeliveryMode(int deliveryMode) throws JMSException;
    // Get the message ID
    public String getJMSMessageID( ) throws JMSException;
    // Set the message ID (used by the messaging system itself)
    public void setJMSMessageID(String id) throws JMSException;
    // Get the message timestamp
    public long getJMSTimestamp( ) throws JMSException;
    // Set the message timestamp (used by the messaging system itself)
    public void setJMSTimestamp(long timestamp) throws JMSException;
    // Get the message expiration time
    public long getJMSExpiration( ) throws JMSException;
    // Set the message expiration time
    public void setJMSExpiration(long expiration) throws JMSException;
    // Get whether the message is being redelivered
    public boolean getJMSRedelivered( ) throws JMSException;
    // Set whether the message is being redelivered (used by the messaging system itself)
    public void setJMSRedelivered(boolean redelivered) throws JMSException;
    // Get the message priority
    public int getJMSPriority( ) throws JMSException;
    // Set the message priority
    public void setJMSPriority(int priority) throws JMSException;
    // Get the reply-to destination
    public Destination getJMSReplyTo( ) throws JMSException;
    // Set the reply-to destination
    public void setJMSReplyTo(Destination replyTo) throws JMSException;
    // Get the correlation ID
    public String getJMSCorrelationID( ) throws JMSException;
    // Set the correlation ID
    public void setJMSCorrelationID(String correlationID) throws JMSException;
    // Get the correlation ID as a byte array
    public byte[] getJMSCorrelationIDAsBytes( ) throws JMSException;
    // Set the correlation ID as a byte array
    public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException;
    // Get the message type, which describes the structure and type of the body
    public String getJMSType( ) throws JMSException;
    // Set the message type (assigned by the application)
    public void setJMSType(String type) throws JMSException;

}
    
  • Headers assigned by the messaging system

JMSDestination:

Topic destination = (Topic) message.getJMSDestination();

JMSDeliveryMode:

int deliverymode = message.getJMSDeliveryMode();
if (deliverymode == javax.jms.DeliveryMode.PERSISTENT) {
    ...
} else { // equals DeliveryMode.NON_PERSISTENT
    ...
}

TopicPublisher topicPublisher = topicSession.createPublisher(topic);
topicPublisher.setDeliveryMode(javax.jms.DeliveryMode.NON_PERSISTENT);

JMSMessageID:

String messageid = message.getJMSMessageID( );

JMSTimestamp:

long timestamp = message.getJMSTimestamp( );

JMSExpiration:

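// JMSExpiration is the absolute expiration time in milliseconds (send time plus
// time-to-live); 0, the default, means the message never expires.
long expiration = message.getJMSExpiration( );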

TopicPublisher topicPublisher = topicSession.createPublisher(topic);
// Set time to live as 1 hour (1000 millis x 60 sec x 60 min)
topicPublisher.setTimeToLive(3600000);
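
To make the relationship between the time-to-live and the JMSExpiration header concrete, here is a minimal plain-Java sketch. It does not use the JMS API: the class and method names (ExpirationSketch, expirationFor, isExpired) are hypothetical, and the code only mirrors the rule in the JMS specification that the expiration is the send time plus the time-to-live, with 0 meaning that the message never expires.

```java
/** Plain-Java sketch of the JMSExpiration rule; not part of the JMS API. */
final class ExpirationSketch {

    /** Returns the absolute expiration time, or 0 when the message never expires. */
    static long expirationFor(long sendTimeMillis, long timeToLiveMillis) {
        if (timeToLiveMillis == 0) {
            return 0L; // a time-to-live of 0 means "never expires"
        }
        return sendTimeMillis + timeToLiveMillis;
    }

    /** Returns true when a non-zero expiration time has already passed. */
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis >= expirationMillis;
    }

    public static void main(String[] args) {
        long sentAt = System.currentTimeMillis();
        long expiration = expirationFor(sentAt, 3600000L); // one hour
        System.out.println("expired right after sending? " + isExpired(expiration, sentAt));
    }
}
```

A real provider performs this calculation itself when the message is sent, so the application normally only sets the time-to-live on the producer.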

JMSRedelivered:

// Indicates whether this message is being redelivered to the consumer.
boolean isRedelivered = message.getJMSRedelivered();

// setJMSRedelivered(boolean redelivered) is used by the messaging system itself.

JMSPriority:

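// Message priority: the sender can specify a priority from 0 to 9 when sending;
// 0-4 are gradations of normal priority and 5-9 are gradations of higher priority.
// Get the priority
int priority = message.getJMSPriority( );
// Set the priority
TopicPublisher topicPublisher = topicSession.createPublisher(someTopic);
topicPublisher.setPriority(9);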
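
The two priority bands can be expressed as a small plain-Java helper. This is only an illustrative sketch: PrioritySketch and classify are hypothetical names, not part of the JMS API, and the labels "normal" and "expedited" are chosen here to name the 0-4 and 5-9 ranges.

```java
/** Plain-Java sketch of the JMS priority ranges; not part of the JMS API. */
final class PrioritySketch {

    /** Describes a priority level: 0-4 is normal, 5-9 is expedited. */
    static String classify(int priority) {
        if (priority < 0 || priority > 9) {
            throw new IllegalArgumentException("priority must be 0-9: " + priority);
        }
        return priority <= 4 ? "normal" : "expedited";
    }

    public static void main(String[] args) {
        for (int priority = 0; priority <= 9; priority++) {
            System.out.println(priority + " -> " + classify(priority));
        }
    }
}
```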

  • Headers assigned by the application

JMSReplyTo:

// Sometimes the sender needs the consumer to reply; it then specifies the reply Destination, such as a Topic or a Queue.
message.setJMSReplyTo(topic);
...
Topic topic = (Topic) message.getJMSReplyTo( );

JMSCorrelationID:

// Set the correlation ID that links this message to another one, for example the JMSMessageID of that other message.
message.setJMSCorrelationID(identifier);
...
String correlationid = message.getJMSCorrelationID();
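
One common use of the correlation ID is matching a reply to the request that caused it. The following plain-Java sketch shows only the bookkeeping side of that idea, under the assumption that the replier copies the request's message ID into the reply's correlation ID. CorrelationSketch and its methods are hypothetical names and do not call the JMS API.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java sketch of request/reply matching by correlation ID; not the JMS API. */
final class CorrelationSketch {

    // Pending requests, keyed by the message ID each request was sent with.
    private final Map<String, String> pending = new HashMap<>();

    /** Remembers a request so that a later reply can be matched to it. */
    void requestSent(String messageId, String description) {
        pending.put(messageId, description);
    }

    /** Returns the matching request's description, or null when nothing matches. */
    String replyReceived(String correlationId) {
        return pending.remove(correlationId);
    }

    public static void main(String[] args) {
        CorrelationSketch requests = new CorrelationSketch();
        requests.requestSent("ID:1", "price query");
        System.out.println("reply matches: " + requests.replyReceived("ID:1"));
    }
}
```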

JMSType:

A description of the message body that identifies its structure and type.

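6.5.2. Message Properties

Message properties supplement the headers and let a message carry additional information as name/value pairs. The supported value types are String, boolean, byte, short, int, long, float, and double.

Application-specific properties: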

TextMessage message = pubSession.createTextMessage( );
message.setText(text);
message.setStringProperty("username", username);
publisher.publish(message);

The Message interface provides methods to read and set properties of each of these types:

public interface Message {

    public String getStringProperty(String name) throws JMSException, MessageFormatException;
    public void setStringProperty(String name, String value) throws JMSException, MessageNotWriteableException;
    public int getIntProperty(String name) throws JMSException, MessageFormatException;
    public void setIntProperty(String name, int value) throws JMSException, MessageNotWriteableException;
    public boolean getBooleanProperty(String name) throws JMSException, MessageFormatException;
    public void setBooleanProperty(String name, boolean value) throws JMSException, MessageNotWriteableException;
    public double getDoubleProperty(String name) throws JMSException, MessageFormatException;
    public void setDoubleProperty(String name, double value) throws JMSException, MessageNotWriteableException;
    public float getFloatProperty(String name) throws JMSException, MessageFormatException;
    public void setFloatProperty(String name, float value) throws JMSException, MessageNotWriteableException;
    public byte getByteProperty(String name) throws JMSException, MessageFormatException;
    public void setByteProperty(String name, byte value) throws JMSException, MessageNotWriteableException;
    public long getLongProperty(String name) throws JMSException, MessageFormatException;
    public void setLongProperty(String name, long value) throws JMSException, MessageNotWriteableException;
    public short getShortProperty(String name) throws JMSException, MessageFormatException;
    public void setShortProperty(String name, short value) throws JMSException, MessageNotWriteableException;
    public Object getObjectProperty(String name) throws JMSException, MessageFormatException;
    public void setObjectProperty(String name, Object value) throws JMSException, MessageNotWriteableException;
    public void clearProperties( ) throws JMSException;
    public Enumeration getPropertyNames( ) throws JMSException;
    public boolean propertyExists(String name) throws JMSException;
    ...

}

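The getObjectProperty and setObjectProperty methods allow the Java String, Boolean, Byte, Short, Integer, Long, Float, and Double objects to be used directly as property values. Objects of any other type are unsupported and cause an exception to be thrown.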
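
The type restriction can be pictured as a simple check. The sketch below is plain Java and does not call the JMS API; PropertyTypeSketch and isValidPropertyValue are hypothetical names. Rejecting null here is a simplifying assumption of this sketch, not a statement about what a provider does.

```java
/** Plain-Java sketch of the value types accepted for object properties; not the JMS API. */
final class PropertyTypeSketch {

    /** Returns true only for String and the boxed types listed above. */
    static boolean isValidPropertyValue(Object value) {
        // instanceof is false for null, so null is rejected (an assumption of this sketch)
        return value instanceof String || value instanceof Boolean
            || value instanceof Byte || value instanceof Short
            || value instanceof Integer || value instanceof Long
            || value instanceof Float || value instanceof Double;
    }

    public static void main(String[] args) {
        System.out.println("Integer accepted: " + isValidPropertyValue(Integer.valueOf(42)));
        System.out.println("Date accepted: " + isValidPropertyValue(new java.util.Date()));
    }
}
```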

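Message properties cannot be changed once the message has been sent. If a consumer tries to change a property, the messaging system throws javax.jms.MessageNotWriteableException. The consumer can, however, call clearProperties() to remove all the properties.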
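
The read-only behaviour can be modelled as a small state machine. The following is a plain-Java sketch under stated assumptions: ReadOnlyPropertiesSketch and its methods are hypothetical names, IllegalStateException stands in for javax.jms.MessageNotWriteableException, and the rule that clearing the properties makes them writable again follows the JMS documentation for clearProperties().

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java sketch of read-only message properties; not part of the JMS API. */
final class ReadOnlyPropertiesSketch {

    private final Map<String, Object> properties = new HashMap<>();
    private boolean readOnly; // false while the sender is still building the message

    /** Simulates delivery: the consumer sees the properties as read-only. */
    void markDelivered() {
        readOnly = true;
    }

    /** Sets a property, or fails when the properties are read-only. */
    void setProperty(String name, Object value) {
        if (readOnly) {
            // Stand-in for javax.jms.MessageNotWriteableException
            throw new IllegalStateException("properties are read-only");
        }
        properties.put(name, value);
    }

    /** Removes every property and makes the properties writable again. */
    void clearProperties() {
        properties.clear();
        readOnly = false;
    }

    /** Returns the property value, or null when the property is not set. */
    Object getProperty(String name) {
        return properties.get(name);
    }

    public static void main(String[] args) {
        ReadOnlyPropertiesSketch message = new ReadOnlyPropertiesSketch();
        message.setProperty("username", "Alice");
        message.markDelivered();
        message.clearProperties();
        message.setProperty("username", "Bob"); // allowed again after clearing
        System.out.println(message.getProperty("username"));
    }
}
```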

The getPropertyNames( ) method returns an Enumeration of all the property names, which makes it easy to retrieve every property of a message in one pass.

 
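public void onMessage(Message message) {
    try {
        Enumeration propertyNames = message.getPropertyNames( );
        while (propertyNames.hasMoreElements( )) {
            String name = (String) propertyNames.nextElement( );
            // Read the value through the message, whatever its property type
            Object value = message.getObjectProperty(name);
            System.out.println("\n" + name + " = " + value);
        }
    } catch (JMSException jmse) {
        jmse.printStackTrace( );
    }
}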

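6.5.3. Message Selectors

A message selector lets a consumer filter the messages it receives according to a condition. The condition can refer only to message headers and message properties, not to the message body.

In the following example the application sets a username property on each message, and the consumer then declines to receive messages whose username is William.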

 
protected void writeMessage(String text) throws JMSException{
    TextMessage message = session.createTextMessage( );
    message.setText(text);
    message.setStringProperty("username",username);
    publisher.publish(message);
}

TopicSubscriber subscriber = session.createSubscriber(chatTopic, " username != 'William' ",false);

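Filter conditions can be fairly complex; their syntax is based on a subset of the SQL 92 conditional expression syntax. For example: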

 
String selector =
    "PhysicianType IN ('Chiropractor', 'Psychologist', 'Dermatologist') "
    + "AND PatientGroupID LIKE 'ACME%'";
TopicSubscriber subscriber = session.createSubscriber(topic, selector,false);

String selector =
    "TotalCharge > 500.00 AND ((TotalCharge / ItemCount) >= 75.00) "
    + "AND State IN ('MN', 'WI', 'MI', 'OH')";
TopicSubscriber subscriber = session.createSubscriber(topic, selector,false);

String selector =
    "InventoryID = 'S93740283-02' AND Quantity BETWEEN 1000 AND 13000";
TopicSubscriber subscriber = session.createSubscriber(topic, selector,false);

and so on.
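
To show what a selector decides for a single message, here is a plain-Java sketch that evaluates two of the conditions above against a map of property values. It is not how a provider implements selectors, and SelectorSketch, NOT_WILLIAM, and matchesInventory are hypothetical names. The handling of a missing property follows the SQL null rule: a comparison with a missing value is unknown, so the message is not selected.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

/** Plain-Java sketch of selector evaluation over message properties; not the JMS API. */
final class SelectorSketch {

    /** Plain-Java equivalent of the selector: username != 'William' */
    static final Predicate<Map<String, Object>> NOT_WILLIAM = props -> {
        Object username = props.get("username");
        // A missing property makes the comparison unknown, so the message is not selected
        return username != null && !"William".equals(username);
    };

    /** Plain-Java equivalent of: InventoryID = 'S93740283-02' AND Quantity BETWEEN 1000 AND 13000 */
    static boolean matchesInventory(Map<String, Object> props) {
        Object quantity = props.get("Quantity");
        if (!(quantity instanceof Number)) {
            return false; // missing or non-numeric quantity never matches
        }
        double q = ((Number) quantity).doubleValue();
        return "S93740283-02".equals(props.get("InventoryID")) && q >= 1000 && q <= 13000;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("username", "Alice");
        System.out.println("delivered: " + NOT_WILLIAM.test(props));
    }
}
```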

6.5.4. Message Types

There are six message types in total:

Message, TextMessage, StreamMessage, MapMessage, ObjectMessage, and BytesMessage. Message is the parent interface of the other five.

Message:

This type carries only headers and properties and has no body. It is typically used for event notification.

 
// Create and deliver a Message
Message message = session.createMessage();
publisher.publish(message);
...
// Receive a message on the consumer
public void onMessage(Message message){
// No payload, process event notification
}

TextMessage:

This type uses a java.lang.String as the message body. It is used as follows:

 
TextMessage textMessage = session.createTextMessage( );
textMessage.setText("Hello!");
topicPublisher.publish(textMessage);
...
TextMessage textMessage = session.createTextMessage("Hello!");
queueSender.send(textMessage);

StreamMessage:

This type is used to send a stream of primitive values. The StreamMessage interface provides the following methods for the various data types:

 
package javax.jms;

public interface StreamMessage extends Message {

    public boolean readBoolean( ) throws JMSException;
    public void writeBoolean(boolean value) throws JMSException;
    public byte readByte( ) throws JMSException;
    public int readBytes(byte[] value) throws JMSException;
    public void writeByte(byte value) throws JMSException;
    public void writeBytes(byte[] value) throws JMSException;
    public void writeBytes(byte[] value, int offset, int length)
    throws JMSException;
    public short readShort( ) throws JMSException;
    public void writeShort(short value) throws JMSException;
    public char readChar( ) throws JMSException;
    public void writeChar(char value) throws JMSException;
    public int readInt( ) throws JMSException;
    public void writeInt(int value) throws JMSException;
    public long readLong( ) throws JMSException;
    public void writeLong(long value) throws JMSException;
    public float readFloat( ) throws JMSException;
    public void writeFloat(float value) throws JMSException;
    public double readDouble( ) throws JMSException;
    public void writeDouble(double value) throws JMSException;
    public String readString( ) throws JMSException;
    public void writeString(String value) throws JMSException;
    public Object readObject( ) throws JMSException;
    public void writeObject(Object value) throws JMSException;
    public void reset( ) throws JMSException;

}

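StreamMessage and BytesMessage look quite similar, but they differ in one respect: when a StreamMessage is read, each value must be read as a type compatible with the primitive type that was written.

StreamMessage streamMessage = session.createStreamMessage( );
streamMessage.writeLong(2938302);
// Switch the message from write-only mode to read-only mode before reading
streamMessage.reset( );
// The following read throws a MessageFormatException: a long cannot be read as a short
short value = streamMessage.readShort( );

With a BytesMessage the same sequence of calls runs without an exception, because the body is treated as uninterpreted bytes and no type check is made.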
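
The difference can be illustrated in plain Java with two toy encodings. This is a sketch under stated assumptions, not a description of any provider's wire format: TypedStreamSketch, the tag values, and the method names are hypothetical, and IllegalStateException stands in for the exception a StreamMessage would throw. The raw encoding stores only the bytes of the value, so any read succeeds; the tagged encoding stores a type marker first, so a mismatched read can be detected.

```java
import java.nio.ByteBuffer;

/** Plain-Java sketch contrasting raw and type-tagged encodings; not the JMS API. */
final class TypedStreamSketch {

    private static final byte TAG_LONG = 1;  // hypothetical marker for a long value
    private static final byte TAG_SHORT = 2; // hypothetical marker for a short value

    /** Raw encoding (BytesMessage-like): only the eight bytes of the long are stored. */
    static byte[] writeRawLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** Reads the first two raw bytes as a short; no type check is possible. */
    static short readRawShort(byte[] data) {
        return ByteBuffer.wrap(data).getShort();
    }

    /** Tagged encoding (StreamMessage-like): a type tag precedes the value. */
    static byte[] writeTaggedLong(long value) {
        return ByteBuffer.allocate(1 + Long.BYTES).put(TAG_LONG).putLong(value).array();
    }

    /** Reads a tagged short and rejects data that was written as another type. */
    static short readTaggedShort(byte[] data) {
        ByteBuffer in = ByteBuffer.wrap(data);
        if (in.get() != TAG_SHORT) {
            throw new IllegalStateException("value was not written as a short");
        }
        return in.getShort();
    }

    public static void main(String[] args) {
        // Succeeds, but yields only the two high-order bytes of the long
        System.out.println(readRawShort(writeRawLong(2938302L)));
        try {
            readTaggedShort(writeTaggedLong(2938302L));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The raw read succeeds but returns a value unrelated to the original number, which is why running without an exception is not the same as reading the data correctly.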

MapMessage:

 
public interface MapMessage extends Message {

    public boolean getBoolean(String name) throws JMSException;
    public void setBoolean(String name, boolean value)throws JMSException;
    public byte getByte(String name) throws JMSException;
    public void setByte(String name, byte value) throws JMSException;
    public byte[] getBytes(String name) throws JMSException;
    public void setBytes(String name, byte[] value)throws JMSException;
    public void setBytes(String name, byte[] value, int os, int length)throws JMSException;
    public short getShort(String name) throws JMSException;
    public void setShort(String name, short value) throws JMSException;
    public char getChar(String name) throws JMSException;
    public void setChar(String name, char value) throws JMSException;
    public int getInt(String name) throws JMSException;
    public void setInt(String name, int value) throws JMSException;
    public long getLong(String name) throws JMSException;
    public void setLong(String name, long value) throws JMSException;
    public float getFloat(String name) throws JMSException;
    public void setFloat(String name, float value)throws JMSException;
    public double getDouble(String name) throws JMSException;
    public void setDouble(String name, double value)throws JMSException;
    public String getString(String name) throws JMSException;
    public void setString(String name, String value)throws JMSException;
    public Object getObject(String name) throws JMSException;
    public void setObject(String name, Object value)throws JMSException;
    public Enumeration getMapNames( ) throws JMSException;
    public boolean itemExists(String name) throws JMSException;

}

// Create a MapMessage:
MapMessage mapMessage = session.createMapMessage( );
mapMessage.setInt("Age", 88);
mapMessage.setFloat("Weight", 234);
mapMessage.setString("Name", "Smith");
mapMessage.setObject("Height", Double.valueOf(150.32));
....
int age = mapMessage.getInt("Age");
float weight = mapMessage.getFloat("Weight");
String name = mapMessage.getString("Name");
Double height = (Double)mapMessage.getObject("Height");

// Receive the message:
public void onMessage(Message message) {
    try {
        MapMessage mapMessage = (MapMessage)message;
        Enumeration names = mapMessage.getMapNames( );
        while(names.hasMoreElements( )){
            String name = (String)names.nextElement( );
            Object value = mapMessage.getObject(name);
            System.out.println("Name = "+name+", Value = "+value);
        }
    } catch (JMSException jmse) {
        jmse.printStackTrace( );
    }
}

ObjectMessage:

This type carries a Java object, which must be serializable. It is specific to the Java language, so clients written in other languages cannot use it.

 
// Send:
ObjectMessage objectMessage = session.createObjectMessage( );
objectMessage.setObject(order);
queueSender.send(objectMessage);
...
ObjectMessage objectMessage = session.createObjectMessage(order);
topicPublisher.publish(objectMessage);

// Receive:
public void onMessage(Message message) {
    try {
        ObjectMessage objectMessage = (ObjectMessage)message;
        Order order = (Order)objectMessage.getObject( );
        ...
    }
    catch (JMSException jmse){
        ...
    }
}
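
The serializability requirement can be checked without a JMS provider by round-tripping the payload through standard Java serialization. The sketch below is plain Java; SerializationSketch, its nested Order class, and the helper methods are hypothetical stand-ins (the Order class used above is not shown in this section), and how a real provider stores the object is not specified here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Plain-Java sketch of a serialization round trip; not part of the JMS API. */
final class SerializationSketch {

    /** Hypothetical payload type standing in for an application's Order class. */
    static final class Order implements Serializable {
        private static final long serialVersionUID = 1L;
        final String item;
        final int quantity;

        Order(String item, int quantity) {
            this.item = item;
            this.quantity = quantity;
        }
    }

    /** Serializes a payload to bytes; fails if any part of it is not serializable. */
    static byte[] toBytes(Serializable payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(payload);
        } catch (IOException e) {
            throw new IllegalStateException("payload is not serializable", e);
        }
        return buffer.toByteArray();
    }

    /** Rebuilds the payload from bytes, as a receiving side would. */
    static Object fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("payload could not be restored", e);
        }
    }

    public static void main(String[] args) {
        Order copy = (Order) fromBytes(toBytes(new Order("widget", 3)));
        System.out.println(copy.item + " x " + copy.quantity);
    }
}
```

The restored object is a copy, not the original instance, so the receiver must have the payload class on its classpath.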

BytesMessage:

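This type is used to send a stream of uninterpreted bytes. The BytesMessage interface provides the following methods for the various data types: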

 
package javax.jms;

public interface BytesMessage extends Message {

    public byte readByte( ) throws JMSException;
    public void writeByte(byte value) throws JMSException;
    public int readUnsignedByte( ) throws JMSException;
    public int readBytes(byte[] value) throws JMSException;
    public void writeBytes(byte[] value) throws JMSException;
    public int readBytes(byte[] value, int length)
    throws JMSException;
    public void writeBytes(byte[] value, int offset, int length)
    throws JMSException;
    public boolean readBoolean( ) throws JMSException;
    public void writeBoolean(boolean value) throws JMSException;
    public char readChar( ) throws JMSException;
    public void writeChar(char value) throws JMSException;
    public short readShort( ) throws JMSException;
    public void writeShort(short value) throws JMSException;
    public int readUnsignedShort( ) throws JMSException;
    public void writeInt(int value) throws JMSException;
    public int readInt( ) throws JMSException;
    public void writeLong(long value) throws JMSException;
    public long readLong( ) throws JMSException;
    public float readFloat( ) throws JMSException;
    public void writeFloat(float value) throws JMSException;
    public double readDouble( ) throws JMSException;
    public void writeDouble(double value) throws JMSException;
    public String readUTF( ) throws JMSException;
    public void writeUTF(String value) throws JMSException;
    public void writeObject(Object value) throws JMSException;
    public void reset( ) throws JMSException;

}

// Send:
BytesMessage bytesMessage = session.createBytesMessage( );
bytesMessage.writeChar('R');
bytesMessage.writeInt(10);
bytesMessage.writeUTF("Test");
queueSender.send(bytesMessage);

// Receive:
public void onMessage(Message message) {
    try {
        BytesMessage bytesMessage = (BytesMessage)message;
        char c = bytesMessage.readChar( );
        int i = bytesMessage.readInt( );
        String s = bytesMessage.readUTF( );
    } catch (JMSException jmse){
        ...
    }
}