Overall Statistics
Total Trades
18
Average Win
2.40%
Average Loss
-2.70%
Compounding Annual Return
12.590%
Drawdown
35.400%
Expectancy
0.620
Net Profit
31.494%
Sharpe Ratio
0.464
Probabilistic Sharpe Ratio
15.140%
Loss Rate
14%
Win Rate
86%
Profit-Loss Ratio
0.89
Alpha
0.241
Beta
-1.327
Annual Standard Deviation
0.271
Annual Variance
0.074
Information Ratio
0.096
Tracking Error
0.408
Treynor Ratio
-0.095
Total Fees
$246.92
Estimated Strategy Capacity
$70000000.00
Lowest Capacity Asset
QQQ RIWIV7K5Z9LX
Portfolio Turnover
0.98%
# region imports
from AlgorithmImports import *

from math import log, sqrt, pi, exp
from scipy.stats import norm
import scipy.stats as stats
# endregion
Assignment1 = False
Assignment2 = True

## ------------------------ Assignment 1 AAPL Contracts info
if Assignment1:
    class FatYellowOwl(QCAlgorithm):

        def Initialize(self) -> None:
            self.SetStartDate(2022, 7, 14)
            self.SetEndDate(2022, 12, 1)
            self.SetCash(100000)
            self.SetWarmUp(20)
            
            # Universe settings
            self.UniverseSettings.Resolution = Resolution.Hour
            
            # Requesting data
            self.aapl = self.AddEquity("AAPL",Resolution.Hour)
            self.aapl.SetDataNormalizationMode(DataNormalizationMode.Raw)
            self.aapl = self.aapl.Symbol

            option = self.AddOption("AAPL",Resolution.Hour)
            self.option_symbol = option.Symbol
            
            # Volatility
            self.last_price = RollingWindow[float](2)
            self.std = StandardDeviation(25)

            # Risk Free rate
            self.yieldCurve = self.AddData(USTreasuryYieldCurveRate, "USTYCR", Resolution.Daily).Symbol
            self.last_rfr = RollingWindow[float](4)

            #Set Options Model: Equity Options are American Style
            # As such, need to use CrankNicolsonFD or other models.
            #BlackScholes Model is only for European Options
            option.PriceModel = OptionPriceModels.CrankNicolsonFD()

        ## ----------- Black Scholes ------------- ##
        # Price

        def bsm_price(self, option_type, sigma, s, k, r, T, q):
            # calculate the bsm price of European call and put options
            d1 = (np.log(s / k) + (r - q + sigma ** 2 * 0.5) * T) / (sigma * np.sqrt(T))
            d2 = d1 - sigma * np.sqrt(T)
            if option_type == 'c':
                return np.exp(-r*T) * (s * np.exp((r - q)*T) * stats.norm.cdf(d1) - k *  stats.norm.cdf(d2))
            if option_type == 'p':
                return np.exp(-r*T) * (k * stats.norm.cdf(-d2) - s * np.exp((r - q)*T) *  stats.norm.cdf(-d1))
            raise Exception(f'No such option type: {option_type}')

        def implied_vol(self, option_type, option_price, s, k, r, T, q):
            # apply bisection method to get the implied volatility by solving the BSM function
            precision = 0.00001
            upper_vol = 500.0
            max_vol = 500.0
            min_vol = 0.0001
            lower_vol = 0.0001
            iteration = 50

            while iteration > 0:
                iteration -= 1
                mid_vol = (upper_vol + lower_vol)/2
                price = self.bsm_price(option_type, mid_vol, s, k, r, T, q)

                if option_type == 'c':
                    lower_price = self.bsm_price(option_type, lower_vol, s, k, r, T, q)
                    if (lower_price - option_price) * (price - option_price) > 0:
                        lower_vol = mid_vol
                    else:
                        upper_vol = mid_vol
                    if mid_vol > max_vol - 5 :
                        return 0.000001

                if option_type == 'p':
                    upper_price = self.bsm_price(option_type, upper_vol, s, k, r, T, q)
                    if (upper_price - option_price) * (price - option_price) > 0:
                        upper_vol = mid_vol
                    else:
                        lower_vol = mid_vol

                if abs(price - option_price) < precision:
                    break

            return mid_vol

        def OnData(self, data: Slice) -> None:
            if data.get(self.yieldCurve):
                self.last_rfr.Add(data[self.yieldCurve].OneYear)
            if self.Time.hour == 10 and data.get(self.aapl): # Once per day
                self.last_price.Add(data[self.aapl].Price)
                if self.last_price.IsReady:
                    self.std.Update(self.Time, (self.last_price[0]-self.last_price[1])/self.last_price[1])
            if self.IsWarmingUp: #Wait until warming up is done
                return
            
            if self.Time == datetime(2022,10,14,15):
                rfr = self.last_rfr[0]
                vol = self.std.Current.Value*sqrt(252)
                T = abs(self.Time - datetime(2022,11,18)).days/365
                # Compute the price for the call option
                price = self.bsm_price('c',vol,data[self.aapl].Price, 145, rfr, T, 0)
                # Get the Implied volatility based on computed price 
                implied_volatility = self.implied_vol('c', price, data[self.aapl].Price, 145,rfr, T, 0)
                self.Log(f'At{self.Time} a Call option on AAPL with strike price 145 and expiry of 2022-11-18 has a price {price} and implied volatility {implied_volatility}')
            elif self.Time == datetime(2022,10,17,10):
                rfr = self.last_rfr[0]
                vol = self.std.Current.Value*sqrt(252)
                T = abs(self.Time - datetime(2022,11,11)).days
                # Compute the price for the call option
                price = self.bsm_price('p',vol,data[self.aapl].Price, 145, rfr, T, 0)
                # Get the Implied volatility based on computed price 
                implied_volatility = self.implied_vol('p', price, data[self.aapl].Price, 145,rfr, T, 0)
                self.Log(f'At{self.Time} a Put option on AAPL with strike price 130 and expiry of 2022-11-11 has a price {price} and implied volatility {implied_volatility}')
            elif self.Time == datetime(2022,10,17,13):
                self.interpolate_contract(data)
            
        def interpolate_contract(self,data):
            computed_prices = [None]
            computed_impvol = [None]
            deltas_t = [datetime(2022,12,16)]
            rfr = self.last_rfr[0]
            vol = self.std.Current.Value*sqrt(252)
            # Because we are using +- this will be month approximatly
            for i in range(10):
                delta_t_minus = abs(self.Time - (datetime(2022,12,16)-timedelta(days=i))).days
                delta_t_plus = abs(self.Time - (datetime(2022,12,16)+timedelta(days=i))).days
                deltas_t = [datetime(2022,12,16)+timedelta(days=i)] + deltas_t
                deltas_t.append(datetime(2022,12,16)+timedelta(days=i))
                # Compute the price and Implied volatility for the put option
                price = self.bsm_price('p', vol, data[self.aapl].Price, 145, rfr, delta_t_minus, 0)
                implied_volatility = self.implied_vol('p', price, data[self.aapl].Price, 145,rfr, delta_t_minus, 0)
                computed_prices = [price] + computed_prices
                computed_impvol = [implied_volatility] + computed_impvol

                # Compute the price and Implied volatility for the put option
                price = self.bsm_price('p', vol, data[self.aapl].Price, 145, rfr, delta_t_plus, 0)
                implied_volatility = self.implied_vol('p', price, data[self.aapl].Price, 145,rfr, delta_t_plus, 0)
                computed_prices.append(price)
                computed_impvol.append(implied_volatility)
            my_data = pd.DataFrame({'Price':computed_prices,'Implied_Volatility': computed_impvol}, index=deltas_t)
            intepolated = my_data.interpolate()
            self.Log(f'At {self.Time} the interpolated value for price and implied volatility with expiry 2022-12-16 is {str(intepolated.iloc[10])}')
            self.Log(f'The three previous values are {str(intepolated.iloc[7:10])}')
            self.Log(f'The three following values are {str(intepolated.iloc[11:14])}')
