🛠️ Custom Database Backend Guide
beautyspot はデフォルトで SQLite を使用しますが、大規模な分散処理や、クラウドネイティブな環境(Kubernetesなど)で動作させる場合、PostgreSQL や MySQL、あるいは DynamoDB といった外部データベースを利用したくなるでしょう。
v1.0.0 から導入された Dependency Injection (DI) 機構を利用することで、ライブラリのコードを変更することなく、バックエンドを自由に差し替えることができます。
1. The Interface: TaskDBBase
カスタムバックエンドを作成するには、beautyspot.db.TaskDBBase 抽象基底クラス(Abstract Base Class)を継承し、以下のメソッドを実装する必要があります。
Bases: ABC
Abstract base class providing default no-op implementations for maintenance methods. Actual backends should implement TaskDBCore and optionally TaskDBMaintenance.
Source code in src/beautyspot/db.py
delete_all(func_name=None)
delete_expired()
get_blob_refs()
get_history(limit=1000)
Get task history. Returns an empty DataFrame by default.
Source code in src/beautyspot/db.py
get_keys_start_with(prefix)
get_outdated_tasks(older_than, func_name=None)
Retrieve tasks older than the specified datetime (Preview for prune).
実装の要件 (Contract)
- Thread Safety:
Spotはマルチスレッドで動作する可能性があります。データベースアダプタはスレッドセーフである必要があります。 - Schema Initialization:
init_schema()はSpot初期化時に毎回呼ばれます。「テーブルがなければ作成する(IF NOT EXISTS)」ように実装してください。 - Idempotency:
save()は同じキーで何度も呼ばれる可能性があります。INSERT OR REPLACE(Upsert) の挙動を実装してください。 - Function Identity:
func_identifier(完全修飾名) が渡された場合は保存し、未指定ならfunc_nameにフォールバックしてください。同名関数の衝突回避に使われます。
2. Implementation Example
ここでは例として、開発やテストに便利な「インメモリデータベース(辞書ベース)」の実装を示します。 本番環境で PostgreSQL 等を使用する場合も、基本的な構造は同じです。
from typing import Any, Dict, Optional
from datetime import datetime
import pandas as pd
from beautyspot.db import TaskDBBase
class MemoryTaskDB(TaskDBBase):
"""
オンメモリで動作する揮発性のバックエンド。
テストや、永続化が不要な一時的なスクリプトに最適です。
"""
def __init__(self):
self._storage: Dict[str, Dict[str, Any]] = {}
def init_schema(self):
# メモリ上の辞書なのでスキーマ作成は不要
pass
def get(self, cache_key: str, *, include_expired: bool = False) -> Optional[Dict[str, Any]]:
return self._storage.get(cache_key)
def save(
self,
cache_key: str,
func_name: str,
func_identifier: Optional[str],
input_id: str,
version: Optional[str],
result_type: str,
content_type: Optional[str],
result_value: Optional[str] = None,
result_data: Optional[bytes] = None,
expires_at: Optional[datetime] = None,
):
# 辞書に保存(Upsert)
self._storage[cache_key] = {
"func_name": func_name,
"func_identifier": func_identifier or func_name,
"input_id": input_id,
"version": version,
"result_type": result_type,
"content_type": content_type,
"result_value": result_value,
"result_data": result_data,
"expires_at": expires_at,
"updated_at": pd.Timestamp.now() # 履歴用
}
def get_history(self, limit: int = 1000) -> pd.DataFrame:
if not self._storage:
return pd.DataFrame()
# 辞書からDataFrameを作成
df = pd.DataFrame(list(self._storage.values()))
df["cache_key"] = list(self._storage.keys())
return df.sort_values("updated_at", ascending=False).head(limit)
3. Injection (How to use)
作成したカスタムクラスのインスタンスを、Spot の db 引数に渡すだけです。
import beautyspot as bs
# 1. カスタムDBをインスタンス化
my_memory_db = MemoryTaskDB()
# 2. Spotに注入 (パス文字列ではなく、インスタンスを渡す)
spot = bs.Spot("memory_app", db=my_memory_db)
@spot.mark
def calc(x):
return x * 2
# この結果は SQLite ファイルではなく、メモリ上に保存されます
print(calc(10))
4. Advanced: Using PostgreSQL / MySQL
RDBMS を使用する場合は、sqlalchemy や psycopg2 を使用して TaskDBBase を実装します。
src/beautyspot/db.py 内の SQLiteTaskDB の実装が参考になります。
特に以下の点に注意してください:
- 接続管理:
saveやgetのたびに接続を開くか、コネクションプールを使用するかを適切に設計してください。 - JSONシリアライズ:
beautyspotは結果を JSON 文字列として渡します。DB側にはTEXT型またはJSONB型のカラムを用意してください。