Laravel Pipeline管道设计


Laravel Pipeline管道设计


引言

> ps -ef | grep php

这个 Linux 命令就是一个管道命令。前面一条命令的结果交给后面一条命令来执行,就像一条管道一样让这个命令请求的结果向下流动,这就是管道模式的应用。在 PHP 设计模式中,责任链模式 很明显就是管道模式在 面向对象 语言中的应用。

Pipeline管道模式,也有人叫它装饰模式。应该说管道是装饰模式的一个变种,虽然思想都是一样的,但这个是闭包的版本,实现方式与传统装饰模式也不太一样。在laravel的源码中算是一个比较核心的设计模式了。

管道模式,或者说装饰模式的思想,就是在不改变原有程序的基础上,可以方便的在已有程序上添加新的功能。

管道模式一般是和过滤器一起使用的,什么是过滤器呢?其实就是我们要处理请求的那些中间方法,比如说上面命令中的 grep ,或者是 wc 、awk 这些的命令。大家其实很快就能发现,在 Laravel 框架中,我们的中间件就是一个个的过滤器。而我们要处理的数据,就是那个 Request 请求对象。

在说管道模式之前,让我们看一下 array_reduce() 这个函数 https://www.php.net/manual/en/function.array-reduce.php

array_reduce — 用回调函数迭代地将数组简化为单一的值

说明:

array_reduce(array $array, callable $callback, mixed $initial = null): mixed

array_reduce() 将回调函数 callback 迭代地作用到 array 数组中的每一个单元中,从而将数组简化为单一的值。

参数:

array
    输入的 array。

callback
    callback(mixed $carry, mixed $item): mixed

    carry
        携带上次迭代的返回值; 如果本次迭代是第一次,那么这个值是 initial。
    item
        携带了本次迭代的值。

initial
    如果指定了可选参数 initial,该参数将用作处理开始时的初始值,如果数组为空,则会作为最终结果返回。

返回值:

返回结果值。

如果数组为空,并且没有指定 initial 参数,array_reduce() 返回 null。

基本工作原理:

function array_reduce(array $array, callable $callback, $initial = null) {
    $result = $initial;

    foreach ($array as $value) {
        $result = $callback($result, $value);
    }

    return $result;
}

看一下下面的示例:

<?php
function sum($carry, $item)
{
    $carry += $item;
    return $carry;
}

function product($carry, $item)
{
    $carry *= $item;
    return $carry;
}

$a = array(1, 2, 3, 4, 5);
$x = array();

var_dump(array_reduce($a, "sum")); // int(15), 因为:1+2+3+4+5
var_dump(array_reduce($a, "product", 10)); // int(1200), 因为:10*1*2*3*4*5
var_dump(array_reduce($x, "sum", "No data to reduce")); // string(17) "No data to reduce"

追踪过程再看一下

<?php
function sum($carry, $item)
{
    var_dump('$carry-  ' . $carry);
    var_dump('$item-  ' . $item);
    $carry += $item;
    var_dump('$carry2  ' . $carry);
    return $carry;
}

$a = array(1, 2, 3, 4, 5);


var_dump(array_reduce($a, "sum")); 

string(9) "$carry-  "
string(9) "$item-  1"
string(10) "$carry2  1"
string(10) "$carry-  1"
string(9) "$item-  2"
string(10) "$carry2  3"
string(10) "$carry-  3"
string(9) "$item-  3"
string(10) "$carry2  6"
string(10) "$carry-  6"
string(9) "$item-  4"
string(11) "$carry2  10"
string(11) "$carry-  10"
string(9) "$item-  5"
string(11) "$carry2  15"
int(15)


function product($carry, $item)
{
    var_dump('$carry-  ' . $carry);
    var_dump('$item-  ' . $item);
    $carry *= $item;
    var_dump('$carry2  ' . $carry);
    return $carry;
}

var_dump(array_reduce($a, "product", 10)); 

string(11) "$carry-  10"
string(9) "$item-  1"
string(11) "$carry2  10"
string(11) "$carry-  10"
string(9) "$item-  2"
string(11) "$carry2  20"
string(11) "$carry-  20"
string(9) "$item-  3"
string(11) "$carry2  60"
string(11) "$carry-  60"
string(9) "$item-  4"
string(12) "$carry2  240"
string(12) "$carry-  240"
string(9) "$item-  5"
string(13) "$carry2  1200"
int(1200)


$x = array();
var_dump(array_reduce($x, "sum", "No data to reduce")); 

string(17) "No data to reduce"

举例类

再看一个实例:

约定一个接口规范:

<?php 
interface RequestInterface
{
    // 我们之前讲过装饰者模式,这里是通过闭包函数实现
    // 通过之后实现类及调用就可以看出
    public static function handle(Closure $next);
}

遵循接口开始实现:

class Request1 implements RequestInterface
{
    public static function handle(Closure $next)
    {
        echo "Request1 Begin." . "<br />";
        $next();
        echo "Request1 End." . "<br />";
    }
}

class Request2 implements RequestInterface
{
    public static function handle(Closure $next)
    {
        echo "Request2 Begin." . "<br />";
        $next();
        echo "Request2 End." . "<br />";
    }
}

class Request3 implements RequestInterface
{
    public static function handle(Closure $next)
    {
        echo "Request3 Begin." . "<br />";
        $next();
        echo "Request3 End." . "<br />";
    }
}

class Request4 implements RequestInterface
{
    public static function handle(Closure $next)
    {
        echo "Request4 Begin." . "<br />";
        $next();
        echo "Request4 End." . "<br />";
    }
}

客户端来发出请求:

class Client
{
    // 这里包含了所有的请求
    private $pipes = [
        'Request1',
        'Request2',
        'Request3',
        'Request4',
    ];

    // 这里就是思维导图中默认返回的匿名回调函数
    private function defaultClosure()
    {
        return function () {
            echo '请求处理中...' . "<br />";
        };
    }

    // 这里就是整个请求处理管道的关键
    private function getSlice()
    {
        // $stack是一个函数,$pipe是一个闭包函数,作用于$stack
        return function ($stack, $pipe) {
            // 返回一个闭包
            return function () use ($stack, $pipe) {
                return $pipe::handle($stack);
            };
        };
    }

    // 这里是负责发起请求处理
    public function then()
    {
        call_user_func(array_reduce($this->pipes, $this->getSlice(), $this->defaultClosure()));
    }
}

$worker = new Client();
$worker->then();

运行结果:

Request4 Begin.
Request3 Begin.
Request2 Begin.
Request1 Begin.
请求处理中...
Request1 End.
Request2 End.
Request3 End.
Request4 End.

getSlice() 循环闭包返回值也是一个闭包,在不断的循环中,闭包函数就像洋葱一样,一层一层不断包裹。

最终array_reduce()返回一个匿名函数:

function () {
    return Request4::handle(function () {
        return Request3::handle(function () {
            return Request2::handle(function () {
                return Reuqest1::handle(function () {
                    echo '请求处理中...' . "<br />";
                });
            });
        });
    });
}

当调用call_user_func时就是执行array_reduce返回的匿名函数。

这里没有用到类来对实例对象进行包装,而是通过闭包函数完成,每一次处理在上一次处理之上进行包装,最后获得响应,就像流水线一样,一个请求进来通过一道道工序加工(包装)最后生成响应。

我们把上面的例子再丰富一下:

<?php 
interface RequestInterface
{
    // 我们之前讲过装饰者模式,这里是通过闭包函数实现
    // 通过之后实现类及调用就可以看出
    public static function handle($param, Closure $next);
}
	
class Request1 implements RequestInterface
{
    public static function handle($param, Closure $next)
    {
        echo "Request1 Begin." . "<br />";
        $next($param);
        echo "Request1 End." . "<br />";
    }
}

class Request2 implements RequestInterface
{
    public static function handle($param, Closure $next)
    {
        echo "Request2 Begin." . "<br />";
        $next($param);
        echo "Request2 End." . "<br />";
    }
}

class Request3 implements RequestInterface
{
    public static function handle($param, Closure $next)
    {
        echo "Request3 Begin." . "<br />";
        $next($param);
        echo "Request3 End." . "<br />";
    }
}

class Request4 implements RequestInterface
{
    public static function handle($param, Closure $next)
    {
        echo "Request4 Begin." . "<br />";
        $next($param);
        echo "Request4 End." . "<br />";
    }
}

class Client
{
    // 这里包含了所有的请求
    private $pipes = [
        'Request1',
        'Request2',
        'Request3',
        'Request4',
    ];

    // 这里就是思维导图中默认返回的匿名回调函数
    private function defaultClosure($param)
    {
        return function ($param) {
            echo '请求处理中...'. $param . "<br />";
        };
    }

    // 这里就是整个请求处理管道的关键
    private function getSlice()
    {
        // $stack是一个函数,$pipe是一个闭包函数,作用于$stack
        return function ($stack, $pipe) {
            // 返回一个闭包
            return function ($param) use ($stack, $pipe) {
                return $pipe::handle($param, $stack);
            };
        };
    }

    // 这里是负责发起请求处理
    public function then()
    {
        call_user_func(array_reduce(array_reverse($this->pipes), $this->getSlice(), $this->defaultClosure($param)), 'abc123');
    }
}

$worker = new Client();
$worker->then();
  1. 增加参数 $param
  2. 反转中间件执行顺序

输出结果:

Request1 Begin.
Request2 Begin.
Request3 Begin.
Request4 Begin.
请求处理中...abc123
Request4 End.
Request3 End.
Request2 End.
Request1 End.

call_user_func()函数:

call_user_func(callable $callback, mixed ...$args): mixed

因为上面array_reduce([…], func(f, b), func(arg))第三个参数是函数,所以返回的是一个函数,call_user_func(func(), arg)调用时就要加第二个参数arg。

array_reduce() 返回一个匿名函数:

function ($param) {
    return Request1::handle($param, function ($param) {
        return Request2::handle($param, function ($param) {
            return Request3::handle($param, function ($param) {
                return Reuqest4::handle($param, function ($param) {
                    echo '请求处理中...' . $param . "<br />";
                });
            });
        });
    });
}

再改一下看下过程:

private function getSlice()
{
    echo 'a1' ."<br />";
    return function ($stack, $pipe) {
        echo 'b2(' . $pipe . ')' ."<br />";
        return function ($param) use ($stack, $pipe) {
            echo 'c3(' . $pipe . ')' ."<br />";
            return $pipe::handle($param, $stack);
        };
    };
}

输出结果:

a1
b2(Request4)
b2(Request3)
b2(Request2)
b2(Request1)
c3(Request1)
Request1 Begin.
c3(Request2)
Request2 Begin.
c3(Request3)
Request3 Begin.
c3(Request4)
Request4 Begin.
请求处理中...abc123
Request4 End.
Request3 End.
Request2 End.
Request1 End.

为什么先是多个b2呢?不应该是b2、c3交替出现吗?

核心原因:中间件管道的「构建期」与「执行期」分离。

1、构建期。array_reduce 会一次性遍历所有中间件(Request1~Request4),每次遍历都会执行:

return function ($stack, $pipe) {
    echo 'b2' ."<br />"; // 此处立即输出 b2
    return function ($param) use ($stack, $pipe) { ... };
};

