Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package zeromq
import (
	"context"
	"errors"
	"strconv"
	"strings"

	zmq "github.com/go-zeromq/zmq4"
)
// Publisher is a minimal publishing abstraction built on top of a zmq4
// socket. Construct one with NewPublisher rather than using the zero value,
// whose socket is nil.
type Publisher struct {
	// socket is the underlying zmq4 socket that every method delegates to.
	socket zmq.Socket
}
// NewPublisher returns a Publisher wrapping a freshly created PUB socket.
// The socket is created with context.Background(). The returned error is
// always nil in the current implementation.
func NewPublisher() (*Publisher, error) {
	pub := &Publisher{socket: zmq.NewPub(context.Background())}
	return pub, nil
}
// Start makes the publisher listen on the wildcard TCP endpoint
// "tcp://*:<port>". It returns whatever error the socket's Listen call
// reports.
func (pub *Publisher) Start(port int) error {
	endpoint := "tcp://*:" + strconv.Itoa(port)
	return pub.socket.Listen(endpoint)
}
// Shutdown closes the underlying socket and returns the error, if any,
// reported by that Close call.
func (pub *Publisher) Shutdown() error {
	if err := pub.socket.Close(); err != nil {
		return err
	}
	return nil
}
// Send publishes messages as one string message: the elements are joined
// with a single space and handed to the socket in one Send call.
//
// messages must be non-empty and its first element must be a non-empty
// string. If that does not hold, Send returns an error with the text
// "zmq: invalid messages" and does not touch the socket. Any error from
// the socket's Send is returned unchanged.
func (pub *Publisher) Send(messages []string) error {
	// Report bad input through the error return instead of panicking, so a
	// malformed call cannot take down the caller's process.
	if len(messages) == 0 || messages[0] == "" {
		return errors.New("zmq: invalid messages")
	}

	payload := strings.Join(messages, " ")
	return pub.socket.Send(zmq.NewMsgString(payload))
}