Source code for ulid

from __future__ import annotations

import functools
import os
import time
import uuid
from datetime import datetime
from datetime import timezone
from threading import Lock
from typing import Any
from typing import cast
from typing import Generic
from typing import TYPE_CHECKING
from typing import TypeVar

from typing_extensions import Self

from ulid import base32
from ulid import constants


if TYPE_CHECKING:  # pragma: no cover
    from collections.abc import Callable

    from pydantic import GetCoreSchemaHandler
    from pydantic import ValidatorFunctionWrapHandler
    from pydantic_core import CoreSchema

try:
    from importlib.metadata import version
except ImportError:  # pragma: no cover
    from importlib_metadata import version  # type: ignore


__version__ = version("python-ulid")

T = TypeVar("T", bound=type)
R = TypeVar("R")


class validate_type(Generic[T]):  # noqa: N801
    def __init__(self, *types: T) -> None:
        self.types = types

    def __call__(self, func: Callable[..., R]) -> Callable[..., R]:
        @functools.wraps(func)
        def wrapped(cls: Any, value: T) -> R:
            if not isinstance(value, self.types):
                message = "Value has to be of type "
                message += " or ".join([t.__name__ for t in self.types])
                raise TypeError(message)
            return func(cls, value)

        return wrapped


class ValueProvider:
    def __init__(self) -> None:
        self.lock = Lock()
        self.prev_timestamp = constants.MIN_TIMESTAMP
        self.prev_randomness = constants.MIN_RANDOMNESS

    def timestamp(self, value: float | None = None) -> int:
        if value is None:
            value = time.time_ns() // constants.NANOSECS_IN_MILLISECS
        elif isinstance(value, float):
            value = int(value * constants.MILLISECS_IN_SECS)
        if value > constants.MAX_TIMESTAMP:
            raise ValueError("Value exceeds maximum possible timestamp")
        return value

    def randomness(self) -> bytes:
        with self.lock:
            current_timestamp = self.timestamp()
            if current_timestamp == self.prev_timestamp:
                if self.prev_randomness == constants.MAX_RANDOMNESS:
                    raise ValueError("Randomness within same millisecond exhausted")
                randomness = self.increment_bytes(self.prev_randomness)
            else:
                randomness = os.urandom(constants.RANDOMNESS_LEN)

            self.prev_randomness = randomness
            self.prev_timestamp = current_timestamp
        return randomness

    def increment_bytes(self, value: bytes) -> bytes:
        length = len(value)
        return (int.from_bytes(value, byteorder="big") + 1).to_bytes(length, byteorder="big")


