use messenger to push influxdb stats
This commit is contained in:
parent
4a84b2db98
commit
bfe3599054
2
.env
2
.env
|
@ -35,6 +35,8 @@ MAILER_SENDER=example@localhost
|
||||||
ASSET_BASE_URL=null
|
ASSET_BASE_URL=null
|
||||||
UMAMI_URL=null
|
UMAMI_URL=null
|
||||||
|
|
||||||
|
MESSENGER_TRANSPORT_DSN=doctrine://default
|
||||||
|
|
||||||
INFLUXDB_URL=
|
INFLUXDB_URL=
|
||||||
INFLUXDB_TOKEN=
|
INFLUXDB_TOKEN=
|
||||||
INFLUXDB_BUCKET=
|
INFLUXDB_BUCKET=
|
||||||
|
|
154
bin/messenger
Executable file
154
bin/messenger
Executable file
|
@ -0,0 +1,154 @@
|
||||||
|
#!/bin/sh
|
||||||
|
|
||||||
|
set -eu
|
||||||
|
|
||||||
|
usage() {
|
||||||
|
printf "Usage: %s [-l DEBUG_LEVEL] [-h] start|stop|restart\n" "$0"
|
||||||
|
}
|
||||||
|
|
||||||
|
help() {
|
||||||
|
cat << EOH
|
||||||
|
SYNOPSIS
|
||||||
|
$0 [-l DEBUG_LEVEL] [-h] -a start|stop|restart
|
||||||
|
|
||||||
|
DESCRIPTION
|
||||||
|
|
||||||
|
$0 manages symfony messenger
|
||||||
|
|
||||||
|
OPTIONS
|
||||||
|
|
||||||
|
-h Show this help
|
||||||
|
|
||||||
|
-l debug|info|notice|warning|error
|
||||||
|
Debug level
|
||||||
|
|
||||||
|
-a start|stop|restart
|
||||||
|
EOH
|
||||||
|
}
|
||||||
|
|
||||||
|
on_interrupt() {
|
||||||
|
log -l notice ""
|
||||||
|
log -l notice "Process aborted!"
|
||||||
|
|
||||||
|
exit 130
|
||||||
|
}
|
||||||
|
|
||||||
|
start_messenger() {
|
||||||
|
started=0
|
||||||
|
pid=
|
||||||
|
|
||||||
|
if [ -f "$pid_file" ]; then
|
||||||
|
pid="$(cat "$pid_file")"
|
||||||
|
fi
|
||||||
|
|
||||||
|
if [ -n "$pid" ]; then
|
||||||
|
if [ -d "/proc/$pid" ]; then
|
||||||
|
log -t -l warning "Already running"
|
||||||
|
started=1
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
|
||||||
|
if [ "$started" -eq 0 ]; then
|
||||||
|
nohup php bin/console messenger:consume 2>/dev/null >/dev/null &
|
||||||
|
echo -n $! > "$pid_file"
|
||||||
|
log -t -l notice "Started"
|
||||||
|
fi
|
||||||
|
}
|
||||||
|
|
||||||
|
stop_messenger() {
|
||||||
|
if [ -f "$pid_file" ]; then
|
||||||
|
pid="$(cat "$pid_file")"
|
||||||
|
fi
|
||||||
|
|
||||||
|
if [ -n "$pid" ]; then
|
||||||
|
if [ ! -d "/proc/$pid" ]; then
|
||||||
|
log -t -l warning "Not started"
|
||||||
|
else
|
||||||
|
kill -9 "$pid"
|
||||||
|
log -t -l notice "Stopped"
|
||||||
|
fi
|
||||||
|
|
||||||
|
rm "$pid_file"
|
||||||
|
else
|
||||||
|
log -t -l warning "Not started"
|
||||||
|
fi
|
||||||
|
}
|
||||||
|
|
||||||
|
main() {
|
||||||
|
cd "$(dirname "0")"
|
||||||
|
|
||||||
|
pid_file=var/messenger.pid
|
||||||
|
|
||||||
|
while getopts "l:ha:" option; do
|
||||||
|
case "${option}" in
|
||||||
|
h) help; exit 0;;
|
||||||
|
l) LOG_VERBOSE="$OPTARG";;
|
||||||
|
a) ACTION="$OPTARG";;
|
||||||
|
?) log -l error "$(usage)"; exit 1;;
|
||||||
|
esac
|
||||||
|
done
|
||||||
|
|
||||||
|
if [ "$ACTION" = "start" ]; then
|
||||||
|
start_messenger
|
||||||
|
elif [ "$ACTION" = "stop" ]; then
|
||||||
|
stop_messenger
|
||||||
|
elif [ "$ACTION" = "restart" ]; then
|
||||||
|
stop_messenger
|
||||||
|
start_messenger
|
||||||
|
fi
|
||||||
|
|
||||||
|
# log [-t] [-l debug|info|notice|warning|error] message
|
||||||
|
|
||||||
|
exit 0
|
||||||
|
}
|
||||||
|
|
||||||
|
log() {
|
||||||
|
LOG_VERBOSE="${LOG_VERBOSE:-info}"
|
||||||
|
LEVEL=info
|
||||||
|
TIME=
|
||||||
|
|
||||||
|
while getopts "tl:" option; do
|
||||||
|
case "${option}" in
|
||||||
|
l) LEVEL="$OPTARG"; shift $((OPTIND-1));;
|
||||||
|
t) TIME="$(printf "[%s] " "$(date +'%Y-%m-%dT%H:%M:%S.%s')")"; shift $((OPTIND-1));;
|
||||||
|
*) exit 1;;
|
||||||
|
esac
|
||||||
|
done
|
||||||
|
|
||||||
|
if [ -t 2 ] && [ -z "${NO_COLOR-}" ]; then
|
||||||
|
case "${LEVEL}" in
|
||||||
|
debug) COLOR="$(tput setaf 3)";;
|
||||||
|
notice) COLOR="$(tput setaf 4)";;
|
||||||
|
warning) COLOR="$(tput setaf 5)";;
|
||||||
|
error) COLOR="$(tput setaf 1)";;
|
||||||
|
*) COLOR="$(tput sgr0)";;
|
||||||
|
esac
|
||||||
|
fi
|
||||||
|
|
||||||
|
case "${LEVEL}" in
|
||||||
|
debug) LEVEL=100;;
|
||||||
|
notice) LEVEL=250;;
|
||||||
|
warning) LEVEL=300;;
|
||||||
|
error) LEVEL=400;;
|
||||||
|
*) LEVEL=200;;
|
||||||
|
esac
|
||||||
|
|
||||||
|
case "${LOG_VERBOSE}" in
|
||||||
|
debug) LOG_VERBOSE_VALUE=100;;
|
||||||
|
notice) LOG_VERBOSE_VALUE=250;;
|
||||||
|
warning) LOG_VERBOSE_VALUE=300;;
|
||||||
|
error) LOG_VERBOSE_VALUE=400;;
|
||||||
|
*) LOG_VERBOSE_VALUE=200;;
|
||||||
|
esac
|
||||||
|
|
||||||
|
if [ $LEVEL -ge $LOG_VERBOSE_VALUE ]; then
|
||||||
|
printf "%s\n" "$*" | while IFS='' read -r LINE; do
|
||||||
|
printf "%s%s%s\n" "${COLOR:-}" "${TIME:-}" "$LINE" >&2
|
||||||
|
done
|
||||||
|
fi
|
||||||
|
}
|
||||||
|
|
||||||
|
trap on_interrupt INT
|
||||||
|
|
||||||
|
main "$@"
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
"knplabs/knp-markdown-bundle": "^1.9",
|
"knplabs/knp-markdown-bundle": "^1.9",
|
||||||
"knplabs/knp-menu-bundle": "^3.1",
|
"knplabs/knp-menu-bundle": "^3.1",
|
||||||
"murph/murph-core": "dev-master",
|
"murph/murph-core": "dev-master",
|
||||||
|
"symfony/messenger": "5.4.*",
|
||||||
"twig/intl-extra": "^3.5"
|
"twig/intl-extra": "^3.5"
|
||||||
},
|
},
|
||||||
"require-dev": {
|
"require-dev": {
|
||||||
|
|
7
config/packages/messenger.yaml
Normal file
7
config/packages/messenger.yaml
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
framework:
|
||||||
|
messenger:
|
||||||
|
transports:
|
||||||
|
async: "%env(MESSENGER_TRANSPORT_DSN)%"
|
||||||
|
|
||||||
|
routing:
|
||||||
|
'App\Message\PageViewMessage': async
|
30
nohup.out
Normal file
30
nohup.out
Normal file
|
@ -0,0 +1,30 @@
|
||||||
|
|
||||||
|
[OK] Consuming messages from transport "async".
|
||||||
|
|
||||||
|
// The worker will automatically exit once it has received a stop signal via
|
||||||
|
// the messenger:stop-workers command.
|
||||||
|
|
||||||
|
// Quit the worker with CONTROL-C.
|
||||||
|
|
||||||
|
// Re-run the command with a -vv option to see logs about consumed messages.
|
||||||
|
|
||||||
|
|
||||||
|
[OK] Consuming messages from transport "async".
|
||||||
|
|
||||||
|
// The worker will automatically exit once it has received a stop signal via
|
||||||
|
// the messenger:stop-workers command.
|
||||||
|
|
||||||
|
// Quit the worker with CONTROL-C.
|
||||||
|
|
||||||
|
// Re-run the command with a -vv option to see logs about consumed messages.
|
||||||
|
|
||||||
|
|
||||||
|
[OK] Consuming messages from transport "async".
|
||||||
|
|
||||||
|
// The worker will automatically exit once it has received a stop signal via
|
||||||
|
// the messenger:stop-workers command.
|
||||||
|
|
||||||
|
// Quit the worker with CONTROL-C.
|
||||||
|
|
||||||
|
// Re-run the command with a -vv option to see logs about consumed messages.
|
||||||
|
|
|
@ -2,10 +2,9 @@
|
||||||
|
|
||||||
namespace App\EventListener;
|
namespace App\EventListener;
|
||||||
|
|
||||||
use App\Api\InfluxDB;
|
use App\Message\PageViewMessage;
|
||||||
use Symfony\Component\HttpKernel\Event\RequestEvent;
|
use Symfony\Component\HttpKernel\Event\RequestEvent;
|
||||||
use InfluxDB2\WriteType;
|
use Symfony\Component\Messenger\MessageBusInterface;
|
||||||
use InfluxDB2\Point;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* class StatListener.
|
* class StatListener.
|
||||||
|
@ -14,28 +13,12 @@ use InfluxDB2\Point;
|
||||||
*/
|
*/
|
||||||
class StatListener
|
class StatListener
|
||||||
{
|
{
|
||||||
public function __construct(protected InfluxDB $influxDB)
|
public function __construct(protected MessageBusInterface $bus)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
public function onKernelRequest(RequestEvent $event)
|
public function onKernelRequest(RequestEvent $event)
|
||||||
{
|
{
|
||||||
if (!$this->influxDB->isAvailable()) {
|
$this->bus->dispatch(new PageViewMessage(time()));
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
$client = $this->influxDB->getClient();
|
|
||||||
|
|
||||||
$writeApi = $client->createWriteApi(['writeType' => WriteType::SYNCHRONOUS]);
|
|
||||||
$pageView = new Point('page_view');
|
|
||||||
$pageView
|
|
||||||
->addTag('request', 'view')
|
|
||||||
->addField('value', 1)
|
|
||||||
->time(time())
|
|
||||||
;
|
|
||||||
|
|
||||||
$writeApi->write($pageView);
|
|
||||||
$writeApi->close();
|
|
||||||
$client->close();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
15
src/Message/PageViewMessage.php
Normal file
15
src/Message/PageViewMessage.php
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace App\Message;
|
||||||
|
|
||||||
|
final class PageViewMessage
|
||||||
|
{
|
||||||
|
public function __construct(public int $time)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getTime(): int
|
||||||
|
{
|
||||||
|
return $this->time;
|
||||||
|
}
|
||||||
|
}
|
37
src/MessageHandler/PageViewMessageHandler.php
Normal file
37
src/MessageHandler/PageViewMessageHandler.php
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace App\MessageHandler;
|
||||||
|
|
||||||
|
use App\Message\PageViewMessage;
|
||||||
|
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
|
||||||
|
use App\Api\InfluxDB;
|
||||||
|
use InfluxDB2\WriteType;
|
||||||
|
use InfluxDB2\Point;
|
||||||
|
|
||||||
|
final class PageViewMessageHandler implements MessageHandlerInterface
|
||||||
|
{
|
||||||
|
public function __construct(protected InfluxDB $influxDB)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public function __invoke(PageViewMessage $message)
|
||||||
|
{
|
||||||
|
if (!$this->influxDB->isAvailable()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
$client = $this->influxDB->getClient();
|
||||||
|
|
||||||
|
$writeApi = $client->createWriteApi(['writeType' => WriteType::SYNCHRONOUS]);
|
||||||
|
$pageView = new Point('page_view');
|
||||||
|
$pageView
|
||||||
|
->addTag('request', 'view')
|
||||||
|
->addField('value', 1)
|
||||||
|
->time($message->getTime())
|
||||||
|
;
|
||||||
|
|
||||||
|
$writeApi->write($pageView);
|
||||||
|
$writeApi->close();
|
||||||
|
$client->close();
|
||||||
|
}
|
||||||
|
}
|
16
src/Middleware/PageViewMiddleware.php
Normal file
16
src/Middleware/PageViewMiddleware.php
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
<?php
|
||||||
|
|
||||||
|
namespace App\Middleware;
|
||||||
|
|
||||||
|
use Symfony\Component\Messenger\Envelope;
|
||||||
|
use Symfony\Component\Messenger\Middleware\MiddlewareInterface;
|
||||||
|
use Symfony\Component\Messenger\Middleware\StackInterface;
|
||||||
|
|
||||||
|
final class PageViewMiddleware implements MiddlewareInterface
|
||||||
|
{
|
||||||
|
public function handle(Envelope $envelope, StackInterface $stack): Envelope
|
||||||
|
{
|
||||||
|
// ...
|
||||||
|
return $stack->next()->handle($envelope, $stack);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue