Files
AssetMonitor/common.py
dsyoon c45ad151b6 init
2026-01-28 18:58:33 +09:00

185 lines
8.1 KiB
Python

from datetime import datetime
import time
import pandas as pd
from monitor_min import Monitor
from config import *
class CommonCoinMonitor(Monitor):
"""코인 모니터링 공통 로직 클래스"""
def __init__(self, cooldown_file: str = 'coins_buy_time.json') -> None:
super().__init__(cooldown_file)
def get_balances(self) -> dict:
"""현재 보유 잔고 정보를 가져옵니다."""
tmps = self.getBalances()
balances = {}
for tmp in tmps:
balances[tmp['currency']] = {
'balance': float(tmp['balance']),
'avg_buy_price': float(tmp['avg_buy_price'])
}
return balances
def check_cooldown(self, symbol: str, side: str = 'buy') -> bool:
"""매수/매도 쿨다운 시간을 확인합니다."""
current_time = datetime.now()
last_trade_dt = self.buy_cooldown.get(symbol, {}).get(side, {}).get('datetime')
if last_trade_dt:
time_diff = current_time - last_trade_dt
if time_diff.total_seconds() < BUY_MINUTE_LIMIT:
print(f"{symbol}: {side} 금지 중 (남은 시간: {BUY_MINUTE_LIMIT - time_diff.total_seconds():.0f}초)")
return False
return True
def save_trade_record(self, symbol: str, side: str, signal: str) -> None:
"""거래 기록을 저장합니다."""
if self.cooldown_file is not None:
try:
self.last_signal[symbol] = signal
except Exception:
self.last_signal[symbol] = ''
self.buy_cooldown.setdefault(symbol, {})[side] = {
'datetime': datetime.now(),
'signal': signal
}
self._save_buy_cooldown()
def check_5_week_lowest(self, data: pd.DataFrame) -> bool:
"""5주봉이 20주봉이나 40주봉보다 아래에 있는지 체크합니다."""
try:
hours_in_week = 24 * 7 # 168 hours
period_5w = 5 * hours_in_week # 840 hours
period_20w = 20 * hours_in_week # 3,360 hours
period_40w = 40 * hours_in_week # 6,720 hours
if len(data) >= period_40w:
wma5 = data['Close'].rolling(window=period_5w).mean().iloc[-1]
wma20 = data['Close'].rolling(window=period_20w).mean().iloc[-1]
wma40 = data['Close'].rolling(window=period_40w).mean().iloc[-1]
# 5-week MA is the lowest among 5, 20, 40 week MAs
if (wma5 < wma20) and (wma5 < wma40):
return True
except Exception:
pass
return False
def execute_buy(self, symbol: str, buy_amount: float, signal: str, current_price: float) -> bool:
"""매수를 실행합니다."""
try:
actual_buy_amount = self.hts.buyCoinMarket(symbol, buy_amount)
self.save_trade_record(symbol, 'buy', signal)
print(f"{KR_COINS[symbol]} ({symbol}) [{signal}], 현재가: {current_price:.4f}, {int(BUY_MINUTE_LIMIT/60)}분간 매수 금지 시작")
self.sendMsg("{}".format(self.format_message(symbol, KR_COINS[symbol], current_price, signal, actual_buy_amount)))
return True
except Exception as e:
print(f"Error buying {symbol}: {str(e)}")
return False
def execute_sell(self, symbol: str, sell_amount: float, signal: str, current_price: float, balances: dict) -> bool:
"""매도를 실행합니다."""
try:
available_balance = 0
if balances and symbol in balances:
available_balance = float(balances[symbol].get('balance', 0))
if available_balance <= 0:
return False
actual_sell_amount = available_balance * sell_amount
_ = self.hts.sellCoinMarket(symbol, 0, actual_sell_amount)
self.save_trade_record(symbol, 'sell', signal)
print(f"{KR_COINS[symbol]} ({symbol}) [{signal} 매도], 현재가: {current_price:.4f}")
self.sendMsg("[KRW-COIN]\n" + f"• 매도 [COIN] {KR_COINS[symbol]} ({symbol}): {signal} ({''}{current_price:.4f})")
return True
except Exception as e:
print(f"Error selling {symbol}: {str(e)}")
return False
def process_inverse_data(self, symbol: str, interval: int, data: pd.DataFrame, balances: dict, coin_strategy) -> bool:
"""인버스 데이터를 처리하여 매도 신호를 확인합니다."""
try:
inverse_data = self.inverse_data(data)
recent_inverse_data = self.annotate_signals(symbol, interval, inverse_data)
# 허용된 인버스 매도 신호만 처리 (시간봉별 신호 포함)
last_signal = str(recent_inverse_data['signal'].iloc[-1]) if 'signal' in recent_inverse_data.columns else ''
allowed_signals = coin_strategy.get_sell_signals()
if last_signal not in allowed_signals:
return False
if not self.check_cooldown(symbol, 'sell'):
return False
# 코인별 전략에 따라 매도 비율 결정
sell_ratio = coin_strategy.get_sell_amount_ratio(last_signal)
return self.execute_sell(symbol, sell_ratio, last_signal, recent_inverse_data['Close'].iloc[-1], balances)
except Exception as e:
print(f"Error processing inverse data for {symbol}: {str(e)}")
return False
def process_normal_data(self, symbol: str, interval: int, data: pd.DataFrame, coin_strategy) -> bool:
"""일반 데이터를 처리하여 매수 신호를 확인합니다."""
try:
data = self.calculate_technical_indicators(data)
recent_data = self.annotate_signals(symbol, interval, data)
# XRP 전용 신호 확인 (hasattr로 메서드 존재 여부 확인)
if hasattr(coin_strategy, 'check_xrp_specific_signals'):
xrp_signal, xrp_point = coin_strategy.check_xrp_specific_signals(recent_data, len(recent_data) - 1)
if xrp_point == 1:
recent_data.at[recent_data.index[-1], 'signal'] = xrp_signal
recent_data.at[recent_data.index[-1], 'point'] = xrp_point
if recent_data['point'].iloc[-1] != 1:
return False
if not self.check_cooldown(symbol, 'buy'):
return False
# 코인별 전략에 따라 매수 금액 결정
signal = recent_data['signal'].iloc[-1]
current_price = recent_data['Close'].iloc[-1]
check_5_week_lowest = self.check_5_week_lowest(data)
buy_amount = coin_strategy.get_buy_amount(signal, current_price, check_5_week_lowest)
return self.execute_buy(symbol, buy_amount, signal, current_price)
except Exception as e:
print(f"Error processing normal data for {symbol}: {str(e)}")
return False
def monitor_single_coin(self, symbol: str, coin_strategy) -> None:
"""단일 코인을 모니터링합니다."""
print("[{}] {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), symbol))
# 3분봉과 1일봉으로 초기 신호 확인
intervals = [3, 1440]
data = self.get_coin_some_data(symbol, intervals)
for interval in intervals:
data[interval] = self.calculate_technical_indicators(data[interval])
recent_data = self.annotate_signals(symbol, interval, data[interval])
# 매수라면
if recent_data['point'].iloc[-1] == 1:
balances = self.get_balances()
# 인버스 데이터 처리 (매도)
self.process_inverse_data(symbol, interval, data[interval], balances, coin_strategy)
# 일반 데이터 처리 (매수)
self.process_normal_data(symbol, interval, data[interval], coin_strategy)
else:
print(f"Data for {symbol} is empty or None.")
time.sleep(1)