parallel

Ruby: parallel processing made simple and fast

  • Owner: grosser/parallel
  • License: MIT License

Parallel


Run any code in parallel processes (to use all CPUs) or threads (to speed up blocking operations).
Best suited for map-reduce or, for example, parallel downloads/uploads.

Install

gem install parallel

Usage

# 2 CPUs -> work in 2 processes (a,b + c)
results = Parallel.map(['a','b','c']) do |one_letter|
  expensive_calculation(one_letter)
end

# 3 Processes -> finished after 1 run
results = Parallel.map(['a','b','c'], in_processes: 3) { |one_letter| ... }

# 3 Threads -> finished after 1 run
results = Parallel.map(['a','b','c'], in_threads: 3) { |one_letter| ... }
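For intuition about the "3 Threads -> finished after 1 run" case, here is a stdlib-only sketch that starts one thread per item and collects the results in input order. It is not the gem's implementation, and `naive_threaded_map` is a hypothetical name.

```ruby
# Hypothetical helper, stdlib only: one thread per item, results kept in input order.
def naive_threaded_map(items, &block)
  threads = items.map { |item| Thread.new(item) { |x| block.call(x) } }
  threads.map(&:value) # Thread#value waits for the thread and returns its block's result
end

results = naive_threaded_map(['a', 'b', 'c']) { |letter| letter.upcase }
p results # => ["A", "B", "C"]
```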

The same can be done with each:

Parallel.each(['a','b','c']) { |one_letter| ... }

or with each_with_index, map_with_index, and flat_map.

Produce one item at a time with a lambda (anything that responds to .call) or a Queue.

items = [1,2,3]
Parallel.each( -> { items.pop || Parallel::Stop }) { |number| ... }
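The producer pattern can be pictured with a stdlib-only sketch: worker threads keep asking a callable for the next item until it returns a stop sentinel. `STOP` and `each_from_producer` are hypothetical stand-ins for illustration, not the gem's `Parallel::Stop` handling.

```ruby
# Hypothetical sentinel standing in for Parallel::Stop.
STOP = Object.new.freeze

# Stdlib-only sketch: workers ask the producer for the next item until it returns STOP.
# Assumes the producer keeps returning STOP once exhausted, as `items.pop || STOP` does.
def each_from_producer(producer, workers: 2, &block)
  lock = Mutex.new
  threads = Array.new(workers) do
    Thread.new do
      loop do
        item = lock.synchronize { producer.call } # only one thread calls the producer at a time
        break if item.equal?(STOP)
        block.call(item)
      end
    end
  end
  threads.each(&:join)
end

items = [1, 2, 3]
seen = Queue.new
each_from_producer(-> { items.pop || STOP }) { |number| seen << number * 10 }
collected = Array.new(seen.size) { seen.pop }
p collected.sort # => [10, 20, 30]
```

Guarding the producer with a Mutex keeps a non-thread-safe source such as `Array#pop` from being called concurrently.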

You can also call any? or all?, which work the same way as Array#any? and Array#all?.

Parallel.any?([1,2,3,4,5,6,7]) { |number| number == 4 }
# => true

Parallel.all?([1,2,nil,4,5]) { |number| number != nil }
# => false
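One way to picture a parallel any? is a worker pool that stops handing out work once some item matches; all? then follows as "no item fails". This is a stdlib-only sketch under that assumption, with hypothetical names, not the gem's code.

```ruby
# Stdlib-only sketch, not the gem's code: stop handing out work once any item matches.
def parallel_any?(items, workers: 2, &predicate)
  queue = Queue.new
  items.each { |item| queue << [item] } # wrap items so a nil item is not mistaken for "done"
  queue.close                           # pop returns nil once the closed queue is drained
  found = false
  lock = Mutex.new

  threads = Array.new(workers) do
    Thread.new do
      while !lock.synchronize { found } && (wrapped = queue.pop)
        matched = predicate.call(wrapped.first)
        lock.synchronize { found = true } if matched
      end
    end
  end
  threads.each(&:join)
  found
end

# all? holds exactly when no item fails the predicate.
def parallel_all?(items, workers: 2, &predicate)
  found_failure = parallel_any?(items, workers: workers) { |item| !predicate.call(item) }
  !found_failure
end

any_four = parallel_any?([1, 2, 3, 4, 5, 6, 7]) { |number| number == 4 }
no_nils = parallel_all?([1, 2, nil, 4, 5]) { |number| number != nil }
p any_four # => true
p no_nils  # => false
```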

Processes and threads are workers: each one grabs the next piece of work as soon as it finishes the previous one.
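The worker model described above can be sketched with Ruby's stdlib: a fixed number of threads pull jobs from a shared queue and store each result at its input index, so output order matches input order even when items finish out of order. `pool_map` is a hypothetical name; this is an illustration, not how the gem is built internally.

```ruby
# Stdlib-only sketch of a worker pool; `pool_map` is a hypothetical name.
def pool_map(items, workers: 2, &block)
  queue = Queue.new
  items.each_with_index { |item, index| queue << [item, index] }
  queue.close # once closed and drained, Queue#pop returns nil instead of blocking
  results = Array.new(items.size)
  lock = Mutex.new

  threads = Array.new(workers) do
    Thread.new do
      # A worker takes the next job as soon as it finishes the previous one.
      while (job = queue.pop)
        item, index = job
        value = block.call(item)
        lock.synchronize { results[index] = value } # store by index to keep input order
      end
    end
  end
  threads.each(&:join)
  results
end

squares = pool_map([3, 1, 2], workers: 2) { |n| sleep(n * 0.01); n * n }
p squares # => [9, 1, 4]
```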

Processes

  • Speedup through multiple CPUs
  • Speedup for blocking operations
  • Variables are protected from change
  • Extra memory used
  • Child processes are killed when your main process is killed through Ctrl+C or kill -2
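Why process variables are "protected from change" can be seen with a plain `fork`: the child works on its own copy of memory, so the only way to get a result back is to serialize it, here with Marshal over a pipe. This is a stdlib-only sketch that needs `Process.fork` (so not Windows or JRuby); `run_in_child` is a hypothetical helper, not part of the gem.

```ruby
# Hypothetical helper, stdlib only: run a block in a forked child and return its result.
def run_in_child
  reader, writer = IO.pipe
  [reader, writer].each(&:binmode)
  pid = fork do
    reader.close
    writer.write(Marshal.dump(yield)) # serialize the child's result for the parent
    writer.close
    exit!(0) # skip at_exit handlers inherited from the parent
  end
  writer.close
  result = Marshal.load(reader.read) # read until the child closes its end of the pipe
  reader.close
  Process.wait(pid)
  result
end

counter = 0
child_result = run_in_child { counter += 1; counter } # increments the child's copy only
p child_result # => 1
p counter      # => 0
```

The extra memory noted above comes from each child holding its own copy of the process.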

Threads

  • Speedup for blocking operations
  • Variables can be shared/modified
  • No extra memory used
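Because threads share variables, concurrent updates to the same object need coordination. A stdlib-only sketch with a Mutex guarding a shared counter:

```ruby
# Stdlib-only sketch: all threads update the same variable, so the update is guarded by a Mutex.
total = 0
lock = Mutex.new

threads = Array.new(4) do
  Thread.new do
    1_000.times { lock.synchronize { total += 1 } }
  end
end
threads.each(&:join)

p total # => 4000
```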

ActiveRecord

Connection Lost

  • Multithreading needs connection pooling, forks need reconnects
  • Adjust connection pool size in config/database.yml when multithreading
# reproducibly fixes things (spec/cases/map_with_ar.rb)
Parallel.each(User.all, in_processes: 8) do |user|
  user.update_attribute(:some_attribute, some_value)
end
User.connection.reconnect!

# maybe helps: explicitly use connection pool
Parallel.each(User.all, in_threads: 8) do |user|
  ActiveRecord::Base.connection_pool.with_connection do
    user.update_attribute(:some_attribute, some_value)
  end
end

# maybe helps: reconnect once inside every fork
Parallel.each(User.all, in_processes: 8) do |user|
  @reconnected ||= User.connection.reconnect! || true
  user.update_attribute(:some_attribute, some_value)
end
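To see why multithreading needs a pool of the right size, here is a deliberately tiny, stdlib-only pool: a thread checks a connection out, uses it, and always returns it, and threads block while every connection is in use. `TinyPool` and the string "connections" are hypothetical; ActiveRecord's real pool is far more involved.

```ruby
# Hypothetical, stdlib-only pool; not ActiveRecord's implementation.
class TinyPool
  def initialize(size, &factory)
    @available = Queue.new
    size.times { @available << factory.call }
  end

  # Check a connection out, yield it, and always return it to the pool.
  def with_connection
    connection = @available.pop # blocks while every connection is checked out
    yield connection
  ensure
    @available << connection if connection
  end
end

created = 0
pool = TinyPool.new(2) { created += 1; "conn-#{created}" }
used = Queue.new
threads = Array.new(8) do
  Thread.new { pool.with_connection { |conn| used << conn; sleep 0.01 } }
end
threads.each(&:join)

names = Array.new(used.size) { used.pop }
p names.uniq.sort # => ["conn-1", "conn-2"]
```

Eight threads share two connections here; with a pool smaller than the thread count, threads wait, which is why the pool size in config/database.yml matters.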

NameError: uninitialized constant

A race happens when ActiveRecord models are autoloaded inside parallel threads
in environments that lazy-load, like development, test, or migrations.

To fix, autoload classes before the parallel block with either require '<modelname>' or ModelName.class.
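The "load before the parallel block" idea can be shown with Ruby's own `autoload`: touching the constant on the main thread loads the file before any worker thread exists. `Widget` and its temporary file are hypothetical stand-ins for a model; Rails' autoloaders work differently, so this only illustrates the ordering, not Rails itself.

```ruby
require 'tmpdir'

# Stdlib-only sketch: `Widget` is a hypothetical stand-in for a lazily loaded model.
labels = Dir.mktmpdir do |dir|
  path = File.join(dir, 'widget.rb')
  File.write(path, "class Widget\n  def self.label\n    'widget'\n  end\nend\n")
  Object.autoload(:Widget, path) # registered, but the file is not loaded yet
  Widget.class                   # touching the constant on the main thread loads the file
  Array.new(4) { Thread.new { Widget.label } }.map(&:value) # threads only use the loaded class
end

p Object.autoload?(:Widget) # => nil
p labels                    # => ["widget", "widget", "widget", "widget"]
```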

Break

Parallel.map(User.all) do |user|
  raise Parallel::Break # -> stops after all current items are finished
end
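The Break behavior described above ("stops after all current items are finished") can be modeled with a stdlib-only pool that stops handing out new items once a worker signals a break. `StopEarly` and `map_until_break` are hypothetical; unlike the gem, this sketch returns the partial results so the effect is visible.

```ruby
# Hypothetical exception standing in for Parallel::Break.
class StopEarly < StandardError; end

# Stdlib-only sketch: after a StopEarly, no new items are handed out,
# but items already in progress on other workers still finish.
def map_until_break(items, workers: 2, &block)
  queue = Queue.new
  items.each_with_index { |item, index| queue << [item, index] }
  queue.close
  results = Array.new(items.size)
  stop = false
  lock = Mutex.new

  threads = Array.new(workers) do
    Thread.new do
      while !lock.synchronize { stop } && (job = queue.pop)
        item, index = job
        begin
          value = block.call(item)
          lock.synchronize { results[index] = value }
        rescue StopEarly
          lock.synchronize { stop = true }
        end
      end
    end
  end
  threads.each(&:join)
  results
end

processed = map_until_break((1..10).to_a, workers: 1) do |n|
  raise StopEarly if n == 3
  n * 2
end
p processed # => [2, 4, nil, nil, nil, nil, nil, nil, nil, nil]
```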

Kill

Only use this if whatever is executing in the sub-command is safe to kill at any point.

Parallel.map([1,2,3]) do |x|
  raise Parallel::Kill if x == 1 # -> stop all sub-processes, killing them instantly
  sleep 100 # Do stuff
end
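"Killing them instantly" is what SIGKILL does to a process: it cannot be trapped, so no cleanup runs in the child, which is why the work must be safe to interrupt. A stdlib-only sketch (needs `Process.fork`, so not Windows or JRuby), not the gem's internals:

```ruby
# Stdlib-only sketch: start children doing long work, then kill them with SIGKILL.
pids = Array.new(2) { fork { sleep 100 } }    # children simulating long-running work
pids.each { |pid| Process.kill('KILL', pid) } # no ensure blocks or at_exit handlers run
statuses = pids.map { |pid| Process.wait2(pid).last }
killed = statuses.all? { |status| status.signaled? && status.termsig == Signal.list['KILL'] }
p killed # => true
```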

Progress / ETA

# gem install ruby-progressbar

Parallel.map(1..50, progress: "Doing stuff") { sleep 1 }

# Doing stuff | ETA: 00:00:02 | ==================== | Time: 00:00:10

Use the :start or :finish hook to get progress information.

  • :start receives item and index
  • :finish receives item, index, and result

They are called on the main process and protected with a mutex.

Parallel.map(1..100, finish: -> (item, i, result) { ... do something ... }) { sleep 1 }
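A `finish:` callback with the (item, index, result) arguments listed above can be sketched with the stdlib: each worker calls it under a Mutex, so callbacks never overlap and can safely update shared progress state. `map_with_finish` is hypothetical, and unlike the gem (where hooks run on the main process) the callback here runs on worker threads, serialized by the lock.

```ruby
# Stdlib-only sketch; `map_with_finish` is a hypothetical name, not the gem's API.
def map_with_finish(items, finish:, workers: 2, &block)
  queue = Queue.new
  items.each_with_index { |item, index| queue << [item, index] }
  queue.close
  results = Array.new(items.size)
  lock = Mutex.new

  threads = Array.new(workers) do
    Thread.new do
      while (job = queue.pop)
        item, index = job
        result = block.call(item)
        lock.synchronize do
          results[index] = result
          finish.call(item, index, result) # never runs concurrently with itself
        end
      end
    end
  end
  threads.each(&:join)
  results
end

done = 0
squares = map_with_finish((1..20).to_a, finish: ->(_item, _index, _result) { done += 1 }) { |n| n * n }
p done # => 20
```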

NOTE: If all you are trying to do is get the index, it is much more performant to use each_with_index instead.

Worker number

Use Parallel.worker_number to determine the worker slot in which your
task is running.

Parallel.each(1..5, :in_processes => 2) { |i| puts "Item: #{i}, Worker: #{Parallel.worker_number}" }
Item: 1, Worker: 1
Item: 2, Worker: 0
Item: 3, Worker: 1
Item: 4, Worker: 0
Item: 5, Worker: 1
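A worker slot number can be modeled by giving each worker thread a fixed number stored in a thread-local variable before it takes any work. This stdlib-only sketch uses a hypothetical `current_worker_number` in place of `Parallel.worker_number`; which worker handles which item varies from run to run.

```ruby
# Hypothetical stand-in for Parallel.worker_number, stdlib only.
def current_worker_number
  Thread.current[:worker_number]
end

queue = Queue.new
(1..5).each { |i| queue << i }
queue.close
log = Queue.new

threads = Array.new(2) do |slot|
  Thread.new(slot) do |number|
    Thread.current[:worker_number] = number # set once, before the worker takes any item
    while (i = queue.pop)
      log << [i, current_worker_number]
    end
  end
end
threads.each(&:join)

entries = Array.new(log.size) { log.pop }
entries.sort.each { |i, worker| puts "Item: #{i}, Worker: #{worker}" }
```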

Tips

Here are a few notable options.

  • [Benchmark/Test] Disable threading/forking with in_threads: 0 or in_processes: 0, great to test performance or to debug parallel issues
  • [Isolation] Do not reuse previous worker processes: isolation: true
  • [Stop all processes with an alternate interrupt signal] 'INT' (from Ctrl+C) is caught by default. Catch 'TERM' (from kill) with interrupt_signal: 'TERM'
  • [Process count via ENV] PARALLEL_PROCESSOR_COUNT=16 will use 16 instead of the number of processors detected. This is used to reconfigure a tool using parallel without inserting custom logic.
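The PARALLEL_PROCESSOR_COUNT tip amounts to letting an environment variable replace the detected CPU count. A stdlib-only sketch of that resolution, where `worker_count` is hypothetical and the precedence (explicit count, then the variable, then `Etc.nprocessors`) is an assumption rather than the gem's documented order:

```ruby
require 'etc'

# Hypothetical helper; assumed precedence: explicit count, then PARALLEL_PROCESSOR_COUNT,
# then the CPU count Ruby detects.
def worker_count(explicit = nil, env: ENV)
  return explicit unless explicit.nil? # an explicit 0 is kept: the caller would run inline
  Integer(env.fetch('PARALLEL_PROCESSOR_COUNT', Etc.nprocessors))
end

p worker_count(env: { 'PARALLEL_PROCESSOR_COUNT' => '16' })    # => 16
p worker_count(0, env: { 'PARALLEL_PROCESSOR_COUNT' => '16' }) # => 0
p worker_count(env: {}) == Etc.nprocessors                     # => true
```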

TODO

  • Replace Signal trapping with simple rescue Interrupt handler

Authors

Contributors

Michael Grosser
michael@grosser.it
License: MIT

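Key metrics

Overview
  • Name and owner: grosser/parallel
  • Primary language: Ruby
  • Languages: Ruby (1 language)
  • License: MIT License

Owner activity
  • Created: 2009-08-11 18:54:16
  • Last pushed: 2025-04-14 20:13:34
  • Last commit: 2025-04-14 13:13:29
  • Releases: 122
  • Latest release: v1.27.0 (published 2025-04-14 13:13:31)
  • First release: v0.1.0

User engagement
  • Stars: 4.2k
  • Watchers: 73
  • Forks: 256
  • Commits: 803
  • Issues: 189
  • Open issues: 34
  • Pull requests: 116
  • Open pull requests: 1
  • Closed pull requests: 48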