遍历4个中间件$pipe → 输出4次 b2,也可以说用C语言写成的array_reduce()的底层代码知道需要把中间件$pipes遍历一遍,所以先把所有$pipes遍历一遍。此时仅构建中间件嵌套结构,并未执行实际业务逻辑。

2、执行期。当 call_user_func 触发最终闭包handle时,才会从外层到内层依次执行嵌套闭包:

Request1 的闭包({
    echo 'c3';
    Request2 的闭包({
        echo 'c3';
        Request3 的闭包({
            echo 'c3';
            Request4 的闭包({
                echo 'c3';
                执行默认闭包();
            });
        });
    });
});

此时才会依次输出 c3 和中间件逻辑。

这就是Laravel中的Pipeline管道设计。

Laravel实例

在Laravel中最典型的应用就是中间件,每个中间件都是一个管道,控制器接受一个请求,请求会被每个中间件处理,最后返回给用户。

看一下请求的处理:

<?php

namespace Illuminate\Foundation\Http;

class Kernel implements KernelContract
{
    /**省略若干**/

    /**
     * Handle an incoming HTTP request.
     *
     * @param  \Illuminate\Http\Request  $request
     * @return \Illuminate\Http\Response
     */
    public function handle($request)
    {
        try {
            $request->enableHttpMethodParameterOverride();

            $response = $this->sendRequestThroughRouter($request);  // 调用下面的方法
        } catch (Exception $e) {
            $this->reportException($e);

            $response = $this->renderException($request, $e);
        } catch (Throwable $e) {
            $this->reportException($e = new FatalThrowableError($e));

            $response = $this->renderException($request, $e);
        }

        $this->app['events']->dispatch(
            new RequestHandled($request, $response)
        );

        return $response;
    }

    /**
     * Send the given request through the middleware / router.
     *
     * @param  \Illuminate\Http\Request  $request
     * @return \Illuminate\Http\Response
     */
    protected function sendRequestThroughRouter($request)
    {
        $this->app->instance('request', $request);

        Facade::clearResolvedInstance('request');

        $this->bootstrap();

        return (new Pipeline($this->app))
                    ->send($request)
                    ->through($this->app->shouldSkipMiddleware() ? [] : $this->middleware)
                    ->then($this->dispatchToRouter());
    }

    /**省略若干**/
}

如在论坛用户发布的贴子中,用 纯文本替换链接标记、用“*”替换敏感词、从内容中完全删除脚本标记。下面的伪代码:

<?php

public function create(Request $request)
{
    // task 类集合
    $pipes = [
        RemoveBadWords::class,
        ReplaceLinkTags::class,
        RemoveScriptTags::class
    ];

    $post = app(Pipeline::class)
        ->send($request->content)
        ->through($pipes)
        ->then(function ($content) {
            return Post::create(['content' => 'content']);
        });
    // return any type of response
}

每个 “task” 类应该有一个 “handle” 方法来执行操作。每个类都确定一个统一的约束:

<?php

namespace App;

use Closure;

interface Pipe
{
    public function handle($content, Closure $next);
}

替换敏感词 RemoveBadWords类:

<?php

namespace App;

use Closure;

class RemoveBadWords implements Pipe
{
    public function handle($content, Closure $next)
    {
        // Here you perform the task and return the updated $content to the next pipe
        $content = str_replace('badword', '****', $content);

        return $next($content);
    }
}

用于执行任务的方法应该接收两个参数,第一个参数是合格的对象,第二个参数是当前操作处理完后会接管的下一个闭包(匿名函数)。

可以使用自定义方法名称而不是“handle”。然后你需要指定pipeline要使用的方法名称,比如:

$post = app(Pipeline::class)
    ->send($request->content)
    ->through($pipes)
    ->via('customMethodName') // <---- This one,取代 handle 方法
    ->then(function ($content) {
        return Post::create(['content' => $content]);
    });

提交的内容将会被各个$pipes 所处理, 被处理的结果将会存储下来。

Laravel代码分析

先看下Laravel框架中Pipeline类的代码:

<?php

namespace Illuminate\Pipeline;

use Closure;
use Exception;
use Illuminate\Contracts\Container\Container;
use Illuminate\Contracts\Pipeline\Pipeline as PipelineContract;
use RuntimeException;
use Symfony\Component\Debug\Exception\FatalThrowableError;
use Throwable;

class Pipeline implements PipelineContract
{
    /**
     * The container implementation.
     *
     * @var \Illuminate\Contracts\Container\Container
     */
    protected $container;

    /**
     * The object being passed through the pipeline.
     *
     * @var mixed
     */
    protected $passable;

    /**
     * The array of class pipes.
     *
     * @var array
     */
    protected $pipes = [];

    /**
     * The method to call on each pipe.
     *
     * @var string
     */
    protected $method = 'handle';

    /**
     * Create a new class instance.
     *
     * @param  \Illuminate\Contracts\Container\Container|null  $container
     * @return void
     */
    public function __construct(Container $container = null)
    {
        $this->container = $container;
    }

    /**
     * Set the object being sent through the pipeline.
     *
     * @param  mixed  $passable
     * @return $this
     */
    public function send($passable)
    {
        $this->passable = $passable;

        return $this;
    }

    /**
     * Set the array of pipes.
     *
     * @param  array|mixed  $pipes
     * @return $this
     */
    public function through($pipes)
    {
        $this->pipes = is_array($pipes) ? $pipes : func_get_args();

        return $this;
    }

    /**
     * Set the method to call on the pipes.
     *
     * @param  string  $method
     * @return $this
     */
    public function via($method)
    {
        $this->method = $method;

        return $this;
    }

