Storage
beautyspot.storage は、バイナリデータの永続化を抽象化するインターフェースと、その具体的な実装を提供します。
beautyspot.storage
AlwaysBlobPolicy
dataclass
Bases: StoragePolicyProtocol
Policy that always saves data as a blob.
Equivalent to setting default_save_blob=True.
Source code in src/beautyspot/storage.py
| @dataclass
class AlwaysBlobPolicy(StoragePolicyProtocol):
"""
Policy that always saves data as a blob.
Equivalent to setting `default_save_blob=True`.
"""
def should_save_as_blob(self, data: bytes) -> bool:
return True
|
BlobStorageBase
Bases: ABC
Abstract base class for large object storage (BLOBs).
Implementations should at least fulfill BlobStorageCore.
Source code in src/beautyspot/storage.py
| class BlobStorageBase(ABC):
"""
Abstract base class for large object storage (BLOBs).
Implementations should at least fulfill BlobStorageCore.
"""
@abstractmethod
def save(self, key: str, data: ReadableBuffer) -> str:
"""
Persist the data associated with the given key.
Returns a location identifier.
"""
pass
@abstractmethod
def load(self, location: str) -> bytes:
"""
Retrieve data from the specified location.
"""
pass
@abstractmethod
def delete(self, location: str) -> None:
"""
Delete the blob at the specified location.
Should be idempotent (no error if file missing).
"""
pass
@abstractmethod
def list_keys(self) -> Iterator[str]:
"""
Yields location identifiers for all stored blobs.
Used for Garbage Collection.
MUST yield the same format (path/URI) that is accepted by `delete`.
"""
pass
@abstractmethod
def get_mtime(self, location: str) -> float:
"""
Get the last modified time of the blob as a POSIX timestamp.
Used to prevent race conditions during Garbage Collection.
"""
pass
|
delete(location)
abstractmethod
Delete the blob at the specified location.
Should be idempotent (no error if file missing).
Source code in src/beautyspot/storage.py
| @abstractmethod
def delete(self, location: str) -> None:
"""
Delete the blob at the specified location.
Should be idempotent (no error if file missing).
"""
pass
|
get_mtime(location)
abstractmethod
Get the last modified time of the blob as a POSIX timestamp.
Used to prevent race conditions during Garbage Collection.
Source code in src/beautyspot/storage.py
| @abstractmethod
def get_mtime(self, location: str) -> float:
"""
Get the last modified time of the blob as a POSIX timestamp.
Used to prevent race conditions during Garbage Collection.
"""
pass
|
list_keys()
abstractmethod
Yields location identifiers for all stored blobs.
Used for Garbage Collection.
MUST yield the same format (path/URI) that is accepted by delete.
Source code in src/beautyspot/storage.py
| @abstractmethod
def list_keys(self) -> Iterator[str]:
"""
Yields location identifiers for all stored blobs.
Used for Garbage Collection.
MUST yield the same format (path/URI) that is accepted by `delete`.
"""
pass
|
load(location)
abstractmethod
Retrieve data from the specified location.
Source code in src/beautyspot/storage.py
| @abstractmethod
def load(self, location: str) -> bytes:
"""
Retrieve data from the specified location.
"""
pass
|
save(key, data)
abstractmethod
Persist the data associated with the given key.
Returns a location identifier.
Source code in src/beautyspot/storage.py
| @abstractmethod
def save(self, key: str, data: ReadableBuffer) -> str:
"""
Persist the data associated with the given key.
Returns a location identifier.
"""
pass
|
BlobStorageCore
Bases: Protocol
Core interface for large object storage required during execution.
Source code in src/beautyspot/storage.py
| @runtime_checkable
class BlobStorageCore(Protocol):
"""
Core interface for large object storage required during execution.
"""
def save(self, key: str, data: ReadableBuffer) -> str: ...
def load(self, location: str) -> bytes: ...
def delete(self, location: str) -> None: ...
|
LocalStorage
Bases: BlobStorageMaintenable
Source code in src/beautyspot/storage.py
| class LocalStorage(BlobStorageMaintenable):
def __init__(self, base_dir: str | Path):
# Resolve to absolute path explicitly on init
self.base_dir = Path(base_dir).resolve()
self._ensure_cache_dir(self.base_dir)
@staticmethod
def _ensure_cache_dir(directory: Path) -> None:
"""
ディレクトリを作成し、Gitの管理下に入らないよう .gitignore を配置する。
"""
directory.mkdir(parents=True, exist_ok=True)
gitignore_path = directory / ".gitignore"
if not gitignore_path.exists():
try:
gitignore_path.write_text("*\n")
except OSError as e:
# 権限問題などで書けない場合は処理を続行(ログのみ)
logging.warning(f"Failed to create .gitignore in {directory}: {e}")
def _validate_key(self, key: str):
"""save() に渡されるキャッシュキーを検証する。
Note:
この検証は save() の引数(通常は SHA-256 ハッシュ)にのみ適用される。
list_keys() が返すロケーション文字列(例: 'subdir/hash.bin')は
レガシーデータとの互換性のためにパス区切り文字を含む場合があり、
load() / delete() では別途パストラバーサルチェックを行う。
"""
# Prevent Path Traversal
if ".." in key or "/" in key or "\\" in key:
raise ValidationError(
f"Invalid key: '{key}'. Keys must not contain path separators."
)
def save(self, key: str, data: ReadableBuffer) -> str:
"""
指定されたキーでデータをローカルディスクに保存し、ファイル名(location)を返す。
単純な `open(..., 'wb')` による上書きは行わず、`tempfile.mkstemp` で一意な
一時ファイルを作成して書き込んだ後、`os.replace` でアトミックにリネームする手法を採用している。
これは以下の2点を防ぐためである。
1. 並行実行時(複数スレッド/プロセス)に同じキャッシュキーに同時に書き込もうとした際のファイルの競合・破損。
2. 書き込み中のプロセス強制終了などによる、不完全で壊れたファイルの残留。
Args:
key (str): 保存するキャッシュキー
data (ReadableBuffer): 保存するバイトデータ
Returns:
str: 保存されたファイル名
"""
self._validate_key(key)
filename = f"{key}.bin"
filepath = self.base_dir / filename
# Atomic write: mkstemp generates a unique temp file to avoid collisions
# when multiple threads/processes write concurrently.
# flush + fsync ensures data reaches disk before rename,
# so a crash between write and rename never leaves a corrupt file.
fd, temp_path_str = tempfile.mkstemp(dir=self.base_dir, suffix=".spot_tmp")
try:
with os.fdopen(fd, "wb", closefd=True) as f:
f.write(data)
f.flush()
os.fsync(f.fileno())
Path(temp_path_str).replace(filepath)
except BaseException:
try:
os.unlink(temp_path_str)
except OSError:
# PermissionError等で消せなかった場合は残留するが、後でGCが回収する
pass
raise
return filename
def load(self, location: str) -> bytes:
# [CHANGED] Resolve location relative to base_dir.
# Note: If 'location' is an absolute path (legacy data), pathlib behavior
# (base / abs) returns abs, so backward compatibility on the same machine is preserved.
full_path = (self.base_dir / location).resolve()
# Security check: Ensure the path is strictly within the base_dir
if not full_path.is_relative_to(self.base_dir):
raise CacheCorruptedError(
f"Access denied: {location} resolves to {full_path}, which is outside {self.base_dir}"
)
if not full_path.exists():
raise CacheCorruptedError(f"Local blob lost: {full_path}")
try:
with open(full_path, "rb") as f:
return f.read()
except OSError as e:
raise CacheCorruptedError(f"Failed to read blob: {e}")
def delete(self, location: str) -> None:
"""
Delete the file at the given location.
Note:
For performance reasons, this method does not synchronously remove
empty parent directories. Directory cleanup is deferred to the
asynchronous maintenance task (`prune_empty_dirs` / `beautyspot gc`).
"""
full_path = (self.base_dir / location).resolve()
if not full_path.is_relative_to(self.base_dir):
return
try:
os.remove(full_path)
except FileNotFoundError:
pass
except (PermissionError, OSError) as e:
# 他のプロセスによるロックや権限の問題をログに残すが、処理は継続する
logging.warning(
f"Failed to delete blob at {full_path}: {e}. It will be handled by subsequent GC."
)
def list_keys(self) -> Iterator[str]:
"""
Yields relative paths of all .bin files, including subdirectories.
Example: 'hash.bin' or 'subdir/hash.bin'
"""
if not self.base_dir.exists():
return
# rglob で再帰的に探索
for entry in self.base_dir.rglob("*.bin"):
if entry.is_file():
# base_dir からの相対パスを返す
yield str(entry.relative_to(self.base_dir).as_posix())
def get_mtime(self, location: str) -> float:
full_path = (self.base_dir / location).resolve()
if not full_path.is_relative_to(self.base_dir):
raise ValueError(f"Access denied: {location}")
try:
return full_path.stat().st_mtime
except OSError as e:
raise CacheCorruptedError(f"Failed to get mtime for blob: {e}")
def clean_temp_files(self, max_age_seconds: int = 86400) -> int:
"""
Remove '.spot_tmp' files that are older than max_age_seconds.
Provides a fail-safe against leaked temporary files due to file locks.
"""
if not self.base_dir.exists():
return 0
removed_count = 0
now = time.time()
for entry in self.base_dir.rglob("*.spot_tmp"):
if entry.is_file():
try:
# 猶予期間(デフォルト24時間)を経過しているかチェック
if now - entry.stat().st_mtime > max_age_seconds:
entry.unlink()
removed_count += 1
except OSError:
# アンチウイルスソフト等で現在もロックされている場合はスキップ
pass
return removed_count
def prune_empty_dirs(self) -> int:
"""
Recursively remove empty directories under base_dir.
Also removes directories containing only system generated files (.DS_Store, etc).
Returns the count of removed directories.
Note:
base_dir 自体は削除しません。base_dir が削除されると以降の
save() で FileNotFoundError が発生するためです。
"""
if not self.base_dir.exists():
return 0
IGNORED_FILES = {".DS_Store", "Thumbs.db", "desktop.ini"}
removed_count = 0
# os.walk(topdown=False) で深い階層から順に処理
for root, dirs, files in os.walk(self.base_dir, topdown=False):
path = Path(root)
# base_dir 自体は絶対に削除しない
if path == self.base_dir:
continue
existing_files = set(files)
# 無視リスト以外のファイルがある場合 -> 削除不可
if existing_files - IGNORED_FILES:
continue
# 無視リストにあるファイルしか残っていない場合、それらを消して空にする
for f in existing_files:
try:
(path / f).unlink()
except OSError:
pass
# ディレクトリ削除を試みる
try:
path.rmdir()
removed_count += 1
except OSError:
pass
return removed_count
|
clean_temp_files(max_age_seconds=86400)
Remove '.spot_tmp' files that are older than max_age_seconds.
Provides a fail-safe against leaked temporary files due to file locks.
Source code in src/beautyspot/storage.py
| def clean_temp_files(self, max_age_seconds: int = 86400) -> int:
"""
Remove '.spot_tmp' files that are older than max_age_seconds.
Provides a fail-safe against leaked temporary files due to file locks.
"""
if not self.base_dir.exists():
return 0
removed_count = 0
now = time.time()
for entry in self.base_dir.rglob("*.spot_tmp"):
if entry.is_file():
try:
# 猶予期間(デフォルト24時間)を経過しているかチェック
if now - entry.stat().st_mtime > max_age_seconds:
entry.unlink()
removed_count += 1
except OSError:
# アンチウイルスソフト等で現在もロックされている場合はスキップ
pass
return removed_count
|
delete(location)
Delete the file at the given location.
Note
For performance reasons, this method does not synchronously remove
empty parent directories. Directory cleanup is deferred to the
asynchronous maintenance task (prune_empty_dirs / beautyspot gc).
Source code in src/beautyspot/storage.py
| def delete(self, location: str) -> None:
"""
Delete the file at the given location.
Note:
For performance reasons, this method does not synchronously remove
empty parent directories. Directory cleanup is deferred to the
asynchronous maintenance task (`prune_empty_dirs` / `beautyspot gc`).
"""
full_path = (self.base_dir / location).resolve()
if not full_path.is_relative_to(self.base_dir):
return
try:
os.remove(full_path)
except FileNotFoundError:
pass
except (PermissionError, OSError) as e:
# 他のプロセスによるロックや権限の問題をログに残すが、処理は継続する
logging.warning(
f"Failed to delete blob at {full_path}: {e}. It will be handled by subsequent GC."
)
|
list_keys()
Yields relative paths of all .bin files, including subdirectories.
Example: 'hash.bin' or 'subdir/hash.bin'
Source code in src/beautyspot/storage.py
| def list_keys(self) -> Iterator[str]:
"""
Yields relative paths of all .bin files, including subdirectories.
Example: 'hash.bin' or 'subdir/hash.bin'
"""
if not self.base_dir.exists():
return
# rglob で再帰的に探索
for entry in self.base_dir.rglob("*.bin"):
if entry.is_file():
# base_dir からの相対パスを返す
yield str(entry.relative_to(self.base_dir).as_posix())
|
prune_empty_dirs()
Recursively remove empty directories under base_dir.
Also removes directories containing only system generated files (.DS_Store, etc).
Returns the count of removed directories.
Note
base_dir 自体は削除しません。base_dir が削除されると以降の
save() で FileNotFoundError が発生するためです。
Source code in src/beautyspot/storage.py
| def prune_empty_dirs(self) -> int:
"""
Recursively remove empty directories under base_dir.
Also removes directories containing only system generated files (.DS_Store, etc).
Returns the count of removed directories.
Note:
base_dir 自体は削除しません。base_dir が削除されると以降の
save() で FileNotFoundError が発生するためです。
"""
if not self.base_dir.exists():
return 0
IGNORED_FILES = {".DS_Store", "Thumbs.db", "desktop.ini"}
removed_count = 0
# os.walk(topdown=False) で深い階層から順に処理
for root, dirs, files in os.walk(self.base_dir, topdown=False):
path = Path(root)
# base_dir 自体は絶対に削除しない
if path == self.base_dir:
continue
existing_files = set(files)
# 無視リスト以外のファイルがある場合 -> 削除不可
if existing_files - IGNORED_FILES:
continue
# 無視リストにあるファイルしか残っていない場合、それらを消して空にする
for f in existing_files:
try:
(path / f).unlink()
except OSError:
pass
# ディレクトリ削除を試みる
try:
path.rmdir()
removed_count += 1
except OSError:
pass
return removed_count
|
save(key, data)
指定されたキーでデータをローカルディスクに保存し、ファイル名(location)を返す。
単純な open(..., 'wb') による上書きは行わず、tempfile.mkstemp で一意な
一時ファイルを作成して書き込んだ後、os.replace でアトミックにリネームする手法を採用している。
これは以下の2点を防ぐためである。
1. 並行実行時(複数スレッド/プロセス)に同じキャッシュキーに同時に書き込もうとした際のファイルの競合・破損。
2. 書き込み中のプロセス強制終了などによる、不完全で壊れたファイルの残留。
Parameters:
| Name |
Type |
Description |
Default |
key
|
str
|
|
required
|
data
|
ReadableBuffer
|
|
required
|
Returns:
| Name | Type |
Description |
str |
str
|
|
Source code in src/beautyspot/storage.py
| def save(self, key: str, data: ReadableBuffer) -> str:
"""
指定されたキーでデータをローカルディスクに保存し、ファイル名(location)を返す。
単純な `open(..., 'wb')` による上書きは行わず、`tempfile.mkstemp` で一意な
一時ファイルを作成して書き込んだ後、`os.replace` でアトミックにリネームする手法を採用している。
これは以下の2点を防ぐためである。
1. 並行実行時(複数スレッド/プロセス)に同じキャッシュキーに同時に書き込もうとした際のファイルの競合・破損。
2. 書き込み中のプロセス強制終了などによる、不完全で壊れたファイルの残留。
Args:
key (str): 保存するキャッシュキー
data (ReadableBuffer): 保存するバイトデータ
Returns:
str: 保存されたファイル名
"""
self._validate_key(key)
filename = f"{key}.bin"
filepath = self.base_dir / filename
# Atomic write: mkstemp generates a unique temp file to avoid collisions
# when multiple threads/processes write concurrently.
# flush + fsync ensures data reaches disk before rename,
# so a crash between write and rename never leaves a corrupt file.
fd, temp_path_str = tempfile.mkstemp(dir=self.base_dir, suffix=".spot_tmp")
try:
with os.fdopen(fd, "wb", closefd=True) as f:
f.write(data)
f.flush()
os.fsync(f.fileno())
Path(temp_path_str).replace(filepath)
except BaseException:
try:
os.unlink(temp_path_str)
except OSError:
# PermissionError等で消せなかった場合は残留するが、後でGCが回収する
pass
raise
return filename
|
Maintenable
Bases: Protocol
Extended interface for maintenance tasks (GC).
Source code in src/beautyspot/storage.py
| @runtime_checkable
class Maintenable(Protocol):
"""
Extended interface for maintenance tasks (GC).
"""
def list_keys(self) -> Iterator[str]: ...
def get_mtime(self, location: str) -> float: ...
|
S3Storage
Bases: BlobStorageMaintenable
Source code in src/beautyspot/storage.py
| class S3Storage(BlobStorageMaintenable):
def __init__(
self,
s3_uri: str,
s3_opts: dict[str, Any] | None = None,
):
if not boto3:
raise ImportError("Run `pip install beautyspot[s3]` to use S3 storage.")
parts = s3_uri.replace("s3://", "").split("/", 1)
self.bucket_name = parts[0]
raw_prefix = parts[1].rstrip("/") if len(parts) > 1 else ""
self.prefix = raw_prefix if raw_prefix else "blobs"
opts = s3_opts or {}
self.s3 = boto3.client("s3", **opts)
@staticmethod
def _parse_s3_uri(location: str) -> tuple[str, str]:
"""Parse an s3:// URI into (bucket, key). Raises ValueError for invalid URIs."""
if not location.startswith("s3://"):
raise ValidationError(f"Expected an s3:// URI, got: {location!r}")
path = location[len("s3://") :]
parts = path.split("/", 1)
if len(parts) != 2 or not parts[0] or not parts[1]:
raise ValidationError(
f"Invalid S3 URI (expected s3://bucket/key): {location!r}"
)
return parts[0], parts[1]
def save(self, key: str, data: ReadableBuffer) -> str:
s3_key = f"{self.prefix}/{key}.bin"
buffer = io.BytesIO(data)
# upload_fileobj は大容量データ(>5GB)に対してマルチパートアップロードを
# 自動的に使用し、put_object の 5GB 上限を回避する。
self.s3.upload_fileobj(buffer, self.bucket_name, s3_key)
return f"s3://{self.bucket_name}/{s3_key}"
def load(self, location: str) -> bytes:
bucket, key = self._parse_s3_uri(location)
try:
resp = self.s3.get_object(Bucket=bucket, Key=key)
body = resp["Body"]
try:
return body.read()
finally:
body.close()
except ClientError as e:
raise CacheCorruptedError(f"S3 blob lost: {location}") from e
def delete(self, location: str) -> None:
bucket, key = self._parse_s3_uri(location)
try:
self.s3.delete_object(Bucket=bucket, Key=key)
except ClientError as e:
# S3 の delete_object は存在しないオブジェクトに対してもエラーを返さないため、
# ここに到達するのは権限エラーやネットワーク障害などの深刻なケース。
# 握り潰さずにログへ記録し、GC が後続で回収できるようにする。
logging.warning(
f"Failed to delete S3 object {location}: {e}. "
"It will be handled by subsequent GC."
)
def get_mtime(self, location: str) -> float:
bucket, key = self._parse_s3_uri(location)
try:
resp = self.s3.head_object(Bucket=bucket, Key=key)
# LastModified is a datetime object, convert to POSIX timestamp
return resp["LastModified"].timestamp()
except ClientError as e:
raise CacheCorruptedError(
f"S3 blob lost or inaccessible: {location}"
) from e
def list_keys(self) -> Iterator[str]:
"""Yields s3:// URIs for all objects in the prefix."""
paginator = self.s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=self.bucket_name, Prefix=self.prefix):
for obj in page.get("Contents", []):
yield f"s3://{self.bucket_name}/{obj['Key']}"
|
list_keys()
Yields s3:// URIs for all objects in the prefix.
Source code in src/beautyspot/storage.py
| def list_keys(self) -> Iterator[str]:
"""Yields s3:// URIs for all objects in the prefix."""
paginator = self.s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=self.bucket_name, Prefix=self.prefix):
for obj in page.get("Contents", []):
yield f"s3://{self.bucket_name}/{obj['Key']}"
|
StoragePolicyProtocol
Bases: Protocol
Protocol to determine if data should be saved as a blob (file/object storage)
or directly in the database based on the data content (usually size).
Source code in src/beautyspot/storage.py
| @runtime_checkable
class StoragePolicyProtocol(Protocol):
"""
Protocol to determine if data should be saved as a blob (file/object storage)
or directly in the database based on the data content (usually size).
"""
def should_save_as_blob(self, data: bytes) -> bool: ...
|
ThresholdStoragePolicy
dataclass
Bases: StoragePolicyProtocol
Policy that saves data as a blob if its size exceeds a configured threshold.
This is the recommended policy for automatic optimization.
Source code in src/beautyspot/storage.py
| @dataclass
class ThresholdStoragePolicy(StoragePolicyProtocol):
"""
Policy that saves data as a blob if its size exceeds a configured threshold.
This is the recommended policy for automatic optimization.
"""
threshold: int
def should_save_as_blob(self, data: bytes) -> bool:
return len(data) > self.threshold
|
WarningOnlyPolicy
dataclass
Bases: StoragePolicyProtocol
Policy for backward compatibility (v2.0 behavior).
Does not force blob storage, but logs a warning if size exceeds threshold.
Source code in src/beautyspot/storage.py
| @dataclass
class WarningOnlyPolicy(StoragePolicyProtocol):
"""
Policy for backward compatibility (v2.0 behavior).
Does not force blob storage, but logs a warning if size exceeds threshold.
"""
warning_threshold: int
# logger は比較・repr 対象外にする。
# dataclass の自動生成 __eq__ に logger インスタンスが混入するのを防ぐ。
logger: logging.Logger = field(
default_factory=lambda: logging.getLogger("beautyspot"),
compare=False,
repr=False,
)
def should_save_as_blob(self, data: bytes) -> bool:
if len(data) > self.warning_threshold:
self.logger.warning(
f"⚠️ Large data detected ({len(data)} bytes). "
f"Consider using `save_blob=True` or a stricter StoragePolicy."
)
return False
|
概要
beautyspot は、キャッシュ結果をデータベース内に保存するか、外部ストレージに保存するかを自動的、あるいは明示的に切り替えることができます。BlobStorageBase を継承することで、独自のストレージバックエンド(例:Google Cloud Storage, Azure Blob Storage)を実装することも可能です。
インターフェース: BlobStorageBase
すべてのストレージ実装が提供すべき基本メソッドです。
save(key, data): データを保存し、その場所を示す一意のロケーション(パスや URI)を返します。
load(location): 指定されたロケーションからバイナリデータを取得します。
delete(location): 指定されたロケーションのデータを削除します。
list_keys(): ストレージ内のすべてのロケーションを列挙します(ガベージコレクション用)。
標準実装
LocalStorage
ローカルのファイルシステムを使用する実装です。
- 自動生成: コンストラクタで指定されたディレクトリが存在しない場合は自動的に作成します。
- 安全性: パス・トラバーサル攻撃を防ぐため、キーにパス区切り文字が含まれていないかバリデーションを行います。
- アトミックな書き込み: 一時ファイルに書き込んだ後でリネーム(replace)を行うことで、書き込み中のクラッシュによるデータ破損を防ぎます。
S3Storage
AWS S3 または互換ストレージを使用する実装です。
- 依存関係: 使用には
boto3 が必要です。
- URI 形式:
s3://bucket-name/prefix 形式の URI をサポートします。
ファクトリ関数: create_storage
パスの形式(s3:// で始まるかどうか)を判別し、適切なストレージインスタンスを動的に生成します。MaintenanceService.from_path などで内部的に利用されています。
使用例
明示的な初期化
from beautyspot.storage import LocalStorage, S3Storage
# ローカルストレージ
local_store = LocalStorage("./.beautyspot/blobs")
# S3ストレージ
s3_store = S3Storage("s3://my-bucket/cache", s3_opts={"region_name": "ap-northeast-1"})
Spot への注入
from beautyspot import Spot
from beautyspot.storage import create_storage
# ファクトリ関数を使用してストレージを準備
storage = create_storage("./.beautyspot/blobs")
spot = Spot(
name="my_app",
db=db,
serializer=serializer,
storage=storage,
default_save_blob=True # デフォルトですべてのタスクを外部ストレージに保存
)
例外
CacheCorruptedError: コードの変更などにより、取得した Blob データのデシリアライズに失敗した場合に送出されます。