关于ActiveMQ中怎么实现一对多activemq异步发送消息息讨论

关于ActiveMQ中怎么实现一对多发送消息讨论_百度知道<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
您的访问请求被拒绝 403 Forbidden - ITeye技术社区
您的访问请求被拒绝
亲爱的会员,您的IP地址所在网段被ITeye拒绝服务,这可能是以下两种情况导致:
一、您所在的网段内有网络爬虫大量抓取ITeye网页,为保证其他人流畅的访问ITeye,该网段被ITeye拒绝
二、您通过某个代理服务器访问ITeye网站,该代理服务器被网络爬虫利用,大量抓取ITeye网页
请您点击按钮解除封锁&ActiveMQ学习笔记(四)http://my.oschina.net/xiaoxishan/blog/380446 中记录了如何使用原生的方式从ActiveMQ中收发消息。可以看出,每次收发消息都要写许多重复的代码,Spring 为我们提供了更为方便的方式,这就是Spring JMS。我们通过一个例子展开讲述。包括队列、主题消息的收发相关的Spring配置、代码、测试。
&& & & ActiveMQ学习笔记(四)&中记录了如何使用原生的方式从ActiveMQ中收发消息。可以看出,每次收发消息都要写许多重复的代码,Spring 为我们提供了更为方便的方式,这就是Spring JMS。我们通过一个例子展开讲述。包括队列、主题消息的收发相关的Spring配置、代码、测试。
& & & &本例中,消息的收发都写在了一个工程里。
1.使用maven管理依赖包
&dependencies&
&dependency&
&groupId&junit&/groupId&
&artifactId&junit&/artifactId&
&version&4.12&/version&
&scope&test&/scope&
&/dependency&
&dependency&
&groupId&org.apache.activemq&/groupId&
&artifactId&activemq-all&/artifactId&
&version&5.11.0&/version&
&/dependency&
&dependency&
&groupId&org.springframework&/groupId&
&artifactId&spring-jms&/artifactId&
&version&4.1.4.RELEASE&/version&
&/dependency&
&dependency&&&
&&&&&&&&&groupId&org.springframework&/groupId&&&
&&&&&&&&&artifactId&spring-test&/artifactId&&&
&&&&&&&&&version&4.1.4.RELEASE&/version&&&
&&&&&/dependency&&
&/dependencies&
2.队列消息的收发
2.1Spring配置文件
&?xml&version="1.0"&encoding="UTF-8"?&
&beans&xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
&&&&&&&&http://www.springframework.org/schema/beans/spring-beans.xsd"&
&!--&配置JMS连接工厂&--&
&bean&id="connectionFactory"&class="org.apache.activemq.ActiveMQConnectionFactory"&
&property&name="brokerURL"&value="failover:(tcp://localhost:61616)"&/&
&!--&定义消息队列(Queue)&--&
&bean&id="queueDestination"&class="org.mand.ActiveMQQueue"&
&!--&设置消息队列的名字&--&
&constructor-arg&
&value&queue1&/value&
&/constructor-arg&
&!--&配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。&--&
&bean&id="jmsTemplate"&class="org.springframework.jms.core.JmsTemplate"&
&property&name="connectionFactory"&ref="connectionFactory"&/&
&property&name="defaultDestination"&ref="queueDestination"&/&
&property&name="receiveTimeout"&value="10000"&/&
&!--queue消息生产者&--&
&bean&id="producerService"&class="guo.examples.mq02.queue.ProducerServiceImpl"&
&property&name="jmsTemplate"&ref="jmsTemplate"&&/property&
&!--queue消息消费者&--&
&bean&id="consumerService"&class="guo.examples.mq02.queue.ConsumerServiceImpl"&
&property&name="jmsTemplate"&ref="jmsTemplate"&&/property&
2.2消息生产者代码
从下面的代码可以出,使用Spring JMS,可以减少重复代码(接口类ProducerService代码省略)。
package&guo.examples.mq02.
import&javax.jms.D
import&javax.jms.JMSE
import&javax.jms.M
import&javax.jms.S
import&org.springframework.jms.core.JmsT
import&org.springframework.jms.core.MessageC
public&class&ProducerServiceImpl&implements&ProducerService&{
&&private&JmsTemplate&jmsT
&&&*&向指定队列发送消息
&&public&void&sendMessage(Destination&destination,&final&String&msg)&{
&&&&System.out.println("向队列"&+&destination.toString()&+&"发送了消息------------"&+&msg);
&&&&jmsTemplate.send(destination,&new&MessageCreator()&{
&&&&&&public&Message&createMessage(Session&session)&throws&JMSException&{
&&&&&&&&return&session.createTextMessage(msg);
&*&向默认队列发送消息
&&public&void&sendMessage(final&String&msg)&{
String&destination&=&&jmsTemplate.getDefaultDestination().toString();
&&&&System.out.println("向队列"&+destination+&"发送了消息------------"&+&msg);
&&&&jmsTemplate.send(new&MessageCreator()&{
&&&&&&public&Message&createMessage(Session&session)&throws&JMSException&{
&&&&&&&&return&session.createTextMessage(msg);
&&public&void&setJmsTemplate(JmsTemplate&jmsTemplate)&{
&&&&this.jmsTemplate&=&jmsT
2.3消息消费者代码
package&guo.examples.mq02.
import&javax.jms.D
import&javax.jms.JMSE
import&javax.jms.TextM
import&org.springframework.jms.core.JmsT
public&class&ConsumerServiceImpl&implements&ConsumerService&{
private&JmsTemplate&jmsT
&*&接受消息
public&void&receive(Destination&destination)&{
TextMessage&tm&=&(TextMessage)&jmsTemplate.receive(destination);
System.out.println("从队列"&+&destination.toString()&+&"收到了消息:\t"
+&tm.getText());
}&catch&(JMSException&e)&{
e.printStackTrace();
public&void&setJmsTemplate(JmsTemplate&jmsTemplate)&{
this.jmsTemplate&=&jmsT
2.4队列消息监听
接受消息的时候,可以不用2.3节中的方式,Spring JMS同样提供了消息监听的模式,下面给出对应的配置和代码。
Spring配置
&!--&定义消息队列(Queue),我们监听一个新的队列,queue2&--&
&bean&id="queueDestination2"&class="org.mand.ActiveMQQueue"&
&!--&设置消息队列的名字&--&
&constructor-arg&
&value&queue2&/value&
&/constructor-arg&
&!--&配置消息队列监听者(Queue),代码下面给出,只有一个onMessage方法&--&
&bean&id="queueMessageListener"&class="guo.examples.mq02.queue.QueueMessageListener"&/&
&!--&消息监听容器(Queue),配置连接工厂,监听的队列是queue2,监听器是上面定义的监听器&--&
&bean&id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer"&
&property&name="connectionFactory"&ref="connectionFactory"&/&
&property&name="destination"&ref="queueDestination2"&/&
&property&name="messageListener"&ref="queueMessageListener"&/&
监听类代码
package&guo.examples.mq02.
import&javax.jms.JMSE
import&javax.jms.M
import&javax.jms.MessageL
import&javax.jms.TextM
public&class&QueueMessageListener&implements&MessageListener&{
&&&&&&&&//当收到消息时,自动调用该方法。
public&void&onMessage(Message&message)&{
TextMessage&tm&=&(TextMessage)&
System.out.println("ConsumerMessageListener收到了文本消息:\t"
+&tm.getText());
}&catch&(JMSException&e)&{
e.printStackTrace();
3.主题消息收发
& & &在使用Spring JMS的时候,主题(Topic)和队列消息的主要差异体现在JmsTemplate中"pubSubDomain"是否设置为True。如果为True,则是Topic;如果是false或者默认,则是queue。
&property&name="pubSubDomain"&value="true"&/&
3.1Spring配置
&!--&定义消息主题(Topic)&--&
&bean&id="topicDestination"&class="org.mand.ActiveMQTopic"&
&constructor-arg&
&value&guo_topic&/value&
&/constructor-arg&
&!--&配置JMS模板(Topic),pubSubDomain="true"--&
&bean&id="topicJmsTemplate"&class="org.springframework.jms.core.JmsTemplate"&
&property&name="connectionFactory"&ref="connectionFactory"&/&
&property&name="defaultDestination"&ref="topicDestination"&/&
&property&name="pubSubDomain"&value="true"&/&
&property&name="receiveTimeout"&value="10000"&/&
&!--topic消息发布者&--&
&bean&id="topicProvider"&class="guo.examples.mq02.topic.TopicProvider"&
&property&name="topicJmsTemplate"&ref="topicJmsTemplate"&&/property&
&!--&消息主题监听者&和&主题监听容器&可以配置多个,即多个订阅者&--&
&!--&消息主题监听者(Topic)&--&
&bean&id="topicMessageListener"&class="guo.examples.mq02.topic.TopicMessageListener"&/&
&!--&主题监听容器&(Topic)&--&
&bean&id="topicJmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer"&
&property&name="connectionFactory"&ref="connectionFactory"&/&
&property&name="destination"&ref="topicDestination"&/&
&property&name="messageListener"&ref="topicMessageListener"&/&
3.2消息发布者
package&guo.examples.mq02.
import&javax.jms.D
import&javax.jms.JMSE
import&javax.jms.M
import&javax.jms.S
import&org.springframework.jms.core.JmsT
import&org.springframework.jms.core.MessageC
public&class&TopicProvider&{
private&JmsTemplate&topicJmsT
&*&向指定的topic发布消息
&*&@param&topic
&*&@param&msg
public&void&publish(final&Destination&topic,&final&String&msg)&{
topicJmsTemplate.send(topic,&new&MessageCreator()&{
public&Message&createMessage(Session&session)&throws&JMSException&{
System.out.println("topic&name&是"&+&topic.toString()
+&",发布消息内容为:\t"&+&msg);
return&session.createTextMessage(msg);
public&void&setTopicJmsTemplate(JmsTemplate&topicJmsTemplate)&{
this.topicJmsTemplate&=&topicJmsT
3.3消息订阅者(监听)
package&guo.examples.mq02.
import&javax.jms.JMSE
import&javax.jms.M
import&javax.jms.MessageL
import&javax.jms.TextM
&*和队列监听的代码一样。
public&class&TopicMessageListener&implements&MessageListener&{
public&void&onMessage(Message&message)&{
TextMessage&tm&=&(TextMessage)&
System.out.println("TopicMessageListener&\t"&+&tm.getText());
}&catch&(JMSException&e)&{
e.printStackTrace();
4.1 测试代码
package&guo.examples.mq02;
import&javax.jms.D
import&guo.examples.mq02.queue.ConsumerS
import&guo.examples.mq02.queue.ProducerS
import&guo.examples.mq02.topic.TopicP
import&org.junit.T
import&org.junit.runner.RunW
import&org.springframework.beans.factory.annotation.A
import&org.springframework.beans.factory.annotation.Q
import&org.springframework.test.context.ContextC
import&org.springframework.test.context.junit4.SpringJUnit4ClassR
&*&测试Spring&JMS
&*&1.测试生产者发送消息
&*&2.&测试消费者接受消息
&*&3.&测试消息监听
&*&4.测试主题监听
@RunWith(SpringJUnit4ClassRunner.class)
//&ApplicationContext&context&=&new
//&ClassPathXmlApplicationContext("applicationContext.xml");
@ContextConfiguration("/applicationContext.xml")
public&class&SpringJmsTest&{
&*&队列名queue1
@Autowired
private&Destination&queueD
&*&队列名queue2
@Autowired
private&Destination&queueDestination2;
&*&主题&guo_topic
@Autowired
@Qualifier("topicDestination")
private&Destination&
&*&主题消息发布者
@Autowired
private&TopicProvider&topicP
&*&队列消息生产者
@Autowired
@Qualifier("producerService")
private&ProducerService&
&*&队列消息生产者
@Autowired
@Qualifier("consumerService")
private&ConsumerService&
&*&测试生产者向queue1发送消息
public&void&testProduce()&{
String&msg&=&"Hello&world!";
producer.sendMessage(msg);
&*&测试消费者从queue1接受消息
public&void&testConsume()&{
consumer.receive(queueDestination);
&*&测试消息监听
&*&1.生产者向队列queue2发送消息
&*&2.ConsumerMessageListener监听队列,并消费消息
public&void&testSend()&{
producer.sendMessage(queueDestination2,&"Hello&China~~~~~~~~~~~~~~~");
&*&测试主题监听
&*&1.生产者向主题发布消息
&*&2.ConsumerMessageListener监听主题,并消费消息
public&void&testTopic()&throws&Exception&{
topicProvider.publish(topic,&"Hello&T-To-Top-Topi-Topic!");
4.2 测试结果
topic&name&是topic://guo_topic,发布消息内容为: Hello&T-To-Top-Topi-Topic!
TopicMessageListener& Hello&T-To-Top-Topi-Topic!
向队列queue://queue2发送了消息------------Hello&China~~~~~~~~~~~~~~~
ConsumerMessageListener收到了文本消息: Hello&China~~~~~~~~~~~~~~~
向队列queue://queue1发送了消息------------Hello&world!
从队列queue://queue1收到了消息: Hello&world!
5.代码地址
/s/1gdvPpWf
文中最后给出了全部示例代码地址。
& 开源中国(OSChina.NET) |
开源中国社区(OSChina.net)是工信部
指定的官方社区&&精spring整合apache activemq实现消息发送的三种方式代码配置实例我们项目中发送事件告警要用到消息队列,所以学习了下activemq,整理如下:activemq的介绍就不用说了,官网上大家可以详细的看到。1.下载并安装activemq:地址,我下面的例子用的是5.9.0的版本。下载后解压就完成安装了。进入解压目录的bin目录,选择windows位数(32/64),启动activemq.bat就可以开启activemq服务了。登陆就可以进入mq的界面了,官方默认登录名/密码是:admin/admin,也可以在conf/jetty-realm.properties中自行修改,activemq界面如下:简单介绍下导航栏:Queues:队列方式消息。Topics:主题方式消息。Subscribers:消息订阅监控查询。Connections:查看链接数,分别可以查看xmpp、ssl、stomp、openwire、ws和网络链接。Network:网络链接数监控。Scheduled:没有用到,不太清楚。Send:发送消息数据2.发送和接受消息的步骤:&a.发送消息(1)创建连接使用的工厂类JMS ConnectionFactory(2)使用管理对象JMS ConnectionFactory建立连接Connection,并启动(3)使用连接Connection 建立会话Session(4)使用会话Session和管理对象Destination创建消息生产者MessageSender(5)使用消息生产者MessageSender发送消息b.接收消息(1)创建连接使用的工厂类JMS ConnectionFactory(2)使用管理对象JMS ConnectionFactory建立连接Connection,并启动(3)使用连接Connection建立会话Session(4)使用会话Session和管理对象Destination创建消息接收者MessageReceiver(5)使用消息接收者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver,消息接收者必须实现了MessageListener接口,需要定义onMessage事件方法。3.spring的集成:spring集成所需要的jar包我已经在pom.xml中配置了,大家可以看看都需要哪些jar。当然下载的activemq的lib下也有这些jar。spring的整合比较简单,只需在spring的配置文件中配置消息模板JmsTemplete就可以了,具体如下:
&?xml version=&1.0& encoding=&UTF-8&?&
&beans xmlns=&http://www.springframework.org/schema/beans&
xmlns:context=&http://www.springframework.org/schema/context&
xmlns:xsi=&http://www.w3.org/2001/XMLSchema-instance&
xsi:schemaLocation=&http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd&&
&!-- 连接池
&bean id=&pooledConnectionFactory& class=&org.apache.activemq.pool.PooledConnectionFactory& destroy-method=&stop&&
&property name=&connectionFactory&&
&bean class=&org.apache.activemq.ActiveMQConnectionFactory&&
&property name=&brokerURL& value=&tcp://localhost:61616& /&
&/property&
&!-- 连接工厂 --&
&bean id=&activeMQConnectionFactory& class=&org.apache.activemq.ActiveMQConnectionFactory&&
&property name=&brokerURL& value=&tcp://localhost:61616& /&
&!-- 配置消息目标 --&
&bean id=&destination& class=&org.mand.ActiveMQQueue&&
&constructor-arg index=&0& value=&com.zuidaima.spring& /&
&!-- 消息模板 --&
&bean id=&jmsTemplate& class=&org.springframework.jms.core.JmsTemplate&&
&property name=&connectionFactory& ref=&activeMQConnectionFactory& /&
&property name=&defaultDestination& ref=&destination& /&
&property name=&messageConverter&&
&bean class=&org.springframework.jms.support.converter.SimpleMessageConverter& /&
&/property&
&/beans&4.项目运行截图:JMS其他几种方式我就不一一截图了,请具体运行项目查看。5.开发环境:win7 32位+eclipse kepler + jdk7 + maven由编辑于 9:44:159个牛币请下载代码后再发表评论//zuidaima_activemq/zuidaima_activemq/.classpath/zuidaima_activemq/.project/zuidaima_activemq/.settings/zuidaima_activemq/.settings/org.eclipse.jdt.core.prefs/zuidaima_activemq/.settings/org.eclipse.m2e.core.prefs/zuidaima_activemq/pom.xml/zuidaima_activemq/src/zuidaima_activemq/src/applicationContext.xml/zuidaima_activemq/src/com/zuidaima_activemq/src/com/zuidamai/zuidaima_activemq/src/com/zuidamai/jms/zuidaima_activemq/src/com/zuidamai/pointToPoint精精精原原原原原原原最热搜索分享话题编程语言基础Web开发数据库开发客户端开发脚本工具游戏开发服务器软硬件开源组件类库相关分享最近下载暂无贡献等级暂无贡献等级暂无贡献等级暂无贡献等级暂无贡献等级暂无贡献等级暂无贡献等级最近浏览暂无贡献等级暂无贡献等级暂无贡献等级暂无贡献等级扫描二维码关注最代码为好友"/>扫描二维码关注最代码为好友7803人阅读
ActiveMQ(3)
1) 点对点通讯:点对点方式是最为传统和常见的通讯方式,它支持一对一、一对多、多对多、多对一等多种配置方式,支持树状、网状等多种拓扑结构。
2) 多点广播:MQ适用于不同类型的应用。其中重要的,也是正在发展中的是&多点广播&应用,即能够将消息发送到多个目标站点(Destination List)。可以使用一条MQ指令将单一消息发送到多个目标站点,并确保为每一站点可靠地提供信息。MQ不仅提供了多点广播的功能,而且还拥有智能消息分发功能,在将一条消息发送到同一系统上的多个用户时,MQ将消息的一个复制版本和该系统上接收者的名单发送到目标MQ系统。目标MQ系统在本地复制这些消息,并将它们发送到名单上的队列,从而尽可能减少网络的传输量。
3) 发布/订阅(Publish/Subscribe)模式:发布/订阅功能使消息的分发可以突破目的队列地理指向的限制,使消息按照特定的主题甚至内容进行分发,用户或应用程序可以根据主题或内容接收到所需要的消息。发布/订阅功能使得发送者和接收者之间的耦合关系变得更为松散,发送者不必关心接收者的目的地址,而接收者也不必关心消息的发送地址,而只是根据消息的主题进行消息的收发。在MQ家族产品中,MQ Event Broker是专门用于使用发布/订阅技术进行数据通讯的产品,它支持基于队列和直接基于TCP/IP两种方式的发布和订阅。
4) 群集(Cluster):为了简化点对点通讯模式中的系统配置,MQ提供Cluster(群集)的解决方案。群集类&#20284;于一个域(Domain),群集内部的队列管理器之间通讯时,不需要两两之间建立消息通道,而是采用群集(Cluster)通道与其它成员通讯,从而大大简化了系统配置。此外,群集中的队列管理器之间能够自动进行负载均衡,当某一队列管理器出现故障时,其它队列管理器可以接管它的工作,从而大大提高系统的高可靠性。
MQ Server和MQ Client
MQ产品分为Server和Client 两种版本,在MQ服务器的运行环境下,有队列管理器、队列、消息通道等对象,它提供全面的消息服务;MQ Client为我们提供了一个MQ应用程序的开发和运行环境,它是MQ API的Client实现。在客户端环境下,没有队列管理器、队列等对象,它通过MQI通道与服务器之间建立通讯,并将消息从客户端发往服务器端的队列,或从Server端的队列中取得消息,它比较适合于网络条件较好或实时通讯的情况。同时要指出的是:采用MQ
Client并不会导致数据的丢失或不完整性。MQ Client提供下列好处:适合同步处理的工作模式;减少系统负担;减少系统管理开销;减少磁盘空间要求等。
JMS定义了五种不同的消息正文&#26684;式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息&#26684;式的一些级别的兼容性。
  · StreamMessage -- Java原始&#20540;的数据流
  · MapMessage--一套名称-&#20540;对
  · TextMessage--一个字符串对象
  · ObjectMessage--一个序列化的 Java对象
  · BytesMessage--一个未解释字节的数据流
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:429073次
积分:1847
积分:1847
排名:第15314名
原创:41篇
转载:39篇
评论:24条
(4)(2)(2)(1)(1)(4)(2)(1)(5)(7)(4)(1)(2)(1)(10)(1)(6)(8)(16)(1)(1)

我要回帖

更多关于 代码实现qq消息发送 的文章

 

随机推荐