Python内核阅读(十一): 虚拟机的控制流

Python 2017-08-11

起步

在所有编程语言中, 除了顺序执行外, 控制跳转都是基本的功能, 这节将看看python提供的流程控制, 包括异常机制.

if 控制流

如以下的源码:

a = 1
if a > 10:
    print("a > 10")
elif a < -2:
    print("a < -2")
elif a == 1:
    print("a == 1")
else:
    print("unkonwn")

对应的code与字节码指令:

co_consts : (1, 10, 'a > 10', 2, 'a < -2', 'a == 1', 'unkonwn', None, -2)
co_names : ('a', 'print')
co_stacksize : 2

  1           0 LOAD_CONST               0 (1)
              2 STORE_NAME               0 (a)

  2           4 LOAD_NAME                0 (a)
              6 LOAD_CONST               1 (10)
              8 COMPARE_OP               4 (>)
             10 POP_JUMP_IF_FALSE       22

  3          12 LOAD_NAME                1 (print)
             14 LOAD_CONST               2 ('a > 10')
             16 CALL_FUNCTION            1
             18 POP_TOP
             20 JUMP_FORWARD            44 (to 66)

  4     >>   22 LOAD_NAME                0 (a)
             24 LOAD_CONST               8 (-2)
             26 COMPARE_OP               0 (<)
             28 POP_JUMP_IF_FALSE       40

  5          30 LOAD_NAME                1 (print)
             32 LOAD_CONST               4 ('a < -2')
             34 CALL_FUNCTION            1
             36 POP_TOP
             38 JUMP_FORWARD            26 (to 66)

  6     >>   40 LOAD_NAME                0 (a)
             42 LOAD_CONST               0 (1)
             44 COMPARE_OP               2 (==)
             46 POP_JUMP_IF_FALSE       58

  7          48 LOAD_NAME                1 (print)
             50 LOAD_CONST               5 ('a == 1')
             52 CALL_FUNCTION            1
             54 POP_TOP
             56 JUMP_FORWARD             8 (to 66)

  9     >>   58 LOAD_NAME                1 (print)
             60 LOAD_CONST               6 ('unkonwn')
             62 CALL_FUNCTION            1
             64 POP_TOP
        >>   66 LOAD_CONST               7 (None)
             68 RETURN_VALUE

比较操作

与其他编程语言语言, 在if语句中, 根据条件的不同, 会选择向左走还是向右走. 从上面第二行源码对应的字节码可知, COMPARE_OP指令进行比较, 在执行一条JUMP指令进行跳转, 比较指令的定义:

[object.h]
#define Py_LT 0
#define Py_LE 1
#define Py_EQ 2
#define Py_NE 3
#define Py_GT 4
#define Py_GE 5

[opcode.h]
enum cmp_op {PyCmp_LT=Py_LT, PyCmp_LE=Py_LE, PyCmp_EQ=Py_EQ, PyCmp_NE=Py_NE,
                PyCmp_GT=Py_GT, PyCmp_GE=Py_GE, PyCmp_IN, PyCmp_NOT_IN,
                PyCmp_IS, PyCmp_IS_NOT, PyCmp_EXC_MATCH, PyCmp_BAD};

大于 PyCmp_GT 操作对应了4, 因此 COMPARE_OP 指令的参数为PyCmp_GT:

TARGET(COMPARE_OP) {
    PyObject *right = POP();
    PyObject *left = TOP();
    PyObject *res = cmp_outcome(oparg, left, right);
    Py_DECREF(left);
    Py_DECREF(right);
    SET_TOP(res);
    if (res == NULL)
        goto error;
    PREDICT(POP_JUMP_IF_FALSE);
    PREDICT(POP_JUMP_IF_TRUE);
    DISPATCH();
}

比较操作都需要两个参数, 因此从栈中弹出两个元素, 调用 cmp_outcome 进行判断, 里面就是一个switch的判断了:

static PyObject * cmp_outcome(int op, PyObject *v, PyObject *w)
{
    int res = 0;
    switch (op) {
    case PyCmp_IS:
        res = (v == w);
        break;
    case PyCmp_IS_NOT:
        res = (v != w);
        break;
    case PyCmp_IN:
        ...
        break;
    case PyCmp_NOT_IN:
        ...
        break;
    case PyCmp_EXC_MATCH:
        ...
        break;
    default:
        return PyObject_RichCompare(v, w, op);
    }
    v = res ? Py_True : Py_False;
    Py_INCREF(v);
    return v;
}

