在 PHP 中使用协程进行协同多任务处理

2020/06/05 PHP

未完待续

翻译自Cooperative multitasking using coroutines (in PHP!)

PHP 5.5 最大新特性之一就是支持生成器和协程。(注:官方)文档和大量的博客文章已经对生成器进行了充分的说明(像这篇以及这篇)。相反,协程受到的关注则相对较少。这是因为协程更加强大以及更难理解和解释。

我希望本文可以通过使用协程实现一个任务调度器来引导你,使你感受到它的作用。我会以一些介绍性的章节开始。如果你感觉自己已经很好地掌握了生成器和协程背后的基础,可以直接跳到”Cooperative multitasking(协同多任务)”一节。

生成器

生成器背后的基本思想是,函数不返回一个单独的值,而是返回一系列值,其中每个值都是逐个发出的。或者说,生成器使得你更容易实现迭代器。以下是一个简单的xrange()函数示例:

<?php
function xrange($start, $end, $step = 1) {
    for ($i = $start; $i <= $end; $i += $step) {
        yield $i; 
    }
}

foreach (xrange(1, 1000000) as $sum) {
    echo $num, "\n";
}

以上的xrange()函数跟内置的range()函数提供了同样的功能。唯一的不同就是在上例中,range()会返回一个有 1000000 个成员的数组,而xrange()则返回一个可以发出这些数量的迭代器,但实际上从来不会计算出数组的所有值。

这种实现的优点是明显的。它允许你在处理大型数据集时不需要将数据一次性加载到内存中。你甚至可以处理无限的数据流。

通过手动实现迭代器接口,所有这些都可以在不使用生成器的情况下完成。生成器只是使它更加方便,因为你不需要再为每个迭代器实现五种不同的方法。

作为中断函数的生成器

要从生成器转到协程,理解它们的内部工作原理很重要:生成器就是可中断函数,其中 yield 语句构成中断点。

按照上例,如果调用xrange(1, 1000000)xrange()函数里的代码实际上不会运行。相反,PHP 只返回实现迭代器接口的生成器类的实例:

<?php
$range = xrange(1, 1000000);
var_dump($range); // object(Generator)#1
var_dump($range instanceof Iterator); // bool(true);

只有在你调用实例的迭代器方法时,代码才会运行。例如,如果你调用$range->rewind()xrange()函数中的代码会一直运行到控制流中的第一个yield出现。在该例中,这意味着$i = $start,然后yield $i会运行。可以使用$range->current()获取传递给yield语句的任何内容。

如果要继续运行生成器中的代码,你需要调用$range->next()方法。这会恢复迭代器,直至遇到yield语句。因此,只要连续地使用->next()->current()调用,你就能获取生成器的所有值,直到不再遇到yield。对于xrange()来说,一旦$i超过$end。在这种情况下,控制流会到达函数尾部,这样就没有可运行的代码了。一旦发生这种情况,->valid()方法会返回false,这说明迭代结束。

协程

协程为上述功能增加的主要功能是将值发送回生成器的能力。这将生成器和调用者之间的单向通信转换为两者之间的双向通道。

通过调用->send()方法而不是->next(),可以将值传到协程中。以下的logger()协程就是如此工作的示例:

<?php
function logger($fileName) {
    $fileHandle = fopen($fileName, 'a');
    while (true) {
        fwrite($fileHandle, yield. "\n");
    }
}

$logger = logger(__DIR__. '/log');
$logger->send('Foo');
$logger->send('Bar');

如你所见,这里的yield并没有作为语句,而是作为表达式来使用,也就是说,它有一个返回值。yield的返回值是通过->send()传递的所有数据。在该例中,yield先返回'Foo',再返回'Bar'

上例中的yield仅仅作为接收者。可以同时使用两种用法,也就是说,既发送也接收。以例演示了这如何工作:

