Oban

以现代 PostgreSQL 为后盾,在 Elixir 中进行强大的作业处理。「💎 Robust job processing in Elixir, backed by modern PostgreSQL」

  • 所有者: sorentwo/oban
  • 平台: BSD, Docker, Linux, Mac, Raspbian, Windows
  • 许可证: Apache License 2.0
  • 分类:
  • 主题:
  • 喜欢:
    0
      比较:

Github星跟踪图

Table of Contents

Features

Oban's primary goals are reliability, consistency and observability.
It is fundamentally different from other background job processing tools because
it retains job data for historic metrics and inspection. You can leave your
application running indefinitely without worrying about jobs being lost or
orphaned due to crashes.

Advantages over in-memory, mnesia, Redis and RabbitMQ based tools:

  • Fewer Dependencies — If you are running a web app there is a very good
    chance that you're running on top of a RDBMS. Running your job queue
    within PostgreSQL minimizes system dependencies and simplifies data backups.
  • Transactional Control — Enqueue a job along with other database changes,
    ensuring that everything is committed or rolled back atomically.
  • Database Backups — Jobs are stored inside of your primary database, which
    means they are backed up together with the data that they relate to.

Advanced features and advantages over other RDBMS based tools:

  • Isolated Queues — Jobs are stored in a single table but are executed in
    distinct queues. Each queue runs in isolation, ensuring that a job in a single
    slow queue can't back up other faster queues.
  • Queue Control — Queues can be started, stopped, paused, resumed and scaled
    independently at runtime.
  • Resilient Queues — Failing queries won't crash the entire supervision tree,
    instead they trip a circuit breaker and will be retried again in the future.
  • Job Killing — Jobs can be killed in the middle of execution regardless of
    which node they are running on. This stops the job at once and flags it as
    discarded.
  • Triggered Execution — Database triggers ensure that jobs are dispatched as
    soon as they are inserted into the database.
  • Unique Jobs — Duplicate work can be avoided through unique job controls.
    Uniqueness can be enforced at the argument, queue and worker level for any
    period of time.
  • Scheduled Jobs — Jobs can be scheduled at any time in the future, down to
    the second.
  • Periodic (CRON) Jobs — Automatically enqueue jobs on a cron-like schedule.
    Duplicate jobs are never enqueued, no matter how many nodes you're running.
  • Job Priority — Prioritize jobs within a queue to run ahead of others.
  • Job Safety — When a process crashes or the BEAM is terminated executing
    jobs aren't lost—they are quickly recovered by other running nodes or
    immediately when the node is restarted.
  • Historic Metrics — After a job is processed the row is not deleted.
    Instead, the job is retained in the database to provide metrics. This allows
    users to inspect historic jobs and to see aggregate data at the job, queue or
    argument level.
  • Node Metrics — Every queue records metrics to the database during runtime.
    These are used to monitor queue health across nodes and may be used for
    analytics.
  • Queue Draining — Queue shutdown is delayed so that slow jobs can finish
    executing before shutdown. When shutdown starts queues are paused and stop
    executing new jobs. Any jobs left running after the shutdown grace period may
    be rescued later.
  • Telemetry Integration — Job life-cycle events are emitted via
    Telemetry integration. This enables simple logging, error reporting
    and health checkups without plug-ins.

Requirements

Oban has been developed and actively tested with Elixir 1.8+, Erlang/OTP 21.1+
and PostgreSQL 11.0+. Running Oban currently requires Elixir 1.8+, Erlang 21+,
and PostgreSQL 9.6+.

UI

A web-based user interface for monitoring and managing Oban is available as a
private beta. Learn more about it and register for the beta at
oban.dev.

Installation

Oban is published on Hex. Add it to your list of
dependencies in mix.exs:

def deps do
  [
    {:oban, "1.0.0"}
  ]
end

Then run mix deps.get to install Oban and its dependencies, including
Ecto, Jason and Postgrex.

After the packages are installed you must create a database migration to
add the oban_jobs table to your database:

mix ecto.gen.migration add_oban_jobs_table

Open the generated migration in your editor and call the up and down
functions on Oban.Migrations:

defmodule MyApp.Repo.Migrations.AddObanJobsTable do
  use Ecto.Migration

  def up do
    Oban.Migrations.up()
  end

  # We specify `version: 1` in `down`, ensuring that we'll roll all the way back down if
  # necessary, regardless of which version we've migrated `up` to.
  def down do
    Oban.Migrations.down(version: 1)
  end
end

This will run all of Oban's versioned migrations for your database. Migrations
between versions are idempotent and will never change after a release. As new
versions are released you may need to run additional migrations.

Now, run the migration to create the table:

mix ecto.migrate

Next see Usage for how to integrate Oban into your application and
start defining jobs!

Note About Releases

