Skip to content

Storage

beautyspot.storage は、バイナリデータの永続化を抽象化するインターフェースと、その具体的な実装を提供します。

beautyspot.storage

AlwaysBlobPolicy dataclass

Bases: StoragePolicyProtocol

Policy that always saves data as a blob. Equivalent to setting default_save_blob=True.

Source code in src/beautyspot/storage.py
@dataclass
class AlwaysBlobPolicy(StoragePolicyProtocol):
    """
    Policy that always saves data as a blob.
    Equivalent to setting `default_save_blob=True`.
    """

    def should_save_as_blob(self, data: bytes) -> bool:
        return True

BlobStorageBase

Bases: ABC

Abstract base class for large object storage (BLOBs). Implementations should at least fulfill BlobStorageCore.

Source code in src/beautyspot/storage.py
class BlobStorageBase(ABC):
    """
    Abstract base class for large object storage (BLOBs).
    Implementations should at least fulfill BlobStorageCore.
    """

    @abstractmethod
    def save(self, key: str, data: ReadableBuffer) -> str:
        """
        Persist the data associated with the given key.
        Returns a location identifier.
        """
        pass

    @abstractmethod
    def load(self, location: str) -> bytes:
        """
        Retrieve data from the specified location.
        """
        pass

    @abstractmethod
    def delete(self, location: str) -> None:
        """
        Delete the blob at the specified location.
        Should be idempotent (no error if file missing).
        """
        pass

    @abstractmethod
    def list_keys(self) -> Iterator[str]:
        """
        Yields location identifiers for all stored blobs.
        Used for Garbage Collection.
        MUST yield the same format (path/URI) that is accepted by `delete`.
        """
        pass

    @abstractmethod
    def get_mtime(self, location: str) -> float:
        """
        Get the last modified time of the blob as a POSIX timestamp.
        Used to prevent race conditions during Garbage Collection.
        """
        pass

delete(location) abstractmethod

Delete the blob at the specified location. Should be idempotent (no error if file missing).

Source code in src/beautyspot/storage.py
@abstractmethod
def delete(self, location: str) -> None:
    """
    Delete the blob at the specified location.
    Should be idempotent (no error if file missing).
    """
    pass

get_mtime(location) abstractmethod

Get the last modified time of the blob as a POSIX timestamp. Used to prevent race conditions during Garbage Collection.

Source code in src/beautyspot/storage.py
@abstractmethod
def get_mtime(self, location: str) -> float:
    """
    Get the last modified time of the blob as a POSIX timestamp.
    Used to prevent race conditions during Garbage Collection.
    """
    pass

list_keys() abstractmethod

Yields location identifiers for all stored blobs. Used for Garbage Collection. MUST yield the same format (path/URI) that is accepted by delete.

Source code in src/beautyspot/storage.py
@abstractmethod
def list_keys(self) -> Iterator[str]:
    """
    Yields location identifiers for all stored blobs.
    Used for Garbage Collection.
    MUST yield the same format (path/URI) that is accepted by `delete`.
    """
    pass

load(location) abstractmethod

Retrieve data from the specified location.

Source code in src/beautyspot/storage.py
@abstractmethod
def load(self, location: str) -> bytes:
    """
    Retrieve data from the specified location.
    """
    pass

save(key, data) abstractmethod

Persist the data associated with the given key. Returns a location identifier.

Source code in src/beautyspot/storage.py
@abstractmethod
def save(self, key: str, data: ReadableBuffer) -> str:
    """
    Persist the data associated with the given key.
    Returns a location identifier.
    """
    pass

BlobStorageCore

Bases: Protocol

Core interface for large object storage required during execution.

Source code in src/beautyspot/storage.py
@runtime_checkable
class BlobStorageCore(Protocol):
    """
    Core interface for large object storage required during execution.
    """

    def save(self, key: str, data: ReadableBuffer) -> str: ...
    def load(self, location: str) -> bytes: ...
    def delete(self, location: str) -> None: ...

LocalStorage

Bases: BlobStorageMaintenable

