gocelery

Celery Distributed Task Queue in Go

  • Owner: gocelery/gocelery
  • Platforms: Linux, Mac, Windows
  • License: MIT License

gocelery

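Go client/server for the Celery distributed task queue.

Why?

Having been involved in several projects migrating servers from Python to Go, I realized that Go can improve the performance of existing Python web applications. Because Celery distributed tasks are often used in such web applications, this library lets you both implement Celery workers and submit Celery tasks in Go.

You can also use this library as a pure Go distributed task queue.

Supported Brokers/Backends

Redis and AMQP are both supported.

  • Redis (broker/backend)
  • AMQP (broker/backend) - does not allow concurrent use of channels

Celery Configuration

Celery must be configured to use JSON instead of the default pickle encoding, because Go currently has no stable support for decoding pickle objects. Use the configuration parameters below to enable JSON.

Starting with version 4.0, Celery uses message protocol version 2 by default. GoCelery does not yet support message protocol version 2, so you must explicitly set CELERY_TASK_PROTOCOL to 1.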

CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
CELERY_RESULT_SERIALIZER='json',
CELERY_ENABLE_UTC=True,
CELERY_TASK_PROTOCOL=1,

Example

The GoCelery GoDoc has good examples.

Also take a look at the example directory for sample Python code.

GoCelery Worker Example

Run a Celery worker implemented in Go.

// initialize celery client
cli, _ := NewCeleryClient(
    NewRedisCeleryBroker("redis://"),
    NewRedisCeleryBackend("redis://"),
    5, // number of workers
)
// task
add := func(a, b int) int {
    return a + b
}
// register task
cli.Register("worker.add", add)
// start workers (non-blocking call)
cli.StartWorker()
// wait for client request
time.Sleep(10 * time.Second)
// stop workers gracefully (blocking call)
cli.StopWorker()

Python Client Example

Submit a task from a Python client.

from celery import Celery
app = Celery('tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)
@app.task
def add(x, y):
    return x + y
if __name__ == '__main__':
    ar = add.apply_async((5456, 2878), serializer='json')
    print(ar.get())

Python Worker Example

Run a Celery worker implemented in Python.

from celery import Celery
app = Celery('tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)
@app.task
def add(x, y):
    return x + y

Assuming the code above is saved as worker.py, start the worker from the command line:

celery -A worker worker --loglevel=debug --without-heartbeat --without-mingle

GoCelery Client Example

Submit a task from a Go client.

func main() {
    // initialize celery client
    cli, _ := NewCeleryClient(
        NewRedisCeleryBroker("redis://"),
        NewRedisCeleryBackend("redis://"),
        1,
    )
    // prepare arguments
    taskName := "worker.add"
    argA := rand.Intn(10)
    argB := rand.Intn(10)
    // run task
    asyncResult, err := cli.Delay(taskName, argA, argB)
    if err != nil {
        panic(err)
    }
    // get results from backend with timeout
    res, err := asyncResult.Get(10 * time.Second)
    if err != nil {
        panic(err)
    }
    log.Printf("result: %+v of type %+v", res, reflect.TypeOf(res))
}

Sample Celery Task Message

Celery Message Protocol Version 1

{
    "expires": null,
    "utc": true,
    "args": [5456, 2878],
    "chord": null,
    "callbacks": null,
    "errbacks": null,
    "taskset": null,
    "id": "c8535050-68f1-4e18-9f32-f52f1aab6d9b",
    "retries": 0,
    "task": "worker.add",
    "timelimit": [null, null],
    "eta": null,
    "kwargs": {}
}

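Projects

Please let us know if you use gocelery in your project!

Contributing

You are more than welcome to make any contributions. Please create a pull request for any changes.

License

gocelery is offered under the MIT license.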

(First edition: vz edited at 2019.08.13)

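Key Metrics

Overview
  • Name and owner: gocelery/gocelery
  • Primary language: Go
  • Languages: Go (3 languages in total)
  • Platforms: Linux, Mac, Windows
  • License: MIT License

Owner Activity
  • Created: 2016-07-08 02:31:41
  • Last push: 2023-09-12 10:17:12
  • Last commit: 2020-11-10 22:48:04
  • Releases: 1
  • Latest release: v1.0
  • First release: v1.0

User Engagement
  • Stars: 2.4k
  • Watchers: 45
  • Forks: 247
  • Commits: 88
  • Issues: 103
  • Open issues: 47
  • Pull requests: 38
  • Open pull requests: 8
  • Closed pull requests: 34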

gocelery

Go Client/Server for Celery Distributed Task Queue

Why?

Having been involved in several projects migrating servers from Python to Go, I have realized that Go can improve the performance of existing Python web applications.
As Celery distributed tasks are often used in such web applications, this library allows you to both implement Celery workers and submit Celery tasks in Go.

You can also use this library as a pure Go distributed task queue.

Supported Brokers/Backends

Both Redis and AMQP are now supported.

  • Redis (broker/backend)
  • AMQP (broker/backend) - does not allow concurrent use of channels
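
One way to live with the AMQP restriction above is to serialize every use of a channel behind a mutex. The sketch below illustrates that idea only: `publisher`, `serialPublisher`, and `recordingPublisher` are hypothetical names invented for this example and are not part of gocelery or of any AMQP client library.

```go
package main

import (
	"fmt"
	"sync"
)

// publisher is a hypothetical stand-in for anything that sends a message over
// a single AMQP channel. It is not a gocelery or AMQP client type.
type publisher interface {
	Publish(body []byte) error
}

// serialPublisher wraps a publisher so that only one goroutine at a time
// can call into it.
type serialPublisher struct {
	mu    sync.Mutex
	inner publisher
}

// Publish forwards to the wrapped publisher while holding the mutex.
func (s *serialPublisher) Publish(body []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.inner.Publish(body)
}

// recordingPublisher is an in-memory fake. On its own it is not safe for
// concurrent use, which is what makes it a useful stand-in here.
type recordingPublisher struct {
	bodies [][]byte
}

// Publish appends the body to the in-memory log.
func (r *recordingPublisher) Publish(body []byte) error {
	r.bodies = append(r.bodies, body)
	return nil
}

// publishConcurrently sends n messages from n goroutines through p and
// waits for all of them to finish.
func publishConcurrently(p publisher, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = p.Publish([]byte(fmt.Sprintf("task-%d", i)))
		}(i)
	}
	wg.Wait()
}

func main() {
	rec := &recordingPublisher{}
	publishConcurrently(&serialPublisher{inner: rec}, 100)
	fmt.Println("messages recorded:", len(rec.bodies))
}
```

The trade-off is throughput: all publishers queue on one lock. Giving each goroutine its own channel avoids the contention, at the cost of more broker resources.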

Celery Configuration

Celery must be configured to use JSON instead of the default pickle encoding.
This is because Go currently has no stable support for decoding pickle objects.
Pass the configuration parameters below to use JSON.

Starting from version 4.0, Celery uses message protocol version 2 by default.
GoCelery does not yet support message protocol version 2, so you must explicitly set CELERY_TASK_PROTOCOL to 1.

CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
CELERY_RESULT_SERIALIZER='json',
CELERY_ENABLE_UTC=True,
CELERY_TASK_PROTOCOL=1,

Example

The GoCelery GoDoc has good examples.
Also take a look at the example directory for sample Python code.

GoCelery Worker Example

Run a Celery worker implemented in Go.

// create redis connection pool
redisPool := &redis.Pool{
	Dial: func() (redis.Conn, error) {
		c, err := redis.DialURL("redis://")
		if err != nil {
			return nil, err
		}
		return c, nil
	},
}

// initialize celery client
cli, _ := gocelery.NewCeleryClient(
	gocelery.NewRedisBroker(redisPool),
	&gocelery.RedisCeleryBackend{Pool: redisPool},
	5, // number of workers
)

// task
add := func(a, b int) int {
	return a + b
}

// register task
cli.Register("worker.add", add)

// start workers (non-blocking call)
cli.StartWorker()

// wait for client request
time.Sleep(10 * time.Second)

// stop workers gracefully (blocking call)
cli.StopWorker()
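
The worker example registers a plain Go function, `add(a, b int)`, while task arguments arrive as JSON. The sketch below shows one way a worker could bridge that gap with reflection, converting each decoded argument to the parameter type the function declares. It is an illustration under stated assumptions, not gocelery's actual implementation, and `callWithJSONArgs` is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"reflect"
)

// callWithJSONArgs invokes fn with arguments as a JSON decoder would produce
// them (numbers as float64), converting each one to the declared parameter type.
func callWithJSONArgs(fn interface{}, args []interface{}) ([]interface{}, error) {
	fv := reflect.ValueOf(fn)
	if fv.Kind() != reflect.Func {
		return nil, fmt.Errorf("not a function: %T", fn)
	}
	ft := fv.Type()
	if ft.IsVariadic() {
		return nil, fmt.Errorf("variadic functions are not handled in this sketch")
	}
	if ft.NumIn() != len(args) {
		return nil, fmt.Errorf("expected %d arguments, got %d", ft.NumIn(), len(args))
	}

	in := make([]reflect.Value, len(args))
	for i, arg := range args {
		av := reflect.ValueOf(arg)
		// A nil argument yields an invalid Value, so check validity first.
		if !av.IsValid() || !av.Type().ConvertibleTo(ft.In(i)) {
			return nil, fmt.Errorf("argument %d (%T) does not fit %s", i, arg, ft.In(i))
		}
		in[i] = av.Convert(ft.In(i))
	}

	// Call the function and unwrap the reflected return values.
	out := fv.Call(in)
	results := make([]interface{}, len(out))
	for i, v := range out {
		results[i] = v.Interface()
	}
	return results, nil
}

func main() {
	add := func(a, b int) int { return a + b }

	// JSON numbers decode to float64, so that is what the helper receives.
	res, err := callWithJSONArgs(add, []interface{}{float64(5456), float64(2878)})
	if err != nil {
		panic(err)
	}
	fmt.Println(res[0])
}
```

Note that Go's conversion rules truncate a fractional float64 when converting to int, so a stricter worker would validate arguments before converting them.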

Python Client Example

Submit a task from a Python client.

from celery import Celery

app = Celery('tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)

@app.task
def add(x, y):
    return x + y

if __name__ == '__main__':
    ar = add.apply_async((5456, 2878), serializer='json')
    print(ar.get())

Python Worker Example

Run a Celery worker implemented in Python.

from celery import Celery

app = Celery('tasks',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379'
)

@app.task
def add(x, y):
    return x + y

Assuming the code above is saved as worker.py, start the worker from the command line:

celery -A worker worker --loglevel=debug --without-heartbeat --without-mingle

GoCelery Client Example

Submit a task from a Go client.

// create redis connection pool
redisPool := &redis.Pool{
	Dial: func() (redis.Conn, error) {
		c, err := redis.DialURL("redis://")
		if err != nil {
			return nil, err
		}
		return c, nil
	},
}

// initialize celery client
cli, _ := gocelery.NewCeleryClient(
	gocelery.NewRedisBroker(redisPool),
	&gocelery.RedisCeleryBackend{Pool: redisPool},
	1,
)

// prepare arguments
taskName := "worker.add"
argA := rand.Intn(10)
argB := rand.Intn(10)

// run task
asyncResult, err := cli.Delay(taskName, argA, argB)
if err != nil {
	panic(err)
}

// get results from backend with timeout
res, err := asyncResult.Get(10 * time.Second)
if err != nil {
	panic(err)
}

log.Printf("result: %+v of type %+v", res, reflect.TypeOf(res))
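
The client example logs the dynamic type of the result, which matters because results travel as JSON. Go's `encoding/json` decodes every JSON number into a `float64` when the target is an `interface{}`. Assuming the result is decoded that way (an assumption about the client, not something stated above), an integer sum comes back as a float and needs an explicit conversion. `decodeResult` and `asInt` are hypothetical helpers written for this sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
	"math"
)

// decodeResult decodes a JSON-encoded task result into an untyped value.
func decodeResult(raw []byte) (interface{}, error) {
	var v interface{}
	if err := json.Unmarshal(raw, &v); err != nil {
		return nil, err
	}
	return v, nil
}

// asInt converts a decoded JSON number to int and rejects values that are
// not numbers or that carry a fractional part.
func asInt(v interface{}) (int, error) {
	f, ok := v.(float64)
	if !ok {
		return 0, fmt.Errorf("expected a JSON number, got %T", v)
	}
	if f != math.Trunc(f) {
		return 0, fmt.Errorf("%v is not an integer", f)
	}
	return int(f), nil
}

func main() {
	v, err := decodeResult([]byte("8334"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("decoded %v of type %T\n", v, v)

	n, err := asInt(v)
	if err != nil {
		panic(err)
	}
	fmt.Printf("converted %d of type %T\n", n, n)
}
```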

Sample Celery Task Message

Celery Message Protocol Version 1

{
    "expires": null,
    "utc": true,
    "args": [5456, 2878],
    "chord": null,
    "callbacks": null,
    "errbacks": null,
    "taskset": null,
    "id": "c8535050-68f1-4e18-9f32-f52f1aab6d9b",
    "retries": 0,
    "task": "worker.add",
    "timelimit": [null, null],
    "eta": null,
    "kwargs": {}
}
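
To make the message layout concrete, here is a minimal sketch that decodes the sample body above with Go's standard `encoding/json` package. The `TaskMessage` struct, its field types, and `decodeTaskMessage` are assumptions chosen for this example. They are not gocelery's own types, and fields such as `chord` and `callbacks` are simply ignored.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TaskMessage mirrors a subset of the fields in the sample protocol version 1
// message. The name and field types are assumptions made for this sketch.
type TaskMessage struct {
	ID      string                 `json:"id"`
	Task    string                 `json:"task"`
	Args    []interface{}          `json:"args"`
	Kwargs  map[string]interface{} `json:"kwargs"`
	Retries int                    `json:"retries"`
	ETA     *string                `json:"eta"`     // nil when the JSON value is null
	Expires *string                `json:"expires"` // nil when the JSON value is null
	UTC     bool                   `json:"utc"`
}

// decodeTaskMessage parses a JSON-encoded task message body.
func decodeTaskMessage(body []byte) (*TaskMessage, error) {
	var msg TaskMessage
	if err := json.Unmarshal(body, &msg); err != nil {
		return nil, err
	}
	return &msg, nil
}

// sampleBody is the sample message shown above.
const sampleBody = `{
    "expires": null,
    "utc": true,
    "args": [5456, 2878],
    "chord": null,
    "callbacks": null,
    "errbacks": null,
    "taskset": null,
    "id": "c8535050-68f1-4e18-9f32-f52f1aab6d9b",
    "retries": 0,
    "task": "worker.add",
    "timelimit": [null, null],
    "eta": null,
    "kwargs": {}
}`

func main() {
	msg, err := decodeTaskMessage([]byte(sampleBody))
	if err != nil {
		panic(err)
	}
	fmt.Printf("task=%s id=%s args=%v kwargs=%v\n", msg.Task, msg.ID, msg.Args, msg.Kwargs)
}
```

Because `Args` is a slice of `interface{}`, the two numbers arrive as `float64` values. The `task` field carries the name under which the worker registered the function, `worker.add` in the examples.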

Projects

Please let us know if you use gocelery in your project!

Contributing

You are more than welcome to make any contributions.
Please create a pull request for any changes.

LICENSE

gocelery is offered under the MIT license.