这是基于上一篇随笔:关于Web编程异步模型的白日梦的实现。这一思路我记得在 05 年还是 07 年的时候就在 ChinaUnix 上有高人所讨论,只是自己当时愚钝未能明晰本质,纠结于 PHP 的多线程之中……
这个实现写好有段时间了,最近琐碎的事情很多,一直没有整理出来。今日得闲记录下来。
利用PHP自带的 stream_select 函数实现异步,利用这个函数使得 PHP 原生支持的异步调用实现,无须第三方服务或库。不过只能实现二段式异步调用,就是说会有明显的 Begin 和 End 两个阶段。
代码比较容易理解,大家自己看吧。
先看看,封装好的方法,如何异步调用:
/**
* DEMO
*/
/**
* 请求集合
*/
$requestSet = array(
'foobar' => array(
'stream' => '127.0.0.1:80',
'path' => '/api.php',
'params' => array('a'=>1),
'callback' => 'echoArray',
),
'foo-bar' => array(
'stream' => '127.0.0.1:80',
'path' => '/api.php',
'params' => array('method'=>'get', 'table'=>'user', 'id'=>1),
'callback' => 'echoArray',
),
'another' => array(
'stream' => '127.0.0.1:80',
'path' => '/api.php',
'params' => array(),
'callback' => 'echoArray',
),
);
/**
* 演示处理函数,这里只作打印
* @param $a array 输入数组
*/
function echoArray($a) {
var_dump($a);
}
/**
* 二段异步调用
*/
/**
* 第一阶段,发起请求
*/
$fps = beginRequest($requestSet);
/**
* 第二阶段,处理返回
*/
endReponse($requestSet, $fps);
封装的异步函数:
/**
* 发起一个请求
* @param $stream string 主机、端口
* @param $path string api路径
* @param $params array 参数
* @return resource 文件描述符
*/
function request($stream, $path, $params) {
$fp = stream_socket_client("tcp://{$stream}", $errno, $errstr, 30);
if (!$fp) {
throw new Exception($errstr, $errno);
} else {
$params = http_build_query($params);
$host = explode(':', $stream);
$str = "GET {$path}?{$params} HTTP/1.0\r\nHost: {$host[0]}\r\nAccept: */*\r\n\r\n";
fwrite($fp, $str);
}
return $fp;
}
/**
* 获取请求的返回
* @param $fp resource 文件描述符
* @return array 取得的返回值
*/
function response($fp) {
$r = stream_get_contents($fp);
// 简陋的 HTTP 协议解析
$r = explode("\n", $r);
return json_decode($r[count($r) - 1]);
}
/**
* 发起请求
* @param $requestSet array 请求队列数组
* @return array 文件描述符数组
*/
function beginRequest($requestSet) {
$fps = array();
foreach($requestSet as $key => $request) {
try {
$fps[$key] = request($request['stream'], $request['path'], $request['params']);
} catch (Exception $e) {
echo $e;
}
}
return $fps;
}
/**
* 处理返回
* @param $requestSet array 请求队列数组
* @param $fps array 文件描述符数组
*/
function endReponse($requestSet, $fps) {
$r = $fps;
while(count($fps) > 0) {
// 这里是重点,stream_select 保证了先返回的先处理。
$n = stream_select($r, $w, $e, 0);
if ($n === false) {
echo $e;
continue;
}
if ($n > 0) {
foreach($r as $socket) {
// 读取返回值,并调用回调
$r = response($socket);
$key = array_search($socket, $fps);
$callback = $requestSet[$key]['callback'];
$callback($r);
fclose($socket);
unset($fps[$key]);
}
}
$r = $fps;
}
}
api.php 接口:
<?php
$a = array(
'hello' => 'world',
'foobar' => array(0,1,2,3,4),
'GET' => $_GET,
);
echo json_encode($a);
为了演示的需要剔除了面向对象、异常处理的一些内容。同时硬性规定 api 接口使用 json 通过 http 协议传递数据。
实际上,由于 php 的 stream 的功能可以处理包括 udp 协议在内的I/O。可利用其他协议和开销更小的报文格式,以便能获得更好的性能。
由于模型的限制,callback 异步无法直接实现,可将存在数据读取依赖关系的数据读取过程拆解为若干个二段异步调用。
PS:这里只是一个实现方法的演示,并不涉及该方法是否在真实环境可用。也就是说,虽然实现了PHP的异步调用,但是实际运行可能并不比顺序阻塞读取的性能高。
Leave a Reply