"""
psycopg row factories
"""

# Copyright (C) 2021 The Psycopg Team

import functools
from typing import Any, Callable, Dict, List, Optional, NamedTuple, NoReturn
from typing import TYPE_CHECKING, Sequence, Tuple, Type, TypeVar
from collections import namedtuple
from typing_extensions import TypeAlias

from . import pq
from . import errors as e
from ._compat import Protocol
from ._encodings import _as_python_identifier

if TYPE_CHECKING:
    from .cursor import BaseCursor, Cursor
    from .cursor_async import AsyncCursor
    from psycopg.pq.abc import PGresult

# Result statuses inspected by `_get_nfields()`.
COMMAND_OK = pq.ExecStatus.COMMAND_OK
TUPLES_OK = pq.ExecStatus.TUPLES_OK
SINGLE_TUPLE = pq.ExecStatus.SINGLE_TUPLE

T = TypeVar("T", covariant=True)

# Row factories

Row = TypeVar("Row", covariant=True)


class RowMaker(Protocol[Row]):
    """
    Callable protocol taking a sequence of values and returning an object.

    The sequence of values is what a database query returns, already adapted
    to the right Python types. The return value is the object your program
    would like to receive: by default (`tuple_row()`) it is a simple tuple,
    but it may be any type of object.

    Typically, `!RowMaker` functions are returned by `RowFactory`.
    """

    def __call__(self, __values: Sequence[Any]) -> Row:
        ...


class RowFactory(Protocol[Row]):
    """
    Callable protocol taking a `~psycopg.Cursor` and returning a `RowMaker`.

    A `!RowFactory` is typically called when a `!Cursor` receives a result.
    This way it can inspect the cursor state (for instance the
    `~psycopg.Cursor.description` attribute) and help a `!RowMaker` to create
    a complete object.

    For instance the `dict_row()` `!RowFactory` uses the names of the columns
    to define the dictionary keys and returns a `!RowMaker` function which
    uses the values to create a dictionary for each record.
    """

    def __call__(self, __cursor: "Cursor[Any]") -> RowMaker[Row]:
        ...


class AsyncRowFactory(Protocol[Row]):
    """
    Like `RowFactory`, taking an async cursor as argument.
    """

    def __call__(self, __cursor: "AsyncCursor[Any]") -> RowMaker[Row]:
        ...


class BaseRowFactory(Protocol[Row]):
    """
    Like `RowFactory`, taking either type of cursor as argument.
    """

    def __call__(self, __cursor: "BaseCursor[Any, Any]") -> RowMaker[Row]:
        ...


TupleRow: TypeAlias = Tuple[Any, ...]
"""
An alias for the type returned by `tuple_row()` (i.e. a tuple of any content).
"""


DictRow: TypeAlias = Dict[str, Any]
"""
An alias for the type returned by `dict_row()`

A `!DictRow` is a dictionary with keys as string and any value returned by the
database.
"""


def tuple_row(cursor: "BaseCursor[Any, Any]") -> "RowMaker[TupleRow]":
    r"""Row factory to represent rows as simple tuples.

    This is the default factory, used when `~psycopg.Connection.connect()` or
    `~psycopg.Connection.cursor()` are called without a `!row_factory`
    parameter.

    The cursor is not inspected: the row maker is the same for every result.
    """
    # Implementation detail: this must be the tuple type itself, not an
    # equivalent function, because the C code fast-paths on it.
    return tuple


def dict_row(cursor: "BaseCursor[Any, Any]") -> "RowMaker[DictRow]":
    """Row factory to represent rows as dictionaries.

    The dictionary keys are taken from the column names of the returned
    columns. If the cursor has no result returning columns, the row maker
    returned is `no_result()`.
    """
    columns = _get_names(cursor)
    if columns is None:
        return no_result

    def dict_row_(record: Sequence[Any]) -> Dict[str, Any]:
        # https://github.com/python/mypy/issues/2608
        return dict(zip(columns, record))  # type: ignore[arg-type]

    return dict_row_


