博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
docker快速安装rabbitmq
阅读量:5331 次
发布时间:2019-06-14

本文共 8819 字,大约阅读时间需要 29 分钟。

一、获取镜像

#指定版本,该版本包含了web控制页面docker pull rabbitmq:management

二、运行镜像

#方式一:默认guest 用户,密码也是 guestdocker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management#方式二:设置用户名和密码docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management

三、访问ui页面

http://localhost:15672/

四、golang案例

#producer生产者代码package mainimport (    "fmt"    "log"    "github.com/streadway/amqp")const (    //AMQP URI    uri = "amqp://guest:guest@10.0.0.11:5672/" // 10.0.0.11为主机ip    //Durable AMQP exchange name    exchangeName = ""    //Durable AMQP queue name    queueName = "test-queues"    //Body of message    bodyMsg string = "hello angel")//如果存在错误,则输出func failOnError(err error, msg string) {    if err != nil {        log.Fatalf("%s: %s", msg, err)        panic(fmt.Sprintf("%s: %s", msg, err))    }}func main() {    //调用发布消息函数    publish(uri, exchangeName, queueName, bodyMsg)    log.Printf("published %dB OK", len(bodyMsg))}//发布者的方法//@amqpURI, amqp的地址//@exchange, exchange的名称//@queue, queue的名称//@body, 主体内容func publish(amqpURI string, exchange string, queue string, body string) {    //建立连接    log.Printf("dialing %q", amqpURI)    connection, err := amqp.Dial(amqpURI)    failOnError(err, "Failed to connect to RabbitMQ")    defer connection.Close()    //创建一个Channel    log.Printf("got Connection, getting Channel")    channel, err := connection.Channel()    failOnError(err, "Failed to open a channel")    defer channel.Close()    log.Printf("got queue, declaring %q", queue)    //创建一个queue    q, err := channel.QueueDeclare(        queueName, // name        false, // durable        false, // delete when unused        false, // exclusive        false, // no-wait        nil, // arguments    )    failOnError(err, "Failed to declare a queue")    log.Printf("declared queue, publishing %dB body (%q)", len(body), body)    // Producer只能发送到exchange,它是不能直接发送到queue的    // 现在我们使用默认的exchange(名字是空字符)这个默认的exchange允许我们发送给指定的queue    // routing_key就是指定的queue名字    err = channel.Publish(        exchange, // exchange        q.Name, // routing key        false, // mandatory        false, // immediate        amqp.Publishing{            Headers: amqp.Table{},            ContentType: "text/plain",            ContentEncoding: "",            Body: []byte(body),        })    failOnError(err, "Failed to publish a message")}
#consumer消费者代码package mainimport (    "fmt"    "log"    "github.com/streadway/amqp")const (    //AMQP URI    uri = "amqp://guest:guest@10.0.0.11:5672/"    //Durable AMQP exchange nam    exchangeName = ""    //Durable AMQP queue name    queueName = "test-queues")//如果存在错误,则输出func failOnError(err error, msg string) {    if err != nil {        log.Fatalf("%s: %s", msg, err)        panic(fmt.Sprintf("%s: %s", msg, err))    }}func main() {    //调用消息接收者    consumer(uri, exchangeName, queueName)}//接收者方法//@amqpURI, amqp的地址//@exchange, exchange的名称//@queue, queue的名称func consumer(amqpURI string, exchange string, queue string) {    //建立连接    log.Printf("dialing %q", amqpURI)    connection, err := amqp.Dial(amqpURI)    failOnError(err, "Failed to connect to RabbitMQ")    defer connection.Close()    //创建一个Channel    log.Printf("got Connection, getting Channel")    channel, err := connection.Channel()    failOnError(err, "Failed to open a channel")    defer channel.Close()    log.Printf("got queue, declaring %q", queue)    //创建一个queue    q, err := channel.QueueDeclare(        queueName, // name        false, // durable        false, // delete when unused        false, // exclusive        false, // no-wait        nil, // arguments    )    failOnError(err, "Failed to declare a queue")    log.Printf("Queue bound to Exchange, starting Consume")    //订阅消息    msgs, err := channel.Consume(        q.Name, // queue        "", // consumer        true, // auto-ack        false, // exclusive        false, // no-local        false, // no-wait        nil, // args    )    failOnError(err, "Failed to register a consumer")    //创建一个channel    forever := make(chan bool)    //调用gorountine    go func() {        for d := range msgs {            log.Printf("Received a message: %s", d.Body)        }    }()    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")    //没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出    <-forever}

 

五、拥有消息确认的代码

