Skip to content

🛠️ Custom Database Backend Guide

beautyspot はデフォルトで SQLite を使用しますが、大規模な分散処理や、クラウドネイティブな環境(Kubernetesなど)で動作させる場合、PostgreSQL や MySQL、あるいは DynamoDB といった外部データベースを利用したくなるでしょう。

v1.0.0 から導入された Dependency Injection (DI) 機構を利用することで、ライブラリのコードを変更することなく、バックエンドを自由に差し替えることができます。

1. The Interface: TaskDBBase

カスタムバックエンドを作成するには、beautyspot.db.TaskDBBase 抽象基底クラス(Abstract Base Class)を継承し、以下のメソッドを実装する必要があります。

Bases: ABC

Abstract base class providing default no-op implementations for maintenance methods. Actual backends should implement TaskDBCore and optionally TaskDBMaintenance.

Source code in src/beautyspot/db.py
class TaskDBBase(ABC):
    """
    Abstract base class providing default no-op implementations for maintenance methods.
    Actual backends should implement TaskDBCore and optionally TaskDBMaintenance.
    """

    @abstractmethod
    def init_schema(self):
        pass

    @abstractmethod
    def get(
        self, cache_key: str, *, include_expired: bool = False
    ) -> Optional[TaskRecord]:
        pass

    @abstractmethod
    def save(
        self,
        cache_key: str,
        func_name: str,
        func_identifier: Optional[str],
        input_id: str,
        version: Optional[str],
        result_type: str,
        content_type: Optional[str],
        result_value: Optional[str] = None,
        result_data: Optional[bytes] = None,
        expires_at: Optional[datetime] = None,
    ):
        pass

    @abstractmethod
    def delete(self, cache_key: str) -> bool:
        pass

    # --- Maintenance Methods (Default implementations) ---
    def delete_expired(self) -> int:
        """Delete tasks that have passed their expiration time."""
        return 0

    def prune(self, older_than: datetime, func_name: Optional[str] = None) -> int:
        """Delete tasks older than the specified datetime."""
        return 0

    def get_outdated_tasks(
        self, older_than: datetime, func_name: Optional[str] = None
    ) -> list[tuple[str, str, str]]:
        """Retrieve tasks older than the specified datetime (Preview for prune)."""
        return []

    def get_blob_refs(self) -> Optional[set[str]]:
        """Retrieve all 'result_value' entries that point to external storage."""
        return None

    def delete_all(self, func_name: Optional[str] = None) -> int:
        """Delete all tasks, optionally filtered by function name."""
        return 0

    def get_keys_start_with(self, prefix: str) -> list[str]:
        """Retrieve cache keys that start with the given prefix."""
        return []

    def get_history(self, limit: int = 1000) -> "pd.DataFrame":
        """Get task history. Returns an empty DataFrame by default."""
        try:
            import pandas as pd

            return pd.DataFrame()
        except ImportError:
            raise ImportError("Pandas is required for this feature.")

delete_all(func_name=None)

Delete all tasks, optionally filtered by function name.

Source code in src/beautyspot/db.py
def delete_all(self, func_name: Optional[str] = None) -> int:
    """Delete all tasks, optionally filtered by function name."""
    return 0

delete_expired()

Delete tasks that have passed their expiration time.

Source code in src/beautyspot/db.py
def delete_expired(self) -> int:
    """Delete tasks that have passed their expiration time."""
    return 0

get_blob_refs()

Retrieve all 'result_value' entries that point to external storage.

Source code in src/beautyspot/db.py
def get_blob_refs(self) -> Optional[set[str]]:
    """Retrieve all 'result_value' entries that point to external storage."""
    return None

get_history(limit=1000)

Get task history. Returns an empty DataFrame by default.

Source code in src/beautyspot/db.py
def get_history(self, limit: int = 1000) -> "pd.DataFrame":
    """Get task history. Returns an empty DataFrame by default."""
    try:
        import pandas as pd

        return pd.DataFrame()
    except ImportError:
        raise ImportError("Pandas is required for this feature.")

get_keys_start_with(prefix)

Retrieve cache keys that start with the given prefix.

Source code in src/beautyspot/db.py
def get_keys_start_with(self, prefix: str) -> list[str]:
    """Retrieve cache keys that start with the given prefix."""
    return []

