2023-07-26 21:33:29 +02:00

325 lines
9.8 KiB

"""
Code concerned with waiting in different contexts (blocking, async, etc).

These functions are designed to consume the generators returned by the
functions of the `generators` module and to return their final value.

"""
# Copyright (C) 2020 The Psycopg Team
import os
import select
import selectors
from typing import Dict, Optional
from asyncio import get_event_loop, wait_for, Event, TimeoutError
from selectors import DefaultSelector
from . import errors as e
from .abc import RV, PQGen, PQGenConn, WaitFunc
from ._enums import Wait as Wait, Ready as Ready # re-exported
from ._cmodule import _psycopg
# Short module-level aliases for the `Wait` / `Ready` flags referenced by the
# wait functions defined below.
WAIT_R, WAIT_W = Wait.R, Wait.W
READY_R, READY_W = Ready.R, Ready.W
def wait_selector(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
    """
    Wait for a generator using the best strategy available.

    :param gen: a generator performing database operations and yielding
        the `Wait` state to wait for when it would block.
    :param fileno: the file descriptor to wait on.
    :param timeout: timeout (in seconds) to check for other interrupt, e.g.
        to allow Ctrl-C. When it expires without events the wait is simply
        retried.
    :type timeout: float
    :return: whatever `!gen` returns on completion.

    Consume `!gen`, scheduling `fileno` for completion when it is reported to
    block. Once ready again send the ready state back to `!gen`.
    """
    try:
        s = next(gen)
        with DefaultSelector() as sel:
            while True:
                sel.register(fileno, s)
                rlist = None
                # An empty list means the timeout expired: wait again.
                while not rlist:
                    rlist = sel.select(timeout=timeout)
                # The state requested by the generator may change at every
                # step, so drop the registration before the next one.
                sel.unregister(fileno)
                # note: this line should require a cast, but mypy doesn't complain
                ready: Ready = rlist[0][1]
                assert s & ready
                s = gen.send(ready)

    except StopIteration as ex:
        # The generator's return value travels in the StopIteration args.
        rv: RV = ex.args[0] if ex.args else None
        return rv
def wait_conn(gen: PQGenConn[RV], timeout: Optional[float] = None) -> RV:
    """
    Wait for a connection generator using the best strategy available.

    :param gen: a generator performing database operations and yielding
        (fd, `Wait`) pairs when it would block.
    :param timeout: timeout (in seconds) for each wait. If zero or None,
        wait indefinitely.
    :type timeout: float
    :return: whatever `!gen` returns on completion.
    :raise ConnectionTimeout: if no event is received within `!timeout`.

    Behave like in `wait()`, but take the fileno to wait from the generator
    itself, which might change during processing.
    """
    try:
        fileno, s = next(gen)
        # A falsy timeout (0 or None) means "no timeout" for the selector.
        if not timeout:
            timeout = None
        with DefaultSelector() as sel:
            while True:
                sel.register(fileno, s)
                rlist = sel.select(timeout=timeout)
                # Both the fd and the requested state may change at the next
                # step, so the registration must not outlive this iteration.
                sel.unregister(fileno)
                if not rlist:
                    raise e.ConnectionTimeout("connection timeout expired")
                ready: Ready = rlist[0][1]  # type: ignore[assignment]
                fileno, s = gen.send(ready)

    except StopIteration as ex:
        # The generator's return value travels in the StopIteration args.
        rv: RV = ex.args[0] if ex.args else None
        return rv
async def wait_async(gen: PQGen[RV], fileno: int) -> RV:
    """
    Coroutine waiting for a generator to complete.

    :param gen: a generator performing database operations and yielding
        the `Wait` state to wait for when it would block.
    :param fileno: the file descriptor to wait on.
    :return: whatever `!gen` returns on completion.
    :raise InternalError: if `!gen` yields a state requesting neither read
        nor write.

    Behave like in `wait()`, but exposing an `asyncio` interface.
    """
    # Use an event to block and restart after the fd state changes.
    # Not sure this is the best implementation but it's a start.
    ev = Event()
    loop = get_event_loop()
    ready: Ready
    s: Wait

    def wakeup(state: Ready) -> None:
        # Accumulate the state: both callbacks may fire before the coroutine
        # gets to run again.
        nonlocal ready
        ready |= state  # type: ignore[assignment]
        ev.set()

    try:
        s = next(gen)
        while True:
            reader = s & WAIT_R
            writer = s & WAIT_W
            if not reader and not writer:
                raise e.InternalError(f"bad poll status: {s}")
            ev.clear()
            ready = 0  # type: ignore[assignment]
            if reader:
                loop.add_reader(fileno, wakeup, READY_R)
            if writer:
                loop.add_writer(fileno, wakeup, READY_W)
            try:
                await ev.wait()
            finally:
                # Always deregister the callbacks, also on cancellation.
                if reader:
                    loop.remove_reader(fileno)
                if writer:
                    loop.remove_writer(fileno)
            s = gen.send(ready)

    except StopIteration as ex:
        # The generator's return value travels in the StopIteration args.
        rv: RV = ex.args[0] if ex.args else None
        return rv
async def wait_conn_async(gen: PQGenConn[RV], timeout: Optional[float] = None) -> RV:
    """
    Coroutine waiting for a connection generator to complete.

    :param gen: a generator performing database operations and yielding
        (fd, `Wait`) pairs when it would block.
    :param timeout: timeout (in seconds) for each wait. If zero or None,
        wait indefinitely.
    :return: whatever `!gen` returns on completion.
    :raise ConnectionTimeout: if no event is received within `!timeout`.
    :raise InternalError: if `!gen` yields a state requesting neither read
        nor write.

    Behave like in `wait()`, but take the fileno to wait from the generator
    itself, which might change during processing.
    """
    # Use an event to block and restart after the fd state changes.
    # Not sure this is the best implementation but it's a start.
    ev = Event()
    loop = get_event_loop()
    ready: Ready
    s: Wait

    def wakeup(state: Ready) -> None:
        # Accumulate the state, as `wait_async()` does: both callbacks may
        # fire before the coroutine gets to run again, and a plain assignment
        # would lose the first event.
        nonlocal ready
        ready |= state  # type: ignore[assignment]
        ev.set()

    try:
        fileno, s = next(gen)
        # A falsy timeout (0 or None) means "no timeout" for wait_for().
        if not timeout:
            timeout = None
        while True:
            reader = s & WAIT_R
            writer = s & WAIT_W
            if not reader and not writer:
                raise e.InternalError(f"bad poll status: {s}")
            ev.clear()
            ready = 0  # type: ignore[assignment]
            if reader:
                loop.add_reader(fileno, wakeup, READY_R)
            if writer:
                loop.add_writer(fileno, wakeup, READY_W)
            try:
                await wait_for(ev.wait(), timeout)
            finally:
                # Always deregister the callbacks, also on timeout or
                # cancellation: the fd may be different at the next step.
                if reader:
                    loop.remove_reader(fileno)
                if writer:
                    loop.remove_writer(fileno)
            fileno, s = gen.send(ready)

    except TimeoutError:
        raise e.ConnectionTimeout("connection timeout expired")

    except StopIteration as ex:
        # The generator's return value travels in the StopIteration args.
        rv: RV = ex.args[0] if ex.args else None
        return rv
# Specialised implementation of wait functions.
def wait_select(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
    """
    Wait for a generator using select where supported.

    :param gen: a generator performing database operations and yielding
        the `Wait` state to wait for when it would block.
    :param fileno: the file descriptor to wait on.
    :param timeout: timeout (in seconds) passed to `!select()`; when it
        expires without events the wait is simply retried.
    :return: whatever `!gen` returns on completion.
    """
    try:
        s = next(gen)

        empty = ()
        fnlist = (fileno,)
        while True:
            rl, wl, xl = select.select(
                fnlist if s & WAIT_R else empty,
                fnlist if s & WAIT_W else empty,
                fnlist,
                timeout,
            )
            ready = 0
            if rl:
                ready = READY_R
            if wl:
                ready |= READY_W
            if not ready:
                # Timeout expired (or only an exceptional condition was
                # reported): nothing to tell the generator, wait again.
                continue
            # assert s & ready
            s = gen.send(ready)  # type: ignore

    except StopIteration as ex:
        # The generator's return value travels in the StopIteration args.
        rv: RV = ex.args[0] if ex.args else None
        return rv
# Map each `Wait` state to the epoll event mask to register for it; left
# empty on platforms without epoll support.
# NOTE(review): the entries of this mapping were missing from this copy of
# the file and have been reconstructed -- confirm against the upstream source.
poll_evmasks: Dict[Wait, int]

if hasattr(selectors, "EpollSelector"):
    poll_evmasks = {
        WAIT_R: select.EPOLLONESHOT | select.EPOLLIN,
        WAIT_W: select.EPOLLONESHOT | select.EPOLLOUT,
        # Key for a read+write wait, built from the two aliases above.
        WAIT_R | WAIT_W: select.EPOLLONESHOT | select.EPOLLIN | select.EPOLLOUT,
    }
else:
    poll_evmasks = {}
def wait_epoll(gen: PQGen[RV], fileno: int, timeout: Optional[float] = None) -> RV:
    """
    Wait for a generator using epoll where supported.

    :param gen: a generator performing database operations and yielding
        the `Wait` state to wait for when it would block.
    :param fileno: the file descriptor to wait on.
    :param timeout: timeout (in seconds) for each poll; when it expires
        without events the poll is simply retried. If None, zero or
        negative, block until an event arrives.
    :return: whatever `!gen` returns on completion.

    Parameters are like for `wait()`. If it is detected that the best selector
    strategy is `epoll` then this function will be used instead of `wait`.

    See also: the epoll_ctl(2) manual page.
    """
    try:
        s = next(gen)

        # `epoll.poll()` takes its timeout in seconds (unlike `poll.poll()`,
        # which takes milliseconds), and None means "block until an event".
        # A zero timeout would turn the retry loop below into a busy loop.
        if timeout is None or timeout <= 0:
            timeout = None

        with select.epoll() as epoll:
            evmask = poll_evmasks[s]
            epoll.register(fileno, evmask)
            while True:
                fileevs = None
                # An empty list means the timeout expired: poll again.
                while not fileevs:
                    fileevs = epoll.poll(timeout)
                ev = fileevs[0][1]
                ready = 0
                # Any event other than "writable" is reported as read-ready,
                # any event other than "readable" as write-ready.
                if ev & ~select.EPOLLOUT:
                    ready = READY_R
                if ev & ~select.EPOLLIN:
                    ready |= READY_W
                # assert s & ready
                s = gen.send(ready)
                # Switch the registration to the newly requested state.
                evmask = poll_evmasks[s]
                epoll.modify(fileno, evmask)

    except StopIteration as ex:
        # The generator's return value travels in the StopIteration args.
        rv: RV = ex.args[0] if ex.args else None
        return rv
if _psycopg:
    wait_c = _psycopg.wait_c


# Choose the best wait strategy for the platform.
#
# the selectors objects have a generic interface but come with some overhead,
# so we also offer more finely tuned implementations.

wait: WaitFunc

# Allow the user to choose a specific function for testing
if "PSYCOPG_WAIT_FUNC" in os.environ:
    fname = os.environ["PSYCOPG_WAIT_FUNC"]
    # Only names of `wait_*` objects defined in this module are accepted.
    if not fname.startswith("wait_") or fname not in globals():
        raise ImportError(
            "PSYCOPG_WAIT_FUNC should be the name of an available wait function;"
            f" got {fname!r}"
        )
    wait = globals()[fname]

elif _psycopg:
    # Prefer the implementation from the C module when it is available.
    wait = wait_c

elif selectors.DefaultSelector is getattr(selectors, "SelectSelector", None):
    # On Windows, SelectSelector should be the default.
    wait = wait_select

elif selectors.DefaultSelector is getattr(selectors, "EpollSelector", None):
    wait = wait_epoll

else:
    # Generic fallback based on the `selectors` module.
    wait = wait_selector