文章目录
  1. 1. 简介
    1. 1.1. 几个概念说明
    2. 1.2. 安装erlang
    3. 1.3. 安装RabbitMQ
      1. 1.3.1. 要用全路径
      2. 1.3.2. Path: /etc/rabbitmq/
    4. 1.4. 安装web插件管理界面
    5. 1.5. 用户管理
    6. 1.6. 插件
    7. 1.7. 测试用的Java代码
  2. 2. 参考文章

RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,因此也是继承了这些优点。

简介

典型的消费-生产者模型。 应用:对无需即时返回且耗时的操作,进行异步处理。

RabbitMQ的结构图如下: RabbitMQ的结构图

几个概念说明

Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

安装erlang

1
$sudo apt-get install erlang-nox

安装RabbitMQ

1
$sudo dpkg -i rabbitmq-server_3.2.3-1_all.deb

启动/停止rabbitmq-server要用全路径

1
2
3
4
5
6
/etc/init.d/rabbitmq-server start
 * Starting message broker rabbitmq-server                                                                                                 [ OK ]
/etc/init.d/rabbitmq-server stop
 * Stopping message broker rabbitmq-server                                                                                                 [ OK ]
/etc/init.d/rabbitmq-server restart
 * Restarting message broker rabbitmq-server                                                                                               [ OK ]

配置文件 Path: /etc/rabbitmq/

  • 1 enabled_plugins
  • 2 rabbitmq.conf
  • 3 rabbitmq-env.conf

RABBITMQ_NODE_PORT:5672

Configuration
rabbitmq-env.cof

RabbitMQ启动参数具体含义, 偶还没仔细看

安装web插件管理界面

1
2
3
$rabbitmq-plugins enable rabbitmq_management
$ls /etc/rabbitmq
$/etc/init.d/rabbitmq-server restart

查看:http://localhost:15672 用户名/密码:guest/guest 效果图

用户管理

1
2
3
4
5
6
7
8
9
10
1.添加用户
$rabbitmqctl add_user username password
2.删除用户
$rabbitmqctl delete_user username
3.修改密码
$rabbitmqctl change_password username newpassword
4.列出所有用户
$rabbitmqctl list_users
5.用户赋权
$rabbitmqctl set_user_tags newuser administrator

插件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
1. 查看所有插件
$rabbitmq-plugins list

[e] amqp_client                       3.2.3
[ ] cowboy                            0.5.0-rmq3.2.3-git4b93c2d
[ ] eldap                             3.2.3-gite309de4
[e] mochiweb                          2.7.0-rmq3.2.3-git680dba8
[ ] rabbitmq_amqp1_0                  3.2.3
[ ] rabbitmq_auth_backend_ldap        3.2.3
[ ] rabbitmq_auth_mechanism_ssl       3.2.3
[ ] rabbitmq_consistent_hash_exchange 3.2.3
[ ] rabbitmq_federation               3.2.3
[ ] rabbitmq_federation_management    3.2.3
[ ] rabbitmq_jsonrpc                  3.2.3
[ ] rabbitmq_jsonrpc_channel          3.2.3
[ ] rabbitmq_jsonrpc_channel_examples 3.2.3
[E] rabbitmq_management               3.2.3
[e] rabbitmq_management_agent         3.2.3
[ ] rabbitmq_management_visualiser    3.2.3
[ ] rabbitmq_mqtt                     3.2.3
[ ] rabbitmq_shovel                   3.2.3
[ ] rabbitmq_shovel_management        3.2.3
[ ] rabbitmq_stomp                    3.2.3
[ ] rabbitmq_tracing                  3.2.3
[e] rabbitmq_web_dispatch             3.2.3
[ ] rabbitmq_web_stomp                3.2.3
[ ] rabbitmq_web_stomp_examples       3.2.3
[ ] rfc4627_jsonrpc                   3.2.3-git5e67120
[ ] sockjs                            0.3.4-rmq3.2.3-git3132eb9
[e] webmachine                        1.10.3-rmq3.2.3-gite9359c7


2. 激活插件
$rabbitmq-plugins enable rabbitmq_management

3. 关闭插件
$rabbitmq-plugins disable rabbitmq_management

