仮想通貨Bot開発の実践 — ccxt・Pythonで作るCEX自動売買システム

中級〜上級
アルゴリズム取引自動売買取引所API

はじめに

仮想通貨市場は24時間365日稼働しており、人間が常時監視することは不可能です。自動売買Bot(トレーディングBot)は、事前に定義したルールに基づいて自動的に売買を執行するプログラムであり、仮想通貨トレーダーにとって必須のツールとなっています。

本記事では、Pythonとccxtライブラリを使って、実際に動作するCEX(中央集権型取引所)自動売買Botを構築する手順を解説します。

本記事で構築するシステムの全体像

コンポーネント技術役割
取引所接続ccxt100以上の取引所に統一APIでアクセス
データ取得ccxt + WebSocketOHLCV、板情報、ティッカーのリアルタイム取得
戦略エンジンpandas + numpyテクニカル指標の計算とシグナル生成
注文執行ccxt成行・指値・逆指値注文の発注
リスク管理カスタム実装ポジションサイジング、最大ドローダウン制御
バックテストvectorbt / Backtrader過去データでの戦略検証
監視Grafana + Discordパフォーマンス監視と通知
インフラAWS/GCP24/365稼働の本番環境

ccxtライブラリの基礎

ccxtとは

ccxt(CryptoCurrency eXchange Trading Library)は、100以上の取引所に統一APIでアクセスできるオープンソースライブラリです。取引所ごとに異なるAPIの差異を吸収し、同一のコードで複数の取引所を操作できます。

インストールと初期設定

# インストール
pip install ccxt pandas numpy ta

# 基本的な取引所接続
import ccxt

# パブリックAPI(認証不要)
exchange = ccxt.binance()

# プライベートAPI(認証必要)
exchange = ccxt.binance({
    "apiKey": "YOUR_API_KEY",
    "secret": "YOUR_SECRET_KEY",
    "options": {
        "defaultType": "spot",  # spot / future / margin
    },
    "enableRateLimit": True,  # レート制限を自動管理
})

# テストネット接続(推奨:開発時)
exchange.set_sandbox_mode(True)

対応取引所の比較

取引所API安定性WebSocketテストネット手数料(Maker/Taker)日本居住者
Binance対応あり0.1% / 0.1%利用不可
Bybit対応あり0.1% / 0.1%利用可
OKX対応あり0.08% / 0.1%利用可
bitFlyer対応なし0.01-0.15%利用可
GMOコイン対応なし-0.01% / 0.05%利用可

認証とセキュリティ

import os
from dotenv import load_dotenv

load_dotenv()

exchange = ccxt.bybit({
    "apiKey": os.getenv("BYBIT_API_KEY"),
    "secret": os.getenv("BYBIT_SECRET"),
    "options": {"defaultType": "spot"},
    "enableRateLimit": True,
})

# IP制限を取引所のAPI設定で必ず有効化すること
# 出金権限は付与しないこと(取引権限のみ)

# 接続テスト
balance = exchange.fetch_balance()
print(f"USDT残高: {balance['USDT']['free']}")

マーケットデータの取得

OHLCV(ローソク足)データ

import pandas as pd

def fetch_ohlcv(
    exchange: ccxt.Exchange,
    symbol: str,
    timeframe: str = "1h",
    limit: int = 500,
) -> pd.DataFrame:
    """OHLCVデータを取得してDataFrameに変換"""
    ohlcv = exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
    df = pd.DataFrame(
        ohlcv,
        columns=["timestamp", "open", "high", "low", "close", "volume"],
    )
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
    df.set_index("timestamp", inplace=True)
    return df

# 使用例
df = fetch_ohlcv(exchange, "BTC/USDT", "1h", 500)
print(df.tail())

オーダーブック(板情報)

def fetch_orderbook_summary(
    exchange: ccxt.Exchange,
    symbol: str,
    depth: int = 20,
) -> dict:
    """板情報のサマリーを取得"""
    orderbook = exchange.fetch_order_book(symbol, depth)

    bid_volume = sum([order[1] for order in orderbook["bids"]])
    ask_volume = sum([order[1] for order in orderbook["asks"]])
    spread = orderbook["asks"][0][0] - orderbook["bids"][0][0]
    mid_price = (orderbook["asks"][0][0] + orderbook["bids"][0][0]) / 2

    return {
        "best_bid": orderbook["bids"][0][0],
        "best_ask": orderbook["asks"][0][0],
        "spread": spread,
        "spread_pct": (spread / mid_price) * 100,
        "bid_volume": bid_volume,
        "ask_volume": ask_volume,
        "imbalance": bid_volume / (bid_volume + ask_volume),
    }

