Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
1.0k views
in Technique[技术] by (71.8m points)

algorithm - How to implement RSI Divergence in Python

I was wondering is there any Python library that covers RSI-Divergence (difference between a fast and a slow RSI) or any guidence about how can I implement its algorithm in Python.

Already asked question: Programmatically detect RSI divergence. One of the answer suggests quantconnect forum for the Python version but it does not cover anything.

I was not able to find its mathematical formula but I was able to find the RSI-Divergence in pine-script, as below, but I was not able to convert it into Python since its not possible to debug pine-script using tradingview.

study(title="RSI Divergence", shorttitle="RSI Divergence")
src_fast = close, len_fast = input(5, minval=1, title="Length Fast RSI")
src_slow = close, len_slow = input(14,minval=1, title="Length Slow RSI")
up_fast = rma(max(change(src_fast), 0), len_fast)
down_fast = rma(-min(change(src_fast), 0), len_fast)
rsi_fast = down_fast == 0 ? 100 : up_fast == 0 ? 0 : 100 - (100 / (1 + up_fast / down_fast))
up_slow = rma(max(change(src_slow), 0), len_slow)
down_slow = rma(-min(change(src_slow), 0), len_slow)
rsi_slow = down_slow == 0 ? 100 : up_slow == 0 ? 0 : 100 - (100 / (1 + up_slow / down_slow))
divergence = rsi_fast - rsi_slow
plotdiv = plot(divergence, color = divergence > 0 ? lime:red, linewidth = 2)
band = hline(0)

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

I found this on the next link: Back Testing RSI Divergence Strategy on FX

The author of the post used the exponential moving average for RSI calculation, using this piece of code:

'''
Assuming you have a pandas OHLC Dataframe downloaded from Metatrader 5 historical data. 
'''
# Get the difference in price from previous step
Data = pd.DataFrame(Data)
delta = Data.iloc[:, 3].diff()
delta = delta[1:]

# Make the positive gains (up) and negative gains (down) Series
up, down = delta.copy(), delta.copy()
up[up < 0] = 0
down[down > 0] = 0
roll_up = pd.stats.moments.ewma(up, lookback)
roll_down = pd.stats.moments.ewma(down.abs(), lookback)

# Calculate the SMA
roll_up = roll_up[lookback:]
roll_down = roll_down[lookback:]
Data = Data.iloc[lookback + 1:,].values

# Calculate the RSI based on SMA
RS = roll_up / roll_down
RSI = (100.0 - (100.0 / (1.0 + RS)))
RSI = np.array(RSI)
RSI = np.reshape(RSI, (-1, 1))

Data = np.concatenate((Data, RSI), axis = 1)

At this point we have an array with OHLC data and a fifth column that has the RSI in it. Then added the next two columns:

  1. Column 6: Data[:, 5] will be for the bullish divergences and will have values of 0 or 1 (initiate buy).
  2. Column 7: Data[:, 6] will be for the bearish divergences and will have values of 0 or -1 (initiate short).

using this variables:

lower_barrier = 30
upper_barrier = 70
width = 10

Here is the code:

# Bullish Divergence
for i in range(len(Data)):
   try:
       if Data[i, 4] < lower_barrier:
           for a in range(i + 1, i + width):
               if Data[a, 4] > lower_barrier:
                    for r in range(a + 1, a + width):
                       if Data[r, 4] < lower_barrier and 
                        Data[r, 4] > Data[i, 4] and Data[r, 3] < Data[i, 3]:
                            for s in range(r + 1, r + width): 
                                if Data[s, 4] > lower_barrier:
                                    Data[s + 1, 5] = 1
                                    break
                                else:
                                    continue
                        else:
                            continue
                    else:
                        continue
                else:
                    continue
  except IndexError:
        pass

# Bearish Divergence
for i in range(len(Data)):
   try:
       if Data[i, 4] > upper_barrier:
           for a in range(i + 1, i + width): 
               if Data[a, 4] < upper_barrier:
                   for r in range(a + 1, a + width):
                       if Data[r, 4] > upper_barrier and 
                       Data[r, 4] < Data[i, 4] and Data[r, 3] > Data[i, 3]:
                           for s in range(r + 1, r + width):
                               if Data[s, 4] < upper_barrier:
                                   Data[s + 1, 6] = -1
                                   break
                               else:
                                   continue
                       else:
                           continue
                   else:
                       continue
               else:
                   continue
   except IndexError:
       pass

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...