delayed_job 的 基本用法

访问量: 655

delayed_job 是用来做某些可以延后的,对时间要求不高的任务,例如: 发送1W个邮件,处理10W个数据,不能在规定的时间内(例如1秒)立即执行完的任务。

delayed_job 有两个版本,一个是最初的:
https://github.com/tobi/delayed_job
另一个是改进的:
https://github.com/collectiveidea/delayed_job

我用的是第二个。它们用法几乎一样,除了在 优先级方面(priority ),后者是数字越大,优先级越低。

文档方面很奇怪,建议两者都要看,互相综合,才能看完整。

下面把我的一点心得贴出来:

1. worker 至关重要。默认是启动一个,看起来是单线程。 启动时配置参数:
$ script/delayed_job -n 4 start  (启动4个worker)

2. 每个job一旦被执行,那么它被lock住,(lock=true),等到该JOB彻底被执行完毕,才会把它从delayed_job 表中删除掉。

3. 可以在 config/initializers 中进行配置:

# delayed_job_config.rb
Delayed::Worker.destroy_failed_jobs = true
Delayed::Worker.sleep_delay = 30
Delayed::Worker.max_attempts = 5 
Delayed::Worker.max_run_time = 10.minutes


4. 几个hooks:
从源代码中可以看到几个hooks:
      def invoke_job
        hook :before
        payload_object.perform
        hook :success
      rescue Exception => e
        hook :error, e
        raise e
      ensure
        hook :after
      end 


比如,定义个before:
class ParanoidNewsletterJob < NewsletterJob
  def perform
    emails.each { |e| NewsletterMailer.deliver_text_to_email(text, e) }
  end

  def before(job)
    record_stat 'newsletter_job/start'
  end
end


5. priority 很重要。 可以为某些任务设置。 但是不要被他所迷惑。 在多worker情况下是不准确的。


例如,我有2 种job, 一种是 drink_water, 一种是: eat_food
那么,光设置 priority 还不行,在多个worker下,还要设置 一个wait_job.

class EatFoodJob < Struct.new(:job_name)
  def perform
    Rails.logger.info "===(#{job_name}) running"
    sleep_time = 1 + rand(5)
    sleep sleep_time
    Rails.logger.info "===(#{job_name}) after #{sleep_time} sleep, done "
  end 
end


class WaitJob < Struct.new(:job_name)
  def perform
    Rails.logger.info "===(#{job_name}, #{self.class}) running"
    loop do
      break unless DelayedJob.where(:priority => 10).all.size > 0 
      Rails.logger.info " === !!!!!!  jobs: priority => 10 not done, now wait for 5 seconds"
      sleep 1
    end 
    Rails.logger.info "===(#{job_name}) done "
  end 
end


class DrinkWaterJob < Struct.new(:job_name)
  def perform
    Rails.logger.info "===(#{job_name}, #{self.class}) running"
    sleep_time = 5 + rand(5)
    sleep sleep_time
    Rails.logger.info "===(#{job_name}) after #{sleep_time} sleep, done "
  end 
end


然后,启动4个worker:
$ script/delayed_job -n 4 start

最后,我们要设置4个wait_job (对应4个worker) :
(1..10).each do |i|
  Delayed::Job.enqueue DrinkWaterJob.new("drink#{i}"), :priority => 10
  Delayed::Job.enqueue EatFoodJob.new("eat#{i}"), :priority => 20
end
(1..4).each do |i|
  Delayed::Job.enqueue WaitJob.new("wait#{i}"), :priority => 15
end


最后,可以从日志中看出,所有的 eat_food( priority = 20  ) 都在 drink_water( priority = 10) 的任务之后执行了,没有越轨的行为。