Source code in src/beautyspot/storage.py
class LocalStorage(BlobStorageMaintenable):
    def __init__(self, base_dir: str | Path):
        # Resolve to absolute path explicitly on init
        self.base_dir = Path(base_dir).resolve()
        self._ensure_cache_dir(self.base_dir)

    @staticmethod
    def _ensure_cache_dir(directory: Path) -> None:
        """
        ディレクトリを作成し、Gitの管理下に入らないよう .gitignore を配置する。
        """
        directory.mkdir(parents=True, exist_ok=True)
        gitignore_path = directory / ".gitignore"
        if not gitignore_path.exists():
            try:
                gitignore_path.write_text("*\n")
            except OSError as e:
                # 権限問題などで書けない場合は処理を続行(ログのみ)
                logging.warning(f"Failed to create .gitignore in {directory}: {e}")

    def _validate_key(self, key: str):
        """save() に渡されるキャッシュキーを検証する。

        Note:
            この検証は save() の引数(通常は SHA-256 ハッシュ)にのみ適用される。
            list_keys() が返すロケーション文字列(例: 'subdir/hash.bin')は
            レガシーデータとの互換性のためにパス区切り文字を含む場合があり、
            load() / delete() では別途パストラバーサルチェックを行う。
        """
        # Prevent Path Traversal
        if ".." in key or "/" in key or "\\" in key:
            raise ValidationError(
                f"Invalid key: '{key}'. Keys must not contain path separators."
            )

    def save(self, key: str, data: ReadableBuffer) -> str:
        """
        指定されたキーでデータをローカルディスクに保存し、ファイル名(location)を返す。

        単純な `open(..., 'wb')` による上書きは行わず、`tempfile.mkstemp` で一意な
        一時ファイルを作成して書き込んだ後、`os.replace` でアトミックにリネームする手法を採用している。
        これは以下の2点を防ぐためである。
        1. 並行実行時(複数スレッド/プロセス)に同じキャッシュキーに同時に書き込もうとした際のファイルの競合・破損。
        2. 書き込み中のプロセス強制終了などによる、不完全で壊れたファイルの残留。

        Args:
            key (str): 保存するキャッシュキー
            data (ReadableBuffer): 保存するバイトデータ

        Returns:
            str: 保存されたファイル名
        """
        self._validate_key(key)
        filename = f"{key}.bin"
        filepath = self.base_dir / filename

        # Atomic write: mkstemp generates a unique temp file to avoid collisions
        # when multiple threads/processes write concurrently.
        # flush + fsync ensures data reaches disk before rename,
        # so a crash between write and rename never leaves a corrupt file.
        fd, temp_path_str = tempfile.mkstemp(dir=self.base_dir, suffix=".spot_tmp")
        try:
            with os.fdopen(fd, "wb", closefd=True) as f:
                f.write(data)
                f.flush()
                os.fsync(f.fileno())
            Path(temp_path_str).replace(filepath)
        except BaseException:
            try:
                os.unlink(temp_path_str)
            except OSError:
                # PermissionError等で消せなかった場合は残留するが、後でGCが回収する
                pass
            raise

        return filename

    def load(self, location: str) -> bytes:
        # [CHANGED] Resolve location relative to base_dir.
        # Note: If 'location' is an absolute path (legacy data), pathlib behavior
        # (base / abs) returns abs, so backward compatibility on the same machine is preserved.
        full_path = (self.base_dir / location).resolve()

        # Security check: Ensure the path is strictly within the base_dir
        if not full_path.is_relative_to(self.base_dir):
            raise CacheCorruptedError(
                f"Access denied: {location} resolves to {full_path}, which is outside {self.base_dir}"
            )

        if not full_path.exists():
            raise CacheCorruptedError(f"Local blob lost: {full_path}")

        try:
            with open(full_path, "rb") as f:
                return f.read()
        except OSError as e:
            raise CacheCorruptedError(f"Failed to read blob: {e}")

    def delete(self, location: str) -> None:
        """
        Delete the file at the given location.

        Note:
            For performance reasons, this method does not synchronously remove
            empty parent directories. Directory cleanup is deferred to the
            asynchronous maintenance task (`prune_empty_dirs` / `beautyspot gc`).
        """
        full_path = (self.base_dir / location).resolve()

        if not full_path.is_relative_to(self.base_dir):
            return

        try:
            os.remove(full_path)
        except FileNotFoundError:
            pass
        except (PermissionError, OSError) as e:
            # 他のプロセスによるロックや権限の問題をログに残すが、処理は継続する
            logging.warning(
                f"Failed to delete blob at {full_path}: {e}. It will be handled by subsequent GC."
            )

    def list_keys(self) -> Iterator[str]:
        """
        Yields relative paths of all .bin files, including subdirectories.
        Example: 'hash.bin' or 'subdir/hash.bin'
        """
        if not self.base_dir.exists():
            return

        # rglob で再帰的に探索
        for entry in self.base_dir.rglob("*.bin"):
            if entry.is_file():
                # base_dir からの相対パスを返す
                yield str(entry.relative_to(self.base_dir).as_posix())

    def get_mtime(self, location: str) -> float:
        full_path = (self.base_dir / location).resolve()
        if not full_path.is_relative_to(self.base_dir):
            raise ValueError(f"Access denied: {location}")
        try:
            return full_path.stat().st_mtime
        except OSError as e:
            raise CacheCorruptedError(f"Failed to get mtime for blob: {e}")

    def clean_temp_files(self, max_age_seconds: int = 86400) -> int:
        """
        Remove '.spot_tmp' files that are older than max_age_seconds.
        Provides a fail-safe against leaked temporary files due to file locks.
        """
        if not self.base_dir.exists():
            return 0

        removed_count = 0
        now = time.time()

        for entry in self.base_dir.rglob("*.spot_tmp"):
            if entry.is_file():
                try:
                    # 猶予期間(デフォルト24時間)を経過しているかチェック
                    if now - entry.stat().st_mtime > max_age_seconds:
                        entry.unlink()
                        removed_count += 1
                except OSError:
                    # アンチウイルスソフト等で現在もロックされている場合はスキップ
                    pass

        return removed_count

    def prune_empty_dirs(self) -> int:
        """
        Recursively remove empty directories under base_dir.
        Also removes directories containing only system generated files (.DS_Store, etc).
        Returns the count of removed directories.

        Note:
            base_dir 自体は削除しません。base_dir が削除されると以降の
            save() で FileNotFoundError が発生するためです。
        """
        if not self.base_dir.exists():
            return 0

        IGNORED_FILES = {".DS_Store", "Thumbs.db", "desktop.ini"}
        removed_count = 0

        # os.walk(topdown=False) で深い階層から順に処理
        for root, dirs, files in os.walk(self.base_dir, topdown=False):
            path = Path(root)

            # base_dir 自体は絶対に削除しない
            if path == self.base_dir:
                continue

            existing_files = set(files)

            # 無視リスト以外のファイルがある場合 -> 削除不可
            if existing_files - IGNORED_FILES:
                continue

            # 無視リストにあるファイルしか残っていない場合、それらを消して空にする
            for f in existing_files:
                try:
                    (path / f).unlink()
                except OSError:
                    pass

            # ディレクトリ削除を試みる
            try:
                path.rmdir()
                removed_count += 1
            except OSError:
                pass

        return removed_count

