Python内核阅读(十九): 多线程机制(上)

Python 2017-08-30

起步

现代编程语言都为多线程开发提供了很好的支持. python中的线程是操作系统的原生线程, 而python虚拟机使用一个全局解释器锁(GIL)来互斥线程对虚拟机的使用.

GIL与线程调度

如果有两个线程A和B. 它们都保存这内存中同一个对象obj的引用, 也就是 obj->ob_refcnt = 2 . 如果A销毁对obj的引用, A将通过宏 Py_DECREF(op):

[object.h]
#define Py_DECREF(op)                                   \
    do {                                                \
        PyObject *_py_decref_tmp = (PyObject *)(op);    \
        if (_Py_DEC_REFTOTAL  _Py_REF_DEBUG_COMMA       \
        --(_py_decref_tmp)->ob_refcnt != 0)             \
            _Py_CHECK_REFCNT(_py_decref_tmp)            \
        else                                            \
            _Py_Dealloc(_py_decref_tmp);                \
    } while (0)

如果A线程执行完后, obj->ob_refcnt的值变成1, 但恰好线程被挂起, 线程B被激活,线程B中把obj->ob_refcnt值设为了0, 还顺利完成了内存释放. 当线程A再会唤醒. 但这是obj->ob_refcnt是0了, A会按照规则再一次内存释放. 同一个内存被释放两次会发生什么, 只有老天知道了.

为了支持多线程机制, 一个基本要求就是实现不同线程对共享资源访问的互斥. 因此python引入了GIL. 这是一个非常霸道的互斥实现. 一个线程拥有解释器访问权后, 其他所有线程都必须等它释放解释器的访问权. 这样的保护粒度太大.

为什么不只对共享资源加锁就好了呢? 在python发展历史中, 曾经有个分支去掉了GIL, 提供了颗粒更低的锁. 但是这个版本在单线程测试, 效率只有GIL的一半. 这是因为加锁和解锁对操作系统来说是一个比较重量级的操作, 频繁的加解锁导致性能低下.

另一方面, 由于很多python 的C 扩展也大大的依赖GIL. 由于GIL的缘故, 使得本来应该并行计算变成了串行.

也有一种可行的方案, 就是多进程, 每个进程都有自己的GIL. 它们就能完成并发, 发挥多核CPU的优势.

python模拟了进程, 线程操作, 也模拟了系统的时钟中断, 也就是执行n条字节码指令后启动线程调度机制. 这个n默认设定的是:

>>> import sys
>>> sys.getcheckinterval()
100

那么, 究竟是要唤醒那个线程来执行呢, 答案是未知,这里完全靠系统的线程调度, python完全没有插手.

线程的创建

python所提供的多线程机制的接口是 threading 模块, 这个模块源码在 Lib/threading.py , 多线程底层是c实现, python提供了更高层的封装, 使用如下例子:

import threading
import time

def threadProc():
    print("sub thread id : ", threading.get_ident())
    while True:
        print("sub thread ", threading.get_ident())
        time.sleep(1)

print("main thread ", threading.get_ident())
threading._start_new_thread(threadProc, ())
while True:
    print("main thread ", threading.get_ident())
    time.sleep(1)

# output
main thread  3656
main thread  3656
sub thread id :  8704
sub thread  8704
main thread  3656
sub thread  8704
main thread  3656
sub thread  8704
main thread  3656
sub thread  8704

threading.py中, 其实是对 _module 模块的简单包装, 甚至可以说是只是取个别名:

[Lib/threading.py]
import _thread
_start_new_thread = _thread.start_new_thread
_allocate_lock = _thread.allocate_lock
_set_sentinel = _thread._set_sentinel
get_ident = _thread.get_ident
ThreadError = _thread.error

_thread的实现在 _threadmodule.c 中, 这是一个内建模块:

[Modules/_threadmodule.c]
static PyMethodDef thread_methods[] = {
    {"start_new_thread",        (PyCFunction)thread_PyThread_start_new_thread,
     METH_VARARGS, start_new_doc},
    {"start_new",               (PyCFunction)thread_PyThread_start_new_thread,
     METH_VARARGS, start_new_doc},
    {"allocate_lock",           (PyCFunction)thread_PyThread_allocate_lock,
     METH_NOARGS, allocate_doc},
    {"allocate",                (PyCFunction)thread_PyThread_allocate_lock,
     METH_NOARGS, allocate_doc},
    ...
    {NULL,                      NULL}           /* sentinel */
};

接口可以说是少了, 这使得python的多线程编程简单而方便. 有的接口甚至只是不同形式的出现, 比如 start_new_threadstart_new 都是用 thread_PyThread_start_new_thread 这个函数, 包括参数完全一样. 估计是要兼容旧版的写法吧.

python 线程的创建

[Modules/_threadmodule.c]
static PyObject * thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs)
{
    PyObject *func, *args, *keyw = NULL;
    struct bootstate *boot;
    unsigned long ident;
    // 处理参数
    if (!PyArg_UnpackTuple(fargs, "start_new_thread", 2, 3,
                           &func, &args, &keyw))
        return NULL;
    // 创建 bootstate 结构
    boot = PyMem_NEW(struct bootstate, 1);
    if (boot == NULL)
        return PyErr_NoMemory();
    boot->interp = PyThreadState_GET()->interp;
    boot->func = func;
    boot->args = args;
    boot->keyw = keyw;
    boot->tstate = _PyThreadState_Prealloc(boot->interp);
    if (boot->tstate == NULL) {
        PyMem_DEL(boot);
        return PyErr_NoMemory();
    }
    Py_INCREF(func);
    Py_INCREF(args);
    Py_XINCREF(keyw);
    PyEval_InitThreads(); // 初始化多线程环境
    // 创建线程
    ident = PyThread_start_new_thread(t_bootstrap, (void*) boot);

    return PyLong_FromUnsignedLong(ident);
}

这个函数主要有三个动作:

  • 创建并初始化 bootstate 结构 boot.
  • 初始化python的多线程环境
  • 以boot为参数, 创建操作系统的原生线程

boot->interp = PyThreadState_GET()->interp; 保存了全局信息,interp 当然也包含了sys.modules, 因此所有线程都会共享这些全局信息.

为什么会有初始化多线程环境呢? 在python启动时, 是不支持多线程的. 因此支持多线程的GIL都是没有创建的. 因为大多数脚本都不需要多线程支持, 虚拟机的多线程规则会做无用功, 所以python是当用户需要多线程时才激活多线程机制.

建立多线程环境

多线程机制要先知道GIL在哪, 如何创建的:

[pythread.h]
typedef void *PyThread_type_lock;

[cveal.c]
static PyThread_type_lock pending_lock = 0; /* for pending calls */
static unsigned long main_thread = 0;
/* Request for dropping the GIL */
static _Py_atomic_int gil_drop_request = {0};

void PyEval_InitThreads(void)
{
    if (gil_created())  // 如果初始化过了
        return;
    create_gil();
    take_gil(PyThreadState_GET());
    main_thread = PyThread_get_thread_ident();
    if (!pending_lock)
        pending_lock = PyThread_allocate_lock();
}

可以看到, 无论创建多少个线程, 初始多线程环境的动作只会执行一次. pending_lock是 void * 类型, 它能指向任意东西. 创建GIL的工作由 create_gil 函数完成.

创建GIL

[ceval_gil.h]
static _Py_atomic_int gil_locked = {-1};
static _Py_atomic_address gil_last_holder = {0};
static COND_T gil_cond;
static MUTEX_T gil_mutex;

static void create_gil(void)
{
    MUTEX_INIT(gil_mutex);
    MUTEX_INIT(switch_mutex);
    COND_INIT(gil_cond);
    COND_INIT(switch_cond);
    _Py_atomic_store_relaxed(&gil_last_holder, 0);
    _Py_ANNOTATE_RWLOCK_CREATE(&gil_locked);
    _Py_atomic_store_explicit(&gil_locked, 0, _Py_memory_order_release);
}

这里就是GIL的神秘面目, 它 gil_locked 是一个原子变量. 将其初始为1 .

 获得GIL

