백인감자

[AWS] 업비트 RSI Open API 텔레그램 봇 연동 메시지 전송 본문

AWS

[AWS] 업비트 RSI Open API 텔레그램 봇 연동 메시지 전송

백인감자 2023. 7. 13. 23:11

목표

 

일봉기준으로 RSI 가 30 이하이거나 70이상인 경우 텔레그램으로 알림을 전송한다.

 

0. 텔레그램 설치 및 chat_id 확인 등은 아래 블로그 참고

https://junuuu.tistory.com/492

 

업비트 RSI 알림이 만들기(with 텔레그램 봇)

개요 Python, Upbit API, Telegram API를 활용해서 업비트 RSI 알림이를 만들어 보겠습니다. 여기서 RSI란 Relative Strengh Index의 약자입니다. 상대 강도지수라고 불리는 보조지표로 0에 가까울수록 파는 힘이

junuuu.tistory.com

 

 아래 블로그를 통해서 프로젝트 구조 등을 설정하였다.

https://technfin.tistory.com/entry/RSI-%EC%83%81%EB%8C%80%EA%B0%95%EB%8F%84%EC%A7%80%EC%88%98-%EA%B5%AC%ED%95%98%EA%B8%B0-%ED%8C%8C%EC%9D%B4%EC%8D%AC-%EC%97%85%EB%B9%84%ED%8A%B8-%EB%B9%84%ED%8A%B8%EC%BD%94%EC%9D%B8-%EC%9E%90%EB%8F%99%EB%A7%A4%EB%A7%A4

 

RSI 상대강도지수 구하기 - 파이썬 업비트 비트코인 자동매매

주식과는 마찬가지로 코인 시장에서도 차트를 이용한 매매 기법을 사용하실 수 있습니다. 그 중에서도 RSI(상대강도지수)는 상당히 많이 사용되는 보조지표중에 하나 입니다. 앞으로 업비트 API

technfin.tistory.com

 

 

1. AWS 인스턴스에 python 및 pip 설치

https://docs.aws.amazon.com/ko_kr/elasticbeanstalk/latest/dg/eb-cli3-install-linux.html

 

Linux에 Python, pip 및 EB CLI 설치 - AWS Elastic Beanstalk

Linux에 Python, pip 및 EB CLI 설치 EB CLI에는 Python 2.7, 3.4 또는 그 이상이 필요합니다. 배포가 Python과 함께 제공되지 않았거나 이전 버전과 함께 제공된 경우 pip 및 EB CLI를 설치하기 전에 Python을 설치

docs.aws.amazon.com

 

 

 

2. AWS 인스턴스에 텔레그램 봇 설치

 코드 :  pip install python-telegram-bot

 

3-1. 공통모듈코드(upbit.py)

 

import logging
import traceback
import requests
import pandas as pd
import time
import smtplib
import jwt
import sys
import uuid
import hashlib
import datetime
import math
import numpy as np
import pyupbit
import telegram
import asyncio

from decimal import Decimal
from urllib.parse import urlencode


"""
업비트 일봉기준 RSI 30이하, 70이상일때 메시지 전송
"""

# Keys
access_key = '업비트에서 발급받은 Access Key'
secret_key = '업비트에서 발급받은 Secret Key'
server_url = 'https://api.upbit.com'
chat_id_value = '텔레그램 봇 chat_id'
token_value = '텔레그램 봇 token'
tickers = pyupbit.get_tickers(fiat="KRW")

#전역변수
global wait_dict
wait_dict = {}

# -----------------------------------------------------------------------------
# - Name : set_loglevel
# - Desc : 로그레벨 설정
# - Input
#   1) level : 로그레벨
#     1. D(d) : DEBUG
#     2. E(e) : ERROR
#     3. 그외(기본) : INFO
# - Output
# -----------------------------------------------------------------------------
def set_loglevel(level):
    try:
 
        # ---------------------------------------------------------------------
        # 로그레벨 : DEBUG
        # ---------------------------------------------------------------------
        if level.upper() == "D":
            logging.basicConfig(
                format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d]:%(message)s',
                datefmt='%Y/%m/%d %I:%M:%S %p',
                level=logging.DEBUG
            )
        # ---------------------------------------------------------------------
        # 로그레벨 : ERROR
        # ---------------------------------------------------------------------
        elif level.upper() == "E":
            logging.basicConfig(
                format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d]:%(message)s',
                datefmt='%Y/%m/%d %I:%M:%S %p',
                level=logging.ERROR
            )
        # ---------------------------------------------------------------------
        # 로그레벨 : INFO
        # ---------------------------------------------------------------------
        else:
            # -----------------------------------------------------------------------------
            # 로깅 설정
            # 로그레벨(DEBUG, INFO, WARNING, ERROR, CRITICAL)
            # -----------------------------------------------------------------------------
            logging.basicConfig(
                format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d]:%(message)s',
                datefmt='%Y/%m/%d %I:%M:%S %p',
                level=logging.INFO
            )
 
    # ----------------------------------------
    # Exception Raise
    # ----------------------------------------
    except Exception:
        raise