[docs] @functools.total_ordering class ULID: provider = ValueProvider() """The :class:`ULID` object consists of a timestamp part of 48 bits and of 80 random bits. .. code-block:: text 01AN4Z07BY 79KA1307SR9X4MV3 |----------| |----------------| Timestamp Randomness 48bits 80bits You usually create a new :class:`ULID`-object by calling the default constructor with no arguments. In that case it will fill the timestamp part with the current datetime. To encode the object you usually convert it to a string: >>> ulid = ULID() >>> str(ulid) '01E75PVKXA3GFABX1M1J9NZZNF' Args: value (bytes, None): A sequence of 16 bytes representing an encoded ULID. Raises: ValueError: If the provided value is not a valid encoded ULID. """ def __init__(self, value: bytes | None = None) -> None: if value is not None and len(value) != constants.BYTES_LEN: raise ValueError("ULID has to be exactly 16 bytes long.") self.bytes: bytes = value or ULID.from_timestamp(self.provider.timestamp()).bytes
[docs] @classmethod @validate_type(datetime) def from_datetime(cls, value: datetime) -> Self: """Create a new :class:`ULID`-object from a :class:`datetime`. The timestamp part of the `ULID` will be set to the corresponding timestamp of the datetime. Examples: >>> from datetime import datetime >>> ULID.from_datetime(datetime.now()) ULID(01E75QRYCAMM1MKQ9NYMYT6SAV) """ return cls.from_timestamp(value.timestamp())
[docs] @classmethod @validate_type(int, float) def from_timestamp(cls, value: float) -> Self: """Create a new :class:`ULID`-object from a timestamp. The timestamp can be either a `float` representing the time in seconds (as it would be returned by :func:`time.time()`) or an `int` in milliseconds. Examples: >>> import time >>> ULID.from_timestamp(time.time()) ULID(01E75QWN5HKQ0JAVX9FG1K4YP4) """ timestamp = int.to_bytes(cls.provider.timestamp(value), constants.TIMESTAMP_LEN, "big") randomness = cls.provider.randomness() return cls.from_bytes(timestamp + randomness)
[docs] @classmethod @validate_type(uuid.UUID) def from_uuid(cls, value: uuid.UUID) -> Self: """Create a new :class:`ULID`-object from a :class:`uuid.UUID`. The timestamp part will be random in that case. Examples: >>> from uuid import uuid4 >>> ULID.from_uuid(uuid4()) ULID(27Q506DP7E9YNRXA0XVD8Z5YSG) """ return cls(value.bytes)
[docs] @classmethod @validate_type(bytes) def from_bytes(cls, bytes_: bytes) -> Self: """Create a new :class:`ULID`-object from sequence of 16 bytes.""" return cls(bytes_)
[docs] @classmethod @validate_type(str) def from_hex(cls, value: str) -> Self: """Create a new :class:`ULID`-object from 32 character string of hex values.""" return cls.from_bytes(bytes.fromhex(value))
[docs] @classmethod @validate_type(str) def from_str(cls, string: str) -> Self: """Create a new :class:`ULID`-object from a 26 char long string representation.""" return cls(base32.decode(string))
[docs] @classmethod @validate_type(int) def from_int(cls, value: int) -> Self: """Create a new :class:`ULID`-object from an `int`.""" return cls(int.to_bytes(value, constants.BYTES_LEN, "big"))
[docs] @classmethod def parse(cls, value: Any) -> Self: """Create a new :class:`ULID`-object from a given value. .. note:: This method should only be used when the caller is trying to parse a ULID from a value when they're unsure what format/primitive type it will be given in. """ if isinstance(value, ULID): return cast(Self, value) if isinstance(value, uuid.UUID): return cls.from_uuid(value) if isinstance(value, str): len_value = len(value) if len_value == constants.UUID_REPR_LEN: return cls.from_uuid(uuid.UUID(value)) if len_value == constants.HEX_REPR_LEN: return cls.from_hex(value) if len_value == constants.REPR_LEN: return cls.from_str(value) raise ValueError(f"Cannot parse ULID from string of length {len_value}") if isinstance(value, int): if len(str(value)) == constants.INT_REPR_LEN: return cls.from_int(value) return cls.from_timestamp(value) if isinstance(value, float): return cls.from_timestamp(value) if isinstance(value, datetime): return cls.from_datetime(value) if isinstance(value, bytes): return cls.from_bytes(value) raise TypeError(f"Cannot parse ULID from type {type(value)}")
[docs] @functools.cached_property def milliseconds(self) -> int: """The timestamp part as epoch time in milliseconds. Examples: >>> ulid.milliseconds 1588257207560 """ return int.from_bytes(self.bytes[: constants.TIMESTAMP_LEN], byteorder="big")
[docs] @functools.cached_property def timestamp(self) -> float: """The timestamp part as epoch time in seconds. Examples: >>> ulid.timestamp 1588257207.56 """ return self.milliseconds / constants.MILLISECS_IN_SECS
[docs] @functools.cached_property def datetime(self) -> datetime: """Return the timestamp part as timezone-aware :class:`datetime` in UTC. Examples: >>> ulid.datetime datetime.datetime(2020, 4, 30, 14, 33, 27, 560000, tzinfo=datetime.timezone.utc) """ return datetime.fromtimestamp(self.timestamp, timezone.utc)
[docs] @functools.cached_property def hex(self) -> str: """Encode the :class:`ULID`-object as a 32 char sequence of hex values.""" return self.bytes.hex()
[docs] def to_uuid(self) -> uuid.UUID: """Convert the :class:`ULID` to a :class:`uuid.UUID`.""" return uuid.UUID(bytes=self.bytes)
[docs] def to_uuid4(self) -> uuid.UUID: """Convert the :class:`ULID` to a :class:`uuid.UUID` compliant to version 4 of RFC 4122. This conversion is destructive in the sense that the :class:`uuid.UUID` cannot be converted back to the same :class:`ULID`. This is because the bits for the `variant` and `version` information have to be set accordingly changing the original byte sequence. Examples: >>> ulid = ULID() >>> uuid = ulid.to_uuid4() >>> uuid.version 4 """ return uuid.UUID(bytes=self.bytes, version=4)
def __repr__(self) -> str: return f"ULID({self!s})" def __str__(self) -> str: """Encode this object as a 26 character string sequence.""" return base32.encode(self.bytes) def __int__(self) -> int: """Encode this object as an integer.""" return int.from_bytes(self.bytes, byteorder="big") def __bytes__(self) -> bytes: """Encode this object as byte sequence.""" return self.bytes def __lt__(self, other: Any) -> bool: if isinstance(other, ULID): return self.bytes < other.bytes if isinstance(other, int): return int(self) < other if isinstance(other, bytes): return self.bytes < other if isinstance(other, str): return str(self) < other return NotImplemented def __eq__(self, other: object) -> bool: if isinstance(other, ULID): return self.bytes == other.bytes if isinstance(other, int): return int(self) == other if isinstance(other, bytes): return self.bytes == other if isinstance(other, str): return str(self) == other return NotImplemented def __hash__(self) -> int: return hash(self.bytes) @classmethod def __get_pydantic_core_schema__(cls, source: Any, handler: GetCoreSchemaHandler) -> CoreSchema: from pydantic_core import core_schema return core_schema.no_info_wrap_validator_function( cls._pydantic_validate, core_schema.union_schema([ core_schema.is_instance_schema(ULID), core_schema.no_info_plain_validator_function(ULID), core_schema.str_schema( pattern=rf"[0-7][{base32.ENCODE}]{{25}}", min_length=26, max_length=26, ), core_schema.bytes_schema(min_length=16, max_length=16), ]), serialization=core_schema.to_string_ser_schema( when_used="json-unless-none", ), ) @classmethod def _pydantic_validate(cls, value: Any, handler: ValidatorFunctionWrapHandler) -> Any: from pydantic_core import PydanticCustomError ulid: ULID try: if isinstance(value, int): ulid = cls.from_int(value) elif isinstance(value, str): ulid = cls.from_str(value) elif isinstance(value, ULID): ulid = value else: ulid = cls.from_bytes(value) except ValueError as err: raise PydanticCustomError("ulid_format", "Unrecognized format") from err return handler(ulid)