gocelery
用于 Celery 分布式任务队列的 Go 客户端/服务器。
为什么?
参与了几个将服务器从 Python 迁移到 Go 的项目,我意识到 Go 可以提高现有 python Web 应用程序的性能。 由于 Celery 分布式任务通常用于此类 Web 应用程序,因此该库允许您在 Go 中实现 Celery workers 并提交 Celery 任务。
您还可以将此库用作纯分布式任务队列。
支持的中间人/后端(Brokers/Backends)
现在支持 Redis 和 AMQP !!
- Redis(中间人/后端)
- AMQP(中间人/后端) - 不允许同时使用频道
Celery 配置
必须将 Celery 配置为使用 json 而不是默认的 pickle 编码。 这是因为 Go 目前没有对 pickle 对象解码的稳定支持。通过以下配置参数来使用 json。
从版本4.0开始,Celery 使用消息协议版本2作为默认值。 GoCelery 尚不支持消息协议版本2,因此您必须将 CELERY_TASK_PROTOCOL 显式设置为1。
CELERY_TASK_SERIALIZER='json', CELERY_ACCEPT_CONTENT=['json'], # Ignore other content CELERY_RESULT_SERIALIZER='json', CELERY_ENABLE_UTC=True, CELERY_TASK_PROTOCOL=1,
示例
GoCelery GoDoc 有很好的例子。
另请查看 example 目录以获取示例 python 代码。
GoCelery Worker 示例
运行在 Go 中实现的 Celery Worker
// initialize celery client cli, _ := NewCeleryClient( NewRedisCeleryBroker("redis://"), NewRedisCeleryBackend("redis://"), 5, // number of workers ) // task add := func(a, b int) int { return a + b } // register task cli.Register("worker.add", add) // start workers (non-blocking call) cli.StartWorker() // wait for client request time.Sleep(10 * time.Second) // stop workers gracefully (blocking call) cli.StopWorker()
Python 客户端示例
从 Python 客户端提交任务。
from celery import Celery app = Celery('tasks', broker='redis://localhost:6379', backend='redis://localhost:6379' ) @app.task def add(x, y): return x + y if __name__ == '__main__': ar = add.apply_async((5456, 2878), serializer='json') print(ar.get())
Python Worker 示例
运行用 Python 实现的 Celery Worker
from celery import Celery app = Celery('tasks', broker='redis://localhost:6379', backend='redis://localhost:6379' ) @app.task def add(x, y): return x + y celery -A worker worker --loglevel=debug --without-heartbeat --without-mingle
GoCelery 客户端示例
从 Go 客户端提交任务
func main() { // initialize celery client cli, _ := NewCeleryClient( NewRedisCeleryBroker("redis://"), NewRedisCeleryBackend("redis://"), 1, ) // prepare arguments taskName := "worker.add" argA := rand.Intn(10) argB := rand.Intn(10) // run task asyncResult, err := cli.Delay(taskName, argA, argB) if err != nil { panic(err) } // get results from backend with timeout res, err := asyncResult.Get(10 * time.Second) if err != nil { panic(err) } log.Printf("result: %+v of type %+v", res, reflect.TypeOf(res)) }
Celery 任务消息示例
Celery 消息协议版本1
{ "expires": null, "utc": true, "args": [5456, 2878], "chord": null, "callbacks": null, "errbacks": null, "taskset": null, "id": "c8535050-68f1-4e18-9f32-f52f1aab6d9b", "retries": 0, "task": "worker.add", "timelimit": [null, null], "eta": null, "kwargs": {} }
项目
如果您在项目中使用 gocelery,请告诉我们!
贡献
非常欢迎您做出任何贡献。 请为任何更改创建 Pull Request。
许可
gocelery 是在 MIT 许可下提供的。
(First edition: vz edited at 2019.08.13)