RabbitMQ 初读
更新日期:
RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,因此也是继承了这些优点。
简介
典型的消费-生产者模型。 应用:对无需即时返回且耗时的操作,进行异步处理。
RabbitMQ的结构图如下:
几个概念说明
Broker:简单来说就是消息队列服务器实体。 Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。 Queue:消息队列载体,每个消息都会被投入到一个或多个队列。 Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。 Routing Key:路由关键字,exchange根据这个关键字进行消息投递。 vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。 producer:消息生产者,就是投递消息的程序。 consumer:消息消费者,就是接受消息的程序。 channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
安装erlang
1 | $sudo apt-get install erlang-nox |
安装RabbitMQ
1 | $sudo dpkg -i rabbitmq-server_3.2.3-1_all.deb |
启动/停止rabbitmq-server要用全路径
1 2 3 4 5 6 | /etc/init.d/rabbitmq-server start * Starting message broker rabbitmq-server [ OK ] /etc/init.d/rabbitmq-server stop * Stopping message broker rabbitmq-server [ OK ] /etc/init.d/rabbitmq-server restart * Restarting message broker rabbitmq-server [ OK ] |
配置文件 Path: /etc/rabbitmq/
- 1 enabled_plugins
- 2 rabbitmq.conf
- 3 rabbitmq-env.conf
RABBITMQ_NODE_PORT:5672
Configuration rabbitmq-env.cof
RabbitMQ启动参数具体含义, 偶还没仔细看
安装web插件管理界面
1 2 3 | $rabbitmq-plugins enable rabbitmq_management $ls /etc/rabbitmq $/etc/init.d/rabbitmq-server restart |
查看:http://localhost:15672
用户名/密码:guest/guest
用户管理
1 2 3 4 5 6 7 8 9 10 | 1.添加用户 $rabbitmqctl add_user username password 2.删除用户 $rabbitmqctl delete_user username 3.修改密码 $rabbitmqctl change_password username newpassword 4.列出所有用户 $rabbitmqctl list_users 5.用户赋权 $rabbitmqctl set_user_tags newuser administrator |
插件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | 1. 查看所有插件 $rabbitmq-plugins list [e] amqp_client 3.2.3 [ ] cowboy 0.5.0-rmq3.2.3-git4b93c2d [ ] eldap 3.2.3-gite309de4 [e] mochiweb 2.7.0-rmq3.2.3-git680dba8 [ ] rabbitmq_amqp1_0 3.2.3 [ ] rabbitmq_auth_backend_ldap 3.2.3 [ ] rabbitmq_auth_mechanism_ssl 3.2.3 [ ] rabbitmq_consistent_hash_exchange 3.2.3 [ ] rabbitmq_federation 3.2.3 [ ] rabbitmq_federation_management 3.2.3 [ ] rabbitmq_jsonrpc 3.2.3 [ ] rabbitmq_jsonrpc_channel 3.2.3 [ ] rabbitmq_jsonrpc_channel_examples 3.2.3 [E] rabbitmq_management 3.2.3 [e] rabbitmq_management_agent 3.2.3 [ ] rabbitmq_management_visualiser 3.2.3 [ ] rabbitmq_mqtt 3.2.3 [ ] rabbitmq_shovel 3.2.3 [ ] rabbitmq_shovel_management 3.2.3 [ ] rabbitmq_stomp 3.2.3 [ ] rabbitmq_tracing 3.2.3 [e] rabbitmq_web_dispatch 3.2.3 [ ] rabbitmq_web_stomp 3.2.3 [ ] rabbitmq_web_stomp_examples 3.2.3 [ ] rfc4627_jsonrpc 3.2.3-git5e67120 [ ] sockjs 0.3.4-rmq3.2.3-git3132eb9 [e] webmachine 1.10.3-rmq3.2.3-gite9359c7 2. 激活插件 $rabbitmq-plugins enable rabbitmq_management 3. 关闭插件 $rabbitmq-plugins disable rabbitmq_management |
测试用的Java代码
代码来源: RabbitMQ 入门指南(Java)
依赖Jar: amqp-client-3.0.4.jar, commons-lang-2.6.jar
EndPoint.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | package examples.rabbitmq; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public abstract class EndPoint { protected Channel channel; protected Connection connection; protected String endPointName; public EndPoint(String endpointName) throws IOException { this.endPointName = endpointName; // Create a connection factory ConnectionFactory factory = new ConnectionFactory(); // hostname of your rabbitmq server factory.setHost("192.168.6.129"); // getting a connection connection = factory.newConnection(); // creating a channel channel = connection.createChannel(); // declaring a queue for this channel. If queue does not exist, // it will be created on the server. channel.queueDeclare(endpointName, false, false, false, null); } /** * 关闭channel和connection。并非必须,因为隐含是自动调用的。 * * @throws IOException */ public void close() throws IOException { this.channel.close(); this.connection.close(); } } |
Producer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | package examples.rabbitmq; import java.io.IOException; import java.io.Serializable; import org.apache.commons.lang.SerializationUtils; /** * The producer endpoint that writes to the queue. * @author syntx * */ public class Producer extends EndPoint{ public Producer(String endPointName) throws IOException{ super(endPointName); } public void sendMessage(Serializable object) throws IOException { channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object)); } } |
QueueConsumer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 | package examples.rabbitmq; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang.SerializationUtils; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; /** * 读取队列的程序端,实现了Runnable接口。 * @author syntx * */ public class QueueConsumer extends EndPoint implements Runnable, Consumer{ public QueueConsumer(String endPointName) throws IOException{ super(endPointName); } public void run() { try { //start consuming messages. Auto acknowledge messages. channel.basicConsume(endPointName, true,this); } catch (IOException e) { e.printStackTrace(); } } /** * Called when consumer is registered. */ public void handleConsumeOk(String consumerTag) { System.out.println("Consumer "+consumerTag +" registered"); } /** * Called when new message is available. */ public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) throws IOException { Map map = (HashMap)SerializationUtils.deserialize(body); System.out.println("Message Number "+ map.get("message number") + " received."); } public void handleCancel(String consumerTag) {} public void handleCancelOk(String consumerTag) {} public void handleRecoverOk(String consumerTag) {} public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {} } |
Main.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | package examples.rabbitmq; import java.io.IOException; import java.sql.SQLException; import java.util.HashMap; public class Main { public Main() throws Exception{ QueueConsumer consumer = new QueueConsumer("queue"); Thread consumerThread = new Thread(consumer); consumerThread.start(); Producer producer = new Producer("queue"); for (int i = 0; i < 100000; i++) { HashMap message = new HashMap(); message.put("message number", i); producer.sendMessage(message); System.out.println("Message Number "+ i +" sent."); } } /** * @param args * @throws SQLException * @throws IOException */ public static void main(String[] args) throws Exception{ new Main(); } } |
参考文章
消息队列RabbitMQ入门介绍 可扩展Web架构与分布式系统 想看来着,但是文章实在是太长了 -_-! RabbitMQ之消息发布订阅与信息持久化技术 RabbitMQ启动参数具体含义 rabbitmq-service用户手册 RabbitMQ的安装,配置,监控