From bfe3599054b655ca19c34a8966bc9b2a27e8ac18 Mon Sep 17 00:00:00 2001 From: Simon Vieille Date: Sat, 23 Sep 2023 00:03:48 +0200 Subject: [PATCH] use messenger to push influxdb stats --- .env | 2 + bin/messenger | 154 ++++++++++++++++++ composer.json | 1 + config/packages/messenger.yaml | 7 + nohup.out | 30 ++++ src/EventListener/StatListener.php | 25 +-- src/Message/PageViewMessage.php | 15 ++ src/MessageHandler/PageViewMessageHandler.php | 37 +++++ src/Middleware/PageViewMiddleware.php | 16 ++ 9 files changed, 266 insertions(+), 21 deletions(-) create mode 100755 bin/messenger create mode 100644 config/packages/messenger.yaml create mode 100644 nohup.out create mode 100644 src/Message/PageViewMessage.php create mode 100644 src/MessageHandler/PageViewMessageHandler.php create mode 100644 src/Middleware/PageViewMiddleware.php diff --git a/.env b/.env index eba2126..a0667ad 100644 --- a/.env +++ b/.env @@ -35,6 +35,8 @@ MAILER_SENDER=example@localhost ASSET_BASE_URL=null UMAMI_URL=null +MESSENGER_TRANSPORT_DSN=doctrine://default + INFLUXDB_URL= INFLUXDB_TOKEN= INFLUXDB_BUCKET= diff --git a/bin/messenger b/bin/messenger new file mode 100755 index 0000000..219e289 --- /dev/null +++ b/bin/messenger @@ -0,0 +1,154 @@ +#!/bin/sh + +set -eu + +usage() { + printf "Usage: %s [-l DEBUG_LEVEL] [-h] start|stop|restart\n" "$0" +} + +help() { + cat << EOH + SYNOPSIS + $0 [-l DEBUG_LEVEL] [-h] -a start|stop|restart + + DESCRIPTION + + $0 manages symfony messenger + + OPTIONS + + -h Show this help + + -l debug|info|notice|warning|error + Debug level + + -a start|stop|restart +EOH +} + +on_interrupt() { + log -l notice "" + log -l notice "Process aborted!" + + exit 130 +} + +start_messenger() { + started=0 + pid= + + if [ -f "$pid_file" ]; then + pid="$(cat "$pid_file")" + fi + + if [ -n "$pid" ]; then + if [ -d "/proc/$pid" ]; then + log -t -l warning "Already running" + started=1 + fi + fi + + if [ "$started" -eq 0 ]; then + nohup php bin/console messenger:consume 2>/dev/null >/dev/null & + echo -n $! > "$pid_file" + log -t -l notice "Started" + fi +} + +stop_messenger() { + if [ -f "$pid_file" ]; then + pid="$(cat "$pid_file")" + fi + + if [ -n "$pid" ]; then + if [ ! -d "/proc/$pid" ]; then + log -t -l warning "Not started" + else + kill -9 "$pid" + log -t -l notice "Stopped" + fi + + rm "$pid_file" + else + log -t -l warning "Not started" + fi +} + +main() { + cd "$(dirname "0")" + + pid_file=var/messenger.pid + + while getopts "l:ha:" option; do + case "${option}" in + h) help; exit 0;; + l) LOG_VERBOSE="$OPTARG";; + a) ACTION="$OPTARG";; + ?) log -l error "$(usage)"; exit 1;; + esac + done + + if [ "$ACTION" = "start" ]; then + start_messenger + elif [ "$ACTION" = "stop" ]; then + stop_messenger + elif [ "$ACTION" = "restart" ]; then + stop_messenger + start_messenger + fi + + # log [-t] [-l debug|info|notice|warning|error] message + + exit 0 +} + +log() { + LOG_VERBOSE="${LOG_VERBOSE:-info}" + LEVEL=info + TIME= + + while getopts "tl:" option; do + case "${option}" in + l) LEVEL="$OPTARG"; shift $((OPTIND-1));; + t) TIME="$(printf "[%s] " "$(date +'%Y-%m-%dT%H:%M:%S.%s')")"; shift $((OPTIND-1));; + *) exit 1;; + esac + done + + if [ -t 2 ] && [ -z "${NO_COLOR-}" ]; then + case "${LEVEL}" in + debug) COLOR="$(tput setaf 3)";; + notice) COLOR="$(tput setaf 4)";; + warning) COLOR="$(tput setaf 5)";; + error) COLOR="$(tput setaf 1)";; + *) COLOR="$(tput sgr0)";; + esac + fi + + case "${LEVEL}" in + debug) LEVEL=100;; + notice) LEVEL=250;; + warning) LEVEL=300;; + error) LEVEL=400;; + *) LEVEL=200;; + esac + + case "${LOG_VERBOSE}" in + debug) LOG_VERBOSE_VALUE=100;; + notice) LOG_VERBOSE_VALUE=250;; + warning) LOG_VERBOSE_VALUE=300;; + error) LOG_VERBOSE_VALUE=400;; + *) LOG_VERBOSE_VALUE=200;; + esac + + if [ $LEVEL -ge $LOG_VERBOSE_VALUE ]; then + printf "%s\n" "$*" | while IFS='' read -r LINE; do + printf "%s%s%s\n" "${COLOR:-}" "${TIME:-}" "$LINE" >&2 + done + fi +} + +trap on_interrupt INT + +main "$@" + diff --git a/composer.json b/composer.json index 8f4606e..9f8b4a7 100644 --- a/composer.json +++ b/composer.json @@ -13,6 +13,7 @@ "knplabs/knp-markdown-bundle": "^1.9", "knplabs/knp-menu-bundle": "^3.1", "murph/murph-core": "dev-master", + "symfony/messenger": "5.4.*", "twig/intl-extra": "^3.5" }, "require-dev": { diff --git a/config/packages/messenger.yaml b/config/packages/messenger.yaml new file mode 100644 index 0000000..ce96395 --- /dev/null +++ b/config/packages/messenger.yaml @@ -0,0 +1,7 @@ +framework: + messenger: + transports: + async: "%env(MESSENGER_TRANSPORT_DSN)%" + + routing: + 'App\Message\PageViewMessage': async diff --git a/nohup.out b/nohup.out new file mode 100644 index 0000000..f107441 --- /dev/null +++ b/nohup.out @@ -0,0 +1,30 @@ + + [OK] Consuming messages from transport "async". + + // The worker will automatically exit once it has received a stop signal via + // the messenger:stop-workers command. + + // Quit the worker with CONTROL-C. + + // Re-run the command with a -vv option to see logs about consumed messages. + + + [OK] Consuming messages from transport "async". + + // The worker will automatically exit once it has received a stop signal via + // the messenger:stop-workers command. + + // Quit the worker with CONTROL-C. + + // Re-run the command with a -vv option to see logs about consumed messages. + + + [OK] Consuming messages from transport "async". + + // The worker will automatically exit once it has received a stop signal via + // the messenger:stop-workers command. + + // Quit the worker with CONTROL-C. + + // Re-run the command with a -vv option to see logs about consumed messages. + diff --git a/src/EventListener/StatListener.php b/src/EventListener/StatListener.php index 999aa56..7640ddb 100644 --- a/src/EventListener/StatListener.php +++ b/src/EventListener/StatListener.php @@ -2,10 +2,9 @@ namespace App\EventListener; -use App\Api\InfluxDB; +use App\Message\PageViewMessage; use Symfony\Component\HttpKernel\Event\RequestEvent; -use InfluxDB2\WriteType; -use InfluxDB2\Point; +use Symfony\Component\Messenger\MessageBusInterface; /** * class StatListener. @@ -14,28 +13,12 @@ use InfluxDB2\Point; */ class StatListener { - public function __construct(protected InfluxDB $influxDB) + public function __construct(protected MessageBusInterface $bus) { } public function onKernelRequest(RequestEvent $event) { - if (!$this->influxDB->isAvailable()) { - return; - } - - $client = $this->influxDB->getClient(); - - $writeApi = $client->createWriteApi(['writeType' => WriteType::SYNCHRONOUS]); - $pageView = new Point('page_view'); - $pageView - ->addTag('request', 'view') - ->addField('value', 1) - ->time(time()) - ; - - $writeApi->write($pageView); - $writeApi->close(); - $client->close(); + $this->bus->dispatch(new PageViewMessage(time())); } } diff --git a/src/Message/PageViewMessage.php b/src/Message/PageViewMessage.php new file mode 100644 index 0000000..b3e19b7 --- /dev/null +++ b/src/Message/PageViewMessage.php @@ -0,0 +1,15 @@ +time; + } +} diff --git a/src/MessageHandler/PageViewMessageHandler.php b/src/MessageHandler/PageViewMessageHandler.php new file mode 100644 index 0000000..33c8169 --- /dev/null +++ b/src/MessageHandler/PageViewMessageHandler.php @@ -0,0 +1,37 @@ +influxDB->isAvailable()) { + return; + } + + $client = $this->influxDB->getClient(); + + $writeApi = $client->createWriteApi(['writeType' => WriteType::SYNCHRONOUS]); + $pageView = new Point('page_view'); + $pageView + ->addTag('request', 'view') + ->addField('value', 1) + ->time($message->getTime()) + ; + + $writeApi->write($pageView); + $writeApi->close(); + $client->close(); + } +} diff --git a/src/Middleware/PageViewMiddleware.php b/src/Middleware/PageViewMiddleware.php new file mode 100644 index 0000000..1944109 --- /dev/null +++ b/src/Middleware/PageViewMiddleware.php @@ -0,0 +1,16 @@ +next()->handle($envelope, $stack); + } +}