这段代码中还能看到对in和not in的集合的判断操作 PyCmp_IN, PyCmp_NOT_IN , 这就对应了python代码上的if 1 in lst: . 这段打码也看到有进行is操作. 而显然, 例子中的大于操作走的是 PyObject_RichCompare , 在 PyObject_RichCompare 中, 首先会确保比较操作在 Py_LT 和 Py_GE 之间: assert(Py_LT <= op && op <= Py_GE); ; 如果进行比较的两个对象类型相同, 且这两个对象不是用户自定义的类实例, 那么会选择对象对应PyTypeObject定义的 tp_richcompare 操作; 如果类型对象没有定义这个动作, 就选择类型对象中定义的 tp_compare 操作. 在Python中, 无论是内建对象, 还是用户自定义的类实例对象, 其比较操作都是在各自对应的类型对象的 tp_richcomparetp_compare 中. 如果两个都没有, 就报出两个对象不支持比较操作的错误信息了.

bool 对象

比较操作的结果是一个bool值, bool值只有两个,真和假. python中用 Py_True Py_False 表示:

[boolobject.h]
/* Don't use these directly */
PyAPI_DATA(struct _longobject) _Py_FalseStruct, _Py_TrueStruct;

/* Use these macros */
#define Py_False ((PyObject *) &_Py_FalseStruct)
#define Py_True ((PyObject *) &_Py_TrueStruct)

c语言中没有bool类型, 采用的是1和0. python一样用int类型充当布尔型:

[boolobject.c]
struct _longobject _Py_FalseStruct = {
    PyVarObject_HEAD_INIT(&PyBool_Type, 0)
    { 0 }
};

struct _longobject _Py_TrueStruct = {
    PyVarObject_HEAD_INIT(&PyBool_Type, 1)
    { 1 }
};

从而可以得知, true和false两个python布尔值是常驻内存的.

指针跳跃

判断语句 if a > 10: 因为a赋值为1, 显然这里判断返回是false, 则需要进行跳转, 跳转指令使用了 PREDICT(POP_JUMP_IF_FALSE) 宏:

#define PREDICT(op) \
    do{ \
        _Py_CODEUNIT word = *next_instr; \
        opcode = _Py_OPCODE(word); \
        if (opcode == op){ \
            oparg = _Py_OPARG(word); \
            next_instr++; \
            goto PRED_##op; \
        } \
    } while(0)

#define PREDICTED(op)           PRED_##op:

这两个宏意味着, 如果有个地方使用了宏 PREDICTED(POP_JUMP_IF_FALSE); 那这个地方会替换为 PRED_POP_JUMP_IF_FALSE: , 用来作为 goto 的跳转点, 因此判断的结果就可以为:

...
if (*next_instr == POP_JUMP_IF_FALSE)
    goto PRED_POP_JUMP_IF_FALSE;
if (*next_instr == POP_JUMP_IF_TRUE)
    goto PRED_POP_JUMP_IF_TRUE

goto指令目录是为了绕过一些无谓的操作:

PREDICTED(POP_JUMP_IF_FALSE);
TARGET(POP_JUMP_IF_FALSE) {
    PyObject *cond = POP(); // 获取条件判断的结果
    int err;
    if (cond == Py_True) {  // true 顺序执行
        Py_DECREF(cond);
        FAST_DISPATCH();
    }
    if (cond == Py_False) { // false 进行跳转
        Py_DECREF(cond);
        JUMPTO(oparg);
        FAST_DISPATCH();
    }
    err = PyObject_IsTrue(cond); // 通用判断
    Py_DECREF(cond);
    if (err > 0)
        ;
    else if (err == 0)
        JUMPTO(oparg);
    else
        goto error;
    DISPATCH();
}

优先会判断bool类型的, 通用因为要兼容如if s:if xx_obj: 这种没有比较操作的布尔判断. 而最终进行跳转是宏 JUMPTO(oparg) 其中, 根据上下文可知参数 oparg = 22:

#define JUMPTO(x)       (next_instr = first_instr + (x) / sizeof(_Py_CODEUNIT))

_Py_CODEUNITuint16_t 的别名 typedef uint16_t _Py_CODEUNIT; 因此它的sizeof是2. 这边就是将指令跳到第11条指令了.

for 循环控制

在if控制中, 只存在两个分支结构, 也就是字节码指令仍然是向前跳跃的. 相对的, for循环则可以让指令回退. 例如如下的代码:

lst = [1, 2]
for i in lst:
    print(i)

对应的code和字节码指令:

co_consts : (1, 2, None)
co_names : ('lst', 'i', 'print')
co_stacksize : 3
1         0 LOAD_CONST               0 (1)
          2 LOAD_CONST               1 (2)
          4 BUILD_LIST               2
          6 STORE_NAME               0 (lst)