    /**
     * Run the pipeline with a final destination callback.
     *
     * @param  \Closure  $destination
     * @return mixed
     */
    public function then(Closure $destination)
    {
        // 返回一个洋葱函数体
        $pipeline = array_reduce(
            array_reverse($this->pipes()), $this->carry(), $this->prepareDestination($destination)
        );

        return $pipeline($this->passable);  // 在这里传入 carry() 中的 $passable参数
    }

    /**
     * Run the pipeline and return the result.
     *
     * @return mixed
     */
    public function thenReturn()
    {
        return $this->then(function ($passable) {
            return $passable;
        });
    }

    /**
     * Get the final piece of the Closure onion.
     *
     * @param  \Closure  $destination
     * @return \Closure
     */
    protected function prepareDestination(Closure $destination)
    {
        return function ($passable) use ($destination) {
            try {
                return $destination($passable);
            } catch (Exception $e) {
                return $this->handleException($passable, $e);
            } catch (Throwable $e) {
                return $this->handleException($passable, new FatalThrowableError($e));
            }
        };
    }

    /**
     * Get a Closure that represents a slice of the application onion.
     *
     * @return \Closure
     */
    protected function carry()
    {
        return function ($stack, $pipe) {
            return function ($passable) use ($stack, $pipe) {
                try {
                    if (is_callable($pipe)) {
                        // If the pipe is a callable, then we will call it directly, but otherwise we
                        // will resolve the pipes out of the dependency container and call it with
                        // the appropriate method and arguments, returning the results back out.
                        return $pipe($passable, $stack);
                    } elseif (! is_object($pipe)) {
                        [$name, $parameters] = $this->parsePipeString($pipe);

                        // If the pipe is a string we will parse the string and resolve the class out
                        // of the dependency injection container. We can then build a callable and
                        // execute the pipe function giving in the parameters that are required.
                        $pipe = $this->getContainer()->make($name);

                        $parameters = array_merge([$passable, $stack], $parameters);
                    } else {
                        // If the pipe is already an object we'll just make a callable and pass it to
                        // the pipe as-is. There is no need to do any extra parsing and formatting
                        // since the object we're given was already a fully instantiated object.
                        $parameters = [$passable, $stack];
                    }

                    $carry = method_exists($pipe, $this->method)
                                    ? $pipe->{$this->method}(...$parameters)
                                    : $pipe(...$parameters);

                    return $this->handleCarry($carry);
                } catch (Exception $e) {
                    return $this->handleException($passable, $e);
                } catch (Throwable $e) {
                    return $this->handleException($passable, new FatalThrowableError($e));
                }
            };
        };
    }

    /**
     * Parse full pipe string to get name and parameters.
     *
     * @param  string  $pipe
     * @return array
     */
    protected function parsePipeString($pipe)
    {
        [$name, $parameters] = array_pad(explode(':', $pipe, 2), 2, []);

        if (is_string($parameters)) {
            $parameters = explode(',', $parameters);
        }

        return [$name, $parameters];
    }

    /**
     * Get the array of configured pipes.
     *
     * @return array
     */
    protected function pipes()
    {
        return $this->pipes;
    }

    /**
     * Get the container instance.
     *
     * @return \Illuminate\Contracts\Container\Container
     *
     * @throws \RuntimeException
     */
    protected function getContainer()
    {
        if (! $this->container) {
            throw new RuntimeException('A container instance has not been passed to the Pipeline.');
        }

        return $this->container;
    }

    /**
     * Handle the value returned from each pipe before passing it to the next.
     *
     * @param  mixed  $carry
     * @return mixed
     */
    protected function handleCarry($carry)
    {
        return $carry;
    }

    /**
     * Handle the given exception.
     *
     * @param  mixed  $passable
     * @param  \Exception  $e
     * @return mixed
     *
     * @throws \Exception
     */
    protected function handleException($passable, Exception $e)
    {
        throw $e;
    }
}

请求调用中,使用中间件:

(new Pipeline($this->app))
    ->send($request)
    ->through($this->app->shouldSkipMiddleware() ? [] : $this->middleware)
    ->then($this->dispatchToRouter());

shouldSkipMiddleware() 判断中间件是否可用:

public function shouldSkipMiddleware()
{
    return $this->bound('middleware.disable') &&
            $this->make('middleware.disable') === true;
}

中间件 $this->middlewareapp/Http/Kernel.php 下,我们自己可以增加:

/**
 * The application's global HTTP middleware stack.
 *
 * These middleware are run during every request to your application.
 *
 * @var array
 */
protected $middleware = [
    \App\Http\Middleware\TrustProxies::class,
    \App\Http\Middleware\CheckForMaintenanceMode::class,
    \Illuminate\Foundation\Http\Middleware\ValidatePostSize::class,
    \App\Http\Middleware\TrimStrings::class,
    \Illuminate\Foundation\Http\Middleware\ConvertEmptyStringsToNull::class,
];

看一下 \App\Http\Middleware\TrimStrings::class 的代码,用于字符trim处理:

<?php

namespace App\Http\Middleware;

use Illuminate\Foundation\Http\Middleware\TrimStrings as Middleware;

class TrimStrings extends Middleware
{
    /**
     * The names of the attributes that should not be trimmed.
     *
     * @var array
     */
    protected $except = [
        'password',
        'password_confirmation',
    ];
}

父类:

<?php

namespace Illuminate\Foundation\Http\Middleware;

class TrimStrings extends TransformsRequest
{
    /**
     * The attributes that should not be trimmed.
     *
     * @var array
     */
    protected $except = [
        //
    ];

