撸毛 Bera 圣诞节

熊链(Berachain)圣诞活动 Box 脚本:在 Dolomite 每存入一次 BERA 可获得 2 个 Box,集齐 NFT 后即可参与圣诞开奖。

合约自动发送脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
from web3 import Web3, HTTPProvider
from itertools import cycle
import time
from web3.middleware import geth_poa_middleware
import argparse
import csv
from proxy import get_proxy_config


# RPC endpoints that the script rotates through (see node_cycle below).
node_urls = [
"https://bera-testnet.nodeinfra.com",
"https://bartio.drpc.org",
"https://bartio.rpc.b-harvest.io",
"https://bartio.rpc.berachain.com",
"https://berat2.lava.build"
]

# Shared Web3 object; send_transaction assigns its provider on every call.
w3 = Web3()
# NOTE(review): presumably required because the target chain uses PoA-style
# block headers -- confirm against the web3.py middleware documentation.
w3.middleware_onion.inject(geth_poa_middleware, layer=0)
# Endless iterator that walks the node list round-robin.
node_cycle = cycle(node_urls)


def get_next_node():
    """Return the next RPC URL from the round-robin ``node_cycle`` iterator."""
    upcoming_url = next(node_cycle)
    return upcoming_url


def convert_to_checksum_address(address):
    """Return ``address`` converted by ``w3.to_checksum_address``."""
    return w3.to_checksum_address(address)


def send_transaction(sender_address, private_key, token, result_file):
    """Send one fixed transaction from ``sender_address`` and log its hash.

    A node is taken from the round-robin list and installed as the provider
    of the shared ``w3`` object. A legacy (gasPrice) transaction carrying a
    fixed value and fixed 4-byte calldata is then signed and broadcast to the
    hard-coded target address. On success a row
    ``[sender_address, tx_hash, token]`` is appended to ``result_file``.

    Args:
        sender_address: Hex address of the sending account. It is normalised
            to checksum form before being used in RPC calls; the value written
            to the CSV is the string exactly as passed in.
        private_key: Private key used to sign the transaction.
        token: Opaque value copied into the CSV row; not otherwise used here.
        result_file: Path of the CSV file that receives the result row.

    Returns:
        None. Every exception is caught and printed so that the caller's loop
        can move on to the next account / node.
    """
    # Bind the name up front so the error handler can always report it,
    # even when picking the node is the step that fails.
    current_node = None
    try:
        current_node = get_next_node()
        print("use node : ", current_node)
        # Point the shared Web3 object at the selected node.
        w3.provider = HTTPProvider(current_node, request_kwargs={"proxies": get_proxy_config()})

        target_address = convert_to_checksum_address('0x36864db0396b1ac36c5d6609ded5cc7f8073d08c')
        print("count:", w3.eth.get_transaction_count(target_address))

        if not w3.is_connected():
            print("Connection failed")
            return

        print("Connected to Ethereum node")
        # web3.py only accepts checksummed hex addresses; addresses read from
        # a CSV are often lower-case, so normalise before querying the nonce.
        checksummed_sender = convert_to_checksum_address(sender_address)
        transaction = {
            'to': target_address,
            'value': w3.to_wei(0.00001, 'ether'),
            'gas': 500000,
            'gasPrice': w3.to_wei('13', 'gwei'),
            'nonce': w3.eth.get_transaction_count(checksummed_sender),
            'chainId': 80084,
            'data': '0x33282ded',  # calldata, hex encoded
        }

        signed_transaction = w3.eth.account.sign_transaction(transaction, private_key)
        transaction_hash = w3.eth.send_raw_transaction(signed_transaction.rawTransaction)
        print(f"Transaction sent. Transaction Hash: {transaction_hash.hex()}")

        # Persist the hash so the submission script can pick it up later.
        with open(result_file, mode='a', newline='') as file:
            writer = csv.writer(file)
            writer.writerow([sender_address, transaction_hash.hex(), token])
        time.sleep(2)
    except Exception as e:
        # Deliberately best-effort: report the failure (e.g. node unavailable)
        # and let the caller retry with the next node on the next call.
        print(f"Error using node {current_node}: {e}")

import pandas as pd


if __name__ == "__main__":
    # Load the account sheet and pair up the three columns the sender needs.
    accounts = pd.read_csv("0_accounts.csv")
    senders = list(zip(accounts['address'], accounts['private_key'], accounts['token']))

    # Keep cycling over every account until the process is interrupted.
    while True:
        for address, private_key, token in senders:
            print("use the sender : ", address)
            send_transaction(address, private_key, token, '1_transaction_hashes.csv')

