专业的编程技术博客社区

网站首页 > 博客文章 正文

NSQ简介及连接模式(switch connector)

baijin 2024-09-04 02:07:06 博客文章 9 ℃ 0 评论


1、 NSQ简介


NSQ是一个实时的分布式的消息处理平台。


主要有以下特点:


  • 支持分布式的拓扑结构,没有SPOF(单点故障)
  • 支持横向水平扩展(没有 brokers 等任何集中式代理)
  • 性能优越,基于推送的低延迟的消息传递模式
  • 结合负载均衡和多路广播类型的消息路由
  • 擅长面向流(streaming)(高吞吐量)和 任务(Job)导向的工作负载
  • 主要是基于内存的(超过高水位线消息将透明的存储到磁盘中)
  • 运行时的服务发现
  • 传输层安全性(TLS)
  • 数据格式不可知
  • 很少的依赖项,容易部署,具有健全的、有边界的、默认的配置项
  • 简单的TCP协议,支持任何语言的客户端库
  • 具有HTTP接口,用于统计、管理行为以及生产者操作(不需要客户端库额外发布)
  • 集成 statsd 用于实时监测
  • 强大的集群管理接口 (nsqadmin)


2、主要组件


  • nsqd 消息队列的核心,是一个守护进程,负责接收、队列和投递信息给客户端。
  • nsqlookupd 管理拓扑信息,是一个守护进程,并提供最终一致的服务发现功能。
  • nsqadmin 是一个Web 管理界面,用于实时的检测集群信息,并能够执行各种管理任务。


3、连接方式


分为两种,直连模式和通过nsqlookupd进行连接


4、直连模式


NSQ 客户端通过TCP连接到提供指定主题的所有实例。没有中间代理 brokers,没有单点故障。


直接从所有生产者中消费消息。从技术来上来讲,哪个客户端连接到哪个NSQ实例并不重要,只要有足够的客户端连接到所有的生产者以满足消息总量就可以,并保证所有的消息最终都会被处理。


?编辑


实验:在一台虚拟机上,模拟启动两个 nsqd 实例


(1)启动第一个实例, tcp端口地址为:8000,http端口地址为:8001,存储磁盘备份消息的路径为/nsq_data


./nsqd -tcp-address ":8000" -http-address ":8001" -data-path=/nsq_data




?编辑


启动成功后,在存储路径nsq_data会生成文件nsq.dat


?编辑


其中文件内容为JSON格式的信息:


{
	"topics": [{
		"channels": [{
			"name": "HelloWordChan",
			"paused": false
		}],
		"name": "HelloWordTopic",
		"paused": false
	}],
	"version": "1.2.1-alpha"
}



?编辑


(2)启动第二个实例, tcp端口地址为:7000,http端口地址为:7001,存储磁盘备份消息的路径为/nsq_data_2


./nsqd -tcp-address ":7000" -http-address ":7001" -data-path=/nsq_data_2



?编辑


启动成功后,在存储路径nsq_data_2 会生成文件nsq.dat


?编辑


其中文件内容为JSON格式的信息:


{
	"topics": [{
		"channels": [{
			"name": "HelloWordChan",
			"paused": false
		}],
		"name": "HelloWordTopic",
		"paused": false
	}],
	"version": "1.2.1-alpha"
}



(3) 因资源有限,现创建一个Consumer端 应用程序


package main

import (
   "fmt"
   "github.com/nsqio/go-nsq"
   "log"
   "os"
   "os/signal"
   "syscall"
)

type  StdoutHandler struct {
   topicName  string
}

func (sh *StdoutHandler) HandleMessage( m *nsq.Message)  error {
   _, err := os.Stdout.WriteString(sh.topicName)
   if err != nil  {
      log.Fatalf("ERROR: failed to write to os.Stdout - %s",err)
   }

   _, err = os.Stdout.WriteString(" | ")
   if err != nil {
      log.Fatalf("Error: failed to write to os.Stdout - %s",err)
   }

   _, err = os.Stdout.Write(m.Body)
   if err != nil {
      log.Fatalf("Error: failed to write to os.Stdout - %s",err)
   }

   _, err = os.Stdout.WriteString("\n")
   if err != nil {
      log.Fatalf("Error: failed to write to os.Stdout - %s",err)
   }
   return nil
}

func main() {

   cfg := nsq.NewConfig()
   nsqdTCPAddrs := []string {"192.168.1.18:7000","192.168.1.18:8000"}
   topicName := "HelloWordTopic"
   channel := "HelloWordChan"

   // 设置信号量
   sigChan := make(chan os.Signal,1)
   signal.Notify(sigChan,syscall.SIGINT,syscall.SIGTERM)

   fmt.Println("job is running......")

   //创建 Consumer
   consumer, err := nsq.NewConsumer(topicName, channel, cfg)
   if err != nil {
      log.Fatal(err)
   }

    // 创建 Handler
    stdOutHandler := &StdoutHandler{topicName: topicName}

    //为Consumer添加Handler
    consumer.AddHandler(stdOutHandler)
    //连接NSQDS
   err = consumer.ConnectToNSQDs(nsqdTCPAddrs)
   if err != nil {
      log.Fatal(err)
   }

   //取得当前连接信息
   stats := consumer.Stats()
   if stats.Connections == 0 {
      log.Fatal("stats report 0 connections (should be > 0)")
   }

   <- sigChan
}