elif Assignment2:
    class FatYellowOwl(QCAlgorithm):
        # threshold to tel if an implied volatility is high (impvol/vol)
        ThHighVol = 1
        DesiredDelta = 1
        CallQuantity = 100

        def Initialize(self) -> None:
            self.SetStartDate(2020, 8, 11)
            self.SetEndDate(2022, 12, 1)
            self.SetCash(2000000)
            self.SetWarmUp(25)
            
            # Universe settings
            self.UniverseSettings.Resolution = Resolution.Hour
            
            # Requesting data
            self.underlaying = self.AddEquity("QQQ",Resolution.Hour)
            self.underlaying.SetDataNormalizationMode(DataNormalizationMode.Raw)
            self.underlaying = self.underlaying.Symbol

            option = self.AddOption("QQQ",Resolution.Hour)
            self.option_symbol = option.Symbol
            
            # Volatility
            self.last_price = RollingWindow[float](2)
            self.std = StandardDeviation(25)
            self.Schedule.On(self.DateRules.EveryDay("QQQ"),
                            self.TimeRules.BeforeMarketClose("QQQ", -1),
                            self.UpdateVol)

            # Risk Free rate
            self.yieldCurve = self.AddData(USTreasuryYieldCurveRate, "USTYCR", Resolution.Daily).Symbol
            self.last_rfr = RollingWindow[float](4)

            #Set Options Model: Equity Options are American Style
            # As such, need to use CrankNicolsonFD or other models.
            #BlackScholes Model is only for European Options
            option.PriceModel = OptionPriceModels.CrankNicolsonFD()

        ## ----------- Black Scholes ------------- ##
        # Update the daily
        def UpdateVol(self):
            self.last_price.Add(self.Securities[self.underlaying].Close)
            if self.last_price.IsReady:
                self.std.Update(self.Time, (self.last_price[0]-self.last_price[1])/self.last_price[1])

        # Delta
        def delta(self, option_type, s, k, sigma, r, T, q=0):
            '''
            Inputs
                - s: The underlying price.
                - k: The strike price of the option
                - sigma: The volatility of the underlying asset.
                - r: The risk-free interest rate
                - T: Time to expiration
                - q: The dividend yield
            '''
            d1 = (np.log(s / k) + (r - q + sigma ** 2 * 0.5) * T) / (sigma * np.sqrt(T))
            if option_type == "c":
                return exp(-q * T) * norm.cdf(d1)
            elif option_type == "p":
                return exp(-q * T) * (norm.cdf(d1)-1)

        def OnData(self, data: Slice) -> None:
            # Keep track of the one year interest rate available
            if data.get(self.yieldCurve):
                self.last_rfr.Add(data[self.yieldCurve].OneYear)
            if self.IsWarmingUp: #Wait until warming up is done
                return
            # Annual Volatility
            vol = self.std.Current.Value*sqrt(252)
            if self.Time == datetime(2021,11,1,12):
                chain = data.OptionChains.get(self.option_symbol)
                if chain:
                    # Select call contracts
                    calls = [contract for contract in chain if contract.Right == OptionRight.Call]
                if calls:
                    # Only expiry dates in 11/19/2021 and with strike prices > actual underlying price
                    my_call = filter(lambda c: c.Expiry == datetime(2021,11,19) and c.Strike >= c.UnderlyingLastPrice, calls)
                    self.my_call = sorted(my_call, key = lambda c: c.Strike - c.UnderlyingLastPrice)[0]
                    if self.my_call.ImpliedVolatility/vol > self.ThHighVol:
                        self.order = self.MarketOrder(self.my_call.Symbol, -self.CallQuantity)
                    else:
                        self.order = self.MarketOrder(self.my_call.Symbol, self.CallQuantity)
                    self.Schedule.On(self.DateRules.EveryDay("QQQ"),
                            self.TimeRules.AfterMarketOpen("QQQ", 10),
                            self.DeltaHedge)
                else:
                    self.Debug('No Call contracts finded')
            
        def DeltaHedge(self):
            # Risk Free Rate
            rfr = self.last_rfr[0]
            # Annual Volatility
            vol = self.std.Current.Value*sqrt(252)
            # Time to expiry
            T = abs(self.Time - datetime(2021,11,19)).days/365

            # Current Delta
            delta = self.delta('c', self.Securities[self.underlaying].Price, self.my_call.Strike, vol, rfr, T)
            # Current Shares
            holds = self.Securities[self.underlaying].Holdings.Quantity
            # Difference to the desired Delta converted to shares
            target = (self.DesiredDelta - delta) * 100
            quantity_order = target - holds
            if quantity_order > 0:
                self.Log(f'Current {self.underlaying.Value} Holds')
                self.order = self.MarketOrder(self.underlaying, -quantity_order)
                self.Log(f'For a Delta of {delta}, a market order of {-quantity_order} shares in ' +\
                        f'{self.underlaying.Value} was created to mantain the Desired delta of {self.DesiredDelta}')