clean_temp_files(max_age_seconds=86400)

Remove '.spot_tmp' files that are older than max_age_seconds. Provides a fail-safe against leaked temporary files due to file locks.

Source code in src/beautyspot/storage.py
def clean_temp_files(self, max_age_seconds: int = 86400) -> int:
    """
    Remove '.spot_tmp' files that are older than max_age_seconds.
    Provides a fail-safe against leaked temporary files due to file locks.
    """
    if not self.base_dir.exists():
        return 0

    removed_count = 0
    now = time.time()

    for entry in self.base_dir.rglob("*.spot_tmp"):
        if entry.is_file():
            try:
                # 猶予期間(デフォルト24時間)を経過しているかチェック
                if now - entry.stat().st_mtime > max_age_seconds:
                    entry.unlink()
                    removed_count += 1
            except OSError:
                # アンチウイルスソフト等で現在もロックされている場合はスキップ
                pass

    return removed_count

delete(location)

Delete the file at the given location.

Note

For performance reasons, this method does not synchronously remove empty parent directories. Directory cleanup is deferred to the asynchronous maintenance task (prune_empty_dirs / beautyspot gc).

Source code in src/beautyspot/storage.py
def delete(self, location: str) -> None:
    """
    Delete the file at the given location.

    Note:
        For performance reasons, this method does not synchronously remove
        empty parent directories. Directory cleanup is deferred to the
        asynchronous maintenance task (`prune_empty_dirs` / `beautyspot gc`).
    """
    full_path = (self.base_dir / location).resolve()

    if not full_path.is_relative_to(self.base_dir):
        return

    try:
        os.remove(full_path)
    except FileNotFoundError:
        pass
    except (PermissionError, OSError) as e:
        # 他のプロセスによるロックや権限の問題をログに残すが、処理は継続する
        logging.warning(
            f"Failed to delete blob at {full_path}: {e}. It will be handled by subsequent GC."
        )