2         8 SETUP_LOOP              20 (to 30)
         10 LOAD_NAME                0 (lst)
         12 GET_ITER
    >>   14 FOR_ITER                12 (to 28)
         16 STORE_NAME               1 (i)

3        18 LOAD_NAME                2 (print)
         20 LOAD_NAME                1 (i)
         22 CALL_FUNCTION            1
         24 POP_TOP
         26 JUMP_ABSOLUTE           14
    >>   28 POP_BLOCK
    >>   30 LOAD_CONST               2 (None)
         32 RETURN_VALUE

循环结构初始化

for循环控制从 SETUP_LOOP 指令开始的:

TARGET(SETUP_LOOP)
TARGET(SETUP_EXCEPT)
TARGET(SETUP_FINALLY) {

    PyFrame_BlockSetup(f, opcode, INSTR_OFFSET() + oparg,
                       STACK_LEVEL());
    DISPATCH();
}

SETUP_LOOP 的实现中仅仅是简单调用一下 PyFrame_BlockSetup 函数:

[frameobject.c]
void PyFrame_BlockSetup(PyFrameObject *f, int type, int handler, int level)
{
    PyTryBlock *b;
    if (f->f_iblock >= CO_MAXBLOCKS)    // #define CO_MAXBLOCKS 20
        Py_FatalError("XXX block stack overflow");
    b = &f->f_blockstack[f->f_iblock++];
    b->b_type = type;
    b->b_level = level;
    b->b_handler = handler;
}

CO_MAXBLOCKS 被定义为20, 暂时不清楚这意味着什么, f->f_iblockPyFrame_New 中初始化为0;关于 PyTryBlock 的定义如下:

[frameobject.h]
typedef struct {
    int b_type;                 /* what kind of block this is */
    int b_handler;              /* where to jump to find handler */
    int b_level;                /* value stack level to pop to */
} PyTryBlock;

PyTryBlock 除了循环控制, 在异常机制中也有用到.

list 迭代器

SETUP_LOOP 指令是从PyFrameObject的f_blockstack中申请一块PyTryBlock, 再通过 GET_ITER 获取list对象的迭代器:

TARGET(GET_ITER) {
    /* before: [obj]; after [getiter(obj)] */
    PyObject *iterable = TOP();
    PyObject *iter = PyObject_GetIter(iterable);
    Py_DECREF(iterable);
    SET_TOP(iter);
    if (iter == NULL)
        goto error;
    PREDICT(FOR_ITER);
    PREDICT(CALL_FUNCTION);
    DISPATCH();
}

其中 #define TOP() (stack_pointer[-1]) ,居然有个负数索引? 我们知道, 对于一个数组array, array[i] 相当于 (array + i) . 那array[-1] 也就相当于 (array - 1) , 这样做虽然不安全却完全合法. 而SET_TOP就是 #define SET_TOP(v) (stack_pointer[-1] = (v)) , 这段替换成POP和PUSH也完全没问题. 总之, GET_ITER 指令是先从运行时栈中获取栈顶上的PyListObject对象, 然后通过 PyObject_GetIter 获取迭代器:

[abstract.c]
PyObject * PyObject_GetIter(PyObject *o)
{
    PyTypeObject *t = o->ob_type;
    getiterfunc f;

    f = t->tp_iter; // 获得类型对象的tp_iter操作
    if (f == NULL) {
        if (PySequence_Check(o))
            return PySeqIter_New(o);
        // 不支持迭代操作
        return type_error("'%.200s' object is not iterable", o);
    }
    else {
        // 通过tp_iter操作获得iterator
        PyObject *res = (*f)(o);

        return res;
    }
}

显然 PyObject_GetIter 是通过类型对象的 tp_iter 操作获得与对象关联的迭代器, 迭代器的概念类似于STL, 都是为容器元素遍历提供一层接口, 实现遍历操作与容器的具体分离. 而迭代器也是一个python对象:

[listobject.c]
typedef struct {
    PyObject_HEAD
    Py_ssize_t it_index;
    PyListObject *it_seq; /* Set to NULL when iterator is exhausted */
} listiterobject;

既然迭代器是PyObject对象, 那它肯定也有对应的类型对象:

PyTypeObject PyListIter_Type = {
    PyVarObject_HEAD_INIT(&PyType_Type, 0)
    "list_iterator",                            /* tp_name */
    sizeof(listiterobject),                     /* tp_basicsize */
    0,                                          /* tp_itemsize */
    /* methods */
    (destructor)listiter_dealloc,
    ... 
    PyObject_SelfIter,                          /* tp_iter */
    (iternextfunc)listiter_next,                /* tp_iternext */
    listiter_methods,                           /* tp_methods */
    0,                                          /* tp_members */
};