summary = fetch_orderbook_summary(exchange, "BTC/USDT")
print(f"スプレッド: {summary['spread_pct']:.4f}%")
print(f"板の偏り(買い比率): {summary['imbalance']:.2f}")

ティッカー情報

def fetch_ticker_info(exchange: ccxt.Exchange, symbol: str) -> dict:
    """ティッカー情報を取得"""
    ticker = exchange.fetch_ticker(symbol)
    return {
        "last": ticker["last"],
        "change_24h": ticker["percentage"],
        "volume_24h": ticker["quoteVolume"],
        "high_24h": ticker["high"],
        "low_24h": ticker["low"],
        "vwap": ticker.get("vwap"),
    }

注文執行

注文タイプの比較

注文タイプ用途メリットデメリット
成行注文即時約定が必要な場合確実に約定スリッページリスク
指値注文特定価格での約定価格保証約定しない可能性
逆指値注文ストップロス損失制限ギャップで滑る可能性
OCO注文利確+損切り同時自動管理対応取引所が限定的

注文関数の実装

from typing import Optional
import logging

logger = logging.getLogger(__name__)

class OrderManager:
    def __init__(self, exchange: ccxt.Exchange):
        self.exchange = exchange

    def market_buy(self, symbol: str, amount: float) -> dict:
        """成行買い注文"""
        try:
            order = self.exchange.create_market_buy_order(symbol, amount)
            logger.info(
                f"成行買い: {symbol} {amount} @ {order['average']}"
            )
            return order
        except ccxt.InsufficientFunds as e:
            logger.error(f"残高不足: {e}")
            raise
        except ccxt.NetworkError as e:
            logger.error(f"ネットワークエラー: {e}")
            raise

    def limit_buy(
        self, symbol: str, amount: float, price: float
    ) -> dict:
        """指値買い注文"""
        order = self.exchange.create_limit_buy_order(
            symbol, amount, price
        )
        logger.info(f"指値買い: {symbol} {amount} @ {price}")
        return order

    def stop_loss(
        self,
        symbol: str,
        amount: float,
        stop_price: float,
    ) -> dict:
        """逆指値(ストップロス)注文"""
        order = self.exchange.create_order(
            symbol,
            "stop_market",
            "sell",
            amount,
            None,
            {"stopPrice": stop_price},
        )
        logger.info(f"ストップロス設定: {symbol} {amount} @ {stop_price}")
        return order

    def cancel_all_orders(self, symbol: str) -> list:
        """指定シンボルの全未約定注文をキャンセル"""
        open_orders = self.exchange.fetch_open_orders(symbol)
        results = []
        for order in open_orders:
            result = self.exchange.cancel_order(order["id"], symbol)
            results.append(result)
            logger.info(f"注文キャンセル: {order['id']}")
        return results

WebSocketリアルタイムデータ

ccxt.proによるWebSocket接続

import ccxt.pro as ccxtpro
import asyncio

async def watch_ticker(symbol: str):
    """WebSocketでティッカーをリアルタイム監視"""
    exchange = ccxtpro.bybit({"enableRateLimit": True})
    try:
        while True:
            ticker = await exchange.watch_ticker(symbol)
            print(
                f"{symbol}: {ticker['last']:.2f} "
                f"({ticker['percentage']:+.2f}%)"
            )
    finally:
        await exchange.close()

async def watch_orderbook(symbol: str):
    """WebSocketで板情報をリアルタイム監視"""
    exchange = ccxtpro.bybit({"enableRateLimit": True})
    try:
        while True:
            orderbook = await exchange.watch_order_book(symbol)
            spread = orderbook["asks"][0][0] - orderbook["bids"][0][0]
            print(
                f"Bid: {orderbook['bids'][0][0]:.2f} | "
                f"Ask: {orderbook['asks'][0][0]:.2f} | "
                f"Spread: {spread:.2f}"
            )
    finally:
        await exchange.close()

