This commit is contained in:
dsyoon
2025-08-09 17:04:17 +09:00
parent b3b06c59f8
commit ef962843a1
8 changed files with 1144 additions and 1136 deletions

View File

@@ -114,7 +114,7 @@ export STOCK_TELEGRAM_CHAT_ID="<CHAT_ID>"
## 사용 방법
```bash
$ python stock_monitor.py
$ python monitor_coin.py
```
스크립트가 백그라운드에서 무한 루프로 동작하며 지정된 시간마다 텔레그램 알림을 전송합니다.

View File

@@ -1,6 +1,6 @@
from HTS2 import HTS
import sqlite3
from stock_monitor import get_coin_more_data
from monitor_coin import get_coin_more_data
from config import *
hts = HTS()

407
monitor.py Normal file
View File

@@ -0,0 +1,407 @@
import pandas as pd
from HTS2 import HTS
from dateutil.relativedelta import relativedelta
from datetime import datetime, timedelta
import sqlite3
import telegram
import time
import requests
import json
import asyncio
from multiprocessing import Pool
import schedule
from config import *
import FinanceDataReader as fdr
import numpy as np
import os
class Monitor:
"""자산(코인/주식/ETF) 모니터링 및 매수 실행 클래스"""
cooldown_file = None
def __init__(self, cooldown_file='coins_buy_time.json') -> None:
self.hts = HTS()
if cooldown_file is not None:
self.cooldown_file = cooldown_file
self.buy_cooldown = self._load_buy_cooldown()
# ------------- Persistence -------------
def _load_buy_cooldown(self) -> dict:
if os.path.exists(self.cooldown_file):
try:
with open(self.cooldown_file, 'r', encoding='utf-8') as f:
data = json.load(f)
cooldown: dict[str, datetime] = {}
for symbol, time_str in data.items():
cooldown[symbol] = datetime.fromisoformat(time_str)
return cooldown
except Exception as e:
print(f"Error loading cooldown data: {e}")
return {}
return {}
def _save_buy_cooldown(self) -> None:
try:
data: dict[str, str] = {}
for symbol, dt in self.buy_cooldown.items():
data[symbol] = dt.isoformat()
with open(self.cooldown_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"Error saving cooldown data: {e}")
# ------------- Telegram -------------
def _send_coin_msg(self, text: str) -> None:
coin_client = telegram.Bot(token=COIN_TELEGRAM_BOT_TOKEN)
asyncio.run(coin_client.send_message(chat_id=COIN_TELEGRAM_CHAT_ID, text=text))
def _send_stock_msg(self, text: str) -> None:
stock_client = telegram.Bot(token=STOCK_TELEGRAM_BOT_TOKEN)
asyncio.run(stock_client.send_message(chat_id=STOCK_TELEGRAM_CHAT_ID, text=text))
def send_coin_telegram_message(self, message_list: list[str], header: str) -> None:
payload = header + "\n"
for i, message in enumerate(message_list):
payload += message
if i + 1 % 20 == 0:
pool = Pool(12)
pool.map(self._send_coin_msg, [payload])
payload = ''
if len(message_list) % 20 != 0:
pool = Pool(12)
pool.map(self._send_coin_msg, [payload])
def send_stock_telegram_message(self, message_list: list[str], header: str) -> None:
payload = header + "\n"
for i, message in enumerate(message_list):
payload += message
if i + 1 % 20 == 0:
pool = Pool(12)
pool.map(self._send_stock_msg, [payload])
payload = ''
if len(message_list) % 20 != 0:
pool = Pool(12)
pool.map(self._send_stock_msg, [payload])
# ------------- Indicators -------------
def normalize_data(self, data: pd.DataFrame) -> pd.DataFrame:
columns_to_normalize = ['Open', 'High', 'Low', 'Close', 'Volume']
normalized_data = data.copy()
for column in columns_to_normalize:
min_val = data[column].rolling(window=20).min()
max_val = data[column].rolling(window=20).max()
denominator = max_val - min_val
normalized_data[f'{column}_Norm'] = np.where(
denominator != 0,
(data[column] - min_val) / denominator,
0.5,
)
return normalized_data
def calculate_technical_indicators(self, data: pd.DataFrame) -> pd.DataFrame:
data = self.normalize_data(data)
data['MA5'] = data['Close'].rolling(window=5).mean()
data['MA20'] = data['Close'].rolling(window=20).mean()
data['MA40'] = data['Close'].rolling(window=40).mean()
data['MA120'] = data['Close'].rolling(window=120).mean()
data['MA200'] = data['Close'].rolling(window=200).mean()
data['MA240'] = data['Close'].rolling(window=240).mean()
data['MA720'] = data['Close'].rolling(window=720).mean()
data['MA1440'] = data['Close'].rolling(window=1440).mean()
data['Deviation5'] = (data['Close'] / data['MA5']) * 100
data['Deviation20'] = (data['Close'] / data['MA20']) * 100
data['Deviation40'] = (data['Close'] / data['MA40']) * 100
data['Deviation120'] = (data['Close'] / data['MA120']) * 100
data['Deviation200'] = (data['Close'] / data['MA200']) * 100
data['Deviation240'] = (data['Close'] / data['MA240']) * 100
data['Deviation720'] = (data['Close'] / data['MA720']) * 100
data['Deviation1440'] = (data['Close'] / data['MA1440']) * 100
data['golden_cross'] = (data['MA5'] > data['MA20']) & (data['MA5'].shift(1) <= data['MA20'].shift(1))
data['MA'] = data['Close'].rolling(window=20).mean()
data['STD'] = data['Close'].rolling(window=20).std()
data['Upper'] = data['MA'] + (2 * data['STD'])
data['Lower'] = data['MA'] - (2 * data['STD'])
return data
# ------------- Strategy -------------
def buy_ticker(self, symbol: str, data: pd.DataFrame) -> bool:
try:
current_time = datetime.now()
if data['buy_signal'].iloc[-1] != 'fall_5p':
buy_amount = 100000
if symbol in self.buy_cooldown:
time_diff = current_time - self.buy_cooldown[symbol]
if time_diff.total_seconds() < 600:
print(f"{symbol}: 매수 금지 중 (남은 시간: {600 - time_diff.total_seconds():.0f}초)")
return False
else:
if symbol in self.buy_cooldown:
time_diff = current_time - self.buy_cooldown[symbol]
if time_diff.total_seconds() < 1200:
print(f"{symbol}: 매수 금지 중 (남은 시간: {1200 - time_diff.total_seconds():.0f}초)")
return False
buy_amount = 5100
if data['buy_signal'].iloc[-1] == 'movingaverage':
buy_amount = 7000
elif data['buy_signal'].iloc[-1] == 'deviation40':
buy_amount = 10000
elif data['buy_signal'].iloc[-1] == 'deviation240':
buy_amount = 6000
elif data['buy_signal'].iloc[-1] == 'deviation1440':
if symbol in ['BONK', 'PEPE', 'TON']:
buy_amount = 30000
else:
buy_amount = 50000
_ = self.hts.buyCoinMarket(symbol, buy_amount)
if self.cooldown_file is not None:
self.buy_cooldown[symbol] = current_time
self._save_buy_cooldown()
print(f"{KR_COINS[symbol]} ({symbol}): {data['Close'].iloc[-1]:.4f}: 매수 완료, 20분간 매수 금지 시작")
try:
pool = Pool(12)
pool.map(self._send_coin_msg, [
"[KRW-COIN]" + "\n" + self.format_message('COIN', symbol, KR_COINS[symbol], data['Close'].iloc[-1], data['buy_signal'].iloc[-1])
])
except Exception as e:
print(f"Error sending Telegram message: {str(e)}")
except Exception as e:
print(f"Error buying {symbol}: {str(e)}")
return False
return True
def check_buy_point(self, symbol: str, data: pd.DataFrame, simulation: bool | None = None) -> pd.DataFrame:
data = data.copy()
data['buy_signal'] = ''
data['buy_point'] = 0
if data['buy_point'].iloc[-1] != 1:
for i in range(1, len(data)):
if all(data[f'MA{n}'].iloc[i] < data['MA720'].iloc[i] for n in [5, 20, 40, 120, 200, 240]) and \
all(data[f'MA{n}'].iloc[i] > data[f'MA{n}'].iloc[i - 1] for n in [5, 20, 40, 120, 200, 240]) and \
data['MA720'].iloc[i] < data['MA1440'].iloc[i]:
data.at[data.index[i], 'buy_signal'] = 'movingaverage'
data.at[data.index[i], 'buy_point'] = 1
if not simulation and data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'movingaverage'
data.at[data.index[-1], 'buy_point'] = 1
if data['Deviation40'].iloc[i - 1] < data['Deviation40'].iloc[i] and data['Deviation40'].iloc[i - 1] <= 90:
data.at[data.index[i], 'buy_signal'] = 'deviation40'
data.at[data.index[i], 'buy_point'] = 1
if not simulation and data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'deviation40'
data.at[data.index[-1], 'buy_point'] = 1
if symbol not in ['BONK']:
if symbol in ['TRX']:
if data['Deviation240'].iloc[i - 1] < data['Deviation240'].iloc[i] and data['Deviation240'].iloc[i - 1] <= 98:
data.at[data.index[i], 'buy_signal'] = 'deviation240'
data.at[data.index[i], 'buy_point'] = 1
if not simulation and data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'deviation240'
data.at[data.index[-1], 'buy_point'] = 1
else:
if data['Deviation240'].iloc[i - 1] < data['Deviation240'].iloc[i] and data['Deviation240'].iloc[i - 1] <= 90:
data.at[data.index[i], 'buy_signal'] = 'deviation240'
data.at[data.index[i], 'buy_point'] = 1
if not simulation and data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'deviation240'
data.at[data.index[-1], 'buy_point'] = 1
if symbol in ['TON']:
if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 89:
data.at[data.index[i], 'buy_signal'] = 'deviation1440'
data.at[data.index[i], 'buy_point'] = 1
if not simulation and data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'deviation1440'
data.at[data.index[-1], 'buy_point'] = 1
elif symbol in ['XRP']:
if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 90:
data.at[data.index[i], 'buy_signal'] = 'deviation1440'
data.at[data.index[i], 'buy_point'] = 1
if not simulation and data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'deviation1440'
data.at[data.index[-1], 'buy_point'] = 1
elif symbol in ['BONK']:
if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 76:
data.at[data.index[i], 'buy_signal'] = 'deviation1440'
data.at[data.index[i], 'buy_point'] = 1
if not simulation and data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'deviation1440'
data.at[data.index[-1], 'buy_point'] = 1
else:
if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 80:
data.at[data.index[i], 'buy_signal'] = 'deviation1440'
data.at[data.index[i], 'buy_point'] = 1
if not simulation and data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'deviation1440'
data.at[data.index[-1], 'buy_point'] = 1
try:
prev_low = data['Low'].iloc[i - 1]
curr_close = data['Close'].iloc[i]
cond_close_drop = curr_close <= prev_low * 0.95
if cond_close_drop:
data.at[data.index[i], 'buy_signal'] = 'fall_5p'
data.at[data.index[i], 'buy_point'] = 1
if not simulation and data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'fall_5p'
data.at[data.index[-1], 'buy_point'] = 1
except Exception:
pass
return data
# ------------- Formatting -------------
def format_message(self, market_type: str, symbol: str, symbol_name: str, close: float, buy_signal: str) -> str:
message = f"매수 [{market_type}] {symbol_name} ({symbol}): {buy_signal} "
message += f"현재가: {'$' if market_type == 'US' else ''}{close:.4f}, "
return message
def format_ma_message(self, info: dict, market_type: str) -> str:
prefix = '상승 ' if info.get('alert') else ''
message = prefix + f"[{market_type}] {info['name']} ({info['symbol']}) "
message += f"현재가: {'$' if market_type == 'US' else ''}{info['price']:.4f} \n"
return message
# ------------- Data fetch -------------
def get_coin_data(self, symbol: str, interval: int = 60, to: str | None = None, retries: int = 3) -> pd.DataFrame | None:
for attempt in range(retries):
try:
if to is None:
url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200").format(interval, symbol)
else:
url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200&to={}").format(interval, symbol, to)
headers = {"accept": "application/json"}
response = requests.get(url, headers=headers)
json_data = json.loads(response.text)
df_temp = pd.DataFrame(json_data)
df_temp = df_temp.sort_index(ascending=False)
if 'candle_date_time_kst' not in df_temp:
return None
data = pd.DataFrame()
data['datetime'] = pd.to_datetime(df_temp['candle_date_time_kst'], format='%Y-%m-%dT%H:%M:%S')
data['Open'] = df_temp['opening_price']
data['Close'] = df_temp['trade_price']
data['High'] = df_temp['high_price']
data['Low'] = df_temp['low_price']
data['Volume'] = df_temp['candle_acc_trade_volume']
data = data.set_index('datetime')
data = data.astype(float)
data["datetime"] = data.index
if not data.empty:
return data
print(f"No data received for {symbol}, attempt {attempt + 1}")
time.sleep(0.5)
except Exception as e:
print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
if attempt < retries - 1:
time.sleep(5)
continue
return None
def get_coin_more_data(self, symbol: str, interval: int, bong_count: int = 3000) -> pd.DataFrame:
to = datetime.now()
data: pd.DataFrame | None = None
while data is None or len(data) < bong_count:
if data is None:
data = self.get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S"))
else:
previous_count = len(data)
df = self.get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S"))
data = pd.concat([data, df], ignore_index=True)
if previous_count == len(data):
break
time.sleep(0.3)
to = to - relativedelta(minutes=interval * 200)
data = data.set_index('datetime')
data = data.sort_index()
data = data.drop_duplicates(keep='first')
data["datetime"] = data.index
return data
def get_coin_saved_data(self, symbol: str, interval: int, data: pd.DataFrame) -> pd.DataFrame:
conn = sqlite3.connect('coins.db')
cursor = conn.cursor()
for i in range(1, len(data)):
cursor.execute(
"SELECT * from " + symbol + " where CODE = ? and ymdhms = ? and interval = ?",
(symbol, data['datetime'].iloc[-i].strftime('%Y-%m-%d %H:%M:%S'), interval),
)
arr = cursor.fetchone()
if not arr:
cursor.execute(
"INSERT INTO " + symbol + " (interval, CODE, NAME, ymdhms, ymd, hms, close, open, high, low, volume) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
interval,
symbol,
KR_COINS[symbol],
data['datetime'].iloc[-i].strftime('%Y-%m-%d %H:%M:%S'),
data['datetime'].iloc[-i].strftime('%Y%m%d'),
data['datetime'].iloc[-i].strftime('%H%M%S'),
data['Close'].iloc[-i],
data['Open'].iloc[-i],
data['High'].iloc[-i],
data['Low'].iloc[-i],
data['Volume'].iloc[-i],
),
)
else:
break
cursor.execute(
"select * from (SELECT Open,Close,High,Low,Volume,ymdhms as datetime from "
+ symbol
+ " order by ymdhms desc limit 5000) subquery order by datetime"
)
result = cursor.fetchall()
conn.commit()
cursor.close()
conn.close()
df = pd.DataFrame(result)
df.columns = ['Open', 'Close', 'High', 'Low', 'Volume', 'datetime']
df = df.set_index('datetime')
df = df.sort_index()
df['datetime'] = df.index
return df
def get_coin_some_data(self, symbol: str, interval: int) -> pd.DataFrame:
data = self.get_coin_data(symbol, interval)
data_1 = self.get_coin_data(symbol, interval=1)
data_1.at[data_1.index[-1], 'Volume'] = data_1['Volume'].iloc[-1] * 60
saved_data = self.get_coin_saved_data(symbol, interval, data)
data = pd.concat([data, saved_data, data_1.iloc[[-1]]], ignore_index=True)
data['datetime'] = pd.to_datetime(data['datetime'], format='%Y-%m-%d %H:%M:%S')
data = data.set_index('datetime')
data = data.sort_index()
data = data.drop_duplicates(keep='first')
data["datetime"] = data.index
return data
def get_kr_stock_data(self, symbol: str, retries: int = 3) -> pd.DataFrame | None:
for attempt in range(retries):
try:
end = datetime.now()
start = end - timedelta(days=300)
data = fdr.DataReader(symbol, start.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d'))
if not data.empty:
data = data.rename(columns={
'Open': 'Open',
'High': 'High',
'Low': 'Low',
'Close': 'Close',
'Volume': 'Volume',
})
return data
print(f"No data received for {symbol}, attempt {attempt + 1}")
time.sleep(2)
except Exception as e:
print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
if attempt < retries - 1:
time.sleep(5)
continue
return None

164
monitor_coin.py Normal file
View File

@@ -0,0 +1,164 @@
import pandas as pd
from dateutil.relativedelta import relativedelta
from datetime import datetime
import sqlite3
import time
import requests
import json
from config import *
from monitor import Monitor
class MonitorCoin (Monitor):
"""자산(코인/주식/ETF) 모니터링 및 매수 실행 클래스"""
def __init__(self, cooldown_file: str = 'coins_buy_time.json') -> None:
super().__init__(cooldown_file)
# ------------- Data fetch -------------
def get_coin_data(self, symbol: str, interval: int = 60, to: str | None = None, retries: int = 3) -> pd.DataFrame | None:
for attempt in range(retries):
try:
if to is None:
url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200").format(interval, symbol)
else:
url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200&to={}").format(interval, symbol, to)
headers = {"accept": "application/json"}
response = requests.get(url, headers=headers)
json_data = json.loads(response.text)
df_temp = pd.DataFrame(json_data)
df_temp = df_temp.sort_index(ascending=False)
if 'candle_date_time_kst' not in df_temp:
return None
data = pd.DataFrame()
data['datetime'] = pd.to_datetime(df_temp['candle_date_time_kst'], format='%Y-%m-%dT%H:%M:%S')
data['Open'] = df_temp['opening_price']
data['Close'] = df_temp['trade_price']
data['High'] = df_temp['high_price']
data['Low'] = df_temp['low_price']
data['Volume'] = df_temp['candle_acc_trade_volume']
data = data.set_index('datetime')
data = data.astype(float)
data["datetime"] = data.index
if not data.empty:
return data
print(f"No data received for {symbol}, attempt {attempt + 1}")
time.sleep(0.5)
except Exception as e:
print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
if attempt < retries - 1:
time.sleep(5)
continue
return None
def get_coin_more_data(self, symbol: str, interval: int, bong_count: int = 3000) -> pd.DataFrame:
to = datetime.now()
data: pd.DataFrame | None = None
while data is None or len(data) < bong_count:
if data is None:
data = self.get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S"))
else:
previous_count = len(data)
df = self.get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S"))
data = pd.concat([data, df], ignore_index=True)
if previous_count == len(data):
break
time.sleep(0.3)
to = to - relativedelta(minutes=interval * 200)
data = data.set_index('datetime')
data = data.sort_index()
data = data.drop_duplicates(keep='first')
data["datetime"] = data.index
return data
def get_coin_saved_data(self, symbol: str, interval: int, data: pd.DataFrame) -> pd.DataFrame:
conn = sqlite3.connect('coins.db')
cursor = conn.cursor()
for i in range(1, len(data)):
cursor.execute(
"SELECT * from " + symbol + " where CODE = ? and ymdhms = ? and interval = ?",
(symbol, data['datetime'].iloc[-i].strftime('%Y-%m-%d %H:%M:%S'), interval),
)
arr = cursor.fetchone()
if not arr:
cursor.execute(
"INSERT INTO " + symbol + " (interval, CODE, NAME, ymdhms, ymd, hms, close, open, high, low, volume) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
interval,
symbol,
KR_COINS[symbol],
data['datetime'].iloc[-i].strftime('%Y-%m-%d %H:%M:%S'),
data['datetime'].iloc[-i].strftime('%Y%m%d'),
data['datetime'].iloc[-i].strftime('%H%M%S'),
data['Close'].iloc[-i],
data['Open'].iloc[-i],
data['High'].iloc[-i],
data['Low'].iloc[-i],
data['Volume'].iloc[-i],
),
)
else:
break
cursor.execute(
"select * from (SELECT Open,Close,High,Low,Volume,ymdhms as datetime from "
+ symbol
+ " order by ymdhms desc limit 5000) subquery order by datetime"
)
result = cursor.fetchall()
conn.commit()
cursor.close()
conn.close()
df = pd.DataFrame(result)
df.columns = ['Open', 'Close', 'High', 'Low', 'Volume', 'datetime']
df = df.set_index('datetime')
df = df.sort_index()
df['datetime'] = df.index
return df
def get_coin_some_data(self, symbol: str, interval: int) -> pd.DataFrame:
data = self.get_coin_data(symbol, interval)
data_1 = self.get_coin_data(symbol, interval=1)
data_1.at[data_1.index[-1], 'Volume'] = data_1['Volume'].iloc[-1] * 60
saved_data = self.get_coin_saved_data(symbol, interval, data)
data = pd.concat([data, saved_data, data_1.iloc[[-1]]], ignore_index=True)
data['datetime'] = pd.to_datetime(data['datetime'], format='%Y-%m-%d %H:%M:%S')
data = data.set_index('datetime')
data = data.sort_index()
data = data.drop_duplicates(keep='first')
data["datetime"] = data.index
return data
def monitor_coins(self) -> None:
print("KRW COINs {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
for symbol in KR_COINS:
interval = 60
data = self.get_coin_some_data(symbol, interval)
if data is not None and not data.empty:
try:
data = self.calculate_technical_indicators(data)
recent_data = self.check_buy_point(symbol, data)
if recent_data['buy_point'].iloc[-1] != 1:
continue
buy_success = self.buy_ticker(symbol, recent_data)
if not buy_success:
continue
except Exception as e:
print(f"Error processing data for {symbol}: {str(e)}")
else:
print(f"Data for {symbol} is empty or None.")
time.sleep(0.5)
print("\t-{}".format(','.join(KR_COINS.keys())))
return
# ------------- Scheduler -------------
def run_schedule(self) -> None:
while True:
self.monitor_coins()
time.sleep(10)
if __name__ == "__main__":
KR_COINS.keys()
MonitorCoin().run_schedule()

113
monitor_stock.py Normal file
View File

@@ -0,0 +1,113 @@
import pandas as pd
from datetime import datetime, timedelta
import time
import schedule
from config import *
import FinanceDataReader as fdr
from monitor import Monitor
class MonitorStock (Monitor):
"""자산(코인/주식/ETF) 모니터링 및 매수 실행 클래스"""
def __init__(self) -> None:
super().__init__(None)
def get_kr_stock_data(self, symbol: str, retries: int = 3) -> pd.DataFrame | None:
for attempt in range(retries):
try:
end = datetime.now()
start = end - timedelta(days=300)
data = fdr.DataReader(symbol, start.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d'))
if not data.empty:
data = data.rename(columns={
'Open': 'Open',
'High': 'High',
'Low': 'Low',
'Close': 'Close',
'Volume': 'Volume',
})
return data
print(f"No data received for {symbol}, attempt {attempt + 1}")
time.sleep(2)
except Exception as e:
print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
if attempt < retries - 1:
time.sleep(5)
continue
return None
# ------------- Monitors -------------
def monitor_us_stocks(self) -> None:
message_list: list[str] = []
print("US Stocks {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
for symbol in US_STOCKS:
data = self.get_kr_stock_data(symbol)
if data is not None and not data.empty:
try:
data = self.calculate_technical_indicators(data)
recent_data = self.check_buy_point(symbol, data)
if recent_data['buy_point'].iloc[-1] != 1:
continue
print(f" - {US_STOCKS[symbol]} ({symbol}): {recent_data['Close'].iloc[-1]:.2f}")
message_list.append(
self.format_message('US', symbol, US_STOCKS[symbol], recent_data['Close'].iloc[-1], recent_data['buy_signal'].iloc[-1])
)
except Exception as e:
print(f"Error processing data for {symbol}: {str(e)}")
time.sleep(0.5)
if len(message_list) > 0:
try:
self.send_stock_telegram_message(message_list, header="[US-STOCK]")
except Exception as e:
print(f"Error sending Telegram message: {str(e)}")
def monitor_kr_stocks(self) -> None:
message_list: list[str] = []
print("KR ETFs {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
for symbol in KR_ETFS:
try:
clean_symbol = symbol.replace('.KS', '')
data = self.get_kr_stock_data(clean_symbol)
if data is not None and not data.empty:
try:
data = self.calculate_technical_indicators(data)
recent_data = self.check_buy_point(symbol, data)
if recent_data['buy_point'].iloc[-1] != 1:
continue
print(f" - {KR_ETFS[symbol]} ({symbol}): {recent_data['Close'].iloc[-1]:.2f}")
message_list.append(
self.format_message('KR', symbol, KR_ETFS[symbol], recent_data['Close'].iloc[-1], recent_data['buy_signal'].iloc[-1])
)
except Exception as e:
print(f"Error processing data for {symbol}: {str(e)}")
else:
print(f"Data for {symbol} is empty or None.")
time.sleep(1)
except Exception as e:
print(f"Unexpected error processing {symbol}: {str(e)}")
continue
if len(message_list) > 0:
try:
self.send_stock_telegram_message(message_list, header="[KR-STOCK]")
except Exception as e:
print(f"Error sending Telegram message: {str(e)}")
# ------------- Scheduler -------------
def run_schedule(self) -> None:
schedule.every().day.at("16:30").do(self.monitor_us_stocks)
schedule.every().day.at("23:30").do(self.monitor_us_stocks)
schedule.every().day.at("23:30").do(self.monitor_us_stocks)
schedule.every().day.at("05:10").do(self.monitor_us_stocks)
schedule.every().day.at("18:20").do(self.monitor_kr_stocks)
schedule.every().day.at("07:10").do(self.monitor_kr_stocks)
print("Scheduler started. Stock Monitoring will run at specified times.")
print(f"Loaded cooldown data for {len(self.buy_cooldown)} coins")
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == "__main__":
MonitorStock().run_schedule()

458
simulation.py Normal file
View File

@@ -0,0 +1,458 @@
from dateutil.relativedelta import relativedelta
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt
import matplotlib.dates
import mplcursors
plt.rcParams['font.family'] ='AppleGothic'
plt.rcParams['axes.unicode_minus'] =False
from config import *
from monitor import Monitor
class Simulation:
def __init__(self) -> None:
self.monitor = Monitor()
self.INTERVAL_MAP = {
60: "60m",
240: "4h",
}
def detect_turnaround_signal(self, symbol, data, interval=0, params=None):
if len(data) < 7:
return None
current_data = data.iloc[-1]
if current_data.get('buy_point', 0) == 1:
return {
'alert': True,
'details': f"매수신호: {current_data.get('buy_signal', 'unknown')}"
}
return {'alert': False, 'details': "매수신호 없음"}
def fetch_price_history(self, symbol: str, interval_minutes: int, days: int = 30) -> pd.DataFrame:
if symbol in KR_COINS:
bong_count = 3000
return self.monitor.get_coin_more_data(symbol, interval_minutes, bong_count=bong_count)
if interval_minutes not in self.INTERVAL_MAP:
raise ValueError("interval must be 60 or 240")
interval_str = self.INTERVAL_MAP[interval_minutes]
df = yf.download(
tickers=symbol,
period=f"{days}d",
interval=interval_str,
progress=False,
)
if df.empty:
raise RuntimeError("No data fetched. Check symbol or interval support.")
return df
def analyze_bottom_period(self, symbol: str, interval_minutes: int, days: int = 90):
data = self.fetch_price_history(symbol, interval_minutes, days)
data = self.monitor.calculate_technical_indicators(data)
data = self.monitor.check_buy_point(symbol, data, simulation=True)
print(f"데이터 기간: {data.index[0]} ~ {data.index[-1]}")
print(f"총 데이터 수: {len(data)}")
bottom_start = pd.Timestamp('2025-06-22')
bottom_end = pd.Timestamp('2025-07-09')
bottom_data = data[(data.index >= bottom_start) & (data.index <= bottom_end)]
if len(bottom_data) == 0:
print("저점 기간 데이터가 없습니다.")
return None, []
print(f"\n저점 기간 데이터: {bottom_data.index[0]} ~ {bottom_data.index[-1]}")
print(f"저점 기간 데이터 수: {len(bottom_data)}")
print("\n=== 저점 기간 기술적 지표 분석 ===")
min_price = bottom_data['Low'].min()
max_price = bottom_data['High'].max()
avg_price = bottom_data['Close'].mean()
print(f"최저가: {min_price:.4f}")
print(f"최고가: {max_price:.4f}")
print(f"평균가: {avg_price:.4f}")
print(f"가격 변동폭: {((max_price - min_price) / min_price * 100):.2f}%")
bb_lower_min = bottom_data['Lower'].min()
bb_upper_max = bottom_data['Upper'].max()
print(f"\n볼린저 밴드 분석:")
print(f"하단 밴드 최저: {bb_lower_min:.4f}")
print(f"상단 밴드 최고: {bb_upper_max:.4f}")
volume_avg = bottom_data['Volume'].mean()
volume_max = bottom_data['Volume'].max()
print(f"\n거래량 분석:")
print(f"평균 거래량: {volume_avg:.0f}")
print(f"최대 거래량: {volume_max:.0f}")
actual_bottom_idx = bottom_data['Low'].idxmin()
actual_bottom_price = bottom_data.loc[actual_bottom_idx, 'Low']
actual_bottom_date = actual_bottom_idx
print(f"\n실제 저점:")
print(f"날짜: {actual_bottom_date}")
print(f"가격: {actual_bottom_price:.4f}")
print(f"볼린저 하단 대비: {((actual_bottom_price - bottom_data.loc[actual_bottom_idx, 'Lower']) / bottom_data.loc[actual_bottom_idx, 'Lower'] * 100):.2f}%")
print(f"\n=== 매수 신호 분석 ===")
bottom_alerts = bottom_data[bottom_data['buy_point'] == 1]
alerts = [(idx, row['Close']) for idx, row in bottom_alerts.iterrows()]
print(f"저점 기간 매수 신호 수: {len(alerts)}")
if alerts:
print("매수 신호 발생 시점:")
for date, price in alerts:
print(f" {date}: {price:.4f}")
return bottom_data, alerts
def run_simulation(self, symbol: str, interval_minutes: int, days: int = 30):
data = self.fetch_price_history(symbol, interval_minutes)
data = self.monitor.calculate_technical_indicators(data)
data = self.monitor.check_buy_point(symbol, data, simulation=True)
print(f"데이터 기간: {data.index[0]} ~ {data.index[-1]}")
print(f"총 데이터 수: {len(data)}")
alerts = []
for i in range(len(data)):
if data['buy_point'].iloc[i] == 1:
alerts.append((data.index[i], data['Close'].iloc[i]))
print(f"\n총 매수 신호 수: {len(alerts)}")
ma_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'movingaverage')])
dev40_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation40')])
dev240_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation240')])
dev1440_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation1440')])
print(f" - MA 신호: {ma_signals}")
print(f" - Dev40 신호: {dev40_signals}")
print(f" - Dev240 신호: {dev240_signals}")
print(f" - Dev1440 신호: {dev1440_signals}")
fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True, figsize=(15, 8), height_ratios=[3, 1])
fig.suptitle(f"{symbol} - 시뮬레이션 {interval_minutes}분봉", fontsize=14)
# ----------------- 마우스 휠 확대/축소 -----------------
def on_scroll(event):
# event.button: 'up' -> zoom in, 'down' -> zoom out
if event.inaxes not in [ax1, ax2]:
return
ax = event.inaxes
# x 축만 두 축을 동시에 조정
cur_xlim = ax1.get_xlim()
xdata = event.xdata
if xdata is None:
return
scale_factor = 0.9 if event.button == 'up' else 1/0.9
new_width = (cur_xlim[1] - cur_xlim[0]) * scale_factor
relx = (cur_xlim[1] - xdata) / (cur_xlim[1] - cur_xlim[0])
new_left = xdata - new_width * (1 - relx)
new_right = xdata + new_width * relx
# 데이터 영역 벗어나지 않도록 클램프
xmin, xmax = matplotlib.dates.date2num(data.index[0]), matplotlib.dates.date2num(data.index[-1])
if new_left < xmin:
new_left = xmin
if new_right > xmax:
new_right = xmax
ax1.set_xlim([new_left, new_right])
ax2.set_xlim([new_left, new_right])
# y축은 해당 축만 줌
cur_ylim = ax.get_ylim()
ydata = event.ydata
if ydata is not None:
new_height = (cur_ylim[1] - cur_ylim[0]) * scale_factor
rely = (cur_ylim[1] - ydata) / (cur_ylim[1] - cur_ylim[0])
ax.set_ylim([ydata - new_height * (1 - rely), ydata + new_height * rely])
ax.figure.canvas.draw_idle()
fig.canvas.mpl_connect('scroll_event', on_scroll)
# 캔들스틱 차트 추가 (matplotlib 기본 기능 사용)
import matplotlib.dates as mdates
# 캔들스틱 데이터 준비
ohlc_data = []
for i, (idx, row) in enumerate(data.iterrows()):
ohlc_data.append([
mdates.date2num(idx),
row['Open'],
row['High'],
row['Low'],
row['Close']
])
# 캔들스틱 그리기 (matplotlib 기본 기능으로 구현)
for ohlc in ohlc_data:
date, open_price, high, low, close = ohlc
# 캔들 색상 결정
color = 'red' if close >= open_price else 'blue'
# 캔들 몸통 그리기
body_height = abs(close - open_price)
body_bottom = min(open_price, close)
rect = plt.Rectangle((date - 0.3, body_bottom), 0.6, body_height,
facecolor=color, edgecolor='black', alpha=0.7)
ax1.add_patch(rect)
# 캔들 심지 그리기
ax1.plot([date, date], [low, high], color='black', linewidth=1)
# 메인 차트 (가격, 이동평균선, 볼린저 밴드)
line_close = ax1.plot(data.index, data["Close"], label="종가", color="black", linewidth=1.5, alpha=0.8)[0]
line_ma5 = ax1.plot(data.index, data["MA5"], label="MA5", color="red", linewidth=1)[0]
line_ma20 = ax1.plot(data.index, data["MA20"], label="MA20", color="blue", linewidth=1)[0]
line_ma40 = ax1.plot(data.index, data["MA40"], label="MA40", color="green", linewidth=1)[0]
line_ma120 = ax1.plot(data.index, data["MA120"], label="MA120", color="purple", linewidth=1)[0]
line_ma200 = ax1.plot(data.index, data["MA200"], label="MA200", color="brown", linewidth=1)[0]
line_ma240 = ax1.plot(data.index, data["MA240"], label="MA240", color="darkred", linewidth=1)[0]
line_ma720 = ax1.plot(data.index, data["MA720"], label="MA720", color="cyan", linewidth=1)[0]
line_ma1440 = ax1.plot(data.index, data["MA1440"], label="MA1440", color="magenta", linewidth=1)[0]
# Bollinger Bands
line_upper = ax1.plot(data.index, data["Upper"], label="볼린저 Upper", color="grey", linestyle="--", linewidth=1)[0]
line_lower = ax1.plot(data.index, data["Lower"], label="볼린저 Lower", color="grey", linestyle="--", linewidth=1)[0]
ax1.fill_between(data.index, data["Lower"], data["Upper"], color="grey", alpha=0.1)
# 매수 포인트를 신호 유형별로 다르게 표시 (수직선 포함)
# 이동평균선 기반 매수 포인트
ma_buy_points = data[(data['buy_point'] == 1) & (data['buy_signal'] == 'movingaverage')]
scatter_ma_buy_points = None
if len(ma_buy_points) > 0:
scatter_ma_buy_points = ax1.scatter(ma_buy_points.index, ma_buy_points['Close'], color='red', s=150, zorder=10, label='MA 매수 포인트', marker='o')
for time in ma_buy_points.index:
ax1.axvline(x=time, color='red', linestyle='-', alpha=0.5, linewidth=1)
# Deviation40 기반 매수 포인트
dev40_buy_points = data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation40')]
scatter_dev40_buy_points = None
if len(dev40_buy_points) > 0:
scatter_dev40_buy_points = ax1.scatter(dev40_buy_points.index, dev40_buy_points['Close'],
facecolors='none', edgecolors='red', linestyle='--',
linewidth=2, s=200, zorder=10, label='Dev40 매수 포인트')
for time in dev40_buy_points.index:
ax1.axvline(x=time, color='red', linestyle='--', alpha=0.5, linewidth=1)
# Deviation240 기반 매수 포인트
dev240_buy_points = data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation240')]
scatter_dev240_buy_points = None
if len(dev240_buy_points) > 0:
scatter_dev240_buy_points = ax1.scatter(dev240_buy_points.index, dev240_buy_points['Close'],
facecolors='none', edgecolors='blue', linestyle='--',
linewidth=2, s=200, zorder=10, label='Dev240 매수 포인트')
for time in dev240_buy_points.index:
ax1.axvline(x=time, color='blue', linestyle='--', alpha=0.5, linewidth=1)
# Deviation1440 기반 매수 포인트
dev1440_buy_points = data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation1440')]
scatter_dev1440_buy_points = None
if len(dev1440_buy_points) > 0:
scatter_dev1440_buy_points = ax1.scatter(dev1440_buy_points.index, dev1440_buy_points['Close'],
facecolors='none', edgecolors='purple', linestyle='--',
linewidth=2, s=200, zorder=10, label='Dev1440 매수 포인트')
for time in dev1440_buy_points.index:
ax1.axvline(x=time, color='purple', linestyle='--', alpha=0.5, linewidth=1)
# 마우스 오버 기능 추가 (이동평균선 매수 포인트)
if scatter_ma_buy_points is not None:
cursor = mplcursors.cursor(scatter_ma_buy_points, hover=True)
cursor.connect("add", lambda sel: sel.annotation.set_text(
f'MA 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}'
))
cursor.connect("remove", lambda sel: sel.annotation.set_visible(False))
# 마우스 오버 기능 추가 (Deviation40 매수 포인트)
if scatter_dev40_buy_points is not None:
cursor_dev40 = mplcursors.cursor(scatter_dev40_buy_points, hover=True)
cursor_dev40.connect("add", lambda sel: sel.annotation.set_text(
f'Dev40 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}'
))
cursor_dev40.connect("remove", lambda sel: sel.annotation.set_visible(False))
# 마우스 오버 기능 추가 (Deviation240 매수 포인트)
if scatter_dev240_buy_points is not None:
cursor_dev240 = mplcursors.cursor(scatter_dev240_buy_points, hover=True)
cursor_dev240.connect("add", lambda sel: sel.annotation.set_text(
f'Dev240 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}'
))
cursor_dev240.connect("remove", lambda sel: sel.annotation.set_visible(False))
# 마우스 오버 기능 추가 (Deviation1440 매수 포인트)
if scatter_dev1440_buy_points is not None:
cursor_dev1440 = mplcursors.cursor(scatter_dev1440_buy_points, hover=True)
cursor_dev1440.connect("add", lambda sel: sel.annotation.set_text(
f'Dev1440 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}'
))
cursor_dev1440.connect("remove", lambda sel: sel.annotation.set_visible(False))
# 모든 봉에 마우스 오버 기능 추가
cursor3 = mplcursors.cursor(line_close, hover=True)
cursor3.connect("add", lambda sel: sel.annotation.set_text(
f'종가\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}'
))
cursor3.connect("remove", lambda sel: sel.annotation.set_visible(False))
ax1.set_ylabel("가격 (KRW)")
ax1.set_title(f"{symbol} 차트 - 캔들스틱 & 이동평균선", fontsize=14)
ax1.grid(True, linestyle='--', alpha=0.3)
# 홈 버튼 추가 (초기화 기능)
home_button = ax1.text(0.02, 0.98, '', transform=ax1.transAxes,
bbox=dict(boxstyle="round,pad=0.3", facecolor='lightblue', alpha=0.8),
fontsize=10, ha='left', va='top', picker=True)
# --- 범례 생성 및 인터랙티브 토글 ---
legend = ax1.legend(loc='upper left', bbox_to_anchor=(0.02, 0.85), fontsize=10)
# 범례 클릭 시 해당 선 토글 기능
lined = {}
if hasattr(legend, "legend_handles"):
legend_handles = legend.legend_handles
elif hasattr(legend, "legendHandles"):
legend_handles = legend.legendHandles
else:
legend_handles = legend.get_lines()
plot_lines = [line_close, line_ma5, line_ma20, line_ma40, line_ma120,
line_ma200, line_ma240, line_ma720, line_ma1440,
line_upper, line_lower]
if scatter_ma_buy_points is not None:
plot_lines.append(scatter_ma_buy_points)
if scatter_dev40_buy_points is not None:
plot_lines.append(scatter_dev40_buy_points)
if scatter_dev240_buy_points is not None:
plot_lines.append(scatter_dev240_buy_points)
if scatter_dev1440_buy_points is not None:
plot_lines.append(scatter_dev1440_buy_points)
for leg_handle, orig in zip(legend_handles, plot_lines):
leg_handle.set_picker(True)
lined[leg_handle] = orig
def on_pick(event):
leg_handle = event.artist
if leg_handle == home_button:
ax1.set_xlim(matplotlib.dates.date2num(data.index[0]), matplotlib.dates.date2num(data.index[-1]))
ax1.set_ylim(data['Low'].min() * 0.99, data['High'].max() * 1.01)
ax2.set_xlim(ax1.get_xlim())
ax2.set_ylim(
data[['Deviation5', 'Deviation20', 'Deviation40', 'Deviation120', 'Deviation200', 'Deviation240', 'Deviation720', 'Deviation1440']].min().min() * 0.95,
data[['Deviation5', 'Deviation20', 'Deviation40', 'Deviation120', 'Deviation200', 'Deviation240', 'Deviation720', 'Deviation1440']].max().max() * 1.05,
)
fig.canvas.draw_idle()
return
orig = lined.get(leg_handle)
if orig is None:
return
vis = not orig.get_visible()
orig.set_visible(vis)
leg_handle.set_alpha(1.0 if vis else 0.2)
fig.canvas.draw_idle()
fig.canvas.mpl_connect('pick_event', on_pick)
# Deviation subplot (이격도 보조지표)
line_dev5 = ax2.plot(data.index, data['Deviation5'], color='red', label='Dev5', linewidth=1)[0]
line_dev20 = ax2.plot(data.index, data['Deviation20'], color='blue', label='Dev20', linewidth=1)[0]
line_dev40 = ax2.plot(data.index, data['Deviation40'], color='green', label='Dev40', linewidth=2)[0]
line_dev120 = ax2.plot(data.index, data['Deviation120'], color='purple', label='Dev120', linewidth=1)[0]
line_dev200 = ax2.plot(data.index, data['Deviation200'], color='brown', label='Dev200', linewidth=1)[0]
line_dev240 = ax2.plot(data.index, data['Deviation240'], color='darkred', label='Dev240', linewidth=2)[0]
line_dev720 = ax2.plot(data.index, data['Deviation720'], color='cyan', label='Dev720', linewidth=1)[0]
line_dev1440 = ax2.plot(data.index, data['Deviation1440'], color='magenta', label='Dev1440', linewidth=1)[0]
cursor_dev = mplcursors.cursor([line_dev5, line_dev20, line_dev40, line_dev120, line_dev200, line_dev240, line_dev720, line_dev1440], hover=True)
cursor_dev.connect("add", lambda sel: sel.annotation.set_text(
f"{sel.artist.get_label()}\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime('%Y-%m-%d %H:%M')}\n값: {sel.target[1]:.2f}"
))
line_h90 = ax2.axhline(90, color='red', linestyle='--', linewidth=2, label='90', alpha=0.7)
line_h95 = ax2.axhline(95, color='green', linestyle='--', linewidth=2, label='95', alpha=0.7)
line_h100 = ax2.axhline(100, color='black', linestyle='-', linewidth=1, label='100', alpha=0.5)
ax2.set_ylabel('이격도 (%)')
ax2.set_title('이격도 보조지표', fontsize=12)
legend2 = ax2.legend(loc='upper left', fontsize=9)
if legend2 is not None:
if hasattr(legend2, "legend_handles"):
legend2_handles = legend2.legend_handles
elif hasattr(legend2, "legendHandles"):
legend2_handles = legend2.legendHandles
else:
legend2_handles = legend2.get_lines()
plot_lines2 = [line_dev5, line_dev20, line_dev40, line_dev120, line_dev200, line_dev240, line_dev720, line_dev1440, line_h90, line_h95]
for leg_handle in legend2_handles:
label = leg_handle.get_label()
target_line = next((pl for pl in plot_lines2 if pl.get_label() == label), None)
if target_line is not None:
leg_handle.set_picker(True)
lined[leg_handle] = target_line
ax2.grid(True, linestyle='--', alpha=0.3)
plt.tight_layout()
print("그래프를 표시합니다...")
print(f"매수 포인트 수: MA={len(ma_buy_points)}, Dev40={len(dev40_buy_points)}, Dev240={len(dev240_buy_points)}")
# -------- 확대/축소 및 이동 기능 --------
press = {}
def on_scroll(event):
ax = event.inaxes
if ax is None:
return
x_left, x_right = ax.get_xlim()
x_range = (x_right - x_left)
if event.button == 'up': # zoom in
scale = 0.8
elif event.button == 'down': # zoom out
scale = 1.25
else:
scale = 1.0
new_range = x_range * scale
center = event.xdata if event.xdata is not None else (x_left + x_right) / 2
ax.set_xlim(center - new_range / 2, center + new_range / 2)
for other_ax in fig.axes:
if other_ax is not ax:
other_ax.set_xlim(ax.get_xlim())
fig.canvas.draw_idle()
def on_press(event):
if event.button == 1 and event.inaxes is not None:
press['xpress'] = event.xdata
press['axes'] = event.inaxes
def on_motion(event):
if 'xpress' not in press or press.get('axes') is None or event.inaxes is None:
return
dx = press['xpress'] - event.xdata
for ax in fig.axes:
x_left, x_right = ax.get_xlim()
ax.set_xlim(x_left + dx, x_right + dx)
fig.canvas.draw_idle()
def on_release(event):
press.clear()
fig.canvas.mpl_connect('scroll_event', on_scroll)
fig.canvas.mpl_connect('button_press_event', on_press)
fig.canvas.mpl_connect('motion_notify_event', on_motion)
fig.canvas.mpl_connect('button_release_event', on_release)
plt.show()
if __name__ == "__main__":
sim = Simulation()
interval = 60
days = 90
target_coins = ['POL']
show_graphs = True
for symbol in target_coins:
print(f"\n=== {symbol} 저점 기간 분석 시작 ===")
try:
bottom_data, alerts = sim.analyze_bottom_period(symbol, interval, days)
print(f"\n=== {symbol} 전체 기간 시뮬레이션 ===")
if show_graphs:
sim.run_simulation(symbol, interval, days)
else:
data = sim.fetch_price_history(symbol, interval, days)
data = sim.monitor.calculate_technical_indicators(data)
data = sim.monitor.check_buy_point(symbol, data, simulation=True)
total_buy_signals = len(data[data['buy_point'] == 1])
ma_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'movingaverage')])
dev40_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation40')])
dev240_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation240')])
dev1440_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation1440')])
print(f"총 매수 신호: {total_buy_signals}")
print(f" - MA 신호: {ma_signals}")
print(f" - Dev40 신호: {dev40_signals}")
print(f" - Dev240 신호: {dev240_signals}")
print(f" - Dev1440 신호: {dev1440_signals}")
except Exception as e:
print(f"Error analyzing {symbol}: {str(e)}")