引用
===(drink1, DrinkWaterJob) running
===(drink2, DrinkWaterJob) running
===(drink3, DrinkWaterJob) running
===(drink4, DrinkWaterJob) running
===(drink3) after 7 sleep, done
===(drink5, DrinkWaterJob) running
===(drink2) after 8 sleep, done
===(drink6, DrinkWaterJob) running
===(drink1) after 9 sleep, done
===(drink7, DrinkWaterJob) running
===(drink4) after 9 sleep, done
===(drink8, DrinkWaterJob) running
===(drink6) after 7 sleep, done
===(drink5) after 8 sleep, done
===(drink9, DrinkWaterJob) running
===(drink10, DrinkWaterJob) running
===(drink7) after 7 sleep, done
===(wait1, WaitJob) running
=== !!!!!!  jobs: priority => 10 not done, now wait for 5 seconds
=== !!!!!!  jobs: priority => 10 not done, now wait for 5 seconds
===(drink8) after 8 sleep, done
===(wait2, WaitJob) running
=== !!!!!!  jobs: priority => 10 not done, now wait for 5 seconds
=== !!!!!!  jobs: priority => 10 not done, now wait for 5 seconds
=== !!!!!!  jobs: priority => 10 not done, now wait for 5 seconds
=== !!!!!!  jobs: priority => 10 not done, now wait for 5 seconds
=== !!!!!!  jobs: priority => 10 not done, now wait for 5 seconds
=== !!!!!!  jobs: priority => 10 not done, now wait for 5 seconds
=== !!!!!!  jobs: priority => 10 not done, now wait for 5 seconds
=== !!!!!!  jobs: priority => 10 not done, now wait for 5 seconds
===(drink9) after 6 sleep, done
===(wait3, WaitJob) running
=== !!!!!!  jobs: priority => 10 not done, now wait for 5 seconds
=== !!!!!!  jobs: priority => 10 not done, now wait for 5 seconds
=== !!!!!!  jobs: priority => 10 not done, now wait for 5 seconds
=== !!!!!!  jobs: priority => 10 not done, now wait for 5 seconds
=== !!!!!!  jobs: priority => 10 not done, now wait for 5 seconds
=== !!!!!!  jobs: priority => 10 not done, now wait for 5 seconds
=== !!!!!!  jobs: priority => 10 not done, now wait for 5 seconds
=== !!!!!!  jobs: priority => 10 not done, now wait for 5 seconds
=== !!!!!!  jobs: priority => 10 not done, now wait for 5 seconds
=== !!!!!!  jobs: priority => 10 not done, now wait for 5 seconds
===(drink10) after 9 sleep, done
===(wait4, WaitJob) running
===(wait4) done
===(eat1) running
===(wait2) done
===(eat2) running
===(wait1) done
===(eat3) running
===(wait3) done
===(eat4) running
===(eat1) after 1 sleep, done
===(eat5) running
===(eat4) after 3 sleep, done
===(eat6) running
===(eat3) after 4 sleep, done
===(eat7) running
===(eat5) after 4 sleep, done
===(eat8) running
===(eat2) after 5 sleep, done
===(eat9) running
===(eat7) after 3 sleep, done
===(eat10) running
===(eat8) after 3 sleep, done
===(eat6) after 5 sleep, done
===(eat9) after 5 sleep, done
===(eat10) after 3 sleep, done



如果不加入那4个wait_job, 就会看到, 在多个worker之下, 有些JOB 不是严格的按照priority来执行的。 这个非常姚明。。。见下面的日志:

(1..10).each do |i|
  Delayed::Job.enqueue DrinkWaterJob.new("drink#{i}"), :priority => 10
  Delayed::Job.enqueue EatFoodJob.new("eat#{i}"), :priority => 20
end


引用

===(drink1, DrinkWaterJob) running
===(drink2, DrinkWaterJob) running
===(drink3, DrinkWaterJob) running
===(drink4, DrinkWaterJob) running
===(drink1) after 8 sleep, done
===(drink5, DrinkWaterJob) running
===(drink2) after 8 sleep, done
===(drink6, DrinkWaterJob) running
===(drink3) after 8 sleep, done
===(drink7, DrinkWaterJob) running
===(drink4) after 7 sleep, done
===(drink8, DrinkWaterJob) running
===(drink5) after 6 sleep, done
===(drink9, DrinkWaterJob) running
===(drink8) after 5 sleep, done
===(drink10, DrinkWaterJob) running
===(drink7) after 8 sleep, done
===(eat1) running
===(drink6) after 9 sleep, done
===(eat2) running
===(drink9) after 5 sleep, done
===(eat3) running
===(eat1) after 2 sleep, done
===(eat4) running
===(eat4) after 2 sleep, done
===(eat5) running
===(eat3) after 4 sleep, done
===(eat6) running
===(eat2) after 5 sleep, done
===(eat7) running
===(drink10) after 9 sleep, done
===(eat8) running
===(eat7) after 2 sleep, done
===(eat9) running
===(eat6) after 3 sleep, done
===(eat8) after 1 sleep, done
===(eat10) running
===(eat5) after 4 sleep, done
===(eat9) after 1 sleep, done
===(eat10) after 2 sleep, done

订阅/RSS Feed

Subscribe

分类/category