
python - Celery tasks running every 20 seconds are overlapping and starting before the last one finishes

I have a celery task running every 20 seconds across 3 instances, all connected to one database. The problem is that the handler sometimes fires twice because the task runs overlap. It seems the filtered queryset does not reflect status updates made by an overlapping run:

from datetime import timedelta

from celery.task import periodic_task


@periodic_task(run_every=timedelta(seconds=20))
def process_webhook_transactions():
    """Process webhook transactions"""
    transactions = WebhookTransaction.objects.filter(status=WebhookTransaction.UNPROCESSED)
    for transaction in transactions:
        data = transaction.body
        event = data.get('event_category')
        if event is None:
            transaction.status = WebhookTransaction.ERROR
            transaction.save()
            continue

        handler = WEBHOOK_HANDLERS.get(event, default_handler)
        success = handler(data)

        if success:
            transaction.status = WebhookTransaction.PROCESSED
        else:
            transaction.status = WebhookTransaction.ERROR
        transaction.save()

What is the best way to avoid this?

question from: https://stackoverflow.com/questions/65946548/celery-tasks-running-every-20-seconds-is-overlapping-and-starting-before-the-las


1 Reply


You could use select_for_update with skip_locked=True to prevent the same rows from being processed twice when 3 workers run the task at the same time. Like so:

# select_for_update only takes effect inside transaction.atomic()
transactions = WebhookTransaction.objects.filter(status=WebhookTransaction.UNPROCESSED)
transactions = transactions.select_for_update(skip_locked=True, of=("self",))
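
Putting it together, a minimal sketch of the full task with the lock in place, reusing the WebhookTransaction model and the handlers from the question (skip_locked needs a backend that supports it, such as PostgreSQL, and of=("self",) needs Django 2.0+):

from datetime import timedelta

from celery.task import periodic_task
from django.db import transaction as db_transaction

# WebhookTransaction, WEBHOOK_HANDLERS and default_handler are the ones
# defined in the question's code.

@periodic_task(run_every=timedelta(seconds=20))
def process_webhook_transactions():
    """Process webhook transactions, skipping rows locked by other workers."""
    with db_transaction.atomic():
        # Rows selected here stay locked until the atomic block commits,
        # so overlapping runs skip them instead of processing them twice.
        transactions = (
            WebhookTransaction.objects
            .filter(status=WebhookTransaction.UNPROCESSED)
            .select_for_update(skip_locked=True, of=("self",))
        )
        for transaction in transactions:
            data = transaction.body
            event = data.get('event_category')
            if event is None:
                transaction.status = WebhookTransaction.ERROR
                transaction.save()
                continue

            handler = WEBHOOK_HANDLERS.get(event, default_handler)
            success = handler(data)

            transaction.status = (
                WebhookTransaction.PROCESSED if success
                else WebhookTransaction.ERROR
            )
            transaction.save()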

But this approach can make one worker instance work harder than the others: the first run to acquire the locks selects all the transactions, leaving little for the rest. You could instead create a dispatcher task, also running every 20 seconds, that splits the unprocessed transactions into smaller chunks (10-20, maybe?) and triggers process_webhook_transactions for each chunk.
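
A rough sketch of that dispatcher idea, assuming process_webhook_transactions is reworked into an ordinary task that takes a list of primary keys (the task names and chunk size here are illustrative):

from datetime import timedelta

from celery import shared_task
from celery.task import periodic_task
from django.db import transaction as db_transaction

CHUNK_SIZE = 20  # illustrative; tune to your workload

@periodic_task(run_every=timedelta(seconds=20))
def dispatch_webhook_transactions():
    """Fan unprocessed transactions out to the workers in small chunks."""
    ids = list(
        WebhookTransaction.objects
        .filter(status=WebhookTransaction.UNPROCESSED)
        .values_list('pk', flat=True)
    )
    for start in range(0, len(ids), CHUNK_SIZE):
        process_webhook_transactions.delay(ids[start:start + CHUNK_SIZE])

@shared_task
def process_webhook_transactions(transaction_ids):
    """Process one chunk; the row locks still guard against double dispatch."""
    with db_transaction.atomic():
        transactions = (
            WebhookTransaction.objects
            .filter(pk__in=transaction_ids,
                    status=WebhookTransaction.UNPROCESSED)
            .select_for_update(skip_locked=True)
        )
        for transaction in transactions:
            ...  # same handler logic as in the original task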

If the handler returned by WEBHOOK_HANDLERS.get(event, default_handler) is asynchronous, I think the chunking approach is also a good fit, because you could run the handlers within a chunk concurrently to speed the task up.
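
If the handlers really are coroutines, each chunk could be awaited concurrently; a hedged sketch, assuming every entry in WEBHOOK_HANDLERS (and default_handler) is an async def that takes the payload dict and returns a success flag, and that events were validated beforehand:

import asyncio

async def run_handlers(transactions):
    """Run the handlers for a whole chunk concurrently."""
    coros = [
        WEBHOOK_HANDLERS.get(t.body.get('event_category'), default_handler)(t.body)
        for t in transactions
    ]
    # return_exceptions=True so one failing webhook doesn't cancel the rest
    return await asyncio.gather(*coros, return_exceptions=True)

# Inside the task, after materializing the locked queryset:
# results = asyncio.run(run_handlers(list(transactions)))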

