1089 lines
35 KiB
Python
1089 lines
35 KiB
Python
"""
|
|
libpq Python wrapper using ctypes bindings.
|
|
|
|
Clients shouldn't use this module directly, unless for testing: they should use
|
|
the `pq` module instead, which is in charge of choosing the best
|
|
implementation.
|
|
"""
|
|
|
|
# Copyright (C) 2020 The Psycopg Team
|
|
|
|
import sys
|
|
import logging
|
|
from os import getpid
|
|
from weakref import ref
|
|
|
|
from ctypes import Array, POINTER, cast, string_at, create_string_buffer, byref
|
|
from ctypes import addressof, c_char_p, c_int, c_size_t, c_ulong, c_void_p, py_object
|
|
from typing import Any, Callable, List, Optional, Sequence, Tuple
|
|
from typing import cast as t_cast, TYPE_CHECKING
|
|
|
|
from .. import errors as e
|
|
from . import _pq_ctypes as impl
|
|
from .misc import PGnotify, ConninfoOption, PGresAttDesc
|
|
from .misc import error_message, connection_summary
|
|
from ._enums import Format, ExecStatus, Trace
|
|
|
|
# Imported locally to call them from __del__ methods
|
|
from ._pq_ctypes import PQclear, PQfinish, PQfreeCancel, PQstatus
|
|
|
|
if TYPE_CHECKING:
|
|
from . import abc
|
|
|
|
__impl__ = "python"
|
|
|
|
logger = logging.getLogger("psycopg")
|
|
|
|
|
|
def version() -> int:
|
|
"""Return the version number of the libpq currently loaded.
|
|
|
|
The number is in the same format of `~psycopg.ConnectionInfo.server_version`.
|
|
|
|
Certain features might not be available if the libpq library used is too old.
|
|
"""
|
|
return impl.PQlibVersion()
|
|
|
|
|
|
@impl.PQnoticeReceiver # type: ignore
|
|
def notice_receiver(arg: c_void_p, result_ptr: impl.PGresult_struct) -> None:
|
|
pgconn = cast(arg, POINTER(py_object)).contents.value()
|
|
if not (pgconn and pgconn.notice_handler):
|
|
return
|
|
|
|
res = PGresult(result_ptr)
|
|
try:
|
|
pgconn.notice_handler(res)
|
|
except Exception as exc:
|
|
logger.exception("error in notice receiver: %s", exc)
|
|
finally:
|
|
res._pgresult_ptr = None # avoid destroying the pgresult_ptr
|
|
|
|
|
|
class PGconn:
|
|
"""
|
|
Python representation of a libpq connection.
|
|
"""
|
|
|
|
__slots__ = (
|
|
"_pgconn_ptr",
|
|
"notice_handler",
|
|
"notify_handler",
|
|
"_self_ptr",
|
|
"_procpid",
|
|
"__weakref__",
|
|
)
|
|
|
|
def __init__(self, pgconn_ptr: impl.PGconn_struct):
|
|
self._pgconn_ptr: Optional[impl.PGconn_struct] = pgconn_ptr
|
|
self.notice_handler: Optional[Callable[["abc.PGresult"], None]] = None
|
|
self.notify_handler: Optional[Callable[[PGnotify], None]] = None
|
|
|
|
# Keep alive for the lifetime of PGconn
|
|
self._self_ptr = py_object(ref(self))
|
|
impl.PQsetNoticeReceiver(pgconn_ptr, notice_receiver, byref(self._self_ptr))
|
|
|
|
self._procpid = getpid()
|
|
|
|
def __del__(self) -> None:
|
|
# Close the connection only if it was created in this process,
|
|
# not if this object is being GC'd after fork.
|
|
if getpid() == self._procpid:
|
|
self.finish()
|
|
|
|
def __repr__(self) -> str:
|
|
cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
|
|
info = connection_summary(self)
|
|
return f"<{cls} {info} at 0x{id(self):x}>"
|
|
|
|
@classmethod
|
|
def connect(cls, conninfo: bytes) -> "PGconn":
|
|
if not isinstance(conninfo, bytes):
|
|
raise TypeError(f"bytes expected, got {type(conninfo)} instead")
|
|
|
|
pgconn_ptr = impl.PQconnectdb(conninfo)
|
|
if not pgconn_ptr:
|
|
raise MemoryError("couldn't allocate PGconn")
|
|
return cls(pgconn_ptr)
|
|
|
|
@classmethod
|
|
def connect_start(cls, conninfo: bytes) -> "PGconn":
|
|
if not isinstance(conninfo, bytes):
|
|
raise TypeError(f"bytes expected, got {type(conninfo)} instead")
|
|
|
|
pgconn_ptr = impl.PQconnectStart(conninfo)
|
|
if not pgconn_ptr:
|
|
raise MemoryError("couldn't allocate PGconn")
|
|
return cls(pgconn_ptr)
|
|
|
|
def connect_poll(self) -> int:
|
|
return self._call_int(impl.PQconnectPoll)
|
|
|
|
def finish(self) -> None:
|
|
self._pgconn_ptr, p = None, self._pgconn_ptr
|
|
if p:
|
|
PQfinish(p)
|
|
|
|
@property
|
|
def pgconn_ptr(self) -> Optional[int]:
|
|
"""The pointer to the underlying `!PGconn` structure, as integer.
|
|
|
|
`!None` if the connection is closed.
|
|
|
|
The value can be used to pass the structure to libpq functions which
|
|
psycopg doesn't (currently) wrap, either in C or in Python using FFI
|
|
libraries such as `ctypes`.
|
|
"""
|
|
if self._pgconn_ptr is None:
|
|
return None
|
|
|
|
return addressof(self._pgconn_ptr.contents) # type: ignore[attr-defined]
|
|
|
|
@property
|
|
def info(self) -> List["ConninfoOption"]:
|
|
self._ensure_pgconn()
|
|
opts = impl.PQconninfo(self._pgconn_ptr)
|
|
if not opts:
|
|
raise MemoryError("couldn't allocate connection info")
|
|
try:
|
|
return Conninfo._options_from_array(opts)
|
|
finally:
|
|
impl.PQconninfoFree(opts)
|
|
|
|
def reset(self) -> None:
|
|
self._ensure_pgconn()
|
|
impl.PQreset(self._pgconn_ptr)
|
|
|
|
def reset_start(self) -> None:
|
|
if not impl.PQresetStart(self._pgconn_ptr):
|
|
raise e.OperationalError("couldn't reset connection")
|
|
|
|
def reset_poll(self) -> int:
|
|
return self._call_int(impl.PQresetPoll)
|
|
|
|
@classmethod
|
|
def ping(self, conninfo: bytes) -> int:
|
|
if not isinstance(conninfo, bytes):
|
|
raise TypeError(f"bytes expected, got {type(conninfo)} instead")
|
|
|
|
return impl.PQping(conninfo)
|
|
|
|
@property
|
|
def db(self) -> bytes:
|
|
return self._call_bytes(impl.PQdb)
|
|
|
|
@property
|
|
def user(self) -> bytes:
|
|
return self._call_bytes(impl.PQuser)
|
|
|
|
@property
|
|
def password(self) -> bytes:
|
|
return self._call_bytes(impl.PQpass)
|
|
|
|
@property
|
|
def host(self) -> bytes:
|
|
return self._call_bytes(impl.PQhost)
|
|
|
|
@property
|
|
def hostaddr(self) -> bytes:
|
|
return self._call_bytes(impl.PQhostaddr)
|
|
|
|
@property
|
|
def port(self) -> bytes:
|
|
return self._call_bytes(impl.PQport)
|
|
|
|
@property
|
|
def tty(self) -> bytes:
|
|
return self._call_bytes(impl.PQtty)
|
|
|
|
@property
|
|
def options(self) -> bytes:
|
|
return self._call_bytes(impl.PQoptions)
|
|
|
|
@property
|
|
def status(self) -> int:
|
|
return PQstatus(self._pgconn_ptr)
|
|
|
|
@property
|
|
def transaction_status(self) -> int:
|
|
return impl.PQtransactionStatus(self._pgconn_ptr)
|
|
|
|
def parameter_status(self, name: bytes) -> Optional[bytes]:
|
|
self._ensure_pgconn()
|
|
return impl.PQparameterStatus(self._pgconn_ptr, name)
|
|
|
|
@property
|
|
def error_message(self) -> bytes:
|
|
return impl.PQerrorMessage(self._pgconn_ptr)
|
|
|
|
@property
|
|
def protocol_version(self) -> int:
|
|
return self._call_int(impl.PQprotocolVersion)
|
|
|
|
@property
|
|
def server_version(self) -> int:
|
|
return self._call_int(impl.PQserverVersion)
|
|
|
|
@property
|
|
def socket(self) -> int:
|
|
rv = self._call_int(impl.PQsocket)
|
|
if rv == -1:
|
|
raise e.OperationalError("the connection is lost")
|
|
return rv
|
|
|
|
@property
|
|
def backend_pid(self) -> int:
|
|
return self._call_int(impl.PQbackendPID)
|
|
|
|
@property
|
|
def needs_password(self) -> bool:
|
|
"""True if the connection authentication method required a password,
|
|
but none was available.
|
|
|
|
See :pq:`PQconnectionNeedsPassword` for details.
|
|
"""
|
|
return bool(impl.PQconnectionNeedsPassword(self._pgconn_ptr))
|
|
|
|
@property
|
|
def used_password(self) -> bool:
|
|
"""True if the connection authentication method used a password.
|
|
|
|
See :pq:`PQconnectionUsedPassword` for details.
|
|
"""
|
|
return bool(impl.PQconnectionUsedPassword(self._pgconn_ptr))
|
|
|
|
@property
|
|
def ssl_in_use(self) -> bool:
|
|
return self._call_bool(impl.PQsslInUse)
|
|
|
|
def exec_(self, command: bytes) -> "PGresult":
|
|
if not isinstance(command, bytes):
|
|
raise TypeError(f"bytes expected, got {type(command)} instead")
|
|
self._ensure_pgconn()
|
|
rv = impl.PQexec(self._pgconn_ptr, command)
|
|
if not rv:
|
|
raise e.OperationalError(f"executing query failed: {error_message(self)}")
|
|
return PGresult(rv)
|
|
|
|
def send_query(self, command: bytes) -> None:
|
|
if not isinstance(command, bytes):
|
|
raise TypeError(f"bytes expected, got {type(command)} instead")
|
|
self._ensure_pgconn()
|
|
if not impl.PQsendQuery(self._pgconn_ptr, command):
|
|
raise e.OperationalError(f"sending query failed: {error_message(self)}")
|
|
|
|
def exec_params(
|
|
self,
|
|
command: bytes,
|
|
param_values: Optional[Sequence[Optional["abc.Buffer"]]],
|
|
param_types: Optional[Sequence[int]] = None,
|
|
param_formats: Optional[Sequence[int]] = None,
|
|
result_format: int = Format.TEXT,
|
|
) -> "PGresult":
|
|
args = self._query_params_args(
|
|
command, param_values, param_types, param_formats, result_format
|
|
)
|
|
self._ensure_pgconn()
|
|
rv = impl.PQexecParams(*args)
|
|
if not rv:
|
|
raise e.OperationalError(f"executing query failed: {error_message(self)}")
|
|
return PGresult(rv)
|
|
|
|
def send_query_params(
|
|
self,
|
|
command: bytes,
|
|
param_values: Optional[Sequence[Optional["abc.Buffer"]]],
|
|
param_types: Optional[Sequence[int]] = None,
|
|
param_formats: Optional[Sequence[int]] = None,
|
|
result_format: int = Format.TEXT,
|
|
) -> None:
|
|
args = self._query_params_args(
|
|
command, param_values, param_types, param_formats, result_format
|
|
)
|
|
self._ensure_pgconn()
|
|
if not impl.PQsendQueryParams(*args):
|
|
raise e.OperationalError(
|
|
f"sending query and params failed: {error_message(self)}"
|
|
)
|
|
|
|
def send_prepare(
|
|
self,
|
|
name: bytes,
|
|
command: bytes,
|
|
param_types: Optional[Sequence[int]] = None,
|
|
) -> None:
|
|
atypes: Optional[Array[impl.Oid]]
|
|
if not param_types:
|
|
nparams = 0
|
|
atypes = None
|
|
else:
|
|
nparams = len(param_types)
|
|
atypes = (impl.Oid * nparams)(*param_types)
|
|
|
|
self._ensure_pgconn()
|
|
if not impl.PQsendPrepare(self._pgconn_ptr, name, command, nparams, atypes):
|
|
raise e.OperationalError(
|
|
f"sending query and params failed: {error_message(self)}"
|
|
)
|
|
|
|
def send_query_prepared(
|
|
self,
|
|
name: bytes,
|
|
param_values: Optional[Sequence[Optional["abc.Buffer"]]],
|
|
param_formats: Optional[Sequence[int]] = None,
|
|
result_format: int = Format.TEXT,
|
|
) -> None:
|
|
# repurpose this function with a cheeky replacement of query with name,
|
|
# drop the param_types from the result
|
|
args = self._query_params_args(
|
|
name, param_values, None, param_formats, result_format
|
|
)
|
|
args = args[:3] + args[4:]
|
|
|
|
self._ensure_pgconn()
|
|
if not impl.PQsendQueryPrepared(*args):
|
|
raise e.OperationalError(
|
|
f"sending prepared query failed: {error_message(self)}"
|
|
)
|
|
|
|
def _query_params_args(
|
|
self,
|
|
command: bytes,
|
|
param_values: Optional[Sequence[Optional["abc.Buffer"]]],
|
|
param_types: Optional[Sequence[int]] = None,
|
|
param_formats: Optional[Sequence[int]] = None,
|
|
result_format: int = Format.TEXT,
|
|
) -> Any:
|
|
if not isinstance(command, bytes):
|
|
raise TypeError(f"bytes expected, got {type(command)} instead")
|
|
|
|
aparams: Optional[Array[c_char_p]]
|
|
alenghts: Optional[Array[c_int]]
|
|
if param_values:
|
|
nparams = len(param_values)
|
|
aparams = (c_char_p * nparams)(
|
|
*(
|
|
# convert bytearray/memoryview to bytes
|
|
b if b is None or isinstance(b, bytes) else bytes(b)
|
|
for b in param_values
|
|
)
|
|
)
|
|
alenghts = (c_int * nparams)(*(len(p) if p else 0 for p in param_values))
|
|
else:
|
|
nparams = 0
|
|
aparams = alenghts = None
|
|
|
|
atypes: Optional[Array[impl.Oid]]
|
|
if not param_types:
|
|
atypes = None
|
|
else:
|
|
if len(param_types) != nparams:
|
|
raise ValueError(
|
|
"got %d param_values but %d param_types"
|
|
% (nparams, len(param_types))
|
|
)
|
|
atypes = (impl.Oid * nparams)(*param_types)
|
|
|
|
if not param_formats:
|
|
aformats = None
|
|
else:
|
|
if len(param_formats) != nparams:
|
|
raise ValueError(
|
|
"got %d param_values but %d param_formats"
|
|
% (nparams, len(param_formats))
|
|
)
|
|
aformats = (c_int * nparams)(*param_formats)
|
|
|
|
return (
|
|
self._pgconn_ptr,
|
|
command,
|
|
nparams,
|
|
atypes,
|
|
aparams,
|
|
alenghts,
|
|
aformats,
|
|
result_format,
|
|
)
|
|
|
|
def prepare(
|
|
self,
|
|
name: bytes,
|
|
command: bytes,
|
|
param_types: Optional[Sequence[int]] = None,
|
|
) -> "PGresult":
|
|
if not isinstance(name, bytes):
|
|
raise TypeError(f"'name' must be bytes, got {type(name)} instead")
|
|
|
|
if not isinstance(command, bytes):
|
|
raise TypeError(f"'command' must be bytes, got {type(command)} instead")
|
|
|
|
if not param_types:
|
|
nparams = 0
|
|
atypes = None
|
|
else:
|
|
nparams = len(param_types)
|
|
atypes = (impl.Oid * nparams)(*param_types)
|
|
|
|
self._ensure_pgconn()
|
|
rv = impl.PQprepare(self._pgconn_ptr, name, command, nparams, atypes)
|
|
if not rv:
|
|
raise e.OperationalError(f"preparing query failed: {error_message(self)}")
|
|
return PGresult(rv)
|
|
|
|
def exec_prepared(
|
|
self,
|
|
name: bytes,
|
|
param_values: Optional[Sequence["abc.Buffer"]],
|
|
param_formats: Optional[Sequence[int]] = None,
|
|
result_format: int = 0,
|
|
) -> "PGresult":
|
|
if not isinstance(name, bytes):
|
|
raise TypeError(f"'name' must be bytes, got {type(name)} instead")
|
|
|
|
aparams: Optional[Array[c_char_p]]
|
|
alenghts: Optional[Array[c_int]]
|
|
if param_values:
|
|
nparams = len(param_values)
|
|
aparams = (c_char_p * nparams)(
|
|
*(
|
|
# convert bytearray/memoryview to bytes
|
|
b if b is None or isinstance(b, bytes) else bytes(b)
|
|
for b in param_values
|
|
)
|
|
)
|
|
alenghts = (c_int * nparams)(*(len(p) if p else 0 for p in param_values))
|
|
else:
|
|
nparams = 0
|
|
aparams = alenghts = None
|
|
|
|
if not param_formats:
|
|
aformats = None
|
|
else:
|
|
if len(param_formats) != nparams:
|
|
raise ValueError(
|
|
"got %d param_values but %d param_types"
|
|
% (nparams, len(param_formats))
|
|
)
|
|
aformats = (c_int * nparams)(*param_formats)
|
|
|
|
self._ensure_pgconn()
|
|
rv = impl.PQexecPrepared(
|
|
self._pgconn_ptr,
|
|
name,
|
|
nparams,
|
|
aparams,
|
|
alenghts,
|
|
aformats,
|
|
result_format,
|
|
)
|
|
if not rv:
|
|
raise e.OperationalError(
|
|
f"executing prepared query failed: {error_message(self)}"
|
|
)
|
|
return PGresult(rv)
|
|
|
|
def describe_prepared(self, name: bytes) -> "PGresult":
|
|
if not isinstance(name, bytes):
|
|
raise TypeError(f"'name' must be bytes, got {type(name)} instead")
|
|
self._ensure_pgconn()
|
|
rv = impl.PQdescribePrepared(self._pgconn_ptr, name)
|
|
if not rv:
|
|
raise e.OperationalError(f"describe prepared failed: {error_message(self)}")
|
|
return PGresult(rv)
|
|
|
|
def send_describe_prepared(self, name: bytes) -> None:
|
|
if not isinstance(name, bytes):
|
|
raise TypeError(f"bytes expected, got {type(name)} instead")
|
|
self._ensure_pgconn()
|
|
if not impl.PQsendDescribePrepared(self._pgconn_ptr, name):
|
|
raise e.OperationalError(
|
|
f"sending describe prepared failed: {error_message(self)}"
|
|
)
|
|
|
|
def describe_portal(self, name: bytes) -> "PGresult":
|
|
if not isinstance(name, bytes):
|
|
raise TypeError(f"'name' must be bytes, got {type(name)} instead")
|
|
self._ensure_pgconn()
|
|
rv = impl.PQdescribePortal(self._pgconn_ptr, name)
|
|
if not rv:
|
|
raise e.OperationalError(f"describe portal failed: {error_message(self)}")
|
|
return PGresult(rv)
|
|
|
|
def send_describe_portal(self, name: bytes) -> None:
|
|
if not isinstance(name, bytes):
|
|
raise TypeError(f"bytes expected, got {type(name)} instead")
|
|
self._ensure_pgconn()
|
|
if not impl.PQsendDescribePortal(self._pgconn_ptr, name):
|
|
raise e.OperationalError(
|
|
f"sending describe portal failed: {error_message(self)}"
|
|
)
|
|
|
|
def get_result(self) -> Optional["PGresult"]:
|
|
rv = impl.PQgetResult(self._pgconn_ptr)
|
|
return PGresult(rv) if rv else None
|
|
|
|
def consume_input(self) -> None:
|
|
if 1 != impl.PQconsumeInput(self._pgconn_ptr):
|
|
raise e.OperationalError(f"consuming input failed: {error_message(self)}")
|
|
|
|
def is_busy(self) -> int:
|
|
return impl.PQisBusy(self._pgconn_ptr)
|
|
|
|
@property
|
|
def nonblocking(self) -> int:
|
|
return impl.PQisnonblocking(self._pgconn_ptr)
|
|
|
|
@nonblocking.setter
|
|
def nonblocking(self, arg: int) -> None:
|
|
if 0 > impl.PQsetnonblocking(self._pgconn_ptr, arg):
|
|
raise e.OperationalError(
|
|
f"setting nonblocking failed: {error_message(self)}"
|
|
)
|
|
|
|
def flush(self) -> int:
|
|
# PQflush segfaults if it receives a NULL connection
|
|
if not self._pgconn_ptr:
|
|
raise e.OperationalError("flushing failed: the connection is closed")
|
|
rv: int = impl.PQflush(self._pgconn_ptr)
|
|
if rv < 0:
|
|
raise e.OperationalError(f"flushing failed: {error_message(self)}")
|
|
return rv
|
|
|
|
def set_single_row_mode(self) -> None:
|
|
if not impl.PQsetSingleRowMode(self._pgconn_ptr):
|
|
raise e.OperationalError("setting single row mode failed")
|
|
|
|
def get_cancel(self) -> "PGcancel":
|
|
"""
|
|
Create an object with the information needed to cancel a command.
|
|
|
|
See :pq:`PQgetCancel` for details.
|
|
"""
|
|
rv = impl.PQgetCancel(self._pgconn_ptr)
|
|
if not rv:
|
|
raise e.OperationalError("couldn't create cancel object")
|
|
return PGcancel(rv)
|
|
|
|
def notifies(self) -> Optional[PGnotify]:
|
|
ptr = impl.PQnotifies(self._pgconn_ptr)
|
|
if ptr:
|
|
c = ptr.contents
|
|
return PGnotify(c.relname, c.be_pid, c.extra)
|
|
impl.PQfreemem(ptr)
|
|
else:
|
|
return None
|
|
|
|
def put_copy_data(self, buffer: "abc.Buffer") -> int:
|
|
if not isinstance(buffer, bytes):
|
|
buffer = bytes(buffer)
|
|
rv = impl.PQputCopyData(self._pgconn_ptr, buffer, len(buffer))
|
|
if rv < 0:
|
|
raise e.OperationalError(f"sending copy data failed: {error_message(self)}")
|
|
return rv
|
|
|
|
def put_copy_end(self, error: Optional[bytes] = None) -> int:
|
|
rv = impl.PQputCopyEnd(self._pgconn_ptr, error)
|
|
if rv < 0:
|
|
raise e.OperationalError(f"sending copy end failed: {error_message(self)}")
|
|
return rv
|
|
|
|
def get_copy_data(self, async_: int) -> Tuple[int, memoryview]:
|
|
buffer_ptr = c_char_p()
|
|
nbytes = impl.PQgetCopyData(self._pgconn_ptr, byref(buffer_ptr), async_)
|
|
if nbytes == -2:
|
|
raise e.OperationalError(
|
|
f"receiving copy data failed: {error_message(self)}"
|
|
)
|
|
if buffer_ptr:
|
|
# TODO: do it without copy
|
|
data = string_at(buffer_ptr, nbytes)
|
|
impl.PQfreemem(buffer_ptr)
|
|
return nbytes, memoryview(data)
|
|
else:
|
|
return nbytes, memoryview(b"")
|
|
|
|
def trace(self, fileno: int) -> None:
|
|
"""
|
|
Enable tracing of the client/server communication to a file stream.
|
|
|
|
See :pq:`PQtrace` for details.
|
|
"""
|
|
if sys.platform != "linux":
|
|
raise e.NotSupportedError("currently only supported on Linux")
|
|
stream = impl.fdopen(fileno, b"w")
|
|
impl.PQtrace(self._pgconn_ptr, stream)
|
|
|
|
def set_trace_flags(self, flags: Trace) -> None:
|
|
"""
|
|
Configure tracing behavior of client/server communication.
|
|
|
|
:param flags: operating mode of tracing.
|
|
|
|
See :pq:`PQsetTraceFlags` for details.
|
|
"""
|
|
impl.PQsetTraceFlags(self._pgconn_ptr, flags)
|
|
|
|
def untrace(self) -> None:
|
|
"""
|
|
Disable tracing, previously enabled through `trace()`.
|
|
|
|
See :pq:`PQuntrace` for details.
|
|
"""
|
|
impl.PQuntrace(self._pgconn_ptr)
|
|
|
|
def encrypt_password(
|
|
self, passwd: bytes, user: bytes, algorithm: Optional[bytes] = None
|
|
) -> bytes:
|
|
"""
|
|
Return the encrypted form of a PostgreSQL password.
|
|
|
|
See :pq:`PQencryptPasswordConn` for details.
|
|
"""
|
|
out = impl.PQencryptPasswordConn(self._pgconn_ptr, passwd, user, algorithm)
|
|
if not out:
|
|
raise e.OperationalError(
|
|
f"password encryption failed: {error_message(self)}"
|
|
)
|
|
|
|
rv = string_at(out)
|
|
impl.PQfreemem(out)
|
|
return rv
|
|
|
|
def make_empty_result(self, exec_status: int) -> "PGresult":
|
|
rv = impl.PQmakeEmptyPGresult(self._pgconn_ptr, exec_status)
|
|
if not rv:
|
|
raise MemoryError("couldn't allocate empty PGresult")
|
|
return PGresult(rv)
|
|
|
|
@property
|
|
def pipeline_status(self) -> int:
|
|
if version() < 140000:
|
|
return 0
|
|
return impl.PQpipelineStatus(self._pgconn_ptr)
|
|
|
|
def enter_pipeline_mode(self) -> None:
|
|
"""Enter pipeline mode.
|
|
|
|
:raises ~e.OperationalError: in case of failure to enter the pipeline
|
|
mode.
|
|
"""
|
|
if impl.PQenterPipelineMode(self._pgconn_ptr) != 1:
|
|
raise e.OperationalError("failed to enter pipeline mode")
|
|
|
|
def exit_pipeline_mode(self) -> None:
|
|
"""Exit pipeline mode.
|
|
|
|
:raises ~e.OperationalError: in case of failure to exit the pipeline
|
|
mode.
|
|
"""
|
|
if impl.PQexitPipelineMode(self._pgconn_ptr) != 1:
|
|
raise e.OperationalError(error_message(self))
|
|
|
|
def pipeline_sync(self) -> None:
|
|
"""Mark a synchronization point in a pipeline.
|
|
|
|
:raises ~e.OperationalError: if the connection is not in pipeline mode
|
|
or if sync failed.
|
|
"""
|
|
rv = impl.PQpipelineSync(self._pgconn_ptr)
|
|
if rv == 0:
|
|
raise e.OperationalError("connection not in pipeline mode")
|
|
if rv != 1:
|
|
raise e.OperationalError("failed to sync pipeline")
|
|
|
|
def send_flush_request(self) -> None:
|
|
"""Sends a request for the server to flush its output buffer.
|
|
|
|
:raises ~e.OperationalError: if the flush request failed.
|
|
"""
|
|
if impl.PQsendFlushRequest(self._pgconn_ptr) == 0:
|
|
raise e.OperationalError(f"flush request failed: {error_message(self)}")
|
|
|
|
def _call_bytes(
|
|
self, func: Callable[[impl.PGconn_struct], Optional[bytes]]
|
|
) -> bytes:
|
|
"""
|
|
Call one of the pgconn libpq functions returning a bytes pointer.
|
|
"""
|
|
if not self._pgconn_ptr:
|
|
raise e.OperationalError("the connection is closed")
|
|
rv = func(self._pgconn_ptr)
|
|
assert rv is not None
|
|
return rv
|
|
|
|
def _call_int(self, func: Callable[[impl.PGconn_struct], int]) -> int:
|
|
"""
|
|
Call one of the pgconn libpq functions returning an int.
|
|
"""
|
|
if not self._pgconn_ptr:
|
|
raise e.OperationalError("the connection is closed")
|
|
return func(self._pgconn_ptr)
|
|
|
|
def _call_bool(self, func: Callable[[impl.PGconn_struct], int]) -> bool:
|
|
"""
|
|
Call one of the pgconn libpq functions returning a logical value.
|
|
"""
|
|
if not self._pgconn_ptr:
|
|
raise e.OperationalError("the connection is closed")
|
|
return bool(func(self._pgconn_ptr))
|
|
|
|
def _ensure_pgconn(self) -> None:
|
|
if not self._pgconn_ptr:
|
|
raise e.OperationalError("the connection is closed")
|
|
|
|
|
|
class PGresult:
|
|
"""
|
|
Python representation of a libpq result.
|
|
"""
|
|
|
|
__slots__ = ("_pgresult_ptr",)
|
|
|
|
def __init__(self, pgresult_ptr: impl.PGresult_struct):
|
|
self._pgresult_ptr: Optional[impl.PGresult_struct] = pgresult_ptr
|
|
|
|
def __del__(self) -> None:
|
|
self.clear()
|
|
|
|
def __repr__(self) -> str:
|
|
cls = f"{self.__class__.__module__}.{self.__class__.__qualname__}"
|
|
status = ExecStatus(self.status)
|
|
return f"<{cls} [{status.name}] at 0x{id(self):x}>"
|
|
|
|
def clear(self) -> None:
|
|
self._pgresult_ptr, p = None, self._pgresult_ptr
|
|
if p:
|
|
PQclear(p)
|
|
|
|
@property
|
|
def pgresult_ptr(self) -> Optional[int]:
|
|
"""The pointer to the underlying `!PGresult` structure, as integer.
|
|
|
|
`!None` if the result was cleared.
|
|
|
|
The value can be used to pass the structure to libpq functions which
|
|
psycopg doesn't (currently) wrap, either in C or in Python using FFI
|
|
libraries such as `ctypes`.
|
|
"""
|
|
if self._pgresult_ptr is None:
|
|
return None
|
|
|
|
return addressof(self._pgresult_ptr.contents) # type: ignore[attr-defined]
|
|
|
|
@property
|
|
def status(self) -> int:
|
|
return impl.PQresultStatus(self._pgresult_ptr)
|
|
|
|
@property
|
|
def error_message(self) -> bytes:
|
|
return impl.PQresultErrorMessage(self._pgresult_ptr)
|
|
|
|
def error_field(self, fieldcode: int) -> Optional[bytes]:
|
|
return impl.PQresultErrorField(self._pgresult_ptr, fieldcode)
|
|
|
|
@property
|
|
def ntuples(self) -> int:
|
|
return impl.PQntuples(self._pgresult_ptr)
|
|
|
|
@property
|
|
def nfields(self) -> int:
|
|
return impl.PQnfields(self._pgresult_ptr)
|
|
|
|
def fname(self, column_number: int) -> Optional[bytes]:
|
|
return impl.PQfname(self._pgresult_ptr, column_number)
|
|
|
|
def ftable(self, column_number: int) -> int:
|
|
return impl.PQftable(self._pgresult_ptr, column_number)
|
|
|
|
def ftablecol(self, column_number: int) -> int:
|
|
return impl.PQftablecol(self._pgresult_ptr, column_number)
|
|
|
|
def fformat(self, column_number: int) -> int:
|
|
return impl.PQfformat(self._pgresult_ptr, column_number)
|
|
|
|
def ftype(self, column_number: int) -> int:
|
|
return impl.PQftype(self._pgresult_ptr, column_number)
|
|
|
|
def fmod(self, column_number: int) -> int:
|
|
return impl.PQfmod(self._pgresult_ptr, column_number)
|
|
|
|
def fsize(self, column_number: int) -> int:
|
|
return impl.PQfsize(self._pgresult_ptr, column_number)
|
|
|
|
@property
|
|
def binary_tuples(self) -> int:
|
|
return impl.PQbinaryTuples(self._pgresult_ptr)
|
|
|
|
def get_value(self, row_number: int, column_number: int) -> Optional[bytes]:
|
|
length: int = impl.PQgetlength(self._pgresult_ptr, row_number, column_number)
|
|
if length:
|
|
v = impl.PQgetvalue(self._pgresult_ptr, row_number, column_number)
|
|
return string_at(v, length)
|
|
else:
|
|
if impl.PQgetisnull(self._pgresult_ptr, row_number, column_number):
|
|
return None
|
|
else:
|
|
return b""
|
|
|
|
@property
|
|
def nparams(self) -> int:
|
|
return impl.PQnparams(self._pgresult_ptr)
|
|
|
|
def param_type(self, param_number: int) -> int:
|
|
return impl.PQparamtype(self._pgresult_ptr, param_number)
|
|
|
|
@property
|
|
def command_status(self) -> Optional[bytes]:
|
|
return impl.PQcmdStatus(self._pgresult_ptr)
|
|
|
|
@property
|
|
def command_tuples(self) -> Optional[int]:
|
|
rv = impl.PQcmdTuples(self._pgresult_ptr)
|
|
return int(rv) if rv else None
|
|
|
|
@property
|
|
def oid_value(self) -> int:
|
|
return impl.PQoidValue(self._pgresult_ptr)
|
|
|
|
def set_attributes(self, descriptions: List[PGresAttDesc]) -> None:
|
|
structs = [
|
|
impl.PGresAttDesc_struct(*desc) for desc in descriptions # type: ignore
|
|
]
|
|
array = (impl.PGresAttDesc_struct * len(structs))(*structs) # type: ignore
|
|
rv = impl.PQsetResultAttrs(self._pgresult_ptr, len(structs), array)
|
|
if rv == 0:
|
|
raise e.OperationalError("PQsetResultAttrs failed")
|
|
|
|
|
|
class PGcancel:
|
|
"""
|
|
Token to cancel the current operation on a connection.
|
|
|
|
Created by `PGconn.get_cancel()`.
|
|
"""
|
|
|
|
__slots__ = ("pgcancel_ptr",)
|
|
|
|
def __init__(self, pgcancel_ptr: impl.PGcancel_struct):
|
|
self.pgcancel_ptr: Optional[impl.PGcancel_struct] = pgcancel_ptr
|
|
|
|
def __del__(self) -> None:
|
|
self.free()
|
|
|
|
def free(self) -> None:
|
|
"""
|
|
Free the data structure created by :pq:`PQgetCancel()`.
|
|
|
|
Automatically invoked by `!__del__()`.
|
|
|
|
See :pq:`PQfreeCancel()` for details.
|
|
"""
|
|
self.pgcancel_ptr, p = None, self.pgcancel_ptr
|
|
if p:
|
|
PQfreeCancel(p)
|
|
|
|
def cancel(self) -> None:
|
|
"""Requests that the server abandon processing of the current command.
|
|
|
|
See :pq:`PQcancel()` for details.
|
|
"""
|
|
buf = create_string_buffer(256)
|
|
res = impl.PQcancel(
|
|
self.pgcancel_ptr,
|
|
byref(buf), # type: ignore[arg-type]
|
|
len(buf),
|
|
)
|
|
if not res:
|
|
raise e.OperationalError(
|
|
f"cancel failed: {buf.value.decode('utf8', 'ignore')}"
|
|
)
|
|
|
|
|
|
class Conninfo:
|
|
"""
|
|
Utility object to manipulate connection strings.
|
|
"""
|
|
|
|
@classmethod
|
|
def get_defaults(cls) -> List[ConninfoOption]:
|
|
opts = impl.PQconndefaults()
|
|
if not opts:
|
|
raise MemoryError("couldn't allocate connection defaults")
|
|
try:
|
|
return cls._options_from_array(opts)
|
|
finally:
|
|
impl.PQconninfoFree(opts)
|
|
|
|
@classmethod
|
|
def parse(cls, conninfo: bytes) -> List[ConninfoOption]:
|
|
if not isinstance(conninfo, bytes):
|
|
raise TypeError(f"bytes expected, got {type(conninfo)} instead")
|
|
|
|
errmsg = c_char_p()
|
|
rv = impl.PQconninfoParse(conninfo, byref(errmsg)) # type: ignore[arg-type]
|
|
if not rv:
|
|
if not errmsg:
|
|
raise MemoryError("couldn't allocate on conninfo parse")
|
|
else:
|
|
exc = e.OperationalError(
|
|
(errmsg.value or b"").decode("utf8", "replace")
|
|
)
|
|
impl.PQfreemem(errmsg)
|
|
raise exc
|
|
|
|
try:
|
|
return cls._options_from_array(rv)
|
|
finally:
|
|
impl.PQconninfoFree(rv)
|
|
|
|
@classmethod
|
|
def _options_from_array(
|
|
cls, opts: Sequence[impl.PQconninfoOption_struct]
|
|
) -> List[ConninfoOption]:
|
|
rv = []
|
|
skws = "keyword envvar compiled val label dispchar".split()
|
|
for opt in opts:
|
|
if not opt.keyword:
|
|
break
|
|
d = {kw: getattr(opt, kw) for kw in skws}
|
|
d["dispsize"] = opt.dispsize
|
|
rv.append(ConninfoOption(**d))
|
|
|
|
return rv
|
|
|
|
|
|
class Escaping:
|
|
"""
|
|
Utility object to escape strings for SQL interpolation.
|
|
"""
|
|
|
|
def __init__(self, conn: Optional[PGconn] = None):
|
|
self.conn = conn
|
|
|
|
def escape_literal(self, data: "abc.Buffer") -> bytes:
|
|
if not self.conn:
|
|
raise e.OperationalError("escape_literal failed: no connection provided")
|
|
|
|
self.conn._ensure_pgconn()
|
|
# TODO: might be done without copy (however C does that)
|
|
if not isinstance(data, bytes):
|
|
data = bytes(data)
|
|
out = impl.PQescapeLiteral(self.conn._pgconn_ptr, data, len(data))
|
|
if not out:
|
|
raise e.OperationalError(
|
|
f"escape_literal failed: {error_message(self.conn)} bytes"
|
|
)
|
|
rv = string_at(out)
|
|
impl.PQfreemem(out)
|
|
return rv
|
|
|
|
def escape_identifier(self, data: "abc.Buffer") -> bytes:
|
|
if not self.conn:
|
|
raise e.OperationalError("escape_identifier failed: no connection provided")
|
|
|
|
self.conn._ensure_pgconn()
|
|
|
|
if not isinstance(data, bytes):
|
|
data = bytes(data)
|
|
out = impl.PQescapeIdentifier(self.conn._pgconn_ptr, data, len(data))
|
|
if not out:
|
|
raise e.OperationalError(
|
|
f"escape_identifier failed: {error_message(self.conn)} bytes"
|
|
)
|
|
rv = string_at(out)
|
|
impl.PQfreemem(out)
|
|
return rv
|
|
|
|
def escape_string(self, data: "abc.Buffer") -> bytes:
|
|
if not isinstance(data, bytes):
|
|
data = bytes(data)
|
|
|
|
if self.conn:
|
|
self.conn._ensure_pgconn()
|
|
error = c_int()
|
|
out = create_string_buffer(len(data) * 2 + 1)
|
|
impl.PQescapeStringConn(
|
|
self.conn._pgconn_ptr,
|
|
byref(out), # type: ignore[arg-type]
|
|
data,
|
|
len(data),
|
|
byref(error), # type: ignore[arg-type]
|
|
)
|
|
|
|
if error:
|
|
raise e.OperationalError(
|
|
f"escape_string failed: {error_message(self.conn)} bytes"
|
|
)
|
|
|
|
else:
|
|
out = create_string_buffer(len(data) * 2 + 1)
|
|
impl.PQescapeString(
|
|
byref(out), # type: ignore[arg-type]
|
|
data,
|
|
len(data),
|
|
)
|
|
|
|
return out.value
|
|
|
|
def escape_bytea(self, data: "abc.Buffer") -> bytes:
|
|
len_out = c_size_t()
|
|
# TODO: might be able to do without a copy but it's a mess.
|
|
# the C library does it better anyway, so maybe not worth optimising
|
|
# https://mail.python.org/pipermail/python-dev/2012-September/121780.html
|
|
if not isinstance(data, bytes):
|
|
data = bytes(data)
|
|
if self.conn:
|
|
self.conn._ensure_pgconn()
|
|
out = impl.PQescapeByteaConn(
|
|
self.conn._pgconn_ptr,
|
|
data,
|
|
len(data),
|
|
byref(t_cast(c_ulong, len_out)), # type: ignore[arg-type]
|
|
)
|
|
else:
|
|
out = impl.PQescapeBytea(
|
|
data,
|
|
len(data),
|
|
byref(t_cast(c_ulong, len_out)), # type: ignore[arg-type]
|
|
)
|
|
if not out:
|
|
raise MemoryError(
|
|
f"couldn't allocate for escape_bytea of {len(data)} bytes"
|
|
)
|
|
|
|
rv = string_at(out, len_out.value - 1) # out includes final 0
|
|
impl.PQfreemem(out)
|
|
return rv
|
|
|
|
def unescape_bytea(self, data: "abc.Buffer") -> bytes:
|
|
# not needed, but let's keep it symmetric with the escaping:
|
|
# if a connection is passed in, it must be valid.
|
|
if self.conn:
|
|
self.conn._ensure_pgconn()
|
|
|
|
len_out = c_size_t()
|
|
if not isinstance(data, bytes):
|
|
data = bytes(data)
|
|
out = impl.PQunescapeBytea(
|
|
data,
|
|
byref(t_cast(c_ulong, len_out)), # type: ignore[arg-type]
|
|
)
|
|
if not out:
|
|
raise MemoryError(
|
|
f"couldn't allocate for unescape_bytea of {len(data)} bytes"
|
|
)
|
|
|
|
rv = string_at(out, len_out.value)
|
|
impl.PQfreemem(out)
|
|
return rv
|
|
|
|
|
|
# importing the ssl module sets up Python's libcrypto callbacks
|
|
import ssl # noqa
|
|
|
|
# disable libcrypto setup in libpq, so it won't stomp on the callbacks
|
|
# that have already been set up
|
|
impl.PQinitOpenSSL(1, 0)
|
|
|
|
__build_version__ = version()
|