#producerpackage mainimport (    "fmt"    "github.com/streadway/amqp"    "log"    "os"    "strings")const (    //AMQP URI    uri = "amqp://guest:guest@10.0.0.11:5672/"    //Durable AMQP exchange name    exchangeName = ""    //Durable AMQP queue name    queueName = "test-queues-acknowledgments")//如果存在错误,则输出func failOnError(err error, msg string) {    if err != nil {        log.Fatalf("%s: %s", msg, err)        panic(fmt.Sprintf("%s: %s", msg, err))    }}func main() {    bodyMsg := bodyFrom(os.Args)    //调用发布消息函数    publish(uri, exchangeName, queueName, bodyMsg)    log.Printf("published %dB OK", len(bodyMsg))}func bodyFrom(args []string) string {    var s string    if (len(args) < 2) || os.Args[1] == "" {        s = "hello angel"    } else {        s = strings.Join(args[1:], " ")    }    return s}//发布者的方法//@amqpURI, amqp的地址//@exchange, exchange的名称//@queue, queue的名称//@body, 主体内容func publish(amqpURI string, exchange string, queue string, body string) {    //建立连接    log.Printf("dialing %q", amqpURI)    connection, err := amqp.Dial(amqpURI)    failOnError(err, "Failed to connect to RabbitMQ")    defer connection.Close()    //创建一个Channel    log.Printf("got Connection, getting Channel")    channel, err := connection.Channel()    failOnError(err, "Failed to open a channel")    defer channel.Close()    log.Printf("got queue, declaring %q", queue)    //创建一个queue    q, err := channel.QueueDeclare(        queueName, // name        false,     // durable        false,     // delete when unused        false,     // exclusive        false,     // no-wait        nil,       // arguments    )    failOnError(err, "Failed to declare a queue")    log.Printf("declared queue, publishing %dB body (%q)", len(body), body)    // Producer只能发送到exchange,它是不能直接发送到queue的。    // 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。    // routing_key就是指定的queue名字。    err = channel.Publish(        exchange, // exchange        q.Name,   // routing key        false,    // mandatory        false,    // immediate        amqp.Publishing{            Headers:         amqp.Table{},            ContentType:     "text/plain",            ContentEncoding: "",            Body:            []byte(body),        })    failOnError(err, "Failed to publish a message")}
#consumerpackage mainimport (    "bytes"    "fmt"    "github.com/streadway/amqp"    "log"    "time")const (    //AMQP URI    uri = "amqp://guest:guest@10.0.0.11:5672/"    //Durable AMQP exchange nam    exchangeName = ""    //Durable AMQP queue name    queueName = "test-queues-acknowledgments")//如果存在错误,则输出func failOnError(err error, msg string) {    if err != nil {        log.Fatalf("%s: %s", msg, err)        panic(fmt.Sprintf("%s: %s", msg, err))    }}func main() {    //调用消息接收者    consumer(uri, exchangeName, queueName)}//接收者方法//@amqpURI, amqp的地址//@exchange, exchange的名称//@queue, queue的名称func consumer(amqpURI string, exchange string, queue string) {    //建立连接    log.Printf("dialing %q", amqpURI)    connection, err := amqp.Dial(amqpURI)    failOnError(err, "Failed to connect to RabbitMQ")    defer connection.Close()    //创建一个Channel    log.Printf("got Connection, getting Channel")    channel, err := connection.Channel()    failOnError(err, "Failed to open a channel")    defer channel.Close()    log.Printf("got queue, declaring %q", queue)    //创建一个queue    q, err := channel.QueueDeclare(        queueName, // name        false,     // durable        false,     // delete when unused        false,     // exclusive        false,     // no-wait        nil,       // arguments    )    failOnError(err, "Failed to declare a queue")    log.Printf("Queue bound to Exchange, starting Consume")    //订阅消息    msgs, err := channel.Consume(        q.Name, // queue        "",     // consumer        false,  // auto-ack        false,  // exclusive        false,  // no-local        false,  // no-wait        nil,    // args    )    failOnError(err, "Failed to register a consumer")    //创建一个channel    forever := make(chan bool)    //调用gorountine    go func() {        for d := range msgs {            log.Printf("Received a message: %s", d.Body)            dot_count := bytes.Count(d.Body, []byte("."))            t := time.Duration(dot_count)            time.Sleep(t * time.Second)            log.Printf("Done")            d.Ack(false)        }    }()    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")    //没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出    <-forever}

 

转载于:https://www.cnblogs.com/angelyan/p/11218260.html

你可能感兴趣的文章
Android 监听返回键、HOME键
查看>>
Android ContentProvider的实现
查看>>
sqlserver 各种判断是否存在(表名、函数、存储过程等)
查看>>
给C#学习者的建议 - CLR Via C# 读后感
查看>>
Recover Binary Search Tree
查看>>
Java 实践:生产者与消费者
查看>>
[转]IOCP--Socket IO模型终结篇
查看>>
各种正则验证
查看>>
观察者模式(Observer)
查看>>
python中numpy.r_和numpy.c_
查看>>
egret3D与2D混合开发,画布尺寸不一致的问题
查看>>
freebsd 实现 tab 命令 补全 命令 提示
查看>>
struts1和struts2的区别
查看>>
函数之匿名函数
查看>>
shell习题第16题:查用户
查看>>
Redis常用命令
查看>>
2018.11.06 bzoj1040: [ZJOI2008]骑士(树形dp)
查看>>
2019.02.15 bzoj5210: 最大连通子块和(链分治+ddp)
查看>>
redis cluster 集群资料
查看>>
微软职位内部推荐-Sr. SE - Office incubation
查看>>