Improve organization of server processing logic and concurrent connections

This commit is contained in:
Jansen Price 2020-09-04 00:27:04 -05:00
parent fcac2b20ab
commit 457332952a
3 changed files with 108 additions and 36 deletions

View file

@ -16,8 +16,8 @@ class Request
public function __construct($request_input)
{
$this->url = $request_input;
$data = parse_url($request_input);
$this->url = trim($request_input);
$data = parse_url($this->url);
foreach ($data as $key => $value) {
$this->{$key} = urldecode($value);

View file

@ -41,7 +41,11 @@ class Response
public function send($client)
{
fwrite($client, $this->getHeader());
$result = fwrite($client, $this->getHeader());
if (!$result) {
return false;
}
if ($this->filepath) {
$size = filesize($this->filepath);
@ -56,6 +60,7 @@ class Response
// So, result will be 0 if the client cancels (broken socket)
$result = fwrite($client, fread($fp, 8192));
}
fclose($fp);
return $size;
} else {
$body = $this->getBody();

View file

@ -18,6 +18,10 @@ class Server
private $ssl_context;
private $logger;
private $connections = []; // Incoming client connections
private $peers = []; // Client connections to read from
private $metas = []; // Meta data for each client connection
private $real_root_dir;
public function __construct(Config $config = null, Cert $cert = null, Logger $logger = null)
{
@ -69,9 +73,8 @@ class Server
throw new \Exception("Error: Root directory '$root_dir' not a directory");
}
$path = realpath($root_dir);
$this->logger->debug("Root directory '$path'");
$this->real_root_dir = realpath($root_dir);
$this->logger->debug(sprintf("Root directory '%s'", $this->real_root_dir));
$server = stream_socket_server(
$this->getListenAddress(),
@ -83,45 +86,94 @@ class Server
if (!$server) {
throw new \Exception("Error " . $errno . ": " . $errstr);
}
stream_set_blocking($server, true);
$name = stream_socket_get_name($server, false);
$this->logger->info(sprintf("Listening on %s://%s...", self::SCHEME, $name));
$this->connections = [];
$this->peers = [];
$this->metas = [];
$_write = null;
$_exception = null;
while (true) {
# onWarning is added here to swallow up the `timeout` warning
set_error_handler([$this, 'onWarning']);
$client = stream_socket_accept($server, $this->timeout, $client_name);
//stream_socket_enable_crypto($server, true, STREAM_CRYPTO_METHOD_TLSv1_2_SERVER);
restore_error_handler();
// Handle incoming new connections
$this->intakeConnections($server);
if ($client) {
$time = ['start' => microtime(true)];
$meta = stream_get_meta_data($client);
if (count($this->connections) == 0) {
continue;
}
$this->peers = array_values($this->connections);
$this->logger->debug("$client_name Accepted", $meta);
$request_buffer = stream_get_line($client, 1026, "\r\n");
//print($this->hexView($request_buffer));
//print("Length: " . mb_strlen($request_buffer) . "\n");
$this->logger->info("REQ: $request_buffer", ["client" => $client_name]);
$request = new Request($request_buffer);
// Respond to client
$response = $this->handleResponse($request, $path);
$size = $response->send($client);
$time['end'] = microtime(true);
$this->logger->debug(
"RSP: " . trim($response->getHeader()),
['size' => $size, 'time' => $time['end'] - $time['start']]
);
fclose($client);
$this->logger->debug("$client_name Closed");
// Handle reading from and responding to connections
if (stream_select($this->peers, $_write, $_exception, 5)) {
foreach ($this->peers as $client) {
$this->handlePeerConnection($client);
}
}
}
}
public function handleResponse($request, $dir)
public function intakeConnections($server)
{
set_error_handler([$this, 'onWarning']);
# onWarning is added here to swallow up the `timeout` warning
$client = stream_socket_accept($server, $this->timeout, $peer);
restore_error_handler();
if (!$client) {
$this->pruneExpiredConnections();
return;
}
$time = ['start' => microtime(true)];
$meta = stream_get_meta_data($client);
$this->logger->debug("$peer Accepted", $meta);
$this->connections[$peer] = $client;
$this->metas[$peer] = ["input" => "", "time" => $time];
}
public function handlePeerConnection($client)
{
$peer = stream_socket_get_name($client, true);
if (!$peer) {
// If it went away use the array key as the name to close it
$peer = array_search($client, $this->connections);
}
$this->metas[$peer]['input'] .= fread($client, 1026);
if (mb_strlen($this->metas[$peer]['input']) >= 1024
|| strpos($this->metas[$peer]['input'], "\r\n") !== false
) {
$request_buffer = $this->metas[$peer]['input'];
//print($this->hexView($request_buffer));
//print("Length: " . mb_strlen($request_buffer) . "\n");
$this->logger->info("REQ: $request_buffer", ['client' => $peer]);
$request = new Request($request_buffer);
// Respond to client
$response = $this->handleResponse($request);
$size = $response->send($client);
if ($size !== false) {
$time_end = microtime(true);
$this->logger->debug(
"RSP: " . trim($response->getHeader()),
['size' => $size, 'time' => $time_end - $this->metas[$peer]['time']['start']]
);
}
fclose($client);
$this->logger->debug("$peer Closed");
unset($this->connections[$peer]);
unset($this->metas[$peer]);
}
}
public function handleResponse($request)
{
list($is_valid, $response) = $this->validateRequest($request);
@ -129,12 +181,12 @@ class Server
return $response;
}
$resource_path = rtrim($dir, "/") . $request->path;
$resource_path = rtrim($this->real_root_dir, "/") . $request->path;
// Check if within the server root
// Realpath will translate any '..' in the path
$realpath = realpath($resource_path);
if ($realpath && strpos($realpath, $dir) !== 0) {
if ($realpath && strpos($realpath, $this->real_root_dir) !== 0) {
$response->setStatus(Response::STATUS_PERMANENT_FAILURE);
$response->setMeta("Invalid location");
return $response;
@ -193,6 +245,21 @@ class Server
return $response;
}
public function pruneExpiredConnections()
{
$now = microtime(true);
foreach ($this->metas as $peer => $meta) {
$delta = $now - $meta['time']['start'];
if ($delta > $this->timeout) {
$this->logger->debug(sprintf("Pruning expired connection %s (%.02f seconds over)", $peer, $delta));
if (isset($this->connections[$peer])) {
unset($this->connections[$peer]);
}
unset($this->metas[$peer]);
}
}
}
public function validateRequest($request)
{
$response = new Response();