Compression, batching and protocol fixes

This commit is contained in:
unknown 2025-01-29 01:00:31 +00:00 committed by LucienHH
commit 300b5eacb3
11 changed files with 131 additions and 47 deletions

View file

@ -1,4 +1,4 @@
process.env.DEBUG = '*'
process.env.DEBUG = 'minecraft-protocol'
const readline = require('readline')
const { createClient } = require('bedrock-protocol')

View file

@ -27,7 +27,6 @@ class Client extends Connection {
this.compressionAlgorithm = this.versionGreaterThanOrEqualTo('1.19.30') ? 'none' : 'deflate'
this.compressionThreshold = 512
this.compressionLevel = this.options.compressionLevel
this.batchHeader = 0xfe
if (isDebug) {
this.inLog = (...args) => debug('C ->', ...args)
@ -57,12 +56,12 @@ class Client extends Connection {
if (this.options.transport === 'nethernet') {
this.connection = new NethernetClient({ networkId })
this.batchHeader = []
this.batchHeader = null
this.disableEncryption = true
} else if (this.options.transport === 'raknet') {
const { RakClient } = initRaknet(this.options.raknetBackend)
this.connection = new RakClient({ useWorkers: this.options.useRaknetWorkers, host, port }, this)
this.batchHeader = [0xfe]
this.batchHeader = 0xfe
this.disableEncryption = false
}

View file

@ -8,7 +8,7 @@ class NethernetClient {
this.onCloseConnection = () => { }
this.onEncapsulated = () => { }
this.nethernet = new Client({ ...options })
this.nethernet = new Client(options.networkId)
this.nethernet.on('connected', (client) => {
this.onConnected(client)
@ -28,12 +28,13 @@ class NethernetClient {
}
sendReliable (data) {
this.nethernet.connection.sendReliable(data)
this.nethernet.send(data)
}
async ping (timeout = 10000) {
this.nethernet.ping()
return waitFor((done) => {
this.nethernet.ping().then(data => { done(data) })
this.nethernet.once('pong', (ret) => { done(ret.data) })
}, timeout, () => {
throw new Error('Ping timed out')
})
@ -49,7 +50,7 @@ class NethernetServer {
this.onOpenConnection = () => { }
this.onCloseConnection = () => { }
this.onEncapsulated = () => { }
this.onClose = () => {}
this.onClose = () => { }
this.updateAdvertisement = () => {
this.nethernet.setAdvertisement(server.getAdvertisement().toBuffer())
}
@ -57,6 +58,9 @@ class NethernetServer {
this.nethernet = new Server({ ...options })
this.nethernet.on('openConnection', (client) => {
client.sendReliable = function (buffer) {
return this.send(buffer)
}
this.onOpenConnection(client)
})

View file

@ -1,10 +1,9 @@
const dgram = require('dgram')
const dgram = require('node:dgram')
const { write } = require('sdp-transform')
const { EventEmitter } = require('events')
const { EventEmitter } = require('node:events')
const { RTCIceCandidate, RTCPeerConnection } = require('werift')
const { Connection } = require('./connection')
const { getRandomUint64 } = require('../datatypes/util')
const { SignalType, SignalStructure } = require('./signalling')
const { getBroadcastAddress } = require('./net')
@ -14,29 +13,33 @@ const { MessagePacket } = require('./discovery/packets/MessagePacket')
const { ResponsePacket } = require('./discovery/packets/ResponsePacket')
const { decrypt, encrypt, calculateChecksum } = require('./discovery/crypto')
const { getRandomUint64 } = require('./util')
const debug = require('debug')('minecraft-protocol')
const PORT = 7551
const BROADCAST_ADDRESS = getBroadcastAddress()
class Client extends EventEmitter {
constructor (options = {}) {
constructor (networkId) {
super()
this.options = options
this.serverNetworkId = networkId
this.networkId = getRandomUint64()
debug('C: Generated networkId:', this.networkId)
this.connectionId = getRandomUint64()
this.targetNetworkId = options.networkId
debug('C: Generated connectionId:', this.connectionId)
this.socket = dgram.createSocket('udp4')
this.socket.on('message', (buffer, rinfo) => {
debug('C: Received message from', rinfo.address, ':', rinfo.port)
this.processPacket(buffer, rinfo)
})
this.responses = new Map()
this.addresses = new Map()
this.credentials = []
@ -44,21 +47,26 @@ class Client extends EventEmitter {
this.signalHandler = this.sendDiscoveryMessage
this.sendDiscoveryRequest()
debug('C: Sent initial discovery request')
this.pingInterval = setInterval(() => {
debug('C: Sending periodic discovery request')
this.sendDiscoveryRequest()
}, 2000)
}
async handleCandidate (signal) {
debug('C: Handling ICE candidate signal:', signal)
await this.rtcConnection.addIceCandidate(new RTCIceCandidate({ candidate: signal.data }))
}
async handleAnswer (signal) {
debug('C: Handling answer signal:', signal)
await this.rtcConnection.setRemoteDescription({ type: 'answer', sdp: signal.data })
}
async createOffer () {
debug('C: Creating RTC offer')
this.rtcConnection = new RTCPeerConnection({
iceServers: this.credentials
})
@ -69,6 +77,7 @@ class Client extends EventEmitter {
this.rtcConnection.onicecandidate = (e) => {
if (e.candidate) {
debug('C: Collected ICE candidate:', e.candidate.candidate)
candidates.push(e.candidate.candidate)
}
}
@ -78,19 +87,21 @@ class Client extends EventEmitter {
this.rtcConnection.createDataChannel('UnreliableDataChannel')
)
this.connection.reliable.onopen = () => { this.emit('connected', this.connection) }
this.rtcConnection.onconnectionstatechange = () => {
const state = this.rtcConnection.connectionState
if (state === 'connected') this.emit('connected', this.connection)
if (state === 'disconnected') this.emit('closeConnection', this.connectionId, 'disconnected')
debug('C: Connection state changed:', state)
if (state === 'disconnected') this.emit('disconnect', this.connectionId, 'disconnected')
}
await this.rtcConnection.createOffer()
const ice = this.rtcConnection.iceTransports[0]
const dtls = this.rtcConnection.dtlsTransports[0]
if (!ice || !dtls) {
debug('C: Failed to create ICE or DTLS transports')
throw new Error('Failed to create transports')
}
@ -98,6 +109,7 @@ class Client extends EventEmitter {
const dtlsParams = dtls.localParameters
if (dtlsParams.fingerprints.length === 0) {
debug('C: No DTLS fingerprints available')
throw new Error('local DTLS parameters has no fingerprints')
}
@ -139,54 +151,67 @@ class Client extends EventEmitter {
await this.rtcConnection.setLocalDescription({ type: 'offer', sdp: desc })
debug('C: Local SDP set:', desc)
this.signalHandler(
new SignalStructure(SignalType.ConnectRequest, this.connectionId, desc, this.targetNetworkId)
new SignalStructure(SignalType.ConnectRequest, this.connectionId, desc, this.serverNetworkId)
)
for (const candidate of candidates) {
debug('C: Sending ICE candidate signal:', candidate)
this.signalHandler(
new SignalStructure(SignalType.CandidateAdd, this.connectionId, candidate, this.targetNetworkId)
new SignalStructure(SignalType.CandidateAdd, this.connectionId, candidate, this.serverNetworkId)
)
}
}
processPacket (buffer, rinfo) {
debug('C: Processing packet from', rinfo.address, ':', rinfo.port)
if (buffer.length < 32) {
debug('C: Received packet is too short')
throw new Error('Packet is too short')
}
const decryptedData = decrypt(buffer.slice(32))
const checksum = calculateChecksum(decryptedData)
if (Buffer.compare(buffer.slice(0, 32), checksum) !== 0) {
debug('C: Checksum mismatch for packet from', rinfo.address)
throw new Error('Checksum mismatch')
}
const packetType = decryptedData.readUInt16LE(2)
debug('C: Packet type:', packetType)
switch (packetType) {
case PACKET_TYPE.DISCOVERY_REQUEST:
debug('C: Received DISCOVERY_REQUEST packet')
break
case PACKET_TYPE.DISCOVERY_RESPONSE:
debug('C: Received DISCOVERY_RESPONSE packet')
this.handleResponse(new ResponsePacket(decryptedData).decode(), rinfo)
break
case PACKET_TYPE.DISCOVERY_MESSAGE:
debug('C: Received DISCOVERY_MESSAGE packet')
this.handleMessage(new MessagePacket(decryptedData).decode())
break
default:
debug('C: Unknown packet type:', packetType)
throw new Error('Unknown packet type')
}
}
handleResponse (packet, rinfo) {
debug('C: Handling discovery response from', rinfo.address, 'with data:', packet)
this.addresses.set(packet.senderId, rinfo)
this.responses.set(packet.senderId, packet.data)
this.emit('discoveryResponse', packet)
this.emit('pong', packet)
}
handleMessage (packet) {
debug('C: Handling discovery message:', packet)
if (packet.data === 'Ping') {
debug('C: Ignoring ping message')
return
}
@ -194,21 +219,26 @@ class Client extends EventEmitter {
signal.networkId = packet.senderId
debug('C: Processing signal:', signal)
this.handleSignal(signal)
}
handleSignal (signal) {
debug('C: Handling signal of type:', signal.type)
switch (signal.type) {
case SignalType.ConnectResponse:
debug('C: Handling ConnectResponse signal')
this.handleAnswer(signal)
break
case SignalType.CandidateAdd:
debug('C: Handling CandidateAdd signal')
this.handleCandidate(signal)
break
}
}
sendDiscoveryRequest () {
debug('C: Sending discovery request')
const requestPacket = new RequestPacket()
requestPacket.senderId = this.networkId
@ -223,8 +253,14 @@ class Client extends EventEmitter {
}
sendDiscoveryMessage (signal) {
debug('C: Sending discovery message for signal:', signal)
const rinfo = this.addresses.get(signal.networkId)
if (!rinfo) {
debug('C: No address found for networkId:', signal.networkId)
return
}
const messagePacket = new MessagePacket()
messagePacket.senderId = this.networkId
@ -240,31 +276,30 @@ class Client extends EventEmitter {
}
async connect () {
debug('C: Initiating connection')
this.running = true
await this.createOffer()
}
async ping () {
this.running = true
send (buffer) {
this.connection.send(buffer)
}
return new Promise((resolve, reject) => {
this.on('discoveryResponse', (packet) => {
if (packet.senderId === this.targetNetworkId) {
resolve(packet.data)
}
})
})
ping () {
debug('C: Sending ping')
this.sendDiscoveryRequest()
}
close (reason) {
debug('C: Closing client with reason:', reason)
if (!this.running) return
clearInterval(this.pingInterval)
this.connection?.close()
setTimeout(() => this.socket.close(), 100)
this.connection = null
this.running = false
this.emit('disconnect', reason)
this.removeAllListeners()
}
}

View file

@ -61,8 +61,8 @@ class Connection {
this.buf = null
}
sendReliable (data) {
if (!this.reliable) {
send (data) {
if (!this.reliable || this.reliable.readyState !== 'open') {
throw new Error('Reliable data channel is not available')
}

View file

@ -1,4 +1,4 @@
const crypto = require('crypto')
const crypto = require('node:crypto')
const appIdBuffer = Buffer.allocUnsafe(8)
appIdBuffer.writeBigUInt64LE(BigInt(0xdeadbeef))

View file

@ -1,4 +1,4 @@
const os = require('os')
const os = require('node:os')
function getBroadcastAddress () {
const interfaces = os.networkInterfaces()

View file

@ -1,5 +1,5 @@
const dgram = require('dgram')
const { EventEmitter } = require('events')
const dgram = require('node:dgram')
const { EventEmitter } = require('node:events')
const { RTCIceCandidate, RTCPeerConnection } = require('werift')
const { Connection } = require('./connection')
@ -10,6 +10,8 @@ const { MessagePacket } = require('./discovery/packets/MessagePacket')
const { ResponsePacket } = require('./discovery/packets/ResponsePacket')
const { decrypt, encrypt, calculateChecksum } = require('./discovery/crypto')
const { getRandomUint64 } = require('./util')
const debug = require('debug')('minecraft-protocol')
class Server extends EventEmitter {
@ -18,22 +20,26 @@ class Server extends EventEmitter {
this.options = options
this.networkId = options.networkId
this.networkId = options.networkId ?? getRandomUint64()
this.connections = new Map()
debug('S: Server initialised with networkId: %s', this.networkId)
}
async handleCandidate (signal) {
const conn = this.connections.get(signal.connectionId)
if (conn) {
debug('S: Adding ICE candidate for connectionId: %s', signal.connectionId)
await conn.rtcConnection.addIceCandidate(new RTCIceCandidate({ candidate: signal.data }))
} else {
debug('Received candidate for unknown connection', signal)
debug('S: Received candidate for unknown connection', signal)
}
}
async handleOffer (signal, respond, credentials = []) {
debug('S: Handling offer for connectionId: %s', signal.connectionId)
const rtcConnection = new RTCPeerConnection({
iceServers: credentials
})
@ -44,6 +50,7 @@ class Server extends EventEmitter {
rtcConnection.onicecandidate = (e) => {
if (e.candidate) {
debug('S: ICE candidate generated for connectionId: %s', signal.connectionId)
respond(
new SignalStructure(SignalType.CandidateAdd, signal.connectionId, e.candidate.candidate, signal.networkId)
)
@ -51,21 +58,24 @@ class Server extends EventEmitter {
}
rtcConnection.ondatachannel = ({ channel }) => {
debug('S: Data channel established with label: %s', channel.label)
if (channel.label === 'ReliableDataChannel') connection.setChannels(channel)
if (channel.label === 'UnreliableDataChannel') connection.setChannels(null, channel)
}
rtcConnection.onconnectionstatechange = () => {
const state = rtcConnection.connectionState
debug('S: Connection state changed for connectionId: %s, state: %s', signal.connectionId, state)
if (state === 'connected') this.emit('openConnection', connection)
if (state === 'disconnected') this.emit('closeConnection', signal.connectionId, 'disconnected')
}
await rtcConnection.setRemoteDescription({ type: 'offer', sdp: signal.data })
debug('S: Remote description set for connectionId: %s', signal.connectionId)
const answer = await rtcConnection.createAnswer()
await rtcConnection.setLocalDescription(answer)
debug('S: Local description set (answer) for connectionId: %s', signal.connectionId)
respond(
new SignalStructure(SignalType.ConnectResponse, signal.connectionId, answer.sdp, signal.networkId)
@ -73,7 +83,9 @@ class Server extends EventEmitter {
}
processPacket (buffer, rinfo) {
debug('S: Processing packet from %s:%s', rinfo.address, rinfo.port)
if (buffer.length < 32) {
debug('S: Packet is too short')
throw new Error('Packet is too short')
}
@ -82,33 +94,42 @@ class Server extends EventEmitter {
const checksum = calculateChecksum(decryptedData)
if (Buffer.compare(buffer.slice(0, 32), checksum) !== 0) {
debug('S: Checksum mismatch')
throw new Error('Checksum mismatch')
}
const packetType = decryptedData.readUInt16LE(2)
debug('S: Packet type: %s', packetType)
switch (packetType) {
case PACKET_TYPE.DISCOVERY_REQUEST:
debug('S: Handling discovery request')
this.handleRequest(rinfo)
break
case PACKET_TYPE.DISCOVERY_RESPONSE:
debug('S: Discovery response received (ignored)')
break
case PACKET_TYPE.DISCOVERY_MESSAGE:
debug('S: Handling discovery message')
this.handleMessage(new MessagePacket(decryptedData).decode(), rinfo)
break
default:
debug('S: Unknown packet type: %s', packetType)
throw new Error('Unknown packet type')
}
}
setAdvertisement (buffer) {
debug('S: Setting advertisement data')
this.advertisement = buffer
}
handleRequest (rinfo) {
debug('S: Handling request from %s:%s', rinfo.address, rinfo.port)
const data = this.advertisement
if (!data) {
debug('S: Advertisement data not set')
return new Error('Advertisement data not set yet')
}
@ -124,14 +145,18 @@ class Server extends EventEmitter {
const packetToSend = Buffer.concat([calculateChecksum(buf), encrypt(buf)])
this.socket.send(packetToSend, rinfo.port, rinfo.address)
debug('S: Response sent to %s:%s', rinfo.address, rinfo.port)
}
handleMessage (packet, rinfo) {
debug('S: Handling message from %s:%s', rinfo.address, rinfo.port)
if (packet.data === 'Ping') {
debug('S: Ping message received')
return
}
const respond = (signal) => {
debug('S: Responding with signal: %o', signal)
const messagePacket = new MessagePacket()
messagePacket.senderId = this.networkId
@ -144,6 +169,7 @@ class Server extends EventEmitter {
const packetToSend = Buffer.concat([calculateChecksum(buf), encrypt(buf)])
this.socket.send(packetToSend, rinfo.port, rinfo.address)
debug('S: Signal response sent to %s:%s', rinfo.address, rinfo.port)
}
const signal = SignalStructure.fromString(packet.data)
@ -152,18 +178,22 @@ class Server extends EventEmitter {
switch (signal.type) {
case SignalType.ConnectRequest:
debug('S: Handling ConnectRequest signal')
this.handleOffer(signal, respond)
break
case SignalType.CandidateAdd:
debug('S: Handling CandidateAdd signal')
this.handleCandidate(signal)
break
}
}
async listen () {
debug('S: Starting server')
this.socket = dgram.createSocket('udp4')
this.socket.on('message', (buffer, rinfo) => {
debug('S: Message received from %s:%s', rinfo.address, rinfo.port)
this.processPacket(buffer, rinfo)
})
@ -171,18 +201,25 @@ class Server extends EventEmitter {
const failFn = e => reject(e)
this.socket.once('error', failFn)
this.socket.bind(7551, () => {
debug('S: Server is listening on port 7551')
this.socket.removeListener('error', failFn)
resolve(true)
})
})
}
send (buffer) {
this.connection.send(buffer)
}
close (reason) {
debug('S: Closing server: %s', reason)
for (const conn of this.connections.values()) {
conn.close()
}
this.socket.close(() => {
debug('S: Server closed')
this.emit('close', reason)
this.removeAllListeners()
})

10
src/nethernet/util.js Normal file
View file

@ -0,0 +1,10 @@
const getRandomUint64 = () => {
const high = Math.floor(Math.random() * 0xFFFFFFFF)
const low = Math.floor(Math.random() * 0xFFFFFFFF)
return (BigInt(high) << 32n) | BigInt(low)
}
module.exports = {
getRandomUint64
}

View file

@ -17,12 +17,12 @@ class Server extends EventEmitter {
if (this.options.transport === 'nethernet') {
this.transportServer = require('./nethernet').NethernetServer
this.advertisement = new NethernetServerAdvertisement(this.options.motd, this.options.version)
this.batchHeader = []
this.batchHeader = null
this.disableEncryption = true
} else if (this.options.transport === 'raknet') {
this.transportServer = require('./rak')(this.options.raknetBackend).RakServer
this.advertisement = new ServerAdvertisement(this.options.motd, this.options.port, this.options.version)
this.batchHeader = [0xfe]
this.batchHeader = 0xfe
this.disableEncryption = false
} else {
throw new Error(`Unsupported transport: ${this.options.transport} (nethernet, raknet)`)
@ -36,7 +36,6 @@ class Server extends EventEmitter {
this.clients = {}
this.clientCount = 0
this.conLog = debug
this.batchHeader = 0xfe
this.setCompressor(this.options.compressionAlgorithm, this.options.compressionLevel, this.options.compressionThreshold)
}
@ -132,7 +131,7 @@ class Server extends EventEmitter {
async listen () {
const { host, port, maxPlayers } = this.options
// eslint-disable-next-line new-cap
this.transport = new this.transportServer({ host, port, networkId: this.options.networkId }, this)
this.transport = new this.transportServer({ host, port, networkId: this.options.networkId, maxPlayers }, this)
try {
await this.transport.listen()

View file

@ -41,7 +41,7 @@ class Framer {
static decode (client, buf) {
// Read header
if (this.batchHeader && buf[0] !== this.batchHeader) throw Error(`bad batch packet header, received: ${buf[0]}, expected: ${this.batchHeader}`)
const buffer = buf.slice(1)
const buffer = buf.slice(this.batchHeader ? 1 : 0)
// Decompress
let decompressed
if (client.features.compressorInHeader && client.compressionReady) {