From f88c8d0bc4ed252ad24ec3f9ad25da603dbc4db3 Mon Sep 17 00:00:00 2001 From: extremeheat Date: Sat, 24 Sep 2022 13:53:26 -0400 Subject: [PATCH] 1.19.30 support, improve error handling and server pong data (#284) * Update server advertisement * 1.19.30 protocol support * Handle configurable compressor * Support updated 1.19.30 login flow with NetworkSettings * Improve serialization error handling on client * refactor compressor handling * Fix client on older versions, fix internal error handling * Improve error handling * Log console connection errors; use raknet-native for proxy test --- src/client.js | 43 ++++++++++++++++++++++++++--- src/connection.js | 21 +++++++++------ src/datatypes/util.js | 4 +-- src/options.js | 10 +++++-- src/rak.js | 7 ++--- src/server.js | 35 ++++++++++++++++++++++-- src/server/advertisement.js | 27 ++++++++++++------- src/serverPlayer.js | 52 +++++++++++++++++++++++++++++------- src/transforms/framer.js | 44 +++++++++++++++++++++--------- src/transforms/serializer.js | 7 +---- test/internal.js | 4 +-- test/proxy.js | 8 +++--- tools/startVanillaServer.js | 11 +++++--- 13 files changed, 205 insertions(+), 68 deletions(-) diff --git a/src/client.js b/src/client.js index 7ec612b..ed287aa 100644 --- a/src/client.js +++ b/src/client.js @@ -8,6 +8,7 @@ const initRaknet = require('./rak') const { KeyExchange } = require('./handshake/keyExchange') const Login = require('./handshake/login') const LoginVerify = require('./handshake/loginVerify') +const fs = require('fs') const debugging = false @@ -22,6 +23,10 @@ class Client extends Connection { this.startGameData = {} this.clientRuntimeId = null + // Start off without compression on 1.19.30, zlib on below + this.compressionAlgorithm = this.versionGreaterThanOrEqualTo('1.19.30') ? 'none' : 'deflate' + this.compressionThreshold = 512 + this.compressionLevel = this.options.compressionLevel if (isDebug) { this.inLog = (...args) => debug('C ->', ...args) @@ -46,7 +51,7 @@ class Client extends Connection { const { RakClient } = initRaknet(this.options.raknetBackend) const host = this.options.host const port = this.options.port - this.connection = new RakClient({ useWorkers: this.options.useRaknetWorkers, host, port }) + this.connection = new RakClient({ useWorkers: this.options.useRaknetWorkers, host, port }, this) this.emit('connect_allowed') } @@ -76,7 +81,7 @@ class Client extends Connection { onEncapsulated = (encapsulated, inetAddr) => { const buffer = Buffer.from(encapsulated.buffer) - this.handle(buffer) + process.nextTick(() => this.handle(buffer)) } async ping () { @@ -90,8 +95,18 @@ class Client extends Connection { _connect = async (sessionData) => { debug('[client] connecting to', this.options.host, this.options.port, sessionData, this.connection) - this.connection.onConnected = () => this.sendLogin() - this.connection.onCloseConnection = (reason) => this.close() + this.connection.onConnected = () => { + this.status = ClientStatus.Connecting + if (this.versionGreaterThanOrEqualTo('1.19.30')) { + this.queue('request_network_settings', { client_protocol: this.options.protocolVersion }) + } else { + this.sendLogin() + } + } + this.connection.onCloseConnection = (reason) => { + if (this.status === ClientStatus.Disconnected) this.conLog?.(`Server closed connection: ${reason}`) + this.close() + } this.connection.onEncapsulated = this.onEncapsulated this.connection.connect() @@ -103,6 +118,11 @@ class Client extends Connection { }, this.options.connectTimeout || 9000) } + updateCompressorSettings (packet) { + this.compressionAlgorithm = packet.compression_algorithm || 'deflate' + this.compressionThreshold = packet.compression_threshold + } + sendLogin () { this.status = ClientStatus.Authenticating this.createClientChain(null, this.options.offline) @@ -174,6 +194,15 @@ class Client extends Connection { try { var des = this.deserializer.parsePacketBuffer(packet) // eslint-disable-line } catch (e) { + // Dump information about the packet only if user is not handling error event. + if (this.listenerCount('error') === 0) { + if (packet.length > 1000) { + fs.writeFileSync('packetReadError.txt', packet.toString('hex')) + console.log(`Deserialization failure for packet 0x${packet.slice(0, 1).toString('hex')}. Packet buffer saved in ./packetReadError.txt as buffer was too large (${packet.length} bytes).`) + } else { + console.log('Read failure for 0x' + packet.slice(0, 1).toString('hex'), packet.slice(0, 1000)) + } + } this.emit('error', e) return } @@ -193,6 +222,12 @@ class Client extends Connection { case 'server_to_client_handshake': this.emit('client.server_handshake', des.data.params) break + case 'network_settings': + this.updateCompressorSettings(des.data.params) + if (this.status === ClientStatus.Connecting) { + this.sendLogin() + } + break case 'disconnect': // Client kicked this.emit(des.data.name, des.data.params) // Emit before we kill all listeners. this.onDisconnectRequest(des.data.params) diff --git a/src/connection.js b/src/connection.js index ddd60b1..bc1d56b 100644 --- a/src/connection.js +++ b/src/connection.js @@ -1,14 +1,15 @@ -const Framer = require('./transforms/framer') const cipher = require('./transforms/encryption') const { EventEmitter } = require('events') const { Versions } = require('./options') const debug = require('debug')('minecraft-protocol') +const { Framer } = require('./transforms/framer') const ClientStatus = { Disconnected: 0, - Authenticating: 1, // Handshaking - Initializing: 2, // Authed, need to spawn - Initialized: 3 // play_status spawn sent by server, client responded with SetPlayerInit packet + Connecting: 1, + Authenticating: 2, // Handshaking + Initializing: 3, // Authed, need to spawn + Initialized: 4 // play_status spawn sent by server, client responded with SetPlayerInit packet } class Connection extends EventEmitter { @@ -34,6 +35,10 @@ class Connection extends EventEmitter { return this.options.protocolVersion > (typeof version === 'string' ? Versions[version] : version) } + versionGreaterThanOrEqualTo (version) { + return this.options.protocolVersion >= (typeof version === 'string' ? Versions[version] : version) + } + startEncryption (iv) { this.encryptionEnabled = true this.inLog?.('Started encryption', this.sharedSecret, iv) @@ -60,7 +65,7 @@ class Connection extends EventEmitter { write (name, params) { this.outLog?.(name, params) if (name === 'start_game') this.updateItemPalette(params.itemstates) - const batch = new Framer(this.compressionLevel) + const batch = new Framer(this.compressionAlgorithm, this.compressionLevel, this.compressionThreshold) const packet = this.serializer.createPacketBuffer({ name, params }) batch.addEncodedPacket(packet) @@ -86,7 +91,7 @@ class Connection extends EventEmitter { _tick () { if (this.sendQ.length) { - const batch = new Framer(this.compressionLevel) + const batch = new Framer(this.compressionAlgorithm, this.compressionLevel, this.compressionThreshold) batch.addEncodedPackets(this.sendQ) this.sendQ = [] this.sendIds = [] @@ -110,7 +115,7 @@ class Connection extends EventEmitter { */ sendBuffer (buffer, immediate = false) { if (immediate) { - const batch = new Framer(this.compressionLevel) + const batch = new Framer(this.compressionAlgorithm, this.compressionLevel, this.compressionThreshold) batch.addEncodedPacket(buffer) if (this.encryptionEnabled) { this.sendEncryptedBatch(batch) @@ -162,7 +167,7 @@ class Connection extends EventEmitter { if (this.encryptionEnabled) { this.decrypt(buffer.slice(1)) } else { - const packets = Framer.decode(buffer) + const packets = Framer.decode(this.compressionAlgorithm, buffer) for (const packet of packets) { this.readPacket(packet) } diff --git a/src/datatypes/util.js b/src/datatypes/util.js index 4b29d57..7070ce5 100644 --- a/src/datatypes/util.js +++ b/src/datatypes/util.js @@ -23,11 +23,11 @@ function sleep (ms) { async function waitFor (cb, withTimeout, onTimeout) { let t const ret = await Promise.race([ - new Promise(resolve => cb(resolve)), + new Promise((resolve, reject) => cb(resolve, reject)), new Promise(resolve => { t = setTimeout(() => resolve('timeout'), withTimeout) }) ]) clearTimeout(t) - if (ret === 'timeout') onTimeout() + if (ret === 'timeout') await onTimeout() return ret } diff --git a/src/options.js b/src/options.js index e081536..bd1282f 100644 --- a/src/options.js +++ b/src/options.js @@ -21,7 +21,14 @@ const defaultOptions = { // Specifies the raknet implementation to use raknetBackend: 'raknet-native', // If using JS implementation of RakNet, should we use workers? (This only affects the client) - useRaknetWorkers: true + useRaknetWorkers: true, + + // server: What compression algorithm to use by default, either `none`, `deflate` or `snappy` + compressionAlgorithm: 'deflate', + // server and client: On Deflate, what compression level to use, between 1 and 9 + compressionLevel: 7, + // server: If true, only compress if a payload is larger than compressionThreshold + compressionThreshold: 512 } function validateOptions (options) { @@ -34,7 +41,6 @@ function validateOptions (options) { if (options.protocolVersion < MIN_VERSION) { throw new Error(`Protocol version < ${MIN_VERSION} : ${options.protocolVersion}, too old`) } - this.compressionLevel = options.compressionLevel || 7 if (options.useNativeRaknet === true) options.raknetBackend = 'raknet-native' if (options.useNativeRaknet === false) options.raknetBackend = 'jsp-raknet' } diff --git a/src/rak.js b/src/rak.js index 7b1e2f1..35e82cf 100644 --- a/src/rak.js +++ b/src/rak.js @@ -24,14 +24,15 @@ module.exports = (backend) => { } class RakNativeClient extends EventEmitter { - constructor (options) { + constructor (options, client) { super() this.connected = false this.onConnected = () => { } this.onCloseConnection = () => { } this.onEncapsulated = () => { } - this.raknet = new Client(options.host, options.port, { protocolVersion: 10 }) + const protocolVersion = client?.versionGreaterThanOrEqualTo('1.19.30') ? 11 : 10 + this.raknet = new Client(options.host, options.port, { protocolVersion }) this.raknet.on('encapsulated', ({ buffer, address }) => { if (this.connected) { // Discard packets that are queued to be sent to us after close this.onEncapsulated(buffer, address) @@ -86,7 +87,7 @@ class RakNativeServer extends EventEmitter { this.onEncapsulated = () => { } this.raknet = new Server(options.host, options.port, { maxConnections: options.maxPlayers || 3, - protocolVersion: 10, + protocolVersion: server.versionLessThan('1.19.30') ? 10 : 11, message: server.getAdvertisement().toBuffer() }) this.onClose = () => {} diff --git a/src/server.js b/src/server.js index ffa5d77..bbefdae 100644 --- a/src/server.js +++ b/src/server.js @@ -16,18 +16,49 @@ class Server extends EventEmitter { this.serializer = createSerializer(this.options.version) this.deserializer = createDeserializer(this.options.version) - this.advertisement = new ServerAdvertisement(this.options.motd, this.options.version) + this.advertisement = new ServerAdvertisement(this.options.motd, this.options.port, this.options.version) this.advertisement.playersMax = options.maxPlayers ?? 3 /** @type {Object} */ this.clients = {} this.clientCount = 0 this.conLog = debug + + this.setCompressor(this.options.compressionAlgorithm, this.options.compressionLevel, this.options.compressionThreshold) + } + + setCompressor (algorithm, level = 1, threshold = 256) { + if (algorithm === 'none') { + this.compressionAlgorithm = 'none' + this.compressionLevel = 0 + } else if (algorithm === 'deflate') { + this.compressionAlgorithm = 'deflate' + this.compressionLevel = level + this.compressionThreshold = threshold + } else if (algorithm === 'snappy') { + this.compressionAlgorithm = 'snappy' + this.compressionLevel = level + this.compressionThreshold = threshold + } else { + throw new Error(`Unknown compression algorithm ${algorithm}`) + } } validateOptions () { Options.validateOptions(this.options) } + versionLessThan (version) { + return this.options.protocolVersion < (typeof version === 'string' ? Options.Versions[version] : version) + } + + versionGreaterThan (version) { + return this.options.protocolVersion > (typeof version === 'string' ? Options.Versions[version] : version) + } + + versionGreaterThanOrEqualTo (version) { + return this.options.protocolVersion >= (typeof version === 'string' ? Options.Versions[version] : version) + } + onOpenConnection = (conn) => { this.conLog('new connection', conn?.address) const player = new Player(this, conn) @@ -51,7 +82,7 @@ class Server extends EventEmitter { debug(`ignoring packet from unknown inet addr: ${address}`) return } - client.handle(buffer) + process.nextTick(() => client.handle(buffer)) } getAdvertisement () { diff --git a/src/server/advertisement.js b/src/server/advertisement.js index ff5f08a..c7210d9 100644 --- a/src/server/advertisement.js +++ b/src/server/advertisement.js @@ -6,24 +6,26 @@ class ServerAdvertisement { playersOnline = 0 playersMax = 5 gamemode = 'Creative' - serverId = '0' + serverId = Date.now().toString() gamemodeId = 1 - port = undefined + portV4 = undefined portV6 = undefined - constructor (obj, version = CURRENT_VERSION) { + constructor (obj, port, version = CURRENT_VERSION) { if (obj?.name) obj.motd = obj.name this.protocol = Versions[version] this.version = version + this.portV4 = port + this.portV6 = port Object.assign(this, obj) } fromString (str) { - const [header, motd, protocol, version, playersOnline, playersMax, serverId, levelName, gamemode, gamemodeId, port, portV6] = str.split(';') - Object.assign(this, { header, motd, protocol, version, playersOnline, playersMax, serverId, levelName, gamemode, gamemodeId, port, portV6 }) - for (const numeric of ['playersOnline', 'playersMax', 'gamemodeId', 'port', 'portV6']) { + const [header, motd, protocol, version, playersOnline, playersMax, serverId, levelName, gamemode, gamemodeId, portV4, portV6] = str.split(';') + Object.assign(this, { header, motd, protocol, version, playersOnline, playersMax, serverId, levelName, gamemode, gamemodeId, portV4, portV6 }) + for (const numeric of ['playersOnline', 'playersMax', 'gamemodeId', 'portV4', 'portV6']) { if (this[numeric] !== undefined) { - this[numeric] = parseInt(this[numeric]) + this[numeric] = this[numeric] ? parseInt(this[numeric]) : null } } return this @@ -39,13 +41,20 @@ class ServerAdvertisement { this.playersMax, this.serverId, this.levelName, - this.gamemode + this.gamemode, + this.gamemodeId, + this.portV4, + this.portV6, + '0' ].join(';') + ';' } toBuffer (version) { const str = this.toString(version) - return Buffer.concat([Buffer.from([0, str.length]), Buffer.from(str)]) + const buf = Buffer.alloc(2 + str.length) + buf.writeUInt16BE(str.length, 0) + buf.write(str, 2) + return buf } } diff --git a/src/serverPlayer.js b/src/serverPlayer.js index 83f25ec..ffb1961 100644 --- a/src/serverPlayer.js +++ b/src/serverPlayer.js @@ -4,7 +4,6 @@ const { serialize, isDebug } = require('./datatypes/util') const { KeyExchange } = require('./handshake/keyExchange') const Login = require('./handshake/login') const LoginVerify = require('./handshake/loginVerify') -const fs = require('fs') const debug = require('debug')('minecraft-protocol') class Player extends Connection { @@ -15,7 +14,6 @@ class Player extends Connection { this.deserializer = server.deserializer this.connection = connection this.options = server.options - this.compressionLevel = server.compressionLevel KeyExchange(this, server, server.options) Login(this, server, server.options) @@ -28,24 +26,49 @@ class Player extends Connection { this.inLog = (...args) => debug('S ->', ...args) this.outLog = (...args) => debug('S <-', ...args) } + + // Compression is server-wide + this.compressionAlgorithm = this.server.compressionAlgorithm + this.compressionLevel = this.server.compressionLevel + this.compressionThreshold = this.server.compressionThreshold + + this._sentNetworkSettings = false // 1.19.30+ } getUserData () { return this.userData } + sendNetworkSettings () { + this.write('network_settings', { + compression_threshold: this.server.compressionThreshold, + compression_algorithm: this.server.compressionAlgorithm, + client_throttle: false, + client_throttle_threshold: 0, + client_throttle_scalar: 0 + }) + this._sentNetworkSettings = true + } + + handleClientProtocolVersion (clientVersion) { + if (this.server.options.protocolVersion) { + if (this.server.options.protocolVersion < clientVersion) { + this.sendDisconnectStatus('failed_spawn') // client too new + return false + } + } else if (clientVersion < Options.MIN_VERSION) { + this.sendDisconnectStatus('failed_client') // client too old + return false + } + return true + } + onLogin (packet) { const body = packet.data this.emit('loggingIn', body) const clientVer = body.params.protocol_version - if (this.server.options.protocolVersion) { - if (this.server.options.protocolVersion < clientVer) { - this.sendDisconnectStatus('failed_spawn') - return - } - } else if (clientVer < Options.MIN_VERSION) { - this.sendDisconnectStatus('failed_client') + if (!this.handleClientProtocolVersion(clientVer)) { return } @@ -125,15 +148,24 @@ class Player extends Connection { var des = this.server.deserializer.parsePacketBuffer(packet) // eslint-disable-line } catch (e) { this.disconnect('Server error') - fs.writeFile(`packetdump_${this.connection.address}_${Date.now()}.bin`, packet) + debug('Dropping packet from', this.connection.address, e) return } this.inLog?.(des.data.name, serialize(des.data.params).slice(0, 200)) switch (des.data.name) { + // This is the first packet on 1.19.30 & above + case 'request_network_settings': + if (this.handleClientProtocolVersion(des.data.params.client_protocol)) { + this.sendNetworkSettings() + this.compressionLevel = this.server.compressionLevel + } + return + // Below 1.19.30, this is the first packet. case 'login': this.onLogin(des) + if (!this._sentNetworkSettings) this.sendNetworkSettings() return case 'client_to_server_handshake': // Emit the 'join' event diff --git a/src/transforms/framer.js b/src/transforms/framer.js index c31b8d8..2452f01 100644 --- a/src/transforms/framer.js +++ b/src/transforms/framer.js @@ -3,30 +3,48 @@ const zlib = require('zlib') // Concatenates packets into one batch packet, and adds length prefixs. class Framer { - constructor (compressionLevel) { + constructor (compressor, compressionLevel, compressionThreshold) { // Encoding this.packets = [] + this.compressor = compressor || 'none' this.compressionLevel = compressionLevel + this.compressionThreshold = compressionThreshold } - static decode (buf) { + // No compression in base class + compress (buffer) { + switch (this.compressor) { + case 'deflate': return zlib.deflateRawSync(buffer, { level: this.compressionLevel }) + case 'snappy': throw Error('Snappy compression not implemented') + case 'none': return buffer + } + } + + static decompress (algorithm, buffer) { + try { + switch (algorithm) { + case 'deflate': return zlib.inflateRawSync(buffer, { chunkSize: 512000 }) + case 'snappy': throw Error('Snappy compression not implemented') + case 'none': return buffer + default: throw Error('Unknown compression type ' + this.compressor) + } + } catch { + return buffer + } + } + + static decode (compressor, buf) { // Read header if (buf[0] !== 0xfe) throw Error('bad batch packet header ' + buf[0]) const buffer = buf.slice(1) - - // Decode the payload with 512kb buffer - try { - const inflated = zlib.inflateRawSync(buffer, { chunkSize: 512000 }) - return Framer.getPackets(inflated) - } catch (e) { // Try to decode without compression - return Framer.getPackets(buffer) - } + const decompressed = this.decompress(compressor, buffer) + return Framer.getPackets(decompressed) } encode () { const buf = Buffer.concat(this.packets) - const def = zlib.deflateRawSync(buf, { level: this.compressionLevel }) - return Buffer.concat([Buffer.from([0xfe]), def]) + const compressed = (buf.length > this.compressionThreshold) ? this.compress(buf) : buf + return Buffer.concat([Buffer.from([0xfe]), compressed]) } addEncodedPacket (chunk) { @@ -71,4 +89,4 @@ class Framer { } } -module.exports = Framer +module.exports = { Framer } diff --git a/src/transforms/serializer.js b/src/transforms/serializer.js index c763599..f6b1918 100644 --- a/src/transforms/serializer.js +++ b/src/transforms/serializer.js @@ -4,12 +4,7 @@ const { join } = require('path') class Parser extends FullPacketParser { parsePacketBuffer (buffer) { - try { - return super.parsePacketBuffer(buffer) - } catch (e) { - console.error('While decoding', buffer.toString('hex')) - throw e - } + return super.parsePacketBuffer(buffer) } verify (deserialized, serializer) { diff --git a/test/internal.js b/test/internal.js index 51e1f71..15131a9 100644 --- a/test/internal.js +++ b/test/internal.js @@ -201,9 +201,9 @@ async function requestChunks (version, x, z, radius) { } async function timedTest (version, timeout = 1000 * 220) { - await waitFor((res) => { + await waitFor((resolve, reject) => { // mocha eats up stack traces... - startTest(version, res).catch(console.error) + startTest(version, resolve).catch(reject) }, timeout, () => { throw Error('timed out') }) diff --git a/test/proxy.js b/test/proxy.js index a1df25e..d6b169f 100644 --- a/test/proxy.js +++ b/test/proxy.js @@ -1,7 +1,7 @@ const { createClient, Server, Relay } = require('bedrock-protocol') const { sleep, waitFor } = require('../src/datatypes/util') -function proxyTest (version, raknetBackend = 'raknet-node', timeout = 1000 * 40) { +function proxyTest (version, raknetBackend = 'raknet-native', timeout = 1000 * 40) { console.log('with raknet backend', raknetBackend) return waitFor(async res => { const SERVER_PORT = 19000 + ((Math.random() * 100) | 0) @@ -60,8 +60,8 @@ function proxyTest (version, raknetBackend = 'raknet-node', timeout = 1000 * 40) }, timeout, () => { throw Error('timed out') }) } -if (!module.parent) { - proxyTest('1.16.220', 'raknet-native') -} +// if (!module.parent) { +// proxyTest('1.16.220', 'raknet-native') +// } module.exports = { proxyTest } diff --git a/tools/startVanillaServer.js b/tools/startVanillaServer.js index 4740bef..be63b47 100644 --- a/tools/startVanillaServer.js +++ b/tools/startVanillaServer.js @@ -87,6 +87,8 @@ function run (inheritStdout = true) { return cp.spawn(exe, inheritStdout ? { stdio: 'inherit' } : {}) } +let lastHandle + // Run the server async function startServer (version, onStart, options = {}) { const os = process.platform === 'win32' ? 'win' : process.platform @@ -95,7 +97,7 @@ async function startServer (version, onStart, options = {}) { } await download(os, version, options.path) configure(options) - const handle = run(!onStart) + const handle = lastHandle = run(!onStart) handle.on('error', (...a) => { console.warn('*** THE MINECRAFT PROCESS CRASHED ***', a) handle.kill('SIGKILL') @@ -126,9 +128,12 @@ async function startServerAndWait (version, withTimeout, options) { async function startServerAndWait2 (version, withTimeout, options) { try { - return await startServerAndWait(version, withTimeout, options) + return await startServerAndWait(version, 1000 * 60, options) } catch (e) { - console.log(e, 'tring once more to start server...') + console.log(e) + console.log('^ Tring once more to start server in 10 seconds...') + lastHandle?.kill() + await new Promise(resolve => setTimeout(resolve, 10000)) process.chdir(__dirname) fs.rmSync('bds-' + version, { recursive: true }) return await startServerAndWait(version, withTimeout, options)