activemq spring 整合整合RabbitMQ 一直收到重复消息怎么处理

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
您的访问请求被拒绝 403 Forbidden - ITeye技术社区
您的访问请求被拒绝
亲爱的会员,您的IP地址所在网段被ITeye拒绝服务,这可能是以下两种情况导致:
一、您所在的网段内有网络爬虫大量抓取ITeye网页,为保证其他人流畅的访问ITeye,该网段被ITeye拒绝
二、您通过某个代理服务器访问ITeye网站,该代理服务器被网络爬虫利用,大量抓取ITeye网页
请您点击按钮解除封锁&从一个 Spring 应用程序入门使用 RabbitMQ 服务
此教程讲解了如何在 Java 和 Spring 应用程序中使用用于 Cloud Foundry 的 RabbitMQ 服务。
消息传递在 Spring Web 应用程序的上下文中具有不同方式的用途。参见 RabbitMQ 主站点上区域,以获得一些思路。在教程中,我们将构建一个使用 RabbitMQ 的非常简单的应用程序,以将注意力集中于连接 RabbitMQ 服务的基础知识。当您理解这个教程后,您将能够将更实用的服务应用整合进您的 Cloud Foundry 上的 Spring 应用程序中。
此教程包括在 Cloud Foundry 上创建简单 Spring 应用程序以及将其与 RabbitMQ 服务挂钩的整个过程。如果您已经在 Cloud Foundry 上拥有自己的应用程序,您应该已经熟悉此处的大部分内容。如果是这样,您可直接跳到最后一部分内容,即讨论如何从您的应用程序访问 RabbitMQ 服务的部分。您还可以在
上找到已完成的应用程序的完整代码。
我们将构建的应用程序将由一个单个页面组成,它看上去像这样:
此应用程序能够发布消息至单个 RabbitMQ 队列和从单个 RabbitMQ 队列获取消息。我们可以在输入框中键入消息,单击“发布”按钮将其发布至队列。另外,我们可以单击获取按钮从队列得到消息,下一条消息将被显示,或者将会告诉您队列为空。
在开始之前,您的开发计算机上须已经安装一些新程序。此指南假定您已经安装
此教程将使用与 Cloud Foundry 进行交互的 vmc 命令行工具。如果您尚未安装 vmc,请遵循 的说明。请注意 vmc 会不断被增强,因此即使您已经安装了它,您也需要定期通过以下方法对其进行更新:
$ gem update vmc
vmc 不是与 Cloud Foundry 一起开发的唯一方法:在 Eclipse 开发环境中支持 Clound Foundry。参见,查看如何在 STS 中使用 Cloud Foundry。
创建一个 Spring MVC 初始应用程序
现在我们将按照文章
所述创建一个最小的 Spring MVC 应用程序,并将其推送至 Cloud Foundry。应用程序源由五个文件组成。文件位置将遵循 Maven 常规项目布局。
src/main/webapp/WEB-INF/web.xml 直接交给 Spring 框架:
&!DOCTYPE web-app PUBLIC
"-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
"/dtd/web-app_2_3.dtd" &
&web-app version="2.5" xmlns="/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="/xml/ns/javaee /xml/ns/javaee/web-app_2_5.xsd"&
&servlet-name&appServlet&/servlet-name&
&servlet-class&org.springframework.web.servlet.DispatcherServlet&/servlet-class&
&init-param&
&param-name&contextConfigLocation&/param-name&
&param-value&/WEB-INF/spring/servlet-context.xml&/param-value&
&/init-param&
&load-on-startup&1&/load-on-startup&
&/servlet&
&servlet-mapping&
&servlet-name&appServlet&/servlet-name&
&url-pattern&/&/url-pattern&
&/servlet-mapping&
&/web-app&
Spring 应用程序上下文 XML 文件 src/main/webapp/WEB-INF/spring/servlet-context.xml 是最小的 Spring MVC 应用程序,它直接声明将对 MVC 使用基于注释的配置,而且将打包并扫描:
&?xml version="1.0" encoding="UTF-8"?&
&beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd"&
&context:component-scan base-package="com.rabbitmq.cftutorial.simple"/&
&mvc:annotation-driven/&
控制器类 src/main/java/com/rabbitmq/cftutorial/simple/HomeController.java 直接交给 JSP 模板:
package com.rabbitmq.cftutorial.simple;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
@Controller
public class HomeController {
@RequestMapping(value = "/")
public String home() {
return "WEB-INF/views/home.jsp";
而 JSP 模板 src/main/webapp/WEB-INF/views/home.jsp 将显示一个简单的静态页面。
&%@ taglib uri="/jsp/jstl/core" prefix="c" %&
&%@ page session="false" %&
&title&Simple RabbitMQ Application&/title&
&h1&Simple RabbitMQ Application&/h1&
最后,pom.xml 文件向 Maven 描述该项目:
&project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"&
&modelVersion&4.0.0&/modelVersion&
&groupId&com.rabbitmq.cftutorial&/groupId&
&artifactId&simple&/artifactId&
&packaging&war&/packaging&
&version&1.0-SNAPSHOT&/version&
&name&cftutorial-simple&/name&
&description&Simple RabbitMQ Application&/description&
&properties&
&java-version&1.6&/java-version&
&org.springframework-version&3.0.5.RELEASE&/org.springframework-version&
&/properties&
&dependencies&
&dependency&
&groupId&org.springframework&/groupId&
&artifactId&spring-context&/artifactId&
&version&${org.springframework-version}&/version&
&/dependency&
&dependency&
&groupId&org.springframework&/groupId&
&artifactId&spring-webmvc&/artifactId&
&version&${org.springframework-version}&/version&
&/dependency&
&dependency&
&groupId&javax.servlet&/groupId&
&artifactId&jstl&/artifactId&
&version&1.2&/version&
&/dependency&
&/dependencies&
&groupId&org.apache.maven.plugins&/groupId&
&artifactId&maven-compiler-plugin&/artifactId&
&configuration&
&source&${java-version}&/source&
&target&${java-version}&/target&
&/configuration&
&/plugins&
&/project&
下一步我们构建项目:
$ mvn package
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESSFUL
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2 seconds
[INFO] Finished at: Tue Aug 02 14:04:59 BST 2011
[INFO] Final Memory: 17M/321M
[INFO] ------------------------------------------------------------------------
现在我们已准备好向 Cloud Foundry 推送应用程序。在进行 vmc 推送前更改为目标目录。这里,我将把我的应用程序称为“rabbit-simple”,但是您应该选择您自己的应用程序名称。
我们还可以在此时将服务与我们的应用程序捆绑。但是此时我们将不添加 rabbitmq 服务 — 我们将在初始应用程序运行后再这样做。
$ cd target
$ vmc push
Would you like to deploy from the current directory? [Yn]: y
Application Name: rabbit-simple
Application Deployed URL: 'rabbit-'?
Detected a Java SpringSource Spring Application, is this correct? [Yn]: y
Memory Reservation [Default:512M] (64M, 128M, 256M or 512M)
Creating Application: OK
Would you like to bind any services to 'rabbit-simple'? [yN]: n
Uploading Application:
Checking for available resources: OK
Processing resources: OK
Packing application: OK
Uploading (3K): OK
Push Status: OK
Staging Application: OK
Starting Application: OK
如果操作成功,您应该能够访问应用程序的 URL 和查看主页:
扩展应用程序
我们已有在 Cloud Foundry 上运行的最小的 Spring MVC 应用程序,现在我们将添加演示 RabbitMQ 服务用途的所需部件。
我们使用页面元素扩展在 src/main/webapp/WEB-INF/views/home.jsp 处的 JSP 模板,以构建介绍部分中所示的页面:
&%@ taglib uri="/jsp/jstl/core" prefix="c" %&
&%@ taglib prefix="form" uri="http://www.springframework.org/tags/form" %&
&%@ page session="false" %&
&title&Simple RabbitMQ Application&/title&
&h1&Simple RabbitMQ Application&/h1&
&h2&Publish a message&/h2&
&form:form modelAttribute="message" action="/publish" method="post"&
&form:label for="value" path="value"&Message to publish:&/form:label&
&form:input path="value" type="text"/&
&input type="submit" value="Publish"/&
&/form:form&
&c:if test="${published}"&
&p&Published a message!&/p&
&h2&Get a message&/h2&
&form:form action="/get" method="post"&
&input type="submit" value="Get one"/&
&/form:form&
&c:choose&
&c:when test="${got_queue_empty}"&
&p&Queue empty&/p&
&c:when test="${got != null}"&
&p&Got message: &c:out value="${got}"/&&/p&
&/c:choose&
我们需要对控制器类添加操作,以支持发布和获取表单:
package com.rabbitmq.cftutorial.simple;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.ui.Model;
@Controller
public class HomeController {
@RequestMapping(value = "/")
public String home(Model model) {
model.addAttribute(new Message());
return "WEB-INF/views/home.jsp";
@RequestMapping(value = "/publish", method=RequestMethod.POST)
public String publish(Model model) {
return home(model);
@RequestMapping(value = "/get", method=RequestMethod.POST)
public String get(Model model) {
return home(model);
我们还需添加一个消息类,以支持发布表单。它仅为消息保存字符串值。
package com.rabbitmq.cftutorial.simple;
public class Message {
private String value;
public String getValue() {
return value;
public void setValue(String value) {
this.value = value;
通过这些添加项,我们可在 Cloud Foundry 上更新应用程序,并可查看:
$ mvn package
$ cd target
$ vmc update rabbit-simple
Uploading Application:
Checking for available resources: OK
Processing resources: OK
Packing application: OK
Uploading (3K): OK
Push Status: OK
Stopping Application: OK
Staging Application: OK
Starting Application: OK
您应可查看上文介绍部分中所示的页面。在下一部分中,我们将在控制器中填写操作的代码以使应用程序可完全正常运作。
使用 RabbitMQ 服务
在此部分中,我们将使用先前部分中已创建的简单应用程序,并让其使用 RabbitMQ 服务。
首选,我们将在 Cloud Foundry 上创建 RabbitMQ 服务的一个实例:
$ vmc create-service
1. rabbitmq
3. mongodb
Please select one you wish to provision: 1
Creating Service [rabbitmq-aaaad]: OK
在此时,服务实例已被成功创建。为了使其对我们的应用程序可用,我们还需要捆绑它:
$ vmc bind-service rabbitmq-aaaad rabbit-simple
Binding Service: OK
Stopping Application: OK
Staging Application: OK
Starting Application: OK
vmc 应用程序命令确认服务已与我们的应用程序捆绑:
$ vmc apps
+-----------------+----+---------+----------------------------------+-----------------------------+
| Application
| Services
+-----------------+----+---------+----------------------------------+-----------------------------+
| rabbit-simple
| RUNNING | rabbit-
| rabbitmq-aaaad
+-----------------+----+---------+----------------------------------+-----------------------------+
协议(RabbitMQ 支持 AMQP 版本 0.8 和 0.9.1)访问 RabbitMQ 服务。因此我们需要使用 AMQP 客户端库。幸运的是,已有一个 Spring AMQP 项目允许 AMQP 应用程序使用 Spring 概念。我们还将使用 cloudfoundry-runtime jar 以便从 Spring 应用程序对 Cloud Foundry 服务的访问,包括 RabbitMQ。因此我们将添加相应的依赖项至 pom.xml 文件:
&properties&
&/properties&
&repositories&
&repository&
&id&spring-milestone&/id&
&name&Spring Maven MILESTONE Repository&/name&
&url&http://maven.springframework.org/milestone&/url&
&/repository&
&/repositories&
&dependencies&
&dependency&
&groupId&cglib&/groupId&
&artifactId&cglib-nodep&/artifactId&
&version&2.2&/version&
&/dependency&
&dependency&
&groupId&org.springframework.amqp&/groupId&
&artifactId&spring-rabbit&/artifactId&
&version&1.0.0.RC2&/version&
&/dependency&
&dependency&
&groupId&org.cloudfoundry&/groupId&
&artifactId&cloudfoundry-runtime&/artifactId&
&version&0.7.1&/version&
&/dependency&
&/dependencies&
然后我们将扩展 Spring 应用程序上下文 XML。添加项:
使用 cloudfoundry-runtime 取得与 RabbitMQ 服务的一个连接。
配置作为 Spring AMQP 主要入口点的 RabbitTemplate 和 RabbitAdmin。
在 RabbitMQ 代理中声明一个队列调用的消息。
有关更多详细信息,请查看 。
&beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:cloud="http://schema.cloudfoundry.org/spring"
xsi:schemaLocation="http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
http://schema.cloudfoundry.org/spring http://schema.cloudfoundry.org/spring/cloudfoundry-spring-0.7.xsd"&
&context:component-scan base-package="com.rabbitmq.cftutorial.simple"/&
&mvc:annotation-driven/&
&!-- Obtain a connection to the RabbitMQ via cloudfoundry-runtime: --&
&cloud:rabbit-connection-factory id="connectionFactory"/&
&!-- Set up the AmqpTemplate/RabbitTemplate: --&
&rabbit:template connection-factory="connectionFactory"/&
&!-- Request that queues, exchanges and bindings be automatically
declared on the broker: --&
&rabbit:admin connection-factory="connectionFactory"/&
&!-- Declare the "messages" queue: --&
&rabbit:queue name="messages" durable="true"/&
最后,实际发布和获取消息的 HomeController 变更是十分简单的:
package com.rabbitmq.cftutorial.simple;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.ui.Model;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.amqp.core.AmqpTemplate;
@Controller
public class HomeController {
@Autowired AmqpTemplate amqpTemplate;
@RequestMapping(value = "/")
public String home(Model model) {
model.addAttribute(new Message());
return "WEB-INF/views/home.jsp";
@RequestMapping(value = "/publish", method=RequestMethod.POST)
public String publish(Model model, Message message) {
// Send a message to the "messages" queue
amqpTemplate.convertAndSend("messages", message.getValue());
model.addAttribute("published", true);
return home(model);
@RequestMapping(value = "/get", method=RequestMethod.POST)
public String get(Model model) {
// Receive a message from the "messages" queue
String message = (String)amqpTemplate.receiveAndConvert("messages");
if (message != null)
model.addAttribute("got", message);
model.addAttribute("got_queue_empty", true);
return home(model);
现在一切就绪,最后的 vmc 更新将使应用程序完全正常运作。我们可以发布一条消息:
并收回这条消息:
教程的内容就此结束。您可在 找到关于 RabbitMQ 和 AMQP 的更多资源。您可在
网站找到关于在 Spring 应用程序中使用 RabbitMQ 的更多信息。如果您对此教程或 RabbitMQ 服务存在任何疑问或反馈,请使用
的论坛与我们联系。上篇文章中,我们把每个Message都是deliver(提供)到某个Consumer。在这篇文章中,我们将会将同一个Message deliver(提供)到多个Consumer中。这个模式也被成为 "publish / subscribe"。
&&& 这篇文章中,我们将创建一个日志系统,它包含两个部分:第一个部分是发出log(Producer),第二个部分接收到并打印(Consumer)。 我们将构建两个Consumer,第一个将log写到物理磁盘上;第二个将log输出的屏幕。
1. Exchanges
&& 关于exchange的概念在《》中有详细介绍。现在做一下简单的回顾。
&& RabbitMQ 的Messaging Model就是Producer并不会直接发送Message到queue。实际上,Producer并不知道它发送的Message是否已经到达queue。
&& Producer发送的Message实际上是发到了Exchange中。Exchange的功能也很简单:从Producer接收Message,然后投递到queue中。Exchange需要知道如何处理Message,是把它放到一个queue中,还是放到多个queue中?这个rule是通过 Exchange 的类型定义的。
我们知道有三种类型的Exchange:direct, topic 和fanout。fanout就是广播模式,会将所有的Message都放到它所知道的queue中。创建一个名字为logs,类型为fanout的Exchange。
channel.ExchangeDeclare("logs", "fanout");
2. Temporary queues
&&& 截至现在,我们用的queue都是有名字的:第一个是hello,第二个是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成为可能。
&&& 但是对于我们将要构建的日志系统,并不需要有名字的queue。我们希望得到所有的log,而不是它们中间的一部分。而且我们只对当前的log感兴趣。为了实现这个目标,我们需要两件事情:
&&& 1) 每当Consumer连接时,我们需要一个新的,空的queue。因为我们不对老的log感兴趣。幸运的是,如果在声明queue时不指定名字,那么RabbitMQ会随机为我们选择这个名字。基本上都是这个样子:amq.gen-JzTY20BRgKO-HjmUJj0wLg。
&&&& 2)当Consumer关闭连接时,这个queue要被deleted。可以加个exclusive的参数。方法:NET好像不用设置exclusive,Consumer关闭之后会自动删掉Queue。
channel.QueueDeclare(queueName, durable, exclusive, autoDelete, null);
3. Bindings绑定
现在我们已经创建了fanout类型的exchange和没有名字的queue(实际上是RabbitMQ帮我们取了名字)。那exchange怎么样知道它的Message发送到哪个queue呢?答案就是通过bindings:绑定。
channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");//广播QueueDeclareOk queueOk = channel.QueueDeclare();var queueName = queueOk.QueueN
channel.QueueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);//不需要指定routing key,设置了fanout,指了也没有用.
现在logs的exchange就将它的Message附加到我们创建的queue了。
完整的fanout例子:
Producer.cs
1 static void Main(string[] args)
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
const string EXCHANGE_NAME = "logs";
const string ROUTING_KEY = "";
channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");//广播
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(EXCHANGE_NAME, ROUTING_KEY, null, body);//不需要指定routing key,设置了fanout,指了也没有用.
Console.WriteLine(" [x] Sent {0}", message);
Consumer.cs
1 static void Main(string[] args)
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
const string EXCHANGE_NAME = "logs";
const string ROUTING_KEY = "";
channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");//广播
QueueDeclareOk queueOk = channel.QueueDeclare();//每当Consumer连接时,我们需要一个新的,空的queue。因为我们不对老的log感兴趣。幸运的是,如果在声明queue时不指定名字,那么RabbitMQ会随机为我们选择这个名字。
////现在我们已经创建了fanout类型的exchange和没有名字的queue(实际上是RabbitMQ帮我们取了名字)。
////那exchange怎么样知道它的Message发送到哪个queue呢?答案就是通过bindings:绑定。
string queueName = queueOk.QueueN//得到RabbitMQ帮我们取了名字
channel.QueueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);//不需要指定routing key,设置了fanout,指了也没有用.
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueName, true, consumer);
Console.WriteLine(" [*] Waiting for messages." + "To exit press CTRL+C");
while (true)
var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//挂起的操作
var body = ea.B
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message);
/tutorials/tutorial-three-dotnet.html(官网)
http://blog.csdn.net/anzhsoft/article/details/(翻译)
阅读(...) 评论()

我要回帖

更多关于 activemq spring 整合 的文章

 

随机推荐