测试用的Java代码

代码来源: RabbitMQ 入门指南(Java)

依赖Jar: amqp-client-3.0.4.jar, commons-lang-2.6.jar

EndPoint.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package examples.rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public abstract class EndPoint {

	protected Channel channel;
	protected Connection connection;
	protected String endPointName;

	public EndPoint(String endpointName) throws IOException {
		this.endPointName = endpointName;

		// Create a connection factory
		ConnectionFactory factory = new ConnectionFactory();

		// hostname of your rabbitmq server
		factory.setHost("192.168.6.129");

		// getting a connection
		connection = factory.newConnection();

		// creating a channel
		channel = connection.createChannel();

		// declaring a queue for this channel. If queue does not exist,
		// it will be created on the server.
		channel.queueDeclare(endpointName, false, false, false, null);
	}

	/**
	 * 关闭channel和connection。并非必须,因为隐含是自动调用的。
	 * 
	 * @throws IOException
	 */
	public void close() throws IOException {
		this.channel.close();
		this.connection.close();
	}
}

Producer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package examples.rabbitmq;

import java.io.IOException;
import java.io.Serializable;

import org.apache.commons.lang.SerializationUtils;


/**
 * The producer endpoint that writes to the queue.
 * @author syntx
 *
 */
public class Producer extends EndPoint{
	
	public Producer(String endPointName) throws IOException{
		super(endPointName);
	}

	public void sendMessage(Serializable object) throws IOException {
	    channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));
	}	
}

QueueConsumer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package examples.rabbitmq;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.SerializationUtils;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;


/**
 * 读取队列的程序端,实现了Runnable接口。
 * @author syntx
 *
 */
public class QueueConsumer extends EndPoint implements Runnable, Consumer{
	
	public QueueConsumer(String endPointName) throws IOException{
		super(endPointName);		
	}
	
	public void run() {
		try {
			//start consuming messages. Auto acknowledge messages.
			channel.basicConsume(endPointName, true,this);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Called when consumer is registered.
	 */
	public void handleConsumeOk(String consumerTag) {
		System.out.println("Consumer "+consumerTag +" registered");		
	}

	/**
	 * Called when new message is available.
	 */
	public void handleDelivery(String consumerTag, Envelope env,
			BasicProperties props, byte[] body) throws IOException {
		Map map = (HashMap)SerializationUtils.deserialize(body);
	    System.out.println("Message Number "+ map.get("message number") + " received.");
		
	}

	public void handleCancel(String consumerTag) {}
	public void handleCancelOk(String consumerTag) {}
	public void handleRecoverOk(String consumerTag) {}
	public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}
}

Main.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package examples.rabbitmq;

import java.io.IOException;
import java.sql.SQLException;
import java.util.HashMap;

public class Main {
	public Main() throws Exception{
		
		QueueConsumer consumer = new QueueConsumer("queue");
		Thread consumerThread = new Thread(consumer);
		consumerThread.start();
		
		Producer producer = new Producer("queue");
		
		for (int i = 0; i < 100000; i++) {
			HashMap message = new HashMap();
			message.put("message number", i);
			producer.sendMessage(message);
			System.out.println("Message Number "+ i +" sent.");
		}
	}
	
	/**
	 * @param args
	 * @throws SQLException 
	 * @throws IOException 
	 */
	public static void main(String[] args) throws Exception{
	  new Main();
	}
}

参考文章

消息队列RabbitMQ入门介绍
可扩展Web架构与分布式系统 想看来着,但是文章实在是太长了 -_-!
RabbitMQ之消息发布订阅与信息持久化技术
RabbitMQ启动参数具体含义
rabbitmq-service用户手册
RabbitMQ的安装,配置,监控

文章目录
  1. 1. 简介
    1. 1.1. 几个概念说明
    2. 1.2. 安装erlang
    3. 1.3. 安装RabbitMQ
      1. 1.3.1. 要用全路径
      2. 1.3.2. Path: /etc/rabbitmq/
    4. 1.4. 安装web插件管理界面
    5. 1.5. 用户管理
    6. 1.6. 插件
    7. 1.7. 测试用的Java代码
  2. 2. 参考文章