Skip to content

serializer

beautyspot.serializer モジュールは、計算結果やオブジェクトツリーを永続化可能なバイナリ形式に変換するための、安全で拡張可能なシリアライゼーション機構を提供します。

beautyspot.serializer

MsgpackSerializer

Bases: SerializerProtocol, TypeRegistryProtocol

A secure and extensible serializer based on MessagePack.

Allows registering custom types via register(). Automatically handles packing/unpacking of custom type payloads.

Thread Safety (No-GIL Compatible): This class is entirely thread-safe and avoids lock contention on the critical path (during serialization/deserialization). It achieves this by using: 1. Copy-on-Write (CoW) for the shared type registry (_encoders, _decoders). Registrations are rare, but reads happen per-node. CoW ensures readers always see a consistent, immutable snapshot of the registry without locking. 2. Thread-Local Storage for the LRU subclass cache (_local.subclass_cache). This eliminates lock contention completely when traversing deep object trees concurrently across multiple threads.

Note

To prevent memory leaks in environments where types are generated dynamically (e.g., namedtuples, dynamic Pydantic models), subclass resolution results are cached using an LRU strategy with a configurable maximum size per thread.

Source code in src/beautyspot/serializer.py
class MsgpackSerializer(SerializerProtocol, TypeRegistryProtocol):
    """
    A secure and extensible serializer based on MessagePack.

    Allows registering custom types via `register()`.
    Automatically handles packing/unpacking of custom type payloads.

    Thread Safety (No-GIL Compatible):
        This class is entirely thread-safe and avoids lock contention on the critical
        path (during serialization/deserialization). It achieves this by using:
        1.  **Copy-on-Write (CoW)** for the shared type registry (`_encoders`, `_decoders`).
            Registrations are rare, but reads happen per-node. CoW ensures readers always
            see a consistent, immutable snapshot of the registry without locking.
        2.  **Thread-Local Storage** for the LRU subclass cache (`_local.subclass_cache`).
            This eliminates lock contention completely when traversing deep object trees
            concurrently across multiple threads.

    Note:
        To prevent memory leaks in environments where types are generated dynamically
        (e.g., namedtuples, dynamic Pydantic models), subclass resolution results
        are cached using an LRU strategy with a configurable maximum size per thread.
    """

    def __init__(self, max_cache_size: int = 1024):
        # 共有レジストリ(Copy-on-Write)
        self._encoders: Dict[Type, Tuple[int, Callable[[Any], Any]]] = {}
        self._decoders: Dict[int, Callable[[Any], Any]] = {}

        self._max_cache_size = max_cache_size

        # スレッドローカルなLRUキャッシュ
        self._local = threading.local()

        # 書き込み(register)を直列化するためのロック
        self._write_lock = threading.Lock()

        # レジストリ世代カウンタ: register() のたびにインクリメントされ、
        # スレッドローカルキャッシュの無効化に使用する。
        self._registry_generation = 0

    def _get_local_cache(
        self,
    ) -> OrderedDict[Type, Tuple[int, Callable[[Any], Any]] | None]:
        """現在のスレッド固有のLRUキャッシュを取得(必要なら初期化)する。

        register() によりレジストリ世代が進んでいた場合、
        キャッシュをクリアして stale エントリの参照を防ぐ。
        """
        gen = self._registry_generation
        if (
            not hasattr(self._local, "subclass_cache")
            or getattr(self._local, "_cache_generation", -1) != gen
        ):
            self._local.subclass_cache = OrderedDict()
            self._local._cache_generation = gen
        return self._local.subclass_cache

    def _enforce_cache_size(self, cache: OrderedDict):
        """スレッドローカルキャッシュのサイズを制限する"""
        while len(cache) > self._max_cache_size:
            cache.popitem(last=False)

    def register(
        self,
        type_class: Type,
        code: int,
        encoder: Callable[[Any], Any],
        decoder: Callable[[Any], Any],
    ):
        if not (0 <= code <= 127):
            raise ValueError(f"ExtCode must be between 0 and 127, got {code}.")

        with self._write_lock:
            if code in self._decoders:
                raise ValueError(f"ExtCode {code} is already registered.")
            if type_class in self._encoders:
                existing_code = self._encoders[type_class][0]
                raise ValueError(
                    f"Type '{type_class.__name__}' is already registered "
                    f"(code={existing_code}). "
                    "Registering the same type twice would silently overwrite the "
                    "encoder while leaving the old decoder orphaned."
                )

            # Copy-on-Write (CoW)
            # 現在の辞書のコピーを作成し、新しい要素を追加
            new_encoders = self._encoders.copy()
            new_decoders = self._decoders.copy()

            new_encoders[type_class] = (code, encoder)
            new_decoders[code] = decoder

            # PEP 703 (free-threading) 対応: 世代カウンタを参照差し替えの**前**に
            # インクリメントする。リーダーが新しい世代番号を見た時点で古いキャッシュを
            # 破棄するが、参照はまだ旧レジストリを指している可能性がある。
            # この順序であれば、リーダーは旧レジストリで安全に動作し、
            # 次回のアクセスで新レジストリを取得する。
            # 逆順(差し替え→インクリメント)だと、リーダーが新世代番号を見て
            # キャッシュを破棄した後、旧レジストリを参照して新しい型を見つけられない
            # 問題が生じる。
            self._registry_generation += 1
            self._encoders = new_encoders
            self._decoders = new_decoders

    # src/beautyspot/serializer.py

    def _default_packer(self, obj: Any) -> Any:
        obj_type = type(obj)
        target_code = None
        target_encoder = None

        # Lock-free read: スナップショットへの参照を取得
        current_encoders = self._encoders
        local_cache = self._get_local_cache()

        if obj_type in current_encoders:
            target_code, target_encoder = current_encoders[obj_type]
        elif obj_type in local_cache:
            cached = local_cache[obj_type]
            local_cache.move_to_end(obj_type)
            if cached is not None:
                target_code, target_encoder = cached
        else:
            # MROをスキャンして登録済みの基底クラスを探す
            for base in obj_type.__mro__:
                if base in current_encoders:
                    target_code, target_encoder = current_encoders[base]
                    local_cache[obj_type] = (target_code, target_encoder)
                    self._enforce_cache_size(local_cache)
                    break
            else:
                local_cache[obj_type] = None
                self._enforce_cache_size(local_cache)

        # Execute & Wrap
        if target_encoder:
            try:
                intermediate = target_encoder(obj)
            except Exception as e:
                raise SerializationError(
                    f"Error occurred within the custom encoder for type '{obj_type.__name__}'."
                ) from e

            try:
                payload = msgpack.packb(
                    intermediate, default=self._default_packer, use_bin_type=True
                )
                return msgpack.ExtType(target_code, payload)
            except (TypeError, SerializationError) as e:
                raise SerializationError(
                    f"Encoder for '{obj_type.__name__}' returned a value that msgpack cannot serialize.\n"
                    f"Hint: Ensure your encoder returns a primitive type (dict, list, str, int, bytes, etc.).\n"
                    f"      returned type: {type(intermediate).__name__}"
                ) from e

        try:
            obj_repr = str(obj)[:200]
        except Exception:
            obj_repr = f"<{obj_type.__name__} (str() failed)>"
        raise SerializationError(
            f"Object of type '{obj_type.__name__}' is not serializable.\n"
            f"Value: {obj_repr}...\n"
            "Hint: Use `spot.register(...)` to handle this custom type."
        )

    def _ext_hook(self, code: int, data: bytes) -> Any:
        # Lock-free read: スナップショットへの参照を取得
        decoder = self._decoders.get(code)

        if decoder is not None:
            try:
                intermediate = msgpack.unpackb(data, ext_hook=self._ext_hook, raw=False)
                return decoder(intermediate)
            except SerializationError:
                # Bug Fix (Bug7): 再帰的な _ext_hook から来た SerializationError を
                # 再度ラップすると、元のエラーメッセージが「CRITICAL:...」で上書きされ
                # 根本原因が隠れてしまう。そのままチェーンを保持して再送出する。
                raise
            except Exception as e:
                raise SerializationError(
                    f"CRITICAL: Failed to decode custom type (ExtCode={code}).\n"
                    "The cached data might be corrupted or incompatible with the current decoder."
                ) from e
        raise SerializationError(
            f"Received ExtType with unregistered code={code}. "
            "The cache may have been created with a different serializer configuration."
        )

    def dumps(self, obj: Any) -> bytes:
        try:
            result = msgpack.packb(obj, default=self._default_packer, use_bin_type=True)
            if result is None:
                raise SerializationError("msgpack.packb returned None unexpectedly.")
            return result
        except Exception as e:
            if isinstance(e, SerializationError):
                raise e
            raise SerializationError("Failed to serialize object tree.") from e

    def loads(self, data: bytes) -> Any:
        try:
            return msgpack.unpackb(data, ext_hook=self._ext_hook, raw=False)
        except Exception as e:
            if isinstance(e, SerializationError):
                raise e
            raise SerializationError("Failed to deserialize data.") from e