# 複数シンボルの同時監視
async def watch_multiple():
    symbols = ["BTC/USDT", "ETH/USDT", "SOL/USDT"]
    exchange = ccxtpro.bybit({"enableRateLimit": True})
    try:
        while True:
            tasks = [
                exchange.watch_ticker(symbol) for symbol in symbols
            ]
            tickers = await asyncio.gather(*tasks)
            for ticker in tickers:
                print(
                    f"{ticker['symbol']}: {ticker['last']:.2f}"
                )
    finally:
        await exchange.close()

asyncio.run(watch_ticker("BTC/USDT"))

戦略実装:移動平均クロスオーバー

基本戦略の実装

import ta
from dataclasses import dataclass
from enum import Enum

class Signal(Enum):
    BUY = "buy"
    SELL = "sell"
    HOLD = "hold"

@dataclass
class StrategyConfig:
    fast_period: int = 20
    slow_period: int = 50
    rsi_period: int = 14
    rsi_overbought: float = 70.0
    rsi_oversold: float = 30.0

class MACrossoverStrategy:
    """移動平均クロスオーバー+RSIフィルター戦略"""

    def __init__(self, config: StrategyConfig = StrategyConfig()):
        self.config = config

    def calculate_indicators(self, df: pd.DataFrame) -> pd.DataFrame:
        """テクニカル指標を計算"""
        df["sma_fast"] = ta.trend.sma_indicator(
            df["close"], window=self.config.fast_period
        )
        df["sma_slow"] = ta.trend.sma_indicator(
            df["close"], window=self.config.slow_period
        )
        df["rsi"] = ta.momentum.rsi(
            df["close"], window=self.config.rsi_period
        )
        df["atr"] = ta.volatility.average_true_range(
            df["high"], df["low"], df["close"], window=14
        )
        return df

    def generate_signal(self, df: pd.DataFrame) -> Signal:
        """最新のシグナルを生成"""
        df = self.calculate_indicators(df)
        latest = df.iloc[-1]
        previous = df.iloc[-2]

        # ゴールデンクロス(短期MAが長期MAを上抜け)
        if (
            previous["sma_fast"] <= previous["sma_slow"]
            and latest["sma_fast"] > latest["sma_slow"]
            and latest["rsi"] < self.config.rsi_overbought
        ):
            return Signal.BUY

        # デッドクロス(短期MAが長期MAを下抜け)
        if (
            previous["sma_fast"] >= previous["sma_slow"]
            and latest["sma_fast"] < latest["sma_slow"]
            and latest["rsi"] > self.config.rsi_oversold
        ):
            return Signal.SELL

        return Signal.HOLD

Botのメインループ

import time

class TradingBot:
    def __init__(
        self,
        exchange: ccxt.Exchange,
        symbol: str,
        strategy: MACrossoverStrategy,
        order_manager: OrderManager,
        trade_amount: float,
    ):
        self.exchange = exchange
        self.symbol = symbol
        self.strategy = strategy
        self.order_manager = order_manager
        self.trade_amount = trade_amount
        self.position = None  # None / "long"

    def run(self, interval_seconds: int = 60):
        """メインループ"""
        logger.info(f"Bot起動: {self.symbol}")
        while True:
            try:
                # データ取得
                df = fetch_ohlcv(self.exchange, self.symbol, "1h", 200)

                # シグナル生成
                signal = self.strategy.generate_signal(df)
                logger.info(f"シグナル: {signal.value}")

                # シグナルに基づく注文実行
                if signal == Signal.BUY and self.position is None:
                    self.order_manager.market_buy(
                        self.symbol, self.trade_amount
                    )
                    self.position = "long"
                    # ストップロス設定(ATRの2倍)
                    atr = df.iloc[-1]["atr"]
                    stop_price = df.iloc[-1]["close"] - (atr * 2)
                    self.order_manager.stop_loss(
                        self.symbol, self.trade_amount, stop_price
                    )

                elif signal == Signal.SELL and self.position == "long":
                    self.order_manager.cancel_all_orders(self.symbol)
                    self.exchange.create_market_sell_order(
                        self.symbol, self.trade_amount
                    )
                    self.position = None

                time.sleep(interval_seconds)

            except ccxt.NetworkError as e:
                logger.warning(f"ネットワークエラー: {e}")
                time.sleep(30)
            except Exception as e:
                logger.error(f"予期しないエラー: {e}")
                time.sleep(60)

