JMS Parent | PTPDomain | Pub/Sub Domain |
ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory |
Connection | QueueConnection | TopicConnection |
Destination | Queue | Topic |
Session | QueueSession | TopicSession |
MessageProducer | QueueSender | TopicPublisher |
MessageConsumer | QueueReceiver,QueueBrowser | TopicSubscriber |
消息头 | 由谁设置 |
JMSDestination | send 或 publish 方法 |
JMSDeliveryMode | send 或 publish 方法 |
JMSExpiration | send 或 publish 方法 |
JMSPriority | send 或 publish 方法 |
JMSMessageID | send 或 publish 方法 |
JMSTimestamp | send 或 publish 方法 |
JMSCorrelationID | 客户 |
JMSReplyTo | 客户 |
JMSType | 客户 |
JMSRedelivered | JMS Provider |
消息类型 | 消息体 |
TextMessage | java.lang.String对象,如xml文件内容 |
MapMessage | 名/值对的集合,名是String对象,值类型可以是Java任何基本类型 |
BytesMessage | 字节流 |
StreamMessage | Java中的输入输出流 |
ObjectMessage | Java中的可序列化对象 |
Message | 没有消息体,只有消息头和属性。 |
TextMessage message = queueSession.createTextMessage(); message.setText(msg_text); // msg_text is a String queueSender.send(message);
Message m = queueReceiver.receive(); if (m instanceof TextMessage) { TextMessage message = (TextMessage) m; System.out.println("Reading message: " + message.getText()); } else { // Handle error }
名称 | 描述 |
Queue | 由JMS Provider 管理,队列由队列名识别,客户端可以通过JNDI 接口用队列名得到一个队列对象。 |
TemporaryQueue | 由QueueConnection 创建,而且只能由创建它的QueueConnection 使用。 |
QueueConnectionFactory | 客户端用QueueConnectionFactory 创建QueueConnection 对象。 |
QueueConnection | 一个到JMS PTP provider 的连接,客户端可以用QueueConnection 创建QueueSession 来发送和接收消息。 |
QueueSession | 提供一些方法创建QueueReceiver 、QueueSender、QueueBrowser 和TemporaryQueue。如果在QueueSession 关闭时,有一些消息已经被收到,但还没有被签收(acknowledged),那么,当接收者下次连接到相同的队列时,这些消息还会被再次接收。 |
QueueReceiver | 客户端用QueueReceiver 接收队列中的消息,如果用户在QueueReceiver 中设定了消息选择条件,那么不符合条件的消息会留在队列中,不会被接收到。 |
QueueSender | 客户端用QueueSender 发送消息到队列。 |
QueueBrowser | 客户端可以QueueBrowser 浏览队列中的消息,但不会收走消息。 |
QueueRequestor | JMS 提供QueueRequestor 类简化消息的收发过程。QueueRequestor 的构造函数有两个参数:QueueSession 和queue,QueueRequestor 通过创建一个临时队列来完成最终的收发消息请求。 |
可靠性(Reliability) | 队列可以长久地保存消息直到接收者收到消息。接收者不需要因为担心消息会丢失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势。 |
名称 | 描述 |
订阅(subscription) | 消息订阅分为非持久订阅(non-durable subscription)和持久订阅(durable subscrip-tion),非持久订阅只有当客户端处于激活状态,也就是和JMS Provider 保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。持久订阅时,客户端向JMS 注册一个识别自己身份的ID,当这个客户端处于离线时,JMS Provider 会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS Provider时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息。 |
Topic | 主题由JMS Provider 管理,主题由主题名识别,客户端可以通过JNDI 接口用主题名得到一个主题对象。JMS 没有给出主题的组织和层次结构的定义,由JMS Provider 自己定义。 |
TemporaryTopic | 临时主题由TopicConnection 创建,而且只能由创建它的TopicConnection 使用。临时主题不能提供持久订阅功能。 |
TopicConnectionFactory | 客户端用TopicConnectionFactory 创建TopicConnection 对象。 |
TopicConnection | TopicConnection 是一个到JMS Pub/Sub provider 的连接,客户端可以用TopicConnection创建TopicSession 来发布和订阅消息。 |
TopicSession | TopicSession 提供一些方法创建TopicPublisher、TopicSubscriber、TemporaryTopic 。它还提供unsubscribe 方法取消消息的持久订阅。 |
TopicPublisher | 客户端用TopicPublisher 发布消息到主题。 |
TopicSubscriber | 客户端用TopicSubscriber 接收发布到主题上的消息。可以在TopicSubscriber 中设置消息过滤功能,这样,不符合要求的消息不会被接收。 |
Durable TopicSubscriber | 如果一个客户端需要持久订阅消息,可以使用Durable TopicSubscriber,TopSession 提供一个方法createDurableSubscriber创建Durable TopicSubscriber 对象。 |
恢复和重新派送(Recovery and Redelivery) | 非持久订阅状态下,不能恢复或重新派送一个未签收的消息。只有持久订阅才能恢复或重新派送一个未签收的消息。 |
TopicRequestor | JMS 提供TopicRequestor 类简化消息的收发过程。TopicRequestor 的构造函数有两个参数:TopicSession 和topic。TopicRequestor 通过创建一个临时主题来完成最终的发布和接收消息请求。 |
可靠性(Reliability) | 当所有的消息必须被接收,则用持久订阅模式。当丢失消息能够被容忍,则用非持久订阅模式。 |
发送消息
第一,启动Apusic应用服务器。
第二,打开管理工具:在浏览器中输入http://localhost:6882,缺省管理员为admin,密码也是admin。
第三,配置队列连接创建器:在管理工具中点击“基本配置”-->“消息”,点击“队列连接创建器”,如果存在JNDI名为“jms/QueueConnectionFactory”的队列连接创建器,则无须创建,否则点击“创建”,添加必要信息,“队列连接创建器名称”为“QueueConnectionFactory”,“队列连接创建器JNDI名”为“jms/QueueConnectionFactory”,“连接是否可以匿名访问”为“是”,其它项为缺省设置,点击“保存”。如下:
第四,配置队列:在管理工具中点击“基本配置”-->“消息”,点击“队列”,如果存在JNDI名为“jms/QueueConnectionFactory”的队列,则无须创建,否则点击“创建”,添加必要信息,“队列名称”为“testQueue”,“队列JNDI名”为“myQueue”,其它项为缺省设置,点击“保存”。如下:
第五,编写发送消息客户端代码:Send.java
import javax.jms.*; import javax.naming.*; import java.io.*; import java.util.*; import java.rmi.RemoteException;/** * Title: JMS * Description: JMS Test * Copyright: Copyright (c) 2002 * Company: Apusic * @author Michael * @version 1.0 */ public class Send{ String queueName = "myQueue"; QueueConnectionFactory queueConnectionFactory = null; Queue queue = null; QueueConnection queueConnection = null; QueueSession queueSession = null; QueueSender queueSender = null; TextMessage message = null; public static void main(String[] args) throws Exception { InitialContext ic = getInitialContext(); Send sender = new Send(); sender.init(ic) ; sender.sendMessage(); sender.close(); } public void init(InitialContext ctx) throws Exception{ queueConnectionFactory = (QueueConnectionFactory)ctx.lookup("jms/QueueConnectionFactory"); queueConnection = queueConnectionFactory.createQueueConnection(); queue = (Queue) ctx.lookup(queueName); } public void sendMessage() throws JMSException,RemoteException{ queueSession = queueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); queueSender = queueSession.createSender(queue); queueSender.setDeliveryMode(DeliveryMode.PERSISTENT); message = queueSession.createTextMessage(); message.setText("The Message from myQueue"); queueSender.send(message); } public void close() throws JMSException{ if(queueConnection!=null) queueConnection.close(); } private static InitialContext getInitialContext() throws NamingException{ Hashtable env = new Hashtable(); env.put(Context.INITIAL_CONTEXT_FACTORY,"com.apusic.jndi.InitialContextFactory"); env.put(Context.PROVIDER_URL, "rmi://localhost:6888"); return (new InitialContext(env)); } }
如果管理工具中没有JNDI名为“jms/QueueConnectionFactory”的队列连接创建器和“myQueue”的队列,则依照上面一到四步进行设置,否则可直接编写接收消息客户端代码:Receive.java
import javax.jms.*; import javax.naming.*; import java.io.*; import java.util.*; import java.rmi.RemoteException;/** * Title: JMS * Description: JMS Test * Copyright: Copyright (c) 2002 * Company: Apusic * @author Michael * @version 1.0 */ public class Receive{ String queueName = "myQueue"; QueueConnectionFactory queueConnectionFactory = null; Queue queue = null; QueueConnection queueConnection = null; QueueSession queueSession = null; QueueReceiver queueReceiver = null; TextMessage message = null; public static void main(String[] args) throws Exception { InitialContext ic = getInitialContext(); Receive receiver = new Receive(); receiver.init(ic) ; receiver.TBreceiveMessage();//你可以在此处调用YBreceiveMessage receiver.close(); } public void init(InitialContext ctx) throws Exception{ queueConnectionFactory = (QueueConnectionFactory)ctx.lookup("jms/QueueConnectionFactory"); queueConnection = queueConnectionFactory.createQueueConnection(); queue = (Queue) ctx.lookup(queueName); } public void TBreceiveMessage() throws NamingException, JMSException,RemoteException{ queueSession = queueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); queueReceiver = queueSession.createReceiver(queue); queueConnection.start(); for (;;) { message = (TextMessage) queueReceiver.receive(); System.out.println("Reading message: " + message.getText()); if (message.getText().equals("quit")) break; } } public void YBreceiveMessage() throws NamingException, JMSException,RemoteException,IOException{ queueSession = queueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE); queueReceiver = queueSession.createReceiver(queue); //register my textListener which comes from MessageListener TextMessageListener textListener = new TextMessageListener(); queueReceiver.setMessageListener(textListener); queueConnection.start(); System.out.println("To end program, enter Q or q, then "); InputStreamReader reader = new InputStreamReader(System.in); char answer = '\0'; while (!((answer == 'q') || (answer == 'Q'))) answer = (char)reader.read(); } public void close() throws JMSException{ if(queueReceiver!=null) queueReceiver.close(); if(queueSession!=null) queueSession.close(); if(queueConnection!=null) queueConnection.close(); } private static InitialContext getInitialContext() throws NamingException{ Hashtable env = new Hashtable(); env.put(Context.INITIAL_CONTEXT_FACTORY, "com.apusic.jndi.InitialContextFactory"); env.put(Context.PROVIDER_URL, "rmi://localhost:6888"); return (new InitialContext(env)); } }
import javax.jms.MessageListener; import javax.jms.Message; import javax.jms.TextMessage; import javax.jms.JMSException; /** * Title: JMS * Description: JMS Test * Copyright: Copyright (c) 2002 * Company: Apusic * @author Michael * @version 1.0 */ public class TextMessageListener implements MessageListener { public TextMessageListener() {} public void onMessage(Message m) { TextMessage msg = (TextMessage) m; try { System.out.println("Async reading message: " + msg.getText() + " (priority=" + msg.getJMSPriority() + ")"); } catch (JMSException e) { System.out.println("Exception in onMessage(): " + e.toString()); } } }
如果是异步接收消息,在运行Receive
的窗口中可以看到输出:
To end program, enter Q or q, then
Async reading message: The Second Message from testQueue (priority=4)
如果是同步接收消息,在运行Receive
的窗口中可以看到输出:
Reading message: The Message from testQueue
发布消息
如果管理工具中没有JNDI名为“jms/TopicConnectionFactory”的队列连接创建器和“myTopic”的队列,则依照上面一到四步进行设置,否则可直接编写发布消息客户端代码:Published.java
import javax.jms.*; import javax.naming.*; import java.io.*; import java.util.*; import java.rmi.RemoteException;/** * Title: JMS * Description: JMS Test * Copyright: Copyright (c) 2002 * Company: Apusic * @author Michael * @version 1.0 */ public class Published{ String topicName = "myTopic"; TopicConnectionFactory topicConnectionFactory = null; Topic topic = null; TopicConnection topicConnection = null; TopicSession topicSession = null; TopicPublisher topicPublisher = null; String msgText = null; TextMessage message = null; public static void main(String[] args) throws Exception { InitialContext ic = getInitialContext(); Published publisher = new Published(); publisher.init(ic) ; publisher.publish(); publisher.close(); } public void init(InitialContext ctx) throws Exception{ topicConnectionFactory = (TopicConnectionFactory)ctx.lookup("jms/TopicConnectionFactory"); topicConnection = topicConnectionFactory.createTopicConnection(); topic = (Topic) ctx.lookup(topicName); } public void publish() throws NamingException, JMSException,RemoteException{ topicSession = topicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); topicPublisher = topicSession.createPublisher(topic); message = topicSession.createTextMessage(); msgText = "This is the published message"; message.setText(msgText); topicPublisher.publish(message); } public void close() throws JMSException{ if(topicConnection!=null) topicConnection.close(); } private static InitialContext getInitialContext() throws NamingException{ Hashtable env = new Hashtable(); env.put(Context.INITIAL_CONTEXT_FACTORY,"com.apusic.jndi.InitialContextFactory"); env.put(Context.PROVIDER_URL, "rmi://localhost:6888"); return (new InitialContext(env)); } }
import javax.jms.*; import javax.naming.*; import java.io.*; import java.util.*; import java.rmi.RemoteException;/** * Title: JMS * Description: JMS Test * Copyright: Copyright (c) 2002 * Company: Apusic * @author Michael * @version 1.0 */ public class Subscriber{ String topicName = "myTopic"; TopicConnectionFactory topicConnectionFactory = null; TopicConnection topicConnection = null; Topic topic = null; TopicSession topicSession = null; TopicSubscriber topicSubscriber = null; TextMessage message = null; String id = "durable"; public static void main(String[] args) throws Exception { InitialContext ic = getInitialContext(); Subscriber subscriber = new Subscriber(); subscriber.init(ic) ; subscriber.subscribe(); subscriber.close(); } public void init(InitialContext ctx) throws Exception{ topicConnectionFactory = (TopicConnectionFactory)ctx.lookup("jms/TopicConnectionFactory"); topicConnection = topicConnectionFactory.createTopicConnection(); topicConnection.setClientID(id) ; topic = (Topic) ctx.lookup(topicName); } public void subscribe() throws NamingException, JMSException,RemoteException{ topicSession = topicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); topicSubscriber = topicSession.createDurableSubscriber(topic,id); topicConnection.start(); message = (TextMessage) topicSubscriber.receive(); System.out.println("SUBSCRIBER THREAD: Reading message: " + message.getText()); } public void close() throws JMSException{ if(topicConnection!=null) topicConnection.close(); } private static InitialContext getInitialContext() throws NamingException{ Hashtable env = new Hashtable(); env.put(Context.INITIAL_CONTEXT_FACTORY,"com.apusic.jndi.InitialContextFactory"); env.put(Context.PROVIDER_URL, "rmi://localhost:6888"); return (new InitialContext(env)); } }
SUBSCRIBER THREAD: Reading message: This is the published message
首先创建testQueue队列,如果Apusic不存在testQueue队列,则通过管理工具创建,方法同上。
建立安全角色(security-role)rbt,将该角色映射到用户larry,使larry
拥有该角色包含的所有权限。如果Apuisc应用服务器不存在larry用户,在管理工具中“用户管理”-->“用户/组”
中创建larry用户。然后可参考apusic.jar中jms.dtd的定义在jms.xml中加入如下片段:
<security-role>
<role-name>rbt</role-name>
<principal>larry</principal>
</security-role>
设置安全角色rbt的权限
<destination-permission>
<role-name>rbt</role-name>
<destination-method>
<queue-name>testQueue</queue-name>
<method-name>receive</method-name>
<method-name>browse</method-name>
</destination-method>
</destination-permission>
Client----->Computer A(Router x)----------Computer B(Router z) | Connector 3 | | Connector 1 |Connector 2 | | Computer C(Router y)--------------------
... <SERVICE CLASS="com.apusic.jms.server.JMSServer"> <ATTRIBUTE NAME="BackStoreDirectory" VALUE="store/jms"/> </SERVICE> ...
... <SERVICE CLASS="com.apusic.jms.server.JMSServer"> <ATTRIBUTE NAME="BackStoreDirectory" VALUE="store/jms"/> </SERVICE> ...
...
<SERVICE
CLASS="com.apusic.jms.server.JMSServer">
<ATTRIBUTE NAME="BackStoreDirectory" VALUE="store/jms"/>
<ATTRIBUTE NAME="RemoteRouters" VALUE="computerB,
computerC,computerD,computerE"/>
</SERVICE>
...
...
<SERVICE
CLASS="com.apusic.jms.server.JMSServer">
<ATTRIBUTE NAME="BackStoreDirectory" VALUE="store/jms"/>
<ATTRIBUTE NAME="RouterName" VALUE="routerA"/>
<ATTRIBUTE NAME="RemoteRouters" VALUE="routerB,
routerC,routerD,routerE"/>
</SERVICE>
...
... <SERVICE CLASS="com.apusic.jms.routing.RoutingConnector" NAME="Connector:Name=toB"> <ATTRIBUTE NAME="RemoteHost" VALUE="computerB"/> <ATTRIBUTE NAME="RemotePort" VALUE="6888"/> </SERVICE> ...
属性 | 描述 | 值类型 | 缺省值 |
type | 连接创建器的消息模型(PTP和Pub/Sub)类型 | “QueueConnectionFactory”或“TopicConnectionFactory”,分别对应PTP和Pub/Sub模型,此属性必须定义 | 无 |
pooled | 指定此连接创建器是否对其管理的连接使用连接池 | “True”或“False” | “False” |
secure | 指定连接创建器所提供连接的通讯方式 | “True”或“False” | “False” |
anonymous | 是否授权匿名用户访问此连接创建器 | “True”或“False” | “True” |
client-id | 由于标识连接客户状态的标识符,通常被用于Pub/Sub模型中的持久订阅(Durable subscription) | 字符串,此属性是可选的 | 无 |
default-delivery-mode | 使用由此连接创建器生成的连接发送消息时,缺省的发送方式 | “persistent”或“non-persistent” | “non-persistent” |
default-priority | 使用由此连接创建器生成的连接发送消息时,缺省的优先级 | 数字(0~9) | “4” |
default-time-to-live | 使用由此连接创建器生成的连接发送消息时,对于已发送的消息,消息系统保留此消息的缺省时间长度,单位为毫秒。 | 整型 | 0 |
min-pool-size | 此连接创建器对应的连接池中,所保持的最少连接数 | 整型 | 5 |
min-pool-size | 此连接创建器对应的连接池中,所保持的最少连接数 | 整型 | 30 |
idle-timeout | 连接等待超时时间。当连接池中的某个连接等待被使用的实际时间超过此属性数值时,连接池自动关闭此连接 | 整型,单位是秒 | 300 |
属性 | 描述 | 值类型 | 缺省值 |
cache-size | 队列缓冲中保留的消息个数 | 整型 | 20 |
expiry-check-interval | 系统检测消息队列中消息是否过期的时间间隔,单位是秒 | 整型 | 60 |
属性 | 描述 | 值类型 | 缺省值 |
cache-size | 主题缓冲中保留的消息个数 | 整型 | 20 |
expiry-check-interval | 系统检测消息主题中消息是否过期的时间间隔,单位是秒 | 整型 | 60 |
... <queue> <queue-name>testQueue</queue-name> </queue> ...
... <security-role> <role-name>foo</role-name> <principal>admin</principal> </security-role> ...
... <destination-permission> <role-name>foo</role-name> <destination-method> <queue-name>testQueue</queue-name> <method-name>receive</method-name> <method-name>browse</method-name> </destination-method> </destination-permission> ...