1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
| from web3 import Web3 import time import threading import sys import logging from logging.handlers import TimedRotatingFileHandler import os from stop_event_trigger import StopEventTrigger # 日志配置 log_dir = "logs" if not os.path.exists(log_dir): os.makedirs(log_dir)
app_name="block_monitor-7.143" can_call=False stop_event_trigger = StopEventTrigger() log_file = os.path.join(log_dir, "block_monitor.log")
# 创建一个TimedRotatingFileHandler,用于按照日期切割日志文件 file_handler = TimedRotatingFileHandler(log_file, when="midnight", interval=1, backupCount=7, encoding='utf-8') file_handler.setFormatter(logging.Formatter(app_name+' %(asctime)s - %(levelname)s - %(message)s'))
# 创建一个StreamHandler,用于将日志输出到控制台 console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
# 配置root logger logging.basicConfig(level=logging.INFO, handlers=[file_handler, console_handler]) from chatbot import DingtalkChatbot
# 配置 DingTalk 消息通知 webhook = "https://oapi.dingtalk.com/robot/send?access_token={机器人TOKEN}" secret = "SEC开头密钥" dd = DingtalkChatbot(webhook, secret=secret, fail_notice=True)
private_rpc_url = "http://私有节点ip:8545/" # 公共节点 public_rpc_urls = [ "https://bsc-dataseed.binance.org", "https://bsc-dataseed1.defibit.io", "https://bsc-dataseed1.binance.org", "https://bsc-dataseed2.defibit.io", "https://bsc-dataseed3.ninicoin.io" ]
def get_ethereum_block_height(rpc_url): start_time = time.time() # 记录开始时间 web3 = Web3(Web3.HTTPProvider(rpc_url)) block_height = web3.eth.block_number end_time = time.time() # 记录结束时间 elapsed_time = end_time - start_time logging.info(f"节点 {rpc_url} 当前区块高度: {block_height},查询耗时: {elapsed_time:.3f}秒") return block_height
def send_dingtalk_message(message): dd.send_text(msg=message)
def monitor_block_height(private_rpc_url, public_rpc_urls): inspection_timer = 0 # 初始化巡检计时器,单位为秒
while True: try: my_block_height = get_ethereum_block_height(private_rpc_url)
# 获取所有节点的区块高度 public_block_heights = [get_ethereum_block_height(url) for url in public_rpc_urls] logging.info(f"私有节点高度:{my_block_height} 所有节点的区块高度: {public_block_heights}")
# 找到最大的区块高度 max_block_height = max(public_block_heights) logging.info(f"最大区块高度: {max_block_height}")
# 比对节点是否领先 # 如果私有节点的区块高度小于最大区块高度,并且区块差异大于3 则发送警告 diff_height = max_block_height - my_block_height if my_block_height < max_block_height and diff_height > 3: # if my_block_height < max_block_height: currentTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) message = f"{app_name}私有节点 {my_block_height} 低于于其他节点,最大区块高度为 {max_block_height},差异:{diff_height} 时间 {currentTime}" logging.warning(message) # 使用警告级别的日志 send_dingtalk_message(message) if can_call: stop_event_trigger.pin_point_and_stop_engine_event()
# 每隔10分钟发送一次巡检记录 if inspection_timer >= 1800: currentTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) inspection_message = f"{app_name} 巡检记录:私有节点 {my_block_height}," inspection_message += f"公共节点最大区块高度: {max_block_height}," inspection_message += f"时间 {currentTime}" logging.info(inspection_message) send_dingtalk_message(inspection_message) inspection_timer = 0 # 重置计时器
except Exception as e: logging.error(f"发生错误:{e}")
time.sleep(5) inspection_timer += 5 # 每次循环增加计时器的时间,单位为秒
if __name__ == "__main__": # 读取命令行传入参数 if len(sys.argv) > 1: app_name = sys.argv[1] if app_name is None: app_name = "block_monitor" if len(sys.argv) > 2: canCall = sys.argv[2] if canCall is None: canCall = False
if can_call: stop_event_trigger.pin_point_and_stop_engine_event(app_name)
logging.info(f"app_name: {app_name}")
t = threading.Thread(target=monitor_block_height, args=(private_rpc_url, public_rpc_urls)) t.start() t.join()
|