扩展组件¶
trade-learn 的扩展原则是:用户层写策略语法,research 层写投研组件,backtest 层执行事件回测,Rust 只承担高频内核。日常扩展不需要改 Rust。
扩展点总览¶
| 扩展目标 | 推荐方式 | 入口 |
|---|---|---|
| Engine 专业策略组件 | bt.Indicator / bt.Analyzer / bt.Sizer / bt.CommInfoBase |
tradelearn.engine |
| Lite 快速策略组件 | 普通函数 + self.I(...),或 target_weights() 所需的权重 Series |
tradelearn.lite |
| Research 预处理 / 特征 / 选股 / 权重 | sklearn-like 类:fit() / transform() / fit_transform() / build() |
tradelearn.research |
| 数据源 | 返回标准 OHLCV DataFrame 或 MultiIndex(timestamp, symbol) panel |
tradelearn.data |
| 报告 | 从 Stats 或 Reporter 读取结果生成 HTML / CSV / XLSX |
tradelearn.report |
| paper / live broker | 实现 tradelearn.core.Broker 中性协议,通过事件回流成交状态 |
外部适配器 |
Engine 高级拓展¶
Engine 是 Backtrader 风格入口。复杂策略组件应尽量写成 class,这样可复用、可测试,也更符合迁移用户心智。
自定义 Engine 指标¶
import tradelearn.engine as bt
class MidPrice(bt.Indicator):
lines = ("mid",)
def __init__(self):
self.lines.mid = (self.data.high + self.data.low) / 2.0
class AboveMid(bt.Strategy):
def __init__(self):
self.mid = MidPrice(self.data)
def next(self):
if not self.position and self.data.close[0] > self.mid.mid[0]:
self.buy(size=10)
内置 vendor 指标仍然走统一命名空间:
self.ma20 = bt.tdx.MA(self.data.close, N=20)
self.rsi14 = bt.talib.RSI(self.data.close, timeperiod=14)
自定义 Sizer¶
class FixedCashSizer(bt.Sizer):
params = (("cash_per_trade", 10_000),)
def _getsizing(self, comminfo, cash, data, isbuy):
price = data.close[0]
return int(self.p.cash_per_trade / price) if price > 0 else 0
cerebro.addsizer(FixedCashSizer, cash_per_trade=20_000)
自定义 Analyzer¶
class TurnoverAnalyzer(bt.Analyzer):
def start(self):
self.turnover = 0.0
def notify_order(self, order):
if order.status == order.Completed:
self.turnover += abs(order.executed.size * order.executed.price)
def get_analysis(self):
return {"turnover": self.turnover}
cerebro.addanalyzer(TurnoverAnalyzer, _name="turnover")
自定义佣金¶
class FixedPlusPercent(bt.CommInfoBase):
params = (("commission", 0.0003), ("fixed", 1.0))
def getcommission(self, size, price):
return abs(size) * price * self.p.commission + self.p.fixed
cerebro.setcommission(comminfo=FixedPlusPercent())
Lite 轻量拓展¶
Lite 的定位是快速表达策略。自定义指标优先写成普通函数,然后通过 self.I(...) 注册。
import pandas as pd
import tradelearn.lite as tl
def zscore(close: pd.Series, window: int = 20) -> pd.Series:
mean = close.rolling(window).mean()
std = close.rolling(window).std()
return (close - mean) / std
class MeanReversion(tl.Strategy):
def init(self):
self.z = self.I(zscore, self.data.close, window=20)
self.start_on_bar(20)
def next(self):
if self.z[0] < -2:
self.buy(size=100)
elif self.position():
self.position().close()
内置指标与 Engine 一样使用 vendor 命名空间:
self.ma20 = tl.tdx.MA(self.data.close, N=20)
self.rsi14 = tl.talib.RSI(self.data.close, timeperiod=14)
self.tv_rsi = tl.tv.RSI(self.data.close, length=14)
Research 通用组件拓展¶
Research 组件适合投研流水线:训练集 fit、测试集 transform、最后生成 scores 或 weights。
自定义 transformer¶
import pandas as pd
class RankNormalizer:
def __init__(self, columns):
self.columns = list(columns)
def fit(self, frame: pd.DataFrame):
return self
def transform(self, frame: pd.DataFrame) -> pd.DataFrame:
out = frame.copy()
out[self.columns] = out[self.columns].rank(pct=True)
return out
def fit_transform(self, frame: pd.DataFrame) -> pd.DataFrame:
return self.fit(frame).transform(frame)
接入 Pipeline:
pipe = research.Pipeline([RankNormalizer(columns=["alpha"])])
train = pipe.fit_transform(train)
test = pipe.transform(test)
自定义 Allocator¶
Allocator 的职责是把 scores 转成目标权重。最简单的扩展只需要实现 build(scores)。
class LongOnlyTopN:
def __init__(self, n: int, gross: float = 0.95):
self.n = n
self.gross = gross
def build(self, scores):
selected = scores.groupby(level="timestamp").nlargest(self.n)
selected.index = selected.index.droplevel(0)
return selected.groupby(level="timestamp").transform(
lambda x: self.gross / len(x)
).rename("weight")
如果只是常见 top-k 等权组合,内置写法更直接:
import tradelearn.research.portfolio as pf
allocator = pf.Allocator(
select=pf.TopK(k=50),
weight=pf.EqualWeight(gross=0.95),
constrain=pf.Constraints(max_weight=0.05, normalize=True),
)
weights = allocator.build(scores)
Data / Report 通用拓展¶
自定义数据源¶
自定义 provider 不需要继承框架类,只要返回规范数据:
- 单标的:index 为时间,columns 至少包含
open/high/low/close/volume。 - 多标的:
MultiIndex(timestamp, symbol),columns 同上。
class MyProvider:
def history_ohlc(self, symbols, start=None, end=None, freq="1d"):
...
return bars
Engine 的 cerebro.adddata(panel) 会自动按 symbol 拆 feed;Lite 的 Backtest(panel, Strategy) 也会识别 panel。
自定义报告¶
Stats 是报告层的统一输入:
from tradelearn.report import Reporter
reporter = Reporter(strategy.stats)
reporter.report("report.html")
summary = reporter.summary()
trades = reporter.trades()
equity = reporter.equity_curve()
如果用户已有收益率序列,也可以使用兼容 pyfolio 心智的入口:
Reporter.from_returns(returns).report("returns_report.html")
paper / live broker 拓展¶
实盘 broker 不复用 RustBroker 状态机。外部适配器实现 core 的中性协议,并通过 BrokerEventPump 把成交、撤单、拒单回流给策略。
from tradelearn.core import AccountSnapshot, OrderAck, OrderRequest
class MyLiveBroker:
def place(self, req: OrderRequest) -> OrderAck:
...
def cancel(self, broker_oid: str) -> None:
...
def positions(self):
...
def account(self) -> AccountSnapshot:
...
def on_fill(self, callback):
...
约束:
- 策略发出的是订单意图,不假设立即成交。
- 成交、撤单、拒单、状态变化必须通过 broker 事件回流。
- QMT / IB / CTP 等具体适配器可以在外部包或私有扩展里实现,不进入
core。