首页 小组 问答 话题 好文 素材 用户 唠叨 我的社区

[分享]深度剖析 Swoole6 PHP 多线程实现原理

zuiuiLv.1普通用户
2024-10-15 07:45:09
0
9

PHP作为一种常用的服务器端脚本语言,被广泛用于Web开发。然后相比JavaC++Golang等编程语言,PHP语言缺少多线程的支持,只能使用fork创建多个进程来实现并行处理。
由于进程之间并不共享内存堆栈和文件句柄,PHP只能借助RedisAPCu等内存数据库或共享内存来实现进程间的数据共享,编程的局限性较大。

所幸的是Swoole6增加了对多线程的支持,为PHP提供了一个稳定可靠的多线程支持。现在PHP也可以创建多线程,更加高效地编写并发程序。

本文将深入介绍PHPZTS机制和Swoole6多线程的实现原理,帮助PHP开发者彻底理解掌握Swoole6多线程的使用。

什么是线程

介绍多线程之前要介绍进程,进程是操作系统中资源分配的基本单位,是一个程序在执行过程中的一个实例。每个进程都有自己的独立地址空间、数据栈和其他用于跟踪的辅助数据。
通常进程只有一个执行单元,程序指令按照顺序执行。这样程序只能利用到CPU的一个核心,为了利用多核的计算能力,Linux系统支持了线程。进程内可以同时创建多个执行单元,
程序指令可以并行地在多个CPU核心上执行。

相比进程,线程比进程更轻量。多个线程共享同一进程的资源,因此创建和销毁线程的开销相对较小。同一进程内的线程可以直接访问进程所拥有的资源,如内存、全局变量等。
由于共享内存空间,线程间的通信比进程间的通信更简单,只需使用共享变量。

进程与线程的对比

特性

进程

线程

定义

程序执行的实例,独立的资源单位

进程内的执行单元,轻量级

资源分配

拥有独立的资源

共享同一进程的资源

创建开销

高,管理和分配资源

低,切换和创建速度快

通信方式

复杂,使用IPC机制

简单,通过共享内存

稳定性

高,一个进程崩溃不会影响其他进程

低,一个线程崩溃可能导致进程崩溃

应用场景

适合无状态的任务,如Web服务、数据处理

适合高并发和更复杂的任务

如何创建线程

在 Linux C++ 中,可以使用多种方式来创建线程。最常用的方法是使用POSIX 线程pthread)库。以下是通过 pthread 库创建线程的基本示例。

#include <pthread.h>
#include <iostream>

void* threadFunction(void* arg) {
    int* num = static_cast<int*>(arg);
    std::cout << "Hello from thread! Number: " << *num << std::endl;
    return nullptr;
}

int main() {
    pthread_t thread;
    int num = 42;
    
    // 创建线程
    if (pthread_create(&thread, nullptr, threadFunction, &num) != 0) {
        std::cerr << "Error creating thread" << std::endl;
        return 1;
    }

    // 等待线程结束
    pthread_join(thread, nullptr);

    std::cout << "Main thread ending." << std::endl;
    return 0;
}

可以使用 g++ 编译器编译此代码,然后执行就会创建多个线程并行地处理任务。

g++ -o test test.cpp -lpthread
./test

PHP ZTS

早期的PHP仅支持Apache服务器,作为Apacheprefork模块来运行,不支持WindowsIISApache (worker threads)服务器。为了解决此问题,PHP加入了ZTS的支持,也就是TSRM模块,可以在php-src/TSRM目录下找到相应的代码。

Python GIL的实现不同,PHP ZTS没有使用全局锁保护全局资源,而是一种thread_local的模式,将全局资源变成了线程局部资源。

Python语言虽然提供了threading模块,实际是伪多线程,Python代码并不能并行执行,仅在发生阻塞IO时,让出了控制权,利用IO等待的间隙,运行其他Python线程。而PHP ZTS多线程模式(例如:IIS+PHP)下,PHP 程序是并行执行的,但并不能读取到当前线程以外的资源。

PHP 底层的全局变量

在 PHP 的 Zend 引擎中,有一些全局变量和结构体用于存储运行时的状态和相关信息。以下是一些常见的全局变量,如 EGPG 和 CG

AG ZendVM 内存管理器

AG 保存了内存管理器相关的资源,主要的结构是:zend_mm_heap *mm_heapPHP 所有变量的内存分配全部由zend_mm_alloc_heap(AG(mm_heap), size, ...)函数所实现。