<?php
function gen() {
    $ret = (yield 'yield1');
    var_dump($ret);
    $ret = (yield 'yield2');
    var_dump($ret);
}

$gen = gen();
var_dump($gen->current()); // string(6) "yield1"
var_dump($gen->send('ret1')); // string(4) "ret1" (gen 中的第一个 var_dump)
                              // string(6) "yeild2" (->send() 返回值的 var_dump)
var_dump($gen->send('ret2')); // string(4) "ret2" (继续是 gen 内的)
                              // NULL (->send() 的返回值)

一开始,输出的顺序可能有点难以理解,所以一定要弄清楚为什么会以这种方式输出。我想特别指向两点:第一,yield表达式周围使用括号并不是偶然的。出于技术原因,这些括号是必须的(尽管我一直在考虑为赋值添加一个异常,就像在 Python 中一样)。其次,你可能已经注意到->current()是在不首先调用->rewind()的情况下使用的。如果完成该操作,则隐式地执行 rewind 操作。

协同多任务

如果在读以上的logger()示例时你有想”为什么我要在这时用协程?为什么我不能只使用普通的类?”,这就对了。这个示例说明了基本的用法,但实际上在这种环境下使用协程没有任何优点。这是大部分协程示例的情况。之前已经介绍过协程是非常强大的概念,但它们的应用是非常罕见的,并且还非常复杂,因此很难使用简单的示例来说明。

我在本文中将要做的就是使用协程实现协同多任务。我们尝试解决的难题是想要并发运行多任务(或程序)。但一个处理器一次只能运行一个任务(在这里不要考虑多核了)。因此处理器需要在不同的任务之间切换,让每个任务都运行”一小会儿”。

术语”协同”部分说明了切换是如何完成的:要求当前运行的任务自愿将控制权让出给调度器,这时就可以运行另一个任务。这跟”抢占式”多任务不同,调度器可以在一定时间后中断任务,不管它喜欢与否。协同多作务在早期的 Windows(Win 95 之前的版本)和 MacOS 中就使用了,但是后来换成抢占式调度了。理由应该明显很公平地:如果你依赖程序自愿让出控制权,行为恶劣的软件可以轻易地独占 CPU,不会跟其它任务共享。

这时候,你应该可以看到协和和任务调度之间的联系了:yield指令提供了任务中断自身并让出控制权的方法,以便调度器可以运行其它任务。此外,yield还可以用于任务和调度器间的通信。

就我们的目的而言,”任务”将是协程函数的轻量封装:

<?php
class Task {
    protected $taskId;
    protected $coroutine;
    protected $sendValue = null;
    protected $beforeFirstYield = true;

    public function __construct($taskId, Generator $coroutine) {
        $this->taskId = $taskId;
        $this->coroutine = $coroutine;
    }

    public function getTaskId() {
        return $this->taskId;
    }

    public function setSendValue($sendValue) {
        $this->sendValue = $sendValue;
    }

    public function run() {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } else {
            $retval = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;
            return $retval;
        }
    }

    public function isFinished() {
        return !$this->coroutine->valid();
    }
}

一个任务就是一个被 task ID 标记的协程。通过setSendValue()方法,你指定下一次恢复时将向其中发送哪个值(稍后你将看到我们需要什么)。run()函数其实只不过是在协程上调用了send()方法。要了解为什么需要额外的beforeFirstYield标记,请思考以下片段:

<?php
function gen() {
    yield 'foo';
    yield 'bar';
}

$gen = gen();
var_dump($gen->send('something'));

// As the send() happens before the first yield there is an implicit rewind() call,
// so what really happens is this:
// 由于 send() 发生在第一个 yield 之前,所以有一个隐式的 rewind() 调用,所以真正发生的是:
// $gen->rewind();
// var_dump($gen->send('something'));