[ceval_gil.h]
static void take_gil(PyThreadState *tstate)
{
    int err;
    err = errno;
    MUTEX_LOCK(gil_mutex);
    if (!_Py_atomic_load_relaxed(&gil_locked))
        goto _ready;

    while (_Py_atomic_load_relaxed(&gil_locked)) {
        int timed_out = 0;
        COND_TIMED_WAIT(gil_cond, gil_mutex, INTERVAL, timed_out);
        ...
    }
_ready:
    ...
    _Py_atomic_store_relaxed(&gil_locked, 1);
    ...
}

在获得GIL的函数中, 有一个 while 判断直到GIL被释放才结束, 而后 _Py_atomic_store_relaxed(&gil_locked, 1); 再次进行锁定.

线程环境初始化

初始化线程:

[Python/thread_pthread.h]
PyThread_type_lock PyThread_allocate_lock(void)
{
    sem_t *lock;
    int status, error = 0;

    if (!initialized)
        PyThread_init_thread();

    lock = (sem_t *)PyMem_RawMalloc(sizeof(sem_t));
    if (lock) {
        status = sem_init(lock,0,1);// 此处初始信号量设为1. 第二个参数为0表示不应用于其他进程.
        CHECK_STATUS("sem_init");
    }
    return (PyThread_type_lock)lock;
}

在linux平台中, GIL采用信号量的数据接口 sem_t 来作为标识用于多线程同步. 那windows系统用什么? python的多线程机制与平台有相关性, 需要进行封装成统一接口给python. 关于windows的线程创建则在 thread_nt.h 中定义. 他们封装原生thread. 提供给 thread.c 使用. 因此, 与平台相关的代码, 我们都以linux为例.

PyThread_allocate_lock 函数中, 有个 initialized 变量用来检测初始化是否完成, 这边的初始化是指原生thread所需的初始化动作.

[thread.c]
void PyThread_init_thread(void)
{
    if (initialized)
        return;
    initialized = 1;
    PyThread__init_thread();
}

[thread_pthread.h]
static void PyThread__init_thread(void)
{
}

PyThread_init_thread 的作用就是设置变量 initialized . thread_pthread.h 中会定义许多 PyThread__init_thread 函数, 也是针对不同的平台依赖, 在我的测试平台中, 这个函数什么都没做.

PyThread_allocate_lock 函数中用 sem_t 信号量, sem_init(lock,0,1) 设置好它的共享选项只能是当前进程使用, 并设置初始值1.

子线程的诞生

在完成了多线程环境的初始化后, Python会创建平台的原生thread. 以最上面的python代码为例. 通过了 threading._start_new_thread 创建线程, 也就是调用 thread_PyThread_start_new_thread(PyObject *self, PyObject *fargs) 函数. 函数里创建了 bootstate 结构并设置其成员, 然后调用 PyThread_start_new_thread 创建:

[thread.c]
/* Support for runtime thread stack size tuning.
   A value of 0 means using the platform's default stack size
   or the size specified by the THREAD_STACK_SIZE macro. */
static size_t _pythread_stacksize = 0;

[thread_pthread.h]
// 对#ifdef 进行的手动整理
unsigned long PyThread_start_new_thread(void (*func)(void *), void *arg)
{
    pthread_t th;
    int status;
    pthread_attr_t attrs;
    size_t      tss;

    dprintf(("PyThread_start_new_thread called\n"));
    if (!initialized)
        PyThread_init_thread();

    if (pthread_attr_init(&attrs) != 0)
        return PYTHREAD_INVALID_THREAD_ID;

    tss = (_pythread_stacksize != 0) ? _pythread_stacksize
                                     : THREAD_STACK_SIZE;
    if (tss != 0) {
        if (pthread_attr_setstacksize(&attrs, tss) != 0) {
            pthread_attr_destroy(&attrs);
            return PYTHREAD_INVALID_THREAD_ID;
        }
    }

    pthread_attr_setscope(&attrs, PTHREAD_SCOPE_SYSTEM);

    status = pthread_create(&th,
                             &attrs,
                             (void* (*)(void *))func,
                             (void *)arg
                             );

    pthread_attr_destroy(&attrs);
    if (status != 0)
        return PYTHREAD_INVALID_THREAD_ID;

    pthread_detach(th);
    return (unsigned long) th;
}

