gocelery

Go 中的 Celery 分布式任务队列。(Celery Distributed Task Queue in Go)

  • 所有者: gocelery/gocelery
  • 平台: Linux, Mac, Windows
  • 許可證: MIT License
  • 分類:
  • 主題:
  • 喜歡:
    0
      比較:

Github星跟蹤圖

gocelery

用于 Celery 分布式任务队列的 Go 客户端/服务器。

为什么?

参与了几个将服务器从 Python 迁移到 Go 的项目,我意识到 Go 可以提高现有 python Web 应用程序的性能。 由于 Celery 分布式任务通常用于此类 Web 应用程序,因此该库允许您在 Go 中实现 Celery workers 并提交 Celery 任务。

您还可以将此库用作纯分布式任务队列。

支持的中间人/后端(Brokers/Backends)

现在支持 Redis 和 AMQP !!

  • Redis(中间人/后端)
  • AMQP(中间人/后端) - 不允许同时使用频道

Celery 配置

必须将 Celery 配置为使用 json 而不是默认的 pickle 编码。 这是因为 Go 目前没有对 pickle 对象解码的稳定支持。通过以下配置参数来使用 json。

从版本4.0开始,Celery 使用消息协议版本2作为默认值。 GoCelery 尚不支持消息协议版本2,因此您必须将 CELERY_TASK_PROTOCOL 显式设置为1。

CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
CELERY_RESULT_SERIALIZER='json',
CELERY_ENABLE_UTC=True,
CELERY_TASK_PROTOCOL=1,

示例

GoCelery GoDoc 有很好的例子。

另请查看 example 目录以获取示例 python 代码。

GoCelery Worker 示例

运行在 Go 中实现的 Celery Worker

// initialize celery client
cli, _ := NewCeleryClient(
    NewRedisCeleryBroker("redis://"),
    NewRedisCeleryBackend("redis://"),
    5, // number of workers
)
// task
add := func(a, b int) int {
    return a + b
}
// register task
cli.Register("worker.add", add)
// start workers (non-blocking call)
cli.StartWorker()
// wait for client request
time.Sleep(10 * time.Second)
// stop workers gracefully (blocking call)
cli.StopWorker()

Python 客户端示例

从 Python 客户端提交任务。

