1.19.30 support, improve error handling and server pong data (#284)

* Update server advertisement

* 1.19.30 protocol support
* Handle configurable compressor
* Support updated 1.19.30 login flow with NetworkSettings
* Improve serialization error handling on client

* refactor compressor handling

* Fix client on older versions, fix internal error handling

* Improve error handling

* Log console connection errors; use raknet-native for proxy test
This commit is contained in:
extremeheat 2022-09-24 13:53:26 -04:00 committed by GitHub
commit f88c8d0bc4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 205 additions and 68 deletions

View file

@ -8,6 +8,7 @@ const initRaknet = require('./rak')
const { KeyExchange } = require('./handshake/keyExchange')
const Login = require('./handshake/login')
const LoginVerify = require('./handshake/loginVerify')
const fs = require('fs')
const debugging = false
@ -22,6 +23,10 @@ class Client extends Connection {
this.startGameData = {}
this.clientRuntimeId = null
// Start off without compression on 1.19.30, zlib on below
this.compressionAlgorithm = this.versionGreaterThanOrEqualTo('1.19.30') ? 'none' : 'deflate'
this.compressionThreshold = 512
this.compressionLevel = this.options.compressionLevel
if (isDebug) {
this.inLog = (...args) => debug('C ->', ...args)
@ -46,7 +51,7 @@ class Client extends Connection {
const { RakClient } = initRaknet(this.options.raknetBackend)
const host = this.options.host
const port = this.options.port
this.connection = new RakClient({ useWorkers: this.options.useRaknetWorkers, host, port })
this.connection = new RakClient({ useWorkers: this.options.useRaknetWorkers, host, port }, this)
this.emit('connect_allowed')
}
@ -76,7 +81,7 @@ class Client extends Connection {
onEncapsulated = (encapsulated, inetAddr) => {
const buffer = Buffer.from(encapsulated.buffer)
this.handle(buffer)
process.nextTick(() => this.handle(buffer))
}
async ping () {
@ -90,8 +95,18 @@ class Client extends Connection {
_connect = async (sessionData) => {
debug('[client] connecting to', this.options.host, this.options.port, sessionData, this.connection)
this.connection.onConnected = () => this.sendLogin()
this.connection.onCloseConnection = (reason) => this.close()
this.connection.onConnected = () => {
this.status = ClientStatus.Connecting
if (this.versionGreaterThanOrEqualTo('1.19.30')) {
this.queue('request_network_settings', { client_protocol: this.options.protocolVersion })
} else {
this.sendLogin()
}
}
this.connection.onCloseConnection = (reason) => {
if (this.status === ClientStatus.Disconnected) this.conLog?.(`Server closed connection: ${reason}`)
this.close()
}
this.connection.onEncapsulated = this.onEncapsulated
this.connection.connect()
@ -103,6 +118,11 @@ class Client extends Connection {
}, this.options.connectTimeout || 9000)
}
updateCompressorSettings (packet) {
this.compressionAlgorithm = packet.compression_algorithm || 'deflate'
this.compressionThreshold = packet.compression_threshold
}
sendLogin () {
this.status = ClientStatus.Authenticating
this.createClientChain(null, this.options.offline)
@ -174,6 +194,15 @@ class Client extends Connection {
try {
var des = this.deserializer.parsePacketBuffer(packet) // eslint-disable-line
} catch (e) {
// Dump information about the packet only if user is not handling error event.
if (this.listenerCount('error') === 0) {
if (packet.length > 1000) {
fs.writeFileSync('packetReadError.txt', packet.toString('hex'))
console.log(`Deserialization failure for packet 0x${packet.slice(0, 1).toString('hex')}. Packet buffer saved in ./packetReadError.txt as buffer was too large (${packet.length} bytes).`)
} else {
console.log('Read failure for 0x' + packet.slice(0, 1).toString('hex'), packet.slice(0, 1000))
}
}
this.emit('error', e)
return
}
@ -193,6 +222,12 @@ class Client extends Connection {
case 'server_to_client_handshake':
this.emit('client.server_handshake', des.data.params)
break
case 'network_settings':
this.updateCompressorSettings(des.data.params)
if (this.status === ClientStatus.Connecting) {
this.sendLogin()
}
break
case 'disconnect': // Client kicked
this.emit(des.data.name, des.data.params) // Emit before we kill all listeners.
this.onDisconnectRequest(des.data.params)

View file

@ -1,14 +1,15 @@
const Framer = require('./transforms/framer')
const cipher = require('./transforms/encryption')
const { EventEmitter } = require('events')
const { Versions } = require('./options')
const debug = require('debug')('minecraft-protocol')
const { Framer } = require('./transforms/framer')
const ClientStatus = {
Disconnected: 0,
Authenticating: 1, // Handshaking
Initializing: 2, // Authed, need to spawn
Initialized: 3 // play_status spawn sent by server, client responded with SetPlayerInit packet
Connecting: 1,
Authenticating: 2, // Handshaking
Initializing: 3, // Authed, need to spawn
Initialized: 4 // play_status spawn sent by server, client responded with SetPlayerInit packet
}
class Connection extends EventEmitter {
@ -34,6 +35,10 @@ class Connection extends EventEmitter {
return this.options.protocolVersion > (typeof version === 'string' ? Versions[version] : version)
}
versionGreaterThanOrEqualTo (version) {
return this.options.protocolVersion >= (typeof version === 'string' ? Versions[version] : version)
}
startEncryption (iv) {
this.encryptionEnabled = true
this.inLog?.('Started encryption', this.sharedSecret, iv)
@ -60,7 +65,7 @@ class Connection extends EventEmitter {
write (name, params) {
this.outLog?.(name, params)
if (name === 'start_game') this.updateItemPalette(params.itemstates)
const batch = new Framer(this.compressionLevel)
const batch = new Framer(this.compressionAlgorithm, this.compressionLevel, this.compressionThreshold)
const packet = this.serializer.createPacketBuffer({ name, params })
batch.addEncodedPacket(packet)
@ -86,7 +91,7 @@ class Connection extends EventEmitter {
_tick () {
if (this.sendQ.length) {
const batch = new Framer(this.compressionLevel)
const batch = new Framer(this.compressionAlgorithm, this.compressionLevel, this.compressionThreshold)
batch.addEncodedPackets(this.sendQ)
this.sendQ = []
this.sendIds = []
@ -110,7 +115,7 @@ class Connection extends EventEmitter {
*/
sendBuffer (buffer, immediate = false) {
if (immediate) {
const batch = new Framer(this.compressionLevel)
const batch = new Framer(this.compressionAlgorithm, this.compressionLevel, this.compressionThreshold)
batch.addEncodedPacket(buffer)
if (this.encryptionEnabled) {
this.sendEncryptedBatch(batch)
@ -162,7 +167,7 @@ class Connection extends EventEmitter {
if (this.encryptionEnabled) {
this.decrypt(buffer.slice(1))
} else {
const packets = Framer.decode(buffer)
const packets = Framer.decode(this.compressionAlgorithm, buffer)
for (const packet of packets) {
this.readPacket(packet)
}

View file

@ -23,11 +23,11 @@ function sleep (ms) {
async function waitFor (cb, withTimeout, onTimeout) {
let t
const ret = await Promise.race([
new Promise(resolve => cb(resolve)),
new Promise((resolve, reject) => cb(resolve, reject)),
new Promise(resolve => { t = setTimeout(() => resolve('timeout'), withTimeout) })
])
clearTimeout(t)
if (ret === 'timeout') onTimeout()
if (ret === 'timeout') await onTimeout()
return ret
}

View file

@ -21,7 +21,14 @@ const defaultOptions = {
// Specifies the raknet implementation to use
raknetBackend: 'raknet-native',
// If using JS implementation of RakNet, should we use workers? (This only affects the client)
useRaknetWorkers: true
useRaknetWorkers: true,
// server: What compression algorithm to use by default, either `none`, `deflate` or `snappy`
compressionAlgorithm: 'deflate',
// server and client: On Deflate, what compression level to use, between 1 and 9
compressionLevel: 7,
// server: If true, only compress if a payload is larger than compressionThreshold
compressionThreshold: 512
}
function validateOptions (options) {
@ -34,7 +41,6 @@ function validateOptions (options) {
if (options.protocolVersion < MIN_VERSION) {
throw new Error(`Protocol version < ${MIN_VERSION} : ${options.protocolVersion}, too old`)
}
this.compressionLevel = options.compressionLevel || 7
if (options.useNativeRaknet === true) options.raknetBackend = 'raknet-native'
if (options.useNativeRaknet === false) options.raknetBackend = 'jsp-raknet'
}

View file

@ -24,14 +24,15 @@ module.exports = (backend) => {
}
class RakNativeClient extends EventEmitter {
constructor (options) {
constructor (options, client) {
super()
this.connected = false
this.onConnected = () => { }
this.onCloseConnection = () => { }
this.onEncapsulated = () => { }
this.raknet = new Client(options.host, options.port, { protocolVersion: 10 })
const protocolVersion = client?.versionGreaterThanOrEqualTo('1.19.30') ? 11 : 10
this.raknet = new Client(options.host, options.port, { protocolVersion })
this.raknet.on('encapsulated', ({ buffer, address }) => {
if (this.connected) { // Discard packets that are queued to be sent to us after close
this.onEncapsulated(buffer, address)
@ -86,7 +87,7 @@ class RakNativeServer extends EventEmitter {
this.onEncapsulated = () => { }
this.raknet = new Server(options.host, options.port, {
maxConnections: options.maxPlayers || 3,
protocolVersion: 10,
protocolVersion: server.versionLessThan('1.19.30') ? 10 : 11,
message: server.getAdvertisement().toBuffer()
})
this.onClose = () => {}

View file

@ -16,18 +16,49 @@ class Server extends EventEmitter {
this.serializer = createSerializer(this.options.version)
this.deserializer = createDeserializer(this.options.version)
this.advertisement = new ServerAdvertisement(this.options.motd, this.options.version)
this.advertisement = new ServerAdvertisement(this.options.motd, this.options.port, this.options.version)
this.advertisement.playersMax = options.maxPlayers ?? 3
/** @type {Object<string, Player>} */
this.clients = {}
this.clientCount = 0
this.conLog = debug
this.setCompressor(this.options.compressionAlgorithm, this.options.compressionLevel, this.options.compressionThreshold)
}
setCompressor (algorithm, level = 1, threshold = 256) {
if (algorithm === 'none') {
this.compressionAlgorithm = 'none'
this.compressionLevel = 0
} else if (algorithm === 'deflate') {
this.compressionAlgorithm = 'deflate'
this.compressionLevel = level
this.compressionThreshold = threshold
} else if (algorithm === 'snappy') {
this.compressionAlgorithm = 'snappy'
this.compressionLevel = level
this.compressionThreshold = threshold
} else {
throw new Error(`Unknown compression algorithm ${algorithm}`)
}
}
validateOptions () {
Options.validateOptions(this.options)
}
versionLessThan (version) {
return this.options.protocolVersion < (typeof version === 'string' ? Options.Versions[version] : version)
}
versionGreaterThan (version) {
return this.options.protocolVersion > (typeof version === 'string' ? Options.Versions[version] : version)
}
versionGreaterThanOrEqualTo (version) {
return this.options.protocolVersion >= (typeof version === 'string' ? Options.Versions[version] : version)
}
onOpenConnection = (conn) => {
this.conLog('new connection', conn?.address)
const player = new Player(this, conn)
@ -51,7 +82,7 @@ class Server extends EventEmitter {
debug(`ignoring packet from unknown inet addr: ${address}`)
return
}
client.handle(buffer)
process.nextTick(() => client.handle(buffer))
}
getAdvertisement () {

View file

@ -6,24 +6,26 @@ class ServerAdvertisement {
playersOnline = 0
playersMax = 5
gamemode = 'Creative'
serverId = '0'
serverId = Date.now().toString()
gamemodeId = 1
port = undefined
portV4 = undefined
portV6 = undefined
constructor (obj, version = CURRENT_VERSION) {
constructor (obj, port, version = CURRENT_VERSION) {
if (obj?.name) obj.motd = obj.name
this.protocol = Versions[version]
this.version = version
this.portV4 = port
this.portV6 = port
Object.assign(this, obj)
}
fromString (str) {
const [header, motd, protocol, version, playersOnline, playersMax, serverId, levelName, gamemode, gamemodeId, port, portV6] = str.split(';')
Object.assign(this, { header, motd, protocol, version, playersOnline, playersMax, serverId, levelName, gamemode, gamemodeId, port, portV6 })
for (const numeric of ['playersOnline', 'playersMax', 'gamemodeId', 'port', 'portV6']) {
const [header, motd, protocol, version, playersOnline, playersMax, serverId, levelName, gamemode, gamemodeId, portV4, portV6] = str.split(';')
Object.assign(this, { header, motd, protocol, version, playersOnline, playersMax, serverId, levelName, gamemode, gamemodeId, portV4, portV6 })
for (const numeric of ['playersOnline', 'playersMax', 'gamemodeId', 'portV4', 'portV6']) {
if (this[numeric] !== undefined) {
this[numeric] = parseInt(this[numeric])
this[numeric] = this[numeric] ? parseInt(this[numeric]) : null
}
}
return this
@ -39,13 +41,20 @@ class ServerAdvertisement {
this.playersMax,
this.serverId,
this.levelName,
this.gamemode
this.gamemode,
this.gamemodeId,
this.portV4,
this.portV6,
'0'
].join(';') + ';'
}
toBuffer (version) {
const str = this.toString(version)
return Buffer.concat([Buffer.from([0, str.length]), Buffer.from(str)])
const buf = Buffer.alloc(2 + str.length)
buf.writeUInt16BE(str.length, 0)
buf.write(str, 2)
return buf
}
}

View file

@ -4,7 +4,6 @@ const { serialize, isDebug } = require('./datatypes/util')
const { KeyExchange } = require('./handshake/keyExchange')
const Login = require('./handshake/login')
const LoginVerify = require('./handshake/loginVerify')
const fs = require('fs')
const debug = require('debug')('minecraft-protocol')
class Player extends Connection {
@ -15,7 +14,6 @@ class Player extends Connection {
this.deserializer = server.deserializer
this.connection = connection
this.options = server.options
this.compressionLevel = server.compressionLevel
KeyExchange(this, server, server.options)
Login(this, server, server.options)
@ -28,24 +26,49 @@ class Player extends Connection {
this.inLog = (...args) => debug('S ->', ...args)
this.outLog = (...args) => debug('S <-', ...args)
}
// Compression is server-wide
this.compressionAlgorithm = this.server.compressionAlgorithm
this.compressionLevel = this.server.compressionLevel
this.compressionThreshold = this.server.compressionThreshold
this._sentNetworkSettings = false // 1.19.30+
}
getUserData () {
return this.userData
}
sendNetworkSettings () {
this.write('network_settings', {
compression_threshold: this.server.compressionThreshold,
compression_algorithm: this.server.compressionAlgorithm,
client_throttle: false,
client_throttle_threshold: 0,
client_throttle_scalar: 0
})
this._sentNetworkSettings = true
}
handleClientProtocolVersion (clientVersion) {
if (this.server.options.protocolVersion) {
if (this.server.options.protocolVersion < clientVersion) {
this.sendDisconnectStatus('failed_spawn') // client too new
return false
}
} else if (clientVersion < Options.MIN_VERSION) {
this.sendDisconnectStatus('failed_client') // client too old
return false
}
return true
}
onLogin (packet) {
const body = packet.data
this.emit('loggingIn', body)
const clientVer = body.params.protocol_version
if (this.server.options.protocolVersion) {
if (this.server.options.protocolVersion < clientVer) {
this.sendDisconnectStatus('failed_spawn')
return
}
} else if (clientVer < Options.MIN_VERSION) {
this.sendDisconnectStatus('failed_client')
if (!this.handleClientProtocolVersion(clientVer)) {
return
}
@ -125,15 +148,24 @@ class Player extends Connection {
var des = this.server.deserializer.parsePacketBuffer(packet) // eslint-disable-line
} catch (e) {
this.disconnect('Server error')
fs.writeFile(`packetdump_${this.connection.address}_${Date.now()}.bin`, packet)
debug('Dropping packet from', this.connection.address, e)
return
}
this.inLog?.(des.data.name, serialize(des.data.params).slice(0, 200))
switch (des.data.name) {
// This is the first packet on 1.19.30 & above
case 'request_network_settings':
if (this.handleClientProtocolVersion(des.data.params.client_protocol)) {
this.sendNetworkSettings()
this.compressionLevel = this.server.compressionLevel
}
return
// Below 1.19.30, this is the first packet.
case 'login':
this.onLogin(des)
if (!this._sentNetworkSettings) this.sendNetworkSettings()
return
case 'client_to_server_handshake':
// Emit the 'join' event

View file

@ -3,30 +3,48 @@ const zlib = require('zlib')
// Concatenates packets into one batch packet, and adds length prefixs.
class Framer {
constructor (compressionLevel) {
constructor (compressor, compressionLevel, compressionThreshold) {
// Encoding
this.packets = []
this.compressor = compressor || 'none'
this.compressionLevel = compressionLevel
this.compressionThreshold = compressionThreshold
}
static decode (buf) {
// No compression in base class
compress (buffer) {
switch (this.compressor) {
case 'deflate': return zlib.deflateRawSync(buffer, { level: this.compressionLevel })
case 'snappy': throw Error('Snappy compression not implemented')
case 'none': return buffer
}
}
static decompress (algorithm, buffer) {
try {
switch (algorithm) {
case 'deflate': return zlib.inflateRawSync(buffer, { chunkSize: 512000 })
case 'snappy': throw Error('Snappy compression not implemented')
case 'none': return buffer
default: throw Error('Unknown compression type ' + this.compressor)
}
} catch {
return buffer
}
}
static decode (compressor, buf) {
// Read header
if (buf[0] !== 0xfe) throw Error('bad batch packet header ' + buf[0])
const buffer = buf.slice(1)
// Decode the payload with 512kb buffer
try {
const inflated = zlib.inflateRawSync(buffer, { chunkSize: 512000 })
return Framer.getPackets(inflated)
} catch (e) { // Try to decode without compression
return Framer.getPackets(buffer)
}
const decompressed = this.decompress(compressor, buffer)
return Framer.getPackets(decompressed)
}
encode () {
const buf = Buffer.concat(this.packets)
const def = zlib.deflateRawSync(buf, { level: this.compressionLevel })
return Buffer.concat([Buffer.from([0xfe]), def])
const compressed = (buf.length > this.compressionThreshold) ? this.compress(buf) : buf
return Buffer.concat([Buffer.from([0xfe]), compressed])
}
addEncodedPacket (chunk) {
@ -71,4 +89,4 @@ class Framer {
}
}
module.exports = Framer
module.exports = { Framer }

View file

@ -4,12 +4,7 @@ const { join } = require('path')
class Parser extends FullPacketParser {
parsePacketBuffer (buffer) {
try {
return super.parsePacketBuffer(buffer)
} catch (e) {
console.error('While decoding', buffer.toString('hex'))
throw e
}
return super.parsePacketBuffer(buffer)
}
verify (deserialized, serializer) {

View file

@ -201,9 +201,9 @@ async function requestChunks (version, x, z, radius) {
}
async function timedTest (version, timeout = 1000 * 220) {
await waitFor((res) => {
await waitFor((resolve, reject) => {
// mocha eats up stack traces...
startTest(version, res).catch(console.error)
startTest(version, resolve).catch(reject)
}, timeout, () => {
throw Error('timed out')
})

View file

@ -1,7 +1,7 @@
const { createClient, Server, Relay } = require('bedrock-protocol')
const { sleep, waitFor } = require('../src/datatypes/util')
function proxyTest (version, raknetBackend = 'raknet-node', timeout = 1000 * 40) {
function proxyTest (version, raknetBackend = 'raknet-native', timeout = 1000 * 40) {
console.log('with raknet backend', raknetBackend)
return waitFor(async res => {
const SERVER_PORT = 19000 + ((Math.random() * 100) | 0)
@ -60,8 +60,8 @@ function proxyTest (version, raknetBackend = 'raknet-node', timeout = 1000 * 40)
}, timeout, () => { throw Error('timed out') })
}
if (!module.parent) {
proxyTest('1.16.220', 'raknet-native')
}
// if (!module.parent) {
// proxyTest('1.16.220', 'raknet-native')
// }
module.exports = { proxyTest }

View file

@ -87,6 +87,8 @@ function run (inheritStdout = true) {
return cp.spawn(exe, inheritStdout ? { stdio: 'inherit' } : {})
}
let lastHandle
// Run the server
async function startServer (version, onStart, options = {}) {
const os = process.platform === 'win32' ? 'win' : process.platform
@ -95,7 +97,7 @@ async function startServer (version, onStart, options = {}) {
}
await download(os, version, options.path)
configure(options)
const handle = run(!onStart)
const handle = lastHandle = run(!onStart)
handle.on('error', (...a) => {
console.warn('*** THE MINECRAFT PROCESS CRASHED ***', a)
handle.kill('SIGKILL')
@ -126,9 +128,12 @@ async function startServerAndWait (version, withTimeout, options) {
async function startServerAndWait2 (version, withTimeout, options) {
try {
return await startServerAndWait(version, withTimeout, options)
return await startServerAndWait(version, 1000 * 60, options)
} catch (e) {
console.log(e, 'tring once more to start server...')
console.log(e)
console.log('^ Tring once more to start server in 10 seconds...')
lastHandle?.kill()
await new Promise(resolve => setTimeout(resolve, 10000))
process.chdir(__dirname)
fs.rmSync('bds-' + version, { recursive: true })
return await startServerAndWait(version, withTimeout, options)