If you are using releases you may see Postgrex errors logged during your initial
deploy (or any deploy requiring an Oban migration). The errors are only
temporary! After the migration has completed each queue will start producing
jobs normally.

Usage

Oban is a robust job processing library which uses PostgreSQL for storage and
coordination.

Each Oban instance is a supervision tree and not an application. That means it
won't be started automatically and must be included in your application's
supervision tree. All of your configuration is passed into the supervisor,
allowing you to configure Oban like the rest of your application:

# config/config.exs
config :my_app, Oban,
  repo: MyApp.Repo,
  prune: {:maxlen, 10_000},
  queues: [default: 10, events: 50, media: 20]

# lib/my_app/application.ex
defmodule MyApp.Application do
  @moduledoc false

  use Application

  alias MyApp.{Endpoint, Repo}

  def start(_type, _args) do
    children = [
      Repo,
      Endpoint,
      {Oban, Application.get_env(:my_app, Oban)}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end

If you are running tests (which you should be) you'll want to disable pruning
and job dispatching altogether when testing:

# config/test.exs
config :my_app, Oban, crontab: false, queues: false, prune: :disabled

Without dispatch and pruning disabled Ecto will raise constant ownership errors
and you won't be able to run tests.

Configuring Queues

Queues are specified as a keyword list where the key is the name of the queue
and the value is the maximum number of concurrent jobs. The following
configuration would start four queues with concurrency ranging from 5 to 50:

queues: [default: 10, mailers: 20, events: 50, media: 5]

There isn't a limit to the number of queues or how many jobs may execute
concurrently in each queue. Here are a few caveats and guidelines:

Caveats & Guidelines

  • Each queue will run as many jobs as possible concurrently, up to the
    configured limit. Make sure your system has enough resources (i.e. database
    connections) to handle the concurrent load.

  • Queue limits are local (per-node), not global (per-cluster). For example,
    running a queue with a local limit of one on three separate nodes is
    effectively a global limit of three. If you require a global limit you must
    restrict the number of nodes running a particular queue.

  • Only jobs in the configured queues will execute. Jobs in any other queue will
    stay in the database untouched.

  • Be careful how many concurrent jobs make expensive system calls (i.e. FFMpeg,
    ImageMagick). The BEAM ensures that the system stays responsive under load,
    but those guarantees don't apply when using ports or shelling out commands.

Defining Workers

Worker modules do the work of processing a job. At a minimum they must define a
perform/2 function, which is called with an args map and the job struct.

Note that when Oban calls perform/2, the args map given when enqueueing the
job is deserialized from the PostgreSQL jsonb data type and therefore map keys
are converted to strings.

Define a worker to process jobs in the events queue:

defmodule MyApp.Business do
  use Oban.Worker, queue: :events

  @impl Oban.Worker
  def perform(%{"id" => id} = args, _job) do
    model = MyApp.Repo.get(MyApp.Business.Man, id)

    case args do
      %{"in_the" => "business"} ->
        # handle business job
        IO.inspect(model)

      %{"vote_for" => vote} ->
        # handle vote job
        IO.inspect(model)

      _ ->
        IO.inspect(model)
    end
  end
end

The use macro also accepts options to customize max attempts, priority, tags,
and uniqueness:

defmodule MyApp.LazyBusiness do
  use Oban.Worker,
    queue: :events,
    priority: 3,
    max_attempts: 3,
    tags: ["business"],
    unique: [period: 30]

  @impl Oban.Worker
  def perform(_args, _job) do
    # do business slowly
  end
end

The value returned from perform/2 is ignored, unless it an {:error, reason}
tuple. With an error return or when perform has an uncaught exception or throw
then the error is reported and the job is retried (provided there are attempts
remaining).

See the Oban.Worker docs for more details on failure conditions and
Oban.Telemetry for details on job reporting.

Enqueueing Jobs

Jobs are simply Ecto structs and are enqueued by inserting them into the
database. For convenience and consistency all workers provide a new/2
function that converts an args map into a job changeset suitable for insertion:

%{id: 1, in_the: "business", of_doing: "business"}

概览

名称与所有者sorentwo/oban
主编程语言Elixir
编程语言Elixir (语言数: 1)
平台BSD, Docker, Linux, Mac, Raspbian, Windows
许可证Apache License 2.0
发布数86
最新版本名称v2.17.10 (发布于 )
第一版名称v0.1.0 (发布于 )
创建于2019-02-25 15:29:11
推送于2024-04-26 17:35:57
最后一次提交
星数3k
关注者数43
派生数292
提交数1.4k
已启用问题?
问题数543
打开的问题数2
拉请求数445
打开的拉请求数0
关闭的拉请求数85
已启用Wiki?
已存档?
是复刻?
已锁定?
是镜像?
是私有?
去到顶部