主线程上调用 PyThread_start_new_thread 完成创建子线程的工作. 为了清晰地理解创建子线程的工作. 我们可以跟踪到两个参数中, func实际是 t_bootstrap 函数, 而arg则是bootstate的结构体boot. 这个boot中, 保存这python程序所定义的线程信息.

提供下pthread.h中的函数:

  • pthread_t : 一个线程的标识符, typedef unsigned long int pthread_t;
  • pthread_attr_t : 线程属性, 可以通过该属性指定新线程的栈基址,栈大小,守护大小,调度策略和调度优先级等,如果使用默认值,取值可能为NULL 结构体为:
[pthread.h]
typedef struct __pthread_attr_s
{
    int __detachstate;  
    int __schedpolicy;
    struct __sched_param __schedparam;
    int __inheritsched;
    int __scope;
    size_t __guardsize;
    int __stackaddr_set;
    void *__stackaddr;
    size_t __stacksize;//表示堆栈的大小.
}pthread_attr_t;
  • pthread_attr_init(&attr): 初始化.初始化的值为:
__scope                 = PTHREAD_SCOPE_PROCESS
__tetachstate           = PTHREAD_CREATE_JOINABL
__stackaddr             = NULL
__stacksize             = 1M
__sched_param.priority  = 0 (使用创建线程的优先级)
__inheritsched          = PTHREAD_INHERIT_SCHED
__schedpolicy           = SCHED_OTHER
  • int pthread_attr_setstacksize(pthread_attr_t *tattr,int size); 设置堆栈大小 默认情况下线程保留1M的, 而且会在堆栈的顶增加一个空闲的内存页
  • int pthread_attr_setscope(pthread_attr_t *tattr,intscope); 设置cpu竞争模式: scope = PTHREAD_SCOPE_SYSTEM/PTHREAD_SCOPE_PROCESS PTHREAD_SCOPE_SYSTEM(绑定的)绑定状况下, 即某个线程固定的"绑"在一个轻进程之上.被绑定的线程具有较高的响应速度, 这是因为CPU时间片的调度是面向轻进程的, 绑定的线程可以保证在需要的时候它总有一个轻进程可用.通过设置被绑定的轻进程的优先级和调度级可以使得绑定的线程满足诸如实时反应之类的要求.

  • pthread_create : 创建一个线程, 原型为

    int pthread_create(pthread_t *restrict tidp,
                   const pthread_attr_t *restrict attr,
                   void *(*start_rtn)(void), 
                   void *restrict arg);

    第一个参数为指向线程标识符的指针 第二个参数用来设置线程属性 第三个参数是线程运行函数的起始地址 最后一个参数是运行函数的参数.

  • pthread_attr_destroy(&attr) 反初始化

可见, 主要是通过 pthread_create(&th, &attrs, (void* (*)(void *))func, (void *)arg) 来创建线程, 第三个参数运行的函数也就是 t_bootstrap 函数了:

[_threadmodule.c]
static void t_bootstrap(void *boot_raw)
{
    struct bootstate *boot = (struct bootstate *) boot_raw;
    PyThreadState *tstate;
    PyObject *res;

    tstate = boot->tstate;
    tstate->thread_id = PyThread_get_thread_ident();
    _PyThreadState_Init(tstate);
    PyEval_AcquireThread(tstate);
    nb_threads++;
    res = PyObject_Call(boot->func, boot->args, boot->keyw);
    ...
}

子线程从这里开始对GIL进行竞争. 在 t_bootstrap 中的 PyEval_AcquireThread(tstate) 就是申请获得GIL的控制权.接下来就是运行boot结构内的函数体.这里的 boot->func 就是用户自行定义的 def threadProc(): .

乍一看, 好像要等到boot->func函数运行完毕才回交还GIL的样子. 但事实并不是这样, 前面说了python虚拟机模拟的时钟中断. 会不断挂起和激活线程, 在子线程和主线程之间进行切换. 当然这部分的机制后文会提, 先来看看申请GIL的 PyEval_AcquireThread 有哪些动作:

