VWAP/TWAP執行アルゴリズムの設計と実装 — 大口注文の最適執行を目指す

自動売買証券APIPython

戦略概要

大口注文の執行では、マーケットインパクト(価格への影響)を最小限に抑えることが重要です。VWAP(Volume Weighted Average Price)とTWAP(Time Weighted Average Price)は、機関投資家が標準的に使用する執行アルゴリズムであり、個人投資家もkabuステーションAPIなどを活用して実装可能です。

執行アルゴリズムが解決する課題

課題従来の手動執行VWAP/TWAP執行
マーケットインパクト大口注文で価格が動く注文分割で影響を最小化
スリッページ不利な価格で約定ベンチマーク価格に近い執行
執行タイミング主観的判断に依存客観的ルールで自動化
検知リスク大口注文が見えるアイスバーグ注文で隠蔽
パフォーマンス測定基準がないVWAP/TWAPとの乖離で評価

2026年のグローバルアルゴリズム取引市場規模は約2.72億米ドルに達し、東証でのアルゴ取引比率は50%を超えています。個人投資家でもVWAP執行を活用することで、平均0.5-1%のコスト削減が期待できます。

VWAPの定義と計算方法

VWAPは取引期間中の平均価格を出来高で加重した指標です。

計算式

VWAP = Σ(価格 × 出来高) / Σ(出来高)

計算例

時間帯価格出来高価格×出来高
9:00-9:301,000円10,000株10,000,000
9:30-10:001,020円15,000株15,300,000
10:00-10:301,010円8,000株8,080,000
合計-33,000株33,380,000

VWAP = 33,380,000 / 33,000 = 1,011.5円

Pythonでの実装

import pandas as pd
from dataclasses import dataclass

@dataclass
class VWAPCalculator:
    """VWAP計算クラス"""

    def calculate(self, df: pd.DataFrame) -> float:
        """
        VWAPを計算

        Args:
            df: 'price'と'volume'列を含むDataFrame

        Returns:
            VWAP値
        """
        if df.empty or df["volume"].sum() == 0:
            return 0.0

        df["pv"] = df["price"] * df["volume"]
        vwap = df["pv"].sum() / df["volume"].sum()
        return round(vwap, 2)

    def calculate_bands(
        self,
        df: pd.DataFrame,
        num_std: float = 2.0
    ) -> tuple[float, float, float]:
        """
        VWAPバンドを計算(トレンド判断用)

        Returns:
            (VWAP, 上限バンド, 下限バンド)
        """
        vwap = self.calculate(df)

        # 出来高加重標準偏差
        df["pv"] = df["price"] * df["volume"]
        df["variance"] = df["volume"] * (df["price"] - vwap) ** 2
        weighted_std = (df["variance"].sum() / df["volume"].sum()) ** 0.5

        upper = vwap + num_std * weighted_std
        lower = vwap - num_std * weighted_std

        return vwap, round(upper, 2), round(lower, 2)

VWAP執行アルゴリズムの設計

VWAP執行アルゴリズムは、過去の出来高パターンに基づいて注文を分割し、VWAPに近い平均執行価格を目指します。

出来高プロファイル推定

from datetime import time
import numpy as np

def estimate_volume_profile(
    historical_data: pd.DataFrame,
    lookback_days: int = 20
) -> dict[time, float]:
    """
    過去データから時間帯別出来高比率を推定

    Args:
        historical_data: 過去の分足データ
        lookback_days: 参照日数

    Returns:
        時間帯別の出来高比率
    """
    # 時間帯別に集計
    historical_data["time_slot"] = historical_data["datetime"].dt.time

    # 日次出来高に対する比率を計算
    daily_totals = historical_data.groupby(
        historical_data["datetime"].dt.date
    )["volume"].transform("sum")

    historical_data["volume_ratio"] = historical_data["volume"] / daily_totals

    # 時間帯別の平均比率
    profile = historical_data.groupby("time_slot")["volume_ratio"].mean()

    return profile.to_dict()


