wxvirus wxvirus
首页
  • Go文章

    • Go语言学习
  • Rust

    • Rust学习
  • Java

    • 《Java》
  • Python文章

    • Python
  • PHP文章

    • PHP设计模式
  • 学习笔记

    • 《Git》
  • HTML
  • CSS
  • JS
  • 技术文档
  • GitHub技巧
  • 刷题
  • 博客搭建
  • 算法学习
  • 架构设计
  • 设计模式
  • 学习
  • 面试
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

无解的lifecycle

let today = new Beginning()
首页
  • Go文章

    • Go语言学习
  • Rust

    • Rust学习
  • Java

    • 《Java》
  • Python文章

    • Python
  • PHP文章

    • PHP设计模式
  • 学习笔记

    • 《Git》
  • HTML
  • CSS
  • JS
  • 技术文档
  • GitHub技巧
  • 刷题
  • 博客搭建
  • 算法学习
  • 架构设计
  • 设计模式
  • 学习
  • 面试
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • C&C++

  • PHP

  • Python

  • Go

    • go基础

    • go核心

    • 网络编程

    • gowebsocket

    • gocasbin

    • K8S

    • rabbitmq

      • rabbitmq了解
      • 快速部署和go客户端使用
      • 生产者发送消息
        • 代码
        • 消费者读取消息
        • 简单封装 MQ 发送消息
      • 交换机
    • 框架相关

    • go-zero

    • kafka

    • rpc

    • 性能相关

  • microservice

  • rust

  • Java

  • 学习笔记

  • 后端
  • Go
  • rabbitmq
wxvirus
2023-01-15

生产者发送消息

# 生产者发送消息

# 代码

package main

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

func main() {
	dsn := fmt.Sprintf("amqp://%s:%s@%s:%d", "wxviurs", "123", "127.0.0.1", 5672)
	conn, err := amqp.Dial(dsn)
	if err != nil {
		log.Fatalln(err)
	}
	defer conn.Close()
	// 创建 channel
	c, err := conn.Channel()
	if err != nil {
		log.Fatalln(err)
	}
	defer c.Close()

	// 创建队列
	queue, err := c.QueueDeclare("test", false, false, false, false, nil)
	if err != nil {
		log.Fatalln(err)
	}

	// 使用channel发布消息
	err = c.Publish("", queue.Name, false, false,
		amqp.Publishing{
			ContentType: "text/plain",       // 消息类型
			Body:        []byte("test0001"), // 消息内容
		})
	if err != nil {
		log.Fatalln(err)
	}
	log.Println("发生消息成功")
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

image-20230107001807167

我这里发送了 2 次,这里的idle表示空闲状态,Total有 2 个消息,队列名为test

# 消费者读取消息

将连接 MQ 的代码进行封装一下

package AppInit

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

var MQCone *amqp.Connection

func init() {
	dsn := fmt.Sprintf("amqp://%s:%s@%s:%d", "wxviurs", "123", "127.0.0.1", 5672)
	conn, err := amqp.Dial(dsn)
	if err != nil {
		log.Fatalln(err)
	}
	MQCone = conn
	log.Println(MQCone.Major)
}

func GetConn() *amqp.Connection {
	return MQCone
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
	"fmt"
	"log"
	"rmq/AppInit"
)

func main() {
	conn := AppInit.GetConn()
	defer conn.Close()

	c, err := conn.Channel()
	if err != nil {
		log.Fatalln(err)
	}
	defer c.Close()
	// 消费者
	messages, err := c.Consume("test", "c1", false, false, false, false, nil)
	if err != nil {
		log.Fatalln(err)
	}
	for msg := range messages {
		fmt.Println(msg.DeliveryTag, // 唯一标识
			string(msg.Body),// 内容
		)
	}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
➜  rmq go run client.go
2023/01/07 14:41:13 0
1 test0001
2 test0002

1
2
3
4
5

image-20230107144254233

此时 MQ 中是Unacked,这个是确认机制,我们获取消息后,我们需要告诉 MQ 消息收到了,否则下次运行,Ready又会变成 2 个,还能继续收到这个消息。

https://www.rabbitmq.com/tutorials/tutorial-two-go.html (opens new window)

for msg := range messages {
    msg.Ack(false)
    fmt.Println(msg.DeliveryTag, // 唯一标识
                string(msg.Body),// 内容
               )
}
1
2
3
4
5
6

# 简单封装 MQ 发送消息

package Lib

import (
	"github.com/streadway/amqp"
	"log"
	"rmq/AppInit"
)

const (
	QueueNewUser = "newuser" //用户注册 对应的队列名称
)

type MQ struct {
	Channel *amqp.Channel
}

func NewMQ() *MQ {
	// 创建channel
	c, err := AppInit.GetConn().Channel()
	if err != nil {
		log.Println(err)
		return nil
	}
	return &MQ{Channel: c}
}

func (mq *MQ) SendMessage(queueName string, message string) error {
	// 声明队列
	_, err := mq.Channel.QueueDeclare(queueName, false, false, false, false, nil)
	if err != nil {
		return err
	}
	// exchange 为空 会使用默认的交换机
	return mq.Channel.Publish("", queueName, false, false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		},
	)
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

使用gin来模拟一个api来用作用户注册操作

package main

import (
	"github.com/gin-gonic/gin"
	"log"
	"rmq/Lib"
	"rmq/UserReg/Models"
	"strconv"
	"time"
)

func main() {
	router := gin.Default()
	router.Handle("POST", "/user", func(context *gin.Context) {
		userModel := Models.NewUserModel()
		err := context.BindJSON(&userModel)
		if err != nil {
			context.JSON(400, gin.H{"result": "param error"})
		} else {
			userModel.UserId = int(time.Now().Unix()) //假设就是入库过程
			if userModel.UserId > 0 {                 //假设入库成功
				mq := Lib.NewMQ()
				err := mq.SendMessage(Lib.QueueNewUser, strconv.Itoa(userModel.UserId))
				if err != nil {
					log.Println(err)
				}
			}
			context.JSON(200, gin.H{"result": userModel})
		}
	})
	router.Run(":8080")

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

image-20230107150914902

image-20230107150941704

对应也出现了这个newuser的队列名称。

编辑 (opens new window)
#producer
上次更新: 2023/01/15, 21:39:37
快速部署和go客户端使用
交换机

← 快速部署和go客户端使用 交换机→

最近更新
01
centos7安装redis6文档记录
02-14
02
portainer的安装
02-11
03
gin自定义验证器和翻译器
02-11
更多文章>
Theme by Vdoing | Copyright © 2021-2023 wxvirus
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式