NATS - Go Client
A Go client for the NATS messaging system.



Installation
go get github.com/nats-io/nats.go/
go get github.com/nats-io/nats-server
When using or transitioning to Go modules support:
go get github.com/nats-io/nats.go/@latest
go get github.com/nats-io/nats.go/@v1.9.1
go get github.com/nats-io/nats-server/v2
Basic Usage
import nats "github.com/nats-io/nats.go"
nc, _ := nats.Connect(nats.DefaultURL)
nc.Publish("foo", []byte("Hello World"))
nc.Subscribe("foo", func(m *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(m.Data))
})
nc.Subscribe("request", func(m *nats.Msg) {
m.Respond([]byte("answer is 42"))
})
sub, err := nc.SubscribeSync("foo")
m, err := sub.NextMsg(timeout)
ch := make(chan *nats.Msg, 64)
sub, err := nc.ChanSubscribe("foo", ch)
msg := <- ch
sub.Unsubscribe()
sub.Drain()
msg, err := nc.Request("help", []byte("help me"), 10*time.Millisecond)
nc.Subscribe("help", func(m *nats.Msg) {
nc.Publish(m.Reply, []byte("I can help!"))
})
nc.Drain()
nc.Close()
Encoded Connections
nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer c.Close()
c.Publish("foo", "Hello World")
c.Subscribe("foo", func(s string) {
fmt.Printf("Received a message: %s\n", s)
})
type person struct {
Name string
Address string
Age int
}
c.Subscribe("hello", func(p *person) {
fmt.Printf("Received a person: %+v\n", p)
})
me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street, San Francisco, CA"}
c.Publish("hello", me)
sub, err := c.Subscribe("foo", nil)
sub.Unsubscribe()
var response string
err = c.Request("help", "help me", &response, 10*time.Millisecond)
if err != nil {
fmt.Printf("Request failed: %v\n", err)
}
c.Subscribe("help", func(subj, reply string, msg string) {
c.Publish(reply, "I can help!")
})
c.Close();
New Authentication (Nkeys and User Credentials)
This requires a server with version >= 2.0.0.
NATS servers have a new security and authentication mechanism to authenticate with user credentials and Nkeys.
The simplest form is to use the helper method UserCredentials(credsFilepath).
nc, err := nats.Connect(url, nats.UserCredentials("user.creds"))
The helper method creates two callback handlers to present the user JWT and sign the nonce challenge from the server.
The core client library never has direct access to your private key and simply performs the callback for signing the server challenge.
For each connect or reconnect, the helper loads the credentials and then wipes and erases the memory it used.
The helper can also take two entries, one for the JWT and one for the NKey seed file.
nc, err := nats.Connect(url, nats.UserCredentials("user.jwt", "user.nk"))
You can also set the callback handlers directly and manage challenge signing directly.
nc, err := nats.Connect(url, nats.UserJWT(jwtCB, sigCB))
Bare Nkeys are also supported. The Nkey seed should be in a read-only file, e.g. seed.txt.
> cat seed.txt
SUAGMJH5XLGZKQQWAWKRZJIGMOU4HPFUYLXJMXOO5NLFEO2OOQJ5LPRDPM
nats.NkeyOptionFromSeed (shown below) is a helper function that loads and decodes the seed and does the proper signing for the server nonce.
It will clear memory in between invocations.
Alternatively, you can use the low-level option and provide the public key and a signature callback on your own.
opt, err := nats.NkeyOptionFromSeed("seed.txt")
nc, err := nats.Connect(serverUrl, opt)
nc, err := nats.Connect(serverUrl, nats.Nkey(pubNkey, sigCB))
TLS
nc, err := nats.Connect("tls://nats.demo.io:4443")
nc, err = nats.Connect("tls://localhost:4443", nats.RootCAs("./configs/certs/ca.pem"))
cert := nats.ClientCert("./configs/certs/client-cert.pem", "./configs/certs/client-key.pem")
nc, err = nats.Connect("tls://localhost:4443", cert)
certFile := "./configs/certs/client-cert.pem"
keyFile := "./configs/certs/client-key.pem"
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
t.Fatalf("error parsing X509 certificate/key pair: %v", err)
}
config := &tls.Config{
ServerName: opts.Host,
Certificates: []tls.Certificate{cert},
RootCAs: pool,
MinVersion: tls.VersionTLS12,
}
nc, err = nats.Connect("nats://localhost:4443", nats.Secure(config))
if err != nil {
t.Fatalf("Got an error on Connect with Secure Options: %+v\n", err)
}
Using Go Channels (netchan)
nc, _ := nats.Connect(nats.DefaultURL)
ec, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer ec.Close()
type person struct {
Name string
Address string
Age int
}
recvCh := make(chan *person)
ec.BindRecvChan("hello", recvCh)
sendCh := make(chan *person)
ec.BindSendChan("hello", sendCh)
me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street"}
sendCh <- me
who := <- recvCh
Wildcard Subscriptions
nc.Subscribe("foo.*.baz", func(m *nats.Msg) {
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
})
nc.Subscribe("foo.bar.*", func(m *nats.Msg) {
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
})
nc.Subscribe("foo.>", func(m *nats.Msg) {
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data))
})
nc.Publish("foo.bar.baz", []byte("Hello World"))
Queue Groups
nc.QueueSubscribe("foo", "job_workers", func(_ *nats.Msg) {
received++
})
Advanced Usage
nc.Flush()
fmt.Println("All clear!")
err := nc.FlushTimeout(1*time.Second)
if err != nil {
fmt.Println("Flushed timed out!")
} else {
fmt.Println("All clear!")
}
const MAX_WANTED = 10
sub, err := nc.SubscribeSync("foo")
sub.AutoUnsubscribe(MAX_WANTED)
nc1, _ := nats.Connect("nats://host1:4222")
nc2, _ := nats.Connect("nats://host2:4222")
nc1.Subscribe("foo", func(m *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(m.Data))
})
nc2.Publish("foo", []byte("Hello World!"))
Clustered Usage
var servers = "nats://localhost:1222, nats://localhost:1223, nats://localhost:1224"
nc, err := nats.Connect(servers)
nc, err = nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2 * time.Second))
nc, err = nats.Connect(servers, nats.DontRandomize())
nc, err = nats.Connect(servers,
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
fmt.Printf("Got disconnected! Reason: %q\n", err)
}),
nats.ReconnectHandler(func(nc *nats.Conn) {
fmt.Printf("Got reconnected to %v!\n", nc.ConnectedUrl())
}),
nats.ClosedHandler(func(nc *nats.Conn) {
fmt.Printf("Connection closed. Reason: %q\n", nc.LastError())
})
)
nc, err = nats.Connect("nats://localhost:4222", nats.UserInfo("foo", "bar"))
nc, err = nats.Connect("nats://localhost:4222", nats.Token("S3cretT0ken"))
nc, err = nats.Connect("nats://localhost:4222",
nats.UserInfo("foo", "bar"),
nats.Token("S3cretT0ken"))
nc, err = nats.Connect("nats://my:pwd@localhost:4222", nats.UserInfo("foo", "bar"))
Context support (Go 1.7+)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
nc, err := nats.Connect(nats.DefaultURL)
msg, err := nc.RequestWithContext(ctx, "foo", []byte("bar"))
sub, err := nc.SubscribeSync("foo")
msg, err := sub.NextMsgWithContext(ctx)
c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
type request struct {
Message string `json:"message"`
}
type response struct {
Code int `json:"code"`
}
req := &request{Message: "Hello"}
resp := &response{}
err := c.RequestWithContext(ctx, "foo", req, resp)
License
Unless otherwise noted, the NATS source files are distributed
under the Apache Version 2.0 license found in the LICENSE file.
