diff --git a/111 b/111 new file mode 100644 index 0000000000000000000000000000000000000000..54ca031d0691e46102dcffc420462804c1c041da --- /dev/null +++ b/111 @@ -0,0 +1,113 @@ +import requests + +def send_serverchan_alert(message, title="ETH均线交叉提醒"): + sckey = 'SCT272757TXYu81pAVEhHFWtoj7KkqRq0r' # 替换为你的SendKey + url = f'https://sctapi.ftqq.com/{sckey}.send' + params = { + 'text': title, + 'desp': message + } + response = requests.get(url, params=params) + return response.json() +import requests +import time +import pandas as pd +from binance.client import Client +from datetime import datetime + +# 配置参数 +TELEGRAM_BOT_TOKEN = 'your_telegram_bot_token' +TELEGRAM_CHAT_ID = 'your_chat_id' +BINANCE_API_KEY = 'your_binance_api_key' +BINANCE_SECRET_KEY = 'your_binance_secret_key' + +# 初始化Binance客户端 +client = Client(BINANCE_API_KEY, BINANCE_SECRET_KEY) + +# 时间参数 +INTERVAL = Client.KLINE_INTERVAL_15MINUTE +LONG_PERIOD = 2880 # 120日(15分钟*2880=43200分钟=720小时=30天*24) +SHORT_PERIOD = 1 # 15分钟线 + +def get_historical_data(): + """获取历史数据""" + klines = client.get_klines( + symbol='ETHUSDT', + interval=INTERVAL, + limit=LONG_PERIOD + 100 # 多取100根确保足够计算 + ) + df = pd.DataFrame(klines, columns=[ + 'timestamp', 'open', 'high', 'low', 'close', 'volume', + 'close_time', 'quote_asset_volume', 'number_of_trades', + 'taker_buy_base', 'taker_buy_quote', 'ignore' + ]) + df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') + df['close'] = df['close'].astype(float) + return df + +def calculate_ma(df): + """计算均线""" + df['120_ma'] = df['close'].rolling(window=LONG_PERIOD).mean() + df['15m_close'] = df['close'] + return df.dropna() + +def check_crossover(df): + """检查均线交叉""" + latest = df.iloc[-1] + previous = df.iloc[-2] + + # 上穿检查 + if (latest['15m_close'] > latest['120_ma'] and + previous['15m_close'] <= previous['120_ma']): + return 'UP' + + # 下穿检查 + if (latest['15m_close'] < latest['120_ma'] and + previous['15m_close'] >= previous['120_ma']): + return 'DOWN' + + return None + +def send_telegram_alert(message): + """发送Telegram提醒""" + url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" + params = { + 'chat_id': TELEGRAM_CHAT_ID, + 'text': message + } + requests.post(url, params=params) + +def main(): + last_alert = None + while True: + try: + df = get_historical_data() + df = calculate_ma(df) + + if len(df) < 2: + continue + + signal = check_crossover(df) + + if signal: + price = df.iloc[-1]['close'] + ma_value = df.iloc[-1]['120_ma'] + timestamp = df.iloc[-1]['timestamp'].strftime('%Y-%m-%d %H:%M:%S') + + message = f"🚨 ETH 15分钟线 {signal}穿120日均线\n"\ + f"时间: {timestamp}\n"\ + f"当前价格: {price:.2f}\n"\ + f"120日均线值: {ma_value:.2f}" + + if message != last_alert: + send_telegram_alert(message) + last_alert = message + + time.sleep(900) # 15分钟检查一次 + + except Exception as e: + print(f"Error: {e}") + time.sleep(60) + +if __name__ == "__main__": + main()