from celery import Celery
app = Celery('tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)
@app.task
def add(x, y):
    return x + y
if __name__ == '__main__':
    ar = add.apply_async((5456, 2878), serializer='json')
    print(ar.get())

Python Worker 示例

运行用 Python 实现的 Celery Worker

from celery import Celery
app = Celery('tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)
@app.task
def add(x, y):
    return x + y
celery -A worker worker --loglevel=debug --without-heartbeat --without-mingle

GoCelery 客户端示例

从 Go 客户端提交任务

func main() {
    // initialize celery client
    cli, _ := NewCeleryClient(
        NewRedisCeleryBroker("redis://"),
        NewRedisCeleryBackend("redis://"),
        1,
    )
    // prepare arguments
    taskName := "worker.add"
    argA := rand.Intn(10)
    argB := rand.Intn(10)
    // run task
    asyncResult, err := cli.Delay(taskName, argA, argB)
    if err != nil {
        panic(err)
    }
    // get results from backend with timeout
    res, err := asyncResult.Get(10 * time.Second)
    if err != nil {
        panic(err)
    }
    log.Printf("result: %+v of type %+v", res, reflect.TypeOf(res))
}

Celery 任务消息示例

Celery 消息协议版本1

{
    "expires": null,
    "utc": true,
    "args": [5456, 2878],
    "chord": null,
    "callbacks": null,
    "errbacks": null,
    "taskset": null,
    "id": "c8535050-68f1-4e18-9f32-f52f1aab6d9b",
    "retries": 0,
    "task": "worker.add",
    "timelimit": [null, null],
    "eta": null,
    "kwargs": {}
}

项目

如果您在项目中使用 gocelery,请告诉我们!

贡献

非常欢迎您做出任何贡献。 请为任何更改创建 Pull Request。

许可

gocelery 是在 MIT 许可下提供的。

(First edition: vz edited at 2019.08.13)

主要指標

概覽
名稱與所有者gocelery/gocelery
主編程語言Go
編程語言Go (語言數: 3)
平台Linux, Mac, Windows
許可證MIT License
所有者活动
創建於2016-07-08 02:31:41
推送於2023-09-12 10:17:12
最后一次提交2020-11-10 22:48:04
發布數1
最新版本名稱v1.0 (發布於 )
第一版名稱v1.0 (發布於 )
用户参与
星數2.4k
關注者數45
派生數240
提交數88
已啟用問題?
問題數103
打開的問題數47
拉請求數38
打開的拉請求數8
關閉的拉請求數34
项目设置
已啟用Wiki?
已存檔?
是復刻?
已鎖定?
是鏡像?
是私有?

gocelery

Go Client/Server for Celery Distributed Task Queue

Build Status
Coverage Status
Go Report Card
"Open Issues"
"Latest Release"
GoDoc
License
FOSSA Status

Why?

Having been involved in several projects migrating servers from Python to Go, I have realized Go can improve performance of existing python web applications.
As Celery distributed tasks are often used in such web applications, this library allows you to both implement celery workers and submit celery tasks in Go.

You can also use this library as pure go distributed task queue.

Go Celery Worker in Action

demo

Supported Brokers/Backends

Now supporting both Redis and AMQP!!

  • Redis (broker/backend)
  • AMQP (broker/backend) - does not allow concurrent use of channels

Celery Configuration

Celery must be configured to use json instead of default pickle encoding.
This is because Go currently has no stable support for decoding pickle objects.
Pass below configuration parameters to use json.

Starting from version 4.0, Celery uses message protocol version 2 as default value.
GoCelery does not yet support message protocol version 2, so you must explicitly set CELERY_TASK_PROTOCOL to 1.

CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
CELERY_RESULT_SERIALIZER='json',
CELERY_ENABLE_UTC=True,
CELERY_TASK_PROTOCOL=1,

Example

GoCelery GoDoc has good examples.
Also take a look at example directory for sample python code.

GoCelery Worker Example

Run Celery Worker implemented in Go

// create redis connection pool
redisPool := &redis.Pool{
  Dial: func() (redis.Conn, error) {
		c, err := redis.DialURL("redis://")
		if err != nil {
			return nil, err
		}
		return c, err
	},
}

// initialize celery client
cli, _ := gocelery.NewCeleryClient(
	gocelery.NewRedisBroker(redisPool),
	&gocelery.RedisCeleryBackend{Pool: redisPool},
	5, // number of workers
)

// task
add := func(a, b int) int {
	return a + b
}

// register task
cli.Register("worker.add", add)

// start workers (non-blocking call)
cli.StartWorker()

// wait for client request
time.Sleep(10 * time.Second)

// stop workers gracefully (blocking call)
cli.StopWorker()

Python Client Example

Submit Task from Python Client

from celery import Celery

app = Celery('tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)

@app.task
def add(x, y):
    return x + y

if __name__ == '__main__':
    ar = add.apply_async((5456, 2878), serializer='json')
    print(ar.get())

Python Worker Example

Run Celery Worker implemented in Python

from celery import Celery

app = Celery('tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)

@app.task
def add(x, y):
    return x + y
celery -A worker worker --loglevel=debug --without-heartbeat --without-mingle

GoCelery Client Example

Submit Task from Go Client

// create redis connection pool
redisPool := &redis.Pool{
  Dial: func() (redis.Conn, error) {
		c, err := redis.DialURL("redis://")
		if err != nil {
			return nil, err
		}
		return c, err
	},
}

// initialize celery client
cli, _ := gocelery.NewCeleryClient(
	gocelery.NewRedisBroker(redisPool),
	&gocelery.RedisCeleryBackend{Pool: redisPool},
	1,
)

// prepare arguments
taskName := "worker.add"
argA := rand.Intn(10)
argB := rand.Intn(10)

// run task
asyncResult, err := cli.Delay(taskName, argA, argB)
if err != nil {
	panic(err)
}

// get results from backend with timeout
res, err := asyncResult.Get(10 * time.Second)
if err != nil {
	panic(err)
}

log.Printf("result: %+v of type %+v", res, reflect.TypeOf(res))

Sample Celery Task Message

Celery Message Protocol Version 1

{
    "expires": null,
    "utc": true,
    "args": [5456, 2878],
    "chord": null,
    "callbacks": null,
    "errbacks": null,
    "taskset": null,
    "id": "c8535050-68f1-4e18-9f32-f52f1aab6d9b",
    "retries": 0,
    "task": "worker.add",
    "timelimit": [null, null],
    "eta": null,
    "kwargs": {}
}

Projects

Please let us know if you use gocelery in your project!

Contributing

You are more than welcome to make any contributions.
Please create Pull Request for any changes.

LICENSE

The gocelery is offered under MIT license.