正如RabbitMQ文档中提到的那样,建立tcp连接非常昂贵。因此,针对该渠道概念进行了介绍。现在我遇到了这个例子。在main()每次发布消息时,它都会创建连接。 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")。它不应该一次全局声明,并且应该有故障转移机制,以防连接像单例对象那样被关闭。如果amqp.Dial是线程安全的,我想应该是
main()
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
编辑的问题:
我以以下方式处理连接错误。我在其中侦听频道并在出错时创建新的连接。但是当我杀死现有的连接并尝试发布消息时。我收到以下错误。
错误:
2016/03/30 19:20:08 Failed to open a channel: write tcp 172.16.5.48:51085->172.16.0.20:5672: use of closed network connection exit status 1 7:25 PM
代码:
func main() { Conn, err := amqp.Dial("amqp://guest:guest@172.16.0.20:5672/") failOnError(err, "Failed to connect to RabbitMQ") context := &appContext{queueName: "QUEUENAME",exchangeName: "ExchangeName",exchangeType: "direct",routingKey: "RoutingKey",conn: Conn} c := make(chan *amqp.Error) go func() { error := <-c if(error != nil){ Conn, err = amqp.Dial("amqp://guest:guest@172.16.0.20:5672/") failOnError(err, "Failed to connect to RabbitMQ") Conn.NotifyClose(c) } }() Conn.NotifyClose(c) r := web.New() // We pass an instance to our context pointer, and our handler. r.Get("/", appHandler{context, IndexHandler}) graceful.ListenAndServe(":8086", r) }
当然,您不应该为每个请求创建连接。使它成为应用程序上下文的全局变量或更好的部分,您可以在启动时对其进行一次初始化。
您可以通过使用Connection.NotifyClose以下方法注册通道来处理连接错误:
Connection.NotifyClose
func initialize() { c := make(chan *amqp.Error) go func() { err := <-c log.Println("reconnect: " + err.Error()) initialize() }() conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { panic("cannot connect") } conn.NotifyClose(c) // create topology }