PyList_Type 中成员 tp_iter 赋值为 list_iter , 这就是前面的f, 也正是创建迭代器对象的实现所在:

[listobject.c]
static PyObject * list_iter(PyObject *seq)
{
    listiterobject *it;

    if (!PyList_Check(seq)) {
        PyErr_BadInternalCall();
        return NULL;
    }
    it = PyObject_GC_New(listiterobject, &PyListIter_Type);
    if (it == NULL)
        return NULL;
    it->it_index = 0;
    Py_INCREF(seq);
    it->it_seq = (PyListObject *)seq; // 这个seq是之前创建的PyListObject对象
    _PyObject_GC_TRACK(it);
    return (PyObject *)it;
}

从这可以看出, 迭代器是list上的一层封装, list对象只是迭代器中成员 it_seq . 迭代器维护了当前访问的元素在list对象中的序号即 it_index , 实现了对list的遍历.

GET_ITER指令之后就是FOR_ITER了.

迭代控制

获得数组迭代器后, 就应该进入一个循环结构了:

PREDICTED(FOR_ITER);
TARGET(FOR_ITER) {
    /* before: [iter]; after: [iter, iter()] *or* [] */
    PyObject *iter = TOP(); // 取得迭代器对象
    // 通过迭代器对象获取集合中下一个元素对象
    PyObject *next = (*iter->ob_type->tp_iternext)(iter);
    if (next != NULL) {
        // 入栈
        PUSH(next);
        PREDICT(STORE_FAST);
        PREDICT(UNPACK_SEQUENCE);
        DISPATCH();
    }
    /* iterator ended normally */
    // next = NULL 迭代正常结束
    STACKADJ(-1);
    Py_DECREF(iter);
    JUMPBY(oparg);
    PREDICT(POP_BLOCK);
    DISPATCH();
}

FOR_ITER指令首先从运行时栈中获得迭代器对象, 然后调用 tp_iternext 进行迭代. 该函数总是返回与迭代器关联的容器的下一个元素, 如果已经遍历完毕则返回NULL.

获得一个有效元素后, 将这个元素入栈, PREDICT(STORE_FAST); PREDICT(UNPACK_SEQUENCE); 会做一些字节码预测, 这边就假设这两句不存在吧.

[listobject.c]
static PyObject * listiter_next(listiterobject *it)
{
    PyListObject *seq;
    PyObject *item;

    assert(it != NULL);
    seq = it->it_seq;
    if (seq == NULL)
        return NULL;
    assert(PyList_Check(seq));

    if (it->it_index < PyList_GET_SIZE(seq)) {
        item = PyList_GET_ITEM(seq, it->it_index);
        ++it->it_index;
        Py_INCREF(item);
        return item;
    }

    it->it_seq = NULL;
    Py_DECREF(seq);
    return NULL;
}

大概意思就是 return it->it_seq->ob_item[ it->it_index++ ]; , 这之后python虚拟机沿着字节码顺序一条一条执行下去. 完成输出后有个 JUMP_ABSOLUTE 参数是14:

TARGET(JUMP_ABSOLUTE) {
    JUMPTO(oparg);
    DISPATCH();
}

跳转到 14 FOR_ITER 12 (to 28) 指令, 也就是进行下一次循环,

终止迭代

FOR_ITER 指令中迭代器遍历完成, 返回NULL值后, 执行 JUMPBY(oparg); 显然这里参数为12:

#define JUMPBY(x)       (next_instr += (x) / sizeof(_Py_CODEUNIT))

也就是将栈指针向前偏移6个指令, 也就是到 >> 28 POP_BLOCK :

PREDICTED(POP_BLOCK);
TARGET(POP_BLOCK) {
    PyTryBlock *b = PyFrame_BlockPop(f);
    UNWIND_BLOCK(b);
    DISPATCH();
}

其中宏 UNWIND_BLOCK 定义为:

#define UNWIND_BLOCK(b) \
    while (STACK_LEVEL() > (b)->b_level) { \
        PyObject *v = POP(); \
        Py_XDECREF(v); \
    }

#define STACK_LEVEL()     ((int)(stack_pointer - f->f_valuestack))

在执行 POP_BLOCK 的指令时, 会获取在 SETUP_LOOP 处申请的 PyTryBlock 结构, 实际上是将这个PyTryBlock结构归还给f->f_blockstack, 同时, 虚拟机会将运行时栈的深度信息恢复到SETUP_LOOP之前的状态, 从而完成整个for循环结构.

while 循环结构

