mysteriendrama/lib/python3.11/site-packages/psycopg/pq/pq_ctypes.py
2023-07-26 21:33:29 +02:00

1089 lines
35 KiB
Python

"""
libpq Python wrapper using ctypes bindings.
Clients shouldn't use this module directly, unless for testing: they should use
the `pq` module instead, which is in charge of choosing the best
implementation.
"""
# Copyright (C) 2020 The Psycopg Team
import sys
import logging
from os import getpid
from weakref import ref
from ctypes import Array, POINTER, cast, string_at, create_string_buffer, byref
from ctypes import addressof, c_char_p, c_int, c_size_t, c_ulong, c_void_p, py_object
from typing import Any, Callable, List, Optional, Sequence, Tuple
from typing import cast as t_cast, TYPE_CHECKING
from .. import errors as e
from . import _pq_ctypes as impl
from .misc import PGnotify, ConninfoOption, PGresAttDesc
from .misc import error_message, connection_summary
from ._enums import Format, ExecStatus, Trace
# Imported locally to call them from __del__ methods
from ._pq_ctypes import PQclear, PQfinish, PQfreeCancel, PQstatus
if TYPE_CHECKING:
from . import abc
__impl__ = "python"
logger = logging.getLogger("psycopg")
def version() -> int:
"""Return the version number of the libpq currently loaded.
The number is in the same format of `~psycopg.ConnectionInfo.server_version`.
Certain features might not be available if the libpq library used is too old.
"""
return impl.PQlibVersion()
@impl.PQnoticeReceiver # type: ignore
def notice_receiver(arg: c_void_p, result_ptr: impl.PGresult_struct) -> None:
pgconn = cast(arg, POINTER(py_object)).contents.value()
if not (pgconn and pgconn.notice_handler):
return
res = PGresult(result_ptr)
try:
pgconn.notice_handler(res)
except Exception as exc:
logger.exception("error in notice receiver: %s", exc)
finally:
res._pgresult_ptr = None # avoid destroying the pgresult_ptr
class PGconn:
"""
Python representation of a libpq connection.
"""
__slots__ = (
"_pgconn_ptr",
"notice_handler",
"notify_handler",
"_self_ptr",
"_procpid",
"__weakref__",
)
def __init__(self, pgconn_ptr: impl.PGconn_struct):
self._pgconn_ptr: Optional[impl.PGconn_struct] = pgconn_ptr
self.notice_handler: Optional[Callable[["abc.PGresult"], None]] = None
self.notify_handler: Optional[Callable[[PGnotify], None]] = None
# Keep alive for the lifetime of PGconn
self._self_ptr = py_object(ref(self))
impl.PQsetNoticeReceiver(pgconn_ptr, notice_receiver, byref(self._self_ptr))
self._procpid = getpid()
def __del__(self) -> None:
# Close the connection only if it was created in this process,
# not if this object is being GC'd after fork.
if getpid() == self._procpid:
self.finish()
def __repr__(self) -> str:
cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
info = connection_summary(self)
return f"<{cls} {info} at 0x{id(self):x}>"
@classmethod
def connect(cls, conninfo: bytes) -> "PGconn":
if not isinstance(conninfo, bytes):
raise TypeError(f"bytes expected, got {type(conninfo)} instead")
pgconn_ptr = impl.PQconnectdb(conninfo)
if not pgconn_ptr:
raise MemoryError("couldn't allocate PGconn")
return cls(pgconn_ptr)
@classmethod
def connect_start(cls, conninfo: bytes) -> "PGconn":
if not isinstance(conninfo, bytes):
raise TypeError(f"bytes expected, got {type(conninfo)} instead")
pgconn_ptr = impl.PQconnectStart(conninfo)
if not pgconn_ptr:
raise MemoryError("couldn't allocate PGconn")
return cls(pgconn_ptr)
def connect_poll(self) -> int:
return self._call_int(impl.PQconnectPoll)
def finish(self) -> None:
self._pgconn_ptr, p = None, self._pgconn_ptr
if p:
PQfinish(p)
@property
def pgconn_ptr(self) -> Optional[int]:
"""The pointer to the underlying `!PGconn` structure, as integer.
`!None` if the connection is closed.
The value can be used to pass the structure to libpq functions which
psycopg doesn't (currently) wrap, either in C or in Python using FFI
libraries such as `ctypes`.
"""
if self._pgconn_ptr is None:
return None
return addressof(self._pgconn_ptr.contents) # type: ignore[attr-defined]
@property
def info(self) -> List["ConninfoOption"]:
self._ensure_pgconn()
opts = impl.PQconninfo(self._pgconn_ptr)
if not opts:
raise MemoryError("couldn't allocate connection info")
try:
return Conninfo._options_from_array(opts)
finally:
impl.PQconninfoFree(opts)
def reset(self) -> None:
self._ensure_pgconn()
impl.PQreset(self._pgconn_ptr)
def reset_start(self) -> None:
if not impl.PQresetStart(self._pgconn_ptr):
raise e.OperationalError("couldn't reset connection")
def reset_poll(self) -> int:
return self._call_int(impl.PQresetPoll)
@classmethod
def ping(self, conninfo: bytes) -> int:
if not isinstance(conninfo, bytes):
raise TypeError(f"bytes expected, got {type(conninfo)} instead")
return impl.PQping(conninfo)
@property
def db(self) -> bytes:
return self._call_bytes(impl.PQdb)
@property
def user(self) -> bytes:
return self._call_bytes(impl.PQuser)
@property
def password(self) -> bytes:
return self._call_bytes(impl.PQpass)
@property
def host(self) -> bytes:
return self._call_bytes(impl.PQhost)
@property
def hostaddr(self) -> bytes:
return self._call_bytes(impl.PQhostaddr)
@property
def port(self) -> bytes:
return self._call_bytes(impl.PQport)
@property
def tty(self) -> bytes:
return self._call_bytes(impl.PQtty)
@property
def options(self) -> bytes:
return self._call_bytes(impl.PQoptions)
@property
def status(self) -> int:
return PQstatus(self._pgconn_ptr)
@property
def transaction_status(self) -> int:
return impl.PQtransactionStatus(self._pgconn_ptr)
def parameter_status(self, name: bytes) -> Optional[bytes]:
self._ensure_pgconn()
return impl.PQparameterStatus(self._pgconn_ptr, name)
@property
def error_message(self) -> bytes:
return impl.PQerrorMessage(self._pgconn_ptr)
@property
def protocol_version(self) -> int:
return self._call_int(impl.PQprotocolVersion)
@property
def server_version(self) -> int:
return self._call_int(impl.PQserverVersion)
@property
def socket(self) -> int:
rv = self._call_int(impl.PQsocket)
if rv == -1:
raise e.OperationalError("the connection is lost")
return rv
@property
def backend_pid(self) -> int:
return self._call_int(impl.PQbackendPID)
@property
def needs_password(self) -> bool:
"""True if the connection authentication method required a password,
but none was available.
See :pq:`PQconnectionNeedsPassword` for details.
"""
return bool(impl.PQconnectionNeedsPassword(self._pgconn_ptr))
@property
def used_password(self) -> bool:
"""True if the connection authentication method used a password.
See :pq:`PQconnectionUsedPassword` for details.
"""
return bool(impl.PQconnectionUsedPassword(self._pgconn_ptr))
@property
def ssl_in_use(self) -> bool:
return self._call_bool(impl.PQsslInUse)
def exec_(self, command: bytes) -> "PGresult":
if not isinstance(command, bytes):
raise TypeError(f"bytes expected, got {type(command)} instead")
self._ensure_pgconn()
rv = impl.PQexec(self._pgconn_ptr, command)
if not rv:
raise e.OperationalError(f"executing query failed: {error_message(self)}")
return PGresult(rv)
def send_query(self, command: bytes) -> None:
if not isinstance(command, bytes):
raise TypeError(f"bytes expected, got {type(command)} instead")
self._ensure_pgconn()
if not impl.PQsendQuery(self._pgconn_ptr, command):
raise e.OperationalError(f"sending query failed: {error_message(self)}")
def exec_params(
self,
command: bytes,
param_values: Optional[Sequence[Optional["abc.Buffer"]]],
param_types: Optional[Sequence[int]] = None,
param_formats: Optional[Sequence[int]] = None,
result_format: int = Format.TEXT,
) -> "PGresult":
args = self._query_params_args(
command, param_values, param_types, param_formats, result_format
)
self._ensure_pgconn()
rv = impl.PQexecParams(*args)
if not rv:
raise e.OperationalError(f"executing query failed: {error_message(self)}")
return PGresult(rv)
def send_query_params(
self,
command: bytes,
param_values: Optional[Sequence[Optional["abc.Buffer"]]],
param_types: Optional[Sequence[int]] = None,
param_formats: Optional[Sequence[int]] = None,
result_format: int = Format.TEXT,
) -> None:
args = self._query_params_args(
command, param_values, param_types, param_formats, result_format
)
self._ensure_pgconn()
if not impl.PQsendQueryParams(*args):
raise e.OperationalError(
f"sending query and params failed: {error_message(self)}"
)
def send_prepare(
self,
name: bytes,
command: bytes,
param_types: Optional[Sequence[int]] = None,
) -> None:
atypes: Optional[Array[impl.Oid]]
if not param_types:
nparams = 0
atypes = None
else:
nparams = len(param_types)
atypes = (impl.Oid * nparams)(*param_types)
self._ensure_pgconn()
if not impl.PQsendPrepare(self._pgconn_ptr, name, command, nparams, atypes):
raise e.OperationalError(
f"sending query and params failed: {error_message(self)}"
)
def send_query_prepared(
self,
name: bytes,
param_values: Optional[Sequence[Optional["abc.Buffer"]]],
param_formats: Optional[Sequence[int]] = None,
result_format: int = Format.TEXT,
) -> None:
# repurpose this function with a cheeky replacement of query with name,
# drop the param_types from the result
args = self._query_params_args(
name, param_values, None, param_formats, result_format
)
args = args[:3] + args[4:]
self._ensure_pgconn()
if not impl.PQsendQueryPrepared(*args):
raise e.OperationalError(
f"sending prepared query failed: {error_message(self)}"
)
def _query_params_args(
self,
command: bytes,
param_values: Optional[Sequence[Optional["abc.Buffer"]]],
param_types: Optional[Sequence[int]] = None,
param_formats: Optional[Sequence[int]] = None,
result_format: int = Format.TEXT,
) -> Any:
if not isinstance(command, bytes):
raise TypeError(f"bytes expected, got {type(command)} instead")
aparams: Optional[Array[c_char_p]]
alenghts: Optional[Array[c_int]]
if param_values:
nparams = len(param_values)
aparams = (c_char_p * nparams)(
*(
# convert bytearray/memoryview to bytes
b if b is None or isinstance(b, bytes) else bytes(b)
for b in param_values
)
)
alenghts = (c_int * nparams)(*(len(p) if p else 0 for p in param_values))
else:
nparams = 0
aparams = alenghts = None
atypes: Optional[Array[impl.Oid]]
if not param_types:
atypes = None
else:
if len(param_types) != nparams:
raise ValueError(
"got %d param_values but %d param_types"
% (nparams, len(param_types))
)
atypes = (impl.Oid * nparams)(*param_types)
if not param_formats:
aformats = None
else:
if len(param_formats) != nparams:
raise ValueError(
"got %d param_values but %d param_formats"
% (nparams, len(param_formats))
)
aformats = (c_int * nparams)(*param_formats)
return (
self._pgconn_ptr,
command,
nparams,
atypes,
aparams,
alenghts,
aformats,
result_format,
)
def prepare(
self,
name: bytes,
command: bytes,
param_types: Optional[Sequence[int]] = None,
) -> "PGresult":
if not isinstance(name, bytes):
raise TypeError(f"'name' must be bytes, got {type(name)} instead")
if not isinstance(command, bytes):
raise TypeError(f"'command' must be bytes, got {type(command)} instead")
if not param_types:
nparams = 0
atypes = None
else:
nparams = len(param_types)
atypes = (impl.Oid * nparams)(*param_types)
self._ensure_pgconn()
rv = impl.PQprepare(self._pgconn_ptr, name, command, nparams, atypes)
if not rv:
raise e.OperationalError(f"preparing query failed: {error_message(self)}")
return PGresult(rv)
def exec_prepared(
self,
name: bytes,
param_values: Optional[Sequence["abc.Buffer"]],
param_formats: Optional[Sequence[int]] = None,
result_format: int = 0,
) -> "PGresult":
if not isinstance(name, bytes):
raise TypeError(f"'name' must be bytes, got {type(name)} instead")
aparams: Optional[Array[c_char_p]]
alenghts: Optional[Array[c_int]]
if param_values:
nparams = len(param_values)
aparams = (c_char_p * nparams)(
*(
# convert bytearray/memoryview to bytes
b if b is None or isinstance(b, bytes) else bytes(b)
for b in param_values
)
)
alenghts = (c_int * nparams)(*(len(p) if p else 0 for p in param_values))
else:
nparams = 0
aparams = alenghts = None
if not param_formats:
aformats = None
else:
if len(param_formats) != nparams:
raise ValueError(
"got %d param_values but %d param_types"
% (nparams, len(param_formats))
)
aformats = (c_int * nparams)(*param_formats)
self._ensure_pgconn()
rv = impl.PQexecPrepared(
self._pgconn_ptr,
name,
nparams,
aparams,
alenghts,
aformats,
result_format,
)
if not rv:
raise e.OperationalError(
f"executing prepared query failed: {error_message(self)}"
)
return PGresult(rv)
def describe_prepared(self, name: bytes) -> "PGresult":
if not isinstance(name, bytes):
raise TypeError(f"'name' must be bytes, got {type(name)} instead")
self._ensure_pgconn()
rv = impl.PQdescribePrepared(self._pgconn_ptr, name)
if not rv:
raise e.OperationalError(f"describe prepared failed: {error_message(self)}")
return PGresult(rv)
def send_describe_prepared(self, name: bytes) -> None:
if not isinstance(name, bytes):
raise TypeError(f"bytes expected, got {type(name)} instead")
self._ensure_pgconn()
if not impl.PQsendDescribePrepared(self._pgconn_ptr, name):
raise e.OperationalError(
f"sending describe prepared failed: {error_message(self)}"
)
def describe_portal(self, name: bytes) -> "PGresult":
if not isinstance(name, bytes):
raise TypeError(f"'name' must be bytes, got {type(name)} instead")
self._ensure_pgconn()
rv = impl.PQdescribePortal(self._pgconn_ptr, name)
if not rv:
raise e.OperationalError(f"describe portal failed: {error_message(self)}")
return PGresult(rv)
def send_describe_portal(self, name: bytes) -> None:
if not isinstance(name, bytes):
raise TypeError(f"bytes expected, got {type(name)} instead")
self._ensure_pgconn()
if not impl.PQsendDescribePortal(self._pgconn_ptr, name):
raise e.OperationalError(
f"sending describe portal failed: {error_message(self)}"
)
def get_result(self) -> Optional["PGresult"]:
rv = impl.PQgetResult(self._pgconn_ptr)
return PGresult(rv) if rv else None
def consume_input(self) -> None:
if 1 != impl.PQconsumeInput(self._pgconn_ptr):
raise e.OperationalError(f"consuming input failed: {error_message(self)}")
def is_busy(self) -> int:
return impl.PQisBusy(self._pgconn_ptr)
@property
def nonblocking(self) -> int:
return impl.PQisnonblocking(self._pgconn_ptr)
@nonblocking.setter
def nonblocking(self, arg: int) -> None:
if 0 > impl.PQsetnonblocking(self._pgconn_ptr, arg):
raise e.OperationalError(
f"setting nonblocking failed: {error_message(self)}"
)
def flush(self) -> int:
# PQflush segfaults if it receives a NULL connection
if not self._pgconn_ptr:
raise e.OperationalError("flushing failed: the connection is closed")
rv: int = impl.PQflush(self._pgconn_ptr)
if rv < 0:
raise e.OperationalError(f"flushing failed: {error_message(self)}")
return rv
def set_single_row_mode(self) -> None:
if not impl.PQsetSingleRowMode(self._pgconn_ptr):
raise e.OperationalError("setting single row mode failed")
def get_cancel(self) -> "PGcancel":
"""
Create an object with the information needed to cancel a command.
See :pq:`PQgetCancel` for details.
"""
rv = impl.PQgetCancel(self._pgconn_ptr)
if not rv:
raise e.OperationalError("couldn't create cancel object")
return PGcancel(rv)
def notifies(self) -> Optional[PGnotify]:
ptr = impl.PQnotifies(self._pgconn_ptr)
if ptr:
c = ptr.contents
return PGnotify(c.relname, c.be_pid, c.extra)
impl.PQfreemem(ptr)
else:
return None
def put_copy_data(self, buffer: "abc.Buffer") -> int:
if not isinstance(buffer, bytes):
buffer = bytes(buffer)
rv = impl.PQputCopyData(self._pgconn_ptr, buffer, len(buffer))
if rv < 0:
raise e.OperationalError(f"sending copy data failed: {error_message(self)}")
return rv
def put_copy_end(self, error: Optional[bytes] = None) -> int:
rv = impl.PQputCopyEnd(self._pgconn_ptr, error)
if rv < 0:
raise e.OperationalError(f"sending copy end failed: {error_message(self)}")
return rv
def get_copy_data(self, async_: int) -> Tuple[int, memoryview]:
buffer_ptr = c_char_p()
nbytes = impl.PQgetCopyData(self._pgconn_ptr, byref(buffer_ptr), async_)
if nbytes == -2:
raise e.OperationalError(
f"receiving copy data failed: {error_message(self)}"
)
if buffer_ptr:
# TODO: do it without copy
data = string_at(buffer_ptr, nbytes)
impl.PQfreemem(buffer_ptr)
return nbytes, memoryview(data)
else:
return nbytes, memoryview(b"")
def trace(self, fileno: int) -> None:
"""
Enable tracing of the client/server communication to a file stream.
See :pq:`PQtrace` for details.
"""
if sys.platform != "linux":
raise e.NotSupportedError("currently only supported on Linux")
stream = impl.fdopen(fileno, b"w")
impl.PQtrace(self._pgconn_ptr, stream)
def set_trace_flags(self, flags: Trace) -> None:
"""
Configure tracing behavior of client/server communication.
:param flags: operating mode of tracing.
See :pq:`PQsetTraceFlags` for details.
"""
impl.PQsetTraceFlags(self._pgconn_ptr, flags)
def untrace(self) -> None:
"""
Disable tracing, previously enabled through `trace()`.
See :pq:`PQuntrace` for details.
"""
impl.PQuntrace(self._pgconn_ptr)
def encrypt_password(
self, passwd: bytes, user: bytes, algorithm: Optional[bytes] = None
) -> bytes:
"""
Return the encrypted form of a PostgreSQL password.
See :pq:`PQencryptPasswordConn` for details.
"""
out = impl.PQencryptPasswordConn(self._pgconn_ptr, passwd, user, algorithm)
if not out:
raise e.OperationalError(
f"password encryption failed: {error_message(self)}"
)
rv = string_at(out)
impl.PQfreemem(out)
return rv
def make_empty_result(self, exec_status: int) -> "PGresult":
rv = impl.PQmakeEmptyPGresult(self._pgconn_ptr, exec_status)
if not rv:
raise MemoryError("couldn't allocate empty PGresult")
return PGresult(rv)
@property
def pipeline_status(self) -> int:
if version() < 140000:
return 0
return impl.PQpipelineStatus(self._pgconn_ptr)
def enter_pipeline_mode(self) -> None:
"""Enter pipeline mode.
:raises ~e.OperationalError: in case of failure to enter the pipeline
mode.
"""
if impl.PQenterPipelineMode(self._pgconn_ptr) != 1:
raise e.OperationalError("failed to enter pipeline mode")
def exit_pipeline_mode(self) -> None:
"""Exit pipeline mode.
:raises ~e.OperationalError: in case of failure to exit the pipeline
mode.
"""
if impl.PQexitPipelineMode(self._pgconn_ptr) != 1:
raise e.OperationalError(error_message(self))
def pipeline_sync(self) -> None:
"""Mark a synchronization point in a pipeline.
:raises ~e.OperationalError: if the connection is not in pipeline mode
or if sync failed.
"""
rv = impl.PQpipelineSync(self._pgconn_ptr)
if rv == 0:
raise e.OperationalError("connection not in pipeline mode")
if rv != 1:
raise e.OperationalError("failed to sync pipeline")
def send_flush_request(self) -> None:
"""Sends a request for the server to flush its output buffer.
:raises ~e.OperationalError: if the flush request failed.
"""
if impl.PQsendFlushRequest(self._pgconn_ptr) == 0:
raise e.OperationalError(f"flush request failed: {error_message(self)}")
def _call_bytes(
self, func: Callable[[impl.PGconn_struct], Optional[bytes]]
) -> bytes:
"""
Call one of the pgconn libpq functions returning a bytes pointer.
"""
if not self._pgconn_ptr:
raise e.OperationalError("the connection is closed")
rv = func(self._pgconn_ptr)
assert rv is not None
return rv
def _call_int(self, func: Callable[[impl.PGconn_struct], int]) -> int:
"""
Call one of the pgconn libpq functions returning an int.
"""
if not self._pgconn_ptr:
raise e.OperationalError("the connection is closed")
return func(self._pgconn_ptr)
def _call_bool(self, func: Callable[[impl.PGconn_struct], int]) -> bool:
"""
Call one of the pgconn libpq functions returning a logical value.
"""
if not self._pgconn_ptr:
raise e.OperationalError("the connection is closed")
return bool(func(self._pgconn_ptr))
def _ensure_pgconn(self) -> None:
if not self._pgconn_ptr:
raise e.OperationalError("the connection is closed")
class PGresult:
"""
Python representation of a libpq result.
"""
__slots__ = ("_pgresult_ptr",)
def __init__(self, pgresult_ptr: impl.PGresult_struct):
self._pgresult_ptr: Optional[impl.PGresult_struct] = pgresult_ptr
def __del__(self) -> None:
self.clear()
def __repr__(self) -> str:
cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
status = ExecStatus(self.status)
return f"<{cls} [{status.name}] at 0x{id(self):x}>"
def clear(self) -> None:
self._pgresult_ptr, p = None, self._pgresult_ptr
if p:
PQclear(p)
@property
def pgresult_ptr(self) -> Optional[int]:
"""The pointer to the underlying `!PGresult` structure, as integer.
`!None` if the result was cleared.
The value can be used to pass the structure to libpq functions which
psycopg doesn't (currently) wrap, either in C or in Python using FFI
libraries such as `ctypes`.
"""
if self._pgresult_ptr is None:
return None
return addressof(self._pgresult_ptr.contents) # type: ignore[attr-defined]
@property
def status(self) -> int:
return impl.PQresultStatus(self._pgresult_ptr)
@property
def error_message(self) -> bytes:
return impl.PQresultErrorMessage(self._pgresult_ptr)
def error_field(self, fieldcode: int) -> Optional[bytes]:
return impl.PQresultErrorField(self._pgresult_ptr, fieldcode)
@property
def ntuples(self) -> int:
return impl.PQntuples(self._pgresult_ptr)
@property
def nfields(self) -> int:
return impl.PQnfields(self._pgresult_ptr)
def fname(self, column_number: int) -> Optional[bytes]:
return impl.PQfname(self._pgresult_ptr, column_number)
def ftable(self, column_number: int) -> int:
return impl.PQftable(self._pgresult_ptr, column_number)
def ftablecol(self, column_number: int) -> int:
return impl.PQftablecol(self._pgresult_ptr, column_number)
def fformat(self, column_number: int) -> int:
return impl.PQfformat(self._pgresult_ptr, column_number)
def ftype(self, column_number: int) -> int:
return impl.PQftype(self._pgresult_ptr, column_number)
def fmod(self, column_number: int) -> int:
return impl.PQfmod(self._pgresult_ptr, column_number)
def fsize(self, column_number: int) -> int:
return impl.PQfsize(self._pgresult_ptr, column_number)
@property
def binary_tuples(self) -> int:
return impl.PQbinaryTuples(self._pgresult_ptr)
def get_value(self, row_number: int, column_number: int) -> Optional[bytes]:
length: int = impl.PQgetlength(self._pgresult_ptr, row_number, column_number)
if length:
v = impl.PQgetvalue(self._pgresult_ptr, row_number, column_number)
return string_at(v, length)
else:
if impl.PQgetisnull(self._pgresult_ptr, row_number, column_number):
return None
else:
return b""
@property
def nparams(self) -> int:
return impl.PQnparams(self._pgresult_ptr)
def param_type(self, param_number: int) -> int:
return impl.PQparamtype(self._pgresult_ptr, param_number)
@property
def command_status(self) -> Optional[bytes]:
return impl.PQcmdStatus(self._pgresult_ptr)
@property
def command_tuples(self) -> Optional[int]:
rv = impl.PQcmdTuples(self._pgresult_ptr)
return int(rv) if rv else None
@property
def oid_value(self) -> int:
return impl.PQoidValue(self._pgresult_ptr)
def set_attributes(self, descriptions: List[PGresAttDesc]) -> None:
structs = [
impl.PGresAttDesc_struct(*desc) for desc in descriptions # type: ignore
]
array = (impl.PGresAttDesc_struct * len(structs))(*structs) # type: ignore
rv = impl.PQsetResultAttrs(self._pgresult_ptr, len(structs), array)
if rv == 0:
raise e.OperationalError("PQsetResultAttrs failed")
class PGcancel:
"""
Token to cancel the current operation on a connection.
Created by `PGconn.get_cancel()`.
"""
__slots__ = ("pgcancel_ptr",)
def __init__(self, pgcancel_ptr: impl.PGcancel_struct):
self.pgcancel_ptr: Optional[impl.PGcancel_struct] = pgcancel_ptr
def __del__(self) -> None:
self.free()
def free(self) -> None:
"""
Free the data structure created by :pq:`PQgetCancel()`.
Automatically invoked by `!__del__()`.
See :pq:`PQfreeCancel()` for details.
"""
self.pgcancel_ptr, p = None, self.pgcancel_ptr
if p:
PQfreeCancel(p)
def cancel(self) -> None:
"""Requests that the server abandon processing of the current command.
See :pq:`PQcancel()` for details.
"""
buf = create_string_buffer(256)
res = impl.PQcancel(
self.pgcancel_ptr,
byref(buf), # type: ignore[arg-type]
len(buf),
)
if not res:
raise e.OperationalError(
f"cancel failed: {buf.value.decode('utf8', 'ignore')}"
)
class Conninfo:
"""
Utility object to manipulate connection strings.
"""
@classmethod
def get_defaults(cls) -> List[ConninfoOption]:
opts = impl.PQconndefaults()
if not opts:
raise MemoryError("couldn't allocate connection defaults")
try:
return cls._options_from_array(opts)
finally:
impl.PQconninfoFree(opts)
@classmethod
def parse(cls, conninfo: bytes) -> List[ConninfoOption]:
if not isinstance(conninfo, bytes):
raise TypeError(f"bytes expected, got {type(conninfo)} instead")
errmsg = c_char_p()
rv = impl.PQconninfoParse(conninfo, byref(errmsg)) # type: ignore[arg-type]
if not rv:
if not errmsg:
raise MemoryError("couldn't allocate on conninfo parse")
else:
exc = e.OperationalError(
(errmsg.value or b"").decode("utf8", "replace")
)
impl.PQfreemem(errmsg)
raise exc
try:
return cls._options_from_array(rv)
finally:
impl.PQconninfoFree(rv)
@classmethod
def _options_from_array(
cls, opts: Sequence[impl.PQconninfoOption_struct]
) -> List[ConninfoOption]:
rv = []
skws = "keyword envvar compiled val label dispchar".split()
for opt in opts:
if not opt.keyword:
break
d = {kw: getattr(opt, kw) for kw in skws}
d["dispsize"] = opt.dispsize
rv.append(ConninfoOption(**d))
return rv
class Escaping:
"""
Utility object to escape strings for SQL interpolation.
"""
def __init__(self, conn: Optional[PGconn] = None):
self.conn = conn
def escape_literal(self, data: "abc.Buffer") -> bytes:
if not self.conn:
raise e.OperationalError("escape_literal failed: no connection provided")
self.conn._ensure_pgconn()
# TODO: might be done without copy (however C does that)
if not isinstance(data, bytes):
data = bytes(data)
out = impl.PQescapeLiteral(self.conn._pgconn_ptr, data, len(data))
if not out:
raise e.OperationalError(
f"escape_literal failed: {error_message(self.conn)} bytes"
)
rv = string_at(out)
impl.PQfreemem(out)
return rv
def escape_identifier(self, data: "abc.Buffer") -> bytes:
if not self.conn:
raise e.OperationalError("escape_identifier failed: no connection provided")
self.conn._ensure_pgconn()
if not isinstance(data, bytes):
data = bytes(data)
out = impl.PQescapeIdentifier(self.conn._pgconn_ptr, data, len(data))
if not out:
raise e.OperationalError(
f"escape_identifier failed: {error_message(self.conn)} bytes"
)
rv = string_at(out)
impl.PQfreemem(out)
return rv
def escape_string(self, data: "abc.Buffer") -> bytes:
if not isinstance(data, bytes):
data = bytes(data)
if self.conn:
self.conn._ensure_pgconn()
error = c_int()
out = create_string_buffer(len(data) * 2 + 1)
impl.PQescapeStringConn(
self.conn._pgconn_ptr,
byref(out), # type: ignore[arg-type]
data,
len(data),
byref(error), # type: ignore[arg-type]
)
if error:
raise e.OperationalError(
f"escape_string failed: {error_message(self.conn)} bytes"
)
else:
out = create_string_buffer(len(data) * 2 + 1)
impl.PQescapeString(
byref(out), # type: ignore[arg-type]
data,
len(data),
)
return out.value
def escape_bytea(self, data: "abc.Buffer") -> bytes:
len_out = c_size_t()
# TODO: might be able to do without a copy but it's a mess.
# the C library does it better anyway, so maybe not worth optimising
# https://mail.python.org/pipermail/python-dev/2012-September/121780.html
if not isinstance(data, bytes):
data = bytes(data)
if self.conn:
self.conn._ensure_pgconn()
out = impl.PQescapeByteaConn(
self.conn._pgconn_ptr,
data,
len(data),
byref(t_cast(c_ulong, len_out)), # type: ignore[arg-type]
)
else:
out = impl.PQescapeBytea(
data,
len(data),
byref(t_cast(c_ulong, len_out)), # type: ignore[arg-type]
)
if not out:
raise MemoryError(
f"couldn't allocate for escape_bytea of {len(data)} bytes"
)
rv = string_at(out, len_out.value - 1) # out includes final 0
impl.PQfreemem(out)
return rv
def unescape_bytea(self, data: "abc.Buffer") -> bytes:
# not needed, but let's keep it symmetric with the escaping:
# if a connection is passed in, it must be valid.
if self.conn:
self.conn._ensure_pgconn()
len_out = c_size_t()
if not isinstance(data, bytes):
data = bytes(data)
out = impl.PQunescapeBytea(
data,
byref(t_cast(c_ulong, len_out)), # type: ignore[arg-type]
)
if not out:
raise MemoryError(
f"couldn't allocate for unescape_bytea of {len(data)} bytes"
)
rv = string_at(out, len_out.value)
impl.PQfreemem(out)
return rv
# importing the ssl module sets up Python's libcrypto callbacks
import ssl # noqa
# disable libcrypto setup in libpq, so it won't stomp on the callbacks
# that have already been set up
impl.PQinitOpenSSL(1, 0)
__build_version__ = version()