プロトコル定義

beautyspot では、特定のクラスに依存しない疎結合な設計を維持するため、2つのプロトコルを定義しています。

1. SerializerProtocol

シリアライザーが実装すべき最小限のインターフェースです。

  • dumps(obj: Any) -> bytes: オブジェクトをバイナリに変換します。
  • loads(data: bytes) -> Any: バイナリをオブジェクトに復元します。

2. TypeRegistryProtocol

カスタム型の登録を受け入れるためのインターフェースです。

  • register(type_class, code, encoder, decoder): 特定の型に対して、一意の識別コード(0-127)と変換ロジックを紐付けます。

MsgpackSerializer クラス

MessagePack をバックエンドに使用した、本ライブラリの標準シリアライザーです。

技術的特徴

  • スレッドセーフ設計: 内部的な threading.Lock により、レジストリの更新、LRU キャッシュの操作、サブクラス解決が保護されています。これにより、バックグラウンドでの非同期保存中であっても安全に共有・利用が可能です。
  • 知的なサブクラス解決: 登録されていない型が渡された場合、そのクラスの MRO (Method Resolution Order) をスキャンして、登録済みの基底クラスが存在するかを確認します。
  • LRU キャッシュによる最適化: サブクラス解決の結果は内部でキャッシュされます。動的な型生成(namedtuples や Pydantic モデルなど)によるメモリ肥大化を防ぐため、最大サイズ(デフォルト 1024)を超えると古いエントリから自動的に破棄されます。

カスタム型の登録手順

from beautyspot.serializer import MsgpackSerializer

serializer = MsgpackSerializer()

# numpy.ndarray などを登録する例
serializer.register(
    type_class=MyCustomClass,
    code=10,  # 0-127 の一意な数値
    encoder=lambda obj: obj.to_dict(),
    decoder=lambda data: MyCustomClass.from_dict(data)
)

例外ハンドリング

シリアライズ過程で発生する問題は、すべて beautyspot.exceptions.SerializationError として集約されます。

  • エンコード失敗: カスタムエンコーダ内で例外が発生した場合、原因となった型名と共に報告されます。
  • 非シリアライズ型: 登録されていない型をシリアライズしようとした場合、修正のヒント(spot.register の使用推奨)を含む詳細なメッセージが表示されます。
  • データ破損: デコード時にデータが不整合な場合、キャッシュの破損(Corrupted)として扱われます。