mysteriendrama/lib/python3.11/site-packages/psycopg/_encodings.py
2023-07-26 21:33:29 +02:00

171 lines
4.4 KiB
Python

"""
Mappings between PostgreSQL and Python encodings.
"""
# Copyright (C) 2020 The Psycopg Team
import re
import string
import codecs
from typing import Any, Dict, Optional, TYPE_CHECKING
from .pq._enums import ConnStatus
from .errors import NotSupportedError
from ._compat import cache
if TYPE_CHECKING:
from .pq.abc import PGconn
from .connection import BaseConnection
# Module-level alias of ConnStatus.OK; pgconn_encoding() compares a libpq
# connection's status against it.
OK = ConnStatus.OK
# Master table: PostgreSQL encoding name -> Python codec name.
# The two public lookup tables below are derived from it.
_py_codecs = {
    "BIG5": "big5",
    "EUC_CN": "gb2312",
    "EUC_JIS_2004": "euc_jis_2004",
    "EUC_JP": "euc_jp",
    "EUC_KR": "euc_kr",
    # "EUC_TW": not available in Python
    "GB18030": "gb18030",
    "GBK": "gbk",
    "ISO_8859_5": "iso8859-5",
    "ISO_8859_6": "iso8859-6",
    "ISO_8859_7": "iso8859-7",
    "ISO_8859_8": "iso8859-8",
    "JOHAB": "johab",
    "KOI8R": "koi8-r",
    "KOI8U": "koi8-u",
    "LATIN1": "iso8859-1",
    "LATIN10": "iso8859-16",
    "LATIN2": "iso8859-2",
    "LATIN3": "iso8859-3",
    "LATIN4": "iso8859-4",
    "LATIN5": "iso8859-9",
    "LATIN6": "iso8859-10",
    "LATIN7": "iso8859-13",
    "LATIN8": "iso8859-14",
    "LATIN9": "iso8859-15",
    # "MULE_INTERNAL": not available in Python
    "SHIFT_JIS_2004": "shift_jis_2004",
    "SJIS": "shift_jis",
    # this actually means no encoding, see PostgreSQL docs
    # it is special-cased by the text loader.
    "SQL_ASCII": "ascii",
    "UHC": "cp949",
    "UTF8": "utf-8",
    "WIN1250": "cp1250",
    "WIN1251": "cp1251",
    "WIN1252": "cp1252",
    "WIN1253": "cp1253",
    "WIN1254": "cp1254",
    "WIN1255": "cp1255",
    "WIN1256": "cp1256",
    "WIN1257": "cp1257",
    "WIN1258": "cp1258",
    "WIN866": "cp866",
    "WIN874": "cp874",
}

# PostgreSQL name (as bytes) -> Python codec name. The canonical spellings
# are inserted first, in the same order as the master table.
py_codecs: Dict[bytes, str] = {
    pg_name.encode(): py_name for pg_name, py_name in _py_codecs.items()
}
# Then, for every name containing an underscore, register a second key with
# the underscores removed, so that lenient lookups succeed too.
py_codecs.update(
    {
        pg_name.replace("_", "").encode(): py_name
        for pg_name, py_name in _py_codecs.items()
        if "_" in pg_name
    }
)

# Reverse table: Python codec name -> canonical PostgreSQL name (as bytes).
pg_codecs = {
    py_name: pg_name.encode() for pg_name, py_name in _py_codecs.items()
}
def conn_encoding(conn: "Optional[BaseConnection[Any]]") -> str:
    """
    Return the Python encoding name of a psycopg connection.

    Fall back to "utf-8" when no connection is given or the connection is
    closed. On an open connection the `client_encoding` parameter status is
    read from the underlying libpq connection; if it is missing or empty,
    `UTF8` is assumed. The PostgreSQL name is translated by `pg2pyenc()`,
    which raises `NotSupportedError` for encodings Python doesn't know.
    """
    if conn and not conn.closed:
        reported = conn.pgconn.parameter_status(b"client_encoding")
        return pg2pyenc(reported or b"UTF8")

    # No usable connection to ask: use the default.
    return "utf-8"