list_keys()

Yields relative paths of all .bin files, including subdirectories. Example: 'hash.bin' or 'subdir/hash.bin'

Source code in src/beautyspot/storage.py
def list_keys(self) -> Iterator[str]:
    """
    Yields relative paths of all .bin files, including subdirectories.
    Example: 'hash.bin' or 'subdir/hash.bin'
    """
    if not self.base_dir.exists():
        return

    # rglob で再帰的に探索
    for entry in self.base_dir.rglob("*.bin"):
        if entry.is_file():
            # base_dir からの相対パスを返す
            yield str(entry.relative_to(self.base_dir).as_posix())

prune_empty_dirs()

Recursively remove empty directories under base_dir. Also removes directories containing only system generated files (.DS_Store, etc). Returns the count of removed directories.

Note

base_dir 自体は削除しません。base_dir が削除されると以降の save() で FileNotFoundError が発生するためです。

Source code in src/beautyspot/storage.py
def prune_empty_dirs(self) -> int:
    """
    Recursively remove empty directories under base_dir.
    Also removes directories containing only system generated files (.DS_Store, etc).
    Returns the count of removed directories.

    Note:
        base_dir 自体は削除しません。base_dir が削除されると以降の
        save() で FileNotFoundError が発生するためです。
    """
    if not self.base_dir.exists():
        return 0

    IGNORED_FILES = {".DS_Store", "Thumbs.db", "desktop.ini"}
    removed_count = 0

    # os.walk(topdown=False) で深い階層から順に処理
    for root, dirs, files in os.walk(self.base_dir, topdown=False):
        path = Path(root)

        # base_dir 自体は絶対に削除しない
        if path == self.base_dir:
            continue

        existing_files = set(files)

        # 無視リスト以外のファイルがある場合 -> 削除不可
        if existing_files - IGNORED_FILES:
            continue

        # 無視リストにあるファイルしか残っていない場合、それらを消して空にする
        for f in existing_files:
            try:
                (path / f).unlink()
            except OSError:
                pass

        # ディレクトリ削除を試みる
        try:
            path.rmdir()
            removed_count += 1
        except OSError:
            pass

    return removed_count

save(key, data)

指定されたキーでデータをローカルディスクに保存し、ファイル名(location)を返す。

単純な open(..., 'wb') による上書きは行わず、tempfile.mkstemp で一意な 一時ファイルを作成して書き込んだ後、os.replace でアトミックにリネームする手法を採用している。 これは以下の2点を防ぐためである。 1. 並行実行時(複数スレッド/プロセス)に同じキャッシュキーに同時に書き込もうとした際のファイルの競合・破損。 2. 書き込み中のプロセス強制終了などによる、不完全で壊れたファイルの残留。

Parameters:

Name Type Description Default
key str

保存するキャッシュキー

required
data ReadableBuffer

保存するバイトデータ

required

Returns:

Name Type Description
str str

保存されたファイル名

Source code in src/beautyspot/storage.py
def save(self, key: str, data: ReadableBuffer) -> str:
    """
    指定されたキーでデータをローカルディスクに保存し、ファイル名(location)を返す。

    単純な `open(..., 'wb')` による上書きは行わず、`tempfile.mkstemp` で一意な
    一時ファイルを作成して書き込んだ後、`os.replace` でアトミックにリネームする手法を採用している。
    これは以下の2点を防ぐためである。
    1. 並行実行時(複数スレッド/プロセス)に同じキャッシュキーに同時に書き込もうとした際のファイルの競合・破損。
    2. 書き込み中のプロセス強制終了などによる、不完全で壊れたファイルの残留。

    Args:
        key (str): 保存するキャッシュキー
        data (ReadableBuffer): 保存するバイトデータ

    Returns:
        str: 保存されたファイル名
    """
    self._validate_key(key)
    filename = f"{key}.bin"
    filepath = self.base_dir / filename

    # Atomic write: mkstemp generates a unique temp file to avoid collisions
    # when multiple threads/processes write concurrently.
    # flush + fsync ensures data reaches disk before rename,
    # so a crash between write and rename never leaves a corrupt file.
    fd, temp_path_str = tempfile.mkstemp(dir=self.base_dir, suffix=".spot_tmp")
    try:
        with os.fdopen(fd, "wb", closefd=True) as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())
        Path(temp_path_str).replace(filepath)
    except BaseException:
        try:
            os.unlink(temp_path_str)
        except OSError:
            # PermissionError等で消せなかった場合は残留するが、後でGCが回収する
            pass
        raise

    return filename