def namedtuple_row(
    cursor: "BaseCursor[Any, Any]",
) -> "RowMaker[NamedTuple]":
    """Row factory to represent rows as `~collections.namedtuple`.

    The field names are taken from the column names of the returned columns,
    with some mangling to deal with invalid names. If the cursor has no
    result returning columns, the row maker returned is `no_result()`.
    """
    result = cursor.pgresult
    if not result:
        return no_result

    ncols = _get_nfields(result)
    if ncols is None:
        return no_result

    # The raw (bytes) column names are passed on undecoded: together with the
    # encoding they are the key of the cache wrapping `_make_nt()`.
    encoding = cursor._encoding
    raw_names = [result.fname(idx) for idx in range(ncols)]
    row_type = _make_nt(encoding, *raw_names)
    return row_type._make


@functools.lru_cache(512)
def _make_nt(enc: str, *names: bytes) -> Type[NamedTuple]:
    """Return a namedtuple class called "Row" with fields taken from `!names`.

    Each name is decoded using the `!enc` encoding and converted by
    `_as_python_identifier()` before being used as a field name. Results are
    cached per (encoding, names) combination.
    """
    fields = tuple(_as_python_identifier(raw.decode(enc)) for raw in names)
    return namedtuple("Row", fields)  # type: ignore[return-value]


def class_row(cls: Type[T]) -> BaseRowFactory[T]:
    r"""Generate a row factory to represent rows as instances of the class `!cls`.

    The class must support every output column name as a keyword parameter.

    :param cls: The class to return for each row. It must support the fields
        returned by the query as keyword arguments.
    :rtype: `!Callable[[Cursor],` `RowMaker`\[~T]]
    """

    def class_row_(cursor: "BaseCursor[Any, Any]") -> "RowMaker[T]":
        columns = _get_names(cursor)
        if columns is None:
            return no_result

        def class_row__(record: Sequence[Any]) -> T:
            kwargs = dict(zip(columns, record))  # type: ignore[arg-type]
            return cls(**kwargs)

        return class_row__

    return class_row_


def args_row(func: Callable[..., T]) -> BaseRowFactory[T]:
    """Generate a row factory calling `!func` with positional parameters for every row.

    :param func: The function to call for each row. It must support the fields
        returned by the query as positional arguments.
    """

    def args_row_(cur: "BaseCursor[Any, T]") -> "RowMaker[T]":
        # The cursor is not inspected here: values are passed on by position.
        def args_row__(record: Sequence[Any]) -> T:
            return func(*record)

        return args_row__

    return args_row_


def kwargs_row(func: Callable[..., T]) -> BaseRowFactory[T]:
    """Generate a row factory calling `!func` with keyword parameters for every row.

    :param func: The function to call for each row. It must support the fields
        returned by the query as keyword arguments.
    """

    def kwargs_row_(cursor: "BaseCursor[Any, T]") -> "RowMaker[T]":
        columns = _get_names(cursor)
        if columns is None:
            return no_result

        def kwargs_row__(record: Sequence[Any]) -> T:
            kwargs = dict(zip(columns, record))  # type: ignore[arg-type]
            return func(**kwargs)

        return kwargs_row__

    return kwargs_row_


def no_result(values: Sequence[Any]) -> NoReturn:
    """A `RowMaker` that always fails.

    It can be used as return value for a `RowFactory` called with no result.
    Note that the `!RowFactory` *will* be called with no result, but the
    resulting `!RowMaker` never should.

    :raises InterfaceError: whenever it is called.
    """
    raise e.InterfaceError("the cursor doesn't have a result")


def _get_names(cursor: "BaseCursor[Any, Any]") -> Optional[List[str]]:
    """Return the column names of the cursor's current result, as strings.

    Names are decoded using the cursor's encoding. Return `!None` if the
    cursor has no result, or if `_get_nfields()` returns `!None` for it.
    """
    result = cursor.pgresult
    if not result:
        return None

    ncols = _get_nfields(result)
    if ncols is None:
        return None

    encoding = cursor._encoding
    return [
        result.fname(idx).decode(encoding)  # type: ignore[union-attr]
        for idx in range(ncols)
    ]


def _get_nfields(res: "PGresult") -> Optional[int]:
    """
    Return the number of columns in a result, if it returns tuples else None

    Take into account the special case of results with zero columns.
    """
    nfields = res.nfields
    status = res.status

    # Results carrying tuples: report the column count, even if it is zero.
    if status == TUPLES_OK or status == SINGLE_TUPLE:
        return nfields

    # "describe" in named cursors
    if status == COMMAND_OK and nfields:
        return nfields

    return None