    /**
     * Transform the given value.
     *
     * @param  string  $key
     * @param  mixed  $value
     * @return mixed
     */
    protected function transform($key, $value)
    {
        if (in_array($key, $this->except, true)) {
            return $value;
        }

        return is_string($value) ? trim($value) : $value;
    }
}

父类:

<?php

namespace Illuminate\Foundation\Http\Middleware;

use Closure;
use Symfony\Component\HttpFoundation\ParameterBag;

class TransformsRequest
{
    /**
     * Handle an incoming request.
     *
     * @param  \Illuminate\Http\Request  $request
     * @param  \Closure  $next
     * @return mixed
     */
    public function handle($request, Closure $next)
    {
        $this->clean($request);

        return $next($request);
    }

    /**
     * Clean the request's data.
     *
     * @param  \Illuminate\Http\Request  $request
     * @return void
     */
    protected function clean($request)
    {
        $this->cleanParameterBag($request->query);

        if ($request->isJson()) {
            $this->cleanParameterBag($request->json());
        } elseif ($request->request !== $request->query) {
            $this->cleanParameterBag($request->request);
        }
    }

    /**
     * Clean the data in the parameter bag.
     *
     * @param  \Symfony\Component\HttpFoundation\ParameterBag  $bag
     * @return void
     */
    protected function cleanParameterBag(ParameterBag $bag)
    {
        $bag->replace($this->cleanArray($bag->all()));
    }

    /**
     * Clean the data in the given array.
     *
     * @param  array  $data
     * @param  string  $keyPrefix
     * @return array
     */
    protected function cleanArray(array $data, $keyPrefix = '')
    {
        return collect($data)->map(function ($value, $key) use ($keyPrefix) {
            return $this->cleanValue($keyPrefix.$key, $value);
        })->all();
    }

    /**
     * Clean the given value.
     *
     * @param  string  $key
     * @param  mixed  $value
     * @return mixed
     */
    protected function cleanValue($key, $value)
    {
        if (is_array($value)) {
            return $this->cleanArray($value, $key.'.');
        }

        return $this->transform($key, $value);
    }

    /**
     * Transform the given value.
     *
     * @param  string  $key
     * @param  mixed  $value
     * @return mixed
     */
    protected function transform($key, $value)
    {
        return $value;
    }
}

除了中间件,还有路由执行:

<?php

namespace Illuminate\Routing;

class Router implements BindingRegistrar, RegistrarContract
{
    /**省略若干**/

    /**
     * Run the given route within a Stack "onion" instance.
     *
     * @param  \Illuminate\Routing\Route  $route
     * @param  \Illuminate\Http\Request  $request
     * @return mixed
     */
    protected function runRouteWithinStack(Route $route, Request $request)
    {
        $shouldSkipMiddleware = $this->container->bound('middleware.disable') &&
                                $this->container->make('middleware.disable') === true;

        $middleware = $shouldSkipMiddleware ? [] : $this->gatherRouteMiddleware($route);

        return (new Pipeline($this->container))
                        ->send($request)
                        ->through($middleware)
                        ->then(function ($request) use ($route) {
                            return $this->prepareResponse(
                                $request, $route->run()
                            );
                        });
    }

    /**省略若干**/
}

队列执行:

<?php

namespace Illuminate\Queue;

class CallQueuedHandler
{
    /**省略若干**/

    /**
     * Dispatch the given job / command through its specified middleware.
     *
     * @param  \Illuminate\Contracts\Queue\Job  $job
     * @param  mixed  $command
     * @return mixed
     */
    protected function dispatchThroughMiddleware(Job $job, $command)
    {
        return (new Pipeline($this->container))->send($command)
                ->through(array_merge(method_exists($command, 'middleware') ? $command->middleware() : [], $command->middleware ?? []))
                ->then(function ($command) use ($job) {
                    return $this->dispatcher->dispatchNow(
                        $command, $this->resolveHandler($job, $command)
                    );
                });
    }

    /**省略若干**/
}
<?php

namespace Illuminate\Bus;

class Dispatcher implements QueueingDispatcher
{
    /**省略若干**/

    /**
     * The pipeline instance for the bus.
     *
     * @var \Illuminate\Pipeline\Pipeline
     */
    protected $pipeline;

    public function __construct(Container $container, Closure $queueResolver = null)
    {
        $this->container = $container;
        $this->queueResolver = $queueResolver;
        $this->pipeline = new Pipeline($container);
    }


    /**省略若干**/
}

Pipeline源码

Pipeline 服务提供者在服务启动时,引入了注册配置 app\config\app.php 的 providers 数组:

<?php