バックテスト

vectorbtによるバックテスト

import vectorbt as vbt

def backtest_ma_crossover(
    symbol: str,
    fast_window: int = 20,
    slow_window: int = 50,
    start_date: str = "2025-01-01",
    end_date: str = "2026-03-15",
) -> dict:
    """移動平均クロスオーバー戦略のバックテスト"""
    # データ取得
    price = vbt.YFData.download(
        symbol, start=start_date, end=end_date
    ).get("Close")

    # 移動平均計算
    fast_ma = vbt.MA.run(price, fast_window)
    slow_ma = vbt.MA.run(price, slow_window)

    # シグナル生成
    entries = fast_ma.ma_crossed_above(slow_ma)
    exits = fast_ma.ma_crossed_below(slow_ma)

    # バックテスト実行
    portfolio = vbt.Portfolio.from_signals(
        price, entries, exits, init_cash=10000, fees=0.001
    )

    return {
        "total_return": portfolio.total_return(),
        "sharpe_ratio": portfolio.sharpe_ratio(),
        "max_drawdown": portfolio.max_drawdown(),
        "total_trades": portfolio.total_trades(),
        "win_rate": portfolio.win_rate(),
    }

# BTC-USDで検証
results = backtest_ma_crossover("BTC-USD")
print(f"トータルリターン: {results['total_return']:.2%}")
print(f"シャープレシオ: {results['sharpe_ratio']:.2f}")
print(f"最大ドローダウン: {results['max_drawdown']:.2%}")

バックテスト結果の目安

戦略期間年率リターンシャープレシオ最大DD勝率
MA(20/50) BTC2024-202518.5%1.2-15.3%42%
MA(20/50) ETH2024-202522.1%1.0-22.7%38%
RSI(14) BTC2024-202515.2%0.9-18.1%45%
MACD BTC2024-202520.3%1.1-16.8%40%

注意: バックテスト結果は過去のパフォーマンスであり、将来のリターンを保証するものではありません。スリッページ、手数料、APIレイテンシーの影響で、実運用のパフォーマンスは低下します。


リスク管理

ポジションサイジング

class RiskManager:
    def __init__(
        self,
        max_position_pct: float = 0.05,
        max_drawdown_pct: float = 0.10,
        max_daily_loss_pct: float = 0.03,
    ):
        self.max_position_pct = max_position_pct
        self.max_drawdown_pct = max_drawdown_pct
        self.max_daily_loss_pct = max_daily_loss_pct
        self.peak_balance = 0.0
        self.daily_pnl = 0.0

    def calculate_position_size(
        self,
        balance: float,
        price: float,
        atr: float,
        risk_per_trade: float = 0.01,
    ) -> float:
        """ATRベースのポジションサイジング"""
        # リスク額 = 残高 × リスク許容率
        risk_amount = balance * risk_per_trade
        # ストップ幅 = ATR × 2
        stop_distance = atr * 2
        # ポジションサイズ = リスク額 / ストップ幅
        position_size = risk_amount / stop_distance
        # 最大ポジション制限
        max_size = (balance * self.max_position_pct) / price
        return min(position_size, max_size)

    def check_drawdown(self, current_balance: float) -> bool:
        """最大ドローダウンチェック"""
        self.peak_balance = max(self.peak_balance, current_balance)
        drawdown = (self.peak_balance - current_balance) / self.peak_balance
        if drawdown >= self.max_drawdown_pct:
            logger.warning(
                f"最大ドローダウン到達: {drawdown:.2%}"
            )
            return False  # 取引停止
        return True  # 取引継続

    def check_daily_loss(self, daily_pnl: float, balance: float) -> bool:
        """日次最大損失チェック"""
        daily_loss_pct = abs(daily_pnl) / balance if daily_pnl < 0 else 0
        if daily_loss_pct >= self.max_daily_loss_pct:
            logger.warning(
                f"日次最大損失到達: {daily_loss_pct:.2%}"
            )
            return False
        return True

リスク管理パラメータの推奨値