else:
    class FatYellowOwl(QCAlgorithm):
        # threshold to tell if an implied volatility is high (impvol/vol)
        ThHighVol = 1
        # The Delta that want to be mantained during the holding of the call
        DesiredDelta = 0.5
        # Initial order quantity for call option
        CallQuantity = 100
        # Minimum difference to select the contract (strike - underlying), 
        # negative will allow strike prices below the current underlying.price
        # The contract with the lowest price will be selected 
        StrikeDiff = -5
        # Days to expiry
        TimeDelta = 90

        def Initialize(self) -> None:
            self.SetStartDate(2020, 8, 11)
            self.SetEndDate(2022, 12, 1)
            self.SetCash(2000000)
            self.SetWarmUp(25)
            
            # Universe settings
            self.UniverseSettings.Resolution = Resolution.Hour
            
            # Requesting data
            self.underlaying = self.AddEquity("QQQ",Resolution.Hour)
            self.underlaying.SetDataNormalizationMode(DataNormalizationMode.Raw)
            self.underlaying = self.underlaying.Symbol

            option = self.AddOption("QQQ",Resolution.Hour)
            self.option_symbol = option.Symbol
            
            # Volatility
            self.last_price = RollingWindow[float](2)
            self.std = StandardDeviation(25)
            self.Schedule.On(self.DateRules.EveryDay("QQQ"),
                            self.TimeRules.BeforeMarketClose("QQQ", -1),
                            self.UpdateVol)

            # Risk Free rate
            self.yieldCurve = self.AddData(USTreasuryYieldCurveRate, "USTYCR", Resolution.Daily).Symbol
            self.last_rfr = RollingWindow[float](4)

            #Set Options Model: Equity Options are American Style
            # As such, need to use CrankNicolsonFD or other models.
            #BlackScholes Model is only for European Options
            option.PriceModel = OptionPriceModels.CrankNicolsonFD()

        ## ----------- Black Scholes ------------- ##
        # Update the daily
        def UpdateVol(self):
            self.last_price.Add(self.Securities[self.underlaying].Close)
            if self.last_price.IsReady:
                self.std.Update(self.Time, (self.last_price[0]-self.last_price[1])/self.last_price[1])

        # Delta
        def delta(self, option_type, s, k, sigma, r, T, q=0):
            '''
            Inputs
                - s: The underlying price.
                - k: The strike price of the option
                - sigma: The volatility of the underlying asset.
                - r: The risk-free interest rate
                - T: Time to expiration
                - q: The dividend yield
            '''
            d1 = (np.log(s / k) + (r - q + sigma ** 2 * 0.5) * T) / (sigma * np.sqrt(T))
            if option_type == "c":
                return exp(-q * T) * norm.cdf(d1)
            elif option_type == "p":
                return exp(-q * T) * (norm.cdf(d1)-1)

        def OnData(self, data: Slice) -> None:
            # Keep track of the one year interest rate available
            if data.get(self.yieldCurve):
                self.last_rfr.Add(data[self.yieldCurve].OneYear)
            if self.IsWarmingUp: #Wait until warming up is done
                return
            # Annual Volatility
            vol = self.std.Current.Value*sqrt(252)
            if self.Time == datetime(2021,11,1,12):
                chain = data.OptionChains.get(self.option_symbol)
                if chain:
                    # Select call contracts
                    calls = [contract for contract in chain if contract.Right == OptionRight.Call]
                if calls:
                    # Only expiry dates in 11/19/2021 and with strike prices > actual underlying price
                    my_call = filter(lambda c: (c.Expiry - self.Time).days > self.TimeDelta and c.Strike - c.UnderlyingLastPrice >= self.StrikeDiff, calls)
                    self.my_call = sorted(my_call, key = lambda c: c.Strike - c.UnderlyingLastPrice)[0]
                    if self.my_call.ImpliedVolatility/vol > self.ThHighVol:
                        self.order = self.MarketOrder(self.my_call.Symbol, -self.CallQuantity)
                    else:
                        self.order = self.MarketOrder(self.my_call.Symbol, self.CallQuantity)
                    self.Schedule.On(self.DateRules.EveryDay("QQQ"),
                            self.TimeRules.AfterMarketOpen("QQQ", 10),
                            self.DeltaHedge)
                else:
                    self.Debug('No Call contracts finded')
            
        def DeltaHedge(self):
            # Risk Free Rate
            rfr = self.last_rfr[0]
            # Annual Volatility
            vol = self.std.Current.Value*sqrt(252)
            # Time to expiry
            T = abs(self.Time - datetime(2021,11,19)).days/365

            # Current Delta
            delta = self.delta('c', self.Securities[self.underlaying].Price, self.my_call.Strike, vol, rfr, T)
            # Current Shares
            holds = self.Securities[self.underlaying].Holdings.Quantity
            # Difference to the desired Delta converted to shares
            target = (self.DesiredDelta - delta) * 100
            quantity_order = target - holds
            if quantity_order > 0:
                self.Log(f'Current {self.underlaying.Value} Holds')
                self.order = self.MarketOrder(self.underlaying, -quantity_order)
                self.Log(f'For a Delta of {delta}, a market order of {-quantity_order} shares in ' +\
                        f'{self.underlaying.Value} was created to mantain the Desired delta of {self.DesiredDelta}')