get_outdated_tasks(older_than, func_name=None)

Retrieve tasks older than the specified datetime (Preview for prune).

Source code in src/beautyspot/db.py
def get_outdated_tasks(
    self, older_than: datetime, func_name: Optional[str] = None
) -> list[tuple[str, str, str]]:
    """Retrieve tasks older than the specified datetime (Preview for prune)."""
    return []

prune(older_than, func_name=None)

Delete tasks older than the specified datetime.

Source code in src/beautyspot/db.py
def prune(self, older_than: datetime, func_name: Optional[str] = None) -> int:
    """Delete tasks older than the specified datetime."""
    return 0

実装の要件 (Contract)

  • Thread Safety: Spot はマルチスレッドで動作する可能性があります。データベースアダプタはスレッドセーフである必要があります。
  • Schema Initialization: init_schema()Spot 初期化時に毎回呼ばれます。「テーブルがなければ作成する(IF NOT EXISTS)」ように実装してください。
  • Idempotency: save() は同じキーで何度も呼ばれる可能性があります。INSERT OR REPLACE (Upsert) の挙動を実装してください。
  • Function Identity: func_identifier (完全修飾名) が渡された場合は保存し、未指定なら func_name にフォールバックしてください。同名関数の衝突回避に使われます。

2. Implementation Example

ここでは例として、開発やテストに便利な「インメモリデータベース(辞書ベース)」の実装を示します。 本番環境で PostgreSQL 等を使用する場合も、基本的な構造は同じです。

from typing import Any, Dict, Optional
from datetime import datetime
import pandas as pd
from beautyspot.db import TaskDBBase

class MemoryTaskDB(TaskDBBase):
    """
    オンメモリで動作する揮発性のバックエンド。
    テストや、永続化が不要な一時的なスクリプトに最適です。
    """
    def __init__(self):
        self._storage: Dict[str, Dict[str, Any]] = {}

    def init_schema(self):
        # メモリ上の辞書なのでスキーマ作成は不要
        pass

    def get(self, cache_key: str, *, include_expired: bool = False) -> Optional[Dict[str, Any]]:
        return self._storage.get(cache_key)

    def save(
        self,
        cache_key: str,
        func_name: str,
        func_identifier: Optional[str],
        input_id: str,
        version: Optional[str],
        result_type: str,
        content_type: Optional[str],
        result_value: Optional[str] = None,
        result_data: Optional[bytes] = None,
        expires_at: Optional[datetime] = None,
    ):
        # 辞書に保存(Upsert)
        self._storage[cache_key] = {
            "func_name": func_name,
            "func_identifier": func_identifier or func_name,
            "input_id": input_id,
            "version": version,
            "result_type": result_type,
            "content_type": content_type,
            "result_value": result_value,
            "result_data": result_data,
            "expires_at": expires_at,
            "updated_at": pd.Timestamp.now() # 履歴用
        }

    def get_history(self, limit: int = 1000) -> pd.DataFrame:
        if not self._storage:
            return pd.DataFrame()

        # 辞書からDataFrameを作成
        df = pd.DataFrame(list(self._storage.values()))
        df["cache_key"] = list(self._storage.keys())
        return df.sort_values("updated_at", ascending=False).head(limit)

3. Injection (How to use)

作成したカスタムクラスのインスタンスを、Spotdb 引数に渡すだけです。

import beautyspot as bs

# 1. カスタムDBをインスタンス化
my_memory_db = MemoryTaskDB()

# 2. Spotに注入 (パス文字列ではなく、インスタンスを渡す)
spot = bs.Spot("memory_app", db=my_memory_db)

@spot.mark
def calc(x):
    return x * 2

# この結果は SQLite ファイルではなく、メモリ上に保存されます
print(calc(10)) 

4. Advanced: Using PostgreSQL / MySQL

RDBMS を使用する場合は、sqlalchemypsycopg2 を使用して TaskDBBase を実装します。 src/beautyspot/db.py 内の SQLiteTaskDB の実装が参考になります。

特に以下の点に注意してください:

  • 接続管理: saveget のたびに接続を開くか、コネクションプールを使用するかを適切に設計してください。
  • JSONシリアライズ: beautyspot は結果を JSON 文字列として渡します。DB側には TEXT 型または JSONB 型のカラムを用意してください。