# -----------------------------------------------------------------------------
# - Name : send_request
# - Desc : 리퀘스트 처리
# - Input
#   1) reqType : 요청 타입
#   2) reqUrl : 요청 URL
#   3) reqParam : 요청 파라메타
#   4) reqHeader : 요청 헤더
# - Output
#   4) reponse : 응답 데이터
# -----------------------------------------------------------------------------
def send_request(reqType, reqUrl, reqParam, reqHeader):
    try:
 
        # 요청 가능회수 확보를 위해 기다리는 시간(초)
        err_sleep_time = 0.3
 
        # 요청에 대한 응답을 받을 때까지 반복 수행
        while True:
 
            # 요청 처리
            response = requests.request(reqType, reqUrl, params=reqParam, headers=reqHeader)
 
            # 요청 가능회수 추출
            if 'Remaining-Req' in response.headers:
 
                hearder_info = response.headers['Remaining-Req']
                start_idx = hearder_info.find("sec=")
                end_idx = len(hearder_info)
                remain_sec = hearder_info[int(start_idx):int(end_idx)].replace('sec=', '')
            else:
                logging.error("헤더 정보 이상")
                logging.error(response.headers)
                break
 
            # 요청 가능회수가 3개 미만이면 요청 가능회수 확보를 위해 일정시간 대기
            if int(remain_sec) < 3:
                logging.debug("요청 가능회수 한도 도달! 남은횟수:" + str(remain_sec))
                time.sleep(err_sleep_time)
 
            # 정상 응답
            if response.status_code == 200 or response.status_code == 201:
                break
            # 요청 가능회수 초과인 경우
            elif response.status_code == 429:
                logging.error("요청 가능회수 초과!:" + str(response.status_code))
                time.sleep(err_sleep_time)
            # 그 외 오류
            else:
                logging.error("기타 에러:" + str(response.status_code))
                logging.error(response.status_code)
                break
 
            # 요청 가능회수 초과 에러 발생시에는 다시 요청
            logging.info("[restRequest] 요청 재처리중...")
 
        return response
 
    # ----------------------------------------
    # Exception Raise
    # ----------------------------------------
    except Exception:
        raise


# -----------------------------------------------------------------------------
# - Name : send_telegram_message
# - Desc : 텔레그램 메시지 전송
# - Input
#   1) chat_id : chat_id
#   2) token : token
#   3) message : 보낼 메시지
# - Output

async def send_telegram_message(chat_id_value, token_value, message):
    try:
        bot = telegram.Bot(token=token_value)
        await bot.sendMessage(chat_id=chat_id_value, text=message)
    except Exception as e:
        print(f"텔레그램 메시지 전송오류 : {e}")
# -----------------------------------------------------------------------------
# - Name : get_candle
# - Desc : 캔들 조회
# - Input
#   1) target_item : 대상 종목
#   2) tick_kind : 캔들 종류 (1, 3, 5, 10, 15, 30, 60, 240 - 분, D-일, W-주, M-월)
#   3) inq_range : 조회 범위
# - Output
#   1) 캔들 정보 배열
# -----------------------------------------------------------------------------
def get_candle(target_item, tick_kind, inq_range):
    try:
 
        # ----------------------------------------
        # Tick 별 호출 URL 설정
        # ----------------------------------------
        # 분붕
        if tick_kind == "1" or tick_kind == "3" or tick_kind == "5" or tick_kind == "10" or tick_kind == "15" or tick_kind == "30" or tick_kind == "60" or tick_kind == "240":
            target_url = "minutes/" + tick_kind
        # 일봉
        elif tick_kind == "D":
            target_url = "days"
        # 주봉
        elif tick_kind == "W":
            target_url = "weeks"
        # 월봉
        elif tick_kind == "M":
            target_url = "months"
        # 잘못된 입력
        else:
            raise Exception("잘못된 틱 종류:" + str(tick_kind))
 
        #logging.debug(target_url)
 
        # ----------------------------------------
        # Tick 조회
        # ----------------------------------------
        querystring = {"market": target_item, "count": inq_range}
        res = send_request("GET", server_url + "/v1/candles/" + target_url, querystring, "")
        candle_data = res.json()
 
        #logging.debug(candle_data)
 
        return candle_data
 
    # ----------------------------------------
    # Exception Raise
    # ----------------------------------------
    except Exception:
        raise
    
