| Overall Statistics |
|
Total Trades 175 Average Win 18.58% Average Loss -7.00% Compounding Annual Return 178.304% Drawdown 57.000% Expectancy 0.722 Net Profit 2975.910% Sharpe Ratio 2.32 Probabilistic Sharpe Ratio 83.874% Loss Rate 53% Win Rate 47% Profit-Loss Ratio 2.65 Alpha 1.452 Beta 0.062 Annual Standard Deviation 0.628 Annual Variance 0.395 Information Ratio 2.122 Tracking Error 0.648 Treynor Ratio 23.643 Total Fees $26826.03 |
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import talib
class renko:
def __init__(self):
self.source_prices = []
self.renko_prices = []
self.renko_directions = []
# Setting brick size. Auto mode is preferred, it uses history
def set_brick_size(self, HLC_history = None, auto = True, brick_size = 10.0):
if auto == True:
self.brick_size = self.__get_optimal_brick_size(HLC_history.iloc[:, [0, 1, 2]])
else:
self.brick_size = brick_size
return self.brick_size
def __renko_rule(self, last_price):
# Get the gap between two prices
gap_div = int(float(last_price - self.renko_prices[-1]) / self.brick_size)
is_new_brick = False
start_brick = 0
num_new_bars = 0
# When we have some gap in prices
if gap_div != 0:
# Forward any direction (up or down)
if (gap_div > 0 and (self.renko_directions[-1] > 0 or self.renko_directions[-1] == 0)) or (gap_div < 0 and (self.renko_directions[-1] < 0 or self.renko_directions[-1] == 0)):
num_new_bars = gap_div
is_new_brick = True
start_brick = 0
# Backward direction (up -> down or down -> up)
elif np.abs(gap_div) >= 2: # Should be double gap at least
num_new_bars = gap_div
num_new_bars -= np.sign(gap_div)
start_brick = 2
is_new_brick = True
self.renko_prices.append(self.renko_prices[-1] + 2 * self.brick_size * np.sign(gap_div))
self.renko_directions.append(np.sign(gap_div))
#else:
#num_new_bars = 0
if is_new_brick:
# Add each brick
for d in range(start_brick, np.abs(gap_div)):
self.renko_prices.append(self.renko_prices[-1] + self.brick_size * np.sign(gap_div))
self.renko_directions.append(np.sign(gap_div))
return num_new_bars
# Getting renko on history
def build_history(self, prices):
if len(prices) > 0:
# Init by start values
self.source_prices = prices
self.renko_prices.append(prices.iloc[0])
self.renko_directions.append(0)
# For each price in history
for p in self.source_prices[1:]:
self.__renko_rule(p)
return len(self.renko_prices)
# Getting next renko value for last price
def do_next(self, last_price):
if len(self.renko_prices) == 0:
self.source_prices.append(last_price)
self.renko_prices.append(last_price)
self.renko_directions.append(0)
return 1
else:
self.source_prices.append(last_price)
return self.__renko_rule(last_price)
# Simple method to get optimal brick size based on ATR
def __get_optimal_brick_size(self, HLC_history, atr_timeperiod = 14):
brick_size = 0.0
# If we have enough of data
if HLC_history.shape[0] > atr_timeperiod:
brick_size = np.median(talib.ATR(high = np.double(HLC_history.iloc[:, 0]),
low = np.double(HLC_history.iloc[:, 1]),
close = np.double(HLC_history.iloc[:, 2]),
timeperiod = atr_timeperiod)[atr_timeperiod:])
return brick_size
def evaluate(self, method = 'simple'):
balance = 0
sign_changes = 0
price_ratio = len(self.source_prices) / len(self.renko_prices)
if method == 'simple':
for i in range(2, len(self.renko_directions)):
if self.renko_directions[i] == self.renko_directions[i - 1]:
balance = balance + 1
else:
balance = balance - 2
sign_changes = sign_changes + 1
if sign_changes == 0:
sign_changes = 1
score = balance / sign_changes
if score >= 0 and price_ratio >= 1:
score = np.log(score + 1) * np.log(price_ratio)
else:
score = -1.0
return {'balance': balance, 'sign_changes:': sign_changes,
'price_ratio': price_ratio, 'score': score}
def get_renko_prices(self):
return self.renko_prices
def get_renko_directions(self):
return self.renko_directions
def plot_renko(self, col_up = 'g', col_down = 'r'):
fig, ax = plt.subplots(1, figsize=(20, 10))
ax.set_title('Renko chart')
ax.set_xlabel('Renko bars')
ax.set_ylabel('Price')
# Calculate the limits of axes
ax.set_xlim(0.0,
len(self.renko_prices) + 1.0)
ax.set_ylim(np.min(self.renko_prices) - 3.0 * self.brick_size,
np.max(self.renko_prices) + 3.0 * self.brick_size)
# Plot each renko bar
for i in range(1, len(self.renko_prices)):
# Set basic params for patch rectangle
col = col_up if self.renko_directions[i] == 1 else col_down
x = i
y = self.renko_prices[i] - self.brick_size if self.renko_directions[i] == 1 else self.renko_prices[i]
height = self.brick_size
# Draw bar with params
ax.add_patch(
patches.Rectangle(
(x, y), # (x,y)
1.0, # width
self.brick_size, # height
facecolor = col
)
)
plt.show()from datetime import datetime, timedelta
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Indicators")
from System import *
from QuantConnect import *
from QuantConnect.Indicators import *
from QuantConnect.Data import *
from QuantConnect.Data.Market import *
from QuantConnect.Data.Custom import *
from QuantConnect.Algorithm import *
from QuantConnect.Python import *
import pyrenko
import pandas as pd
import scipy.optimize as opt
from scipy.stats import iqr
import numpy as np
class RenkoStrategy(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2017,1,1) #Set Start Date
self.SetEndDate(2020,5,5) #Set End Date
self.SetCash(10000)
self.AddCrypto("BTCUSD", Resolution.Daily)
self.SetBrokerageModel(BrokerageName.Bitfinex, AccountType.Margin)
# Create Renko object
self.renko_obj = pyrenko.renko()
self.last_brick_size = 0.0
self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(12, 0), self.LiquidateUnrealizedLosses)
def OnData(self, data):
if len(self.renko_obj.get_renko_prices()) == 0:
def evaluate_renko(brick, history, column_name):
self.renko_obj = pyrenko.renko()
self.renko_obj.set_brick_size(brick_size = brick, auto = False)
self.renko_obj.build_history(prices = history)
return self.renko_obj.evaluate()[column_name]
self.renko_obj = pyrenko.renko()
# I look up history close price in an hourly time frame for the last 15 days
history = self.History(self.Symbol("BTCUSD"), 360, Resolution.Hour)
history = pd.Series(history.close)
# Get daily absolute returns
diffs = history.diff(24).abs()
diffs = diffs[~np.isnan(diffs)]
# Calculate IQR of daily returns
iqr_diffs = np.percentile(diffs, [50, 80])
# Find the optimal brick size
opt_bs = opt.fminbound(lambda x: -evaluate_renko(brick = x,
history = history, column_name = 'score'),
iqr_diffs[0], iqr_diffs[1], disp=0)
# Build the model
#self.Debug(str(self.Time) + " " + 'Rebuilding Renko: ' + str(opt_bs))
self.last_brick_size = opt_bs
self.renko_obj.set_brick_size(brick_size = opt_bs, auto = False)
self.renko_obj.build_history(prices = history)
#Open a position
self.SetHoldings("BTCUSD", self.renko_obj.get_renko_directions()[-1])
else:
last_price = self.History(TradeBar, self.Symbol("BTCUSD"), 10, Resolution.Daily)
last_close = last_price['close'].tail(1)
last_close = pd.Series(last_close)
prev = self.renko_obj.get_renko_prices()
prev_dir = self.renko_obj.get_renko_directions()[-1]
num_created_bars = self.renko_obj.do_next(last_close)
if num_created_bars != 0:
self.Log('New Renko bars created')
self.Log('last price: ' + str(last_close[0]))
#self.Log('previous Renko price: ' + str(prev))
self.Log('current Renko price: ' + str(self.renko_obj.get_renko_prices()[-1]))
self.Log('direction: ' + str(prev_dir))
self.Log('brick size: ' + str(self.renko_obj.brick_size))
if np.sign(self.Portfolio["BTCUSD"].Quantity * self.renko_obj.get_renko_directions()[-1]) == -1:
self.Liquidate("BTCUSD")
self.renko_obj = pyrenko.renko()
def LiquidateUnrealizedLosses(self):
# if we overcome certain percentage of unrealized losses, liquidate'''
if (self.Portfolio.TotalUnrealizedProfit / self.Portfolio.TotalPortfolioValue) > 0.25:
self.Log("Liquidated due to unrealized losses at: {0}".format(self.Time))
self.Liquidate()
self.renko_obj = pyrenko.renko()