BSC节点区块监控脚本

脚本主要监听私有BSC节点区块状态,如发生区块漏块过多,发送告警消息到DD群中,carry-coin调整rpc访问策略;

监控脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from web3 import Web3
import time
import threading
import sys
import logging
from logging.handlers import TimedRotatingFileHandler
import os
from stop_event_trigger import StopEventTrigger
# 日志配置
log_dir = "logs"
if not os.path.exists(log_dir):
os.makedirs(log_dir)

app_name="block_monitor-7.143"
can_call=False
stop_event_trigger = StopEventTrigger()
log_file = os.path.join(log_dir, "block_monitor.log")

# 创建一个TimedRotatingFileHandler,用于按照日期切割日志文件
file_handler = TimedRotatingFileHandler(log_file, when="midnight", interval=1, backupCount=7, encoding='utf-8')
file_handler.setFormatter(logging.Formatter(app_name+' %(asctime)s - %(levelname)s - %(message)s'))

# 创建一个StreamHandler,用于将日志输出到控制台
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))

# 配置root logger
logging.basicConfig(level=logging.INFO, handlers=[file_handler, console_handler])
from chatbot import DingtalkChatbot

# 配置 DingTalk 消息通知
webhook = "https://oapi.dingtalk.com/robot/send?access_token={机器人TOKEN}"
secret = "SEC开头密钥"
dd = DingtalkChatbot(webhook, secret=secret, fail_notice=True)

private_rpc_url = "http://私有节点ip:8545/"
# 公共节点
public_rpc_urls = [
"https://bsc-dataseed.binance.org",
"https://bsc-dataseed1.defibit.io",
"https://bsc-dataseed1.binance.org",
"https://bsc-dataseed2.defibit.io",
"https://bsc-dataseed3.ninicoin.io"
]


def get_ethereum_block_height(rpc_url):
start_time = time.time() # 记录开始时间
web3 = Web3(Web3.HTTPProvider(rpc_url))
block_height = web3.eth.block_number
end_time = time.time() # 记录结束时间
elapsed_time = end_time - start_time
logging.info(f"节点 {rpc_url} 当前区块高度: {block_height},查询耗时: {elapsed_time:.3f}秒")
return block_height


def send_dingtalk_message(message):
dd.send_text(msg=message)


def monitor_block_height(private_rpc_url, public_rpc_urls):
inspection_timer = 0 # 初始化巡检计时器,单位为秒

while True:
try:
my_block_height = get_ethereum_block_height(private_rpc_url)

# 获取所有节点的区块高度
public_block_heights = [get_ethereum_block_height(url) for url in public_rpc_urls]
logging.info(f"私有节点高度:{my_block_height} 所有节点的区块高度: {public_block_heights}")

# 找到最大的区块高度
max_block_height = max(public_block_heights)
logging.info(f"最大区块高度: {max_block_height}")

# 比对节点是否领先
# 如果私有节点的区块高度小于最大区块高度,并且区块差异大于3 则发送警告
diff_height = max_block_height - my_block_height
if my_block_height < max_block_height and diff_height > 3:
# if my_block_height < max_block_height:
currentTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
message = f"{app_name}私有节点 {my_block_height} 低于于其他节点,最大区块高度为 {max_block_height},差异:{diff_height} 时间 {currentTime}"
logging.warning(message) # 使用警告级别的日志
send_dingtalk_message(message)
if can_call:
stop_event_trigger.pin_point_and_stop_engine_event()

# 每隔10分钟发送一次巡检记录
if inspection_timer >= 1800:
currentTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
inspection_message = f"{app_name} 巡检记录:私有节点 {my_block_height},"
inspection_message += f"公共节点最大区块高度: {max_block_height},"
inspection_message += f"时间 {currentTime}"
logging.info(inspection_message)
send_dingtalk_message(inspection_message)
inspection_timer = 0 # 重置计时器

except Exception as e:
logging.error(f"发生错误:{e}")

time.sleep(5)
inspection_timer += 5 # 每次循环增加计时器的时间,单位为秒

if __name__ == "__main__":
# 读取命令行传入参数
if len(sys.argv) > 1:
app_name = sys.argv[1]
if app_name is None:
app_name = "block_monitor"
if len(sys.argv) > 2:
canCall = sys.argv[2]
if canCall is None:
canCall = False

if can_call:
stop_event_trigger.pin_point_and_stop_engine_event(app_name)

logging.info(f"app_name: {app_name}")

t = threading.Thread(target=monitor_block_height, args=(private_rpc_url, public_rpc_urls))
t.start()
t.join()

启动脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#!/bin/bash
# 增加一个启动参数,app_name,如传入则使用传入的app_name,并加入到启动命令中
app_name=$1

# 设置启动日志文件路径
start_log_file="logs/start_script.log"

# 启动 Python 脚本,并将输出保存到启动日志文件
nohup python3.10 monitor.py $app_name > $start_log_file 2>&1 &

# 获取启动的 Python 进程的PID并保存到文件中
echo $! > pid_file.txt

echo "脚本已在后台运行。PID为:$(cat pid_file.txt)"

停止脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#!/bin/bash

# 获取之前保存的PID文件
pid_file="pid_file.txt"
pid=$(cat $pid_file 2>/dev/null)

if [ -z "$pid" ]; then
echo "未找到PID。脚本可能未在运行。"
else
# 终止Python进程
kill -TERM $pid
echo "PID为 $pid 的脚本已停止。"

# 删除PID文件
rm $pid_file
fi
Author

Gavin

Posted on

2024-11-02

Updated on

2024-11-02

Licensed under

Comments