GC_G ZendVM 垃圾回收器

GC_G是垃圾回收器对象,通过引用计数和循环引用垃圾回收算法进行PHP变量的生命周期管理和资源回收。

EG (Executor Globals)

EG 是一个指向 executor_globals 结构的指针,包含了执行器的全局状态信息,包括当前执行的上下文、错误处理、安全上下文等。

主要字段:

  • current_execute_data: 指向当前正在执行的函数调用的执行数据结构

  • active_symbol_table: 当前活动的符号表,用于存储变量及其值

  • HashTable *function_table:函数表

  • HashTable *class_table:类表

  • zend_object *exception:运行时的异常

  • zend_vm_stack vm_stack:运行的函数调用栈

PG (Persistent Globals)

PG 是一个指向 persistent_globals 结构的指针,包含了持久化(跨请求)全局状态信息,主要用于存储在请求之间保持不变的数据。

主要字段:

  • auto_prepend_file: 自动包含在脚本执行前的文件

  • auto_append_file: 自动包含在脚本执行后的文件

  • display_errors: 控制是否显示错误的配置选项

CG (Compiler Globals)

CG 是一个指向 compiler_globals 结构的指针,包含了与编译相关的全局状态和信息,在 PHP 代码的编译阶段使用。

主要字段:

  • compiler_options: 编译选项的配置

  • active_symbol_table: 当前编译阶段的活动符号表

  • open_files:当前打开的文件列表

SG (SAPI Globals)

SG 是一个用于存储与当前脚本执行相关的全局变量的结构。它主要用于管理与当前请求或执行上下文相关的信息。

主要字段:

  • request_info:包含与当前请求相关的信息,例如请求的 URI 和方法等。

  • sapi_headers:当前HTTP Headers

  • rfc1867_uploaded_files:当前上传的文件列表

其他扩展的全局变量

除了ZendVM之外,加载的每个扩展可能都使用全局变量保存了数据,例如:

  • BCGbcmath

  • MYSQLND_Gmysqlnd

php-src中使用ZEND_BEGIN_MODULE_GLOBALS定义全局变量。

ZEND_BEGIN_MODULE_GLOBALS(gmp)
    bool rand_initialized;
    gmp_randstate_t rand_state;
ZEND_END_MODULE_GLOBALS(gmp)

TSRM 介绍

TSRMThread Safe Resource Management)是 PHP 中的一种机制,旨在为多线程环境提供资源管理的线程安全支持。它允许多个线程安全地访问和操作共享资源,确保在并发执行时不会发生数据竞争或状态不一致的问题。

TSRM由编译参数控制,因此是否开启ZTS决定于php-src编译时的选项。增加--eanble-zts就可以开启ZTS

NTS

AG为例,在NTSAG(mm_heap)宏展开后是:alloc_globals.mm_heap,实际定义是

static zend_alloc_globals alloc_globals;

即进程全局变量,此全局变量保存了所有内存分配器的资源。

ZTS

ZTS下宏展开后实际的符号是:

(((zend_alloc_globals *) (((char*) tsrm_get_ls_cache())+(alloc_globals_offset)))->mm_heap)

tsrm_get_ls_cache()函数就是获取一个Thread Local变量,在Linux系统下使用了pthread_getspecific()实现。

pthread_getspecific 是 POSIX 线程库中的一个函数,用于在多线程程序中访问与特定线程相关的线程局部存储(Thread Local StorageTLS)数据。该函数允许线程获取已存储的特定数据指针,这些指针是在先前通过 pthread_setspecific 存储的。

另外一个关键的函数是ts_resource_ex(),在线程创建时分配内存,调用pthread_setspecific设置为TLS指针。

/* fetches the requested resource for the current thread */
TSRM_API void *ts_resource_ex(ts_rsrc_id id, THREAD_T *th_id) {
    ...
    if (!thread_resources) {
        allocate_new_resource(&tsrm_tls_table[hash_value], thread_id);
        tsrm_mutex_unlock(tsmm_mutex);
        return ts_resource_ex(id, &thread_id);
    } 
}

总结

这些全局资源和相关的逻辑了ZendVM,在ZTS模式下,底层的全局变量被编译为了TLS线程局部变量。这就相当于每个线程都有一个独立的ZendVM环境,彼此是隔离的。因此ZTS模式下,即便在同一个线程内,实际上程序中创建的全局变量或资源,例如:$_GET/$_POST/$_FILES或其他使用global $vars,以及include $file等均为TLS资源,只能在当前线程内使用。