主要是通过语句 err = consumer.ConnectToNSQDs(adds),连接多个NSQD服务。


正常启动应用程序,会输出如下结果:


?编辑


(4) 选择一个NSQD 服务,向指定的主题发布消息


curl -d '8001 message info' 'http://192.168.1.18:8001/pub?topic=HelloWordTopic'



curl -d '7001 message info' 'http://192.168.1.18:7001/pub?topic=HelloWordTopic'



如果发送消息成功,会得到响应信息:OK:


?编辑


NSQD 会根据消息路由算法,把消息推送至一个客户端。


客户端应用的输出如下:


?编辑


可以看到无论在哪个NSQD应用下发布的消息都会推送至客户端。


(5)在磁盘备份消息的路径下,也会同步备份消息文件:


?编辑


查看文件内容:


?编辑


?编辑


(6)尝试停止一个NSQD服务,查看客户端应用是否会检查NSQD服务的状态


?编辑


客户端应用输出如下信息:


2021/02/03 15:18:07 ERR 1 [HelloWordTopic/HelloWordChan] (192.168.1.18:7000) IO error - EOF


2021/02/03 15:18:07 INF 1 [HelloWordTopic/HelloWordChan] (192.168.1.18:7000) beginning close


2021/02/03 15:18:07 INF 1 [HelloWordTopic/HelloWordChan] (192.168.1.18:7000) readLoop exiting


2021/02/03 15:18:07 INF 1 [HelloWordTopic/HelloWordChan] (192.168.1.18:7000) breaking out of writeLoop


2021/02/03 15:18:07 INF 1 [HelloWordTopic/HelloWordChan] (192.168.1.18:7000) writeLoop exiting


2021/02/03 15:18:07 INF 1 [HelloWordTopic/HelloWordChan] (192.168.1.18:7000) finished draining, cleanup exiting


2021/02/03 15:18:07 INF 1 [HelloWordTopic/HelloWordChan] (192.168.1.18:7000) clean close complete


2021/02/03 15:18:07 WRN 1 [HelloWordTopic/HelloWordChan] there are 1 connections left alive


2021/02/03 15:18:07 INF 1 [HelloWordTopic/HelloWordChan] (192.168.1.18:7000) re-connecting in 1m0s


客户端会尝试进行重连,重连的规则摘自官网内容:


  • If the consumer is configured with a specific list of nsqd instances, reconnection should be handled by delaying the retry attempt in an exponential backoff manner (i.e. try to reconnect in 8s, 16s, 32s, etc., up to a max).


(7)通过另外一个活着的NSQD服务,发布一条消息,查看客户端应用是否会接收到


curl -d '8001 Connection Handling' 'http://192.168.1.18:8001/pub?topic=HelloWordTopic'



客户端应用也会收到消息:


?编辑


同时,也可以看出,如果向关闭的NSQD服务,发布消息,会返回错误信息:


?编辑


5、直接连接方式总结


(1)如果NSQD实例出现异常,客户端会不断的进行重连,如果想要去掉这个连接信息,就需要额外做一些处理。


(2)如果需要对NSQD进行水平扩展,需要修改客户端应用程序,比如ConnectToNSQDs方法。


6、去中心化的连接模式NSQLOOKUPD


nsqlookupd 可以提供运行时的服务发现,为消费者查找到指定主题的nsqd生产者实例。


通过运行多个实例来实现高可用,它们不直接相互通信,并且数据保持最终一致性。实现去中心化。


消费者轮询其配置的所有的nsqlookupd实例并联合它们的响应。异常节点不会导致系统停止运行。


?编辑


示例图中,运行着多个nsqd实例和多个nsqlookupd实例,客户端应用根据NSQLOOKUPD 来查找可用的NSQD实例


注意:nsqlookupd 有两个接口:一个是TCP接口(默认端口:4160),nsqd使用它进行广播。


一个是 HTTP接口(默认端口:4161),客户端库使用它进行服务发现和其他管理操作。


实验:


(1)先启动两个 nsqlookupd


./nsqlookupd -tcp-address "192.168.1.18:8200" -http-address "192.168.1.18:8201"



?编辑


./nsqlookupd -tcp-address "192.168.1.18:7200" -http-address "192.168.1.18:7201"



?编辑


(2)为了验证运行时服务发现,先启动两个nsqd


./nsqd -tcp-address ":8000" -http-address ":8001" --lookupd-tcp-address=192.168.1.18:8200 --lookupd-tcp-address=192.168.1.18:7200 -broadcast-address='192.168.1.18' -data-path=/nsq_data



?编辑


./nsqd -tcp-address ":7000" -http-address ":7001" --lookupd-tcp-address=192.168.1.18:8200 --lookupd-tcp-address=192.168.1.18:7200 -broadcast-address='192.168.1.18' -data-path=/nsq_data_2



?编辑