return [

    /*
    |--------------------------------------------------------------------------
    | Application Name
    |--------------------------------------------------------------------------
    |
    | This value is the name of your application. This value is used when the
    | framework needs to place the application's name in a notification or
    | any other location as required by the application or its packages.
    |
    */

    'name' => env('APP_NAME', 'Laravel'),

    /*
    |--------------------------------------------------------------------------
    | Application Environment
    |--------------------------------------------------------------------------
    |
    | This value determines the "environment" your application is currently
    | running in. This may determine how you prefer to configure various
    | services the application utilizes. Set this in your ".env" file.
    |
    */

    'env' => env('APP_ENV', 'production'),

    /*
    |--------------------------------------------------------------------------
    | Application Debug Mode
    |--------------------------------------------------------------------------
    |
    | When your application is in debug mode, detailed error messages with
    | stack traces will be shown on every error that occurs within your
    | application. If disabled, a simple generic error page is shown.
    |
    */

    'debug' => (bool) env('APP_DEBUG', false),

    /*
    |--------------------------------------------------------------------------
    | Application URL
    |--------------------------------------------------------------------------
    |
    | This URL is used by the console to properly generate URLs when using
    | the Artisan command line tool. You should set this to the root of
    | your application so that it is used when running Artisan tasks.
    |
    */

    'url' => env('APP_URL', 'http://localhost'),

    'asset_url' => env('ASSET_URL', null),

    /*
    |--------------------------------------------------------------------------
    | Application Timezone
    |--------------------------------------------------------------------------
    |
    | Here you may specify the default timezone for your application, which
    | will be used by the PHP date and date-time functions. We have gone
    | ahead and set this to a sensible default for you out of the box.
    |
    */

    'timezone' => 'Asia/Shanghai',

    /*
    |--------------------------------------------------------------------------
    | Application Locale Configuration
    |--------------------------------------------------------------------------
    |
    | The application locale determines the default locale that will be used
    | by the translation service provider. You are free to set this value
    | to any of the locales which will be supported by the application.
    |
    */

    'locale' => 'en',

    /*
    |--------------------------------------------------------------------------
    | Application Fallback Locale
    |--------------------------------------------------------------------------
    |
    | The fallback locale determines the locale to use when the current one
    | is not available. You may change the value to correspond to any of
    | the language folders that are provided through your application.
    |
    */

    'fallback_locale' => 'en',

    /*
    |--------------------------------------------------------------------------
    | Faker Locale
    |--------------------------------------------------------------------------
    |
    | This locale will be used by the Faker PHP library when generating fake
    | data for your database seeds. For example, this will be used to get
    | localized telephone numbers, street address information and more.
    |
    */

    'faker_locale' => 'en_US',

    /*
    |--------------------------------------------------------------------------
    | Encryption Key
    |--------------------------------------------------------------------------
    |
    | This key is used by the Illuminate encrypter service and should be set
    | to a random, 32 character string, otherwise these encrypted strings
    | will not be safe. Please do this before deploying an application!
    |
    */

    'key' => env('APP_KEY'),

    'cipher' => 'AES-256-CBC',

    /*
    |--------------------------------------------------------------------------
    | Autoloaded Service Providers
    |--------------------------------------------------------------------------
    |
    | The service providers listed here will be automatically loaded on the
    | request to your application. Feel free to add your own services to
    | this array to grant expanded functionality to your applications.
    |
    */

    'providers' => [

        /*
         * Laravel Framework Service Providers...
         */
        Illuminate\Auth\AuthServiceProvider::class,
        Illuminate\Broadcasting\BroadcastServiceProvider::class,
        Illuminate\Bus\BusServiceProvider::class,
        Illuminate\Cache\CacheServiceProvider::class,
        Illuminate\Foundation\Providers\ConsoleSupportServiceProvider::class,
        Illuminate\Cookie\CookieServiceProvider::class,
        Illuminate\Database\DatabaseServiceProvider::class,
        Illuminate\Encryption\EncryptionServiceProvider::class,
        Illuminate\Filesystem\FilesystemServiceProvider::class,
        Illuminate\Foundation\Providers\FoundationServiceProvider::class,
        Illuminate\Hashing\HashServiceProvider::class,
        Illuminate\Mail\MailServiceProvider::class,
        Illuminate\Notifications\NotificationServiceProvider::class,
        Illuminate\Pagination\PaginationServiceProvider::class,
        Illuminate\Pipeline\PipelineServiceProvider::class,                // 注册 Pipeline  
        Illuminate\Queue\QueueServiceProvider::class,
        Illuminate\Redis\RedisServiceProvider::class,
        Illuminate\Auth\Passwords\PasswordResetServiceProvider::class,
        Illuminate\Session\SessionServiceProvider::class,
        Illuminate\Translation\TranslationServiceProvider::class,
        Illuminate\Validation\ValidationServiceProvider::class,
        Illuminate\View\ViewServiceProvider::class,

        /*
         * Package Service Providers...
         */

        /*
         * Application Service Providers...
         */
        App\Providers\AppServiceProvider::class,
        App\Providers\AuthServiceProvider::class,
        // App\Providers\BroadcastServiceProvider::class,
        App\Providers\EventServiceProvider::class,
        App\Providers\RouteServiceProvider::class,

    ],

    /*
    |--------------------------------------------------------------------------
    | Class Aliases
    |--------------------------------------------------------------------------
    |
    | This array of class aliases will be registered when this application
    | is started. However, feel free to register as many as you wish as
    | the aliases are "lazy" loaded so they don't hinder performance.
    |
    */

    'aliases' => [

        'App' => Illuminate\Support\Facades\App::class,
        'Arr' => Illuminate\Support\Arr::class,
        'Artisan' => Illuminate\Support\Facades\Artisan::class,
        'Auth' => Illuminate\Support\Facades\Auth::class,
        'Blade' => Illuminate\Support\Facades\Blade::class,
        'Broadcast' => Illuminate\Support\Facades\Broadcast::class,
        'Bus' => Illuminate\Support\Facades\Bus::class,
        'Cache' => Illuminate\Support\Facades\Cache::class,
        'Config' => Illuminate\Support\Facades\Config::class,
        'Cookie' => Illuminate\Support\Facades\Cookie::class,
        'Crypt' => Illuminate\Support\Facades\Crypt::class,
        'DB' => Illuminate\Support\Facades\DB::class,
        'Eloquent' => Illuminate\Database\Eloquent\Model::class,
        'Event' => Illuminate\Support\Facades\Event::class,
        'File' => Illuminate\Support\Facades\File::class,
        'Gate' => Illuminate\Support\Facades\Gate::class,
        'Hash' => Illuminate\Support\Facades\Hash::class,
        'Lang' => Illuminate\Support\Facades\Lang::class,
        'Log' => Illuminate\Support\Facades\Log::class,
        'Mail' => Illuminate\Support\Facades\Mail::class,
        'Notification' => Illuminate\Support\Facades\Notification::class,
        'Password' => Illuminate\Support\Facades\Password::class,
        'Queue' => Illuminate\Support\Facades\Queue::class,
        'Redirect' => Illuminate\Support\Facades\Redirect::class,
        'Redis' => Illuminate\Support\Facades\Redis::class,
        'Request' => Illuminate\Support\Facades\Request::class,
        'Response' => Illuminate\Support\Facades\Response::class,
        'Route' => Illuminate\Support\Facades\Route::class,
        'Schema' => Illuminate\Support\Facades\Schema::class,
        'Session' => Illuminate\Support\Facades\Session::class,
        'Storage' => Illuminate\Support\Facades\Storage::class,
        'Str' => Illuminate\Support\Str::class,
        'URL' => Illuminate\Support\Facades\URL::class,
        'Validator' => Illuminate\Support\Facades\Validator::class,
        'View' => Illuminate\Support\Facades\View::class,

    ],

];