这相当于是PHP层面,线程变成了进程,但在底层视角(C/C++)仍然是共享堆栈的线程环境。

Swoole6 线程

由于Swoole使用了C++11,因此可以直接使用C++标准的多线程支持,而不是直接使用pthread底层库。

创建线程

static PHP_METHOD(swoole_thread, __construct) {
    char *script_file;
    size_t l_script_file;
    zval *args;
    int argc;
    ZendArray *argv = nullptr;

    ZEND_PARSE_PARAMETERS_START(1, -1)
    Z_PARAM_STRING(script_file, l_script_file)
    Z_PARAM_VARIADIC('+', args, argc)
    ZEND_PARSE_PARAMETERS_END();

    if (l_script_file < 1) {
        zend_throw_exception(swoole_exception_ce, "exec file name is empty", SW_ERROR_INVALID_PARAMS);
        return;
    }

    ThreadObject *to = thread_fetch_object(Z_OBJ_P(ZEND_THIS));
    zend_string *file = zend_string_init(script_file, l_script_file, 1);

    if (argc > 0) {
        argv = new ZendArray();
        for (int i = 0; i < argc; i++) {
            argv->append(&args[i]);
        }
    }

    try {
        to->thread = new std::thread([file, argv]() { php_swoole_thread_start(file, argv); });
    } catch (const std::exception &e) {
        zend_throw_exception(swoole_exception_ce, e.what(), SW_ERROR_SYSTEM_CALL_FAIL);
        return;
    }
    zend_update_property_long(
        swoole_thread_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("id"), (zend_long) to->thread->native_handle());
}

底层直接使用了C++std::thread创建线程,子线程会执行php_swoole_thread_start()函数初始化子线程。

构造方法接受2个参数,第一个是子线程要执行的脚本文件,第二个是线程参数数组。

线程初始化

void php_swoole_thread_start(zend_string *file, ZendArray *argv) {
    thread_num.fetch_add(1);
    ts_resource(0);
#if defined(COMPILE_DL_SWOOLE) && defined(ZTS)
    ZEND_TSRMLS_CACHE_UPDATE();
#endif
    zend_file_handle file_handle{};
    zval global_argc, global_argv;

    PG(expose_php) = 0;
    PG(auto_globals_jit) = 1;
#if PHP_VERSION_ID >= 80100
    PG(enable_dl) = false;
#else
    PG(enable_dl) = 0;
#endif

    swoole_thread_init();

    if (php_request_startup() != SUCCESS) {
        EG(exit_status) = 1;
        goto _startup_error;
    }

    PG(during_request_startup) = 0;
    SG(sapi_started) = 0;
    SG(headers_sent) = 1;
    SG(request_info).no_headers = 1;
    SG(request_info).path_translated = request_info.path_translated;
    SG(request_info).argc = request_info.argc;

    zend_stream_init_filename(&file_handle, ZSTR_VAL(file));
    file_handle.primary_script = 1;

    zend_first_try {
        thread_bailout = EG(bailout);
        if (request_info.argv_serialized) {
            php_swoole_unserialize(request_info.argv_serialized, &global_argv);
            ZVAL_LONG(&global_argc, request_info.argc);
            zend_hash_update(&EG(symbol_table), ZSTR_KNOWN(ZEND_STR_ARGV), &global_argv);
            zend_hash_update(&EG(symbol_table), ZSTR_KNOWN(ZEND_STR_ARGC), &global_argc);
        }
        if (argv) {
            argv->toArray(&thread_argv);
            argv->del_ref();
        }
        php_swoole_thread_register_stdio_file_handles(true);
        php_execute_script(&file_handle);
    }
    zend_end_try();

    zend_destroy_file_handle(&file_handle);

    php_request_shutdown(NULL);
    file_handle.filename = NULL;

_startup_error:
    zend_string_release(file);
    ts_free_thread();
    swoole_thread_clean();
    thread_num.fetch_sub(1);
}

关键的几个流程:

  • ts_resource 使用 TSRM API 分配了 TLS 资源

  • php_request_startup 在子线程内执行 RINIT ,这会调用所有扩展的RINIT函数

  • php_execute_script 在子线程内执行PHP脚本

  • php_request_shutdown 执行RSHUTDOWN函数

  • ts_free_thread 使用 TSRM API 释放 TLS 资源

