activemq是以什么形式两学一做部署时间形式

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
您的访问请求被拒绝 403 Forbidden - ITeye技术社区
您的访问请求被拒绝
亲爱的会员,您的IP地址所在网段被ITeye拒绝服务,这可能是以下两种情况导致:
一、您所在的网段内有网络爬虫大量抓取ITeye网页,为保证其他人流畅的访问ITeye,该网段被ITeye拒绝
二、您通过某个代理服务器访问ITeye网站,该代理服务器被网络爬虫利用,大量抓取ITeye网页
请您点击按钮解除封锁&ActiveMQ的配置与使用
1.什么是ActiveMQ
&&&& MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。
&&&& JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息。
JMS和MQ的关系:JMS是一个用于提供消息服务的技术规范,它制定了在整个消息服务提供过程中的所有数据结构和交互流程。而MQ则是消息队列服务,是面向消息中间件(MOM)的最终实现,是真正的服务提供者;MQ的实现可以基于JMS,也可以基于其他规范或标准。
&&&& ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
2.ActiveMQ与Spring的集成
首先将ActiveMQ如下的jar包导入项目中。
配置activemq的spring配置文件
&?xml version="1.0" encoding="UTF-8"?&
&beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"&
&!--(嵌入配置)activeMq消息容器--&
&amq:broker useJmx="true" persistent="true" &
&amq:managementContext&
&amq:managementContext createConnector="false"/&
&/amq:managementContext&
&amq:persistenceAdapter&
&amq:kahaDB directory="${jmsDir}" &
&/amq:kahaDB&
&/amq:persistenceAdapter&
&amq:transportConnectors&
&amq:transportConnector uri="tcp://${jms.ip}:${jms.port}" /&
&/amq:transportConnectors&
&/amq:broker&
&!-- wireFormat.maxInactivityDuration=0& --&
&bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"&
&property name="brokerURL" value="vm://${jms.ip}?jms.useAsyncSend=true" /&
&bean id="simpleJmsConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"&
&constructor-arg ref="amqConnectionFactory"/&
&property name="sessionCacheSize" value="100" /&
&property name="cacheConsumers" value="true"&&/property&
&property name="exceptionListener" ref="jmsExceptionListener"/&
&bean id="jmsExceptionListener" class="com.ibms.core.jms.JmsExceptionListener"&&/bean&
Message 转换器
&bean id="activeMqMessageConverter" class="com.ibms.core.jms.ActiveMqMessageConverter"/&
&bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"&
&property name="connectionFactory" ref="simpleJmsConnectionFactory" /&
&property name="defaultDestinationName" value="messageQueue"/&
&!-- 消息对象队列
&amq:queue id="messageQueue" name="messageQueue" physicalName="messageQueue" /&
&!-- 消息生产者
&bean id="messageProducer" class="com.ibms.core.jms.MessageProducer"/&
消息消费者
map配置的是队列中消息处理类。
键:队列中消息类 对应的类 全路径 如: com.ibms.core.model.MailModel
值:消息处理类,需要实现接口类IJmsHandler 。如:com.ibms.oa.service.jms.impl.MailHandler
用户也可以配置自己的处理方式,配置到这里。
&bean name="messageConsumer" class="com.ibms.core.jms.MessageConsumer"&
&property name="handlers"&
key="com.ibms.oa.service.jms.MessageModel"&
&bean class="com.ibms.oa.service.jms.MessageHandler"&&/bean&
&/property&
map配置的是队列中消息处理类。
值:消息处理类,需要实现接口类IMessageHandler 。
用户也可以配置自己的处理方式,配置到这里。
每增加一种消息方式的时候,需要增加对应的处理器(如下,如mailMessageHandler实现IMessageHandler 接口)--&
&bean id="mailMessageHandler"
class="com.ibms.oa.service.jms.MailMessageHandler"&&/bean&
&bean id="innerMessageHandler"
class="com.ibms.oa.service.jms.InnerMessageHandler"&&/bean&
&bean id="messageHandlerContainer " class="com.ibms.oa.service.jms.MessageHandlerContainer"&
&property name="handlersMap" ref="handlersMap"/&
&bean id="handlersMap" class="java.util.LinkedHashMap"&
&constructor-arg&
&entry key="1" value-ref="mailMessageHandler" /&
&entry key="3" value-ref="innerMessageHandler" /&
&/constructor-arg&
&!--消息监听容器 --&
&bean id="messageListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer"&
&property name="connectionFactory" ref="simpleJmsConnectionFactory" /&
&property name="destination" ref="messageQueue" /&
&property name="messageListener" ref="messageMsgListener" /&
&!-- 邮件消息消费监听器
&bean id="messageMsgListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"&
&constructor-arg&
&ref bean="messageConsumer"/&
&/constructor-arg&
&property name="messageConverter" ref="activeMqMessageConverter" /&
&property name="defaultListenerMethod" value="sendMessage" /&
&bean id="messageEngine" class="com.ibms.core.engine.MessageEngine"&
&property name="mailSender" ref="mailSender"/&
&property name="fromUser" value="${mail.from}"/&
然后依次实现消息的转换器ActiveMqMessageConverter
public class ActiveMqMessageConverter implements MessageConverter {
* 转换发送消息。
public Message toMessage(Object object, Session session)
throws JMSException {
* 转换接收消息。
public Object fromMessage(Message msg) throws JMSException {
消息的生产者MessageProducer
public class MessageProducer
private static final Log logger=LogFactory.getLog(MessageProducer.class);
@Resource(name="messageQueue")
private Queue messageQ
private JmsTemplate jmsT
public void send(Object model)
logger.debug("procduce the message");
//产生邮件\短信\消息发送的消息,加到消息队列中去
jmsTemplate.convertAndSend(messageQueue, model);
消息的消费者MessageConsumer(可扩展)
* 从消息队列中读取对象,并且进行消息发送。
* @author zxh
public class MessageConsumer {
* 处理消息
private Map&String, IJmsHandler& handlers = new HashMap&String, IJmsHandler&();
protected Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
public void setHandlers(Map&String, IJmsHandler& handlers) {
this.handlers =
* 发送消息
* @param model
发送的对象
* @throws Exception
public void sendMessage(Object model) throws Exception {
if (BeanUtils.isNotEmpty(handlers) && BeanUtils.isNotEmpty(model)) {
IJmsHandler jmsHandler = handlers.get(model.getClass().getName());
if(jmsHandler!=null){
jmsHandler.handMessage(model);
(model.toString());
throw new Exception("Object:[" + model + "] is not
entity Object ");
3.ActiveMQ应用场景
a.非均匀应用集成 &&&&& ActiveMQ 中间件用Java语言编写,因此自然提供Java客户端 API。但是ActiveMQ& 也为C/C++、.NET、Perl、PHP、Python、Ruby 和一些其它语言提供客户端。在你考虑如何集成不同平台不同语言编写应用的时候,ActiveMQ 拥有巨大优势。在这样的例子中,多种客户端API通过ActiveMQ 发送和接受消息成为可能,无论使用的是什么语言。此外,ActiveMQ 还提供交叉语言功能,该功能整合这种功能,无需使用远程过程调用(RPC)确实是个优势,因为消息协助应用解耦。
b.作为RPC的替代
&&&&&&&& 应用使用RPC同步调用十分普遍。假设大多数客户端服务器应用使用RPC,包括ATM、大多数WEB应用、信用卡系统、销售点系统等等。尽管很多系统很成功,转换使用异步消息可以带来很多好处,而且也不会放弃响应保证。系统依赖同步需求典型地限制了扩展,因为最终需求将开始起作用,从而放慢整个系统。取而代之这种不好的体验,使用异步消息,附加的消息接收器可以轻松添加,假设你的应用可以解耦。
& c.两个应用之间解耦
&&&&&&&& 正如之前讨论的,紧耦合架构可以导致很多问题,尤其是如果他们是分布的。松耦合架构,在另一方面,证实了更少的依赖性,能够更好地处理不可预见的改变。你不见可以在系统中改变组件而不影响整个系统,而且组件交互也相当的简单。取代使用同步方案的组件交互,组件利用异步通信。这样的松耦合遍及系统被称之为事件驱动架构(EDA)。
& d.作为事件驱动架构的主干
&&&&&&&& 在之前的观点中,解耦、异步风格架构允许软件本身进一步扩展(水平的可扩展性),而不是依赖硬件的可扩展性(垂直的可扩展)。想象一下一种难以置信的流量、电子商务网站像亚马逊。但一个用户在亚马逊上购买,有许多分开的阶段贯穿,订单需要履行包括订单配置、创建发票、支付流程、订单完成、运输等。然而,但一个用户实际上提交了一个订单,用户立即得到一个页面说明,&感谢您的订单&不仅如此,没有任何延迟。用户也收到了订单已经收到的邮件说明,订单配置流程由亚马逊雇佣就是个很好的例子,第一步在一种更大的、异步流程中。每一个订单步骤直接由分开的服务奋力地处理。但用户下了订单,异步调用提交订单,但是全部订单流程不会落后于通过网页浏览器进行的同步调用。反之,订单被接受并立即被确认。这个流程中剩余的步骤一步地被处理。如果发生了问题。组织流程进行,用户会被通知。这样的异步流程提供大量的可扩展性。
& e.改善应用可扩展性
&&&&&&&& 许多应用利用事件驱动架构,为了提供大量的可扩展性,包括像电子商务、政府、制造业和在线游戏等领域。使用异步消息在业务领域分离一个应用,许多其它可能性开始合并。考虑使用服务为特定任务设计应用的能力。这正是面向服务架构(SOA)的主干。每一个服务实现一个独立的功能,而且只是那个功能。应用通过这些服务构成来创建,在服务间使用异步消息实现通信。这种风格的应用设计被称之为复杂事件处理(CEP)。使用CEP,系统中组件之间的交互可以被进一步的分析跟踪。在考虑异步消息在系统的组件之间添加一种迂回的时候,这些可能性是无止境的。
> 本站内容系网友提交或本网编辑转载,其目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责。如涉及作品内容、版权和其它问题,请及时与本网联系,我们将在第一时间删除内容!
简介 上一篇/topic/15317介绍了ActiveMQ5.0的安装,这一篇将介绍的配置.ActiveMQ包含了很多features(详见http://activemq.apache.org/features.html ),
不同的需求,不同的环境,需要不同的features ...
小菜一只,上面安排优化ActiveMQ的配置...我到处瞎鼓捣,现在发送者的速度贼快达到 5000条/s, 可是消费者的速度太慢了,而且在后台查看,1000条消息只被消费了94条.我勒个去啊,那么多被积压啊...我把导出一个jar包当做消费者运行,这样就不会积压,但是速度是 15条/s
这大坑... 我看了网上许多资料,说采用两个连接池来区分.. 求大神给 ...
Activemq xmpp配置
1.xmpp server
2. activemq 1.配置xmpp server
http://www.igniterealtime.org/downloads/index.jsp
Openfire 3.6.4(server)
Spark 2.5.8(TestClient ...
本文作者:Zhang Phil 原文链接:http://blog.csdn.net/zhangphil/article/details/ ActiveMQ安装配置和使用简例 ActiveMQ是一套JMS(Java Message Service)开源消息服务实现的组件.以Windows操作系统为例,本文简述了ActiveMQ的安装配置和使用简 ...
前言 ActiveMQ他是Apache出品的一个JMS提供者,管理会话和队列,运行在JVM下,支持多种语言,如JAVA,C++,C#,应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP,是一个消息的接受和转发的容器,可用于消息推送: 最近因为有项目支付场景用到这个,所以就临时看了下资料,粗糙的写了的笔记 配 ...
当从webservice接收到信息的时候,消息生产者producer立刻把收到的消息放入到jms里面,消费者cusomer这时要设置一个监听,当生产者发送消息时,只要消息被发出来,消费者就会接收到消息,然后进行相应的操作. 在tomcat里面,要进行配置. 首先在tomcat安装目录里面,对conf/context.xml进行配置,加入以下代码: &R ...
一.下载ActiveMQ 去官方网站下载:http://activemq.apache.org/ 二.运行ActiveMQ 将apache-activemq-5.11.1-bin.zip解压,由于本系统是32位,所以进入apache-activemq-5.11.1\bin\win32目录. 1.安装InstallService.bat,如果出现下图,也许是你 ...
配置 ActiveMQ默认使用的是XML格式配置,从4.0版本开始用MBean的方式实现XML配置,配置文件在${activemq.home}/conf目录下,文件名为activemq.xml.最新的默认配置见 http://svn.apache.org/repos/asf/activemq/trunk/assembly/src/release/conf/a ...您现在的位置: >
activemq 5.11.1 win8(x64)环境配置
&&& &&& &&&
目录:《》
大家好,我是IT学习者-螃蟹,近段时间由于更换了电脑,系统由win7(x86)更换为win8(x64),activemq也由5.10升级到5.11,前两篇也只是讲解了环境的配置,那么这里螃蟹将直接用activemq的最新版本来进行讲解。 本文来自
首先就是关于新环境新系统的配置了。
打开下载解压的activemq文件夹,在目录 ..\apache-activemq-5.11.1-bin\apache-activemq-5.11.1\bin\ 中启动对应的版本,比如螃蟹的电脑为64位,如果已经安装了安装服务(InstallService.bat),直接运行warpper.exe即可
如下图所示,则启动成功:
这时我们就可以运行测试官方给我们提供的示例(Example)了。
但是有不少朋友在运行的时候不能够到达预期效果,这里螃蟹就简单归纳几点可能出现问题的地方: 内容来自
1、端口占用
Caused by: java.io.IOException: Failed to bind to server socket: tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize= due to: java.net.SocketException: Unrecognized Windows Sockets error: 0: JVM_Bind
这是由于端口占用引起的,可参考《》一文解决 IT学习者()
2、环境变量
现在apache部分开源项目的jdk为1.7,所以在使用1.6及以下进行编译的时候会提示版本异常的错误提示:
Unsupported major.minor version 51.0
或者在eclipse中运行时则出现如下错误:
Exception in thread &main& javax.jms.JMSException: Connection refused: no further information
at org.fusesource.stomp.jms.StompJmsExceptionSupport.create(StompJmsExceptionSupport.java:59)
at org.fusesource.stomp.jms.StompChannel.connect(StompChannel.java:138)
at org.fusesource.stomp.jms.StompJmsConnection.getChannel(StompJmsConnection.java:353)
at org.fusesource.stomp.jms.StompJmsConnection.connect(StompJmsConnection.java:400)
at org.fusesource.stomp.jms.StompJmsConnection.start(StompJmsConnection.java:200)
at example.Publisher.main(Publisher.java:45)
Caused by: java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
at org.fusesource.hawtdispatch.transport.TcpTransport$2$1$1.run(TcpTransport.java:485)
at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
只要版环境变量更换为jdk1.7即可:
这时候再运行example中的示例(如:stomp-example)便可正常运行了:
细心的朋友可以看到在eclipse中螃蟹依然使用的是jdk1.6,但可以正常运行,这是为什么呢?
这是因为在eclipse中运行的activemq的代码只需要1.6+就可以了,但是安装activemq的服务是需要jdk1.7的支持的。
& IT学习者()
文章除注明转载外,均为原创或编译
欢迎任何形式的转载,但务必请以超链接形式注明出处
本文出自:IT学习者
链接地址:
下一篇:没有了
更多相关资讯
apache-activemq-5.11.1教程
apache-activemq-5.11.
apache-activemq-5.11.1教程
评论列表(网友评论仅供网友表达个人看法,并不表明本站同意其观点或证实其描述)
Copyright & 2014
All rights reserved.&&
联系方式(QQ): |ActiveMQ使用与部署
JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,中文称Java消息服务。JMS是一种与厂商无关的 API,用来访问消息收发系统消息;就像JDBC(Java Database Connectivity)可以用来访问许多不同关系数据库一样,JMS则提供同样与厂商无关的访问方法,以访问消息收发服务。
JMS是在Java标准化组织(JCP)内开发的标准(代号JSR 914)。日,发布了JMS 1.0.2b,日发布了JMS1.1,统一了消息域。
JMS规范约定了两种消息方式:P2P(Point To Point)和发布/订阅(Publish/Subscribe)。P2P是用来进行两个节点之间的点对点通信;发布/订阅则是用于一个发布者和多个订阅者之间的通信。
MQ是Message Queue的缩写,中文称消息队列,MQ是在消息的传输过程中保存消息的容器。消息管理器在将消息从它的源中继到它的目标时充当中间人;队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。
MQ有很多不同厂商和语言实现的产品,如Sun MQ、Microsoft MQ、IBM MQ的等商业产品;Java语言实现的MQ,则有ActiveMQ、基于JMS标准的OpenJMS,以及新近的迁移项目Jafka等开源项目。
1.3 二者关系
JMS是一个用于提供消息服务的技术规范,TA制定了在整个消息服务提供过程中的所有数据结构和交互流程。而MQ则是消息队列服务,是面向消息中间件(MOM)的最终实现,是真正的服务提供者;MQ的实现可以基于JMS,也可以基于其他规范或标准。
2 MQ产品介绍
2.1 ActiveMQ
ActiveMQ是Apache的一个顶级Java开源项目,也是目前最流行的,能力强劲的开源消息队列。ActiveMQ是比较老牌的MQ,2004年由Apache开源孵化,2007年成为Apache顶级项目。最新版本已经更新到5.8.0,TA拥有众多特性:
&#61618; 支持Java,C/C++,C#,Ruby,Perl,Python,PHP等多种语言实现客户端和协议。
&#61618; 完全支持企业集成模式。
&#61618; 支持消息分组、虚拟目标及复合目标等高级特性。
&#61618; 完全支持JMS1.1,和J2EE1.4(持久化、事务,及XA消息)。
&#61618; 提供对Spring的支持,可以很容易内嵌到使用Spring的系统中。
&#61618; 通过了常规的J2EE服务器(如TomEE、Geronimo、Jboss、GlassFish、WebLogic等)的测试。
&#61618; 支持多种传输协议,如:in-VM、TCP、SSL、NIO、UDP、multicast、Jgroups、JXTA等。
&#61618; 支持通过JDBC和journal提供高速的消息持久化。
&#61618; 从设计上保证了高性能的集群、客户端-服务器、点对点通信。
&#61618; 支持基于Web的API及其他方式的REST调用。
&#61618; 支持Ajax。
&#61618; 支持CXF和Axis。
&#61618; 支持用作内嵌JMS provider,进行测试。
2.2 其他产品
2.2.1 OpenJMS
OpenJMS是完全基于JMS1.1规范实现的JMS provider,有以下特性:
&#61618; 支持通过JDBC提供消息持久化。
&#61618; 支持Applet。
&#61618; 能够与Jakarta Tomcat这样的Servlet容器结合。
&#61618; 支持RMI、TCP、HTTP与SSL协议。
&#61618; 提供可靠消息传输、事务和消息过滤。
2.2.2 Jafka
基于scala语言开发的分布式发布订阅消息系统Kafka的Java移植版,功能比较简单,但其最大的特色分布式和高性能:
&#61618; 消息持久化非常快,服务端存储消息的开销为O(1)。
&#61618; 基于文件系统,能够持久化TB级的消息而不损失性能。
&#61618; 吞吐量很大,同等机器配置下,Jafka吞吐量比同类MQ均高。
&#61618; 完全的分布式系统,broker、producer、consumer都原生自动支持分布式;自动实现复杂均衡。
&#61618; 内核非常小,内部机制简洁,适合进行内嵌或者二次开发。
&#61618; 消息格式以及通信机制非常简单,适合进行跨语言开发。
3 ActiveMQ的使用
ActiveMQ的使用分为两种,一种是嵌入式,即把ActiveMQ作为内嵌的JMS provider集成到Tomcat或其他Web服务器中;另一种是独立部署,即ActiveMQ单独部署,独立运行,通过ActiveMQ提供的API进行进程外访问。
我们采用独立部署的方式使用ActiveMQ,这样的做法更有利于业务的解耦和工程部署结构的分离;也有利于对ActiveMQ进行配置定制和优化。
我们知道JMS规范中约定了两种消息发送方式:P2P和Publish/Subscribe。根据业务的需求,我们采用P/S方式更为合理,也能更好的发挥ActiveMQ本身的优势。
3.1 消息的发布
// 连接到ActiveMQ服务器
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(&tcp://localhost:61616&);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建主题
Topic topic = session.createTopic(&myTopic.messages&);
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
while(true) {
TextMessage message = session.createTextMessage();
message.setText(&TIME:& + (new Date()).toLocaleString());
// 发布主题消息
producer.send(message);
System.out.println(&Sent message: & + message.getText());
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
3.2 消息的订阅
// 连接到ActiveMQ服务器
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(&tcp://localhost:61616&);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建主题
Topic topic = session.createTopic(&myTopic.messages&);
// 创建订阅
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
// 订阅接收方法
public void onMessage(Message message) {
TextMessage tm = (TextMessage)
System.out.println(&Received message: & + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
3.3 消息的持久订阅
// 连接到ActiveMQ服务器
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(&tcp://localhost:61616&);
Connection connection = factory.createConnection();
String ip = &&;
// 获取本机IP
InetAddress addr = InetAddress.getLocalHost();
ip = addr.getHostAddress().toString();
} catch (UnknownHostException ex) {
ex.printStackTrace();
if(!&&.equals(ip)) {
System.out.println(&CLIENT: & + ip);
// 设置订阅客户端ID
connection.setClientID(ip);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建主题
Topic topic = session.createTopic(&myTopic.messages&);
// 创建订阅
MessageConsumer consumer = session.createDurableSubscriber(topic, &test&);
consumer.setMessageListener(new MessageListener() {
// 订阅接收方法
public void onMessage(Message message) {
TextMessage tm = (TextMessage)
System.out.println(&Received message: & + tm.getText());
} catch (JMSException e) {
e.printStackTrace();
3.4 注意事项
&#61557; 普通订阅的情况下,客户端只有在连接到服务器的情况下,才能接收服务器上的主题消息。
&#61557; 持久订阅后,当客户端在线时,服务器端会把客户端在上次下线之后到本次上线之间的所有消息一并推送给客户端;这样就保证了客户端不会有丢失的消息。
&#61557; 持久订阅会引发另一个问题:当新增一个订阅客户端时,这个客户端会收到服务器上该主题下的所有未过期消息。
4 ActiveMQ的部署
&#61557; 安装JDK(1.4以上即可)。
&#61557; 设置JAVA_HOME环境变量。
&#61557; 直接解压ActiveMQ压缩包。
默认配置下,执行解压目录下bin/activemq.bat即可正常运行。之后,可以通过访问http://localhost:8161/admin查看ActiveMQ的运行情况(默认用户名和密码为admin/admin)。
ActiveMQ的配置存放在安装目录的conf/activemq.xml文件中。
因为ActiveMQ采用了Jetty作为容器,因此Jetty相关的配置在conf/jetty.xml文件中。
5.1 消息持久化
默认的情况下,ActiveMQ的消息持久化是基于文件系统的KahaDB。我们可以通过配置,让ActiveMQ使用MySQL实现消息持久化:
&#61557; 将MySQL的jar包复制到安装目录的lib下。
&#61557; 修改配置文件:
&persistenceAdapter&
&jdbcPersistenceAdapter dataDirectory=&${activemq.base}/data& dataSource=&#derby-ds& /&
&/persistenceAdapter&
&#61557; 增加节点(与broker节点同级):
&bean id=&derby-ds& class=&mons.dbcp.BasicDataSource& destroy-method=&close&&
&property name=&driverClassName& value=&com.mysql.jdbc.Driver& /&
&property name=&url& value=&jdbc:mysql://localhost/activemq?relaxAutoCommit=true& /&
&property name=&username& value=&activemq& /&
&property name=&password& value=&activemq& /&
&property name=&maxActive& value=&200& /&
&property name=&poolPreparedStatements& value=&true& /&
&#61557; 然后,重启ActiveMQ即可。
ActiveMQ的集群由服务器端和客户端共同完成。服务器端通过部署Master/Slaver机制,通过进行分布式部署,以实现服务器集群的平行扩展。而客户端则采取静态地址发现,或者动态地址发现的方式,实现服务器的负载均衡选择。
5.2.1 服务器端的部署
ActiveMQ支持Master/Slaver机制,但简单Master/Slaver方式有一定的局限性,不适合服务器集群的平行扩展(当然,简单Master/Slaver已经足够支撑一般的商业应用)。因此,ActiveMQ提供了支持大并发请求的集群方式:共享文件系统的集群,以及基于JDBC的集群。
&#61557; 共享文件系统的集群
实际上就是基于文件系统进行集群部署(前面提到过,ActiveMQ默认的消息存储就是基于文件系统的),可以通过分布式存储系统或共享数据目录来实现。这种方式只需要修改conf/activemq.xml:
&persistenceAdapter&
&journaledJDBC dataDirectory=&/sharedFileSystem/broker&/&
&/persistenceAdapter&
&#61557; 基于JDBC的集群
原理与共享文件系统一致,只不过把文件系统换成了平台。即:多台ActiveMQ连接同一个数据库,从而实现ActiveMQ的服务器集群。配置同5.1。
5.2.2 客户端的使用
服务器端的集群对客户端而言是透明的,但如果客户端希望得到集群和负载均衡的功能支持,则必须在代码中有所体现。
最常规的方法就是failover协议,fileover支持客户端在当前服务器断开的情况下,自动重新连接到新的服务器上,而新的服务器地址可以来源于静态地址列表,也可以来源于动态地址广播。
&#61557; 静态地址发现的常规用法
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(&failover:(tcp://primary:61616,tcp://secondary:61616)?randomize=false&);
&#61557; 动态地址发现的常规用法
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(&failover:(multicast://host:6255)&);
当然,ActiveMQ还支持更多的协议,如:fanout、discovery等。
5.3 管理与监控
ActiveMQ提供了一个Web后台用于查看服务器运行状态,并提供了对消息队列、主题及订阅者等进行管理的功能。
另外,ActiveMQ也可以通过配置支持Nagios的集成监控。
(window.slotbydup=window.slotbydup || []).push({
id: '2467140',
container: s,
size: '1000,90',
display: 'inlay-fix'
(window.slotbydup=window.slotbydup || []).push({
id: '2467141',
container: s,
size: '1000,90',
display: 'inlay-fix'
(window.slotbydup=window.slotbydup || []).push({
id: '2467143',
container: s,
size: '1000,90',
display: 'inlay-fix'
(window.slotbydup=window.slotbydup || []).push({
id: '2467148',
container: s,
size: '1000,90',
display: 'inlay-fix'

我要回帖

更多关于 activemq 部署 的文章

 

随机推荐