随着互联网企业业务量的不断扩大,企业信息网络系统的愈加复杂,性能问题也就越来越凸显出来,串行的业务处理方式显然已经成为主要的瓶颈,我们需要更多异步的并行处理来提高企业信息系统的业务处理能力,因此独立的消息处理系统也就应运而生,ActiveMQ 就是诸多开源消息系统的佼佼者。对于我们的技术选型来说,稳定和适应性是最重要的考虑因素,因此由 Apache 组织背景而且支持发布/订阅(Pub/Sub)模式以及异步 Stomp 协议(Streaming Text Orientated Messaging Protocol)和 REST 方式的 ActiveMQ 就成为了首选的消息处理器,经测试在异步模式下,整个系统的信息处理吞吐量还是很理想的(一台普通服务器能达到每秒收发 4000 个消息,每个消息 4K 字节这样的速度)。
下面我们来看看 ActiveMQ 的基本配置与使用,由于我们公司大部分的内部服务都是使用脚本语言写的,比如 PHP/Python 等,所以我们使用 Stomp 协议来处理;另外,为了尽可能提高消息“生产者”的效率,我们采用了异步 nio 模式;还有关于并发处理以及负载均衡方面还有很多需要注意的配置和选项,我们后面会另找时间补充。
1、下载 ActiveMQ(最新版本 5.4.1)
2、解压安装到 /usr/local/activemq 目录下
3、安装初始化配置文件 ./bin/activemq setup /etc/default/activemq (保存至默认位置,以后可微调启动参数)
4、编辑 ./conf/activemq.xml 加入如下段(authentication / stomp+nio):
...
<plugins>
<!-- Add By James ; Configure authentication; Username, passwords and groups -->
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="system" password="${activemq.password}" groups="users,admins"/>
<authenticationUser username="user" password="${guest.password}" groups="users"/>
<authenticationUser username="guest" password="${guest.password}" groups="guests"/>
</users>
</simpleAuthenticationPlugin>
</plugins>
...
<!-- Add by James for support stomp protocol -->
<transportConnector name="stomp+nio" uri="stomp+nio://0.0.0.0:61613?transport.closeAsync=false"/>
...
5、使用 Stomp 库对 ActiveMQ 进行收发消息测试(这里不提供 Java 的例子,因为 Google 上已经有太多了,我们以 PHP 作为脚本语言的程序范例)。
a> 以下是 Stomp 协议的简单命令:
* SEND
* SUBSCRIBE
* UNSUBSCRIBE
* BEGIN
* COMMIT
* ABORT
* ACK
* DISCONNECT
b> PHP 实例(transaction / persistent / ack)
关于 Stomp 的 PHP 库可以到 Google Code 上下载,使用以下测试用例的时候可以配合 http://activemq-hostname:8161/admin/ 查看消息的数量和状态。
c> amq_sent.php(生产者示例)
...
// include a library
require_once("Stomp.php");
// make a connection
$con = new Stomp("tcp://192.168.1.11:61613");
// connect
$con->connect("guest", "password");
// begin transaction
$con->begin("tx1");
try {
// build a message by map
require_once("Stomp/Message/Map.php");
$msgBody = array("city"=>"Belgrade", "name"=>"Dejan");
$msgHeader["persistent"] = "true"; // VERY IMPORTANT : Because By default, Stomp produced messages are set to non-persistent.
$msgHeader["transformation"] = "jms-map-json";
$mapMessage = new StompMessageMap($msgBody, $msgHeader);
// send the message to the queue
$con->send("/queue/test", $mapMessage, array("transaction" => "tx1"));
// test rollback logic
// throw new Exception("Sending failed ...");
// commit sending message
$con->commit("tx1");
// print message
echo "Sent message : ";
print_r($msgBody);
} catch (Exception $e) {
// rollback sending
$con->abort("tx1");
// print message
echo $e->getMessage();
}
// disconnect
$con->disconnect();
...
d> amq_recv.php(消费者示例)
...
// include a library
require_once("Stomp.php");
// make a connection
$con = new Stomp("tcp://192.168.1.11:61613");
// connect with authentication
$con->connect("guest", "password");
// set read timeout
$con->setReadTimeout(1);
// subscribe to the queue
$con->subscribe("/queue/test", array("transformation" => "jms-map-json"));
// receive a message from the queue
$msg = $con->readFrame();
// do what you want with the message
if ( $msg != null) {
echo "Received message : ";
print_r($msg);
// mark the message as received in the queue
$con->ack($msg);
} else {
echo "Failed to receive a message/n";
}
// disconnect
$con->disconnect();
...
6、目前 ActiveMQ 最新版的消息数据是保存在一个快速的文本数据库 kahadb 里面的,使用虽然速度很快但是一旦出错恢复起来总是有很多的问题。所以我建议大家还是把主流数据库的持久化选项打开,虽然慢一点但是数据至少恢复起来要简单不少,我们使用 Mysql 来保存持久化的消息数据,数据库是 activemq,配置在 ./conf/activemq.xml 中,如下:
...
<persistenceFactory>
<journalPersistenceAdapterFactory dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
</persistenceFactory>
...
<!-- MySql DataSource Sample Setup -->
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://192.168.1.10:11811/activemq?relaxAutoCommit=true"/>
<property name="username" value="admin"/>
<property name="password" value="ihush2010"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
...
总结&注意事项:
1、Stomp 协议构造的消息默认是非持久化的,如果要让消息持久化必须加上 "persistent:true" 的消息头,这个搞了我半天时间~
2、编程时候还是需要注意“消费者”的运行效率,因为大量“慢消费者”会在非持久的 Topics 上出现问题,特别在多消费者的情况下,容易造成 ActiveMQ 的反应变慢~
3、如果不是使用 Stomp 协议和 ActiveMQ 建立通讯的情况下尽量使用长连接,Connection 的 start() 和 stop() 方法代价很高。
4、如果使用集群 master-slave 也是可以的,但是总是不够稳定,建议还是使用单台服务器的模式。
分享到:
相关推荐
中间件技术 实验三 消息中间件应用开发: - CSDN博客 https://blog.csdn.net/lly1122334/article/details/80139790
java中使用消息中间件ActiveMQ的MQTT协议发布消息使用fusesource,fusesource提供三种方式实现发布消息的方式,分别是阻塞式(BlockingConnection)、回调式(CallbackConnection)和Future样式(FutureConnection)
标签:activemq-stomp-5.10.0.jar,activemq,stomp,5.10.0,jar包下载,依赖包
activemq消息中间件-视频教程activemq消息中间件-视频教程activemq消息中间件-视频教程activemq消息中间件-视频教程
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。 下载本压缩包后解压运行里面的activemq.bat即可。 http://192.168.0.61:8161/ 是管理ActiveMQ的后台管理系统入口, 如需在JAVA中使用,请下载本人的 ...
go语言实现的读取、发送消息到activemq,使用的stomp协议。
当前使用较多的消息中间件有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ等。本套视频以Apache的ActiveMQ作为切入点,分为基础/实战/面试上中下三大部分,带你从零基础入门到熟练掌握ActiveMQ,能够结合...
标签:activemq-stomp-5.10.0-sources.jar,activemq,stomp,5.10.0,sources,jar包下载,依赖包
php 版本 activemq+stomp.php,连接信息源代码+xml解析实例+实例代码demo
消息中间件应用开发: ActiveMQ实现单 线程多队列-Java代码类资源 中间件技术 消息中间件应用开发
消息中间件-ActiveMQ
消息中间件-activeMQ
Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。
教程视频:Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件
主要讲解activemq的安装,使用,集群的搭建,以及拓展
消息中间件是一门极为重要的技术。在项目开发中使用到的频率很高。该技术能够很好的解决我们项目之间数据传输的问题。相比于dubbo等RPC远程调用技术,消息中间件更灵活,能够解耦项目之间的依赖,能够消峰、异步等。...
网上关于在LINUX下用C连接ACTIVEMQ的资料实在太少了,在自己搜索几天资料后,将这些内容整合在一起,有关ACTIVEMQ,APR,STOMP一起使用的内容,资料还不够详细,只希望给大家带来帮助!
ActiveMQ 是由 Apache 出品的一款开源消息中间件,旨在为应用程序提供高效、可扩展、稳定、安全的企业级消息通信。它的设计目标是提供标准的、面向消息的、多语言的应用集成消息通信中间件。ActiveMQ 实现了 JMS 1.1...
activemq消息中间件的使用demo,以及spring集合jms实现消息发送和处理。