线程结束后,会调用std::threadjoin()方法回收线程。

线程创建的线程就可以并行地执行了,但每个线程彼此是完全隔离的,这和多进程并没有区别。接下来就需要实现线程资源的共享。

ThreadResource

Swoole底层封装了ThreadResource来管理跨线程的共享资源。这个类使用引用计数来管理内存。底层使用了atomic来增加、减少引用计数,因此不需要加锁。当没有任何线程持有此资源时就会执行delete释放对象。

class ThreadResource {
    sw_atomic_t ref_count;

  public:
    ThreadResource() {
        ref_count = 1;
    }

    void add_ref() {
        sw_atomic_add_fetch(&ref_count, 1);
    }

    void del_ref() {
        if (sw_atomic_sub_fetch(&ref_count, 1) == 0) {
            delete this;
        }
    }

  protected:
    virtual ~ThreadResource() {}
};

包括以下对象,均继承了ThreadResource

  • Swoole\Thread\Atomic

  • Swoole\Thread\Lock

  • Swoole\Thread\ArrayList

  • Swoole\Thread\Map

  • Swoole\Thread\Queue

  • Swoole\Thread\Barrier

这些对象可以安全地在线程之间传递。

ZendArray

ArrayListMap使用了ZendVM提供的zend_array(persistent)来实现,因此内存是直接由glibcmalloc/free管理。对于数组的操作底层使用了RWLock来防止竞争。

class ZendArray : public ThreadResource {
 protected:
    swoole::RWLock lock_;
    zend_array ht;

    static void item_dtor(zval *pDest) {
        ArrayItem *item = (ArrayItem *) Z_PTR_P(pDest);
        delete item;
    }

 public:
    ZendArray() : ThreadResource(), lock_(0) {
        zend_hash_init(&ht, 0, NULL, item_dtor, 1);
    }

    ~ZendArray() override {
        zend_hash_destroy(&ht);
    }

    ...

    void strkey_offsetGet(zval *zkey, zval *return_value) {
        zend::String skey(zkey);
        lock_.lock_rd();
        ArrayItem *item = (ArrayItem *) zend_hash_find_ptr(&ht, skey.get());
        if (item) {
            item->fetch(return_value);
        }
        lock_.unlock();
    }

    void strkey_offsetSet(zval *zkey, zval *zvalue) {
        zend::String skey(zkey);
        auto item = new ArrayItem(zvalue);
        item->setKey(skey);
        lock_.lock();
        zend_hash_update_ptr(&ht, item->key, item);
        lock_.unlock();
    }

    ...
}
  • 读操作使用lock_rd()共享锁,因此$map['key']这样的操作,多线程并行执行时不会出现竞争

  • 写操作使用lock()独占锁,若多线程向同一个$map写入时会出现竞争

ArrayItem

所有写入线程数据容器的元素,均使用此类操作。

  • 数值:例如intfloatnullbool,直接复制其值

  • 字符串:需要完全复制字符串的内存

  • PHP对象:需要序列化后,作为字符串存储,读取时再进行反序列化

  • 资源:例如php socketphp streamswoole co_socket需要进行dup(fd)对文件描述符增加一次引用计数,读取时再增加一次引用计数

  • 线程资源:调用ThreadResource::add_ref()增加引用计数,删除时减少引用计数

  • 数组:转为ArrayListMap对象

数据容器是支持嵌套结构的,例如Map中可以写入ArrayListArrayList中可以再添加一个Queue

线程参数

线程参数本身是一个ArrayList对象,通过引用计数管理,在不同的线程之间传递。

Queue

Queue使用了C++std::queue实现,它不仅是一个数据容器,还内置了线程条件变量(std::condition_variable),队列的消费者在队列为空时等待条件变量,生产者push()写入数据时可以唤醒队列的消费者。

struct Queue : ThreadResource {
    std::queue<ArrayItem *> queue;
    std::mutex lock_;
    std::condition_variable cv_;
}

等待

