Тема Websocket`ов уже не раз затрагивалась на Хабре, в частности рассматривались варианты реализации на PHP. Однако, с момента выхода последней статьи с обзором разных технологий прошло уже более года, а миру PHP есть чем похвастаться за прошедшее время.
В данной статье я хочу представить русскоязычному сообществу Swoole — Асинхронный Open Source фреймворк для PHP, написанный на Си, и поставляемый в виде pecl-расширения.
Посмотреть получившееся в итоге приложение(чат) можно: здесь.
Исходники на github.
Linux users
#!/bin/bash
pecl install swoole
Mac users
# get a list of avaiable packages
brew install swoole
#!/bin/bash
brew install homebrew/php/php71-swoole
<?php
$server = new swoole_websocket_server("127.0.0.1", 9502);
$server->on('open', function($server, $req) {
echo "connection open: {$req->fd}\n";
});
$server->on('message', function($server, $frame) {
echo "received message: {$frame->data}\n";
$server->push($frame->fd, json_encode(["hello", "world"]));
});
$server->on('close', function($server, $fd) {
echo "connection close: {$fd}\n";
});
$server->start();
$server->connections;
Swoole\WebSocket\Frame Object
(
[fd] => 20
[data] => {"type":"login","username":"new user"}
[opcode] => 1
[finish] => 1
)
Server::push($fd, $data, $opcode=null, $finish=null)
$users_table = new swoole_table(131072);
$users_table->column('id', swoole_table::TYPE_INT, 5);
$users_table->column('username', swoole_table::TYPE_STRING, 64);
$users_table->create();
$count = count($messages_table);
$dateTime = time();
$row = ['username' => $username, 'message' => $data->message, 'date_time' => $dateTime];
$messages_table->set($count, $row);
/**
* @return Message[]
*/
public function getAll()
{
$stmt = $this->pdo->query('SELECT * from messages');
$messages = [];
foreach ($stmt->fetchAll() as $row) {
$messages[] = new Message( $row['username'], $row['message'], new \DateTime($row['date_time']) );
}
return $messages;
}
public function __construct()
{
$this->ws = new swoole_websocket_server('0.0.0.0', 9502);
$this->ws->on('open', function ($ws, $request) {
$this->onConnection($request);
});
$this->ws->on('message', function ($ws, $frame) {
$this->onMessage($frame);
});
$this->ws->on('close', function ($ws, $id) {
$this->onClose($id);
});
$this->ws->on('workerStart', function (swoole_websocket_server $ws) {
$this->onWorkerStart($ws);
});
$this->ws->start();
}
private function onWorkerStart(swoole_websocket_server $ws)
{
$this->messagesRepository = new MessagesRepository();
$ws->tick(self::PING_DELAY_MS, function () use ($ws) {
foreach ($ws->connections as $id) {
$ws->push($id, 'ping', WEBSOCKET_OPCODE_PING);
}
});
}
/**
* Init new Connection, and ping DB timer function
*/
private static function initPdo()
{
if (self::$timerId === null || (!Timer::exists(self::$timerId))) {
self::$timerId = Timer::tick(self::MySQL_PING_INTERVAL, function () {
self::ping();
});
}
self::$pdo = new PDO(self::DSN, DBConfig::USER, DBConfig::PASSWORD, self::OPT);
}
/**
* Ping database to maintain the connection
*/
private static function ping()
{
try {
self::$pdo->query('SELECT 1');
} catch (PDOException $e) {
self::initPdo();
}
}
/**
* @param string $username
* @return bool
*/
private function isUsernameCurrentlyTaken(string $username) {
foreach ($this->usersRepository->getByIds($this->ws->connection_list()) as $user) {
if ($user->getUsername() == $username) {
return true;
}
}
return false;
}
<?php
namespace App\Helpers;
use Swoole\Channel;
class RequestLimiter
{
/**
* @var Channel
*/
private $userIds;
const MAX_RECORDS_COUNT = 10;
const MAX_REQUESTS_BY_USER = 4;
public function __construct() {
$this->userIds = new Channel(1024 * 64);
}
/**
* Check if there are too many requests from user
* and make a record of request from that user
*
* @param int $userId
* @return bool
*/
public function checkIsRequestAllowed(int $userId) {
$requestsCount = $this->getRequestsCountByUser($userId);
$this->addRecord($userId);
if ($requestsCount >= self::MAX_REQUESTS_BY_USER) return false;
return true;
}
/**
* @param int $userId
* @return int
*/
private function getRequestsCountByUser(int $userId) {
$channelRecordsCount = $this->userIds->stats()['queue_num'];
$requestsCount = 0;
for ($i = 0; $i < $channelRecordsCount; $i++) {
$userIdFromChannel = $this->userIds->pop();
$this->userIds->push($userIdFromChannel);
if ($userIdFromChannel === $userId) {
$requestsCount++;
}
}
return $requestsCount;
}
/**
* @param int $userId
*/
private function addRecord(int $userId) {
$recordsCount = $this->userIds->stats()['queue_num'];
if ($recordsCount >= self::MAX_RECORDS_COUNT) {
$this->userIds->pop();
}
$this->userIds->push($userId);
}
}
<?php
namespace App\Helpers;
class SpamFilter
{
/**
* @var string[] errors
*/
private $errors = [];
/**
* @param string $text
* @return bool
*/
public function checkIsMessageTextCorrect(string $text) {
$isCorrect = true;
if (empty(trim($text))) {
$this->errors[] = 'Empty message text';
$isCorrect = false;
}
return $isCorrect;
}
/**
* @return string[] errors
*/
public function getErrors(): array {
return $this->errors;
}
}
К сожалению, не доступен сервер mySQL