TxHash 提交脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import requests
import csv
import time
import base64
import os
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from proxy import get_proxy_config


def encrypt_with_aes256_cbc(plaintext):
    """Encrypt ``plaintext`` with AES-CBC under the fixed 32-byte key.

    A fresh random 16-byte IV is generated on every call. The UTF-8 encoded
    plaintext is padded to the AES block size and encrypted. The IV and the
    ciphertext are each Base64-encoded separately and the two strings are
    concatenated, IV first.

    Returns:
        str: ``base64(iv) + base64(ciphertext)``.
    """
    # Fixed key string, used as raw UTF-8 bytes.
    secret_key = "yQmRxqD#c^DefKhxeuK,2V-M?}3om~eu".encode("utf-8")
    random_iv = os.urandom(16)

    padded_payload = pad(plaintext.encode("utf-8"), AES.block_size)
    ciphertext = AES.new(secret_key, AES.MODE_CBC, random_iv).encrypt(padded_payload)

    # Encode both parts independently, then join them with the IV in front.
    return "".join(
        base64.b64encode(chunk).decode("utf-8") for chunk in (random_iv, ciphertext)
    )



def send_request(account, txHash, token, max_retries=3, retry_delay=3, timeout=30):
    """Report a transaction hash to the action API, retrying on failure.

    Args:
        account: Address sent as ``account_id`` in the request body.
        txHash: Transaction hash sent as ``tx_id`` and embedded in the
            encrypted ``ss`` field.
        token: Bearer token placed in the ``authorization`` header.
        max_retries: Maximum number of attempts.
        retry_delay: Seconds to wait between attempts.
        timeout: Seconds to wait for the HTTP request before giving up on an
            attempt, so an unresponsive server or proxy cannot block forever.

    Returns:
        tuple: ``(200, parsed_json)`` on the first HTTP 200 response, or
        ``(None, None)`` when every attempt failed.
    """
    headers = {
        'accept': '*/*',
        'accept-language': 'zh,zh-CN;q=0.9,en;q=0.8,und;q=0.7',
        'authorization': f'Bearer {token}',
        'content-type': 'application/json',
        'origin': 'https://beratown.dapdap.net',
        'priority': 'u=1, i',
        'referer': 'https://beratown.dapdap.net/',
        'sec-ch-ua': '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"macOS"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'cross-site',
        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
    }
    for attempt in range(max_retries):
        try:
            json_data = {
                'action_type': 'Lending',
                'account_id': account,
                'template': 'Dolomite',
                'sub_type': 'Supply',
                'action_switch': 0,
                'action_status': 'Success',
                'tx_id': txHash,
                'action_network_id': 'Berachain bArtio',
                'chain_id': 80084,
                'action_title': 'Deposit 0.000 BERA on Dolomite',
                'action_tokens': '["BERA"]',
                'action_amount': '0.0001',
                'ss': '',
                'source': 'lending',
                'wallet': 'OKX Wallet',
            }

            # The signed string embeds the current time, so it is rebuilt
            # (and re-encrypted) on every attempt.
            t = f"template=Dolomite&action_type=Lending&tx_hash={txHash}&chain_id=80084&time={int(time.time())}"
            print("hashTx:" + txHash)
            json_data['ss'] = encrypt_with_aes256_cbc(t)

            response = requests.post(
                'https://testnet-api.beratown.app/api/action/add',
                headers=headers,
                json=json_data,
                proxies=get_proxy_config(),
                timeout=timeout,
            )
            print(f"Response for address:{account} txHash {txHash}: {response.status_code}")
            if response.status_code == 200:
                # Parse the body only on success: error responses are not
                # guaranteed to be JSON and are reported via response.text.
                payload = response.json()
                print(payload)
                return response.status_code, payload
            print(f"Request failed with status code {response.status_code}. {response.text} Retrying...")
        except Exception as e:
            print(f"An error occurred: {e}. Retrying...")

        # No point in waiting once the final attempt has already failed.
        if attempt < max_retries - 1:
            time.sleep(retry_delay)

    print(f"Failed to send request after {max_retries} attempts.")
    return None, None


if __name__ == "__main__":
    csv_file_path = '1_transaction_hashes.csv'  # input: rows of address, tx hash, token
    output_csv_file_path = '2_successful_hashes.csv'  # output: input fields plus the API result

    with open(csv_file_path, mode='r') as infile, \
            open(output_csv_file_path, mode='a', newline='') as outfile:
        writer = csv.writer(outfile)
        for row in csv.reader(infile):
            # Skip blank lines in the input file.
            if not row:
                continue
            address, txHash, token = row[0].strip(), row[1].strip(), row[2].strip()
            status_code, response = send_request(address, txHash, token)
            writer.writerow([address, txHash, token, status_code, response])

