仮想通貨Bot開発の実践 — ccxt・Pythonで作るCEX自動売買システム
中級〜上級
アルゴリズム取引自動売買取引所API
はじめに
仮想通貨市場は24時間365日稼働しており、人間が常時監視することは不可能です。自動売買Bot(トレーディングBot)は、事前に定義したルールに基づいて自動的に売買を執行するプログラムであり、仮想通貨トレーダーにとって必須のツールとなっています。
本記事では、Pythonとccxtライブラリを使って、実際に動作するCEX(中央集権型取引所)自動売買Botを構築する手順を解説します。
本記事で構築するシステムの全体像
| コンポーネント | 技術 | 役割 |
|---|---|---|
| 取引所接続 | ccxt | 100以上の取引所に統一APIでアクセス |
| データ取得 | ccxt + WebSocket | OHLCV、板情報、ティッカーのリアルタイム取得 |
| 戦略エンジン | pandas + numpy | テクニカル指標の計算とシグナル生成 |
| 注文執行 | ccxt | 成行・指値・逆指値注文の発注 |
| リスク管理 | カスタム実装 | ポジションサイジング、最大ドローダウン制御 |
| バックテスト | vectorbt / Backtrader | 過去データでの戦略検証 |
| 監視 | Grafana + Discord | パフォーマンス監視と通知 |
| インフラ | AWS/GCP | 24/365稼働の本番環境 |
ccxtライブラリの基礎
ccxtとは
ccxt(CryptoCurrency eXchange Trading Library)は、100以上の取引所に統一APIでアクセスできるオープンソースライブラリです。取引所ごとに異なるAPIの差異を吸収し、同一のコードで複数の取引所を操作できます。
インストールと初期設定
# インストール
pip install ccxt pandas numpy ta
# 基本的な取引所接続
import ccxt
# パブリックAPI(認証不要)
exchange = ccxt.binance()
# プライベートAPI(認証必要)
exchange = ccxt.binance({
"apiKey": "YOUR_API_KEY",
"secret": "YOUR_SECRET_KEY",
"options": {
"defaultType": "spot", # spot / future / margin
},
"enableRateLimit": True, # レート制限を自動管理
})
# テストネット接続(推奨:開発時)
exchange.set_sandbox_mode(True)
対応取引所の比較
| 取引所 | API安定性 | WebSocket | テストネット | 手数料(Maker/Taker) | 日本居住者 |
|---|---|---|---|---|---|
| Binance | 高 | 対応 | あり | 0.1% / 0.1% | 利用不可 |
| Bybit | 高 | 対応 | あり | 0.1% / 0.1% | 利用可 |
| OKX | 高 | 対応 | あり | 0.08% / 0.1% | 利用可 |
| bitFlyer | 中 | 対応 | なし | 0.01-0.15% | 利用可 |
| GMOコイン | 中 | 対応 | なし | -0.01% / 0.05% | 利用可 |
認証とセキュリティ
import os
from dotenv import load_dotenv
load_dotenv()
exchange = ccxt.bybit({
"apiKey": os.getenv("BYBIT_API_KEY"),
"secret": os.getenv("BYBIT_SECRET"),
"options": {"defaultType": "spot"},
"enableRateLimit": True,
})
# IP制限を取引所のAPI設定で必ず有効化すること
# 出金権限は付与しないこと(取引権限のみ)
# 接続テスト
balance = exchange.fetch_balance()
print(f"USDT残高: {balance['USDT']['free']}")
マーケットデータの取得
OHLCV(ローソク足)データ
import pandas as pd
def fetch_ohlcv(
exchange: ccxt.Exchange,
symbol: str,
timeframe: str = "1h",
limit: int = 500,
) -> pd.DataFrame:
"""OHLCVデータを取得してDataFrameに変換"""
ohlcv = exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
df = pd.DataFrame(
ohlcv,
columns=["timestamp", "open", "high", "low", "close", "volume"],
)
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
df.set_index("timestamp", inplace=True)
return df
# 使用例
df = fetch_ohlcv(exchange, "BTC/USDT", "1h", 500)
print(df.tail())
オーダーブック(板情報)
def fetch_orderbook_summary(
exchange: ccxt.Exchange,
symbol: str,
depth: int = 20,
) -> dict:
"""板情報のサマリーを取得"""
orderbook = exchange.fetch_order_book(symbol, depth)
bid_volume = sum([order[1] for order in orderbook["bids"]])
ask_volume = sum([order[1] for order in orderbook["asks"]])
spread = orderbook["asks"][0][0] - orderbook["bids"][0][0]
mid_price = (orderbook["asks"][0][0] + orderbook["bids"][0][0]) / 2
return {
"best_bid": orderbook["bids"][0][0],
"best_ask": orderbook["asks"][0][0],
"spread": spread,
"spread_pct": (spread / mid_price) * 100,
"bid_volume": bid_volume,
"ask_volume": ask_volume,
"imbalance": bid_volume / (bid_volume + ask_volume),
}
summary = fetch_orderbook_summary(exchange, "BTC/USDT")
print(f"スプレッド: {summary['spread_pct']:.4f}%")
print(f"板の偏り(買い比率): {summary['imbalance']:.2f}")
ティッカー情報
def fetch_ticker_info(exchange: ccxt.Exchange, symbol: str) -> dict:
"""ティッカー情報を取得"""
ticker = exchange.fetch_ticker(symbol)
return {
"last": ticker["last"],
"change_24h": ticker["percentage"],
"volume_24h": ticker["quoteVolume"],
"high_24h": ticker["high"],
"low_24h": ticker["low"],
"vwap": ticker.get("vwap"),
}
注文執行
注文タイプの比較
| 注文タイプ | 用途 | メリット | デメリット |
|---|---|---|---|
| 成行注文 | 即時約定が必要な場合 | 確実に約定 | スリッページリスク |
| 指値注文 | 特定価格での約定 | 価格保証 | 約定しない可能性 |
| 逆指値注文 | ストップロス | 損失制限 | ギャップで滑る可能性 |
| OCO注文 | 利確+損切り同時 | 自動管理 | 対応取引所が限定的 |
注文関数の実装
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class OrderManager:
def __init__(self, exchange: ccxt.Exchange):
self.exchange = exchange
def market_buy(self, symbol: str, amount: float) -> dict:
"""成行買い注文"""
try:
order = self.exchange.create_market_buy_order(symbol, amount)
logger.info(
f"成行買い: {symbol} {amount} @ {order['average']}"
)
return order
except ccxt.InsufficientFunds as e:
logger.error(f"残高不足: {e}")
raise
except ccxt.NetworkError as e:
logger.error(f"ネットワークエラー: {e}")
raise
def limit_buy(
self, symbol: str, amount: float, price: float
) -> dict:
"""指値買い注文"""
order = self.exchange.create_limit_buy_order(
symbol, amount, price
)
logger.info(f"指値買い: {symbol} {amount} @ {price}")
return order
def stop_loss(
self,
symbol: str,
amount: float,
stop_price: float,
) -> dict:
"""逆指値(ストップロス)注文"""
order = self.exchange.create_order(
symbol,
"stop_market",
"sell",
amount,
None,
{"stopPrice": stop_price},
)
logger.info(f"ストップロス設定: {symbol} {amount} @ {stop_price}")
return order
def cancel_all_orders(self, symbol: str) -> list:
"""指定シンボルの全未約定注文をキャンセル"""
open_orders = self.exchange.fetch_open_orders(symbol)
results = []
for order in open_orders:
result = self.exchange.cancel_order(order["id"], symbol)
results.append(result)
logger.info(f"注文キャンセル: {order['id']}")
return results
WebSocketリアルタイムデータ
ccxt.proによるWebSocket接続
import ccxt.pro as ccxtpro
import asyncio
async def watch_ticker(symbol: str):
"""WebSocketでティッカーをリアルタイム監視"""
exchange = ccxtpro.bybit({"enableRateLimit": True})
try:
while True:
ticker = await exchange.watch_ticker(symbol)
print(
f"{symbol}: {ticker['last']:.2f} "
f"({ticker['percentage']:+.2f}%)"
)
finally:
await exchange.close()
async def watch_orderbook(symbol: str):
"""WebSocketで板情報をリアルタイム監視"""
exchange = ccxtpro.bybit({"enableRateLimit": True})
try:
while True:
orderbook = await exchange.watch_order_book(symbol)
spread = orderbook["asks"][0][0] - orderbook["bids"][0][0]
print(
f"Bid: {orderbook['bids'][0][0]:.2f} | "
f"Ask: {orderbook['asks'][0][0]:.2f} | "
f"Spread: {spread:.2f}"
)
finally:
await exchange.close()
# 複数シンボルの同時監視
async def watch_multiple():
symbols = ["BTC/USDT", "ETH/USDT", "SOL/USDT"]
exchange = ccxtpro.bybit({"enableRateLimit": True})
try:
while True:
tasks = [
exchange.watch_ticker(symbol) for symbol in symbols
]
tickers = await asyncio.gather(*tasks)
for ticker in tickers:
print(
f"{ticker['symbol']}: {ticker['last']:.2f}"
)
finally:
await exchange.close()
asyncio.run(watch_ticker("BTC/USDT"))
戦略実装:移動平均クロスオーバー
基本戦略の実装
import ta
from dataclasses import dataclass
from enum import Enum
class Signal(Enum):
BUY = "buy"
SELL = "sell"
HOLD = "hold"
@dataclass
class StrategyConfig:
fast_period: int = 20
slow_period: int = 50
rsi_period: int = 14
rsi_overbought: float = 70.0
rsi_oversold: float = 30.0
class MACrossoverStrategy:
"""移動平均クロスオーバー+RSIフィルター戦略"""
def __init__(self, config: StrategyConfig = StrategyConfig()):
self.config = config
def calculate_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
"""テクニカル指標を計算"""
df["sma_fast"] = ta.trend.sma_indicator(
df["close"], window=self.config.fast_period
)
df["sma_slow"] = ta.trend.sma_indicator(
df["close"], window=self.config.slow_period
)
df["rsi"] = ta.momentum.rsi(
df["close"], window=self.config.rsi_period
)
df["atr"] = ta.volatility.average_true_range(
df["high"], df["low"], df["close"], window=14
)
return df
def generate_signal(self, df: pd.DataFrame) -> Signal:
"""最新のシグナルを生成"""
df = self.calculate_indicators(df)
latest = df.iloc[-1]
previous = df.iloc[-2]
# ゴールデンクロス(短期MAが長期MAを上抜け)
if (
previous["sma_fast"] <= previous["sma_slow"]
and latest["sma_fast"] > latest["sma_slow"]
and latest["rsi"] < self.config.rsi_overbought
):
return Signal.BUY
# デッドクロス(短期MAが長期MAを下抜け)
if (
previous["sma_fast"] >= previous["sma_slow"]
and latest["sma_fast"] < latest["sma_slow"]
and latest["rsi"] > self.config.rsi_oversold
):
return Signal.SELL
return Signal.HOLD
Botのメインループ
import time
class TradingBot:
def __init__(
self,
exchange: ccxt.Exchange,
symbol: str,
strategy: MACrossoverStrategy,
order_manager: OrderManager,
trade_amount: float,
):
self.exchange = exchange
self.symbol = symbol
self.strategy = strategy
self.order_manager = order_manager
self.trade_amount = trade_amount
self.position = None # None / "long"
def run(self, interval_seconds: int = 60):
"""メインループ"""
logger.info(f"Bot起動: {self.symbol}")
while True:
try:
# データ取得
df = fetch_ohlcv(self.exchange, self.symbol, "1h", 200)
# シグナル生成
signal = self.strategy.generate_signal(df)
logger.info(f"シグナル: {signal.value}")
# シグナルに基づく注文実行
if signal == Signal.BUY and self.position is None:
self.order_manager.market_buy(
self.symbol, self.trade_amount
)
self.position = "long"
# ストップロス設定(ATRの2倍)
atr = df.iloc[-1]["atr"]
stop_price = df.iloc[-1]["close"] - (atr * 2)
self.order_manager.stop_loss(
self.symbol, self.trade_amount, stop_price
)
elif signal == Signal.SELL and self.position == "long":
self.order_manager.cancel_all_orders(self.symbol)
self.exchange.create_market_sell_order(
self.symbol, self.trade_amount
)
self.position = None
time.sleep(interval_seconds)
except ccxt.NetworkError as e:
logger.warning(f"ネットワークエラー: {e}")
time.sleep(30)
except Exception as e:
logger.error(f"予期しないエラー: {e}")
time.sleep(60)
バックテスト
vectorbtによるバックテスト
import vectorbt as vbt
def backtest_ma_crossover(
symbol: str,
fast_window: int = 20,
slow_window: int = 50,
start_date: str = "2025-01-01",
end_date: str = "2026-03-15",
) -> dict:
"""移動平均クロスオーバー戦略のバックテスト"""
# データ取得
price = vbt.YFData.download(
symbol, start=start_date, end=end_date
).get("Close")
# 移動平均計算
fast_ma = vbt.MA.run(price, fast_window)
slow_ma = vbt.MA.run(price, slow_window)
# シグナル生成
entries = fast_ma.ma_crossed_above(slow_ma)
exits = fast_ma.ma_crossed_below(slow_ma)
# バックテスト実行
portfolio = vbt.Portfolio.from_signals(
price, entries, exits, init_cash=10000, fees=0.001
)
return {
"total_return": portfolio.total_return(),
"sharpe_ratio": portfolio.sharpe_ratio(),
"max_drawdown": portfolio.max_drawdown(),
"total_trades": portfolio.total_trades(),
"win_rate": portfolio.win_rate(),
}
# BTC-USDで検証
results = backtest_ma_crossover("BTC-USD")
print(f"トータルリターン: {results['total_return']:.2%}")
print(f"シャープレシオ: {results['sharpe_ratio']:.2f}")
print(f"最大ドローダウン: {results['max_drawdown']:.2%}")
バックテスト結果の目安
| 戦略 | 期間 | 年率リターン | シャープレシオ | 最大DD | 勝率 |
|---|---|---|---|---|---|
| MA(20/50) BTC | 2024-2025 | 18.5% | 1.2 | -15.3% | 42% |
| MA(20/50) ETH | 2024-2025 | 22.1% | 1.0 | -22.7% | 38% |
| RSI(14) BTC | 2024-2025 | 15.2% | 0.9 | -18.1% | 45% |
| MACD BTC | 2024-2025 | 20.3% | 1.1 | -16.8% | 40% |
注意: バックテスト結果は過去のパフォーマンスであり、将来のリターンを保証するものではありません。スリッページ、手数料、APIレイテンシーの影響で、実運用のパフォーマンスは低下します。
リスク管理
ポジションサイジング
class RiskManager:
def __init__(
self,
max_position_pct: float = 0.05,
max_drawdown_pct: float = 0.10,
max_daily_loss_pct: float = 0.03,
):
self.max_position_pct = max_position_pct
self.max_drawdown_pct = max_drawdown_pct
self.max_daily_loss_pct = max_daily_loss_pct
self.peak_balance = 0.0
self.daily_pnl = 0.0
def calculate_position_size(
self,
balance: float,
price: float,
atr: float,
risk_per_trade: float = 0.01,
) -> float:
"""ATRベースのポジションサイジング"""
# リスク額 = 残高 × リスク許容率
risk_amount = balance * risk_per_trade
# ストップ幅 = ATR × 2
stop_distance = atr * 2
# ポジションサイズ = リスク額 / ストップ幅
position_size = risk_amount / stop_distance
# 最大ポジション制限
max_size = (balance * self.max_position_pct) / price
return min(position_size, max_size)
def check_drawdown(self, current_balance: float) -> bool:
"""最大ドローダウンチェック"""
self.peak_balance = max(self.peak_balance, current_balance)
drawdown = (self.peak_balance - current_balance) / self.peak_balance
if drawdown >= self.max_drawdown_pct:
logger.warning(
f"最大ドローダウン到達: {drawdown:.2%}"
)
return False # 取引停止
return True # 取引継続
def check_daily_loss(self, daily_pnl: float, balance: float) -> bool:
"""日次最大損失チェック"""
daily_loss_pct = abs(daily_pnl) / balance if daily_pnl < 0 else 0
if daily_loss_pct >= self.max_daily_loss_pct:
logger.warning(
f"日次最大損失到達: {daily_loss_pct:.2%}"
)
return False
return True
リスク管理パラメータの推奨値
| パラメータ | 保守的 | 標準 | 積極的 |
|---|---|---|---|
| 1トレードリスク | 0.5% | 1.0% | 2.0% |
| 最大ポジション比率 | 3% | 5% | 10% |
| 最大ドローダウン | 5% | 10% | 20% |
| 日次最大損失 | 1% | 3% | 5% |
ログ・監視・通知
Discord通知の実装
import requests
class DiscordNotifier:
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
def send(self, title: str, message: str, color: int = 0x00FF00):
"""Discord Webhookで通知を送信"""
embed = {
"title": title,
"description": message,
"color": color,
}
payload = {"embeds": [embed]}
requests.post(self.webhook_url, json=payload)
def notify_trade(self, action: str, symbol: str, price: float, amount: float):
color = 0x00FF00 if action == "BUY" else 0xFF0000
self.send(
f"{action}: {symbol}",
f"価格: {price:,.2f}\n数量: {amount:.6f}\n"
f"総額: {price * amount:,.2f} USDT",
color,
)
def notify_error(self, error: str):
self.send("エラー発生", error, 0xFF0000)
Grafanaダッシュボードの構成
Prometheus + Grafanaでリアルタイム監視ダッシュボードを構築します。
| メトリクス | 説明 | アラート閾値 |
|---|---|---|
| bot_balance | 残高推移 | 初期残高の-10% |
| bot_pnl_daily | 日次損益 | -3%以下 |
| bot_trades_total | 累計取引数 | — |
| bot_api_latency | API応答時間 | 5秒以上 |
| bot_error_count | エラー数 | 5回/時間 |
本番運用インフラ
AWS構成例
┌─────────────────────────────────────────┐
│ AWS EC2 (t3.small) or ECS Fargate │
│ ┌──────────────────────────────────┐ │
│ │ Trading Bot Container │ │
│ │ - Python 3.12 │ │
│ │ - ccxt + strategy │ │
│ └──────────────────────────────────┘ │
│ │
│ ┌──────────────┐ ┌────────────────┐ │
│ │ CloudWatch │ │ Secrets Manager│ │
│ │ Logs/Metrics │ │ API Keys │ │
│ └──────────────┘ └────────────────┘ │
└──────────────────────────────────────────┘
Docker化
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# ヘルスチェック
HEALTHCHECK \
CMD python healthcheck.py
CMD ["python", "main.py"]
シークレット管理
import boto3
def get_secret(secret_name: str) -> dict:
"""AWS Secrets ManagerからAPIキーを取得"""
client = boto3.client("secretsmanager", region_name="ap-northeast-1")
response = client.get_secret_value(SecretId=secret_name)
return json.loads(response["SecretString"])
# 使用例
secrets = get_secret("trading-bot/bybit-api")
exchange = ccxt.bybit({
"apiKey": secrets["api_key"],
"secret": secrets["secret_key"],
})
運用上の注意点
APIレート制限
| 取引所 | REST制限 | WebSocket制限 | 推奨間隔 |
|---|---|---|---|
| Binance | 1200 req/min | 5 msg/sec | 500ms |
| Bybit | 120 req/min | 20 req/sec | 1000ms |
| OKX | 60 req/2sec | 480 msg/min | 1000ms |
| bitFlyer | 500 req/5min | — | 600ms |
取引所ダウンタイムへの対策
- 複数取引所への分散: 単一取引所に依存しない
- ヘルスチェック実装: API応答の定期確認
- フェイルオーバー: メイン取引所がダウンしたらサブに切り替え
- ポジションの自動クローズ: 長時間のAPI障害時は既存ポジションを決済
よくある失敗パターン
| 失敗パターン | 原因 | 対策 |
|---|---|---|
| 過剰なレバレッジ | リスク管理の欠如 | 最大レバレッジを3倍以下に |
| バックテストの過学習 | パラメータの最適化過多 | ウォークフォワード分析 |
| スリッページの無視 | 薄い板での大量注文 | 注文を分割、流動性確認 |
| API障害中の放置 | 監視の欠如 | ヘルスチェック+自動停止 |
| テストネット未使用 | 本番で直接テスト | 必ずテストネットで検証 |
まとめ
仮想通貨Bot開発は、適切なツール(ccxt)とリスク管理を組み合わせることで、個人でも実用的な自動売買システムを構築できます。
開発ステップの推奨順序:
- テストネットでccxtの基本操作を習得
- バックテストで戦略の有効性を検証
- 少額の実資金でペーパートレード
- リスク管理とモニタリングを実装
- 段階的に資金を増加
最も重要なのは、バックテストで利益が出る戦略でも、本番では必ず少額から始めることです。市場環境の変化、スリッページ、API遅延など、バックテストでは再現できない要因が存在します。