VWAP/TWAP執行アルゴリズムの設計と実装 — 大口注文の最適執行を目指す
戦略概要
大口注文の執行では、マーケットインパクト(価格への影響)を最小限に抑えることが重要です。VWAP(Volume Weighted Average Price)とTWAP(Time Weighted Average Price)は、機関投資家が標準的に使用する執行アルゴリズムであり、個人投資家もkabuステーションAPIなどを活用して実装可能です。
執行アルゴリズムが解決する課題
| 課題 | 従来の手動執行 | VWAP/TWAP執行 |
|---|---|---|
| マーケットインパクト | 大口注文で価格が動く | 注文分割で影響を最小化 |
| スリッページ | 不利な価格で約定 | ベンチマーク価格に近い執行 |
| 執行タイミング | 主観的判断に依存 | 客観的ルールで自動化 |
| 検知リスク | 大口注文が見える | アイスバーグ注文で隠蔽 |
| パフォーマンス測定 | 基準がない | VWAP/TWAPとの乖離で評価 |
2026年のグローバルアルゴリズム取引市場規模は約2.72億米ドルに達し、東証でのアルゴ取引比率は50%を超えています。個人投資家でもVWAP執行を活用することで、平均0.5-1%のコスト削減が期待できます。
VWAPの定義と計算方法
VWAPは取引期間中の平均価格を出来高で加重した指標です。
計算式
VWAP = Σ(価格 × 出来高) / Σ(出来高)
計算例
| 時間帯 | 価格 | 出来高 | 価格×出来高 |
|---|---|---|---|
| 9:00-9:30 | 1,000円 | 10,000株 | 10,000,000 |
| 9:30-10:00 | 1,020円 | 15,000株 | 15,300,000 |
| 10:00-10:30 | 1,010円 | 8,000株 | 8,080,000 |
| 合計 | - | 33,000株 | 33,380,000 |
VWAP = 33,380,000 / 33,000 = 1,011.5円
Pythonでの実装
import pandas as pd
from dataclasses import dataclass
@dataclass
class VWAPCalculator:
"""VWAP計算クラス"""
def calculate(self, df: pd.DataFrame) -> float:
"""
VWAPを計算
Args:
df: 'price'と'volume'列を含むDataFrame
Returns:
VWAP値
"""
if df.empty or df["volume"].sum() == 0:
return 0.0
df["pv"] = df["price"] * df["volume"]
vwap = df["pv"].sum() / df["volume"].sum()
return round(vwap, 2)
def calculate_bands(
self,
df: pd.DataFrame,
num_std: float = 2.0
) -> tuple[float, float, float]:
"""
VWAPバンドを計算(トレンド判断用)
Returns:
(VWAP, 上限バンド, 下限バンド)
"""
vwap = self.calculate(df)
# 出来高加重標準偏差
df["pv"] = df["price"] * df["volume"]
df["variance"] = df["volume"] * (df["price"] - vwap) ** 2
weighted_std = (df["variance"].sum() / df["volume"].sum()) ** 0.5
upper = vwap + num_std * weighted_std
lower = vwap - num_std * weighted_std
return vwap, round(upper, 2), round(lower, 2)
VWAP執行アルゴリズムの設計
VWAP執行アルゴリズムは、過去の出来高パターンに基づいて注文を分割し、VWAPに近い平均執行価格を目指します。
出来高プロファイル推定
from datetime import time
import numpy as np
def estimate_volume_profile(
historical_data: pd.DataFrame,
lookback_days: int = 20
) -> dict[time, float]:
"""
過去データから時間帯別出来高比率を推定
Args:
historical_data: 過去の分足データ
lookback_days: 参照日数
Returns:
時間帯別の出来高比率
"""
# 時間帯別に集計
historical_data["time_slot"] = historical_data["datetime"].dt.time
# 日次出来高に対する比率を計算
daily_totals = historical_data.groupby(
historical_data["datetime"].dt.date
)["volume"].transform("sum")
historical_data["volume_ratio"] = historical_data["volume"] / daily_totals
# 時間帯別の平均比率
profile = historical_data.groupby("time_slot")["volume_ratio"].mean()
return profile.to_dict()
# 東証の典型的な出来高プロファイル
TYPICAL_TOKYO_PROFILE = {
# 前場寄付き付近: 出来高集中
time(9, 0): 0.08,
time(9, 30): 0.06,
time(10, 0): 0.05,
time(10, 30): 0.04,
time(11, 0): 0.03,
# 前場引け付近
time(11, 15): 0.04,
# 後場寄付き
time(12, 30): 0.06,
time(13, 0): 0.04,
time(13, 30): 0.04,
time(14, 0): 0.04,
time(14, 30): 0.05,
# 大引け付近: 出来高集中
time(15, 0): 0.10,
time(15, 20): 0.12,
}
VWAP執行エンジン
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Callable
@dataclass
class VWAPOrder:
"""VWAP執行対象の注文"""
symbol: str
side: str # "buy" or "sell"
total_quantity: int # 総注文数量
start_time: datetime # 執行開始時刻
end_time: datetime # 執行終了時刻
@dataclass
class SliceOrder:
"""分割された子注文"""
symbol: str
side: str
quantity: int
scheduled_time: datetime
executed: bool = False
fill_price: float | None = None
fill_quantity: int = 0
@dataclass
class VWAPExecutionEngine:
"""VWAP執行エンジン"""
volume_profile: dict[time, float]
slice_interval_minutes: int = 5
participation_rate: float = 0.10 # 市場出来高の最大参加率
def generate_execution_schedule(
self,
order: VWAPOrder
) -> list[SliceOrder]:
"""
出来高プロファイルに基づいて執行スケジュールを生成
"""
slices = []
current_time = order.start_time
total_allocated = 0
while current_time < order.end_time:
time_slot = current_time.time()
# 該当時間帯の出来高比率を取得
volume_ratio = self._get_volume_ratio(time_slot)
# この時間帯に割り当てる数量
slice_qty = int(order.total_quantity * volume_ratio)
# 最小注文単位で調整
slice_qty = max(100, (slice_qty // 100) * 100)
if slice_qty > 0 and total_allocated < order.total_quantity:
remaining = order.total_quantity - total_allocated
slice_qty = min(slice_qty, remaining)
slices.append(SliceOrder(
symbol=order.symbol,
side=order.side,
quantity=slice_qty,
scheduled_time=current_time
))
total_allocated += slice_qty
current_time += timedelta(minutes=self.slice_interval_minutes)
return slices
def _get_volume_ratio(self, time_slot: time) -> float:
"""時間帯の出来高比率を取得(補間あり)"""
if time_slot in self.volume_profile:
return self.volume_profile[time_slot]
# 最も近い時間帯の値を使用
slots = sorted(self.volume_profile.keys())
for i, slot in enumerate(slots):
if slot > time_slot and i > 0:
return self.volume_profile[slots[i-1]]
return 0.05 # デフォルト値
# 使用例
engine = VWAPExecutionEngine(
volume_profile=TYPICAL_TOKYO_PROFILE,
slice_interval_minutes=5,
participation_rate=0.10
)
order = VWAPOrder(
symbol="7203", # トヨタ自動車
side="buy",
total_quantity=10000,
start_time=datetime(2026, 3, 14, 9, 0),
end_time=datetime(2026, 3, 14, 15, 0)
)
schedule = engine.generate_execution_schedule(order)
for slice_order in schedule[:5]:
print(f"{slice_order.scheduled_time}: {slice_order.quantity}株")
TWAPの設計
TWAP(Time Weighted Average Price)は時間均等に注文を分割する手法です。
VWAPとTWAPの比較
| 特性 | VWAP | TWAP |
|---|---|---|
| 分割基準 | 出来高プロファイル | 時間均等 |
| 計算複雑度 | 高(プロファイル推定必要) | 低(シンプル) |
| マーケットインパクト | 出来高多い時間に分散 | 均等に分散 |
| 最適ケース | 出来高予測が正確な場合 | 出来高予測が困難な場合 |
| パターン検知リスク | 中 | 高(規則的すぎる) |
| 機会コスト | 低 | 中(トレンドを逃す可能性) |
TWAP実装
@dataclass
class TWAPExecutionEngine:
"""TWAP執行エンジン"""
slice_interval_minutes: int = 5
randomization: float = 0.2 # タイミングのランダム化係数
def generate_execution_schedule(
self,
order: VWAPOrder
) -> list[SliceOrder]:
"""時間均等に分割した執行スケジュールを生成"""
duration = order.end_time - order.start_time
num_slices = int(duration.total_seconds() / 60 / self.slice_interval_minutes)
if num_slices == 0:
num_slices = 1
base_quantity = order.total_quantity // num_slices
remainder = order.total_quantity % num_slices
slices = []
current_time = order.start_time
for i in range(num_slices):
# 端数を最初のスライスに追加
quantity = base_quantity + (remainder if i == 0 else 0)
# タイミングをランダム化(検知回避)
random_offset = int(
self.slice_interval_minutes * 60 *
self.randomization *
(np.random.random() - 0.5)
)
scheduled = current_time + timedelta(seconds=random_offset)
slices.append(SliceOrder(
symbol=order.symbol,
side=order.side,
quantity=quantity,
scheduled_time=scheduled
))
current_time += timedelta(minutes=self.slice_interval_minutes)
return slices
マーケットインパクトの最小化
インパクトモデル
def estimate_market_impact(
order_size: int,
avg_daily_volume: int,
volatility: float,
spread: float
) -> float:
"""
マーケットインパクトを推定(Almgren-Chrissモデル簡略版)
Args:
order_size: 注文サイズ(株数)
avg_daily_volume: 日次平均出来高
volatility: 年率ボラティリティ
spread: ビッド・アスクスプレッド(%)
Returns:
推定インパクト(%)
"""
# 参加率
participation = order_size / avg_daily_volume
# 一時的インパクト(即時執行の場合)
temporary_impact = spread / 2 + 0.1 * volatility * (participation ** 0.5)
# 恒久的インパクト
permanent_impact = 0.05 * volatility * participation
total_impact = temporary_impact + permanent_impact
return round(total_impact * 100, 3) # bps
# 使用例
impact = estimate_market_impact(
order_size=50000,
avg_daily_volume=5000000,
volatility=0.25,
spread=0.001
)
print(f"推定マーケットインパクト: {impact}bps")
インパクト低減テクニック
- 注文分割(スライシング): 大口注文を小分けして執行
- 参加率制限: 市場出来高の10-20%以下に抑制
- ランダム化: 執行タイミングをランダム化してパターン検知を回避
- 流動性追随: 出来高が多い時間帯に多く執行
スライサー・アイスバーグ注文
アイスバーグ注文の概念
アイスバーグ注文は、総注文量を隠し、一部のみを市場に表示する手法です。表示部分が約定すると、自動的に次の部分が表示されます。
@dataclass
class IcebergOrder:
"""アイスバーグ注文"""
symbol: str
side: str
total_quantity: int
display_quantity: int # 表示数量
price: float
def __post_init__(self):
if self.display_quantity > self.total_quantity:
raise ValueError("表示数量は総数量以下である必要があります")
self.remaining = self.total_quantity
self.fills = []
def get_visible_order(self) -> dict | None:
"""現在の表示注文を取得"""
if self.remaining <= 0:
return None
visible_qty = min(self.display_quantity, self.remaining)
return {
"symbol": self.symbol,
"side": self.side,
"quantity": visible_qty,
"price": self.price,
"order_type": "limit"
}
def on_fill(self, fill_quantity: int, fill_price: float):
"""約定処理"""
self.remaining -= fill_quantity
self.fills.append({
"quantity": fill_quantity,
"price": fill_price,
"timestamp": datetime.now()
})
# VWAP + アイスバーグの組み合わせ
def execute_vwap_iceberg(
order: VWAPOrder,
display_ratio: float = 0.2
) -> list[IcebergOrder]:
"""
VWAPスケジュールの各スライスをアイスバーグ注文で執行
"""
engine = VWAPExecutionEngine(volume_profile=TYPICAL_TOKYO_PROFILE)
schedule = engine.generate_execution_schedule(order)
iceberg_orders = []
for slice_order in schedule:
display_qty = max(100, int(slice_order.quantity * display_ratio))
iceberg = IcebergOrder(
symbol=slice_order.symbol,
side=slice_order.side,
total_quantity=slice_order.quantity,
display_quantity=display_qty,
price=0.0 # 成行の場合
)
iceberg_orders.append(iceberg)
return iceberg_orders
kabuステーションAPIでの実装
kabuステーション(auカブコム証券)のREST APIを使用した実装例です。
API接続
import requests
from dataclasses import dataclass
import hashlib
@dataclass
class KabuStationClient:
"""kabuステーションAPIクライアント"""
api_password: str
base_url: str = "http://localhost:18080/kabusapi"
token: str | None = None
def authenticate(self) -> bool:
"""APIトークンを取得"""
endpoint = f"{self.base_url}/token"
response = requests.post(
endpoint,
json={"APIPassword": self.api_password}
)
if response.status_code == 200:
self.token = response.json()["Token"]
return True
return False
def _headers(self) -> dict:
return {"X-API-KEY": self.token}
def get_board(self, symbol: str, exchange: int = 1) -> dict:
"""板情報を取得"""
endpoint = f"{self.base_url}/board/{symbol}@{exchange}"
response = requests.get(endpoint, headers=self._headers())
return response.json()
def place_order(
self,
symbol: str,
side: str, # "buy" or "sell"
quantity: int,
order_type: str = "market",
price: float | None = None
) -> dict:
"""注文を発注"""
endpoint = f"{self.base_url}/sendorder"
# 売買区分: 2=買, 1=売
trade_type = "2" if side == "buy" else "1"
# 執行条件
front_order_type = 10 if order_type == "market" else 20
payload = {
"Password": self.api_password,
"Symbol": symbol,
"Exchange": 1, # 東証
"SecurityType": 1, # 株式
"Side": trade_type,
"CashMargin": 1, # 現物
"DelivType": 2, # お預り金
"AccountType": 2, # 特定
"Qty": quantity,
"FrontOrderType": front_order_type,
"Price": price if price else 0,
"ExpireDay": 0, # 当日
}
response = requests.post(
endpoint,
json=payload,
headers=self._headers()
)
return response.json()
VWAP執行システム
import asyncio
from datetime import datetime, time
class KabuVWAPExecutor:
"""kabuステーションを使用したVWAP執行"""
def __init__(self, client: KabuStationClient):
self.client = client
self.vwap_engine = VWAPExecutionEngine(
volume_profile=TYPICAL_TOKYO_PROFILE
)
self.executed_orders = []
async def execute_vwap_order(self, order: VWAPOrder) -> dict:
"""VWAP注文を執行"""
# 執行スケジュールを生成
schedule = self.vwap_engine.generate_execution_schedule(order)
results = {
"total_quantity": order.total_quantity,
"executed_quantity": 0,
"average_price": 0.0,
"slices": []
}
total_pv = 0.0
for slice_order in schedule:
# 予定時刻まで待機
await self._wait_until(slice_order.scheduled_time)
# 板情報を確認
board = self.client.get_board(order.symbol)
current_price = board.get("CurrentPrice", 0)
# 注文発注
response = self.client.place_order(
symbol=order.symbol,
side=order.side,
quantity=slice_order.quantity,
order_type="market"
)
if response.get("Result") == 0:
# 約定を記録
fill_price = current_price # 簡略化
results["executed_quantity"] += slice_order.quantity
total_pv += slice_order.quantity * fill_price
results["slices"].append({
"time": slice_order.scheduled_time.isoformat(),
"quantity": slice_order.quantity,
"price": fill_price
})
if results["executed_quantity"] > 0:
results["average_price"] = total_pv / results["executed_quantity"]
return results
async def _wait_until(self, target_time: datetime):
"""指定時刻まで待機"""
now = datetime.now()
if target_time > now:
wait_seconds = (target_time - now).total_seconds()
await asyncio.sleep(wait_seconds)
パフォーマンス評価
執行品質の測定
@dataclass
class ExecutionQuality:
"""執行品質評価"""
@staticmethod
def calculate_vwap_slippage(
executed_price: float,
vwap: float
) -> float:
"""
VWAPスリッページを計算
正の値: VWAPより不利に執行
負の値: VWAPより有利に執行
"""
return (executed_price - vwap) / vwap * 10000 # bps
@staticmethod
def calculate_implementation_shortfall(
decision_price: float,
executed_price: float,
side: str
) -> float:
"""
Implementation Shortfallを計算
投資判断時点の価格と実際の執行価格の差
"""
if side == "buy":
return (executed_price - decision_price) / decision_price * 10000
else:
return (decision_price - executed_price) / decision_price * 10000
# 評価例
quality = ExecutionQuality()
# VWAPスリッページ: 買い注文が1,015円で執行、VWAPは1,012円
slippage = quality.calculate_vwap_slippage(
executed_price=1015,
vwap=1012
)
print(f"VWAPスリッページ: {slippage:.1f}bps") # 約30bps
リスクと注意点
主要リスク
| リスク | 説明 | 対策 |
|---|---|---|
| 出来高予測誤差 | プロファイルが実際と乖離 | 適応型アルゴリズム、リアルタイム調整 |
| 機会コスト | トレンド相場で不利 | IS(Implementation Shortfall)との併用 |
| パターン検知 | HFTに狙われる | ランダム化、アイスバーグ注文 |
| システム障害 | API障害で執行失敗 | フェイルセーフ機構、手動介入準備 |
| 流動性リスク | 小型株で出来高不足 | 参加率上限設定、執行期間延長 |
個人投資家向けの現実的な適用
def is_vwap_suitable(
order_size: int,
avg_daily_volume: int,
price: float
) -> dict:
"""
VWAP執行が適切かを判定
Returns:
適切性評価と推奨事項
"""
participation_rate = order_size / avg_daily_volume
order_value = order_size * price
result = {
"suitable": False,
"reason": "",
"recommendation": ""
}
# 注文金額が小さすぎる場合
if order_value < 500000: # 50万円未満
result["reason"] = "注文金額が小さいため、分割のメリットが限定的"
result["recommendation"] = "成行または指値で一括執行"
return result
# 参加率が高すぎる場合
if participation_rate > 0.2: # 20%超
result["reason"] = "注文サイズが出来高に対して大きすぎる"
result["recommendation"] = "複数日に分けて執行、または執行期間を延長"
return result
# 流動性が低い場合
if avg_daily_volume < 100000: # 10万株未満
result["reason"] = "流動性が低く、VWAP執行のリスクが高い"
result["recommendation"] = "指値注文で慎重に執行"
return result
result["suitable"] = True
result["recommendation"] = "VWAP執行が効果的"
return result
まとめ
VWAP/TWAP執行アルゴリズムは、大口注文のマーケットインパクトを最小化する標準的な手法です。
実装のポイント:
- 出来高プロファイル: 過去データから時間帯別の出来高分布を推定
- 参加率制限: 市場出来高の10-20%以下に抑制
- ランダム化: 執行パターンの検知を回避
- アイスバーグ注文: 注文総量を隠蔽
- パフォーマンス評価: VWAPスリッページで執行品質を測定
個人投資家にとっての実用性:
- kabuステーションAPIで実装可能
- 50万円以上の注文で効果を発揮
- 平均0.5-1%のコスト削減が期待
- 自動化により執行の一貫性が向上
免責事項
本記事は情報提供を目的としており、特定の金融商品の売買を推奨するものではありません。投資判断は自己責任で行ってください。アルゴリズム取引にはシステム障害や予期せぬ市場変動によるリスクが伴います。実運用前に十分なバックテストとペーパートレーディングを行うことを推奨します。