[cveal.c]
void PyEval_AcquireThread(PyThreadState *tstate)
{
    if (tstate == NULL)
        Py_FatalError("PyEval_AcquireThread: NULL new thread state");
    // 确认GIL已经创建
    assert(gil_created());
    take_gil(tstate);   // 获得GIL
    // 与线程状态指针_PyThreadState_Current交换
    if (PyThreadState_Swap(tstate) != NULL)
        Py_FatalError("PyEval_AcquireThread: non-NULL old thread state");
}

PyEval_AcquireThread 中的take_gil(tstate)函数是等待GIL, 获得后再加锁的过程.

线程状态保护机制

要分析线程状态保护机制, 我们首先要回顾下线程的结构:

[pystate.h]
typedef struct _ts {
    struct _ts *prev;
    struct _ts *next;
    PyInterpreterState *interp;

    struct _frame *frame;
    ...

    int gilstate_counter;
    unsigned long thread_id; /* Thread id where this tstate was created */
    ...
} PyThreadState;

每一个线程对象都保存着当前的 PyFrameObject 对象, 线程id这样的信息. 因此应该要有这样的机制, 在A线程下获取的thread_id是A的. 在线程B获取的thread_id是B的. 因此在切换线程的时候要保存线程的上下文, 这里的上下文就是线程状态. 线程对象通过 next 可以形成一个链式结构, 在此称为 "状态对象链表" .

回顾下python初始化中 _Py_InitializeCore 函数在创建进程和线程后调用了 _PyGILState_Init:

[pylifecycle.c]
void _Py_InitializeCore(const _PyCoreConfig *config)
{
    interp = PyInterpreterState_New();
    tstate = PyThreadState_New(interp);
    ...
    _PyGILState_Init(interp, tstate);
    ...
}

_PyGILState_Init 里面会对线程状态进行存储, 用于后续的线程恢复:

[pystate.c]
static PyInterpreterState *autoInterpreterState = NULL;
static int autoTLSkey = -1;

void _PyGILState_Init(PyInterpreterState *i, PyThreadState *t)
{
    assert(i && t); /* must init with valid states */
    autoTLSkey = PyThread_create_key();
    if (autoTLSkey == -1)
        Py_FatalError("Could not allocate TLS entry");
    autoInterpreterState = i;
    assert(PyThread_get_key_value(autoTLSkey) == NULL);
    assert(t->gilstate_counter == 0);

    _PyGILState_NoteThreadState(t);
}

static void _PyGILState_NoteThreadState(PyThreadState* tstate)
{
    if (!autoInterpreterState)
        return;

    if (PyThread_get_key_value(autoTLSkey) == NULL) {
        if (PyThread_set_key_value(autoTLSkey, (void *)tstate) < 0)
            Py_FatalError("Couldn't create autoTLSkey mapping");
    }
    tstate->gilstate_counter = 1;
}

[thread_pthread.h]
int PyThread_create_key(void)
{
    pthread_key_t key;
    int fail = pthread_key_create(&key, NULL);
    if (fail)
        return -1;
    return (int)key;
}

PyThread_create_key 将创建一个key. 它是一个整数. 在多线程环境下, 由于数据空间是共享的, 因此全局变量也为所有线程所共有. 所有线程能都使用它, 修改它. 而如果希望每个线程能能单独拥有, 那么久要使用 线程存储 , 表面上看它是全局变量, 所有线程都能使用它, 但它的值是在每一个线程中单独存储的. 这样的数据结构可以由 Posix线程库 维护, 称为线程私有数据(Thread-specific Data, 或TSD). 具体做法是:

  1. 创建一个类型为 pthread_key_t 类型的变量.
  2. 调用 pthread_key_create() 来创建该变量.该函数有两个参数, 第一个参数就是上面声明的 pthread_key_t 变量, 第二个参数是一个清理函数, 用来在线程释放该线程存储的时候被调用.该函数指针可以设成NULL, 这样系统将调用默认的清理函数.
  3. 当线程中需要存储特殊值的时候, 可以调用 pthread_setspcific() .该函数有两个参数, 第一个为前面声明的pthread_key_t变量, 第二个为void*变量, 这样你可以存储任何类型的值.
  4. 如果需要取出所存储的值, 调用 pthread_getspecific() .该函数的参数为前面提到的 pthread_key_t 变量, 该函数返回 void * 类型的值.

