mirror of
https://github.com/abraunegg/onedrive
synced 2026-03-14 14:35:46 +01:00
Update PR
* Update PR - add try{} blocks around main monitor loop and socketio loop
This commit is contained in:
parent
505e5a28e7
commit
f889ee6899
2 changed files with 280 additions and 253 deletions
101
src/main.d
101
src/main.d
|
|
@ -2,24 +2,25 @@
|
|||
module main;
|
||||
|
||||
// What does this module require to function?
|
||||
import core.memory;
|
||||
import core.stdc.stdlib: EXIT_SUCCESS, EXIT_FAILURE, exit;
|
||||
import core.sys.posix.signal;
|
||||
import core.memory;
|
||||
import core.time;
|
||||
import core.thread;
|
||||
import std.stdio;
|
||||
import std.getopt;
|
||||
import std.string;
|
||||
import std.file;
|
||||
import std.process;
|
||||
import core.time;
|
||||
import std.algorithm;
|
||||
import std.path;
|
||||
import std.concurrency;
|
||||
import std.parallelism;
|
||||
import std.conv;
|
||||
import std.traits;
|
||||
import std.net.curl: CurlException;
|
||||
import std.datetime;
|
||||
import std.file;
|
||||
import std.getopt;
|
||||
import std.net.curl: CurlException;
|
||||
import std.parallelism;
|
||||
import std.path;
|
||||
import std.process;
|
||||
import std.socket: SocketException;
|
||||
import std.stdio;
|
||||
import std.string;
|
||||
import std.traits;
|
||||
|
||||
// What other modules that we have created do we need to import?
|
||||
import config;
|
||||
|
|
@ -1289,41 +1290,53 @@ int main(string[] cliArgs) {
|
|||
auto elapsedTime = Clock.currTime() - applicationStartTime;
|
||||
if (debugLogging) {addLogEntry("Application run-time thus far: " ~ to!string(elapsedTime), ["debug"]);}
|
||||
|
||||
// Need to re-validate that the client is still online for this loop
|
||||
if (testInternetReachability(appConfig)) {
|
||||
// Starting a sync - we are online
|
||||
addLogEntry("Starting a sync with Microsoft OneDrive");
|
||||
|
||||
// Attempt to reset syncFailures from any prior loop
|
||||
syncEngineInstance.resetSyncFailures();
|
||||
|
||||
// Update cached quota details from online as this may have changed online in the background outside of this application
|
||||
syncEngineInstance.freshenCachedDriveQuotaDetails();
|
||||
|
||||
// Did the user specify --upload-only?
|
||||
if (appConfig.getValueBool("upload_only")) {
|
||||
// Perform the --upload-only sync process
|
||||
performUploadOnlySyncProcess(localPath, filesystemMonitor);
|
||||
// Catch network exceptions at the monitor-loop level and treat them as recoverable
|
||||
try {
|
||||
// Need to re-validate that the client is still online for this loop
|
||||
if (testInternetReachability(appConfig)) {
|
||||
// Starting a sync - we are online
|
||||
addLogEntry("Starting a sync with Microsoft OneDrive");
|
||||
|
||||
// Attempt to reset syncFailures from any prior loop
|
||||
syncEngineInstance.resetSyncFailures();
|
||||
|
||||
// Update cached quota details from online as this may have changed online in the background outside of this application
|
||||
syncEngineInstance.freshenCachedDriveQuotaDetails();
|
||||
|
||||
// Did the user specify --upload-only?
|
||||
if (appConfig.getValueBool("upload_only")) {
|
||||
// Perform the --upload-only sync process
|
||||
performUploadOnlySyncProcess(localPath, filesystemMonitor);
|
||||
} else {
|
||||
// Perform the standard sync process
|
||||
performStandardSyncProcess(localPath, filesystemMonitor);
|
||||
}
|
||||
|
||||
// Handle any new inotify events
|
||||
processInotifyEvents(true);
|
||||
|
||||
// Detail the outcome of the sync process
|
||||
displaySyncOutcome();
|
||||
|
||||
// Cleanup sync process arrays
|
||||
syncEngineInstance.cleanupArrays();
|
||||
|
||||
// Write WAL and SHM data to file for this loop and release memory used by in-memory processing
|
||||
if (debugLogging) {addLogEntry("Merge contents of WAL and SHM files into main database file", ["debug"]);}
|
||||
itemDB.performCheckpoint("PASSIVE");
|
||||
} else {
|
||||
// Perform the standard sync process
|
||||
performStandardSyncProcess(localPath, filesystemMonitor);
|
||||
// Not online
|
||||
addLogEntry("Microsoft OneDrive service is not reachable at this time. Will re-try on next sync attempt.");
|
||||
}
|
||||
|
||||
// Handle any new inotify events
|
||||
processInotifyEvents(true);
|
||||
|
||||
// Detail the outcome of the sync process
|
||||
displaySyncOutcome();
|
||||
|
||||
// Cleanup sync process arrays
|
||||
syncEngineInstance.cleanupArrays();
|
||||
|
||||
// Write WAL and SHM data to file for this loop and release memory used by in-memory processing
|
||||
if (debugLogging) {addLogEntry("Merge contents of WAL and SHM files into main database file", ["debug"]);}
|
||||
itemDB.performCheckpoint("PASSIVE");
|
||||
} else {
|
||||
// Not online
|
||||
addLogEntry("Microsoft OneDrive service is not reachable at this time. Will re-try on next sync attempt.");
|
||||
} catch (CurlException e) {
|
||||
// Caught a CurlException
|
||||
addLogEntry("Network error during main monitor loop: " ~ e.msg ~ " (will retry)");
|
||||
} catch (SocketException e) {
|
||||
// Caught a SocketException
|
||||
addLogEntry("Socket error during main monitor loop: " ~ e.msg ~ " (will retry)");
|
||||
} catch (Exception e) {
|
||||
// Caught some other error
|
||||
addLogEntry("Unexpected error during main monitor loop: " ~ e.toString());
|
||||
}
|
||||
|
||||
// Output end of loop processing times
|
||||
|
|
|
|||
472
src/socketio.d
472
src/socketio.d
|
|
@ -2,15 +2,17 @@
|
|||
module socketio;
|
||||
|
||||
// What does this module require to function?
|
||||
import std.exception : collectException;
|
||||
import core.thread : Thread;
|
||||
import std.concurrency : spawn, Tid, thisTid, send, receiveTimeout;
|
||||
import core.time : Duration, dur;
|
||||
import std.datetime : SysTime, Clock, UTC;
|
||||
import std.conv : to;
|
||||
import std.string : indexOf;
|
||||
import std.json : JSONValue, JSONType, parseJSON;
|
||||
import core.atomic : atomicLoad, atomicStore;
|
||||
import core.thread : Thread;
|
||||
import core.time : Duration, dur;
|
||||
import std.concurrency : spawn, Tid, thisTid, send, receiveTimeout;
|
||||
import std.conv : to;
|
||||
import std.datetime : SysTime, Clock, UTC;
|
||||
import std.exception : collectException;
|
||||
import std.json : JSONValue, JSONType, parseJSON;
|
||||
import std.net.curl : CurlException;
|
||||
import std.socket : SocketException;
|
||||
import std.string : indexOf;
|
||||
|
||||
// What other modules that we have created do we need to import?
|
||||
import log;
|
||||
|
|
@ -151,244 +153,256 @@ private:
|
|||
}
|
||||
|
||||
while (!self.pleaseStop) {
|
||||
// If we're offline (or OneDrive service not reachable), don't bother trying yet
|
||||
logSocketIOOutput("Testing network to ensure network connectivity to Microsoft OneDrive Service");
|
||||
online = testInternetReachability(self.appConfig, false); // Will display failures, but nothing if successful .. a quiet check of sorts.
|
||||
if (!online) {
|
||||
logSocketIOOutput("Network or OneDrive service not reachable; delaying reconnect");
|
||||
logSocketIOOutput("Backoff " ~ to!string(backoffSeconds) ~ "s before retry");
|
||||
Thread.sleep(dur!"seconds"(backoffSeconds));
|
||||
if (backoffSeconds < maxBackoffSeconds) backoffSeconds *= 2;
|
||||
continue;
|
||||
} else {
|
||||
// We are 'online'
|
||||
// Build Socket.IO WS URL from notificationUrl
|
||||
string notif = self.appConfig.websocketNotificationUrl;
|
||||
if (notif.length == 0) {
|
||||
logSocketIOOutput("No notificationUrl available; will retry");
|
||||
// Catch network exceptions at the socketio-loop level and treat them as recoverable
|
||||
try {
|
||||
// If we're offline (or OneDrive service not reachable), don't bother trying yet
|
||||
logSocketIOOutput("Testing network to ensure network connectivity to Microsoft OneDrive Service");
|
||||
online = testInternetReachability(self.appConfig, false); // Will display failures, but nothing if successful .. a quiet check of sorts.
|
||||
if (!online) {
|
||||
logSocketIOOutput("Network or OneDrive service not reachable; delaying reconnect");
|
||||
logSocketIOOutput("Backoff " ~ to!string(backoffSeconds) ~ "s before retry");
|
||||
Thread.sleep(dur!"seconds"(backoffSeconds));
|
||||
if (backoffSeconds < maxBackoffSeconds) backoffSeconds *= 2;
|
||||
continue;
|
||||
}
|
||||
|
||||
self.currentNotifUrl = notif;
|
||||
string wsUrl = toSocketIoWsUrl(notif);
|
||||
|
||||
// Fresh WS instance per attempt
|
||||
self.ws = new CurlWebSocket();
|
||||
|
||||
// Use application configuration values
|
||||
self.ws.setUserAgent(self.appConfig.getValueString("user_agent"));
|
||||
self.ws.setHTTPSDebug(self.appConfig.getValueBool("debug_https"));
|
||||
self.ws.setTimeouts(10000, 15000);
|
||||
|
||||
// Connect to Microsoft Graph API using WebSockets and Socket.IO v4
|
||||
logSocketIOOutput("Connecting to " ~ wsUrl);
|
||||
auto rc = self.ws.connect(wsUrl);
|
||||
if (rc != 0) {
|
||||
logSocketIOOutput("self.ws.connect failed; will retry");
|
||||
collectException(self.ws.close(1002, "connect-failed"));
|
||||
Thread.sleep(dur!"seconds"(backoffSeconds));
|
||||
if (backoffSeconds < maxBackoffSeconds) backoffSeconds *= 2;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Socket.IO handshake: wait for '0{json}'
|
||||
if (!awaitEngineOpen(self.ws, self)) {
|
||||
logSocketIOOutput("Socket.IO open handshake failed; will retry");
|
||||
collectException(self.ws.close(1002, "handshake-failed"));
|
||||
Thread.sleep(dur!"seconds"(backoffSeconds));
|
||||
if (backoffSeconds < maxBackoffSeconds) backoffSeconds *= 2;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Open default namespace: send "40"
|
||||
logSocketIOOutput("Sending Socket.IO connect (40) to default namespace");
|
||||
if (self.ws.sendText("40") != 0) {
|
||||
logSocketIOOutput("Failed to send 40 (open namespace); will retry");
|
||||
collectException(self.ws.close(1002, "ns40-failed"));
|
||||
Thread.sleep(dur!"seconds"(backoffSeconds));
|
||||
if (backoffSeconds < maxBackoffSeconds) backoffSeconds *= 2;
|
||||
continue;
|
||||
} else {
|
||||
logSocketIOOutput("Sent Socket.IO connect '40' for namespace '/'");
|
||||
}
|
||||
|
||||
// Open 'notifications' namespace: send "40/notifications"
|
||||
logSocketIOOutput("Sending Socket.IO connect (40) to '/notifications' namespace");
|
||||
if (self.ws.sendText("40/notifications") != 0) {
|
||||
logSocketIOOutput("Failed to send 40 for '/notifications' namespace; will retry");
|
||||
collectException(self.ws.close(1002, "ns40-failed"));
|
||||
Thread.sleep(dur!"seconds"(backoffSeconds));
|
||||
if (backoffSeconds < maxBackoffSeconds) backoffSeconds *= 2;
|
||||
continue;
|
||||
} else {
|
||||
logSocketIOOutput("Sent Socket.IO connect '40' for namespace '/notifications'");
|
||||
}
|
||||
|
||||
// Connected successfully → reset backoff
|
||||
backoffSeconds = 1;
|
||||
// Reset per-connection flags so renew logic and ns-open tracking work after reconnection
|
||||
self.expiryWarned = false;
|
||||
self.renewRequested = false;
|
||||
self.namespaceOpened = false;
|
||||
|
||||
// Track last server ping received to detect a dead connection
|
||||
SysTime lastPingAt = Clock.currTime(UTC());
|
||||
|
||||
// Listen for Socket.IO Events
|
||||
for (;;) {
|
||||
// Stop request
|
||||
if (self.pleaseStop) {
|
||||
logSocketIOOutput("Stop requested; shutting down run() loop");
|
||||
collectException(self.ws.close(1000, "stop-requested"));
|
||||
collectException(self.ws.cleanupCurlHandle());
|
||||
return;
|
||||
}
|
||||
|
||||
// Subscription nearing expiry? (informational; renewal happens elsewhere)
|
||||
if (!self.expiryWarned && self.appConfig.websocketUrlExpiry.length > 0) {
|
||||
SysTime expiry;
|
||||
auto e = collectException(expiry = SysTime.fromISOExtString(self.appConfig.websocketUrlExpiry));
|
||||
if (e is null) {
|
||||
auto remain = expiry - Clock.currTime(UTC());
|
||||
if (remain <= dur!"minutes"(5)) {
|
||||
self.expiryWarned = true; // emit only once
|
||||
logSocketIOOutput("subscription nearing expiry; renewal required soon");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Renewal window check (emit once; 2 minutes before)
|
||||
if (!self.renewRequested && self.appConfig.websocketUrlExpiry.length > 0) {
|
||||
SysTime expiry;
|
||||
auto e = collectException(expiry = SysTime.fromISOExtString(self.appConfig.websocketUrlExpiry));
|
||||
if (e is null) {
|
||||
auto remain = expiry - Clock.currTime(UTC());
|
||||
if (remain <= dur!"minutes"(2)) {
|
||||
self.renewRequested = true;
|
||||
logSocketIOOutput("Subscription nearing expiry; requesting renewal from main() monitor loop");
|
||||
send(self.parentTid, "SOCKETIO_RENEWAL_REQUEST");
|
||||
send(self.parentTid, "SOCKETIO_RENEWAL_CONTEXT:" ~ "id=" ~ self.appConfig.websocketEndpointResponse ~ " url=" ~ self.appConfig.websocketNotificationUrl);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we haven't seen a server ping within pingInterval + pingTimeout → treat as dead link
|
||||
auto now = Clock.currTime(UTC());
|
||||
auto maxSilence = dur!"msecs"(self.pingIntervalMs + self.pingTimeoutMs);
|
||||
if (now - lastPingAt > maxSilence) {
|
||||
logSocketIOOutput("No server ping within expected window; restarting WebSocket");
|
||||
break; // fall out to backoff/retry
|
||||
}
|
||||
|
||||
// Reconnect to a new endpoint if main updated websocketNotificationUrl
|
||||
if (self.appConfig.websocketNotificationUrl.length > 0 &&
|
||||
self.appConfig.websocketNotificationUrl != self.currentNotifUrl) {
|
||||
|
||||
logSocketIOOutput("Detected new notificationUrl; reconnecting");
|
||||
collectException(self.ws.close(1000, "reconnect"));
|
||||
|
||||
self.currentNotifUrl = self.appConfig.websocketNotificationUrl;
|
||||
string newWsUrl = toSocketIoWsUrl(self.currentNotifUrl);
|
||||
|
||||
// Establish a fresh connection and handshakes
|
||||
self.ws = new CurlWebSocket();
|
||||
self.ws.setUserAgent(self.appConfig.getValueString("user_agent"));
|
||||
self.ws.setTimeouts(10000, 15000);
|
||||
self.ws.setHTTPSDebug(self.appConfig.getValueBool("debug_https"));
|
||||
|
||||
auto rc2 = self.ws.connect(newWsUrl);
|
||||
if (rc2 != 0) {
|
||||
logSocketIOOutput("reconnect failed");
|
||||
break; // fall out to backoff/retry
|
||||
}
|
||||
if (!awaitEngineOpen(self.ws, self)) {
|
||||
logSocketIOOutput("Socket.IO open after reconnect failed");
|
||||
break; // fall out to backoff/retry
|
||||
}
|
||||
|
||||
// Open default namespace again
|
||||
logSocketIOOutput("Sending Socket.IO connect (40) to default namespace");
|
||||
if (self.ws.sendText("40") != 0) {
|
||||
logSocketIOOutput("Failed to send 40 (open namespace)");
|
||||
break; // fall out to backoff/retry
|
||||
} else {
|
||||
logSocketIOOutput("Sent Socket.IO connect '40' for namespace '/'");
|
||||
}
|
||||
|
||||
// Open '/notifications' again (best-effort)
|
||||
logSocketIOOutput("Sending Socket.IO connect (40) to '/notifications' namespace");
|
||||
if (self.ws.sendText("40/notifications") != 0) {
|
||||
logSocketIOOutput("Failed to send 40 for '/notifications' namespace");
|
||||
break; // fall out to backoff/retry
|
||||
} else {
|
||||
logSocketIOOutput("Sent Socket.IO connect '40' for namespace '/notifications'");
|
||||
}
|
||||
|
||||
// Reset ping reference after a clean reconnect
|
||||
lastPingAt = Clock.currTime(UTC());
|
||||
}
|
||||
|
||||
// Receive message
|
||||
auto msg = self.ws.recvText();
|
||||
if (msg.length == 0) {
|
||||
Thread.sleep(dur!"msecs"(20));
|
||||
// We are 'online'
|
||||
// Build Socket.IO WS URL from notificationUrl
|
||||
string notif = self.appConfig.websocketNotificationUrl;
|
||||
if (notif.length == 0) {
|
||||
logSocketIOOutput("No notificationUrl available; will retry");
|
||||
logSocketIOOutput("Backoff " ~ to!string(backoffSeconds) ~ "s before retry");
|
||||
Thread.sleep(dur!"seconds"(backoffSeconds));
|
||||
if (backoffSeconds < maxBackoffSeconds) backoffSeconds *= 2;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Socket.IO parsing
|
||||
if (msg.length > 0 && msg[0] == '2') {
|
||||
// Server ping -> immediate pong, and mark last ping time
|
||||
if (self.ws.sendText("3") != 0) {
|
||||
logSocketIOOutput("Failed sending Socket.IO pong '3'");
|
||||
break; // fall out to backoff/retry
|
||||
} else {
|
||||
lastPingAt = Clock.currTime(UTC());
|
||||
logSocketIOOutput("Socket.IO ping received, → pong sent");
|
||||
}
|
||||
self.currentNotifUrl = notif;
|
||||
string wsUrl = toSocketIoWsUrl(notif);
|
||||
|
||||
// Fresh WS instance per attempt
|
||||
self.ws = new CurlWebSocket();
|
||||
|
||||
// Use application configuration values
|
||||
self.ws.setUserAgent(self.appConfig.getValueString("user_agent"));
|
||||
self.ws.setHTTPSDebug(self.appConfig.getValueBool("debug_https"));
|
||||
self.ws.setTimeouts(10000, 15000);
|
||||
|
||||
// Connect to Microsoft Graph API using WebSockets and Socket.IO v4
|
||||
logSocketIOOutput("Connecting to " ~ wsUrl);
|
||||
auto rc = self.ws.connect(wsUrl);
|
||||
if (rc != 0) {
|
||||
logSocketIOOutput("self.ws.connect failed; will retry");
|
||||
collectException(self.ws.close(1002, "connect-failed"));
|
||||
Thread.sleep(dur!"seconds"(backoffSeconds));
|
||||
if (backoffSeconds < maxBackoffSeconds) backoffSeconds *= 2;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (msg.length > 0 && msg[0] == '3') {
|
||||
// Socket.IO handshake: wait for '0{json}'
|
||||
if (!awaitEngineOpen(self.ws, self)) {
|
||||
logSocketIOOutput("Socket.IO open handshake failed; will retry");
|
||||
collectException(self.ws.close(1002, "handshake-failed"));
|
||||
Thread.sleep(dur!"seconds"(backoffSeconds));
|
||||
if (backoffSeconds < maxBackoffSeconds) backoffSeconds *= 2;
|
||||
continue;
|
||||
} else if (msg.length > 1 && msg[0] == '4' && msg[1] == '2') {
|
||||
logSocketIOOutput("Received 42 msg = " ~ to!string(msg));
|
||||
handleSocketIoEvent(msg, self);
|
||||
continue;
|
||||
} else if (msg.length > 1 && msg[0] == '4' && msg[1] == '0') {
|
||||
logSocketIOOutput("Received 40 msg = " ~ to!string(msg));
|
||||
// 40{"sid":...} or 40/notifications,{...}
|
||||
size_t i = 3;
|
||||
while (i < msg.length && msg[i] != ',') i++;
|
||||
auto ns = msg[3 .. i];
|
||||
}
|
||||
|
||||
if (ns == "notifications") {
|
||||
logSocketIOOutput("Namespace '/notifications' opened; listening for Socket.IO events via WebSocket Transport");
|
||||
} else {
|
||||
logSocketIOOutput("Namespace '/' opened; listening for Socket.IO events via WebSocket Transport");
|
||||
}
|
||||
self.namespaceOpened = true;
|
||||
continue;
|
||||
|
||||
} else if (msg.length > 1 && msg[0] == '4' && msg[1] == '1') {
|
||||
logSocketIOOutput("got 41 (disconnect)");
|
||||
break; // fall out to backoff/retry
|
||||
} else if (msg.length > 0 && msg[0] == '0') {
|
||||
parseEngineOpenFromPacket(msg, self);
|
||||
// Open default namespace: send "40"
|
||||
logSocketIOOutput("Sending Socket.IO connect (40) to default namespace");
|
||||
if (self.ws.sendText("40") != 0) {
|
||||
logSocketIOOutput("Failed to send 40 (open namespace); will retry");
|
||||
collectException(self.ws.close(1002, "ns40-failed"));
|
||||
Thread.sleep(dur!"seconds"(backoffSeconds));
|
||||
if (backoffSeconds < maxBackoffSeconds) backoffSeconds *= 2;
|
||||
continue;
|
||||
} else {
|
||||
logSocketIOOutput("Received Unhandled Message: " ~ msg);
|
||||
logSocketIOOutput("Sent Socket.IO connect '40' for namespace '/'");
|
||||
}
|
||||
}
|
||||
|
||||
// Fell out of the inner loop → close and backoff, then retry
|
||||
logSocketIOOutput("Retrying WebSocket Connection");
|
||||
collectException(self.ws.close(1001, "reconnect"));
|
||||
logSocketIOOutput("Backoff " ~ to!string(backoffSeconds) ~ "s before retry");
|
||||
Thread.sleep(dur!"seconds"(backoffSeconds));
|
||||
if (backoffSeconds < maxBackoffSeconds) backoffSeconds *= 2;
|
||||
// Open 'notifications' namespace: send "40/notifications"
|
||||
logSocketIOOutput("Sending Socket.IO connect (40) to '/notifications' namespace");
|
||||
if (self.ws.sendText("40/notifications") != 0) {
|
||||
logSocketIOOutput("Failed to send 40 for '/notifications' namespace; will retry");
|
||||
collectException(self.ws.close(1002, "ns40-failed"));
|
||||
Thread.sleep(dur!"seconds"(backoffSeconds));
|
||||
if (backoffSeconds < maxBackoffSeconds) backoffSeconds *= 2;
|
||||
continue;
|
||||
} else {
|
||||
logSocketIOOutput("Sent Socket.IO connect '40' for namespace '/notifications'");
|
||||
}
|
||||
|
||||
// Connected successfully → reset backoff
|
||||
backoffSeconds = 1;
|
||||
// Reset per-connection flags so renew logic and ns-open tracking work after reconnection
|
||||
self.expiryWarned = false;
|
||||
self.renewRequested = false;
|
||||
self.namespaceOpened = false;
|
||||
|
||||
// Track last server ping received to detect a dead connection
|
||||
SysTime lastPingAt = Clock.currTime(UTC());
|
||||
|
||||
// Listen for Socket.IO Events
|
||||
for (;;) {
|
||||
// Stop request
|
||||
if (self.pleaseStop) {
|
||||
logSocketIOOutput("Stop requested; shutting down run() loop");
|
||||
collectException(self.ws.close(1000, "stop-requested"));
|
||||
collectException(self.ws.cleanupCurlHandle());
|
||||
return;
|
||||
}
|
||||
|
||||
// Subscription nearing expiry? (informational; renewal happens elsewhere)
|
||||
if (!self.expiryWarned && self.appConfig.websocketUrlExpiry.length > 0) {
|
||||
SysTime expiry;
|
||||
auto e = collectException(expiry = SysTime.fromISOExtString(self.appConfig.websocketUrlExpiry));
|
||||
if (e is null) {
|
||||
auto remain = expiry - Clock.currTime(UTC());
|
||||
if (remain <= dur!"minutes"(5)) {
|
||||
self.expiryWarned = true; // emit only once
|
||||
logSocketIOOutput("subscription nearing expiry; renewal required soon");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Renewal window check (emit once; 2 minutes before)
|
||||
if (!self.renewRequested && self.appConfig.websocketUrlExpiry.length > 0) {
|
||||
SysTime expiry;
|
||||
auto e = collectException(expiry = SysTime.fromISOExtString(self.appConfig.websocketUrlExpiry));
|
||||
if (e is null) {
|
||||
auto remain = expiry - Clock.currTime(UTC());
|
||||
if (remain <= dur!"minutes"(2)) {
|
||||
self.renewRequested = true;
|
||||
logSocketIOOutput("Subscription nearing expiry; requesting renewal from main() monitor loop");
|
||||
send(self.parentTid, "SOCKETIO_RENEWAL_REQUEST");
|
||||
send(self.parentTid, "SOCKETIO_RENEWAL_CONTEXT:" ~ "id=" ~ self.appConfig.websocketEndpointResponse ~ " url=" ~ self.appConfig.websocketNotificationUrl);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we haven't seen a server ping within pingInterval + pingTimeout → treat as dead link
|
||||
auto now = Clock.currTime(UTC());
|
||||
auto maxSilence = dur!"msecs"(self.pingIntervalMs + self.pingTimeoutMs);
|
||||
if (now - lastPingAt > maxSilence) {
|
||||
logSocketIOOutput("No server ping within expected window; restarting WebSocket");
|
||||
break; // fall out to backoff/retry
|
||||
}
|
||||
|
||||
// Reconnect to a new endpoint if main updated websocketNotificationUrl
|
||||
if (self.appConfig.websocketNotificationUrl.length > 0 &&
|
||||
self.appConfig.websocketNotificationUrl != self.currentNotifUrl) {
|
||||
|
||||
logSocketIOOutput("Detected new notificationUrl; reconnecting");
|
||||
collectException(self.ws.close(1000, "reconnect"));
|
||||
|
||||
self.currentNotifUrl = self.appConfig.websocketNotificationUrl;
|
||||
string newWsUrl = toSocketIoWsUrl(self.currentNotifUrl);
|
||||
|
||||
// Establish a fresh connection and handshakes
|
||||
self.ws = new CurlWebSocket();
|
||||
self.ws.setUserAgent(self.appConfig.getValueString("user_agent"));
|
||||
self.ws.setTimeouts(10000, 15000);
|
||||
self.ws.setHTTPSDebug(self.appConfig.getValueBool("debug_https"));
|
||||
|
||||
auto rc2 = self.ws.connect(newWsUrl);
|
||||
if (rc2 != 0) {
|
||||
logSocketIOOutput("reconnect failed");
|
||||
break; // fall out to backoff/retry
|
||||
}
|
||||
if (!awaitEngineOpen(self.ws, self)) {
|
||||
logSocketIOOutput("Socket.IO open after reconnect failed");
|
||||
break; // fall out to backoff/retry
|
||||
}
|
||||
|
||||
// Open default namespace again
|
||||
logSocketIOOutput("Sending Socket.IO connect (40) to default namespace");
|
||||
if (self.ws.sendText("40") != 0) {
|
||||
logSocketIOOutput("Failed to send 40 (open namespace)");
|
||||
break; // fall out to backoff/retry
|
||||
} else {
|
||||
logSocketIOOutput("Sent Socket.IO connect '40' for namespace '/'");
|
||||
}
|
||||
|
||||
// Open '/notifications' again (best-effort)
|
||||
logSocketIOOutput("Sending Socket.IO connect (40) to '/notifications' namespace");
|
||||
if (self.ws.sendText("40/notifications") != 0) {
|
||||
logSocketIOOutput("Failed to send 40 for '/notifications' namespace");
|
||||
break; // fall out to backoff/retry
|
||||
} else {
|
||||
logSocketIOOutput("Sent Socket.IO connect '40' for namespace '/notifications'");
|
||||
}
|
||||
|
||||
// Reset ping reference after a clean reconnect
|
||||
lastPingAt = Clock.currTime(UTC());
|
||||
}
|
||||
|
||||
// Receive message
|
||||
auto msg = self.ws.recvText();
|
||||
if (msg.length == 0) {
|
||||
Thread.sleep(dur!"msecs"(20));
|
||||
continue;
|
||||
}
|
||||
|
||||
// Socket.IO parsing
|
||||
if (msg.length > 0 && msg[0] == '2') {
|
||||
// Server ping -> immediate pong, and mark last ping time
|
||||
if (self.ws.sendText("3") != 0) {
|
||||
logSocketIOOutput("Failed sending Socket.IO pong '3'");
|
||||
break; // fall out to backoff/retry
|
||||
} else {
|
||||
lastPingAt = Clock.currTime(UTC());
|
||||
logSocketIOOutput("Socket.IO ping received, → pong sent");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (msg.length > 0 && msg[0] == '3') {
|
||||
continue;
|
||||
} else if (msg.length > 1 && msg[0] == '4' && msg[1] == '2') {
|
||||
logSocketIOOutput("Received 42 msg = " ~ to!string(msg));
|
||||
handleSocketIoEvent(msg, self);
|
||||
continue;
|
||||
} else if (msg.length > 1 && msg[0] == '4' && msg[1] == '0') {
|
||||
logSocketIOOutput("Received 40 msg = " ~ to!string(msg));
|
||||
// 40{"sid":...} or 40/notifications,{...}
|
||||
size_t i = 3;
|
||||
while (i < msg.length && msg[i] != ',') i++;
|
||||
auto ns = msg[3 .. i];
|
||||
|
||||
if (ns == "notifications") {
|
||||
logSocketIOOutput("Namespace '/notifications' opened; listening for Socket.IO events via WebSocket Transport");
|
||||
} else {
|
||||
logSocketIOOutput("Namespace '/' opened; listening for Socket.IO events via WebSocket Transport");
|
||||
}
|
||||
self.namespaceOpened = true;
|
||||
continue;
|
||||
|
||||
} else if (msg.length > 1 && msg[0] == '4' && msg[1] == '1') {
|
||||
logSocketIOOutput("got 41 (disconnect)");
|
||||
break; // fall out to backoff/retry
|
||||
} else if (msg.length > 0 && msg[0] == '0') {
|
||||
parseEngineOpenFromPacket(msg, self);
|
||||
continue;
|
||||
} else {
|
||||
logSocketIOOutput("Received Unhandled Message: " ~ msg);
|
||||
}
|
||||
}
|
||||
|
||||
// Fell out of the inner loop → close and backoff, then retry
|
||||
logSocketIOOutput("Retrying WebSocket Connection");
|
||||
collectException(self.ws.close(1001, "reconnect"));
|
||||
logSocketIOOutput("Backoff " ~ to!string(backoffSeconds) ~ "s before retry");
|
||||
Thread.sleep(dur!"seconds"(backoffSeconds));
|
||||
if (backoffSeconds < maxBackoffSeconds) backoffSeconds *= 2;
|
||||
}
|
||||
} catch (CurlException e) {
|
||||
// Caught a CurlException
|
||||
addLogEntry("Network error during socketio loop: " ~ e.msg ~ " (will retry)");
|
||||
} catch (SocketException e) {
|
||||
// Caught a SocketException
|
||||
addLogEntry("Socket error during socketio loop: " ~ e.msg ~ " (will retry)");
|
||||
} catch (Exception e) {
|
||||
// Caught some other error
|
||||
addLogEntry("Unexpected error during socketio loop: " ~ e.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue