结构图略:和JMS编程模型的结构图类似
ConnectionFactory
由于使用该客户端的应用并不能参与到JNDI的上下文:
所以采用直接实例化的方式:
ConnectionFactory* pFactory = ConnectionFactory::NewInstance(host, port);
也可以为当前生成的连接工厂起一个名字:
ConnectionFactory* pFactory = ConnectionFactory::NewInstance(host, port, name);
Connections
为客户端和消息服务器建立的一个连接,他可以创建Session,并且可以创建多个Session(稍后介绍)。
获取方法:
使用匿名
Connection* pCnn = pFactory->CreateConnection();
给出消息服务器的用户和密码:
Connection* pCnn = pFactory->CreateConnection(username, password);
Sessions
Sessions是单线程的,用来创建消息发送者和消息接受者的上下文。Sessions 同时还可以注册Message Listeners。
获取方法:
Session* pSession = pCnn->createSession(false, AutoAcknowledge); Session* pSession = pCnn->createSession(false, ClientAcknowledge); Session* pSession = pCnn->createSession(false, DupsOKAcknowledge);
三种应答模式的差别,请参照 AMQ 相关概念中的“应答模式”部分。
Destinations
Destinations是指消息发送客户端的消息目标和消息接收客户端的消息来源,它是预先在jms.xml定义好的。
获取方式:
Destination* pDest = pSession->createQueue("testQueue");
Destination* pDest = pSession->createTopic("testTopic");
或者:
//通过下面这种方式创建出来的Queue,代表一个远程 AMQ 服务器上的队列,@符号之前
//是队列的名称,@符号之后是服务器路由名。发送消息到该队列时,会将消息路由到@符号之后
//指定的服务器,并将消息放进@符号之前指定的队列。
Destination* pDest = pSession->createQueue("testQueue@ServerRouterName");
Message Producers
message producer是由Session创建的一个对象,它是用来向 Destinations 发送信息的。
MessageProducer* pSender = pSession->createProducer(pDest);
注意:
在C/C++客户端中,必须在创建MessageProducer时指定Destinations ,不能在发送时再指定。
创建好后,可以继续创建消息,然后调用发送方法发送消息。
char* pcTxt = "Test Message"; TextMessage* msg = pSession->CreateTextMessage(pcTxt); pSender->Send(msg);
Message Consumers
message consumer是指从 destination 接收消息的对象
MessageConsumer* receiver = pSession->createConsumer(pDest);
使用Consumer前应该先调用 connection的start() 方法。
pCnn->Start(); Message* message = receiver->Receive( ); TextMessage* pTxtMsg = message->getTextMessage();
Message Listeners
这是为异步调用设计的一种方式。
它是把消息监听器注册到consumer中。
class CEventHandler : public MessageListener, public ExceptionListener
{
public:
void OnMessage(Message* message)
{
TextMessage* pTxtMsg = message->getTextMessage();
if(pTxtMsg == NULL) return;
printf("\n****************************************\n");
printf("Received a TextMessage!!\n");
printf("Message Content : %s\n", pTxtMsg->get_Text());
printf("Message ID : '%s'\n", pTxtMsg->get_JMSMessageID());
printf("Message Routing Tract : %s\n", pTxtMsg->GetStringProperty("JMSXRoutingTrack"));
printf("Property [Key = BooleanProperty, Value = %d]\n", pTxtMsg->GetBooleanProperty("BooleanProperty"));
printf("Property [Key = FloatProperty, Value = %f]\n", pTxtMsg->GetSingleProperty("FloatProperty"));
printf("****************************************\n");
message->Acknowledge();
}
void OnException(JMSException* exception)
{
printf(">> Error !! OnException Called!!\n");
}
};
CEventHandler handler;
receiver->set_MessageListener(&handler);