20191030003

单机安装

安装版本与环境说明

  1. CentOS 7.5
  2. jdk1.8
  3. erlang-21.2.3-1
  4. rabbitmq-server-3.7.9

一、安装依赖环境erlang

  1. http://www.rabbitmq.com/which-erlang.html 页面查看安装rabbitmq需要安装erlang对应的版本
  2. https://github.com/rabbitmq/erlang-rpm/releases 页面找到需要下载的erlang版本,erlang-*.centos.x86_64.rpm就是centos版本的。
  3. 复制下载地址后,使用wget命令下载
wget -P /home/download https://github.com/rabbitmq/erlang-rpm/releases/download/v21.2.3/erlang-21.2.3-1.el7.centos.x86_64.rpm
  1. 安装 Erlang
sudo rpm -Uvh /home/download/erlang-21.2.3-1.el7.centos.x86_64.rpm
  1. 安装 socat
yum install -y socat

二、安装RabbitMQ

  1. 在官方下载页面找到CentOS7版本的下载链接,下载rpm安装包
wget -P /home/download https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.9/rabbitmq-server-3.7.9-1.el7.noarch.rpm

提示:可以在https://github.com/rabbitmq/rabbitmq-server/tags下载历史版本

  1. 安装RabbitMQ
sudo rpm -Uvh /home/download/rabbitmq-server-3.7.9-1.el7.noarch.rpm

三、启动和关闭

  • 启动服务
sudo systemctl start rabbitmq-server
  • 查看状态
sudo systemctl status rabbitmq-server
  • 停止服务
sudo systemctl stop rabbitmq-server

设置开机启动

sudo systemctl enable rabbitmq-server

四、开启Web管理插件

  1. 开启插件
rabbitmq-plugins enable rabbitmq_management

说明:rabbitmq有一个默认的guest用户,但只能通过localhost访问,所以需要添加一个能够远程访问的用户。

  1. 添加用户
rabbitmqctl add_user admin admin
  1. 为用户分配操作权限
rabbitmqctl set_user_tags admin administrator
  1. 为用户分配资源权限
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

五、防火墙添加端口

RabbitMQ 服务启动后,还不能进行外部通信,需要将端口添加都防火墙

  1. 添加端口
sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent

sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent

sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent

sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
  1. 重启防火墙
sudo firewall-cmd --reload

至此,可以通过外部浏览器访问控制台管理页面,http://10.211.55.8:15672/,如下图:

image.png

代码示例

生产者代码示例

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Producer
 * @Description
 * @Author 夕
 * @Date 2019-11-04 13:40
 * @Version V1.0
 **/
//简单队列生产者,使用RabbitMQ的默认交换器发送消息
public class Producer {
    public static void main(String[] args) {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置连接属性
        factory.setHost("10.211.55.8");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        //3.从连接工作获取连接
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection("生产者");
            //4.从连接中创建通道
            channel = connection.createChannel();

            /*
             * 5.声明(创建)队列,
             * 如果队列不存在才会创建
             * RabbitMQ不允许声明两个队列名相同,属性不同的队列,否则会报错
             **/
            channel.queueDeclare("queue1", false, false, false, null);
            //6.发送消息
            String message = "Hello RabbitMQ";
            channel.basicPublish("","queue1",null,message.getBytes());
            System.out.println("消息已发送");

        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            //7.关闭通道
            if(channel !=null && channel.isOpen()){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            //8.关闭连接
            if(connection !=null && connection.isOpen()){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}

消费者代码示例

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @ClassName Consumer
 * @Description
 * @Author 夕
 * @Date 2019-11-04 13:40
 * @Version V1.0
 **/
//简单队列消费者
public class Consumer {
    public static void main(String[] args) {
        //1.创建连接工程
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置连接属性
        factory.setHost("10.211.55.8");
        factory.setUsername("admin");
        factory.setPassword("admin");
        //3.从连接工厂获取连接
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection("消费者");
	    //4.从连接中获取通道
            channel = connection.createChannel();
            //5.声明(创建)队列
            channel.queueDeclare("queue1", false, false, false, null);
            //6.定义收到消息后的回调
            DeliverCallback callback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery message) throws IOException {
                    System.out.println("收到消息:"+new String(message.getBody()));
                }
            };
            //7.监听队列
            channel.basicConsume("queue1", true, callback, new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {

                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
            //8.关闭通道
            if(channel !=null && channel.isOpen()){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            //9.关闭连接
            if(connection !=null && connection.isOpen()){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }
    }
}