パラメータ保守的標準積極的
1トレードリスク0.5%1.0%2.0%
最大ポジション比率3%5%10%
最大ドローダウン5%10%20%
日次最大損失1%3%5%

ログ・監視・通知

Discord通知の実装

import requests

class DiscordNotifier:
    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def send(self, title: str, message: str, color: int = 0x00FF00):
        """Discord Webhookで通知を送信"""
        embed = {
            "title": title,
            "description": message,
            "color": color,
        }
        payload = {"embeds": [embed]}
        requests.post(self.webhook_url, json=payload)

    def notify_trade(self, action: str, symbol: str, price: float, amount: float):
        color = 0x00FF00 if action == "BUY" else 0xFF0000
        self.send(
            f"{action}: {symbol}",
            f"価格: {price:,.2f}\n数量: {amount:.6f}\n"
            f"総額: {price * amount:,.2f} USDT",
            color,
        )

    def notify_error(self, error: str):
        self.send("エラー発生", error, 0xFF0000)

Grafanaダッシュボードの構成

Prometheus + Grafanaでリアルタイム監視ダッシュボードを構築します。

メトリクス説明アラート閾値
bot_balance残高推移初期残高の-10%
bot_pnl_daily日次損益-3%以下
bot_trades_total累計取引数
bot_api_latencyAPI応答時間5秒以上
bot_error_countエラー数5回/時間

本番運用インフラ

AWS構成例

┌─────────────────────────────────────────┐
│  AWS EC2 (t3.small) or ECS Fargate      │
│  ┌──────────────────────────────────┐    │
│  │  Trading Bot Container           │    │
│  │  - Python 3.12                   │    │
│  │  - ccxt + strategy              │    │
│  └──────────────────────────────────┘    │
│                                          │
│  ┌──────────────┐  ┌────────────────┐    │
│  │ CloudWatch   │  │ Secrets Manager│    │
│  │ Logs/Metrics │  │ API Keys      │    │
│  └──────────────┘  └────────────────┘    │
└──────────────────────────────────────────┘

Docker化

FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# ヘルスチェック
HEALTHCHECK --interval=60s --timeout=10s \
  CMD python healthcheck.py

CMD ["python", "main.py"]

シークレット管理

import boto3

def get_secret(secret_name: str) -> dict:
    """AWS Secrets ManagerからAPIキーを取得"""
    client = boto3.client("secretsmanager", region_name="ap-northeast-1")
    response = client.get_secret_value(SecretId=secret_name)
    return json.loads(response["SecretString"])

# 使用例
secrets = get_secret("trading-bot/bybit-api")
exchange = ccxt.bybit({
    "apiKey": secrets["api_key"],
    "secret": secrets["secret_key"],
})

運用上の注意点

APIレート制限

取引所REST制限WebSocket制限推奨間隔
Binance1200 req/min5 msg/sec500ms
Bybit120 req/min20 req/sec1000ms
OKX60 req/2sec480 msg/min1000ms
bitFlyer500 req/5min600ms

取引所ダウンタイムへの対策

  1. 複数取引所への分散: 単一取引所に依存しない
  2. ヘルスチェック実装: API応答の定期確認
  3. フェイルオーバー: メイン取引所がダウンしたらサブに切り替え
  4. ポジションの自動クローズ: 長時間のAPI障害時は既存ポジションを決済

よくある失敗パターン

失敗パターン原因対策
過剰なレバレッジリスク管理の欠如最大レバレッジを3倍以下に
バックテストの過学習パラメータの最適化過多ウォークフォワード分析
スリッページの無視薄い板での大量注文注文を分割、流動性確認
API障害中の放置監視の欠如ヘルスチェック+自動停止
テストネット未使用本番で直接テスト必ずテストネットで検証

まとめ

仮想通貨Bot開発は、適切なツール(ccxt)とリスク管理を組み合わせることで、個人でも実用的な自動売買システムを構築できます。

開発ステップの推奨順序:

  1. テストネットでccxtの基本操作を習得
  2. バックテストで戦略の有効性を検証
  3. 少額の実資金でペーパートレード
  4. リスク管理とモニタリングを実装
  5. 段階的に資金を増加

最も重要なのは、バックテストで利益が出る戦略でも、本番では必ず少額から始めることです。市場環境の変化、スリッページ、API遅延など、バックテストでは再現できない要因が存在します。