# File lib/em/worker.rb, line 5 def initialize(options={}) @task_queue = EM::Queue.new @concurrency = options.fetch(:concurrency, 12) start end
# File lib/em/worker.rb, line 11 def enqueue(task=nil, callback=nil, options={}, &block) task ||= block || Proc.new {} @task_queue.push([task, callback, options]) true end
# File lib/em/worker.rb, line 19 def do_task @task_queue.pop do |task, callback, options| EM.defer(task, Proc.new { |result| callback.call(result) if callback; do_task }) end end
# File lib/em/worker.rb, line 25 def start @concurrency.times do EM.next_tick do do_task end end end