Skip to content

limiter

beautyspot.limiter モジュールは、外部サービスへの過度なアクセスを制御し、実行の安定性を担保するためのレートリミッターを提供します。

beautyspot.limiter

TokenBucket

Bases: LimiterProtocol

A smooth rate limiter based on the GCRA (Generic Cell Rate Algorithm).

Features: - No burst after long idle (Strict Pacing). - No start-up delay for the very first request. - Fails fast if a task cost exceeds the TPM limit. - Thread-safe and Async-compatible. - Uses monotonic clock.

Source code in src/beautyspot/limiter.py
class TokenBucket(LimiterProtocol):
    """
    A smooth rate limiter based on the GCRA (Generic Cell Rate Algorithm).

    Features:
    - No burst after long idle (Strict Pacing).
    - No start-up delay for the very first request.
    - Fails fast if a task cost exceeds the TPM limit.
    - Thread-safe and Async-compatible.
    - Uses monotonic clock.
    """

    def __init__(self, tokens_per_minute: int):
        if tokens_per_minute <= 0:
            raise ValueError("tokens_per_minute must be positive")

        # Rate: tokens per second
        self.rate = float(tokens_per_minute) / 60.0

        # Maximum allowed cost per task.
        # A single task consuming more than the TPM limit is physically impossible
        # to process within the rate window, so it should be forbidden.
        self.max_cost = int(tokens_per_minute)

        # Theoretical Arrival Time (TAT)
        self.tat = time.monotonic()
        self.lock = threading.Lock()

    def _consume_reservation(self, cost: int) -> float:
        """
        Calculates wait time and updates TAT.
        Returns seconds to wait.
        """
        if cost <= 0:
            return 0.0

        # Guard: Prevent requests that exceed the rate limit capacity entirely
        if cost > self.max_cost:
            raise ValueError(
                f"Requested cost ({cost}) exceeds the maximum limit of {self.max_cost} tokens per minute. "
                "This task cannot be processed within the defined rate limit."
            )

        increment = cost / self.rate

        with self.lock:
            now = time.monotonic()
            if now > self.tat:
                self.tat = now

            wait_time = self.tat - now
            if wait_time < 0:
                wait_time = 0.0

            self.tat += increment

            return wait_time

    def consume(self, cost: int):
        """
        Acquire tokens from the bucket, blocking if necessary.

        If the bucket does not have enough tokens, this method sleeps (blocks the thread)
        until the tokens become available based on the refill rate.

        Args:
            cost (int): Number of tokens to consume.

        Raises:
            ValueError: If `cost` exceeds the bucket's total capacity (`tpm`).
                        (i.e., the request is too expensive to ever be processed)
        """
        wait_time = self._consume_reservation(cost)
        if wait_time > 0:
            time.sleep(wait_time)

    async def consume_async(self, cost: int):
        """
        Acquire tokens asynchronously.

        If the bucket does not have enough tokens, this method awaits (non-blocking sleep)
        until the tokens become available.

        Args:
            cost (int): Number of tokens to consume.

        Raises:
            ValueError: If `cost` exceeds the bucket's total capacity.
        """
        wait_time = self._consume_reservation(cost)
        if wait_time > 0:
            await asyncio.sleep(wait_time)

consume(cost)

Acquire tokens from the bucket, blocking if necessary.

If the bucket does not have enough tokens, this method sleeps (blocks the thread) until the tokens become available based on the refill rate.

Parameters:

Name Type Description Default
cost int

Number of tokens to consume.

required

Raises:

Type Description
ValueError

If cost exceeds the bucket's total capacity (tpm). (i.e., the request is too expensive to ever be processed)

Source code in src/beautyspot/limiter.py
def consume(self, cost: int):
    """
    Acquire tokens from the bucket, blocking if necessary.

    If the bucket does not have enough tokens, this method sleeps (blocks the thread)
    until the tokens become available based on the refill rate.

    Args:
        cost (int): Number of tokens to consume.

    Raises:
        ValueError: If `cost` exceeds the bucket's total capacity (`tpm`).
                    (i.e., the request is too expensive to ever be processed)
    """
    wait_time = self._consume_reservation(cost)
    if wait_time > 0:
        time.sleep(wait_time)

consume_async(cost) async

Acquire tokens asynchronously.

If the bucket does not have enough tokens, this method awaits (non-blocking sleep) until the tokens become available.

Parameters:

Name Type Description Default
cost int

Number of tokens to consume.

required

Raises:

Type Description
ValueError

If cost exceeds the bucket's total capacity.

Source code in src/beautyspot/limiter.py
async def consume_async(self, cost: int):
    """
    Acquire tokens asynchronously.

    If the bucket does not have enough tokens, this method awaits (non-blocking sleep)
    until the tokens become available.

    Args:
        cost (int): Number of tokens to consume.

    Raises:
        ValueError: If `cost` exceeds the bucket's total capacity.
    """
    wait_time = self._consume_reservation(cost)
    if wait_time > 0:
        await asyncio.sleep(wait_time)

プロトコル定義

beautyspot は、レートリミットの具体的なアルゴリズムを抽象化するため、以下のプロトコルを定義しています。

LimiterProtocol

リミッターが実装すべき標準インターフェースです。

  • consume(cost: int) -> None: 指定したコスト分のトークンを消費します。不足している場合は、利用可能になるまで現在のスレッドをブロックします。
  • consume_async(cost: int) -> None: consume の非同期版です。非ブロッキングで待機します。

TokenBucket クラス

GCRA (Generic Cell Rate Algorithm) に基づいた、滑らかなレート制限を実現する実装です。

技術的特徴

  • 厳格なペース調整 (Strict Pacing): 長期間アイドル状態だった後にリクエストが集中しても、バースト(一気読み)を発生させず、設定されたレートに従って均等に処理を分散させます。
  • ハイブリッド対応: 内部で threading.Lock を使用しており、マルチスレッド環境と asyncio 環境の両方で安全に使用できます。
  • モノトニッククロックの採用: 時刻の計算には time.monotonic() を使用しているため、システムの時刻変更やうるう秒の影響を受けず、正確なインターバル計算が可能です。
  • フェイルファスト設計: 1つのタスクのコストが、設定された「1分あたりの最大トークン数 (TPM)」を超える場合、決して完了できないリクエストとして即座に ValueError を送出します。

基本設定

引数 説明
tokens_per_minute int 1分間に許容される合計コスト(TPM)。

内部アルゴリズム: GCRA

TokenBucket は、内部的に Theoretical Arrival Time (TAT) という概念を使用して、次のリクエストが許可される最短時刻を管理しています。

  1. リクエストが発生すると、現在の時刻と TAT を比較します。
  2. 現在時刻 < TAT の場合、その差分だけ待機(sleep)が必要と判断されます。
  3. リクエストのコストに応じて TAT を将来へと更新し、予約(Reservation)を行います。

例外

  • ValueError:
  • tokens_per_minute に 0 以下の値を指定した場合。
  • consume に渡された cost が、リミッターの最大容量(1分間の制限値)を超えている場合。