PipelineServiceProvider 类源码:

<?php

namespace Illuminate\Pipeline;

use Illuminate\Contracts\Pipeline\Hub as PipelineHubContract;
use Illuminate\Contracts\Support\DeferrableProvider;
use Illuminate\Support\ServiceProvider;

class PipelineServiceProvider extends ServiceProvider implements DeferrableProvider
{
    /**
     * Register the service provider.
     *
     * @return void
     */
    public function register()
    {
        $this->app->singleton(
            PipelineHubContract::class, Hub::class
        );
    }

    /**
     * Get the services provided by the provider.
     *
     * @return array
     */
    public function provides()
    {
        return [
            PipelineHubContract::class,
        ];
    }
}

Illuminate\Contracts\Pipeline\Hub 源码:

<?php

namespace Illuminate\Contracts\Pipeline;

interface Hub
{
    /**
     * Send an object through one of the available pipelines.
     *
     * @param  mixed  $object
     * @param  string|null  $pipeline
     * @return mixed
     */
    public function pipe($object, $pipeline = null);
}

Illuminate\Pipeline\Hub 源码:

<?php

namespace Illuminate\Pipeline;

use Closure;
use Illuminate\Contracts\Container\Container;
use Illuminate\Contracts\Pipeline\Hub as HubContract;

class Hub implements HubContract
{
    /**
     * The container implementation.
     *
     * @var \Illuminate\Contracts\Container\Container|null
     */
    protected $container;

    /**
     * All of the available pipelines.
     * 所有可用的管道.
     *
     * @var array
     */
    protected $pipelines = [];

    /**
     * Create a new Hub instance.
     *
     * @param  \Illuminate\Contracts\Container\Container|null  $container
     * @return void
     */
    public function __construct(Container $container = null)
    {
        $this->container = $container;
    }

    /**
     * Define the default named pipeline.
     * 定义默认命名管道.
     *
     * @param  \Closure  $callback
     * @return void
     */
    public function defaults(Closure $callback)
    {
        return $this->pipeline('default', $callback);
    }

    /**
     * Define a new named pipeline.
     * 定义新的管道名,存入 $pipelines .
     *
     * @param  string  $name
     * @param  \Closure  $callback
     * @return void
     */
    public function pipeline($name, Closure $callback)
    {
        $this->pipelines[$name] = $callback;
    }

    /**
     * Send an object through one of the available pipelines.
     * 通过其中一个可用管道发送对象。
     *
     * @param  mixed  $object
     * @param  string|null  $pipeline
     * @return mixed
     */
    public function pipe($object, $pipeline = null)
    {
        $pipeline = $pipeline ?: 'default';

        return call_user_func(
            $this->pipelines[$pipeline], new Pipeline($this->container), $object
        );
    }
}

Illuminate\Contracts\Pipeline\Pipeline 接口源码:

<?php

namespace Illuminate\Contracts\Pipeline;

use Closure;

interface Pipeline
{
    /**
     * Set the traveler object being sent on the pipeline.
     *
     * @param  mixed  $traveler
     * @return $this
     */
    public function send($traveler);

    /**
     * Set the stops of the pipeline.
     *
     * @param  dynamic|array  $stops
     * @return $this
     */
    public function through($stops);

    /**
     * Set the method to call on the stops.
     *
     * @param  string  $method
     * @return $this
     */
    public function via($method);

    /**
     * Run the pipeline with a final destination callback.
     *
     * @param  \Closure  $destination
     * @return mixed
     */
    public function then(Closure $destination);
}

Illuminate\Pipeline\Pipeline 源码:

<?php

namespace Illuminate\Pipeline;

use Closure;
use Exception;
use Illuminate\Contracts\Container\Container;
use Illuminate\Contracts\Pipeline\Pipeline as PipelineContract;
use RuntimeException;
use Symfony\Component\Debug\Exception\FatalThrowableError;
use Throwable;

class Pipeline implements PipelineContract
{
    /**
     * The container implementation.
     * 通过管道传递的对象。
     *
     * @var \Illuminate\Contracts\Container\Container
     */
    protected $container;

    /**
     * The object being passed through the pipeline.
     *
     * @var mixed
     */
    protected $passable;

    /**
     * The array of class pipes.
     * 类管道的数组。
     *
     * @var array
     */
    protected $pipes = [];

    /**
     * The method to call on each pipe.
     * 要在每个管道上调用的方法。
     *
     * @var string
     */
    protected $method = 'handle';