对于代码:

a = 0
while a < 10:
    a += 1
    if a == 5:
        continue
    if a == 7:
        break
    print(a)

对应的code和字节码:

co_consts : (0, 10, 1, 5, 7, None)
co_names : ('a', 'print')
co_stacksize : 2
  1           0 LOAD_CONST               0 (0)
              2 STORE_NAME               0 (a)

  2           4 SETUP_LOOP              48 (to 54)
        >>    6 LOAD_NAME                0 (a)
              8 LOAD_CONST               1 (10)
             10 COMPARE_OP               0 (<)
             12 POP_JUMP_IF_FALSE       52

  3          14 LOAD_NAME                0 (a)
             16 LOAD_CONST               2 (1)
             18 INPLACE_ADD
             20 STORE_NAME               0 (a)

  4          22 LOAD_NAME                0 (a)
             24 LOAD_CONST               3 (5)
             26 COMPARE_OP               2 (==)
             28 POP_JUMP_IF_FALSE       32

  5          30 JUMP_ABSOLUTE            6

  6     >>   32 LOAD_NAME                0 (a)
             34 LOAD_CONST               4 (7)
             36 COMPARE_OP               2 (==)
             38 POP_JUMP_IF_FALSE       42

  7          40 BREAK_LOOP

  8     >>   42 LOAD_NAME                1 (print)
             44 LOAD_NAME                0 (a)
             46 CALL_FUNCTION            1
             48 POP_TOP
             50 JUMP_ABSOLUTE            6
        >>   52 POP_BLOCK
        >>   54 LOAD_CONST               5 (None)
             56 RETURN_VALUE

while循环一样通过 SETUP_LOOP 申请一块PyTryBlock空间. 这节主要探究 continuebreak.

continue 指令

a == 5 判断为true时, 执行了 continue 也就是对应指令中的 30 JUMP_ABSOLUTE 6:

PREDICTED(JUMP_ABSOLUTE);
TARGET(JUMP_ABSOLUTE) {
    JUMPTO(oparg);
    DISPATCH();
}

oparg的值是6, 这里就讲流程跳转到 6 LOAD_NAME 0 (a) 处了, 继续进行下一次的while判断.

break 指令

a == 5 判断为true时, 执行了 break 语句:

TARGET(BREAK_LOOP) {
    why = WHY_BREAK;
    goto fast_block_end;
}

将python虚拟机状态设置为 WHY_BREAK , 然后进入 fast_block_end 代码处. 该处也包含了异常处理机制的处理, 这里只显示和break相关:

fast_block_end:
    /* Unwind stacks if a (pseudo) exception occurred */
    while (why != WHY_NOT && f->f_iblock > 0) {
        /* Peek at the current block. */
        PyTryBlock *b = &f->f_blockstack[f->f_iblock - 1];
        ...
        f->f_iblock--;
        UNWIND_BLOCK(b);
        if (b->b_type == SETUP_LOOP && why == WHY_BREAK) {
            why = WHY_NOT;
            JUMPTO(b->b_handler);
            break;
        }
        ...
    }

首先得到在 SETUP_LOOP 指令申请的PyTryBlock, 将why值设为WHY_NOT, 表明退出状态没有任何错误, 再通过宏 JUMPTO 将虚拟机中下一条指令的指示器 next_instr 设置为距离code开始位置 b->b_handler 处的位置.

在执行 SETUP_LOOP 时, b->b_handler 是被设置为 INSTR_OFFSET() + oparg 的, 其中宏:

#define INSTR_OFFSET()  (sizeof(_Py_CODEUNIT) * (int)(next_instr - first_instr))

因此在指令 4 SETUP_LOOP 48 (to 54) 中, 此时, oparg = 48, (sizeof(_Py_CODEUNIT) * (int)(next_instr - first_instr)) 中, 下一条指令next_instr - first_instr是下一条在第一条的偏移量, 每条指令占两个字节, next_instr是第4条指令,所以next_instr - first_instr=_Py_CODEUNIT[3]-_Py_CODEUNIT[0] = 3.

所以 b->b_handler = INSTR_OFFSET() + oparg = (2 * (3 - 0) + 48 = 54

因为#define JUMPTO(x) (next_instr = first_instr + (x) / sizeof(_Py_CODEUNIT))

因此 JUMPTO(b->b_handler) 的动作就是, next_instr = 0 + 54 / 2 = 27. 也就是第27条指令:54 LOAD_CONST 5 (None) , 它已经跳出while循环了.


本文由 hongweipeng 创作,采用 知识共享署名 3.0,可自由转载、引用,但需署名作者且注明文章出处。

赏个馒头吧