文件服务器
开发中中碰到一个问题,B项目要用到A项目中的图片,如何实现呢?
这里就要用到文件服务器,A项目和B项目共同使用文件服务器,包括文件新增、修改、删除。
PHP中使用Stream系列函数,折腾了一天没实现,问题卡在fread()
函数上。
因为文件会比较大,如何读取和组装就成了问题,关键就是要解决这个问题。
最后选择了使用Socket实现。
实现过程中需要考虑应用如何认证、文件是新增还是删除。下面写了一个无认证的新增文件服务端程序。
服务端
server.php
:
<?php
//设置不超时
set_time_limit(0);
class SocketServer
{
public function __construct($port)
{
$listen_socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
socket_bind($listen_socket, '0.0.0.0', $port);
socket_listen($listen_socket);
while (true) {
$conn = socket_accept($listen_socket);
$data = '';
try {
while (!preg_match('/\r\n\r\n/', $data)) {
$data .= socket_read($conn, 1024);
}
$data = preg_replace('/\r\n\r\n/', '', $data);
$data = explode('<<<<<<>>>>>>', $data);
$file = new FileCreate($data['0']);
$file->write($data['1']);
$data = [
'code' => '1',
'data' => [
'path' => $file->getFilePath()
],
'msg' => 'ok'
];
} catch (Throwable $e) {
$data = [
'code' => '-1',
'data' => [],
'msg' => $e->getMessage()
];
}
$data = json_encode($data);
socket_write($conn, $data, strlen($data));
socket_close($conn);
}
socket_close($listen_socket);
}
}
class FileCreate
{
private $fd;
private $path_out;
public function __construct(string $name = '')
{
$path_upload = '/upload/' .date('Ymd');
$path_www = './www' .$path_upload;
$suffix = strtolower(pathinfo($name, PATHINFO_EXTENSION));
$suffix = $suffix && preg_match("/^[a-zA-Z0-9]+$/", $suffix) ? $suffix : 'file';
$filename = time() .uniqid() .'.' .$suffix;
if (!is_dir($path_www)) {
mkdir($path_www, 0755, true);
}
$this->fd = fopen($path_www .'/' .$filename, 'w+');
$this->path_out = $path_upload .'/' .$filename;
}
public function write($stream)
{
fwrite($this->fd, $stream);
fclose($this->fd);
}
public function getFilePath()
{
return $this->path_out;
}
}
new SocketServer(1034);
运行:
php server.php 1>/dev/null 2>&1 &
最好使用Supervisor。
具体在使用中,后端还要可以对文件进行管理,借用Yii2搭了个管理项目,命令行也使用项目中的console部分。
看一下封装:
<?php
namespace console\controllers;
use common\models\App;
use common\models\File;
use Yii;
use yii\console\Controller;
class FileController extends Controller
{
public function actionSocket()
{
if (($listen_socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)) === false) {
echo "socket_create() failed: reason: " . socket_strerror(socket_last_error()) . "\n";
die;
}
if ((socket_bind($listen_socket, '0.0.0.0', Yii::$app->fileShare->port)) === false) {
echo "socket_bind() failed: reason: " . socket_strerror(socket_last_error($listen_socket)) . "\n";
die;
}
if (socket_listen($listen_socket) === false) {
echo "socket_listen() failed: reason: " . socket_strerror(socket_last_error($listen_socket)) . "\n";
die;
}
while (true) {
if (($conn = socket_accept($listen_socket)) === false) {
continue;
}
$data = '';
try {
while (!preg_match('/\r\n\r\n/', $data)) {
$data .= socket_read($conn, 4096);
}
$data = preg_replace('/\r\n\r\n/', '', $data);
$data = explode('<<<<<<>>>>>>', $data);
if (!in_array(count($data), [4, 5]) || empty($data['0']) || empty($data['1']) || !in_array($data['2'], ['add', 'delete'])) {
throw new \yii\base\Exception('参数错误');
}
$modelApp = App::find()
->where(['app_uuid' => $data['0']])
->one();
if (empty($modelApp) || $modelApp->status != App::STATUS_ENABLE) {
throw new \yii\base\Exception('应用未找到或不可用');
}
if (!Yii::$app->security->validatePassword($data['1'], $modelApp->password_hash)) {
throw new \yii\base\Exception('应用密码错误');
}
if ($data['2'] == 'add') {
if (empty($data['3']) || empty($data['4'])) {
throw new \yii\base\Exception('文件参数为空');
}
$file_path = Yii::$app->fileShare->write($data['3'], $data['4']);
$modelFile = new File();
$modelFile->app_id = $modelApp->app_id;
$modelFile->location = $file_path;
$modelFile->status = File::STATUS_ENABLE;
$modelFile->created_at = $modelFile->updated_at = time();
if (!($modelFile->save())) {
throw new \yii\base\Exception('文件记录保存失败');
}
$data = [
'code' => '1',
'data' => [
'path' => $file_path
],
'msg' => 'ok'
];
} elseif ($data['2'] == 'delete') {
if (empty($data['3'])) {
throw new \yii\base\Exception('文件地址为空');
}
$modelFile = File::find()
->where(['app_id' => $modelApp->app_id, 'location' => $data['3']])
->one();
if (empty($modelFile) || $modelFile->status != File::STATUS_ENABLE) {
throw new \yii\base\Exception('文件未找到或已被删除');
}
$modelFile->status = File::STATUS_DELETE;
if (!($modelFile->save())) {
throw new \yii\base\Exception('文件删除失败');
}
$data = [
'code' => '1',
'data' => [],
'msg' => 'ok'
];
}
} catch (\Throwable $e) {
$data = [
'code' => '-1',
'data' => [],
'msg' => $e->getMessage()
];
}
$data = json_encode($data) ."\r\n\r\n";
socket_write($conn, $data, strlen($data));
socket_close($conn);
}
socket_close($listen_socket);
}
}
<?php
namespace common\components;
use yii;
use yii\base\Component;
class FileShare extends Component
{
public $path_www;
public $path_upload;
public $port;
public function init()
{
parent::init();
}
public function write($name, $stream)
{
$path_upload = $this->path_upload .date('Ymd');
$path_www = Yii::getAlias($this->path_www) .$path_upload;
$suffix = strtolower(pathinfo($name, PATHINFO_EXTENSION));
$suffix = $suffix && preg_match("/^[a-zA-Z0-9]+$/", $suffix) ? $suffix : 'file';
$filename = time() .uniqid() .'.' .$suffix;
if (!is_dir($path_www)) {
mkdir($path_www, 0755, true);
}
$fd = fopen($path_www .'/' .$filename, 'w+');
fwrite($fd, $stream);
fclose($fd);
return $path_upload .'/' .$filename;
}
}
配置文件:
'components' => [
'fileShare' => [
'class' => 'common\components\FileShare',
'path_www' => '@backend/web',
'path_upload' => '/upload/',
'port' => '1055',
],
]
启动命令:
php yii file/socket 1>/dev/null 2>&1 &
最后说一下这里使用 传统socket 与 select、epoll、协程 的区别。 对于这里的 监听socket,无论是在 传统socket,还是 select 或 epoll 中,都是同步阻塞服务,QPS没有什么区别, 都是对单端口的监听,多开几个进程也一样。 区别是在使用协程上。 使用PHP原生的yield,这里数据库连接需要一段时间,发起数据库连接的同时可以用协程把文件内容写入,这样在一定程度上可以提高QPS。 不过使用Swoole的协程,可以在 监听socket 上做文章,相当于在 各监听socket 间做协程切换,所以可以大幅度提高QPS。
客户端
客户端原本想用Socket实现的,发现通过Nginx请求PHP的web服务时报错:
Call to undefined function socket_create()
也是神奇,最后用了stream来实现,fread()
时需要注意第二个参数,这个参数是必填,
注意读取的大小,小了会不完整,大了又具体不知道究竟多大,尽量大一些。
客户端:
<?php
$file_name = $_FILES['image']['name'];
$file_path = $_FILES['image']['tmp_name'];
$data = $file_name .'<<<<<<>>>>>>';
$data .= file_get_contents($file_path);
$socket = stream_socket_client('tcp://192.168.56.108:1034', $errno, $errstr);
if (!$socket) {
die($errno . $errstr);
}
fwrite($socket, $data);
fwrite($socket, "\r\n\r\n");
$response = fread($socket, 8192);
$response = json_decode($response, true);
// 返回的文件地址
$file_path = '';
if ($response['code'] == 1) {
$file_path = $response['data']['path'];
}
除了新增文件,还要删除文件,这里自己写了个封装类:
<?php
namespace common\components;
class FileShare
{
private $link;
private $app_uuid;
private $app_password;
public function __construct($ip ='', $port = '', $app_uuid = '', $app_password = '')
{
$this->link = 'tcp://' .$ip .':' .$port;
$this->app_uuid = $app_uuid;
$this->app_password = $app_password;
}
public function add($file_name = '', $file_path = '')
{
if (empty($file_name)) {
throw new \Exception('file_name is empty');
}
if (!file_exists($file_path)) {
throw new \Exception('file is not found');
}
$data = $this->app_uuid .'<<<<<<>>>>>>'
. $this->app_password .'<<<<<<>>>>>>'
. 'add' .'<<<<<<>>>>>>'
. $file_name .'<<<<<<>>>>>>';
$data .= file_get_contents($file_path) ."\r\n\r\n";
$socket = stream_socket_client($this->link, $errno, $errstr);
if (!$socket) {
throw new \Exception($errno . $errstr);
}
fwrite($socket, $data);
$response = fread($socket, 8192);
$response = json_decode($response, true);
fclose($socket);
if ($response['code'] != 1) {
throw new \Exception($response['msg']);
}
return $response['data']['path'];
}
public function delete($path = '')
{
if (empty($file_name)) {
throw new \Exception('path is empty');
}
$data = $this->app_uuid .'<<<<<<>>>>>>'
. $this->app_password .'<<<<<<>>>>>>'
. 'delete' .'<<<<<<>>>>>>'
. $path
. "\r\n\r\n";
$socket = stream_socket_client($this->link, $errno, $errstr);
if (!$socket) {
throw new \Exception($errno . $errstr);
}
fwrite($socket, $data);
$response = fread($socket, 8192);
$response = json_decode($response, true);
fclose($socket);
if ($response['code'] != 1) {
throw new \Exception($response['msg']);
}
return true;
}
}
图片压缩
图片太大时,会用到压缩,这里提供一个图片压缩函数:
<?php
/**
* @param $image_src 源文件地址
* @param $image_dist 目的文件地址
* @param int $dist_width 目的宽度
* @param int $dist_height 目的高度
* @param int $quality 压缩质量
* @param string $filename 文件名称,用于确定目的文件后缀
* @return bool
* @throws UploadException
*/
function compress($image_src, $image_dist, $dist_width = 320, $dist_height = 240, $quality = 70, $filename = "")
{
$imagecreate_list = [
1 => function ($path) {
return imagecreatefromgif($path);
},
2 => function ($path) {
return imagecreatefromjpeg($path);
},
3 => function ($path) {
return imagecreatefrompng($path);
},
// 4 => function($path) {return imagecreatefromswf($path);},
// 5 => function($path) {return imagecreatefrompsd($path);},
6 => function ($path) {
return imagecreatefrombmp($path);
},
// 7 => function($path) {return imagecreatefromtiff($path);},
// 8 => function($path) {return imagecreatefromtiff($path);},
9 => function ($path) {
return imagecreatefromjpeg($path);
},
10 => function ($path) {
return imagecreatefromjpeg($path);
},
11 => function ($path) {
return imagecreatefromjpeg($path);
},
12 => function ($path) {
return imagecreatefromjpeg($path);
},
// 13 => function($path) {return imagecreatefromswc($path);},
// 14 => function($path) {return imagecreatefromiff($path);},
15 => function ($path) {
return imagecreatefromwbmp($path);
},
16 => function ($path) {
return imagecreatefromxbm($path);
},
// 17 => function($path) {return imagecreatefromico($path);},
18 => function ($path) {
return imagecreatefromwebp($path);
},
];
// 后缀名
$file_extension = null;
if (!empty($filename) && strrpos($filename, '.') !== false) {
$file_extension = substr($filename, strrpos($filename, '.') + 1);
}
try {
list($src_width, $src_height, $stype) = getimagesize($image_src);
if ($dist_width && ($src_width < $src_height)) {
$dist_width = ($dist_height / $src_height) * $src_width;
} else {
$dist_height = ($dist_width / $src_width) * $src_height;
}
$image_resource = imagecreatetruecolor($dist_width, $dist_height);
$src_image = $imagecreate_list[$stype]($image_src);
imagecopyresampled($image_resource, $src_image, 0, 0, 0, 0, $dist_width, $dist_height, $src_width, $src_height);
if (strtolower($file_extension) == 'jpg' || strtolower($file_extension ) == 'jpeg') {
imagejpeg($image_resource, $image_dist, $quality);
} else {
imagepng($image_resource, $image_dist, intval(($quality - 10) / 10));
}
imagedestroy($image_resource);
imagedestroy($src_image);
return true;
} catch (\Exception $e) {
throw new UploadException($e->getMessage());
return false;
}
}
WebSocket客户端实现
PHP通过TCP连接向WebSocket服务器发送数据。
common/widgets/WSClient.php:
namespace common\widgets;
/**
* Websocket client base class
*/
class WSClient
{
const hashstring = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
public $host;
public $port;
public $uri;
public $secure;
public $origin = 'null';
public $errno = 0;
public $errstr = "";
private $async = false;
private $socket = false;
private $curframe;
private $recvframes;
function __construct($host, $uri = '/', $secure = false, $port = false)
{
$this->host = $host;
$this->uri = $uri;
if ($port === false) {
$this->port = $secure ? 443 : 80;
} else {
$this->port = $port;
}
$this->secure = $secure;
$this->recvframes = array();
}
function __destruct()
{
$this->disconnect();
}
/*
* Connect to websocket server, switch protocols
* name: WSClient::connect
* @param boolean $async
* @return boolean
*
*/
function connect($async = false)
{
$this->async = $async;
$remote = ($this->secure ? 'tls' : 'tcp') . '://' . $this->host . ':' . $this->port;
$this->socket = @stream_socket_client($remote, $this->errno, $this->errstr);
if ($this->socket === false) return false;
/*
Create and send HTTP request
*/
$seckey = base64_encode($this->makeKey());
$request = "GET $this->uri HTTP/1.1\r\n".
"Connection: Upgrade\r\n".
"Upgrade: websocket\r\n".
"Pragma: no-cache\r\n".
"Cache-Control: no-cache\r\n".
"Host: $this->host\r\n".
"Origin: $this->origin\r\n".
"Sec-WebSocket-Version: 13\r\n".
"Sec-WebSocket-Key: $seckey\r\n".
"\r\n";
if (!$this->fwrite_stream($request)) {
$this->errstr = "Error sending request";
$this->disconnect();
return false;
}
/*
Read and check HTTP response
*/
$response = array();
do {
$line = fgets($this->socket, 1024);
if ($line === false) {
$this->errstr = "Error reading response";
$this->disconnect();
return false;
}
$response[] = trim($line);
} while (strlen(trim($line)) > 0);
// var_dump($response);
$parsed = $this->parseResponse($response);
// var_dump($parsed);
if ($parsed['responseCode'] != 101) {
$this->errstr = "Invalid response code";
$this->disconnect();
return false;
}
if (!isset($parsed['headers']['Upgrade']) || strcasecmp($parsed['headers']['Upgrade'], 'websocket') ||
!isset($parsed['headers']['Connection']) || strcasecmp($parsed['headers']['Connection'], 'Upgrade')) {
$this->errstr = "Invalid response headers";
$this->disconnect();
return false;
}
// echo base64_decode($parsed['headers']['Sec-WebSocket-Accept'])."\n";
// echo sha1($seckey.self::hashstring, true)."\n";
if (!isset($parsed['headers']['Sec-WebSocket-Accept']) || strcmp(base64_decode($parsed['headers']['Sec-WebSocket-Accept']), sha1($seckey.self::hashstring, true))) {
$this->errstr = "Invalid security key";
$this->disconnect();
return false;
}
$this->curframe = new WSFrame();
return true;
}
/*
* Disconnect from server
* name: WSClient::disconnect
*
*/
function disconnect()
{
if ($this->socket) @fclose($this->socket);
$this->socket = false;
}
/*
* Get information about connection
* name: WSClient::getMetadata
* @return array
*
*/
function getMetadata()
{
if ($this->socket) {
return stream_get_meta_data($this->socket);
} else {
return array();
}
}
/*
* Make 16-byte random string
* @return string
*/
function makeKey()
{
$key = "";
mt_srand();
for ($i = 0; $i < 16; $i++) $key .= chr(mt_rand(1, 255));
return $key;
}
/*
* Send string to stream
* @param string $string
* @return mixed
*/
function fwrite_stream($string)
{
$count = 0;
for ($written = 0; $written < strlen($string); $written += $fwrite)
{
$fwrite = @fwrite($this->socket, substr($string, $written));
if ($fwrite === false) return false;
if ($fwrite === 0) {
$count ++;
if ($count >= 10) {
return false;
} else {
continue;
}
} else {
$count = 0;
}
}
return $written;
}
/*
* Parse HTTP response
* name: WSClient::parseResponse
* @param array $response
* @return array
*
*/
function parseResponse($response)
{
$res = array();
if (!preg_match('/^HTTP\/([0-9\.]+)\s+([0-9]+)\s+(.*)/', $response[0], $parts)) return false;
$res['httpVersion'] = $parts[1];
$res['responseCode'] = $parts[2];
$res['responseText'] = $parts[3];
$res['headers'] = array();
foreach ($response as $num=>$line)
{
if (!$num) continue;
$parts = preg_split('/:\s+/', $line, 2);
if (count($parts) != 2) continue;
if (array_key_exists($parts[0], $res['headers'])) {
if (!is_array($res['headers'][$parts[0]])) {
$res['headers'][$parts[0]] = array($res['headers'][$parts[0]]);
}
$res['headers'][$parts[0]][] = $parts[1];
} else {
$res['headers'][$parts[0]] = $parts[1];
}
}
return $res;
}
/*
* Prepare and send frame
* name: WSClient::send
* @param int $opcode
* @param string $payload
* @param int $masked
* @param array $masking_key
* @return boolean
*
*/
function send($opcode, $payload, $masked = 1, $masking_key = array())
{
$frame = new WSFrame();
$frame->set($opcode, $payload, $masked, $masking_key);
return $this->sendFrame($frame);
}
/*
* Send prepared frame
* name: WSClient::sendFrame
* @param WSFrame $frame
* @return boolean
*
*/
function sendFrame($frame)
{
if ($this->fwrite_stream($frame->encode()) === false) {
$this->errstr = "Write error";
$this->disconnect();
return false;
}
return true;
}
/*
* Read frame from websocket
* name: WSClient::read
* @return mixed
*
*/
function read()
{
if ($this->async) {
$read = array($this->socket);
$write = null;
$except = null;
$num = @stream_select($read, $write, $accept, 0, 200000);
if ($num === false) {
$this->errstr = "Select error";
$this->disconnect();
return false;
}
if ($num) {
$recvbuf = @fread($this->socket, 8192);
while (strlen($recvbuf) > 0) {
$decoded = $this->curframe->decode($recvbuf);
if ($this->curframe->framestate == FRAME_STATE_ERROR) {
$this->errstr = "Frame decoding error " . $this->curframe->errcode;
$this->disconnect();
return false;
} elseif ($this->curframe->framestate == FRAME_STATE_COMPLETED) {
if ($this->processFrame($this->curframe)) {
$recvbuf = substr($recvbuf, $decoded);
$this->curframe = new WSFrame();
} else {
$this->disconnect();
return false;
}
}
}
}
} else {
for (; ; ) {
$recvbuf = @fread($this->socket, 1);
$decoded = $this->curframe->decode($recvbuf);
if ($this->curframe->framestate == FRAME_STATE_ERROR) {
$this->errstr = "Frame decoding error " . $this->curframe->errcode;
$this->disconnect();
return false;
} elseif ($this->curframe->framestate == FRAME_STATE_COMPLETED) {
if ($this->processFrame($this->curframe)) {
$this->curframe = new WSFrame();
break;
} else {
$this->disconnect();
return false;
}
}
}
}
return count($this->recvframes);
}
/*
* Process frame
* name: WSClient::processFrame
* @param WSFrame $frame
* @return boolean;
*
*/
private function processFrame($frame)
{
switch ($frame->opcode)
{
case WS_FRAME_CLOSE:
$this->errstr = "Disconnect requested by server";
if ($frame->payload) {
$this->errno = unpack("n", substr($frame->payload, 0, 2));
$this->errstr .= " (" . $this->errno . ") " . substr($frame->payload, 2);
}
return false;
break;
case WS_FRAME_PING:
$pong = new WSFrame();
$pong->set(WS_FRAME_PONG, $frame->payload, 1);
if (!$this->sendFrame($pong)) {
$this->errstr = "Error sending pong";
return false;
}
break;
case WS_FRAME_PONG:
break;
default:
array_push($this->recvframes, $frame);
}
return true;
}
/*
* Get frame from receive buffer
* name: WSClient::getFrame
* @return WSFrame
*
*/
public function getFrame()
{
return array_shift($this->recvframes);
}
}
common/widgets/WSFrame.php:
namespace common\widgets;
/* Opcodes */
define("WS_FRAME_INVALID", -1);
define("WS_FRAME_CONT", 0x0);
define("WS_FRAME_TEXT", 0x1);
define("WS_FRAME_BINARY", 0x2);
define("WS_FRAME_03", 0x3);
define("WS_FRAME_04", 0x4);
define("WS_FRAME_05", 0x5);
define("WS_FRAME_06", 0x6);
define("WS_FRAME_07", 0x7);
define("WS_FRAME_CLOSE", 0x8);
define("WS_FRAME_PING", 0x9);
define("WS_FRAME_PONG", 0xA);
define("WS_FRAME_0B", 0xB);
define("WS_FRAME_0C", 0xC);
define("WS_FRAME_0D", 0xD);
define("WS_FRAME_0E", 0xE);
define("WS_FRAME_0F", 0xF);
/* Frame decoding states */
define("FRAME_STATE_ERROR", -1);
define("FRAME_STATE_BEGIN", 0);
define("FRAME_STATE_LEN", 1);
define("FRAME_STATE_KEY", 2);
define("FRAME_STATE_PAYLOAD", 3);
define("FRAME_STATE_COMPLETED", 4);
/* Frame error codes */
define("FRAME_ERROR_NONE", 0);
define("FRAME_ERROR_INTERNAL", 1);
define("FRAME_ERROR_TOO_LARGE", 2);
/**
* Websocket Frame
*/
class WSFrame
{
public $fin;
public $rsv;
public $opcode;
public $masked;
public $length;
public $masking_key;
public $payload;
public $errcode;
public $framestate;
private $framepos;
private $lenlen;
private $lenbuf;
function __construct()
{
$this->clear();
}
/*
* Clear object
* name: WSFrame::clear
*
*/
function clear()
{
$this->fin = 0;
$this->rsv = 0;
$this->opcode = WS_FRAME_INVALID;
$this->masked = 0;
$this->length = 0;
$this->masking_key = array();
$this->payload = "";
$this->errcode = FRAME_ERROR_NONE;
$this->framepos = 0;
$this->framestate = FRAME_STATE_BEGIN;
$this->lenlen = 0;
$this->lenbuf = array();
}
/*
* Create simple frame
* name: WSFrame::set
* @param int $opcode
* @param string $payload
* @param int $masked
* @param array $masking_key
*
*/
function set($opcode, $payload, $masked = 0, $masking_key = array())
{
$this->fin = 1;
$this->payload = $payload;
$this->length = strlen($payload);
$this->opcode = $opcode;
$this->masked = $masked;
if ($masked) {
if (count($masking_key) == 4) {
$this->masking_key = $masking_key;
} else {
$this->genkey();
}
}
$this->framestate = FRAME_STATE_COMPLETED;
}
/*
* Generate random masking key
* name: WSFrame::genkey
*
*/
function genkey()
{
mt_srand();
$this->masking_key = array(
mt_rand(0, 255), mt_rand(0, 255), mt_rand(0, 255), mt_rand(0, 255)
);
}
/*
* Decode portion of data
* name: WSFrame::decode
* @param string $data
* @return int
*
*/
function decode($data)
{
$datalen = strlen($data);
$datapos = 0;
while ($datapos < $datalen)
{
if (($this->framestate == FRAME_STATE_ERROR) || ($this->framestate == FRAME_STATE_COMPLETED)) return $datapos;
$byte = ord($data[$datapos]);
switch ($this->framestate)
{
case FRAME_STATE_BEGIN:
if ($this->framepos == 0) {
// Process first byte of frame
$this->fin = ($byte >> 7) & 0x01;
$this->rsv = ($byte >> 4) & 0x07;
$this->opcode = $byte & 0x0F;
} elseif ($this->framepos == 1) {
// Process second byte of frame
$this->masked = ($byte >> 7) & 0x01;
$pl = $byte & 0x7F;
switch($pl) {
case 126:
$this->lenlen = 2;
$this->framestate = FRAME_STATE_LEN;
break;
case 127:
$this->lenlen = 8;
$this->framestate = FRAME_STATE_LEN;
break;
default:
$this->lenlen = 0;
$this->length = $pl;
$this->framestate = $pl ? ($this->masked ? FRAME_STATE_KEY : FRAME_STATE_PAYLOAD) : FRAME_STATE_COMPLETED;
}
} else {
$this->framestate = FRAME_STATE_ERROR;
$this->errcode = FRAME_ERROR_INTERNAL;
}
$this->framepos++;
break;
case FRAME_STATE_LEN:
if (($this->framepos >=2) && ($this->framepos < (2 + $this->lenlen))) {
$this->lenbuf[] = $byte;
if (count($this->lenbuf) == $this->lenlen) {
for ($i = $this->lenlen - 1; $i >= 0; $i--) {
//TODO: Add overflow handling for <64 bit OS
$this->length |= $this->lenbuf[$i] << (8 * ($this->lenlen - $i - 1));
}
$this->framestate = $this->masked ? FRAME_STATE_KEY : FRAME_STATE_PAYLOAD;
}
} else {
$this->framestate = FRAME_STATE_ERROR;
$this->errcode = FRAME_ERROR_INTERNAL;
}
$this->framepos++;
break;
case FRAME_STATE_KEY:
if (($this->framepos >= (2 + $this->lenlen)) && ($this->framepos < (6 + $this->lenlen))) {
$this->masking_key[] = $byte;
if (count($this->masking_key) == 4) {
$this->framestate = $this->length ? FRAME_STATE_PAYLOAD : FRAME_STATE_COMPLETED; // empty masked frame?
}
} else {
$this->framestate = FRAME_STATE_ERROR;
$this->errcode = FRAME_ERROR_INTERNAL;
}
break;
case FRAME_STATE_PAYLOAD:
$pl = strlen($this->payload);
if ($pl < $this->length) {
if ($this->masked) {
$this->payload .= chr($byte ^ $this->masking_key[$pl % 4]);
} else {
$this->payload .= chr($byte);
}
if (++$pl == $this->length) $this->framestate = FRAME_STATE_COMPLETED;
}
$this->framepos++;
break;
case FRAME_STATE_COMPLETED:
return $datapos;
default:
}
$datapos++;
}
return $datapos;
}
/*
* Is frame completed?
* name: WSFrame::completed
* @return boolean
*
*/
function completed()
{
return ($this->framestate == FRAME_STATE_COMPLETED);
}
/*
* Encode frame
* name: WSFrame::encode
* @return string
*
*/
function encode()
{
$retval = chr((($this->fin & 0x1) << 7) | (($this->rsv & 0x7) << 4) | ($this->opcode & 0xF));
$pl = strlen($this->payload);
if ($pl < 126) {
$retval .= chr(($pl & 0x7F) | (($this->masked & 0x1) << 7));
} elseif ($pl <= 0xFFFF) {
$retval .= chr(126 | (($this->masked & 0x1) << 7));
$retval .= pack('n', $pl);
} else {
$retval .= chr(127 | (($this->masked & 0x1) << 7));
for ($i = 7; $i >=0; $i--) {
$retval .= chr(($pl >> (8 * $i)) & 0xFF);
}
}
if ($this->masked) {
$retval .= chr($this->masking_key[0]) . chr($this->masking_key[1]) .
chr($this->masking_key[2]) . chr($this->masking_key[3]);
for ($i = 0; $i < $pl; $i++) {
$retval .= chr(ord($this->payload[$i]) ^ $this->masking_key[$i % 4]);
}
} else {
$retval .= $this->payload;
}
return $retval;
}
}
使用示例:
require __DIR__.'/common/widgets/WSClient.php';
require __DIR__.'/common/widgets/WSFrame.php';
$socket = new common\widgets\WSClient(
'127.0.0.1',
'/',
false,
9501
);
if (!$socket->connect(true)) {
die('connect failed');
}
$msg = "abc123";
if (!$socket->send(WS_FRAME_TEXT, $msg, 1)) {
echo $socket->errstr . "\n";
die;
}
里面大量的位运算可以看下 https://ibaiyang.github.io/blog/php/2019/01/10/PHP-位运算相关.html
问题说明
socket_read
在看一篇文章说到:
socket_read()
:从连接资源中读取指定字节数的数据,读取成功时,返回字符串,失败时,返回FALSE。没有数据时,返回空字符串。
所以可以用:
$data = '';
// 循环读取指定长度的服务器响应数据
while($response = socket_read($sock, 4))
{
$data .= $response;
}
在服务端尝试下来才发现,这样是可以用,但是是在客户端使用了 socket_close()
断开连接的情况下。
只适合于单向通信,如果服务端要把处理结果发送给客户端(因为连接已断开),就使用不了了。
不过客户端倒是可以使用这个函数,当服务端使用了 socket_close()
断开连接时,客户端获取到所有返回的信息,
然后进行处理。
参考资料
https://php.p2hp.com/manual/zh/function.socket-read.php
PHP 下的 Socket 编程 https://zhuanlan.zhihu.com/p/374036250 ,下面有几个文章集合,可以看看