grabbit

A lightweight transactional message bus on top of RabbitMQ

grabbit supports:

  1. Supported messaging semantics
    • One Way
    • Duplex
    • Publish/Subscribe
    • Request/Reply (RPC)
  2. Long-running processes via the Saga pattern
  3. Retries and backoff
  4. Publisher confirms
  5. Reliable messaging and local service transactionality via the Transactional Outbox pattern
  6. Dead-lettering
  7. Structured logging
  8. Metrics reporting via Prometheus
  9. Distributed tracing via OpenTracing
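
To make the retry and backoff item in the list above concrete, here is a minimal, generic sketch of retrying a failing operation with exponential backoff. It is not grabbit's implementation: the names backoffDelay, retry and errTransient are hypothetical, and the delays are chosen only for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient is a hypothetical error standing in for a temporary failure.
var errTransient = errors.New("transient failure")

// backoffDelay returns base doubled once per previous attempt, capped at max.
func backoffDelay(attempt int, base, max time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= max {
			return max
		}
	}
	if d > max {
		return max
	}
	return d
}

// retry calls fn up to maxAttempts times and sleeps between failed attempts.
// It returns the number of attempts made and the last error, if any.
func retry(maxAttempts int, base, max time.Duration, fn func() error) (int, error) {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = fn(); err == nil {
			return attempt + 1, nil
		}
		if attempt < maxAttempts-1 {
			time.Sleep(backoffDelay(attempt, base, max))
		}
	}
	return maxAttempts, fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	// The operation fails twice and then succeeds.
	attempts, err := retry(5, time.Millisecond, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errTransient
		}
		return nil
	})
	fmt.Println(attempts, err)
}
```

Capping the delay keeps a long run of failures from producing unbounded waits. A message that still fails after the last attempt would typically be routed to a dead-letter queue.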

Stable release

The v1.x branch contains the latest stable releases of grabbit. Track that branch to receive point and minor release updates.

Supported transactional resources

  1. MySQL > 8.0 (InnoDB)

Supported serializers

  1. gob
  2. Avro
  3. Protobuf

Instrumentation

  1. OpenTracing

Usage

The following outlines the basic usage of grabbit.
For a complete view of how to use grabbit, including how to write sagas and handle dead-lettering, refer to the grabbit/tests package.

import (
	// Standard library packages used by the snippets below.
	"context"
	"fmt"
	"time"

	"github.com/wework/grabbit/gbus"
	"github.com/wework/grabbit/gbus/builder"
)

Define a message

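// SomeMessage is a message that can travel over the bus.
type SomeMessage struct{}

// SchemaName returns the unique name that identifies this message type.
func (SomeMessage) SchemaName() string {
	return "some.unique.namespace.somemessage"
}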
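
As a slightly fuller illustration, the sketch below defines a message that carries data and shows why the schema name should be unique. The Message interface is declared locally so the example compiles without grabbit. OrderPlaced, the registry type and the "com.example" name are hypothetical, and the registry is only an assumption about how a bus might use schema names, not grabbit's actual mechanism.

```go
package main

import "fmt"

// Message mirrors the one-method contract used by SomeMessage above.
// It is declared here so the sketch is self-contained.
type Message interface {
	SchemaName() string
}

// OrderPlaced is a hypothetical message with a payload.
type OrderPlaced struct {
	OrderID string
	Total   float64
}

// SchemaName returns a unique, stable name for this message type.
func (OrderPlaced) SchemaName() string {
	return "com.example.orders.orderplaced"
}

// registry is a hypothetical lookup table keyed by schema name.
type registry map[string]Message

// register stores msg under its schema name and rejects duplicates.
func (r registry) register(msg Message) error {
	name := msg.SchemaName()
	if _, exists := r[name]; exists {
		return fmt.Errorf("schema name %q already registered", name)
	}
	r[name] = msg
	return nil
}

func main() {
	r := registry{}
	fmt.Println(r.register(OrderPlaced{})) // first registration
	fmt.Println(r.register(OrderPlaced{})) // same schema name again
}
```

Two message types that return the same schema name would be indistinguishable on the wire, which is why a namespaced name is a sensible convention.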

Creating a transactional GBus instance

gb := builder.
	New().
	Bus("connection string to RabbitMQ").
	Txnl("mysql", "connection string to mysql").
	WithConfirms().
	Build("name of your service")
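
The transactional resource configured above is what makes the outbox pattern possible: outgoing messages are stored in the same transaction as the business data and published afterwards. The following is a rough in-memory sketch of that idea only. The store, tx, placeOrder and relay names are hypothetical, and grabbit's real implementation works against MySQL tables rather than slices.

```go
package main

import (
	"errors"
	"fmt"
)

// store is a hypothetical in-memory stand-in for a database with two tables.
type store struct {
	orders []string // business data
	outbox []string // messages waiting to be published
}

// tx buffers writes until commit, so both tables change together or not at all.
type tx struct {
	s      *store
	orders []string
	outbox []string
}

// begin starts a new buffered transaction.
func (s *store) begin() *tx { return &tx{s: s} }

// commit applies all buffered writes to the store.
func (t *tx) commit() {
	t.s.orders = append(t.s.orders, t.orders...)
	t.s.outbox = append(t.s.outbox, t.outbox...)
}

// placeOrder writes the order and its event in one transaction.
// When fail is true the transaction is abandoned and nothing is stored.
func placeOrder(s *store, id string, fail bool) error {
	t := s.begin()
	t.orders = append(t.orders, id)
	t.outbox = append(t.outbox, "order.placed:"+id)
	if fail {
		return errors.New("handler failed, transaction rolled back")
	}
	t.commit()
	return nil
}

// relay publishes pending outbox rows in order and removes the ones that were sent.
func relay(s *store, publish func(string) error) int {
	sent := 0
	for len(s.outbox) > 0 {
		if err := publish(s.outbox[0]); err != nil {
			break // keep the row and try again later
		}
		s.outbox = s.outbox[1:]
		sent++
	}
	return sent
}

func main() {
	s := &store{}
	_ = placeOrder(s, "42", false)
	_ = placeOrder(s, "43", true) // rolled back: no order and no message
	n := relay(s, func(m string) error {
		fmt.Println("published", m)
		return nil
	})
	fmt.Println(n, len(s.orders), len(s.outbox))
}
```

Because the message is only recorded when the business write commits, a failed handler never leaks an event, and a broker outage only delays publishing instead of losing the message.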

Register a command handler



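// SomeCommand is assumed to be a message type defined like SomeMessage above.
handler := func(invocation gbus.Invocation, message *gbus.BusMessage) error {
	// The payload arrives as an interface value; assert the expected type.
	cmd, ok := message.Payload.(*SomeCommand)
	if ok {
		fmt.Printf("handler invoked with message %v", cmd)
		return nil
	}

	return fmt.Errorf("failed to handle message")
}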

gb.HandleMessage(SomeCommand{}, handler)

Register an event handler



// SomeEvent is assumed to be a message type defined like SomeMessage above.
eventHandler := func(invocation gbus.Invocation, message *gbus.BusMessage) error {
	evt, ok := message.Payload.(*SomeEvent)
	if ok {
		fmt.Printf("handler invoked with event %v", evt)
		return nil
	}

	return fmt.Errorf("failed to handle event")
}

gb.HandleEvent("name of exchange", "name of topic", SomeEvent{}, eventHandler)
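
The exchange and topic arguments map onto RabbitMQ's topic routing, where a binding pattern may use "*" for exactly one word and "#" for zero or more words. The sketch below reimplements that matching rule in plain Go purely for illustration. The matches function is hypothetical, and whether grabbit passes wildcard topics through unchanged is an assumption to verify against the tests package.

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether a dot-separated routing key satisfies a topic pattern.
// "*" stands for exactly one word and "#" for zero or more words.
func matches(pattern, key string) bool {
	return matchWords(strings.Split(pattern, "."), strings.Split(key, "."))
}

// matchWords compares the pattern words against the key words recursively.
func matchWords(p, k []string) bool {
	if len(p) == 0 {
		return len(k) == 0
	}
	switch p[0] {
	case "#":
		// Try letting "#" absorb 0, 1, 2, ... words of the key.
		for i := 0; i <= len(k); i++ {
			if matchWords(p[1:], k[i:]) {
				return true
			}
		}
		return false
	case "*":
		return len(k) > 0 && matchWords(p[1:], k[1:])
	default:
		return len(k) > 0 && p[0] == k[0] && matchWords(p[1:], k[1:])
	}
}

func main() {
	fmt.Println(matches("orders.*", "orders.created"))
	fmt.Println(matches("orders.*", "orders.eu.created"))
	fmt.Println(matches("orders.#", "orders.eu.created"))
}
```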

Start the bus

gb.Start()
defer gb.Shutdown()

Send a command

gb.Send(context.Background(), "name of service you are sending the command to", gbus.NewBusMessage(SomeCommand{}))

Publish an event

gb.Publish(context.Background(), "name of exchange", "name of topic", gbus.NewBusMessage(SomeEvent{}))

RPC style call



request := gbus.NewBusMessage(SomeRPCRequest{})
reply := gbus.NewBusMessage(SomeRPCReply{})
timeOut := 2 * time.Second

reply, e := gb.RPC(context.Background(), "name of service you are sending the request to", request, reply, timeOut)

if e != nil {
	fmt.Printf("rpc call failed with error %v", e)
} else {
	fmt.Printf("rpc call returned with reply %v", reply)
}
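
The timeout argument bounds how long the caller waits for a reply. The following self-contained sketch shows that request/reply-with-timeout shape using only the standard library. The call, echo and slowEcho functions and errTimeout are hypothetical stand-ins, and an in-process function takes the place of the remote service.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errTimeout is returned when no reply arrives in time.
var errTimeout = errors.New("rpc timed out")

// echo is a hypothetical responder that answers immediately.
func echo(req string) string { return "reply to " + req }

// slowEcho is a hypothetical responder that needs 200ms to answer.
func slowEcho(req string) string {
	time.Sleep(200 * time.Millisecond)
	return echo(req)
}

// call sends req to server and waits for the reply or for the timeout,
// whichever comes first.
func call(server func(string) string, req string, timeout time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// Buffered so the responder goroutine can finish even after a timeout.
	replies := make(chan string, 1)
	go func() { replies <- server(req) }()

	select {
	case r := <-replies:
		return r, nil
	case <-ctx.Done():
		return "", errTimeout
	}
}

func main() {
	fmt.Println(call(echo, "ping", time.Second))
	fmt.Println(call(slowEcho, "ping", 10*time.Millisecond))
}
```

A timeout only tells the caller that no reply arrived in time. The request may still have been processed, so callers that retry should make the request safe to repeat.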

Testing

  1. Install the dependencies: go get -v -t -d ./...
  2. Bring up the docker-compose environment first: docker-compose up -V -d
  3. Run the tests: go test ./...

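Key metrics

Overview
  Name and owner: wework/grabbit
  Primary language: Go
  Languages: Go (1 language)
  License: Apache License 2.0

Owner activity
  Created: 2019-04-04 18:30:07
  Last push: 2023-02-25 04:08:39
  Last commit: 2019-12-14 09:39:38
  Releases: 13
  Latest release: v1.1.8
  First release: v1.0.0

User engagement
  Stars: 98
  Watchers: 7
  Forks: 20
  Commits: 425
  Issues: 75
  Open issues: 19
  Pull requests: 138
  Open pull requests: 7
  Closed pull requests: 15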