def pgconn_encoding(pgconn: "PGconn") -> str:
    """
    Return the Python encoding name of a libpq connection.

    Fall back to "utf-8" when the connection status is not `OK`. Otherwise
    read the `client_encoding` parameter status (assuming `UTF8` if it is
    missing or empty) and translate it with `pg2pyenc()`, which raises
    `NotSupportedError` for encodings Python doesn't know.
    """
    if pgconn.status == OK:
        reported = pgconn.parameter_status(b"client_encoding")
        return pg2pyenc(reported or b"UTF8")

    # The connection is not in the OK state: use the default.
    return "utf-8"
def conninfo_encoding(conninfo: str) -> str:
    """
    Return the Python encoding name passed in a conninfo string.

    The `client_encoding` parameter is extracted from the connection string
    and translated with `pg2pyenc()`. Because the value is likely typed by a
    user rather than normalised by the server, the lookup is lenient (case
    insensitive, dashes and underscores ignored) and an unsupported name is
    not an error here.

    Return "utf-8" if the parameter is absent, empty, or not supported.
    """
    # Imported here rather than at module level, as in the original code.
    from .conninfo import conninfo_to_dict

    requested = conninfo_to_dict(conninfo).get("client_encoding")
    if not requested:
        return "utf-8"

    try:
        return pg2pyenc(requested.encode())
    except NotSupportedError:
        # Deliberate best effort: an unknown name falls back to the default.
        return "utf-8"
@cache
def py2pgenc(name: str) -> bytes:
    """Convert a Python encoding name to PostgreSQL encoding name.

    The name is first resolved through the `codecs` registry, so any alias
    Python accepts maps to the same entry of `pg_codecs`.

    Raise LookupError if the Python encoding is unknown (a KeyError, which is
    a LookupError too, if the codec exists but has no PostgreSQL equivalent).
    """
    canonical = codecs.lookup(name).name
    return pg_codecs[canonical]
@cache
def pg2pyenc(name: bytes) -> str:
    """Convert a PostgreSQL encoding name to Python encoding name.

    The lookup is lenient: dashes and underscores are dropped and the name is
    upper-cased before it is searched in `py_codecs`.

    Raise NotSupportedError if the PostgreSQL encoding is not supported by
    Python.
    """
    key = name.replace(b"-", b"").replace(b"_", b"").upper()
    try:
        return py_codecs[key]
    except KeyError:
        sname = name.decode("utf8", "replace")
        # The KeyError only carries the normalised key and adds nothing to
        # the message below: suppress it so the traceback doesn't show a
        # misleading "During handling of the above exception..." section.
        raise NotSupportedError(
            f"codec not available in Python: {sname!r}"
        ) from None
# Matches every character that is not an ASCII letter, an ASCII digit or '_'.
_re_clean = re.compile(
    f"[^{string.ascii_lowercase}{string.ascii_uppercase}{string.digits}_]"
)


def _as_python_identifier(s: str, prefix: str = "f") -> str:
    """
    Reduce a string to a valid Python identifier.

    A string that is already an identifier is kept as it is. Otherwise a
    leading ASCII digit gets `prefix` prepended and, if that is not enough,
    every char other than ASCII letters, digits and '_' is replaced by '_'.
    Finally the value is prefixed with `prefix` if its first letter is an '_'.

    An empty string is returned as `prefix` alone, instead of failing with an
    IndexError while looking at its first character.
    """
    if not s:
        # Nothing to clean up and no first character to inspect.
        return prefix

    if not s.isidentifier():
        if s[0] in "1234567890":
            s = prefix + s
        if not s.isidentifier():
            s = _re_clean.sub("_", s)

    # namedtuple fields cannot start with underscore. So...
    if s[0] == "_":
        s = prefix + s

    return s