注意:此种情况,在启动nsqd的时候,需要指定-broadcast-address,否则客户端会无法连接至 nsqd.


此参数的主要作用是指明要广播注册到nsqlookupd中的nsqd 自身的地址


(3)启动一个客户端应用


package main

import (
   "fmt"
   "github.com/nsqio/go-nsq"
   "log"
   "os"
   "os/signal"
   "syscall"
)

type  StdoutHandler struct {
   topicName  string
}

func (sh *StdoutHandler) HandleMessage( m *nsq.Message)  error {
   _, err := os.Stdout.WriteString(sh.topicName)
   if err != nil  {
      log.Fatalf("ERROR: failed to write to os.Stdout - %s",err)
   }

   _, err = os.Stdout.WriteString(" | ")
   if err != nil {
      log.Fatalf("Error: failed to write to os.Stdout - %s",err)
   }

   _, err = os.Stdout.Write(m.Body)
   if err != nil {
      log.Fatalf("Error: failed to write to os.Stdout - %s",err)
   }

   _, err = os.Stdout.WriteString("\n")
   if err != nil {
      log.Fatalf("Error: failed to write to os.Stdout - %s",err)
   }
   return nil
}

func main() {

   cfg := nsq.NewConfig()

   //nsqdTCPAddrs := []string {"192.168.1.18:7000","192.168.1.18:8000"}
   lookupdHTTPAddrs := []string {"192.168.1.18:7201","192.168.1.18:8201"}
   topicName := "HelloWordTopic"
   channel := "HelloWordChan"

   // 设置信号量
   sigChan := make(chan os.Signal,1)
   signal.Notify(sigChan,syscall.SIGINT,syscall.SIGTERM)

   fmt.Println("job is running......")

   //创建 Consumer
   consumer, err := nsq.NewConsumer(topicName, channel, cfg)
   if err != nil {
      log.Fatal(err)
   }

    // 创建 Handler
    stdOutHandler := &StdoutHandler{topicName: topicName}

    //为Consumer添加Handler
    consumer.AddHandler(stdOutHandler)
    //连接NSQDS
   //err = consumer.ConnectToNSQDs(nsqdTCPAddrs)
   //连接 NSQLOOKUPD
   err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)
   if err != nil {
      log.Fatal(err)
   }

   //取得当前连接信息
   stats := consumer.Stats()
   if stats.Connections == 0 {
      log.Fatal("stats report 0 connections (should be > 0)")
   }

   <- sigChan

}



正常情况下,输出结果如下:


?编辑


主要通过方法


err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)


连接nsqlookupd.


底层是通过发起一个HTTP方法:


http://192.168.1.18:7201/lookup?topic=HelloWordTopic


http://192.168.1.18:8201/lookup?topic=HelloWordTopic


来获取到指定主题(topic=HelloWordTopic)的nsqd列表信息,然后再对所有的nsqd实例进行连接。


(3)客户端轮询


官方文档中写道:


  • If the consumer is configured to discover instances via nsqlookupd, reconnection should be handled automatically based on the polling interval (i.e. if a consumer disconnects from an nsqd, the client library should only attempt to reconnect if that instance is discovered by a subsequent nsqlookupd polling round). This ensures that consumers can learn about nsqd that are introduced to the topology and ones that are removed (or failed).


意思是:如果消费者使用nsqlookupd进行服务发现,客户端应用会基于配置的轮询间隔自动的进行重连。比如,如果一个消费者断开了与一个nsqd 的连接,那么客户端应用应该只能在该实例被随后的nsqlookupd轮询发现时才会尝试重新连接。这使得客户端能够充分了解网络拓扑中的nsqd的生命状态(正常、失败或被删除的等)。


在客户端实例代码的打印日志中可以看出


?编辑


客户端每隔一分钟就会查询一次nsqlookupd,来获知nsqd实例的状态。


同时在后端的nsqdlookup的日志中,也记录了这次查询:


?编辑


(4)启动nsqadmin,查看所有的 topic


./nsqadmin  --lookupd-http-address=192.168.1.18:8201 --lookupd-http-address=192.168.1.18:7201



?编辑


浏览器中输入:http://192.168.1.18:4171/


?编辑


会发现topics的相关信息:


?编辑


?编辑


(5)水平扩展nsqd,验证服务发现


./nsqd -tcp-address ":6000" -http-address ":6001" --lookupd-tcp-address=192.168.1.18:8200 --lookupd-tcp-address=192.168.1.18:7200 -broadcast-address='192.168.1.18' -data-path=/nsq_data_3



?编辑


同时两个nsqlookupd实例,会监听到新加入的nsqd实例


?编辑


?编辑


由于新建立的nsqd实例还没有指定topic,因此需要指定要处理的topic


curl -X POST 'http://192.168.1.18:6001/topic/create?topic=HelloWordTopic'



在客户端的输出日志中,可以看到已经连接到新增加的nsqd实例


?编辑


(6)手动关闭一个nsqd实例,验证服务发现功能


关闭 -tcp-address ":8000"这个nsqd实例


?编辑


客户端输出日志如下:


?编辑


可以看出,客户端通过nsqlookupd查询的nsqd 实例都是有效可用的。



?

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表