"""
|
|
Adapters for date/time types.
|
|
"""
|
|
|
|
# Copyright (C) 2020 The Psycopg Team
|
|
|
|
import re
|
|
import struct
|
|
from datetime import date, datetime, time, timedelta, timezone
|
|
from typing import Any, Callable, cast, Optional, Tuple, TYPE_CHECKING
|
|
|
|
from .. import postgres
|
|
from ..pq import Format
|
|
from .._tz import get_tzinfo
|
|
from ..abc import AdaptContext, DumperKey
|
|
from ..adapt import Buffer, Dumper, Loader, PyFormat
|
|
from ..errors import InterfaceError, DataError
|
|
from .._struct import pack_int4, pack_int8, unpack_int4, unpack_int8
|
|
|
|
if TYPE_CHECKING:
|
|
from ..connection import BaseConnection
|
|
|
|
_struct_timetz = struct.Struct("!qi") # microseconds, sec tz offset
|
|
_pack_timetz = cast(Callable[[int, int], bytes], _struct_timetz.pack)
|
|
_unpack_timetz = cast(Callable[[Buffer], Tuple[int, int]], _struct_timetz.unpack)
|
|
|
|
_struct_interval = struct.Struct("!qii") # microseconds, days, months
|
|
_pack_interval = cast(Callable[[int, int, int], bytes], _struct_interval.pack)
|
|
_unpack_interval = cast(
|
|
Callable[[Buffer], Tuple[int, int, int]], _struct_interval.unpack
|
|
)
|
|
|
|
utc = timezone.utc
|
|
_pg_date_epoch_days = date(2000, 1, 1).toordinal()
|
|
_pg_datetime_epoch = datetime(2000, 1, 1)
|
|
_pg_datetimetz_epoch = datetime(2000, 1, 1, tzinfo=utc)
|
|
_py_date_min_days = date.min.toordinal()
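
# Illustrative sketch of the binary date encoding built on the epoch above
# (used by DateBinaryDumper/DateBinaryLoader below): dates travel as the
# signed int4 number of days since 2000-01-01, so:
#
#   date(2000, 1, 2).toordinal() - _pg_date_epoch_days    # -> 1
#   date(1999, 12, 31).toordinal() - _pg_date_epoch_days  # -> -1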


class DateDumper(Dumper):
    oid = postgres.types["date"].oid

    def dump(self, obj: date) -> bytes:
        # NOTE: whatever the PostgreSQL DateStyle input format (DMY, MDY, YMD)
        # the YYYY-MM-DD is always understood correctly.
        return str(obj).encode()


class DateBinaryDumper(Dumper):
    format = Format.BINARY
    oid = postgres.types["date"].oid

    def dump(self, obj: date) -> bytes:
        days = obj.toordinal() - _pg_date_epoch_days
        return pack_int4(days)


class _BaseTimeDumper(Dumper):
    def get_key(self, obj: time, format: PyFormat) -> DumperKey:
        # Use (cls,) to report the need to upgrade to a dumper for timetz (the
        # Frankenstein of the data types).
        if not obj.tzinfo:
            return self.cls
        else:
            return (self.cls,)

    def upgrade(self, obj: time, format: PyFormat) -> Dumper:
        raise NotImplementedError
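
# Informal sketch of how the key/upgrade pair above is used: for a naive time
# get_key() returns the class itself and the registered time dumper is kept;
# for a timezone-aware time it returns (cls,), so the adaptation layer calls
# upgrade() and swaps in a dumper bound to the 'timetz' OID instead.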


class _BaseTimeTextDumper(_BaseTimeDumper):
    def dump(self, obj: time) -> bytes:
        return str(obj).encode()


class TimeDumper(_BaseTimeTextDumper):
    oid = postgres.types["time"].oid

    def upgrade(self, obj: time, format: PyFormat) -> Dumper:
        if not obj.tzinfo:
            return self
        else:
            return TimeTzDumper(self.cls)


class TimeTzDumper(_BaseTimeTextDumper):
    oid = postgres.types["timetz"].oid


class TimeBinaryDumper(_BaseTimeDumper):
    format = Format.BINARY
    oid = postgres.types["time"].oid

    def dump(self, obj: time) -> bytes:
        us = obj.microsecond + 1_000_000 * (
            obj.second + 60 * (obj.minute + 60 * obj.hour)
        )
        return pack_int8(us)

    def upgrade(self, obj: time, format: PyFormat) -> Dumper:
        if not obj.tzinfo:
            return self
        else:
            return TimeTzBinaryDumper(self.cls)


class TimeTzBinaryDumper(_BaseTimeDumper):
    format = Format.BINARY
    oid = postgres.types["timetz"].oid

    def dump(self, obj: time) -> bytes:
        us = obj.microsecond + 1_000_000 * (
            obj.second + 60 * (obj.minute + 60 * obj.hour)
        )
        off = obj.utcoffset()
        assert off is not None
        return _pack_timetz(us, -int(off.total_seconds()))
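
# Sketch of the binary 'timetz' value produced above: the offset is negated
# before packing because the wire format stores the zone as seconds west of
# UTC. For time(12, 30, tzinfo=timezone(timedelta(hours=2))) the packed pair
# is (45_000_000_000 microseconds, -7200 seconds).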


class _BaseDatetimeDumper(Dumper):
    def get_key(self, obj: datetime, format: PyFormat) -> DumperKey:
        # Use (cls,) to report the need to upgrade (downgrade, actually) to a
        # dumper for naive timestamp.
        if obj.tzinfo:
            return self.cls
        else:
            return (self.cls,)

    def upgrade(self, obj: datetime, format: PyFormat) -> Dumper:
        raise NotImplementedError


class _BaseDatetimeTextDumper(_BaseDatetimeDumper):
    def dump(self, obj: datetime) -> bytes:
        # NOTE: whatever the PostgreSQL DateStyle input format (DMY, MDY, YMD)
        # the YYYY-MM-DD is always understood correctly.
        return str(obj).encode()


class DatetimeDumper(_BaseDatetimeTextDumper):
    oid = postgres.types["timestamptz"].oid

    def upgrade(self, obj: datetime, format: PyFormat) -> Dumper:
        if obj.tzinfo:
            return self
        else:
            return DatetimeNoTzDumper(self.cls)


class DatetimeNoTzDumper(_BaseDatetimeTextDumper):
    oid = postgres.types["timestamp"].oid


class DatetimeBinaryDumper(_BaseDatetimeDumper):
    format = Format.BINARY
    oid = postgres.types["timestamptz"].oid

    def dump(self, obj: datetime) -> bytes:
        delta = obj - _pg_datetimetz_epoch
        micros = delta.microseconds + 1_000_000 * (86_400 * delta.days + delta.seconds)
        return pack_int8(micros)

    def upgrade(self, obj: datetime, format: PyFormat) -> Dumper:
        if obj.tzinfo:
            return self
        else:
            return DatetimeNoTzBinaryDumper(self.cls)


class DatetimeNoTzBinaryDumper(_BaseDatetimeDumper):
    format = Format.BINARY
    oid = postgres.types["timestamp"].oid

    def dump(self, obj: datetime) -> bytes:
        delta = obj - _pg_datetime_epoch
        micros = delta.microseconds + 1_000_000 * (86_400 * delta.days + delta.seconds)
        return pack_int8(micros)
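
# Sketch of the binary timestamp encoding above: the value is the signed int8
# number of microseconds from the 2000-01-01 epoch, so
# datetime(2000, 1, 1, 0, 0, 1) packs as 1_000_000 and
# datetime(1999, 12, 31, 23, 59, 59) packs as -1_000_000.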


class TimedeltaDumper(Dumper):
    oid = postgres.types["interval"].oid

    def __init__(self, cls: type, context: Optional[AdaptContext] = None):
        super().__init__(cls, context)
        if self.connection:
            if (
                self.connection.pgconn.parameter_status(b"IntervalStyle")
                == b"sql_standard"
            ):
                setattr(self, "dump", self._dump_sql)

    def dump(self, obj: timedelta) -> bytes:
        # The comma is parsed ok by PostgreSQL but it's not documented
        # and it seems brittle to rely on it. CRDB doesn't consume it well.
        return str(obj).encode().replace(b",", b"")

    def _dump_sql(self, obj: timedelta) -> bytes:
        # sql_standard format needs explicit signs
        # otherwise -1 day 1 sec will mean -1 sec
        return b"%+d day %+d second %+d microsecond" % (
            obj.days,
            obj.seconds,
            obj.microseconds,
        )
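
# Illustration of the two text forms above, using only stdlib objects: for
# timedelta(days=-1, seconds=1), dump() sends b"-1 day 0:00:01" (the str()
# form with the comma stripped), while _dump_sql() sends
# b"-1 day +1 second +0 microsecond", keeping every field explicitly signed
# as the sql_standard IntervalStyle requires.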


class TimedeltaBinaryDumper(Dumper):
    format = Format.BINARY
    oid = postgres.types["interval"].oid

    def dump(self, obj: timedelta) -> bytes:
        micros = 1_000_000 * obj.seconds + obj.microseconds
        return _pack_interval(micros, obj.days, 0)
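
# Sketch of the binary interval layout used above (microseconds, days,
# months): timedelta(days=1, hours=1) is packed as (3_600_000_000, 1, 0);
# the months field stays 0 because Python timedeltas have no month component.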


class DateLoader(Loader):
    _ORDER_YMD = 0
    _ORDER_DMY = 1
    _ORDER_MDY = 2

    def __init__(self, oid: int, context: Optional[AdaptContext] = None):
        super().__init__(oid, context)
        ds = _get_datestyle(self.connection)
        if ds.startswith(b"I"):  # ISO
            self._order = self._ORDER_YMD
        elif ds.startswith(b"G"):  # German
            self._order = self._ORDER_DMY
        elif ds.startswith(b"S") or ds.startswith(b"P"):  # SQL or Postgres
            self._order = self._ORDER_DMY if ds.endswith(b"DMY") else self._ORDER_MDY
        else:
            raise InterfaceError(f"unexpected DateStyle: {ds.decode('ascii')}")

    def load(self, data: Buffer) -> date:
        if self._order == self._ORDER_YMD:
            ye = data[:4]
            mo = data[5:7]
            da = data[8:]
        elif self._order == self._ORDER_DMY:
            da = data[:2]
            mo = data[3:5]
            ye = data[6:]
        else:
            mo = data[:2]
            da = data[3:5]
            ye = data[6:]

        try:
            return date(int(ye), int(mo), int(da))
        except ValueError as ex:
            s = bytes(data).decode("utf8", "replace")
            if s == "infinity" or (s and len(s.split()[0]) > 10):
                raise DataError(f"date too large (after year 10K): {s!r}") from None
            elif s == "-infinity" or "BC" in s:
                raise DataError(f"date too small (before year 1): {s!r}") from None
            else:
                raise DataError(f"can't parse date {s!r}: {ex}") from None


class DateBinaryLoader(Loader):
    format = Format.BINARY

    def load(self, data: Buffer) -> date:
        days = unpack_int4(data)[0] + _pg_date_epoch_days
        try:
            return date.fromordinal(days)
        except (ValueError, OverflowError):
            if days < _py_date_min_days:
                raise DataError("date too small (before year 1)") from None
            else:
                raise DataError("date too large (after year 10K)") from None


class TimeLoader(Loader):
    _re_format = re.compile(rb"^(\d+):(\d+):(\d+)(?:\.(\d+))?")

    def load(self, data: Buffer) -> time:
        m = self._re_format.match(data)
        if not m:
            s = bytes(data).decode("utf8", "replace")
            raise DataError(f"can't parse time {s!r}")

        ho, mi, se, fr = m.groups()

        # Pad the fraction of second to get micros
        if fr:
            us = int(fr)
            if len(fr) < 6:
                us *= _uspad[len(fr)]
        else:
            us = 0

        try:
            return time(int(ho), int(mi), int(se), us)
        except ValueError as e:
            s = bytes(data).decode("utf8", "replace")
            raise DataError(f"can't parse time {s!r}: {e}") from None


class TimeBinaryLoader(Loader):
    format = Format.BINARY

    def load(self, data: Buffer) -> time:
        val = unpack_int8(data)[0]
        val, us = divmod(val, 1_000_000)
        val, s = divmod(val, 60)
        h, m = divmod(val, 60)
        try:
            return time(h, m, s, us)
        except ValueError:
            raise DataError(f"time not supported by Python: hour={h}") from None


class TimetzLoader(Loader):
    _re_format = re.compile(
        rb"""(?ix)
        ^
        (\d+) : (\d+) : (\d+) (?: \. (\d+) )?       # Time and micros
        ([-+]) (\d+) (?: : (\d+) )? (?: : (\d+) )?  # Timezone
        $
        """
    )

    def load(self, data: Buffer) -> time:
        m = self._re_format.match(data)
        if not m:
            s = bytes(data).decode("utf8", "replace")
            raise DataError(f"can't parse timetz {s!r}")

        ho, mi, se, fr, sgn, oh, om, os = m.groups()

        # Pad the fraction of second to get the micros
        if fr:
            us = int(fr)
            if len(fr) < 6:
                us *= _uspad[len(fr)]
        else:
            us = 0

        # Calculate timezone
        off = 60 * 60 * int(oh)
        if om:
            off += 60 * int(om)
        if os:
            off += int(os)
        tz = timezone(timedelta(0, off if sgn == b"+" else -off))

        try:
            return time(int(ho), int(mi), int(se), us, tz)
        except ValueError as e:
            s = bytes(data).decode("utf8", "replace")
            raise DataError(f"can't parse timetz {s!r}: {e}") from None


class TimetzBinaryLoader(Loader):
    format = Format.BINARY

    def load(self, data: Buffer) -> time:
        val, off = _unpack_timetz(data)

        val, us = divmod(val, 1_000_000)
        val, s = divmod(val, 60)
        h, m = divmod(val, 60)

        try:
            return time(h, m, s, us, timezone(timedelta(seconds=-off)))
        except ValueError:
            raise DataError(f"time not supported by Python: hour={h}") from None


class TimestampLoader(Loader):
    _re_format = re.compile(
        rb"""(?ix)
        ^
        (\d+) [^a-z0-9] (\d+) [^a-z0-9] (\d+)  # Date
        (?: T | [^a-z0-9] )                    # Separator, including T
        (\d+) [^a-z0-9] (\d+) [^a-z0-9] (\d+)  # Time
        (?: \.(\d+) )?                         # Micros
        $
        """
    )
    _re_format_pg = re.compile(
        rb"""(?ix)
        ^
        [a-z]+ [^a-z0-9]                       # DoW, separator
        (\d+|[a-z]+) [^a-z0-9]                 # Month or day
        (\d+|[a-z]+) [^a-z0-9]                 # Month or day
        (\d+) [^a-z0-9] (\d+) [^a-z0-9] (\d+)  # Time
        (?: \.(\d+) )?                         # Micros
        [^a-z0-9] (\d+)                        # Year
        $
        """
    )

    _ORDER_YMD = 0
    _ORDER_DMY = 1
    _ORDER_MDY = 2
    _ORDER_PGDM = 3
    _ORDER_PGMD = 4

    def __init__(self, oid: int, context: Optional[AdaptContext] = None):
        super().__init__(oid, context)

        ds = _get_datestyle(self.connection)
        if ds.startswith(b"I"):  # ISO
            self._order = self._ORDER_YMD
        elif ds.startswith(b"G"):  # German
            self._order = self._ORDER_DMY
        elif ds.startswith(b"S"):  # SQL
            self._order = self._ORDER_DMY if ds.endswith(b"DMY") else self._ORDER_MDY
        elif ds.startswith(b"P"):  # Postgres
            self._order = self._ORDER_PGDM if ds.endswith(b"DMY") else self._ORDER_PGMD
            self._re_format = self._re_format_pg
        else:
            raise InterfaceError(f"unexpected DateStyle: {ds.decode('ascii')}")

    def load(self, data: Buffer) -> datetime:
        m = self._re_format.match(data)
        if not m:
            raise _get_timestamp_load_error(self.connection, data) from None

        if self._order == self._ORDER_YMD:
            ye, mo, da, ho, mi, se, fr = m.groups()
            imo = int(mo)
        elif self._order == self._ORDER_DMY:
            da, mo, ye, ho, mi, se, fr = m.groups()
            imo = int(mo)
        elif self._order == self._ORDER_MDY:
            mo, da, ye, ho, mi, se, fr = m.groups()
            imo = int(mo)
        else:
            if self._order == self._ORDER_PGDM:
                da, mo, ho, mi, se, fr, ye = m.groups()
            else:
                mo, da, ho, mi, se, fr, ye = m.groups()
            try:
                imo = _month_abbr[mo]
            except KeyError:
                s = mo.decode("utf8", "replace")
                raise DataError(f"can't parse month: {s!r}") from None

        # Pad the fraction of second to get the micros
        if fr:
            us = int(fr)
            if len(fr) < 6:
                us *= _uspad[len(fr)]
        else:
            us = 0

        try:
            return datetime(int(ye), imo, int(da), int(ho), int(mi), int(se), us)
        except ValueError as ex:
            raise _get_timestamp_load_error(self.connection, data, ex) from None


class TimestampBinaryLoader(Loader):
    format = Format.BINARY

    def load(self, data: Buffer) -> datetime:
        micros = unpack_int8(data)[0]
        try:
            return _pg_datetime_epoch + timedelta(microseconds=micros)
        except OverflowError:
            if micros <= 0:
                raise DataError("timestamp too small (before year 1)") from None
            else:
                raise DataError("timestamp too large (after year 10K)") from None


class TimestamptzLoader(Loader):
    _re_format = re.compile(
        rb"""(?ix)
        ^
        (\d+) [^a-z0-9] (\d+) [^a-z0-9] (\d+)       # Date
        (?: T | [^a-z0-9] )                         # Separator, including T
        (\d+) [^a-z0-9] (\d+) [^a-z0-9] (\d+)       # Time
        (?: \.(\d+) )?                              # Micros
        ([-+]) (\d+) (?: : (\d+) )? (?: : (\d+) )?  # Timezone
        $
        """
    )

    def __init__(self, oid: int, context: Optional[AdaptContext] = None):
        super().__init__(oid, context)
        self._timezone = get_tzinfo(self.connection.pgconn if self.connection else None)

        ds = _get_datestyle(self.connection)
        if not ds.startswith(b"I"):  # not ISO
            setattr(self, "load", self._load_notimpl)

    def load(self, data: Buffer) -> datetime:
        m = self._re_format.match(data)
        if not m:
            raise _get_timestamp_load_error(self.connection, data) from None

        ye, mo, da, ho, mi, se, fr, sgn, oh, om, os = m.groups()

        # Pad the fraction of second to get the micros
        if fr:
            us = int(fr)
            if len(fr) < 6:
                us *= _uspad[len(fr)]
        else:
            us = 0

        # Calculate timezone offset
        soff = 60 * 60 * int(oh)
        if om:
            soff += 60 * int(om)
        if os:
            soff += int(os)
        tzoff = timedelta(0, soff if sgn == b"+" else -soff)

        # The return value is a datetime with the timezone of the connection
        # (in order to be consistent with the binary loader, which is the only
        # thing it can return). So create a temporary datetime object, in utc,
        # shift it by the offset parsed from the timestamp, and then move it to
        # the connection timezone.
        dt = None
        ex: Exception
        try:
            dt = datetime(int(ye), int(mo), int(da), int(ho), int(mi), int(se), us, utc)
            return (dt - tzoff).astimezone(self._timezone)
        except OverflowError as e:
            # If we have created the temporary 'dt' it means that we have a
            # datetime close to max, the shift pushed it past max, overflowing.
            # In this case return the datetime in a fixed offset timezone.
            if dt is not None:
                return dt.replace(tzinfo=timezone(tzoff))
            else:
                ex = e
        except ValueError as e:
            ex = e

        raise _get_timestamp_load_error(self.connection, data, ex) from None

    def _load_notimpl(self, data: Buffer) -> datetime:
        s = bytes(data).decode("utf8", "replace")
        ds = _get_datestyle(self.connection).decode("ascii")
        raise NotImplementedError(
            f"can't parse timestamptz with DateStyle {ds!r}: {s!r}"
        )


class TimestamptzBinaryLoader(Loader):
    format = Format.BINARY

    def __init__(self, oid: int, context: Optional[AdaptContext] = None):
        super().__init__(oid, context)
        self._timezone = get_tzinfo(self.connection.pgconn if self.connection else None)

    def load(self, data: Buffer) -> datetime:
        micros = unpack_int8(data)[0]
        try:
            ts = _pg_datetimetz_epoch + timedelta(microseconds=micros)
            return ts.astimezone(self._timezone)
        except OverflowError:
            # If we were asked about a timestamp which would overflow in UTC,
            # but not in the desired timezone (e.g. datetime.max at Chicago
            # timezone) we can still save the day by shifting the value by the
            # timezone offset and then replacing the timezone.
            if self._timezone:
                utcoff = self._timezone.utcoffset(
                    datetime.min if micros < 0 else datetime.max
                )
                if utcoff:
                    usoff = 1_000_000 * int(utcoff.total_seconds())
                    try:
                        ts = _pg_datetime_epoch + timedelta(microseconds=micros + usoff)
                    except OverflowError:
                        pass  # will raise downstream
                    else:
                        return ts.replace(tzinfo=self._timezone)

            if micros <= 0:
                raise DataError("timestamp too small (before year 1)") from None
            else:
                raise DataError("timestamp too large (after year 10K)") from None


class IntervalLoader(Loader):
    _re_interval = re.compile(
        rb"""
        (?: ([-+]?\d+) \s+ years? \s* )?              # Years
        (?: ([-+]?\d+) \s+ mons? \s* )?               # Months
        (?: ([-+]?\d+) \s+ days? \s* )?               # Days
        (?: ([-+])? (\d+) : (\d+) : (\d+ (?:\.\d+)?)  # Time
        )?
        """,
        re.VERBOSE,
    )

    def __init__(self, oid: int, context: Optional[AdaptContext] = None):
        super().__init__(oid, context)
        if self.connection:
            ints = self.connection.pgconn.parameter_status(b"IntervalStyle")
            if ints != b"postgres":
                setattr(self, "load", self._load_notimpl)

    def load(self, data: Buffer) -> timedelta:
        m = self._re_interval.match(data)
        if not m:
            s = bytes(data).decode("utf8", "replace")
            raise DataError(f"can't parse interval {s!r}")

        ye, mo, da, sgn, ho, mi, se = m.groups()
        days = 0
        seconds = 0.0

        if ye:
            days += 365 * int(ye)
        if mo:
            days += 30 * int(mo)
        if da:
            days += int(da)

        if ho:
            seconds = 3600 * int(ho) + 60 * int(mi) + float(se)
            if sgn == b"-":
                seconds = -seconds

        try:
            return timedelta(days=days, seconds=seconds)
        except OverflowError as e:
            s = bytes(data).decode("utf8", "replace")
            raise DataError(f"can't parse interval {s!r}: {e}") from None

    def _load_notimpl(self, data: Buffer) -> timedelta:
        s = bytes(data).decode("utf8", "replace")
        ints = (
            self.connection
            and self.connection.pgconn.parameter_status(b"IntervalStyle")
            or b"unknown"
        ).decode("utf8", "replace")
        raise NotImplementedError(
            f"can't parse interval with IntervalStyle {ints}: {s!r}"
        )


class IntervalBinaryLoader(Loader):
    format = Format.BINARY

    def load(self, data: Buffer) -> timedelta:
        micros, days, months = _unpack_interval(data)
        if months > 0:
            years, months = divmod(months, 12)
            days = days + 30 * months + 365 * years
        elif months < 0:
            years, months = divmod(-months, 12)
            days = days - 30 * months - 365 * years

        try:
            return timedelta(days=days, microseconds=micros)
        except OverflowError as e:
            raise DataError(f"can't parse interval: {e}") from None


def _get_datestyle(conn: Optional["BaseConnection[Any]"]) -> bytes:
    if conn:
        ds = conn.pgconn.parameter_status(b"DateStyle")
        if ds:
            return ds

    return b"ISO, DMY"


def _get_timestamp_load_error(
    conn: Optional["BaseConnection[Any]"], data: Buffer, ex: Optional[Exception] = None
) -> Exception:
    s = bytes(data).decode("utf8", "replace")

    def is_overflow(s: str) -> bool:
        if not s:
            return False

        ds = _get_datestyle(conn)
        if not ds.startswith(b"P"):  # Postgres
            return len(s.split()[0]) > 10  # date is first token
        else:
            return len(s.split()[-1]) > 4  # year is last token

    if s == "-infinity" or s.endswith("BC"):
        return DataError(f"timestamp too small (before year 1): {s!r}")
    elif s == "infinity" or is_overflow(s):
        return DataError(f"timestamp too large (after year 10K): {s!r}")
    else:
        return DataError(f"can't parse timestamp {s!r}: {ex or '(unknown)'}")


_month_abbr = {
    n: i
    for i, n in enumerate(b"Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec".split(), 1)
}

# Pad to get microseconds from a fraction of seconds
_uspad = [0, 100_000, 10_000, 1_000, 100, 10, 1]
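
# For example (illustrative): a fraction captured as b"25" has 2 digits, so
# the loaders above compute 25 * _uspad[2] == 250_000 microseconds, i.e.
# "12:00:00.25" means a quarter of a second.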


def register_default_adapters(context: AdaptContext) -> None:
    adapters = context.adapters
    adapters.register_dumper("datetime.date", DateDumper)
    adapters.register_dumper("datetime.date", DateBinaryDumper)

    # first register dumpers for 'timetz' oid, then the proper ones on time type.
    adapters.register_dumper("datetime.time", TimeTzDumper)
    adapters.register_dumper("datetime.time", TimeTzBinaryDumper)
    adapters.register_dumper("datetime.time", TimeDumper)
    adapters.register_dumper("datetime.time", TimeBinaryDumper)

    # first register dumpers for 'timestamp' oid, then the proper ones
    # on the datetime type.
    adapters.register_dumper("datetime.datetime", DatetimeNoTzDumper)
    adapters.register_dumper("datetime.datetime", DatetimeNoTzBinaryDumper)
    adapters.register_dumper("datetime.datetime", DatetimeDumper)
    adapters.register_dumper("datetime.datetime", DatetimeBinaryDumper)

    adapters.register_dumper("datetime.timedelta", TimedeltaDumper)
    adapters.register_dumper("datetime.timedelta", TimedeltaBinaryDumper)

    adapters.register_loader("date", DateLoader)
    adapters.register_loader("date", DateBinaryLoader)
    adapters.register_loader("time", TimeLoader)
    adapters.register_loader("time", TimeBinaryLoader)
    adapters.register_loader("timetz", TimetzLoader)
    adapters.register_loader("timetz", TimetzBinaryLoader)
    adapters.register_loader("timestamp", TimestampLoader)
    adapters.register_loader("timestamp", TimestampBinaryLoader)
    adapters.register_loader("timestamptz", TimestamptzLoader)
    adapters.register_loader("timestamptz", TimestamptzBinaryLoader)
    adapters.register_loader("interval", IntervalLoader)
    adapters.register_loader("interval", IntervalBinaryLoader)
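
# Minimal usage sketch (illustrative, assuming a reachable database; the
# connection string below is an assumption): once these default adapters are
# registered, date/time values round-trip without explicit casts.
#
#   import datetime as dt
#   import psycopg
#
#   with psycopg.connect("dbname=test") as conn:
#       row = conn.execute(
#           "select %s + '1 day'::interval",
#           [dt.datetime(2024, 1, 1, tzinfo=dt.timezone.utc)],
#       ).fetchone()
#       print(row[0])  # a timezone-aware datetime.datetime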