"""Telegram notification helper that posts trading alerts to a fixed chat."""

from datetime import datetime
import asyncio
import os
import platform
from multiprocessing import Pool

import telegram

# NOTE(security): the token and chat id below were hard-coded in the original
# source and are kept only as fallbacks so existing deployments keep working.
# A token committed to source control should be treated as leaked: rotate it
# and supply the new value through the environment variables instead.
_DEFAULT_BOT_TOKEN = "6435061393:AAHOh9wB5yGNGUdb3SfCYJrrWTBe7wgConM"
_DEFAULT_CHAT_ID = "574661323"

_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN") or _DEFAULT_BOT_TOKEN
_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID") or _DEFAULT_CHAT_ID

# Each notification maps exactly one message, so a single worker is enough.
_SEND_WORKERS = 1


class TelegramBot:
    """Send short status/trade messages through the "coinbot" Telegram bot.

    Bots referenced by the original notes (tokens intentionally not repeated
    here; keep them out of source control):

    * coinbot  -> username ``ncue_coin_bot`` (the one used by this class)
    * lottobot -> username ``ncue_lotto_bot``
    * stockbot -> username ``ncue_stock_bot``

    When ``enable`` is falsy every public notification method is a no-op.
    """

    # Class-level placeholders kept for backward compatibility with code that
    # may read them; ``enable`` and ``client`` are overwritten per instance.
    enable = None
    BOT_TOKEN = None
    CHANNEL_ID = None
    client = None

    def __init__(self, enable=True):
        """Create the bot client.

        Args:
            enable: When falsy, ``alarm_live``, ``post`` and ``sendMsg`` do
                nothing (no message is sent and nothing is printed).
        """
        self.botname = "coinbot"
        self.username = "ncue_coin_bot"
        self.token = _BOT_TOKEN
        self.chat_id = _CHAT_ID
        self.client = telegram.Bot(token=self.token)
        self.enable = enable

    # https://velog.io/@gyunghoe/%ED%85%94%EB%A0%88%EA%B7%B8%EB%9E%A8-%EB%B4%87-%EC%84%B1%EB%8A%A5-%EC%B5%9C%EC%A0%81%ED%99%94%ED%95%98%EA%B8%B0
    @staticmethod
    def send(text):
        """Send ``text`` to the configured chat, blocking until it completes.

        This is a static method so it can be handed to a worker process
        without carrying instance state; it builds its own ``telegram.Bot``.

        Args:
            text: Message body to deliver.
        """
        client = telegram.Bot(token=_BOT_TOKEN)
        if platform.system().lower() == 'windows':
            # The selector policy class only exists on Windows, hence the guard.
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        asyncio.run(client.send_message(chat_id=_CHAT_ID, text=text))

    def _dispatch(self, text):
        """Deliver ``text`` from a worker process, then echo it to stdout.

        The send runs in a separate process (presumably so ``asyncio.run``
        never collides with an event loop already running in the caller --
        TODO confirm). The pool is used as a context manager so the worker is
        always cleaned up instead of leaking one pool per message.
        """
        with Pool(processes=_SEND_WORKERS) as pool:
            pool.map(self.send, [text])
        print(text)

    def alarm_live(self, stock_code, stock_name):
        """Send an ``[ALIVE]`` heartbeat for the given instrument.

        Args:
            stock_code: Instrument code shown in the message.
            stock_name: Instrument name shown in parentheses.
        """
        if not self.enable:
            return
        stamp = datetime.now().strftime('%H:%M')
        self._dispatch("[ALIVE] {} {} ({})".format(stamp, stock_code, stock_name))

    def post(self, stock_code, stock_name, type, price, amount, rsi, balance=0):
        """Send a trade notification.

        Args:
            stock_code: Instrument code.
            stock_name: Instrument name.
            type: Order label placed first in the message (e.g. ``"BUY"``).
                The name shadows the builtin but is kept for keyword callers.
            price: Order price, rendered as-is.
            amount: Order quantity, rendered as-is.
            rsi: RSI value, rendered with two decimals.
            balance: Optional balance; included (two decimals) only when > 0.
        """
        if not self.enable:
            return
        stamp = datetime.now().strftime('%H:%M')
        text = (
            f"{type}, {stamp}, code: {stock_code}, name: {stock_name}, "
            f"price: {price}, amount: {amount}"
        )
        # The original spec "{:2f}" meant "min width 2, six decimals"; two
        # decimal places ("{:.2f}") is what the message is meant to show.
        if 0 < balance:
            text += f", (balance: {balance:.2f})"
        text += f", (rsi: {rsi:.2f})"
        self._dispatch(text)

    def sendMsg(self, msg):
        """Send an arbitrary message prefixed with the current ``HH:MM`` time.

        Args:
            msg: Text to deliver.
        """
        if not self.enable:
            return
        stamp = datetime.now().strftime('%H:%M')
        self._dispatch("{}: {}".format(stamp, msg))


if __name__ == "__main__":
    # Manual smoke test: sends one heartbeat and one trade message.
    stock_code = "252670"
    stock_name = "x2"
    order_type = "BUY"
    price = 2000
    amount = 1
    rsi = 50.0

    bot = TelegramBot()
    bot.alarm_live(stock_code, stock_name)
    # ``amount`` and ``rsi`` are required by ``post``; the original call
    # omitted them and raised TypeError.
    bot.post(stock_code, stock_name, order_type, price, amount, rsi)