{"id":961,"hash":"34590d0d805911033f36beadc1a22b2c0706a4dbae7b27097990ba29c7468382","pattern":"Using queues results in asyncio exception &quot;got Future &lt;Future pending&gt; attached to a different loop&quot;","full_message":"I'm trying to run this simple code with asyncio queues, but catch exceptions, and even nested exceptions. \n\nI would like to get some help with making queues in asyncio work correctly:\n\nimport asyncio, logging\n\nlogging.basicConfig(level=logging.DEBUG)\nlogging.getLogger(\"asyncio\").setLevel(logging.WARNING)\n\nnum_workers = 1\nin_queue = asyncio.Queue()\nout_queue = asyncio.Queue()\ntasks = []\n\nasync def run():\n    for request in range(1):\n        await in_queue.put(request)\n\n    # each task consumes from 'input_queue' and produces to 'output_queue':\n    for i in range(num_workers):\n        tasks.append(asyncio.create_task(worker(name=f'worker-{i}')))\n    # tasks.append(asyncio.create_task(saver()))\n\n    print('waiting for queues...')\n    await in_queue.join()\n    # await out_queue.join()\n    print('all queues done')\n\n    for task in tasks:\n        task.cancel()\n    print('waiting until all tasks cancelled')\n    await asyncio.gather(*tasks, return_exceptions=True)\n    print('done')\n\nasync def worker(name):\n    while True:\n        try:\n            print(f\"{name} started\")\n            num = await in_queue.get()\n            print(f'{name} got {num}')\n            await asyncio.sleep(0)\n            # await out_queue.put(num)\n        except Exception as e:\n            print(f\"{name} exception {e}\")\n        finally:\n            print(f\"{name} ended\")\n            in_queue.task_done()\n\nasync def saver():\n    while True:\n        try:\n            print(\"saver started\")\n            num = await out_queue.get()\n            print(f'saver got {num}')\n            await asyncio.sleep(0)\n            print(\"saver ended\")\n        except Exception as e:\n            print(f\"saver exception {e}\")\n     
   finally:\n            out_queue.task_done()\n\nasyncio.run(run(), debug=True)\nprint('Done!')\n\nOutput:\n\nwaiting for queues...\nworker-0 started\nworker-0 got 0\nworker-0 ended\nworker-0 started\nworker-0 exception \nworker-0 ended\nERROR:asyncio:unhandled exception during asyncio.run() shutdown\ntask: <Task finished coro=<worker() done, defined at temp4.py:34> exception=ValueError('task_done() called too many times') created at Python37\\lib\\asyncio\\tasks.py:325>\nTraceback (most recent call last):\n  File \"Python37\\lib\\asyncio\\runners.py\", line 43, in run\n    return loop.run_until_complete(main)\n  File \"Python37\\lib\\asyncio\\base_events.py\", line 573, in run_until_complete\n    return future.result()\n  File \"temp4.py\", line 23, in run\n    await in_queue.join()\n  File \"Python37\\lib\\asyncio\\queues.py\", line 216, in join\n    await self._finished.wait()\n  File \"Python37\\lib\\asyncio\\locks.py\", line 293, in wait\n    await fut\nRuntimeError: Task <Task pending coro=<run() running at temp4.py:23> cb=[_run_until_complete_cb() at Python37\\lib\\asyncio\\base_events.py:158] created at Python37\\lib\\asyncio\\base_events.py:552> got Future <Future pending> attached to a different loop\n\nDuring handling of the above exception, another exception occurred:\n\nTraceback (most recent call last):\n  File \"temp4.py\", line 46, in worker\n    in_queue.task_done()\n  File \"Python37\\lib\\asyncio\\queues.py\", line 202, in task_done\n    raise ValueError('task_done() called too many times')\nValueError: task_done() called too many times\nTraceback (most recent call last):\n  File \"C:\\Program Files\\JetBrains\\PyCharm Community Edition 2018.1.4\\helpers\\pydev\\pydevd.py\", line 1664, in <module>\n    main()\n  File \"C:\\Program Files\\JetBrains\\PyCharm Community Edition 2018.1.4\\helpers\\pydev\\pydevd.py\", line 1658, in main\n    globals = debugger.run(setup['file'], None, None, is_module)\n  File \"C:\\Program Files\\JetBrains\\PyCharm 
Community Edition 2018.1.4\\helpers\\pydev\\pydevd.py\", line 1068, in run\n    pydev_imports.execfile(file, globals, locals)  # execute the script\n  File \"C:\\Program Files\\JetBrains\\PyCharm Community Edition 2018.1.4\\helpers\\pydev\\_pydev_imps\\_pydev_execfile.py\", line 18, in execfile\n    exec(compile(contents+\"\\n\", file, 'exec'), glob, loc)\n  File \"temp4.py\", line 63, in <module>\n    asyncio.run(run(), debug=True)\n  File \"Python37\\lib\\asyncio\\runners.py\", line 43, in run\n    return loop.run_until_complete(main)\n  File \"Python37\\lib\\asyncio\\base_events.py\", line 573, in run_until_complete\n    return future.result()\n  File \"temp4.py\", line 23, in run\n    await in_queue.join()\n  File \"Python37\\lib\\asyncio\\queues.py\", line 216, in join\n    await self._finished.wait()\n  File \"Python37\\lib\\asyncio\\locks.py\", line 293, in wait\n    await fut\nRuntimeError: Task <Task pending coro=<run() running at temp4.py:23> cb=[_run_until_complete_cb() at Python37\\lib\\asyncio\\base_events.py:158] created at Python37\\lib\\asyncio\\base_events.py:552> got Future <Future pending> attached to a different loop\n\nThis is the basic flow; what I would like to do later is to run more requests on more workers, where each worker moves the number from in_queue to out_queue and the saver then prints the numbers from out_queue.","ecosystem":"pypi","package_name":"python-3.x","package_version":null,"solution":"In Python 3.9 and older, your queues must be created inside the loop. You created them outside the loop created by asyncio.run(), so they are bound to the loop returned by asyncio.get_event_loop(). 
asyncio.run() creates a new loop, and futures created for the queue in one loop can't then be used in the other.\n\nCreate your queues in your top-level run() coroutine, and either pass them to the coroutines that need them, or use contextvars.ContextVar objects if you must use globals.\n\nIn Python 3.10 and newer, asyncio primitives were refactored to access the currently active loop via the asyncio.get_running_loop() function when they first need access to the loop; the loop is then cached on the object. This makes it easier to create such primitives as globals before you start the event loop.\n\nYou also need to clean up how you handle task cancellation inside your tasks. A task is cancelled by raising an asyncio.CancelledError exception in the task. You can ignore it, but if you catch it to do clean-up work, you must re-raise it.\n\nYour task code catches all exceptions without re-raising, including CancelledError, so you block proper cancellation.\n\nWhat does happen during cancellation, instead, is that you call queue.task_done(); don't do that, at least not when your task is being cancelled. You should only call task_done() when you are actually handling a queue item, but your code calls task_done() when an exception occurs while waiting for a queue item to appear.\n\nIf you need to use try...finally: in_queue.task_done(), put it around the block of code that handles an item received from the queue, and keep the await in_queue.get() outside of that try block. You don't want to mark items done that you never actually received.\n\nFinally, when you print exceptions, print their repr(); for historical reasons, the str() conversion of exceptions produces their .args value, which is not very helpful for CancelledError exceptions, which have an empty .args. 
Use {e!r} in formatted strings, so you can see what exception you are catching:\n\nworker-0 exception CancelledError()\n\nSo, corrected code, with the saver() task enabled, the queues created inside of run(), and task exception handling cleaned up, would be:\n\nimport asyncio, logging\n\nlogging.basicConfig(level=logging.DEBUG)\nlogging.getLogger(\"asyncio\").setLevel(logging.WARNING)\n\nnum_workers = 1\n\nasync def run():\n    in_queue = asyncio.Queue()\n    out_queue = asyncio.Queue()\n\n    for request in range(1):\n        await in_queue.put(request)\n\n    # each task consumes from 'in_queue' and produces to 'out_queue':\n    tasks = []\n    for i in range(num_workers):\n        tasks.append(asyncio.create_task(\n            worker(in_queue, out_queue, name=f'worker-{i}')))\n    tasks.append(asyncio.create_task(saver(out_queue)))\n\n    await in_queue.join()\n    await out_queue.join()\n\n    for task in tasks:\n        task.cancel()\n\n    await asyncio.gather(*tasks, return_exceptions=True)\n\n    print('done')\n\nasync def worker(in_queue, out_queue, name):\n    print(f\"{name} started\")\n    try:\n        while True:\n            num = await in_queue.get()\n            try:\n                print(f'{name} got {num}')\n                await asyncio.sleep(0)\n                await out_queue.put(num)\n            except Exception as e:\n                print(f\"{name} exception {e!r}\")\n                raise\n            finally:\n                in_queue.task_done()\n    except asyncio.CancelledError:\n        print(f\"{name} is being cancelled\")\n        raise\n    finally:\n        print(f\"{name} ended\")\n\nasync def saver(out_queue):\n    print(\"saver started\")\n    try:\n        while True:\n            num = await out_queue.get()\n            try:\n                print(f'saver got {num}')\n                await asyncio.sleep(0)\n                print(\"saver ended\")\n            except Exception as e:\n                print(f\"saver exception 
{e!r}\")\n                raise\n            finally:\n                out_queue.task_done()\n    except asyncio.CancelledError:\n        print(f\"saver is being cancelled\")\n        raise\n    finally:\n        print(f\"saver ended\")\n\nasyncio.run(run(), debug=True)\nprint('Done!')\n\nThis prints\n\nworker-0 started\nworker-0 got 0\nsaver started\nsaver got 0\nsaver ended\ndone\nworker-0 is being cancelled\nworker-0 ended\nsaver is being cancelled\nsaver ended\nDone!\n\nIf you want to use globals, to share queue objects, then use ContextVar objects. You still create the queues in run(), but if you were to start multiple loops then the contextvars module integration will take care of keeping the queues separate:\n\nfrom contextvars import ContextVar\n# ...\n\nin_queue = ContextVar('in_queue')\nout_queue = ContextVar('out_queue')\n\nasync def run():\n    in_, out = asyncio.Queue(), asyncio.Queue()\n    in_queue.set(in_)\n    out_queue.set(out)\n\n    for request in range(1):\n        await in_.put(request)\n\n    # ...\n\n    for i in range(num_workers):\n        tasks.append(asyncio.create_task(worker(name=f'worker-{i}')))\n    tasks.append(asyncio.create_task(saver()))\n\n    await in_.join()\n    await out.join()\n\n    # ...\n\nasync def worker(name):\n    print(f\"{name} started\")\n    in_ = in_queue.get()\n    out = out_queue.get()\n    try:\n        while True:\n            num = await in_.get()\n            try:\n                # ...\n                await out.put(num)\n                # ...\n            finally:\n                in_.task_done()\n    # ...\n\nasync def saver():\n    print(\"saver started\")\n    out = out_queue.get()\n    try:\n        while True:\n            num = await out.get()\n            try:\n                # ...\n            finally:\n                out.task_done()\n    # 
...","confidence":0.95,"source":"stackoverflow","source_url":"https://stackoverflow.com/questions/53724665/using-queues-results-in-asyncio-exception-got-future-future-pending-attached","votes":72,"created_at":"2026-04-19T04:52:05.810982+00:00","updated_at":"2026-04-19T04:52:05.810982+00:00"}