void pop_wait(zval *return_value, double timeout) {
    ArrayItem *item = nullptr;
    std::unique_lock<std::mutex> _lock(lock_);
    SW_LOOP {
        if (!queue.empty()) {
            item = queue.front();
            queue.pop();
            break;
        } else {
            if (timeout > 0) {
                if (cv_.wait_for(_lock, std::chrono::duration<double>(timeout)) == std::cv_status::timeout) {
                    break;
                }
            } else {
                cv_.wait(_lock);
            }
            // All threads have been awakened,
            // but the data has already been acquired by other thread, returning NULL.
            if (queue.empty()) {
                RETVAL_NULL();
                swoole_set_last_error(SW_ERROR_NO_PAYLOAD);
                break;
            }
        }
    }
    _lock.unlock();
    if (item) {
        item->fetch(return_value);
        delete item;
    }
}

这里有一个细节是队列弹出的元素转为PHP变量时,实在锁的同步区域之外,原因是pop之后仅当前的线程持有此元素,可以安全地进行操作,所以不需要加锁。

通知

void push_notify(zval *zvalue, bool notify_all) {
    auto item = new ArrayItem(zvalue);
    std::unique_lock<std::mutex> _lock(lock_);
    queue.push(item);
    if (notify_all) {
        cv_.notify_all();
    } else {
        cv_.notify_one();
    }
}

调用了条件变量的notify_one()/notify_all()方法唤醒处于等待状态的消费者线程。

其他实现细节

1. 线程中的协程调度器

在线程中可以创建协程调度器,底层实现直接使用了C++thread_local关键词来隔离全局变量。每个线程的协程和异步IO环境是隔离的。包括:

  • EventLoop

  • Coroutine Scheduler

  • Timer

  • Async Threads

  • Logger

相比ZendVMTLS要简单很多,可读性更高。

#ifdef SW_THREAD
#define SW_THREAD_LOCAL thread_local
extern std::mutex sw_thread_lock;
#else
#define SW_THREAD_LOCAL
#endif

SW_THREAD_LOCAL bool PHPCoroutine::activated = false;
SW_THREAD_LOCAL zend_array *PHPCoroutine::options = nullptr;

2. Server 的多线程模式

多线程模式下将Worker进程、Task进程、UserWorker进程全部修改为线程的方式运行。由于线程模式下,无法复制线程的资源,需要在线程创建之后,重新创建一次。

工作线程是将同样的代码,再次执行一遍。例如 new Server 和 Server::on(),但worker线程不允许执行 Server::set() 方法。在 Server::start() 方法中,工作进程将进入 worker_thread_fn() 执行单元,而主线程则是创建线程,以及管理子线程,负责退出线程的重启和回收,以及shutdown

static PHP_METHOD(swoole_server, start) {
    zval *zserv = ZEND_THIS;
    Server *serv = php_swoole_server_get_and_check_server(zserv);

#ifdef SW_THREAD
    if (serv->is_worker_thread()) {
        worker_thread_fn();
        RETURN_TRUE;
    }
#endif

    if (serv->is_started()) {
        php_swoole_fatal_error(
            E_WARNING, "server is running, unable to execute %s->start()", SW_Z_OBJCE_NAME_VAL_P(zserv));
        RETURN_FALSE;
    }

    ...
}

3. AIO 线程池

AIO线程池是共享的,它是一个多对多的队列MMCQ (Many To Many Concurrent Queue),避免创建过多AIO线程。

async_thread_lock.lock();
if (!async_thread_pool) {
    async_thread_pool = std::make_shared<async::ThreadPool>(
        SwooleG.aio_core_worker_num, SwooleG.aio_worker_num, SwooleG.aio_max_wait_time, SwooleG.aio_max_idle_time);
}
if (!async_thread_pool->is_running()) {
    async_thread_pool->start();
}
pool = async_thread_pool;
async_thread_lock.unlock();

需要为每个PHP线程创建一个独立的管道来获取AIO线程池的通知。

class AsyncThreads {
  public:
    size_t task_num = 0;
    Pipe *pipe = nullptr;
    std::shared_ptr<async::ThreadPool> pool;
    network::Socket *read_socket = nullptr;
    network::Socket *write_socket = nullptr;
}

结语

Swoole v6PHP提供了一个稳定可靠的多线程方案。Swoole的核心仍然是协程,多线程的支持只是为了补齐了Swoole的最后一块短板,相比APCuRedis,多线程在数据和资源共享有巨大的优势。

除了当前提供的数据容器之外,未来Swoole会持续增加更多高性能的多线程C++组件,不断增强多线程支持。

zuiui
zuiui

7 天前

签名 :   9       0
评论
站长交流