// The rewind() will advance to the first yield (and ignore its value), the send() will
// advance to the second yield (and dump its value). Thus we loose the first yielded value!
// rewind() 将前进到第一个 yield(并忽略其值),send() 将前进到第二个 yield(并存储其值)。这样我们就失去了第一个产生的值!

通过添加额外的beforeFirstYield条件,我们可以确保第一个yield的值也能返回。

<?php
class Scheduler {
    protected $maxTaskId = 0;
    protected $taskMap = [];
    protected $taskQueue;

    public function __construct() {
        $this->taskQueue = new SplQueue();
    }

    public function newTask(Generator $coroutine) {
        $tid= ++$this->maxTaskId;
        $task = new Task($tid, $coroutine);
        $this->taskMap[$tid] = $task;
        $this->schedule($task);
        return $tid;
    }

    public function schedule(Task $task) {
        $this->taskQueue->enqueue($task);
    }

    public function run() {
        while (!$this->taskQueue->isEmpty()) {
            $task = $this->taskQueue->dequeue();
            $task->run();

            if ($task->isFinished()) {
                unset($this->taskMap[$task->getTaskId()]);
            } else {
                $this->schedule($task);
            }
        }
    }
}

newTask()方法创建新任务(使用下一个可用的任务 id),然后将其放进 task 映射中。此外,它将任务放到任务队列中,使得调度器可以调度任务。run()方法会查找任务队列并运行任务。如果任务完成了,则删除,否则将其放入队列尾部重新调度。

下面尝试用调度器处理两个简单的(以及非常没有意义的)任务:

<?php
function task1() {
    for ($i = 1; $i <= 10; $i ++) {
        echo "This is task 1 iteration $i.\n";
        yield;
    }
}

function task2() {
    for ($i = 1; $i <= 5; $i ++) {
        echo "This is task 2 iteration $i.\n";
        yield;
    }
}

$scheculer = new Scheduler;
$scheculer->newTask(task1());
$scheculer->newTask(task2());
$scheculer->run();

两个任务都只是echo一个消息,接着用yield将控制权让给调度器。以下是输出结果:

This is task 1 iteration 1.
This is task 2 iteration 1.
This is task 1 iteration 2.
This is task 2 iteration 2.
This is task 1 iteration 3.
This is task 2 iteration 3.
This is task 1 iteration 4.
This is task 2 iteration 4.
This is task 1 iteration 5.
This is task 2 iteration 5.
This is task 1 iteration 6.
This is task 1 iteration 7.
This is task 1 iteration 8.
This is task 1 iteration 9.
This is task 1 iteration 10.

输出与预期完全一致:前五次迭代中,任务交替执行,然后第二个任务完成,只剩第一个任务继续运行。

与调度器通信

现在调度器可以工作了,我们可以转到议程的下一点:任务和调度器之间的通信。我们将使用跟进程用来和操作系统通信的相同方法:通过系统调用。我们需要系统调用的原因是操作系统跟进程的权限级别不一样。因此,为了执行特权操作(例如 kill 另一个进程),必须有某种方法将控制权传递回内核,这样它才能执行上述操作。在内部,这再次使用到中断指令来实现。以前我们使用的是能用的整型指令,现在有更专门和更快的 syscall/sysenter 指令。

我们的任务调度系统将反映这种设计:我们将通过yield表达式传递的系统调用进行通信,而不是简单地将调度器传递到任务中(从而允许它做它想做的任何事情)。这里的yield既可以作为中断,也可以作为向调度程序传递(或从调度程序传递)信息的方式。

为了表示一个系统调用,我将在一个可调用函数外面使用一个封装器:

<?php
class SystemCall {
    protected $callback;

    public function __construct(callable $callback) {
        $this->callback = $callback;
    }

    public function __invoke(Task $task, Scheduler $scheduler) {
        $callback = $this->callback;
        return $callback($task, $scheduler);
    }
}

它的行为就跟任何可调用类型一样(使用__invoke()

// TODO: 未完待续

Search

    Table of Contents