RuntimeError: Event loop is closed in multiprocessing environments (celery). #46

artur-augustyniak opened this issue Apr 29, 2021 · 7 comments

@artur-augustyniak

Hi,

First of all, sorry for not providing a PR; lack of spare time.
The problem I faced is not strictly a vt-py issue, but I believe that protection against such a problem could also be added here.
In short, I have software where celery tasks using vt-py are processed alongside other tasks, and those other tasks use packages containing async code too. While trying to debug the error mentioned in the title, I came up with a PoC of the problem and a possible solution. I believe the code below is the best description.

Right now the only more or less clear solution is wrapping the async code in threads (see the sketch after the PoC below).

#!/usr/bin/env python3


import os
from multiprocessing import Pool
import asyncio


def other_celery_task(job_id):
    '''
        Here we've got a common pattern from other libs; notice the 'idiomatic' (or pretending to be) view of the event loop as your own resource:
        obtain new event_loop
        try:
            do your job
        finally:
            event_loop.close()
    '''
    cpid = os.getpid()
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)

    async def some_aync_op_possibly_throwing_exception():
        print(
            "[*] async def some_aync_op_possibly_throwing_exception() start id", job_id, cpid)
        # await asyncio.sleep()
        print(
            "[*] async def some_aync_op_possibly_throwing_exception() done id", job_id, cpid)
        return 1
    try:
        return event_loop.run_until_complete(some_aync_op_possibly_throwing_exception())
    except Exception:
        print("Oh no, fail, let's ignore it, forget about finnaly below")
    finally:
        event_loop.close()


def vt_client_related_celery_task(job_id):
    '''
        Here we've got reconstructed flow from vt.Client
    '''
    cpid = os.getpid()

    async def vt_client_aync_op():
        print("[*] async def vt_cllient_aync_op() start id", job_id, cpid)
        # await asyncio.sleep()
        print("[*] async def vt_cllient_aync_op() done id", job_id, cpid)
        return 1

    try:
        event_loop = asyncio.get_event_loop()
        print("event loop was in place id",
              job_id, cpid, event_loop.is_closed())

        '''
            Try to uncomment the 2 lines below. A closed loop is not a non-existent loop, so the RuntimeError below will never be thrown.
            When the next celery task with vt.Client arrives, we get RuntimeError: Event loop is closed.
        '''
        # if event_loop.is_closed():
        #     raise RuntimeError("other task closed our loop?")
    except RuntimeError:
        # Generate an event loop if there isn't any.
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
        print("event loop regenerated id", job_id, cpid)

    return event_loop.run_until_complete(vt_client_aync_op())


if __name__ == '__main__':
    '''
        Here we've got an oversimplification of a default celery worker setup, dealing with other tasks as well.
        Pool size 2 is intentionally small; this problem can be non-deterministic.

    '''

    with Pool(2) as pool:

        vt_round_one = pool.map_async(vt_client_related_celery_task,
                                      [i for i in range(0, 9)])
        other_round = pool.map_async(other_celery_task, [9])
        vt_round_two = pool.map_async(vt_client_related_celery_task,
                                      [i for i in range(10, 20)])
        if 20 == sum(vt_round_one.get() + other_round.get() + vt_round_two.get()):
            print("all tasks executed")
        else:
            print("yay, fail")

BR
Artur Augustyniak

@plusvic (Member) commented Apr 30, 2022

I don't quite understand the issue. Is it that the event loop used by vt-py is being closed somewhere else?

@artur-augustyniak (Author)

Yes, other packages often interfere with the event loop by closing it; it is not a problem of vt-py itself. However, it seems to me that vt-py could defend itself against the loop being closed by maintaining its own, e.g. in a dedicated thread.
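Something along these lines is what I have in mind; a minimal sketch of a loop owned by a dedicated thread (the _PrivateLoop name is made up for illustration, this is not vt-py code):

import asyncio
import threading


class _PrivateLoop:
    '''
        Event loop that lives in its own daemon thread, so no other
        code in the process can close it from under us.
    '''

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(
            target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro):
        # Submit the coroutine to the private loop and block until done.
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        return future.result()


_private = _PrivateLoop()


async def fetch_report():
    await asyncio.sleep(0)  # stand-in for an awaitable vt-py call
    return "report"


print(_private.run(fetch_report()))  # works even if the caller's loop is closed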

@plusvic (Member) commented May 2, 2022

I'm not sure about that. When you are using vt-py in async mode (i.e. using exclusively the async functions from the library), the event loop is something external to the library; it should be the user who decides whether to create a new event loop or not. As a user of the library you may want to use it in a single-threaded environment where a single event loop is shared by multiple concurrent tasks.
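For example, the single-loop usage I mean looks roughly like this (the coroutine is a stand-in for the library's async functions, not the actual vt-py API):

import asyncio


async def fetch(i):
    await asyncio.sleep(0)  # stand-in for an async vt-py request
    return i


async def main():
    # One event loop, created and owned by the caller, shared by
    # several concurrent tasks.
    return await asyncio.gather(*(fetch(i) for i in range(5)))


print(asyncio.run(main()))  # [0, 1, 2, 3, 4]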

@artur-augustyniak (Author)

Sure, you're right, of course. I am not reporting an error in vt-py; it's a suggestion based on my experience. Unfortunately, many libraries implement such protective measures, because in environments like celery you have no chance, as a user, to control when the event loop for a given worker process is closed. As far as I remember, my suggestion was that vt-py could offer a "manage your own event loop" option, because that is something I had to do myself anyway. So apart from saying "hey, why not consider this", the issue is destined to be closed :)

@plusvic (Member) commented May 4, 2022

I'm ok with adding protective measures if that helps, but I don't know what type of protective measures we can add. Do you have some specific measure in mind?

@artur-augustyniak (Author)

By the end of the week I'll dig out my patches, make them civilized, and open a pull request.
From what I remember, I decided to either detect the state of the event loop or send the work to a dedicated thread, so that the thread can manage its own event loop.

I don't remember the details at the moment, the topic is a bit old, but since my workplace didn't explode I guess it works OK ;)

artur-augustyniak added a commit to artur-augustyniak/vt-py that referenced this issue May 6, 2022
@artur-augustyniak (Author)

OK, now I remember. The whole point is that when the event loop is merely closed but still exists, get_event_loop does not throw a RuntimeError. In an environment like celery, where you have several worker processes and you don't control which one your code runs in, and on top of that other libraries' code closes the event loop (because that's the Python idiom: treat the loop as your own resource), you are not able, as a user, to meaningfully guarantee that the loop will be in a good state.
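In other words, a guard has to check is_closed() explicitly instead of relying on get_event_loop() raising. A minimal sketch of such a check (not the exact code from my fork):

import asyncio


def get_usable_event_loop():
    '''
        Return a usable event loop, replacing it if some other code
        already closed the loop attached to this thread/process.
    '''
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None  # no loop at all in this thread
    if loop is None or loop.is_closed():
        # A closed loop still exists, so get_event_loop() above happily
        # returned it; regenerate and install a fresh one ourselves.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.close()                    # simulate another task closing the loop
loop = get_usable_event_loop()  # detects the closed loop and replaces it
print(loop.run_until_complete(asyncio.sleep(0, result="ok")))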

Run the following test with vt-py master and then with my fork:


import vt
import os
from multiprocessing import Pool
import asyncio


def other_celery_task(job_id):
    '''
        Here we've got a common pattern from other libs; notice the 'idiomatic' (or pretending to be) view of the event loop as your own resource:
        obtain new event_loop
        try:
            do your job
        finally:
            event_loop.close()
    '''
    cpid = os.getpid()
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)

    async def some_aync_op_possibly_throwing_exception():
        print(
            "[*] async def some_aync_op_possibly_throwing_exception() start id", job_id, cpid)
        # await asyncio.sleep()
        print(
            "[*] async def some_aync_op_possibly_throwing_exception() done id", job_id, cpid)
        return 1
    try:
        return event_loop.run_until_complete(some_aync_op_possibly_throwing_exception())
    except Exception:
        print("Oh no, fail, let's ignore it, forget about finnaly below")
    finally:
        event_loop.close()


def vt_client_related_celery_task(job_id):
    api_key = "API_KEY"
    with vt.Client(api_key) as client:
        vt_report = client.get_object("/files/47bb7f855cdf116c62499240089fa1b7a69585e8b7f639e192b9d038da4094cd")
        print(vt_report, "asd")
    return 1


if __name__ == '__main__':
    '''
        Here we've got an oversimplification of a default celery worker setup, dealing with other tasks as well.
        Pool size 2 is intentionally small; this problem can be non-deterministic.

    '''

    with Pool(2) as pool:

        vt_round_one = pool.map_async(vt_client_related_celery_task,
                                      [i for i in range(0, 9)])
        other_round = pool.map_async(other_celery_task, [9])
        
        vt_round_two = pool.map_async(vt_client_related_celery_task,
                                      [i for i in range(10, 20)])
        if 20 == sum(vt_round_one.get() + other_round.get() + vt_round_two.get()):
            print("all tasks executed")
        else:
            print("yay, fail")`
```
