当前位置:Gxlcms > 数据库问题 > JMS服务器ActiveMQ的初体验并持久化消息到MySQL数据库中

JMS服务器ActiveMQ的初体验并持久化消息到MySQL数据库中

时间:2021-07-01 10:21:17 帮助过:71人阅读

<!--
<persistenceAdapter>
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
-->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#MySQL-DS"/>
</persistenceAdapter>
技术分享

该配置表示,我们将要使用一个叫做“MySQL-DS”的JDBC数据源;
3、配置MySQL数据源:在</broker>节点后面,增加MySQL数据源配置:

技术分享 <!-- MySQL DataSource -->
<bean id="MySQL-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://127.0.0.1:3306/misc?useUnicode=true&amp;characterEncoding=UTF-8"/>
<property name="username" value="misc_root"/>
<property name="password" value="misc_root_pwd"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
技术分享

其实这就是一个Spring的Bean的配置,注意id与上面的保持一致;

整个AMQ的配置文件内容为:

技术分享 <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>classpath:/META-INF/credentials.properties</value>
</property>
</bean>

<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost">
<!--
For better performances use VM cursor and small memory limit. For more information, see: http://activemq.apache.org/message-cursors.html Also, if your producer is "hanging", it‘s probably due to producer flow control. For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true" memoryLimit="1mb">
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
<!--
Use VM cursor for better latency For more information, see: http://activemq.apache.org/message-cursors.html <pendingQueuePolicy> <vmQueueCursor/> </pendingQueuePolicy>
-->
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>

<!--
The managementContext is used to configure how ActiveMQ is exposed in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:

http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="false" />
</managementContext>

<!--
Configure message persistence for the broker. The default persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:

http://activemq.apache.org/persistence.html
-->
<!--
<persistenceAdapter>
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
-->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#MySQL-DS" />
</persistenceAdapter>

<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:

http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616" />
</transportConnectors>
</broker> <!-- MySQL DataSource --> <bean id="MySQL-DS" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver" /> <property name="url" value="jdbc:mysql://127.0.0.1:3306/misc?useUnicode=true&amp;characterEncoding=UTF-8" /> <property name="username" value="misc_root" /> <property name="password" value="misc_root_pwd" /> <property name="poolPreparedStatements" value="true" /> </bean> <!-- Enable web consoles, REST and Ajax APIs and demos It also includes Camel (with its web console), see ${ACTIVEMQ_HOME}/conf/camel.xml for more info Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details --> <import resource="jetty.xml"/></beans>
技术分享


四、查看MySQL数据表
重新启动AMQ,启动完成之后,我们发现,misc数据库多了3张数据表:

mysql> SHOW tables; +----------------+ | Tables_in_misc | +----------------+ | activemq_acks  | | activemq_lock  | | activemq_msgs  | +----------------+

数据表activemq_msgs即为持久化消息表;

五、持久化消息
系统启动完毕之后,消息表中内容为空:

mysql> SELECT * FROM activemq_msgs; Empty set

1、发送消息:打开http://127.0.0.1:8161/demo/页面,找到“Send a message”链接,打开页面(http://127.0.0.1:8161/demo/send.html),填写完表格后,点击“Send”按键,即AMQ投递了一个消息;
2、查看消息:发送之后,我们可以看到数据表中多了一条消息:

mysql> SELECT * FROM activemq_msgs; +----+-----------------+--------------------------------------------+-----------+------------+-----+----------+ | ID | CONTAINER       | MSGID_PROD                                 | MSGID_SEQ | EXPIRATION | MSG | PRIORITY | +----+-----------------+--------------------------------------------+-----------+------------+-----+----------+ |  1 | queue://FOO.BAR | ID:SHI-AP33382A-1486-1309840138441-2:2:1:1 |         1 |          0 | |        5 | +----+-----------------+--------------------------------------------+-----------+------------+-----+----------+

3、取得消息:找到“Receive a message”链接,打开页面(http://127.0.0.1:8161/demo/message/FOO/BAR?readTimeout=10000&type=queue),发现该页面不是一个标准HTML页面,查看其源代码,其内容是不是就是刚才的消息内容?
4、查看消息:消息消费之后,我们可以看到数据表没有消息了:

mysql> SELECT * FROM activemq_msgs; Empty set

5、我们可以生产多条消息,然后一条一条的消费,发现消息表中的消息一条一条的减少;
6、在发送消息页面,“Destination Type”如果选择“Topic”的话,则消息表中并没有数据,原因在于“Queue”为ptp模式消息,“Topic”为发布/订阅模式消息,当没有订阅者时,消息直接丢掉了。

JMS的内容先介绍到这里,下面我将结合Spring来启动AMQ(即AMQ与应用一同启动,上面介绍的都是单独的启动),通过测试代码来发送和消费消息,敬请期待!

JMS服务器ActiveMQ的初体验并持久化消息到MySQL数据库中

标签:

人气教程排行