백인감자
[AWS] 업비트 RSI Open API 텔레그램 봇 연동 메시지 전송 본문
목표
일봉기준으로 RSI 가 30 이하이거나 70이상인 경우 텔레그램으로 알림을 전송한다.
0. 텔레그램 설치 및 chat_id 확인 등은 아래 블로그 참고
https://junuuu.tistory.com/492
아래 블로그를 통해서 프로젝트 구조 등을 설정하였다.
1. AWS 인스턴스에 python 및 pip 설치
https://docs.aws.amazon.com/ko_kr/elasticbeanstalk/latest/dg/eb-cli3-install-linux.html
2. AWS 인스턴스에 텔레그램 봇 설치
코드 : pip install python-telegram-bot
3-1. 공통모듈코드(upbit.py)
import logging
import traceback
import requests
import pandas as pd
import time
import smtplib
import jwt
import sys
import uuid
import hashlib
import datetime
import math
import numpy as np
import pyupbit
import telegram
import asyncio
from decimal import Decimal
from urllib.parse import urlencode
"""
업비트 일봉기준 RSI 30이하, 70이상일때 메시지 전송
"""
# Keys
access_key = '업비트에서 발급받은 Access Key'
secret_key = '업비트에서 발급받은 Secret Key'
server_url = 'https://api.upbit.com'
chat_id_value = '텔레그램 봇 chat_id'
token_value = '텔레그램 봇 token'
tickers = pyupbit.get_tickers(fiat="KRW")
#전역변수
global wait_dict
wait_dict = {}
# -----------------------------------------------------------------------------
# - Name : set_loglevel
# - Desc : 로그레벨 설정
# - Input
# 1) level : 로그레벨
# 1. D(d) : DEBUG
# 2. E(e) : ERROR
# 3. 그외(기본) : INFO
# - Output
# -----------------------------------------------------------------------------
def set_loglevel(level):
try:
# ---------------------------------------------------------------------
# 로그레벨 : DEBUG
# ---------------------------------------------------------------------
if level.upper() == "D":
logging.basicConfig(
format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d]:%(message)s',
datefmt='%Y/%m/%d %I:%M:%S %p',
level=logging.DEBUG
)
# ---------------------------------------------------------------------
# 로그레벨 : ERROR
# ---------------------------------------------------------------------
elif level.upper() == "E":
logging.basicConfig(
format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d]:%(message)s',
datefmt='%Y/%m/%d %I:%M:%S %p',
level=logging.ERROR
)
# ---------------------------------------------------------------------
# 로그레벨 : INFO
# ---------------------------------------------------------------------
else:
# -----------------------------------------------------------------------------
# 로깅 설정
# 로그레벨(DEBUG, INFO, WARNING, ERROR, CRITICAL)
# -----------------------------------------------------------------------------
logging.basicConfig(
format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d]:%(message)s',
datefmt='%Y/%m/%d %I:%M:%S %p',
level=logging.INFO
)
# ----------------------------------------
# Exception Raise
# ----------------------------------------
except Exception:
raise
# -----------------------------------------------------------------------------
# - Name : send_request
# - Desc : 리퀘스트 처리
# - Input
# 1) reqType : 요청 타입
# 2) reqUrl : 요청 URL
# 3) reqParam : 요청 파라메타
# 4) reqHeader : 요청 헤더
# - Output
# 4) reponse : 응답 데이터
# -----------------------------------------------------------------------------
def send_request(reqType, reqUrl, reqParam, reqHeader):
try:
# 요청 가능회수 확보를 위해 기다리는 시간(초)
err_sleep_time = 0.3
# 요청에 대한 응답을 받을 때까지 반복 수행
while True:
# 요청 처리
response = requests.request(reqType, reqUrl, params=reqParam, headers=reqHeader)
# 요청 가능회수 추출
if 'Remaining-Req' in response.headers:
hearder_info = response.headers['Remaining-Req']
start_idx = hearder_info.find("sec=")
end_idx = len(hearder_info)
remain_sec = hearder_info[int(start_idx):int(end_idx)].replace('sec=', '')
else:
logging.error("헤더 정보 이상")
logging.error(response.headers)
break
# 요청 가능회수가 3개 미만이면 요청 가능회수 확보를 위해 일정시간 대기
if int(remain_sec) < 3:
logging.debug("요청 가능회수 한도 도달! 남은횟수:" + str(remain_sec))
time.sleep(err_sleep_time)
# 정상 응답
if response.status_code == 200 or response.status_code == 201:
break
# 요청 가능회수 초과인 경우
elif response.status_code == 429:
logging.error("요청 가능회수 초과!:" + str(response.status_code))
time.sleep(err_sleep_time)
# 그 외 오류
else:
logging.error("기타 에러:" + str(response.status_code))
logging.error(response.status_code)
break
# 요청 가능회수 초과 에러 발생시에는 다시 요청
logging.info("[restRequest] 요청 재처리중...")
return response
# ----------------------------------------
# Exception Raise
# ----------------------------------------
except Exception:
raise
# -----------------------------------------------------------------------------
# - Name : send_telegram_message
# - Desc : 텔레그램 메시지 전송
# - Input
# 1) chat_id : chat_id
# 2) token : token
# 3) message : 보낼 메시지
# - Output
async def send_telegram_message(chat_id_value, token_value, message):
try:
bot = telegram.Bot(token=token_value)
await bot.sendMessage(chat_id=chat_id_value, text=message)
except Exception as e:
print(f"텔레그램 메시지 전송오류 : {e}")
# -----------------------------------------------------------------------------
# - Name : get_candle
# - Desc : 캔들 조회
# - Input
# 1) target_item : 대상 종목
# 2) tick_kind : 캔들 종류 (1, 3, 5, 10, 15, 30, 60, 240 - 분, D-일, W-주, M-월)
# 3) inq_range : 조회 범위
# - Output
# 1) 캔들 정보 배열
# -----------------------------------------------------------------------------
def get_candle(target_item, tick_kind, inq_range):
try:
# ----------------------------------------
# Tick 별 호출 URL 설정
# ----------------------------------------
# 분붕
if tick_kind == "1" or tick_kind == "3" or tick_kind == "5" or tick_kind == "10" or tick_kind == "15" or tick_kind == "30" or tick_kind == "60" or tick_kind == "240":
target_url = "minutes/" + tick_kind
# 일봉
elif tick_kind == "D":
target_url = "days"
# 주봉
elif tick_kind == "W":
target_url = "weeks"
# 월봉
elif tick_kind == "M":
target_url = "months"
# 잘못된 입력
else:
raise Exception("잘못된 틱 종류:" + str(tick_kind))
#logging.debug(target_url)
# ----------------------------------------
# Tick 조회
# ----------------------------------------
querystring = {"market": target_item, "count": inq_range}
res = send_request("GET", server_url + "/v1/candles/" + target_url, querystring, "")
candle_data = res.json()
#logging.debug(candle_data)
return candle_data
# ----------------------------------------
# Exception Raise
# ----------------------------------------
except Exception:
raise
async def get_rsi(target_item, tick_kind, inq_range):
# 캔들 추출
candle_data = get_candle(target_item, tick_kind, inq_range)
df = pd.DataFrame(candle_data)
df=df.reindex(index=df.index[::-1]).reset_index()
df['close']=df["trade_price"]
def rsi(ohlc: pd.DataFrame, period: int = 14):
ohlc["close"] = ohlc["close"]
delta = ohlc["close"].diff()
up, down = delta.copy(), delta.copy()
up[up < 0] = 0
down[down > 0] = 0
_gain = up.ewm(com=(period - 1), min_periods=period).mean()
_loss = down.abs().ewm(com=(period - 1), min_periods=period).mean()
RS = _gain / _loss
return pd.Series(100 - (100 / (1 + RS)), name="RSI")
rsi = rsi(df, 14).iloc[-1]
print(target_item)
print('Upbit Day RSI:', rsi)
if rsi >= 70 and target_item not in wait_dict:
text = str(target_item) + " : " + "RSI " + str(round(rsi))
await send_telegram_message(chat_id_value,token_value, text)
wait_dict[target_item] = datetime.datetime.now().minute
if rsi <= 30 and target_item not in wait_dict:
text = str(target_item) + " : " + "RSI " + str(round(rsi))
await send_telegram_message(chat_id_value,token_value, text)
wait_dict[target_item] = datetime.datetime.now().minute
print('')
"""
#모듈화 없이 해당 소스에서 실행하는 경우 아래 주석 해제
# 비동기 함수 실행
async def main():
for ticker in tickers:
try:
await get_rsi(ticker, 'D', '200')
#print(ticker)
except Exception as e:
print(f" 전송오류 : {e}")
continue
# 이벤트 루프 생성 및 실행
if __name__ == "__main__":
asyncio.run(main())
"""
3-2. 메인함수 코드(upbit_rsi_day.py)
import upbit
#로그레벨 설정
#upbit.set_loglevel("D");
# 비동기 함수 실행
async def main():
for ticker in upbit.tickers:
try:
await upbit.get_rsi(ticker, 'D', '200') # 일단위 200개
except Exception as e:
print(f" 전송오류 : {e}")
continue
# 이벤트 루프 생성 및 실행
if __name__ == "__main__":
upbit.asyncio.run(main())
4. AWS 인스턴스에 위 소스파일 옮기기(FTP 등)
내 경우에는 로컬pc에서 작성 후 winSCP 를 통해 FTP로 파일을 옮겼다.
5. 스케쥴 설정
위 소스에서 While True 를 추가하면 쉬지않고 돌겠지만 내 경우에는 특정 시간에만 받고 싶었기 때문에 cron 에 적용해보려한다.
5-1. cron 설치
# cron 설치
sudo yum update -y
sudo yum install -y cronie
# cron 시작
sudo systemctl start crond
# cron systemctl 활성화
sudo systemctl enable crond
# cron systemctl 등록 확인
sudo systemctl list-unit-files | grep crond
5-2(선택사항) . 스크립트 파일 생성
스크립트명 : run_upbit_api.sh
python3 /home/ec2-user/upbit_rsi_day.py &
5-3. crontab 등록
스케쥴의 경우 매일 한국시간기준 오전9시 실행(UTC 0시)
crontab -e 입력 후
* 0 * * * /home/ec2-user/run_upbit_api.sh
crontab -l 을 입력하면 정상적으로 조회되는것을 확인할 수 있다.
+) cron 실행 안되는 경우
스크립트 파일이나 파이썬 소스코드의 권한이 불충분해서 그럴 수 있으니
권한에 크게 민감하지않으면 chmod 755 파일명 입력하면 될 것이다.
위 과정을 진행하면서 발생한 오류 및 대처법
기존 블로그들의 소스코드를 무차별로 복붙하다보니 오류들이 계속 발생하는데 하나하나 디버깅 하면서 교정하면된다.
아래의 경우 텔레그램 봇 라이브러리가 업데이트 되면서 asyncio 라이브러리 코드를 적용해야하는 이슈였다.
RuntimeWarning: coroutine 'Bot.send_message' was never awaited
--> https://fromitot.tistory.com/82
'AWS' 카테고리의 다른 글
[AWS] 탄력적 IP주소 연결하기 (0) | 2023.07.13 |
---|---|
[AWS]EC2 인스턴스 WinSCP 이용한 FTP 전송 (0) | 2023.07.12 |
[AWS]EC2 프리티어 인스턴스 생성, Apahce&PHP 설치 (1) | 2023.07.09 |