Maintenable

Bases: Protocol

Extended interface for maintenance tasks (GC).

Source code in src/beautyspot/storage.py
@runtime_checkable
class Maintenable(Protocol):
    """
    Extended interface for maintenance tasks (GC).
    """

    def list_keys(self) -> Iterator[str]: ...
    def get_mtime(self, location: str) -> float: ...

S3Storage

Bases: BlobStorageMaintenable

Source code in src/beautyspot/storage.py
class S3Storage(BlobStorageMaintenable):
    def __init__(
        self,
        s3_uri: str,
        s3_opts: dict[str, Any] | None = None,
    ):
        if not boto3:
            raise ImportError("Run `pip install beautyspot[s3]` to use S3 storage.")

        parts = s3_uri.replace("s3://", "").split("/", 1)
        self.bucket_name = parts[0]
        raw_prefix = parts[1].rstrip("/") if len(parts) > 1 else ""
        self.prefix = raw_prefix if raw_prefix else "blobs"

        opts = s3_opts or {}
        self.s3 = boto3.client("s3", **opts)

    @staticmethod
    def _parse_s3_uri(location: str) -> tuple[str, str]:
        """Parse an s3:// URI into (bucket, key). Raises ValueError for invalid URIs."""
        if not location.startswith("s3://"):
            raise ValidationError(f"Expected an s3:// URI, got: {location!r}")
        path = location[len("s3://") :]
        parts = path.split("/", 1)
        if len(parts) != 2 or not parts[0] or not parts[1]:
            raise ValidationError(
                f"Invalid S3 URI (expected s3://bucket/key): {location!r}"
            )
        return parts[0], parts[1]

    def save(self, key: str, data: ReadableBuffer) -> str:
        s3_key = f"{self.prefix}/{key}.bin"
        buffer = io.BytesIO(data)
        # upload_fileobj は大容量データ(>5GB)に対してマルチパートアップロードを
        # 自動的に使用し、put_object の 5GB 上限を回避する。
        self.s3.upload_fileobj(buffer, self.bucket_name, s3_key)
        return f"s3://{self.bucket_name}/{s3_key}"

    def load(self, location: str) -> bytes:
        bucket, key = self._parse_s3_uri(location)
        try:
            resp = self.s3.get_object(Bucket=bucket, Key=key)
            body = resp["Body"]
            try:
                return body.read()
            finally:
                body.close()
        except ClientError as e:
            raise CacheCorruptedError(f"S3 blob lost: {location}") from e

    def delete(self, location: str) -> None:
        bucket, key = self._parse_s3_uri(location)
        try:
            self.s3.delete_object(Bucket=bucket, Key=key)
        except ClientError as e:
            # S3 の delete_object は存在しないオブジェクトに対してもエラーを返さないため、
            # ここに到達するのは権限エラーやネットワーク障害などの深刻なケース。
            # 握り潰さずにログへ記録し、GC が後続で回収できるようにする。
            logging.warning(
                f"Failed to delete S3 object {location}: {e}. "
                "It will be handled by subsequent GC."
            )

    def get_mtime(self, location: str) -> float:
        bucket, key = self._parse_s3_uri(location)
        try:
            resp = self.s3.head_object(Bucket=bucket, Key=key)
            # LastModified is a datetime object, convert to POSIX timestamp
            return resp["LastModified"].timestamp()
        except ClientError as e:
            raise CacheCorruptedError(
                f"S3 blob lost or inaccessible: {location}"
            ) from e

    def list_keys(self) -> Iterator[str]:
        """Yields s3:// URIs for all objects in the prefix."""
        paginator = self.s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=self.bucket_name, Prefix=self.prefix):
            for obj in page.get("Contents", []):
                yield f"s3://{self.bucket_name}/{obj['Key']}"

list_keys()

Yields s3:// URIs for all objects in the prefix.

Source code in src/beautyspot/storage.py
def list_keys(self) -> Iterator[str]:
    """Yields s3:// URIs for all objects in the prefix."""
    paginator = self.s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=self.bucket_name, Prefix=self.prefix):
        for obj in page.get("Contents", []):
            yield f"s3://{self.bucket_name}/{obj['Key']}"

StoragePolicyProtocol

Bases: Protocol