    /**
     * Create a new class instance.
     *
     * @param  \Illuminate\Contracts\Container\Container|null  $container
     * @return void
     */
    public function __construct(Container $container = null)
    {
        $this->container = $container;
    }

    /**
     * Set the object being sent through the pipeline.
     * 设置通过管道发送的对象
     *
     * @param  mixed  $passable
     * @return $this
     */
    public function send($passable)
    {
        $this->passable = $passable;

        return $this;
    }

    /**
     * Set the array of pipes.
     * 设置管道数组。
     *
     * @param  array|mixed  $pipes
     * @return $this
     */
    public function through($pipes)
    {
        $this->pipes = is_array($pipes) ? $pipes : func_get_args();

        return $this;
    }

    /**
     * Set the method to call on the pipes.
     * 设置要在管道上调用的方法,这里修改默认管道调用 handle 方法
     *
     * @param  string  $method
     * @return $this
     */
    public function via($method)
    {
        $this->method = $method;

        return $this;
    }

    /**
     * Run the pipeline with a final destination callback.
     * 使用最终目标回调运行管道(核心点)
     *
     * @param  \Closure  $destination
     * @return mixed
     */
    public function then(Closure $destination)
    {
        $pipeline = array_reduce(
            array_reverse($this->pipes()), $this->carry(), $this->prepareDestination($destination)
        );

        return $pipeline($this->passable);
    }

    /**
     * Run the pipeline and return the result.
     * 运行管道并返回结果
     *
     * @return mixed
     */
    public function thenReturn()
    {
        return $this->then(function ($passable) {
            return $passable;
        });
    }

    /**
     * Get the final piece of the Closure onion.
     *
     * @param  \Closure  $destination
     * @return \Closure
     */
    protected function prepareDestination(Closure $destination)
    {
        return function ($passable) use ($destination) {
            try {
                return $destination($passable);
            } catch (Exception $e) {
                return $this->handleException($passable, $e);
            } catch (Throwable $e) {
                return $this->handleException($passable, new FatalThrowableError($e));
            }
        };
    }

    /**
     * Get a Closure that represents a slice of the application onion.
     *
     * @return \Closure
     */
    protected function carry()
    {
        return function ($stack, $pipe) {
            return function ($passable) use ($stack, $pipe) {
                try {
                    if (is_callable($pipe)) {
                        // If the pipe is a callable, then we will call it directly, but otherwise we
                        // will resolve the pipes out of the dependency container and call it with
                        // the appropriate method and arguments, returning the results back out.
                        return $pipe($passable, $stack);
                    } elseif (! is_object($pipe)) {
                        [$name, $parameters] = $this->parsePipeString($pipe);

                        // If the pipe is a string we will parse the string and resolve the class out
                        // of the dependency injection container. We can then build a callable and
                        // execute the pipe function giving in the parameters that are required.
                        $pipe = $this->getContainer()->make($name);

                        $parameters = array_merge([$passable, $stack], $parameters);
                    } else {
                        // If the pipe is already an object we'll just make a callable and pass it to
                        // the pipe as-is. There is no need to do any extra parsing and formatting
                        // since the object we're given was already a fully instantiated object.
                        $parameters = [$passable, $stack];
                    }

                    $carry = method_exists($pipe, $this->method)
                                    ? $pipe->{$this->method}(...$parameters)
                                    : $pipe(...$parameters);

                    return $this->handleCarry($carry);
                } catch (Exception $e) {
                    return $this->handleException($passable, $e);
                } catch (Throwable $e) {
                    return $this->handleException($passable, new FatalThrowableError($e));
                }
            };
        };
    }

    /**
     * Parse full pipe string to get name and parameters.
     *
     * @param  string  $pipe
     * @return array
     */
    protected function parsePipeString($pipe)
    {
        [$name, $parameters] = array_pad(explode(':', $pipe, 2), 2, []);

        if (is_string($parameters)) {
            $parameters = explode(',', $parameters);
        }

        return [$name, $parameters];
    }

    /**
     * Get the array of configured pipes.
     *
     * @return array
     */
    protected function pipes()
    {
        return $this->pipes;
    }

    /**
     * Get the container instance.
     *
     * @return \Illuminate\Contracts\Container\Container
     *
     * @throws \RuntimeException
     */
    protected function getContainer()
    {
        if (! $this->container) {
            throw new RuntimeException('A container instance has not been passed to the Pipeline.');
        }

        return $this->container;
    }

    /**
     * Handle the value returned from each pipe before passing it to the next.
     *
     * @param  mixed  $carry
     * @return mixed
     */
    protected function handleCarry($carry)
    {
        return $carry;
    }

    /**
     * Handle the given exception.
     *
     * @param  mixed  $passable
     * @param  \Exception  $e
     * @return mixed
     *
     * @throws \Exception
     */
    protected function handleException($passable, Exception $e)
    {
        throw $e;
    }
}

array_reduce的C语言源码

在 https://github.com/php/php-src/blob/master/ext/standard/array.c 中:

参数解析 → 初始化返回值 → 获取哈希表 → 遍历数组元素 → 传递(累积结果, 当前元素) → 调用回调 → 更新累积结果 → 返回最终值

参考资料

Laravel框架源码分析之Pipeline管道设计 https://zhuanlan.zhihu.com/p/669130742

一文了解Laravel中的Pipeline(管道) https://www.php.cn/faq/497864.html

Laravel系列6.4 管道过滤器 https://cloud.tencent.com/developer/article/2232112

laravel5.5源码笔记(五、Pipeline管道模式) https://www.cnblogs.com/wyycc/p/9986757.html

深度挖掘 Laravel 生命周期 https://learnku.com/articles/10421/depth-mining-of-laravel-life-cycle


返回