Nsq-分布式消息队列中间件介绍
一、NSQ介绍
1、NSQ是Go语言编写的,开源的分布式消息队列中间件,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征,是一个成熟的、已在大规模生成环境下应用的产品。
2、NSQ非常易于部署(几乎没有依赖)和配置(所有参数都可以通过命令行进行配置)且具有最大的灵活性,支持众多消息协议。
3、支持消息内存队列的大小设置,默认完全持久化(值为0),消息即可持久到磁盘也可以保存在内存中。
4、保证消息至少传递一次,以确保消息可以最终成功发送,且收到的消息是无序的, 实现了松散订购。
5、单个nsqd可以有多个Topic,每个Topic又可以有多个Channel。Channel能够接收Topic所有消息的副本,从而实现了消息多播分发;而Channel上的每个消息被分发给它的订阅者,从而实现负载均衡。
二、代码示例
producer.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
package main import ( "fmt" "github.com/nsqio/go-nsq" "time" "encoding/json" ) const NSQ_TYPE string = "YTC_IP" const NSQ_PORT int = 4150 const IP string = "127.0.0.1" var producer *nsq.Producer type Message struct { TaskId int `json:"task_id"` Ips string `json:"ips"` Ports string `json:"ports"` Proc string `json:"proc"` } // 主函数 func main() { for { //推送nsq p := Message{ TaskId: 1, Ips: "192.168.0.1", Ports: "80", Proc: "tcp", } info, jerr := json.Marshal(p) if jerr != nil { panic("Could not connect.") } err := Publish(NSQ_TYPE, string(info)) if err != nil { panic(err) } time.Sleep(time.Second * 10) } producer.Stop() } // 初始化生产者 func InitProducer(str string) { var err error producer, err = nsq.NewProducer(str, nsq.NewConfig()) if err != nil { panic(err) } } //发布消息 func Publish(topic string, message string) error { var err error if producer != nil { if message == "" { //不能发布空串,否则会导致error return nil } fmt.Println(message) err = producer.Publish(topic, []byte(message)) // 发布消息 return err } return fmt.Errorf("%d producer is nil", err) } |
consumer.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
package main import ( "fmt" "github.com/nsqio/go-nsq" "sync" "time" ) var producer *nsq.Producer const TOPIC string = "test" const CHANNEL string = "test_ch" const IP string = "127.0.0.1" const LOOKUP_PORT int = 4150 //nsq节点端口 const LIMIT int = 10 //客户端并发进程数 const MAX_IN_FLIGHT int = 1 // 消费者 type ConsumerT struct{} func main() { for { //监听publish发布内容 connIp := fmt.Sprintf("%s:%d", IP, LOOKUP_PORT) InitConsumer(TOPIC, CHANNEL, connIp) time.Sleep(time.Second * 10) } } //处理消息 func (*ConsumerT) HandleMessage(msg *nsq.Message) error { fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body)) //HandleTask(string(msg.Body)) fmt.Println(string(msg.Body)) return nil } //初始化消费者 func InitConsumer(topic string, channel string, address string) { waiter := sync.WaitGroup{} waiter.Add(1) go func() { defer waiter.Done() config := nsq.NewConfig() config.MaxInFlight = MAX_IN_FLIGHT //建立多个连接 for i := 1; i < LIMIT; i++ { consumer, err := nsq.NewConsumer(topic, channel, config) if nil != err { fmt.Println("err", err) } consumer.SetLogger(nil, 0) //屏蔽系统日志 consumer.AddHandler(&ConsumerT{}) err = consumer.ConnectToNSQD(address) if nil != err { fmt.Println("err", err) } } select {} }() waiter.Wait() } |
发表评论