async def get_rsi(target_item, tick_kind, inq_range):

         # 캔들 추출
        candle_data = get_candle(target_item, tick_kind, inq_range)
    
        df = pd.DataFrame(candle_data)
    
        df=df.reindex(index=df.index[::-1]).reset_index()
    
        df['close']=df["trade_price"]
        
        
    
        def rsi(ohlc: pd.DataFrame, period: int = 14):
            ohlc["close"] = ohlc["close"]
            delta = ohlc["close"].diff()
    
            up, down = delta.copy(), delta.copy()
            up[up < 0] = 0
            down[down > 0] = 0
    
            _gain = up.ewm(com=(period - 1), min_periods=period).mean()
            _loss = down.abs().ewm(com=(period - 1), min_periods=period).mean()
    
            RS = _gain / _loss
            return pd.Series(100 - (100 / (1 + RS)), name="RSI")
    
        rsi = rsi(df, 14).iloc[-1]
        print(target_item)
        print('Upbit Day RSI:', rsi)
        
        

        
        
        
        if rsi >= 70 and target_item not in wait_dict:
            text = str(target_item) + " : " + "RSI " + str(round(rsi))
            await send_telegram_message(chat_id_value,token_value, text)
            wait_dict[target_item] = datetime.datetime.now().minute
            
        if rsi <= 30 and target_item not in wait_dict:  
            text = str(target_item) + " : " + "RSI " + str(round(rsi))
            await send_telegram_message(chat_id_value,token_value, text)
            wait_dict[target_item] = datetime.datetime.now().minute        
        print('')
        


"""
#모듈화 없이 해당 소스에서 실행하는 경우 아래 주석 해제 
# 비동기 함수 실행
async def main():
    for ticker in tickers:
        try:
            await get_rsi(ticker, 'D', '200')
            #print(ticker)
        except Exception as e:
            print(f" 전송오류 : {e}")
            continue

# 이벤트 루프 생성 및 실행
if __name__ == "__main__":
    asyncio.run(main())
"""

 

3-2. 메인함수 코드(upbit_rsi_day.py)

import upbit

#로그레벨 설정
#upbit.set_loglevel("D");

# 비동기 함수 실행
async def main():
    for ticker in upbit.tickers:
        try:
            await upbit.get_rsi(ticker, 'D', '200') # 일단위 200개

        except Exception as e:
            print(f" 전송오류 : {e}")
            continue

# 이벤트 루프 생성 및 실행
if __name__ == "__main__":
    upbit.asyncio.run(main())

 

4. AWS 인스턴스에 위 소스파일 옮기기(FTP 등) 

내 경우에는 로컬pc에서 작성 후 winSCP 를 통해 FTP로 파일을 옮겼다.

 

5. 스케쥴 설정

위 소스에서 While True 를 추가하면 쉬지않고 돌겠지만 내 경우에는 특정 시간에만 받고 싶었기 때문에 cron 에 적용해보려한다.

 

5-1. cron 설치

 

# cron 설치
sudo yum update -y
sudo yum install -y cronie
# cron 시작
sudo systemctl start crond
# cron systemctl 활성화
sudo systemctl enable crond
# cron systemctl 등록 확인
sudo systemctl list-unit-files | grep crond

 

5-2(선택사항) . 스크립트 파일 생성

 

스크립트명 : run_upbit_api.sh

python3 /home/ec2-user/upbit_rsi_day.py &

 

5-3. crontab 등록 

스케쥴의 경우 매일 한국시간기준 오전9시 실행(UTC 0시)

 crontab -e 입력 후

 

* 0 * * * /home/ec2-user/run_upbit_api.sh

 

crontab -l 을 입력하면 정상적으로 조회되는것을 확인할 수 있다.

 

 

+) cron 실행 안되는 경우

스크립트 파일이나 파이썬 소스코드의 권한이 불충분해서 그럴 수 있으니 

권한에 크게 민감하지않으면 chmod 755 파일명 입력하면 될 것이다.

 

 

텔레그램 봇 메시지 정상수신

 

 

 

 

 

 

위 과정을 진행하면서 발생한 오류 및 대처법

 

기존 블로그들의 소스코드를 무차별로 복붙하다보니 오류들이 계속 발생하는데 하나하나 디버깅 하면서 교정하면된다.

아래의 경우 텔레그램 봇 라이브러리가 업데이트 되면서   asyncio 라이브러리 코드를 적용해야하는 이슈였다.

 

RuntimeWarning: coroutine 'Bot.send_message' was never awaited

--> https://fromitot.tistory.com/82

 

[Python] 텔레그램 봇 생성 및 메시지 송신

파이썬을 사용하여 텔레그램 봇을 만들어 볼 것이다. 카카오톡은 오픈채팅에서 사용할 수 있는 봇을 추가할 수 있지만 텔레그램에서는 그런 기능을 직접 코드로 구현해야되는 것으로 보인다.

fromitot.tistory.com

 

https://www.inflearn.com/questions/737370/%ED%85%94%EB%A0%88%EA%B7%B8%EB%9E%A8-%EB%B4%87-%EB%AA%85%EB%A0%B9%EC%96%B4-%EC%98%A4%EB%A5%98-%EC%A7%88%EB%AC%B8%EB%93%9C%EB%A6%BD%EB%8B%88%EB%8B%A4

 

텔레그램 봇 명령어 오류 질문드립니다. - 인프런 | 질문 & 답변

이 질문글 바로 이전과 비슷한 증상을 겪고 있는데요.텔레그램 봇 명령어가 안먹힙니다.sendMessage 명령어 입력해서 실행하면RuntimeWarning: coroutine 'Bot.send_message' was never awaitedRuntimeWarning: En...

www.inflearn.com

 

 

 

 

 

Comments