Celery最佳实践

翻译自原文https://devchecklists.com/celery-tasks-checklist/

  • 更加推荐RabbitMQ或者Redis作为生产环境中的broker

  • 不要在task中使用复杂的对象作为参数,比如避免使用Django Model对象

    # good
    @app.task
    def my_task(user_id):
        user = User.objects.get(id=user_id)
        print(user.name)
    
    # bad
    @app.task
    def my_task(user):
        print(user.name)
    
  • 不要在task中等待其他task的完成

  • 推荐满足幂等性的task

    幂等性是指数学或者计算机科学中某些操作即使被执行多次也不会改变其最初的运算结果

  • 推荐满足原子性的task

    原子性是并发进程运行隔离的保障,院子操作具有成功或者失败的定义,要么成功改变系统的状态,要么没有明显的效果。

  • 如果可能的话,请在失败时选择重试任务。但是一定要确保任务是满足幂等性和原子性要求

  • 设置retry_limit来避免失败的任务无休止的重试下去

  • 如果某些任务的失败可能并不能及时恢复,那就采取指数性退避,抛出随机数以避免影响其他服务

    def exponential_backoff(task_self):
        minutes = task_self.default_retry_delay / 60
        rand = random.uniform(minutes, minutes * 1.3)
        return int(rand ** task_self.request.retries) * 60
    # in the task
    raise self.retry(exc=e, countdown=exponential_backoff(self))
    
  • 使用autoretry_for来减少重试任务的不必要代码的编写

  • 使用retry_backoff来减少做指数退避时不必要的代码

  • 对于那些需要高可靠性的任务,请将acks_lateretry结合使用,再次明确的是需要保证任务的幂等性和原子性 我应该使用retry还是acks_late

  • 设置hard 和soft的时间限制,对于时间超出预期的任务能够优雅的处理

    from celery.exceptions import SoftTimeLimitExceeded
    
    @app.task(task_time_limit=60, task_soft_time_limit=45)
    def my_task():
        try:
            something_possibly_long()
        except SoftTimeLimitExceeded:
            recover()
    
  • 使用多个队列来控制流量和扩展性 路由任务

  • 通过扩展基类Task的方式来定义默认行为 自定义Task Class

  • 使用canvas的特性来控制任务流和解决并发问题 Canvas: Designing Work-flows

  • 尽可能的记录日志,使用get_task_logger来自动获取日志中关于任务名称和任务唯一id的部分

  • 对于失败的任务,应该配备监控系统能够尽快通知

  • 可以使用Flower来监控转换任务

  • 使用task_always_eager来测试任务执行