自动开箱

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import requests
import csv

import logging
# Configure logging for the whole script.
logging.basicConfig(
handlers=[
logging.FileHandler('claim.log'), # write log records to a file
logging.StreamHandler() # also emit them on the console stream
],
level=logging.INFO, # minimum level that is recorded
format='%(asctime)s - %(levelname)s - %(message)s', # record layout
datefmt='%Y-%m-%d %H:%M:%S' # timestamp layout
)

# HTTP headers sent with every request made by send_get_request.
# NOTE(review): the bearer token below is hard-coded in plain text and is the
# same for every account processed -- consider loading it from configuration
# or per-account data instead of committing it to source.
headers = {
'accept': '*/*',
'accept-language': 'zh,zh-CN;q=0.9,en;q=0.8,und;q=0.7',
'authorization': 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjozMTcwNTc1LCJleHAiOjE3MzczOTA0NjUsInN1YiI6ImFjY2VzcyJ9.pLUIJ0ClYe8vycR_uJm_OJ8AivvdJ-f-6S4b6VET9so',
'content-type': 'application/json',
'origin': 'https://beratown.dapdap.net',
'priority': 'u=1, i',
'referer': 'https://beratown.dapdap.net/',
'sec-ch-ua': '"Google Chrome";v="131", "Chromium";v="131", "Not_A Brand";v="24"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"macOS"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'cross-site',
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
}



def send_get_request(account, token=None, timeout=30):
    """Trigger one reward draw, then fetch the user record for ``account``.

    Args:
        account: Address interpolated into the user-info URL.
        token: Optional bearer token. When given it replaces the
            ``authorization`` value of the module-level ``headers`` for these
            two requests, so the draw is performed for that account rather
            than for the single hard-coded one. When omitted the module-level
            headers are used unchanged.
        timeout: Seconds to wait for each HTTP request.

    Returns:
        tuple: ``(draw_status, draw_text, user_status, user_text)``.

    Raises:
        requests.RequestException: If either HTTP request fails or times out.
    """
    # Copy so the shared module-level headers are never mutated.
    request_headers = dict(headers)
    if token:
        request_headers['authorization'] = f'Bearer {token}'

    json_data = {
        'all': False,
    }
    response1 = requests.post(
        'https://testnet-api.beratown.app/api/mas/reward/draw',
        headers=request_headers,
        json=json_data,
        timeout=timeout,
    )
    url = f'https://testnet-api.beratown.app/api/mas/user/{account}'
    logging.info(f'Sending GET request to {url}')
    response2 = requests.get(url, headers=request_headers, timeout=timeout)
    logging.info(f'Received response with status code {response1.status_code} {response2.status_code}')
    return response1.status_code, response1.text, response2.status_code, response2.text


if __name__ == "__main__":
    input_csv_file_path = '2_successful_hashes_1.csv'  # input CSV path
    output_csv_file_path = '3_claim_results.csv'  # output CSV path

    logging.info(f'Reading successful hashes from {input_csv_file_path}')
    logging.info(f'Writing results to {output_csv_file_path}')

    with open(input_csv_file_path, mode='r') as infile, open(output_csv_file_path, mode='w', newline='') as outfile:
        reader = csv.reader(infile)
        writer = csv.writer(outfile)

        # Header row of the results file.
        writer.writerow(
            ['txHash', 'request_number', 'draw_status_code', 'draw_response', 'claim_status_code', 'claim_response'])

        for row in reader:
            if not row:  # skip blank lines
                continue
            address = row[0].strip()
            txHash = row[1].strip()
            # NOTE(review): assumes the third column, when present, is the
            # account's bearer token, matching the rows written by the TxHash
            # submission script -- confirm for this input file. Rows without
            # it fall back to the hard-coded token in ``headers``.
            token = row[2].strip() if len(row) > 2 else None
            logging.info(f'Processing txHash: {txHash}')
            for i in range(2):  # two draw attempts per recorded transaction
                try:
                    status_code1, response1, status_code2, response2 = send_get_request(address, token)
                except requests.RequestException:
                    # One failed call must not abort the whole batch.
                    logging.exception(f'Request {i + 1} for {address} txHash {txHash} failed')
                    continue
                writer.writerow([txHash, i + 1, status_code1, response1, status_code2, response2])
                logging.info(
                    f'Request {i + 1} for {address} txHash {txHash} completed with status code {status_code1} {status_code2}')


Author

Gavin

Posted on

2024-12-22

Updated on

2024-12-22

Licensed under

Comments