Protocol to determine if data should be saved as a blob (file/object storage) or directly in the database based on the data content (usually size).

Source code in src/beautyspot/storage.py
@runtime_checkable
class StoragePolicyProtocol(Protocol):
    """
    Protocol to determine if data should be saved as a blob (file/object storage)
    or directly in the database based on the data content (usually size).
    """

    def should_save_as_blob(self, data: bytes) -> bool: ...

ThresholdStoragePolicy dataclass

Bases: StoragePolicyProtocol

Policy that saves data as a blob if its size exceeds a configured threshold. This is the recommended policy for automatic optimization.

Source code in src/beautyspot/storage.py
@dataclass
class ThresholdStoragePolicy(StoragePolicyProtocol):
    """
    Policy that saves data as a blob if its size exceeds a configured threshold.
    This is the recommended policy for automatic optimization.
    """

    threshold: int

    def should_save_as_blob(self, data: bytes) -> bool:
        return len(data) > self.threshold

WarningOnlyPolicy dataclass

Bases: StoragePolicyProtocol

Policy for backward compatibility (v2.0 behavior). Does not force blob storage, but logs a warning if size exceeds threshold.

Source code in src/beautyspot/storage.py
@dataclass
class WarningOnlyPolicy(StoragePolicyProtocol):
    """
    Policy for backward compatibility (v2.0 behavior).
    Does not force blob storage, but logs a warning if size exceeds threshold.
    """

    warning_threshold: int
    # logger は比較・repr 対象外にする。
    # dataclass の自動生成 __eq__ に logger インスタンスが混入するのを防ぐ。
    logger: logging.Logger = field(
        default_factory=lambda: logging.getLogger("beautyspot"),
        compare=False,
        repr=False,
    )

    def should_save_as_blob(self, data: bytes) -> bool:
        if len(data) > self.warning_threshold:
            self.logger.warning(
                f"⚠️ Large data detected ({len(data)} bytes). "
                f"Consider using `save_blob=True` or a stricter StoragePolicy."
            )
        return False

概要

beautyspot は、キャッシュ結果をデータベース内に保存するか、外部ストレージに保存するかを自動的、あるいは明示的に切り替えることができます。BlobStorageBase を継承することで、独自のストレージバックエンド(例:Google Cloud Storage, Azure Blob Storage)を実装することも可能です。

インターフェース: BlobStorageBase

すべてのストレージ実装が提供すべき基本メソッドです。

  • save(key, data): データを保存し、その場所を示す一意のロケーション(パスや URI)を返します。
  • load(location): 指定されたロケーションからバイナリデータを取得します。
  • delete(location): 指定されたロケーションのデータを削除します。
  • list_keys(): ストレージ内のすべてのロケーションを列挙します(ガベージコレクション用)。

標準実装

LocalStorage

ローカルのファイルシステムを使用する実装です。

  • 自動生成: コンストラクタで指定されたディレクトリが存在しない場合は自動的に作成します。
  • 安全性: パス・トラバーサル攻撃を防ぐため、キーにパス区切り文字が含まれていないかバリデーションを行います。
  • アトミックな書き込み: 一時ファイルに書き込んだ後でリネーム(replace)を行うことで、書き込み中のクラッシュによるデータ破損を防ぎます。

S3Storage

AWS S3 または互換ストレージを使用する実装です。

  • 依存関係: 使用には boto3 が必要です。
  • URI 形式: s3://bucket-name/prefix 形式の URI をサポートします。

ファクトリ関数: create_storage

パスの形式(s3:// で始まるかどうか)を判別し、適切なストレージインスタンスを動的に生成します。MaintenanceService.from_path などで内部的に利用されています。

使用例

明示的な初期化

from beautyspot.storage import LocalStorage, S3Storage

# ローカルストレージ
local_store = LocalStorage("./.beautyspot/blobs")

# S3ストレージ
s3_store = S3Storage("s3://my-bucket/cache", s3_opts={"region_name": "ap-northeast-1"})

Spot への注入

from beautyspot import Spot
from beautyspot.storage import create_storage

# ファクトリ関数を使用してストレージを準備
storage = create_storage("./.beautyspot/blobs")

spot = Spot(
    name="my_app",
    db=db,
    serializer=serializer,
    storage=storage,
    default_save_blob=True  # デフォルトですべてのタスクを外部ストレージに保存
)

例外

  • CacheCorruptedError: コードの変更などにより、取得した Blob データのデシリアライズに失敗した場合に送出されます。