不论是在哪个线程中 pthread_key_create() , 所创建的key都是所有线程可以访问的, 但各个线程可以根据自己的需要往key中填入不同的值. 就相当于提供了一个同名而不同值的全局变量.

在python中, 创建的key被python维护的全局变量 autoTLSkey 接收, TLS是 Thread Local Store 的缩写, 这个autoTLSkey 将用作python保存所有线程状态的对象的一个参数.

但是 autoTLSkey 并不能表示哪个线程对应哪个状态对象. 真正能代表状态对象的只有线程自己的id. 获得当前线程的id是通过 PyThread_get_thread_ident 函数来获取的:

[thread_pthread.h]
unsigned long PyThread_get_thread_ident(void)
{
    volatile pthread_t threadid;
    if (!initialized)
        PyThread_init_thread();
    threadid = pthread_self();
    return (unsigned long) threadid;
}

pthread_self() 是C语言的 pthread.h 库用来返回当前线程id. 因此通过已知的某个线程id可以通过 inter->tstate_head 链表遍历查找得到.

从GIL到字节码解释器

现在, 要回过头来看看python初始化中的创建线程的步骤:

[pystate.c]
PyThreadState * PyThreadState_New(PyInterpreterState *interp)
{
    return new_threadstate(interp, 1);
}

static PyThreadState * new_threadstate(PyInterpreterState *interp, int init)
{
    PyThreadState *tstate = (PyThreadState *)PyMem_RawMalloc(sizeof(PyThreadState));

    if (_PyThreadState_GetFrame == NULL)    // 设置获得线程中函数调用栈的操作
        _PyThreadState_GetFrame = threadstate_getframe;

    ...
    if (init)
        _PyThreadState_Init(tstate);
    ...
    return tstate;
}

void _PyThreadState_Init(PyThreadState *tstate)
{
#ifdef WITH_THREAD
    _PyGILState_NoteThreadState(tstate);
#endif
}

子线程在创建了自身的线程状态对象后, 通过 _PyGILState_NoteThreadState 将这个线程对象设置到 autoTLSkey 中.

有意思的是, 这时候当前活动的python线程不一定是获得GIL的线程. 这是因为主线程和子线程都是操作系统原生线程, 所以操作系统可能在主线程和子线程之间切换. 这里要区分的一点是, 系统级别的线程调度与GIL无关. 而python级的线程调度才与GIL有关, 前面也说了, python自己模拟一套线程的调度方式. 所以操作系统级的并不一定意味着GIL的得手, 当所有的线程都完成了初始化的动作之后, 操作系统的线程调度和python的线程调度才会同一. 那时, python的线程调度会强制当前活动线程释放GIL, 而这一操作会触发Event内核对象, 这个触发进而触发操作系统进行线程调度. 这个GIL其实是起到了从python级线程到操作系统级线程调度的桥梁作用.

真正争夺GIL的是在 t_bootstrap 中, 透过 PyEval_AcquireThread 获得GIL的话语权. 到了这一步, 子线程将自己挂起, 操作系统的线程调度不能唤起它, 只有等待python的线程调度强制主线程放弃GIL后, 子线程才回被唤醒. 而子线程被唤醒后, 主线程又将陷入等待中.

当子线程被python的线程调度唤醒后, 做的第一件事就是通过 PyThreadState_Swap(tstate) 设置python维护当前线程状态对象, 如操作系统的进程上下文环境恢复一样.

此时, 线程的初始化还未真正完成, 因为子线程还没进入字节码解释器. t_bootstrap 调用 PyObject_Call 最终 PyEval_EvalFrameEx 进入解释器, 这时候, 子线程和主线程一样, 就完全交给python线程调度机制控制了.

下一篇将会讲述, python解释器的线程调度机制.


本文由 hongweipeng 创作,采用 知识共享署名 3.0,可自由转载、引用,但需署名作者且注明文章出处。

赏个馒头吧