View File

@@ -1,592 +0,0 @@
import pandas as pd
from HTS2 import HTS
from dateutil.relativedelta import relativedelta
from datetime import datetime, timedelta
import sqlite3
import telegram
import time
import requests
import json
import asyncio
from multiprocessing import Pool
import schedule
from config import *
import FinanceDataReader as fdr
import numpy as np
import os
hts = HTS()
# 매수 금지 시간을 관리하는 JSON 파일 경로
COOLDOWN_FILE = 'coins_buy_time.json'
def load_buy_cooldown():
"""매수 금지 시간을 JSON 파일에서 로드"""
if os.path.exists(COOLDOWN_FILE):
try:
with open(COOLDOWN_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
# 문자열을 datetime 객체로 변환
cooldown = {}
for symbol, time_str in data.items():
cooldown[symbol] = datetime.fromisoformat(time_str)
return cooldown
except Exception as e:
print(f"Error loading cooldown data: {e}")
return {}
return {}
def save_buy_cooldown(cooldown):
"""매수 금지 시간을 JSON 파일에 저장"""
try:
# datetime 객체를 문자열로 변환
data = {}
for symbol, dt in cooldown.items():
data[symbol] = dt.isoformat()
with open(COOLDOWN_FILE, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"Error saving cooldown data: {e}")
# 매수 금지 시간을 추적하는 전역 딕셔너리
buy_cooldown = load_buy_cooldown()
def send_coin_msg(text):
coin_client = telegram.Bot(token=COIN_TELEGRAM_BOT_TOKEN)
asyncio.run(coin_client.send_message(chat_id=COIN_TELEGRAM_CHAT_ID, text=text))
return
def send_coin_telegram_message(message_list, header):
pStr = header + "\n"
for i, message in enumerate(message_list):
pStr += message
if i + 1 % 20 == 0:
pool = Pool(12)
pool.map(send_coin_msg, [pStr])
pStr = ''
if len(message_list) % 20 != 0:
pool = Pool(12)
pool.map(send_coin_msg, [pStr])
return
def send_stock_msg(text):
stock_client = telegram.Bot(token=STOCK_TELEGRAM_BOT_TOKEN)
asyncio.run(stock_client.send_message(chat_id=STOCK_TELEGRAM_CHAT_ID, text=text))
return
def send_stock_telegram_message(message_list, header):
pStr = header + "\n"
for i, message in enumerate(message_list):
pStr += message
if i + 1 % 20 == 0:
pool = Pool(12)
pool.map(send_stock_msg, [pStr])
pStr = ''
if len(message_list) % 20 != 0:
pool = Pool(12)
pool.map(send_stock_msg, [pStr])
return
def normalize_data(data):
"""데이터 정규화 함수 - 모든 코인에 동일하게 적용"""
# Min-Max 정규화를 위한 컬럼
columns_to_normalize = ['Open', 'High', 'Low', 'Close', 'Volume']
normalized_data = data.copy()
# 각 컬럼별 정규화 (20일 롤링 윈도우 사용)
for column in columns_to_normalize:
min_val = data[column].rolling(window=20).min()
max_val = data[column].rolling(window=20).max()
# 0으로 나누기 방지
denominator = max_val - min_val
normalized_data[f'{column}_Norm'] = np.where(
denominator != 0,
(data[column] - min_val) / denominator,
0.5 # 기본값 설정
)
return normalized_data
def calculate_technical_indicators(data):
"""기술적 지표 계산 - 모든 코인에 동일하게 적용"""
# 데이터 정규화
data = normalize_data(data)
# 이동평균선 계산
data['MA5'] = data['Close'].rolling(window=5).mean()
data['MA20'] = data['Close'].rolling(window=20).mean()
data['MA40'] = data['Close'].rolling(window=40).mean()
data['MA120'] = data['Close'].rolling(window=120).mean()
data['MA200'] = data['Close'].rolling(window=200).mean()
data['MA240'] = data['Close'].rolling(window=240).mean()
data['MA720'] = data['Close'].rolling(window=720).mean()
data['MA1440'] = data['Close'].rolling(window=1440).mean()
# --- 이격도(Deviation) 계산 ---
data['Deviation5'] = (data['Close'] / data['MA5']) * 100
data['Deviation20'] = (data['Close'] / data['MA20']) * 100
data['Deviation40'] = (data['Close'] / data['MA40']) * 100
data['Deviation120'] = (data['Close'] / data['MA120']) * 100
data['Deviation200'] = (data['Close'] / data['MA200']) * 100
data['Deviation240'] = (data['Close'] / data['MA240']) * 100
data['Deviation720'] = (data['Close'] / data['MA720']) * 100
data['Deviation1440'] = (data['Close'] / data['MA1440']) * 100
# 매수 타이밍을 이동평균선으로 결정
# 골든크로스: 단기 이동평균선이 장기 이동평균선을 상향 돌파할 때 매수
data['golden_cross'] = (data['MA5'] > data['MA20']) & (data['MA5'].shift(1) <= data['MA20'].shift(1))
# 볼린저 밴드 계산
data['MA'] = data['Close'].rolling(window=20).mean()
data['STD'] = data['Close'].rolling(window=20).std()
data['Upper'] = data['MA'] + (2 * data['STD'])
data['Lower'] = data['MA'] - (2 * data['STD'])
return data
def buy_ticker(symbol, data):
try:
current_time = datetime.now()
if data['buy_signal'].iloc[-1] != 'fall_5p':
# 5%이상 급락일 때는 10분마다 매수
BUY_AMOUNT = 100000
else:
# 매수 금지 시간 확인 (20분)
if symbol in buy_cooldown:
time_diff = current_time - buy_cooldown[symbol]
if time_diff.total_seconds() < 1200: # 20분 = 1200초
print(f"{symbol}: 매수 금지 중 (남은 시간: {1200 - time_diff.total_seconds():.0f}초)")
return False
BUY_AMOUNT = 5100
if data['buy_signal'].iloc[-1] == 'movingaverage':
BUY_AMOUNT = 7000
elif data['buy_signal'].iloc[-1] == 'deviation40':
BUY_AMOUNT = 10000
elif data['buy_signal'].iloc[-1] == 'deviation240':
BUY_AMOUNT = 6000
elif data['buy_signal'].iloc[-1] == 'deviation1440':
if symbol in ['BONK', 'PEPE', 'TON']:
BUY_AMOUNT = 30000
else:
BUY_AMOUNT = 50000
_ = hts.buyCoinMarket(symbol, BUY_AMOUNT)
# 매수 성공 시 금지 시간 설정 및 파일에 저장
buy_cooldown[symbol] = current_time
save_buy_cooldown(buy_cooldown)
print(f"{KR_COINS[symbol]} ({symbol}): {data['Close'].iloc[-1]:.4f}: 매수 완료, 20분간 매수 금지 시작")
try:
pool = Pool(12)
pool.map(send_coin_msg, ["[KRW-COIN]" + "\n" + format_message('COIN', symbol, KR_COINS[symbol], data['Close'].iloc[-1], data['buy_signal'].iloc[-1])])
except Exception as e:
print(f"Error sending Telegram message: {str(e)}")
except Exception as e:
print(f"Error buying {symbol}: {str(e)}")
return False
return True
def check_buy_point(symbol, data, simulation=None):
# 매수 포인트 탐지 및 표시
# 데이터 복사본 생성하여 SettingWithCopyWarning 방지
data = data.copy()
# 'buy_point' 열 초기화
data['buy_signal'] = ''
data['buy_point'] = 0
# FutureWarning 해결
if data['buy_point'].iloc[-1] != 1:
# 코드 계속
for i in range(1, len(data)):
# 이동평균선 기반 매수 조건
if all(data[f'MA{n}'].iloc[i] < data['MA720'].iloc[i] for n in [5, 20, 40, 120, 200, 240]) and \
all(data[f'MA{n}'].iloc[i] > data[f'MA{n}'].iloc[i - 1] for n in [5, 20, 40, 120, 200, 240]) and \
data['MA720'].iloc[i] < data['MA1440'].iloc[i]:
data.at[data.index[i], 'buy_signal'] = 'movingaverage'
data.at[data.index[i], 'buy_point'] = 1
if not simulation:
if data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'movingaverage'
data.at[data.index[-1], 'buy_point'] = 1
# Deviation40(이격도 40) 기반 매수 조건: 90 이하에서 상승 전환
if data['Deviation40'].iloc[i - 1] < data['Deviation40'].iloc[i] and data['Deviation40'].iloc[i - 1] <= 90:
data.at[data.index[i], 'buy_signal'] = 'deviation40'
data.at[data.index[i], 'buy_point'] = 1
if not simulation:
if data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'deviation40'
data.at[data.index[-1], 'buy_point'] = 1
if symbol not in ['BONK']:
# BONK는 240 체크하지 않음
if symbol in ['TRX']:
# Deviation240(이격도 240) 기반 매수 조건: 90 이하에서 상승 전환
if data['Deviation240'].iloc[i - 1] < data['Deviation240'].iloc[i] and data['Deviation240'].iloc[i - 1] <= 98:
data.at[data.index[i], 'buy_signal'] = 'deviation240'
data.at[data.index[i], 'buy_point'] = 1
if not simulation:
if data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'deviation240'
data.at[data.index[-1], 'buy_point'] = 1
else:
# Deviation240(이격도 240) 기반 매수 조건: 90 이하에서 상승 전환
if data['Deviation240'].iloc[i - 1] < data['Deviation240'].iloc[i] and data['Deviation240'].iloc[i - 1] <= 90:
data.at[data.index[i], 'buy_signal'] = 'deviation240'
data.at[data.index[i], 'buy_point'] = 1
if not simulation:
if data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'deviation240'
data.at[data.index[-1], 'buy_point'] = 1
# Deviation240(이격도 240) 기반 매수 조건: 90 이하에서 상승 전환
if symbol in ['TON']:
if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 89:
data.at[data.index[i], 'buy_signal'] = 'deviation1440'
data.at[data.index[i], 'buy_point'] = 1
if not simulation:
if data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'deviation1440'
data.at[data.index[-1], 'buy_point'] = 1
elif symbol in ['XRP']:
if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 90:
data.at[data.index[i], 'buy_signal'] = 'deviation1440'
data.at[data.index[i], 'buy_point'] = 1
if not simulation:
if data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'deviation1440'
data.at[data.index[-1], 'buy_point'] = 1
elif symbol in ['BONK']:
if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 76:
data.at[data.index[i], 'buy_signal'] = 'deviation1440'
data.at[data.index[i], 'buy_point'] = 1
if not simulation:
if data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'deviation1440'
data.at[data.index[-1], 'buy_point'] = 1
else:
if data['Deviation1440'].iloc[i - 1] < data['Deviation1440'].iloc[i] and data['Deviation1440'].iloc[i - 1] <= 80:
data.at[data.index[i], 'buy_signal'] = 'deviation1440'
data.at[data.index[i], 'buy_point'] = 1
if not simulation:
if data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'deviation1440'
data.at[data.index[-1], 'buy_point'] = 1
# 하락 5% 조건: 현재가 또는 현재 봉의 저가가 지난 봉 고가/현재 봉 고가 대비 5% 이상 하락
try:
prev_low = data['Low'].iloc[i - 1]
curr_close = data['Close'].iloc[i]
cond_close_drop = curr_close <= prev_low * 0.95
if cond_close_drop:
data.at[data.index[i], 'buy_signal'] = 'fall_5p'
data.at[data.index[i], 'buy_point'] = 1
if not simulation:
if data['buy_point'][-3:].sum() > 0:
data.at[data.index[-1], 'buy_signal'] = 'fall_5p'
data.at[data.index[-1], 'buy_point'] = 1
except Exception:
pass
return data
def format_message(market_type, symbol, symbol_name, close, buy_signal):
message = f"매수 [{market_type}] {symbol_name} ({symbol}): {buy_signal} "
message += f"현재가: {'$' if market_type == 'US' else ''}{close:.4f}, "
return message
def format_ma_message(info, market_type):
"""MA 알림 메시지 생성"""
prefix = '상승 ' if info.get('alert') else ''
message = prefix + f"[{market_type}] {info['name']} ({info['symbol']}) "
message += f"현재가: {'$' if market_type == 'US' else ''}{info['price']:.4f} \n"
return message
def get_coin_data(symbol, interval=60, to=None, retries=3):
for attempt in range(retries):
try:
#url = "https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=3000".format(interval, symbol)
if to is None:
url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200").format(interval, symbol)
else:
url = ("https://api.bithumb.com/v1/candles/minutes/{}?market=KRW-{}&count=200&to={}").format(interval, symbol, to)
#url = 'https://api.bithumb.com/v1/candles/minutes/60?market=KRW-ADA&count=200'
#url = 'https://api.bithumb.com/v1/candles/minutes/minutes/60?market=KRW-ADA&count=200&to=2025-08-06 10:38:38'
headers = {"accept": "application/json"}
response = requests.get(url, headers=headers)
json_data = json.loads(response.text)
df_temp = pd.DataFrame(json_data)
df_temp = df_temp.sort_index(ascending=False)
if 'candle_date_time_kst' not in df_temp:
return None
data = pd.DataFrame()
# data.columns = ['datetime', 'open', 'close', 'high', 'low', 'volume']
# data['datetime'] = pd.to_datetime(data_temp['candle_date_time_kst'])
data['datetime'] = pd.to_datetime(df_temp['candle_date_time_kst'], format='%Y-%m-%dT%H:%M:%S')
data['Open'] = df_temp['opening_price']
data['Close'] = df_temp['trade_price']
data['High'] = df_temp['high_price']
data['Low'] = df_temp['low_price']
data['Volume'] = df_temp['candle_acc_trade_volume']
data = data.set_index('datetime')
data = data.astype(float)
data["datetime"] = data.index
if not data.empty:
return data
print(f"No data received for {symbol}, attempt {attempt + 1}")
time.sleep(0.5)
except Exception as e:
print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
if attempt < retries - 1:
time.sleep(5)
continue
return None
def get_coin_more_data(symbol, interval, bong_count=3000):
# 코인 데이터 1500개 봉 가져오기
to = datetime.now()
data = None
while data is None or len(data) < bong_count:
if data is None:
data = get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S"))
else:
p_count = len(data)
df = get_coin_data(symbol, interval, to.strftime("%Y-%m-%d %H:%M:%S"))
data = pd.concat([data, df], ignore_index=True)
if p_count == len(data):
break
time.sleep(0.3)
to = to - relativedelta(minutes=interval * 200)
data = data.set_index('datetime')
data = data.sort_index()
data = data.drop_duplicates(keep='first')
data["datetime"] = data.index
# 코인 데이터 1500개 봉 가져오기
return data
def get_coin_saved_data(symbol, interval, data):
conn = sqlite3.connect('coins.db')
cursor = conn.cursor()
for i in range(1, len(data)):
cursor.execute("SELECT * from " + symbol + " where CODE = ? and ymdhms = ? and interval = ?", (symbol, data['datetime'].iloc[-i].strftime('%Y-%m-%d %H:%M:%S'), interval))
arr = cursor.fetchone()
if not arr:
cursor.execute("INSERT INTO " + symbol + " (interval, CODE, NAME, ymdhms, ymd, hms, close, open, high, low, volume) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", (interval, symbol, KR_COINS[symbol], data['datetime'].iloc[-i].strftime('%Y-%m-%d %H:%M:%S'), data['datetime'].iloc[-i].strftime('%Y%m%d'), data['datetime'].iloc[-i].strftime('%H%M%S'), data['Close'].iloc[-i], data['Open'].iloc[-i], data['High'].iloc[-i], data['Low'].iloc[-i], data['Volume'].iloc[-i]))
else:
break
cursor.execute("select * from (SELECT Open,Close,High,Low,Volume,ymdhms as datetime from " + symbol + " order by ymdhms desc limit 5000) subquery order by datetime")
result = cursor.fetchall()
conn.commit()
cursor.close()
conn.close()
df = pd.DataFrame(result)
df.columns = ['Open','Close','High','Low','Volume','datetime']
df = df.set_index('datetime')
df = df.sort_index()
df['datetime'] = df.index
return df
def get_coin_some_data(symbol, interval):
data = get_coin_data(symbol, interval)
data_1 = get_coin_data(symbol, interval=1)
data_1.at[data_1.index[-1], 'Volume'] = data_1['Volume'].iloc[-1] * 60
saved_data = get_coin_saved_data(symbol, interval, data)
data = pd.concat([data, saved_data, data_1.iloc[[-1]]], ignore_index=True)
#data = pd.concat([data, saved_data], ignore_index=True)
data['datetime'] = pd.to_datetime(data['datetime'], format='%Y-%m-%d %H:%M:%S')
data = data.set_index('datetime')
data = data.sort_index()
data = data.drop_duplicates(keep='first')
data["datetime"] = data.index
return data
def get_kr_stock_data(symbol, retries=3):
for attempt in range(retries):
try:
end = datetime.now()
start = end - timedelta(days=300)
# FinanceDataReader를 사용하여 한국 주식 데이터 가져오기
data = fdr.DataReader(symbol, start.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d'))
if not data.empty:
# FinanceDataReader의 컬럼명을 yfinance 형식으로 변환
data = data.rename(columns={
'Open': 'Open',
'High': 'High',
'Low': 'Low',
'Close': 'Close',
'Volume': 'Volume'
})
return data
print(f"No data received for {symbol}, attempt {attempt + 1}")
time.sleep(2)
except Exception as e:
print(f"Attempt {attempt + 1} failed for {symbol}: {str(e)}")
if attempt < retries - 1:
time.sleep(5)
continue
return None
def monitor_us_stocks():
message_list = []
print("US Stocks {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
for symbol in US_STOCKS:
data = get_kr_stock_data(symbol)
if data is not None and not data.empty:
try:
data = calculate_technical_indicators(data)
recent_data = check_buy_point(symbol, data) # Changed to check_buy_point
if recent_data['buy_point'].iloc[-1] != 1:
continue
print(f" - {US_STOCKS[symbol]} ({symbol}): {recent_data['Close'].iloc[-1]:.2f}")
message_list.append(format_message('US', symbol, US_STOCKS[symbol], recent_data['Close'].iloc[-1], recent_data['buy_signal'].iloc[-1]))
except Exception as e:
print(f"Error processing data for {symbol}: {str(e)}")
time.sleep(0.5)
if len(message_list) > 0:
try:
send_stock_telegram_message(message_list, header="[US-STOCK]")
except Exception as e:
print(f"Error sending Telegram message: {str(e)}")
return
def monitor_kr_stocks():
message_list = []
print("KR ETFs {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
for symbol in KR_ETFS:
try:
# .KS 접미사 제거
clean_symbol = symbol.replace('.KS', '')
data = get_kr_stock_data(clean_symbol)
if data is not None and not data.empty:
try:
data = calculate_technical_indicators(data)
recent_data = check_buy_point(symbol, data) # Changed to check_buy_point
if recent_data['buy_point'].iloc[-1] != 1:
continue
print(f" - {KR_ETFS[symbol]} ({symbol}): {recent_data['Close'].iloc[-1]:.2f}")
message_list.append(format_message('KR', symbol, KR_ETFS[symbol], recent_data['Close'].iloc[-1], recent_data['buy_signal'].iloc[-1]))
except Exception as e:
print(f"Error processing data for {symbol}: {str(e)}")
else:
print(f"Data for {symbol} is empty or None.")
# 각 심볼 처리 후 1초 대기
time.sleep(1)
except Exception as e:
print(f"Unexpected error processing {symbol}: {str(e)}")
continue
if len(message_list) > 0:
try:
send_stock_telegram_message(message_list, header="[KR-STOCK]")
except Exception as e:
print(f"Error sending Telegram message: {str(e)}")
return
def monitor_coins():
message_list = []
print("KRW COINs {}".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
for symbol in KR_COINS:
# 1시간
interval = 60
data = get_coin_some_data(symbol, interval)
if data is not None and not data.empty:
try:
data = calculate_technical_indicators(data)
recent_data = check_buy_point(symbol, data) # Changed to check_buy_point
if recent_data['buy_point'].iloc[-1] != 1:
continue
# buy
buy_success = buy_ticker(symbol, recent_data)
if not buy_success:
continue # 매수 금지 중이면 다음 코인으로 넘어감
except Exception as e:
print(f"Error processing data for {symbol}: {str(e)}")
else:
print(f"Data for {symbol} is empty or None.")
time.sleep(0.5)
return
def run_schedule():
# 코인 모니터링 스케줄 (매시간 4분, 14분, 24분, 34분, 44분, 54분)
for minute in [2, 12, 22, 32, 42, 52]:
schedule.every().hour.at(f":{minute:02d}").do(monitor_coins)
# 미국 주식 모니터링 스케줄 (매일 저녁 5시 20분)
schedule.every().day.at("16:30").do(monitor_us_stocks)
schedule.every().day.at("23:30").do(monitor_us_stocks)
schedule.every().day.at("05:10").do(monitor_us_stocks)
# 한국 ETF 모니터링 스케줄 (매일 오전 8시)
schedule.every().day.at("18:20").do(monitor_kr_stocks)
schedule.every().day.at("07:10").do(monitor_kr_stocks)
print("Scheduler started. Monitoring will run at specified times.")
print(f"Loaded cooldown data for {len(buy_cooldown)} coins")
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == "__main__":
run_schedule()

View File

@@ -1,542 +0,0 @@
from dateutil.relativedelta import relativedelta
import pandas as pd
import yfinance as yf
import matplotlib.pyplot as plt
import matplotlib.dates
import mplcursors
plt.rcParams['font.family'] ='AppleGothic'
plt.rcParams['axes.unicode_minus'] =False
from config import *
from stock_monitor import calculate_technical_indicators, get_coin_more_data, check_buy_point
def detect_turnaround_signal(symbol, data, interval=0, params=None):
"""매수 신호 감지 함수 - stock_simulation.py에서 사용"""
if len(data) < 7:
return None
# 현재 매수 조건 확인
current_data = data.iloc[-1]
# 매수 신호가 있는지 확인
if current_data.get('buy_point', 0) == 1:
return {
'alert': True,
'details': f"매수신호: {current_data.get('buy_signal', 'unknown')}"
}
return {
'alert': False,
'details': "매수신호 없음"
}
# 비트/알트코인 KRW 마켓 식별: 문자열 "-KRW" 포함 여부로 간단 구분
INTERVAL_MAP = {
60: "60m", # 1시간 (yfinance)
240: "4h", # 4시간 (yfinance)
}
BITHUMB_MAX_COUNT = 3000 # API 최대 3000 캔들
def fetch_price_history(symbol: str, interval_minutes: int, days: int = 30) -> pd.DataFrame:
"""최근 `days`일 데이터(캔들)를 가져온다. 코인(-KRW)은 빗썸, 그 외 yfinance."""
if symbol in KR_COINS:
bong_count = 3000
return get_coin_more_data(symbol, interval_minutes, bong_count=bong_count)
# -------- 주식/ETF/해외코인 (yfinance) --------
if interval_minutes not in INTERVAL_MAP:
raise ValueError("interval must be 60 or 240")
interval_str = INTERVAL_MAP[interval_minutes]
df = yf.download(
tickers=symbol,
period=f"{days}d",
interval=interval_str,
progress=False,
)
if df.empty:
raise RuntimeError("No data fetched. Check symbol or interval support.")
return df
def analyze_bottom_period(symbol: str, interval_minutes: int, days: int = 90):
"""저점 기간(6월 22일~7월 9일) 분석 - 최적화된 버전"""
data = fetch_price_history(symbol, interval_minutes, days)
data = calculate_technical_indicators(data)
data = check_buy_point(symbol, data, simulation=True) # 한 번만 계산
print(f"데이터 기간: {data.index[0]} ~ {data.index[-1]}")
print(f"총 데이터 수: {len(data)}")
# 저점 기간 필터링 (6월 22일~7월 9일)
bottom_start = pd.Timestamp('2025-06-22')
bottom_end = pd.Timestamp('2025-07-09')
bottom_data = data[(data.index >= bottom_start) & (data.index <= bottom_end)]
if len(bottom_data) == 0:
print("저점 기간 데이터가 없습니다.")
return None, []
print(f"\n저점 기간 데이터: {bottom_data.index[0]} ~ {bottom_data.index[-1]}")
print(f"저점 기간 데이터 수: {len(bottom_data)}")
# 저점 기간의 기술적 지표 분석
print("\n=== 저점 기간 기술적 지표 분석 ===")
# 1. 가격 분석
min_price = bottom_data['Low'].min()
max_price = bottom_data['High'].max()
avg_price = bottom_data['Close'].mean()
print(f"최저가: {min_price:.4f}")
print(f"최고가: {max_price:.4f}")
print(f"평균가: {avg_price:.4f}")
print(f"가격 변동폭: {((max_price - min_price) / min_price * 100):.2f}%")
# 3. 볼린저 밴드 분석
bb_lower_min = bottom_data['Lower'].min()
bb_upper_max = bottom_data['Upper'].max()
print(f"\n볼린저 밴드 분석:")
print(f"하단 밴드 최저: {bb_lower_min:.4f}")
print(f"상단 밴드 최고: {bb_upper_max:.4f}")
# 4. 거래량 분석
volume_avg = bottom_data['Volume'].mean()
volume_max = bottom_data['Volume'].max()
print(f"\n거래량 분석:")
print(f"평균 거래량: {volume_avg:.0f}")
print(f"최대 거래량: {volume_max:.0f}")
# 5. 실제 저점 찾기
actual_bottom_idx = bottom_data['Low'].idxmin()
actual_bottom_price = bottom_data.loc[actual_bottom_idx, 'Low']
actual_bottom_date = actual_bottom_idx
print(f"\n실제 저점:")
print(f"날짜: {actual_bottom_date}")
print(f"가격: {actual_bottom_price:.4f}")
print(f"볼린저 하단 대비: {((actual_bottom_price - bottom_data.loc[actual_bottom_idx, 'Lower']) / bottom_data.loc[actual_bottom_idx, 'Lower'] * 100):.2f}%")
# 6. 매수 신호 분석 - 최적화된 버전
print(f"\n=== 매수 신호 분석 ===")
# 이미 계산된 매수 포인트에서 저점 기간만 필터링
bottom_alerts = bottom_data[bottom_data['buy_point'] == 1]
alerts = [(idx, row['Close']) for idx, row in bottom_alerts.iterrows()]
print(f"저점 기간 매수 신호 수: {len(alerts)}")
if alerts:
print("매수 신호 발생 시점:")
for date, price in alerts:
print(f" {date}: {price:.4f}")
return bottom_data, alerts
def run_simulation(symbol: str, interval_minutes: int, days: int = 30):
data = fetch_price_history(symbol, interval_minutes)
data = calculate_technical_indicators(data)
data = check_buy_point(symbol, data, simulation=True)
print(f"데이터 기간: {data.index[0]} ~ {data.index[-1]}")
print(f"총 데이터 수: {len(data)}")
# check_buy_point에서 이미 계산된 매수 포인트를 사용
alerts = []
for i in range(len(data)):
if data['buy_point'].iloc[i] == 1:
alerts.append((data.index[i], data['Close'].iloc[i]))
print(f"\n총 매수 신호 수: {len(alerts)}")
# 매수 신호 유형별 통계 출력
ma_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'movingaverage')])
dev40_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation40')])
dev240_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation240')])
dev1440_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation1440')])
print(f" - MA 신호: {ma_signals}")
print(f" - Dev40 신호: {dev40_signals}")
print(f" - Dev240 신호: {dev240_signals}")
print(f" - Dev1440 신호: {dev1440_signals}")
# 서브플롯 생성 (가격 + Deviation)
fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True, figsize=(15, 8), height_ratios=[3, 1])
fig.suptitle(f"{symbol} - 시뮬레이션 {interval_minutes}분봉", fontsize=14)
# ----------------- 마우스 휠 확대/축소 -----------------
def on_scroll(event):
# event.button: 'up' -> zoom in, 'down' -> zoom out
if event.inaxes not in [ax1, ax2]:
return
ax = event.inaxes
# x 축만 두 축을 동시에 조정
cur_xlim = ax1.get_xlim()
xdata = event.xdata
if xdata is None:
return
scale_factor = 0.9 if event.button == 'up' else 1/0.9
new_width = (cur_xlim[1] - cur_xlim[0]) * scale_factor
relx = (cur_xlim[1] - xdata) / (cur_xlim[1] - cur_xlim[0])
new_left = xdata - new_width * (1 - relx)
new_right = xdata + new_width * relx
# 데이터 영역 벗어나지 않도록 클램프
xmin, xmax = matplotlib.dates.date2num(data.index[0]), matplotlib.dates.date2num(data.index[-1])
if new_left < xmin:
new_left = xmin
if new_right > xmax:
new_right = xmax
ax1.set_xlim([new_left, new_right])
ax2.set_xlim([new_left, new_right])
# y축은 해당 축만 줌
cur_ylim = ax.get_ylim()
ydata = event.ydata
if ydata is not None:
new_height = (cur_ylim[1] - cur_ylim[0]) * scale_factor
rely = (cur_ylim[1] - ydata) / (cur_ylim[1] - cur_ylim[0])
ax.set_ylim([ydata - new_height * (1 - rely), ydata + new_height * rely])
ax.figure.canvas.draw_idle()
fig.canvas.mpl_connect('scroll_event', on_scroll)
# 캔들스틱 차트 추가 (matplotlib 기본 기능 사용)
import matplotlib.dates as mdates
# 캔들스틱 데이터 준비
ohlc_data = []
for i, (idx, row) in enumerate(data.iterrows()):
ohlc_data.append([
mdates.date2num(idx),
row['Open'],
row['High'],
row['Low'],
row['Close']
])
# 캔들스틱 그리기 (matplotlib 기본 기능으로 구현)
for ohlc in ohlc_data:
date, open_price, high, low, close = ohlc
# 캔들 색상 결정
color = 'red' if close >= open_price else 'blue'
# 캔들 몸통 그리기
body_height = abs(close - open_price)
body_bottom = min(open_price, close)
rect = plt.Rectangle((date - 0.3, body_bottom), 0.6, body_height,
facecolor=color, edgecolor='black', alpha=0.7)
ax1.add_patch(rect)
# 캔들 심지 그리기
ax1.plot([date, date], [low, high], color='black', linewidth=1)
# 메인 차트 (가격, 이동평균선, 볼린저 밴드)
line_close = ax1.plot(data.index, data["Close"], label="종가", color="black", linewidth=1.5, alpha=0.8)[0]
line_ma5 = ax1.plot(data.index, data["MA5"], label="MA5", color="red", linewidth=1)[0]
line_ma20 = ax1.plot(data.index, data["MA20"], label="MA20", color="blue", linewidth=1)[0]
line_ma40 = ax1.plot(data.index, data["MA40"], label="MA40", color="green", linewidth=1)[0]
line_ma120 = ax1.plot(data.index, data["MA120"], label="MA120", color="purple", linewidth=1)[0]
line_ma200 = ax1.plot(data.index, data["MA200"], label="MA200", color="brown", linewidth=1)[0]
line_ma240 = ax1.plot(data.index, data["MA240"], label="MA240", color="darkred", linewidth=1)[0]
line_ma720 = ax1.plot(data.index, data["MA720"], label="MA720", color="cyan", linewidth=1)[0]
line_ma1440 = ax1.plot(data.index, data["MA1440"], label="MA1440", color="magenta", linewidth=1)[0]
# Bollinger Bands
line_upper = ax1.plot(data.index, data["Upper"], label="볼린저 Upper", color="grey", linestyle="--", linewidth=1)[0]
line_lower = ax1.plot(data.index, data["Lower"], label="볼린저 Lower", color="grey", linestyle="--", linewidth=1)[0]
ax1.fill_between(data.index, data["Lower"], data["Upper"], color="grey", alpha=0.1)
# 매수 포인트를 신호 유형별로 다르게 표시 (수직선 포함)
# 이동평균선 기반 매수 포인트
ma_buy_points = data[(data['buy_point'] == 1) & (data['buy_signal'] == 'movingaverage')]
scatter_ma_buy_points = None
if len(ma_buy_points) > 0:
scatter_ma_buy_points = ax1.scatter(ma_buy_points.index, ma_buy_points['Close'], color='red', s=150, zorder=10, label='MA 매수 포인트', marker='o')
for time in ma_buy_points.index:
ax1.axvline(x=time, color='red', linestyle='-', alpha=0.5, linewidth=1)
# Deviation40 기반 매수 포인트
dev40_buy_points = data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation40')]
scatter_dev40_buy_points = None
if len(dev40_buy_points) > 0:
scatter_dev40_buy_points = ax1.scatter(dev40_buy_points.index, dev40_buy_points['Close'],
facecolors='none', edgecolors='red', linestyle='--',
linewidth=2, s=200, zorder=10, label='Dev40 매수 포인트')
for time in dev40_buy_points.index:
ax1.axvline(x=time, color='red', linestyle='--', alpha=0.5, linewidth=1)
# Deviation240 기반 매수 포인트
dev240_buy_points = data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation240')]
scatter_dev240_buy_points = None
if len(dev240_buy_points) > 0:
scatter_dev240_buy_points = ax1.scatter(dev240_buy_points.index, dev240_buy_points['Close'],
facecolors='none', edgecolors='blue', linestyle='--',
linewidth=2, s=200, zorder=10, label='Dev240 매수 포인트')
for time in dev240_buy_points.index:
ax1.axvline(x=time, color='blue', linestyle='--', alpha=0.5, linewidth=1)
# Deviation1440 기반 매수 포인트
dev1440_buy_points = data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation1440')]
scatter_dev1440_buy_points = None
if len(dev1440_buy_points) > 0:
scatter_dev1440_buy_points = ax1.scatter(dev1440_buy_points.index, dev1440_buy_points['Close'],
facecolors='none', edgecolors='purple', linestyle='--',
linewidth=2, s=200, zorder=10, label='Dev1440 매수 포인트')
for time in dev1440_buy_points.index:
ax1.axvline(x=time, color='purple', linestyle='--', alpha=0.5, linewidth=1)
# 마우스 오버 기능 추가 (이동평균선 매수 포인트)
if scatter_ma_buy_points is not None:
cursor = mplcursors.cursor(scatter_ma_buy_points, hover=True)
cursor.connect("add", lambda sel: sel.annotation.set_text(
f'MA 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}'
))
cursor.connect("remove", lambda sel: sel.annotation.set_visible(False))
# 마우스 오버 기능 추가 (Deviation40 매수 포인트)
if scatter_dev40_buy_points is not None:
cursor_dev40 = mplcursors.cursor(scatter_dev40_buy_points, hover=True)
cursor_dev40.connect("add", lambda sel: sel.annotation.set_text(
f'Dev40 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}'
))
cursor_dev40.connect("remove", lambda sel: sel.annotation.set_visible(False))
# 마우스 오버 기능 추가 (Deviation240 매수 포인트)
if scatter_dev240_buy_points is not None:
cursor_dev240 = mplcursors.cursor(scatter_dev240_buy_points, hover=True)
cursor_dev240.connect("add", lambda sel: sel.annotation.set_text(
f'Dev240 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}'
))
cursor_dev240.connect("remove", lambda sel: sel.annotation.set_visible(False))
# 마우스 오버 기능 추가 (Deviation1440 매수 포인트)
if scatter_dev1440_buy_points is not None:
cursor_dev1440 = mplcursors.cursor(scatter_dev1440_buy_points, hover=True)
cursor_dev1440.connect("add", lambda sel: sel.annotation.set_text(
f'Dev1440 매수신호\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}'
))
cursor_dev1440.connect("remove", lambda sel: sel.annotation.set_visible(False))
# 모든 봉에 마우스 오버 기능 추가
cursor3 = mplcursors.cursor(line_close, hover=True)
cursor3.connect("add", lambda sel: sel.annotation.set_text(
f'종가\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime("%Y-%m-%d %H:%M")}\n가격: {sel.target[1]:.2f}'
))
cursor3.connect("remove", lambda sel: sel.annotation.set_visible(False))
ax1.set_ylabel("가격 (KRW)")
ax1.set_title(f"{symbol} 차트 - 캔들스틱 & 이동평균선", fontsize=14)
ax1.grid(True, linestyle='--', alpha=0.3)
# 홈 버튼 추가 (초기화 기능)
home_button = ax1.text(0.02, 0.98, '', transform=ax1.transAxes,
bbox=dict(boxstyle="round,pad=0.3", facecolor='lightblue', alpha=0.8),
fontsize=10, ha='left', va='top', picker=True)
# --- 범례 생성 및 인터랙티브 토글 ---
legend = ax1.legend(loc='upper left', bbox_to_anchor=(0.02, 0.85), fontsize=10)
# 범례 클릭 시 해당 선 토글 기능
lined = {}
# legend 핸들과 실제 plot 선을 매핑 (생성 순서가 동일하다고 가정)
# Matplotlib 버전에 따라 legend 객체의 핸들 보유 프로퍼티가 다를 수 있음
if hasattr(legend, "legend_handles"):
legend_handles = legend.legend_handles
elif hasattr(legend, "legendHandles"):
legend_handles = legend.legendHandles
else:
# 마지막 방어(일반적으로 선만 리턴)
legend_handles = legend.get_lines()
plot_lines = [line_close, line_ma5, line_ma20, line_ma40, line_ma120,
line_ma200, line_ma240, line_ma720, line_ma1440,
line_upper, line_lower]
# None이 아닌 scatter plot만 추가
if scatter_ma_buy_points is not None:
plot_lines.append(scatter_ma_buy_points)
if scatter_dev40_buy_points is not None:
plot_lines.append(scatter_dev40_buy_points)
if scatter_dev240_buy_points is not None:
plot_lines.append(scatter_dev240_buy_points)
if scatter_dev1440_buy_points is not None:
plot_lines.append(scatter_dev1440_buy_points)
# zip 길이가 짧은 쪽에 맞춰 매핑
for leg_handle, orig in zip(legend_handles, plot_lines):
leg_handle.set_picker(True) # 클릭 이벤트 활성화
lined[leg_handle] = orig
def on_pick(event):
leg_handle = event.artist
# 홈 버튼 클릭 처리
if leg_handle == home_button:
# 그래프 초기화
ax1.set_xlim(matplotlib.dates.date2num(data.index[0]), matplotlib.dates.date2num(data.index[-1]))
ax1.set_ylim(data['Low'].min() * 0.99, data['High'].max() * 1.01)
ax2.set_xlim(ax1.get_xlim())
ax2.set_ylim(data[['Deviation5', 'Deviation20', 'Deviation40', 'Deviation120', 'Deviation200', 'Deviation240', 'Deviation720', 'Deviation1440']].min().min() * 0.95,
data[['Deviation5', 'Deviation20', 'Deviation40', 'Deviation120', 'Deviation200', 'Deviation240', 'Deviation720', 'Deviation1440']].max().max() * 1.05)
fig.canvas.draw_idle()
return
# 기존 범례 클릭 처리
orig = lined.get(leg_handle)
if orig is None:
return
vis = not orig.get_visible()
orig.set_visible(vis)
# 범례 아이콘 투명도 조정
leg_handle.set_alpha(1.0 if vis else 0.2)
fig.canvas.draw_idle()
fig.canvas.mpl_connect('pick_event', on_pick)
# Deviation subplot (이격도 보조지표)
line_dev5 = ax2.plot(data.index, data['Deviation5'], color='red', label='Dev5', linewidth=1)[0]
line_dev20 = ax2.plot(data.index, data['Deviation20'], color='blue', label='Dev20', linewidth=1)[0]
line_dev40 = ax2.plot(data.index, data['Deviation40'], color='green', label='Dev40', linewidth=2)[0]
line_dev120 = ax2.plot(data.index, data['Deviation120'], color='purple', label='Dev120', linewidth=1)[0]
line_dev200 = ax2.plot(data.index, data['Deviation200'], color='brown', label='Dev200', linewidth=1)[0]
line_dev240 = ax2.plot(data.index, data['Deviation240'], color='darkred', label='Dev240', linewidth=2)[0]
line_dev720 = ax2.plot(data.index, data['Deviation720'], color='cyan', label='Dev720', linewidth=1)[0]
line_dev1440 = ax2.plot(data.index, data['Deviation1440'], color='magenta', label='Dev1440', linewidth=1)[0]
cursor_dev = mplcursors.cursor([line_dev5, line_dev20, line_dev40, line_dev120, line_dev200, line_dev240, line_dev720, line_dev1440], hover=True)
cursor_dev.connect("add", lambda sel: sel.annotation.set_text(
f"{sel.artist.get_label()}\n날짜: {matplotlib.dates.num2date(sel.target[0]).replace(tzinfo=None).strftime('%Y-%m-%d %H:%M')}\n값: {sel.target[1]:.2f}"
))
# 이격도 기준선 추가
line_h90 = ax2.axhline(90, color='red', linestyle='--', linewidth=2, label='90', alpha=0.7)
line_h95 = ax2.axhline(95, color='green', linestyle='--', linewidth=2, label='95', alpha=0.7)
line_h100 = ax2.axhline(100, color='black', linestyle='-', linewidth=1, label='100', alpha=0.5)
ax2.set_ylabel('이격도 (%)')
ax2.set_title('이격도 보조지표', fontsize=12)
legend2 = ax2.legend(loc='upper left', fontsize=9)
# Deviation subplot 범례 클릭 토글 기능
if legend2 is not None:
if hasattr(legend2, "legend_handles"):
legend2_handles = legend2.legend_handles
elif hasattr(legend2, "legendHandles"):
legend2_handles = legend2.legendHandles
else:
legend2_handles = legend2.get_lines()
plot_lines2 = [line_dev5, line_dev20, line_dev40, line_dev120, line_dev200, line_dev240, line_dev720, line_dev1440, line_h90, line_h95]
# 레이블 기준으로 안정적 매핑
for leg_handle in legend2_handles:
label = leg_handle.get_label()
target_line = next((pl for pl in plot_lines2 if pl.get_label() == label), None)
if target_line is not None:
leg_handle.set_picker(True)
lined[leg_handle] = target_line
ax2.grid(True, linestyle='--', alpha=0.3)
plt.tight_layout()
print("그래프를 표시합니다...")
print(f"매수 포인트 수: MA={len(ma_buy_points)}, Dev40={len(dev40_buy_points)}, Dev240={len(dev240_buy_points)}")
# -------- 확대/축소 및 이동 기능 --------
press = {}
def on_scroll(event):
ax = event.inaxes
if ax is None:
return
x_left, x_right = ax.get_xlim()
x_range = (x_right - x_left)
if event.button == 'up': # zoom in
scale = 0.8
elif event.button == 'down': # zoom out
scale = 1.25
else:
scale = 1.0
new_range = x_range * scale
center = event.xdata if event.xdata is not None else (x_left + x_right) / 2
ax.set_xlim(center - new_range / 2, center + new_range / 2)
# 다른 축들도 동일 적용 (shared x)
for other_ax in fig.axes:
if other_ax is not ax:
other_ax.set_xlim(ax.get_xlim())
fig.canvas.draw_idle()
def on_press(event):
if event.button == 1 and event.inaxes is not None:
press['xpress'] = event.xdata
press['axes'] = event.inaxes
def on_motion(event):
if 'xpress' not in press or press.get('axes') is None or event.inaxes is None:
return
dx = press['xpress'] - event.xdata
for ax in fig.axes:
x_left, x_right = ax.get_xlim()
ax.set_xlim(x_left + dx, x_right + dx)
fig.canvas.draw_idle()
def on_release(event):
press.clear()
fig.canvas.mpl_connect('scroll_event', on_scroll)
fig.canvas.mpl_connect('button_press_event', on_press)
fig.canvas.mpl_connect('motion_notify_event', on_motion)
fig.canvas.mpl_connect('button_release_event', on_release)
plt.show()
if __name__ == "__main__":
interval = 60
days = 90 # 분석 기간을 90일로 늘림 (6월~8월 데이터 포함)
#target_coins = ['ADA','APE','ARB','BONK','HBAR','LINK','ONDO','PEPE','POL','SEI','SHIB','STORJ','SUI','TON','TRX','WLD','XLM','XRP']
target_coins = ['POL']
# 그래프 표시 여부 설정 (성능 향상을 위해 기본값은 False)
show_graphs = True # True로 설정하면 각 코인마다 그래프 표시
for symbol in target_coins:
print(f"\n=== {symbol} 저점 기간 분석 시작 ===")
try:
# 저점 기간 분석
bottom_data, alerts = analyze_bottom_period(symbol, interval, days)
# 전체 기간 시뮬레이션 (그래프 표시 옵션 추가)
print(f"\n=== {symbol} 전체 기간 시뮬레이션 ===")
if show_graphs:
run_simulation(symbol, interval, days)
else:
# 그래프 없이 빠른 분석만 수행
data = fetch_price_history(symbol, interval, days)
data = calculate_technical_indicators(data)
data = check_buy_point(symbol, data, simulation=True)
# 매수 신호 통계만 출력
total_buy_signals = len(data[data['buy_point'] == 1])
ma_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'movingaverage')])
dev40_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation40')])
dev240_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation240')])
dev1440_signals = len(data[(data['buy_point'] == 1) & (data['buy_signal'] == 'deviation1440')])
print(f"총 매수 신호: {total_buy_signals}")
print(f" - MA 신호: {ma_signals}")
print(f" - Dev40 신호: {dev40_signals}")
print(f" - Dev240 신호: {dev240_signals}")
print(f" - Dev1440 신호: {dev1440_signals}")
except Exception as e:
print(f"Error analyzing {symbol}: {str(e)}")