# 東証の典型的な出来高プロファイル
TYPICAL_TOKYO_PROFILE = {
    # 前場寄付き付近: 出来高集中
    time(9, 0): 0.08,
    time(9, 30): 0.06,
    time(10, 0): 0.05,
    time(10, 30): 0.04,
    time(11, 0): 0.03,
    # 前場引け付近
    time(11, 15): 0.04,
    # 後場寄付き
    time(12, 30): 0.06,
    time(13, 0): 0.04,
    time(13, 30): 0.04,
    time(14, 0): 0.04,
    time(14, 30): 0.05,
    # 大引け付近: 出来高集中
    time(15, 0): 0.10,
    time(15, 20): 0.12,
}

VWAP執行エンジン

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Callable

@dataclass
class VWAPOrder:
    """VWAP執行対象の注文"""
    symbol: str
    side: str              # "buy" or "sell"
    total_quantity: int    # 総注文数量
    start_time: datetime   # 執行開始時刻
    end_time: datetime     # 執行終了時刻

@dataclass
class SliceOrder:
    """分割された子注文"""
    symbol: str
    side: str
    quantity: int
    scheduled_time: datetime
    executed: bool = False
    fill_price: float | None = None
    fill_quantity: int = 0

@dataclass
class VWAPExecutionEngine:
    """VWAP執行エンジン"""

    volume_profile: dict[time, float]
    slice_interval_minutes: int = 5
    participation_rate: float = 0.10  # 市場出来高の最大参加率

    def generate_execution_schedule(
        self,
        order: VWAPOrder
    ) -> list[SliceOrder]:
        """
        出来高プロファイルに基づいて執行スケジュールを生成
        """
        slices = []
        current_time = order.start_time
        total_allocated = 0

        while current_time < order.end_time:
            time_slot = current_time.time()

            # 該当時間帯の出来高比率を取得
            volume_ratio = self._get_volume_ratio(time_slot)

            # この時間帯に割り当てる数量
            slice_qty = int(order.total_quantity * volume_ratio)

            # 最小注文単位で調整
            slice_qty = max(100, (slice_qty // 100) * 100)

            if slice_qty > 0 and total_allocated < order.total_quantity:
                remaining = order.total_quantity - total_allocated
                slice_qty = min(slice_qty, remaining)

                slices.append(SliceOrder(
                    symbol=order.symbol,
                    side=order.side,
                    quantity=slice_qty,
                    scheduled_time=current_time
                ))
                total_allocated += slice_qty

            current_time += timedelta(minutes=self.slice_interval_minutes)

        return slices

    def _get_volume_ratio(self, time_slot: time) -> float:
        """時間帯の出来高比率を取得(補間あり)"""
        if time_slot in self.volume_profile:
            return self.volume_profile[time_slot]

        # 最も近い時間帯の値を使用
        slots = sorted(self.volume_profile.keys())
        for i, slot in enumerate(slots):
            if slot > time_slot and i > 0:
                return self.volume_profile[slots[i-1]]

        return 0.05  # デフォルト値


# 使用例
engine = VWAPExecutionEngine(
    volume_profile=TYPICAL_TOKYO_PROFILE,
    slice_interval_minutes=5,
    participation_rate=0.10
)

order = VWAPOrder(
    symbol="7203",  # トヨタ自動車
    side="buy",
    total_quantity=10000,
    start_time=datetime(2026, 3, 14, 9, 0),
    end_time=datetime(2026, 3, 14, 15, 0)
)

schedule = engine.generate_execution_schedule(order)
for slice_order in schedule[:5]:
    print(f"{slice_order.scheduled_time}: {slice_order.quantity}株")

TWAPの設計

TWAP(Time Weighted Average Price)は時間均等に注文を分割する手法です。

VWAPとTWAPの比較

特性VWAPTWAP
分割基準出来高プロファイル時間均等
計算複雑度高(プロファイル推定必要)低(シンプル)
マーケットインパクト出来高多い時間に分散均等に分散
最適ケース出来高予測が正確な場合出来高予測が困難な場合
パターン検知リスク高(規則的すぎる)
機会コスト中(トレンドを逃す可能性)

TWAP実装

@dataclass
class TWAPExecutionEngine:
    """TWAP執行エンジン"""

    slice_interval_minutes: int = 5
    randomization: float = 0.2  # タイミングのランダム化係数

    def generate_execution_schedule(
        self,
        order: VWAPOrder
    ) -> list[SliceOrder]:
        """時間均等に分割した執行スケジュールを生成"""

        duration = order.end_time - order.start_time
        num_slices = int(duration.total_seconds() / 60 / self.slice_interval_minutes)

        if num_slices == 0:
            num_slices = 1

        base_quantity = order.total_quantity // num_slices
        remainder = order.total_quantity % num_slices

        slices = []
        current_time = order.start_time

        for i in range(num_slices):
            # 端数を最初のスライスに追加
            quantity = base_quantity + (remainder if i == 0 else 0)

            # タイミングをランダム化(検知回避)
            random_offset = int(
                self.slice_interval_minutes * 60 *
                self.randomization *
                (np.random.random() - 0.5)
            )
            scheduled = current_time + timedelta(seconds=random_offset)

            slices.append(SliceOrder(
                symbol=order.symbol,
                side=order.side,
                quantity=quantity,
                scheduled_time=scheduled
            ))

            current_time += timedelta(minutes=self.slice_interval_minutes)

        return slices

マーケットインパクトの最小化

インパクトモデル

def estimate_market_impact(
    order_size: int,
    avg_daily_volume: int,
    volatility: float,
    spread: float
) -> float:
    """
    マーケットインパクトを推定(Almgren-Chrissモデル簡略版)

    Args:
        order_size: 注文サイズ(株数)
        avg_daily_volume: 日次平均出来高
        volatility: 年率ボラティリティ
        spread: ビッド・アスクスプレッド(%)

    Returns:
        推定インパクト(%)
    """
    # 参加率
    participation = order_size / avg_daily_volume

    # 一時的インパクト(即時執行の場合)
    temporary_impact = spread / 2 + 0.1 * volatility * (participation ** 0.5)

    # 恒久的インパクト
    permanent_impact = 0.05 * volatility * participation

    total_impact = temporary_impact + permanent_impact

    return round(total_impact * 100, 3)  # bps


# 使用例
impact = estimate_market_impact(
    order_size=50000,
    avg_daily_volume=5000000,
    volatility=0.25,
    spread=0.001
)
print(f"推定マーケットインパクト: {impact}bps")

インパクト低減テクニック

  1. 注文分割(スライシング): 大口注文を小分けして執行
  2. 参加率制限: 市場出来高の10-20%以下に抑制
  3. ランダム化: 執行タイミングをランダム化してパターン検知を回避
  4. 流動性追随: 出来高が多い時間帯に多く執行

スライサー・アイスバーグ注文

アイスバーグ注文の概念

アイスバーグ注文は、総注文量を隠し、一部のみを市場に表示する手法です。表示部分が約定すると、自動的に次の部分が表示されます。

@dataclass
class IcebergOrder:
    """アイスバーグ注文"""
    symbol: str
    side: str
    total_quantity: int
    display_quantity: int  # 表示数量
    price: float

    def __post_init__(self):
        if self.display_quantity > self.total_quantity:
            raise ValueError("表示数量は総数量以下である必要があります")

        self.remaining = self.total_quantity
        self.fills = []

    def get_visible_order(self) -> dict | None:
        """現在の表示注文を取得"""
        if self.remaining <= 0:
            return None

        visible_qty = min(self.display_quantity, self.remaining)

        return {
            "symbol": self.symbol,
            "side": self.side,
            "quantity": visible_qty,
            "price": self.price,
            "order_type": "limit"
        }

    def on_fill(self, fill_quantity: int, fill_price: float):
        """約定処理"""
        self.remaining -= fill_quantity
        self.fills.append({
            "quantity": fill_quantity,
            "price": fill_price,
            "timestamp": datetime.now()
        })


# VWAP + アイスバーグの組み合わせ
def execute_vwap_iceberg(
    order: VWAPOrder,
    display_ratio: float = 0.2
) -> list[IcebergOrder]:
    """
    VWAPスケジュールの各スライスをアイスバーグ注文で執行
    """
    engine = VWAPExecutionEngine(volume_profile=TYPICAL_TOKYO_PROFILE)
    schedule = engine.generate_execution_schedule(order)

    iceberg_orders = []
    for slice_order in schedule:
        display_qty = max(100, int(slice_order.quantity * display_ratio))

        iceberg = IcebergOrder(
            symbol=slice_order.symbol,
            side=slice_order.side,
            total_quantity=slice_order.quantity,
            display_quantity=display_qty,
            price=0.0  # 成行の場合
        )
        iceberg_orders.append(iceberg)

    return iceberg_orders

kabuステーションAPIでの実装

kabuステーション(auカブコム証券)のREST APIを使用した実装例です。

API接続

import requests
from dataclasses import dataclass
import hashlib

@dataclass
class KabuStationClient:
    """kabuステーションAPIクライアント"""

    api_password: str
    base_url: str = "http://localhost:18080/kabusapi"
    token: str | None = None

    def authenticate(self) -> bool:
        """APIトークンを取得"""
        endpoint = f"{self.base_url}/token"

        response = requests.post(
            endpoint,
            json={"APIPassword": self.api_password}
        )

        if response.status_code == 200:
            self.token = response.json()["Token"]
            return True
        return False

    def _headers(self) -> dict:
        return {"X-API-KEY": self.token}

    def get_board(self, symbol: str, exchange: int = 1) -> dict:
        """板情報を取得"""
        endpoint = f"{self.base_url}/board/{symbol}@{exchange}"
        response = requests.get(endpoint, headers=self._headers())
        return response.json()

    def place_order(
        self,
        symbol: str,
        side: str,  # "buy" or "sell"
        quantity: int,
        order_type: str = "market",
        price: float | None = None
    ) -> dict:
        """注文を発注"""
        endpoint = f"{self.base_url}/sendorder"

        # 売買区分: 2=買, 1=売
        trade_type = "2" if side == "buy" else "1"

        # 執行条件
        front_order_type = 10 if order_type == "market" else 20

        payload = {
            "Password": self.api_password,
            "Symbol": symbol,
            "Exchange": 1,  # 東証
            "SecurityType": 1,  # 株式
            "Side": trade_type,
            "CashMargin": 1,  # 現物
            "DelivType": 2,  # お預り金
            "AccountType": 2,  # 特定
            "Qty": quantity,
            "FrontOrderType": front_order_type,
            "Price": price if price else 0,
            "ExpireDay": 0,  # 当日
        }

        response = requests.post(
            endpoint,
            json=payload,
            headers=self._headers()
        )
        return response.json()

VWAP執行システム

import asyncio
from datetime import datetime, time

class KabuVWAPExecutor:
    """kabuステーションを使用したVWAP執行"""

    def __init__(self, client: KabuStationClient):
        self.client = client
        self.vwap_engine = VWAPExecutionEngine(
            volume_profile=TYPICAL_TOKYO_PROFILE
        )
        self.executed_orders = []

    async def execute_vwap_order(self, order: VWAPOrder) -> dict:
        """VWAP注文を執行"""

        # 執行スケジュールを生成
        schedule = self.vwap_engine.generate_execution_schedule(order)

        results = {
            "total_quantity": order.total_quantity,
            "executed_quantity": 0,
            "average_price": 0.0,
            "slices": []
        }

        total_pv = 0.0

        for slice_order in schedule:
            # 予定時刻まで待機
            await self._wait_until(slice_order.scheduled_time)

            # 板情報を確認
            board = self.client.get_board(order.symbol)
            current_price = board.get("CurrentPrice", 0)

            # 注文発注
            response = self.client.place_order(
                symbol=order.symbol,
                side=order.side,
                quantity=slice_order.quantity,
                order_type="market"
            )

            if response.get("Result") == 0:
                # 約定を記録
                fill_price = current_price  # 簡略化
                results["executed_quantity"] += slice_order.quantity
                total_pv += slice_order.quantity * fill_price

                results["slices"].append({
                    "time": slice_order.scheduled_time.isoformat(),
                    "quantity": slice_order.quantity,
                    "price": fill_price
                })

        if results["executed_quantity"] > 0:
            results["average_price"] = total_pv / results["executed_quantity"]

        return results

    async def _wait_until(self, target_time: datetime):
        """指定時刻まで待機"""
        now = datetime.now()
        if target_time > now:
            wait_seconds = (target_time - now).total_seconds()
            await asyncio.sleep(wait_seconds)

パフォーマンス評価

執行品質の測定

@dataclass
class ExecutionQuality:
    """執行品質評価"""

    @staticmethod
    def calculate_vwap_slippage(
        executed_price: float,
        vwap: float
    ) -> float:
        """
        VWAPスリッページを計算

        正の値: VWAPより不利に執行
        負の値: VWAPより有利に執行
        """
        return (executed_price - vwap) / vwap * 10000  # bps

    @staticmethod
    def calculate_implementation_shortfall(
        decision_price: float,
        executed_price: float,
        side: str
    ) -> float:
        """
        Implementation Shortfallを計算

        投資判断時点の価格と実際の執行価格の差
        """
        if side == "buy":
            return (executed_price - decision_price) / decision_price * 10000
        else:
            return (decision_price - executed_price) / decision_price * 10000


# 評価例
quality = ExecutionQuality()

# VWAPスリッページ: 買い注文が1,015円で執行、VWAPは1,012円
slippage = quality.calculate_vwap_slippage(
    executed_price=1015,
    vwap=1012
)
print(f"VWAPスリッページ: {slippage:.1f}bps")  # 約30bps

リスクと注意点

主要リスク

リスク説明対策
出来高予測誤差プロファイルが実際と乖離適応型アルゴリズム、リアルタイム調整
機会コストトレンド相場で不利IS(Implementation Shortfall)との併用
パターン検知HFTに狙われるランダム化、アイスバーグ注文
システム障害API障害で執行失敗フェイルセーフ機構、手動介入準備
流動性リスク小型株で出来高不足参加率上限設定、執行期間延長

個人投資家向けの現実的な適用

def is_vwap_suitable(
    order_size: int,
    avg_daily_volume: int,
    price: float
) -> dict:
    """
    VWAP執行が適切かを判定

    Returns:
        適切性評価と推奨事項
    """
    participation_rate = order_size / avg_daily_volume
    order_value = order_size * price

    result = {
        "suitable": False,
        "reason": "",
        "recommendation": ""
    }

    # 注文金額が小さすぎる場合
    if order_value < 500000:  # 50万円未満
        result["reason"] = "注文金額が小さいため、分割のメリットが限定的"
        result["recommendation"] = "成行または指値で一括執行"
        return result

    # 参加率が高すぎる場合
    if participation_rate > 0.2:  # 20%超
        result["reason"] = "注文サイズが出来高に対して大きすぎる"
        result["recommendation"] = "複数日に分けて執行、または執行期間を延長"
        return result

    # 流動性が低い場合
    if avg_daily_volume < 100000:  # 10万株未満
        result["reason"] = "流動性が低く、VWAP執行のリスクが高い"
        result["recommendation"] = "指値注文で慎重に執行"
        return result

    result["suitable"] = True
    result["recommendation"] = "VWAP執行が効果的"

    return result

まとめ

VWAP/TWAP執行アルゴリズムは、大口注文のマーケットインパクトを最小化する標準的な手法です。

実装のポイント:

  1. 出来高プロファイル: 過去データから時間帯別の出来高分布を推定
  2. 参加率制限: 市場出来高の10-20%以下に抑制
  3. ランダム化: 執行パターンの検知を回避
  4. アイスバーグ注文: 注文総量を隠蔽
  5. パフォーマンス評価: VWAPスリッページで執行品質を測定

個人投資家にとっての実用性:

  • kabuステーションAPIで実装可能
  • 50万円以上の注文で効果を発揮
  • 平均0.5-1%のコスト削減が期待
  • 自動化により執行の一貫性が向上

免責事項

本記事は情報提供を目的としており、特定の金融商品の売買を推奨するものではありません。投資判断は自己責任で行ってください。アルゴリズム取引にはシステム障害や予期せぬ市場変動によるリスクが伴います。実運用前に十分なバックテストとペーパートレーディングを行うことを推奨します。