diff --git a/src/ConnWorker.js b/src/ConnWorker.js new file mode 100644 index 0000000..f6d38d3 --- /dev/null +++ b/src/ConnWorker.js @@ -0,0 +1,64 @@ +const RakClient = require('@jsprismarine/raknet/client') +const { Worker, isMainThread, parentPort } = require('worker_threads') +const EncapsulatedPacket = require('@jsprismarine/raknet/protocol/encapsulated_packet') + +function connect(hostname, port) { + if (isMainThread) { + const worker = new Worker(__filename) + worker.postMessage({ type: 'connect', hostname, port }) + return worker + } +} + +var raknet + +function main() { + parentPort.on('message', (evt) => { + if (evt.type == 'connect') { + console.warn('-------- ', evt) + const { hostname, port } =evt + raknet = new RakClient(hostname, port) + + raknet.connect().then(() => { + console.log('Raknet Connected!') + }) + + raknet.on('connecting', () => { + console.log(`[client] connecting to ${hostname}/${port}`) + parentPort.postMessage('message', { type: 'connecting' }) + console.log('Raknet', raknet) + }) + + raknet.once('connected', (connection) => { + console.log(`[worker] connected!`) + globalThis.raknetConnection = connection + parentPort.postMessage({ type: 'connected' }) + }) + + raknet.on('encapsulated', (...args) => { + // console.log('-> ENCAP BUF', args) + setTimeout(() => { + parentPort.postMessage({ type: 'encapsulated', args }) + }, 100) + }) + + raknet.on('raw', (buffer, inetAddr) => { + console.log('Raw packet', buffer, inetAddr) + }) + } else if (evt.type == 'queueEncapsulated') { + console.log('SEND' , globalThis.raknetConnection, evt.packet) + + const sendPacket = new EncapsulatedPacket() + sendPacket.reliability = 0 + sendPacket.buffer = evt.packet + + globalThis.raknetConnection?.addEncapsulatedToQueue(sendPacket) + if (evt.immediate) { + globalThis.raknetConnection?.sendQueue() + } + } + }) +} + +if (!isMainThread) main() +module.exports = { connect } \ No newline at end of file diff --git a/src/client.js b/src/client.js index 8d21ead..b7b2ed5 100644 --- a/src/client.js +++ b/src/client.js @@ -1,12 +1,14 @@ const RakClient = require('@jsprismarine/raknet/client') const { Connection } = require('./connection') const { createDeserializer, createSerializer } = require('./transforms/serializer') +const ConnWorker = require('./ConnWorker') const { Encrypt } = require('./auth/encryption') const auth = require('./client/auth') const Options = require('./options') const fs = require('fs') const log = console.log +const useWorkers = true class Client extends Connection { constructor(options) { @@ -25,6 +27,7 @@ class Client extends Connection { } this.on('session', this.connect) + // this.on('decrypted', this.onDecryptedPacket) } validateOptions() { @@ -35,32 +38,46 @@ class Client extends Connection { onEncapsulated = (encapsulated, inetAddr) => { // log(inetAddr.address, ': Encapsulated', encapsulated) - const buffer = encapsulated.buffer + const buffer = Buffer.from(encapsulated.buffer) this.handle(buffer) } connect = async (sessionData) => { - console.log('Got session data', sessionData) + if (useWorkers) { + this.worker = ConnWorker.connect('127.0.0.1', 19132) + this.worker.on('message', (evt) => { + switch (evt.type) { + case 'connected': + this.sendLogin() + break + case 'encapsulated': + this.onEncapsulated(...evt.args) + break + } + }) - if (this.raknet) return + } else { + if (this.raknet) return - this.raknet = new RakClient('127.0.0.1', 19132) - await this.raknet.connect() + this.raknet = new RakClient('127.0.0.1', 19132) + await this.raknet.connect() - this.raknet.on('connecting', () => { - // console.log(`[client] connecting to ${hostname}/${port}`) - }) - this.raknet.on('connected', (connection) => { - console.log(`[client] connected!`) - this.connection = connection - this.sendLogin() - }) + this.raknet.on('connecting', () => { + // console.log(`[client] connecting to ${hostname}/${port}`) + }) + this.raknet.on('connected', (connection) => { + console.log(`[client] connected!`) + this.connection = connection + this.sendLogin() + }) - this.raknet.on('encapsulated', this.onEncapsulated) + this.raknet.on('encapsulated', this.onEncapsulated) + + this.raknet.on('raw', (buffer, inetAddr) => { + console.log('Raw packet', buffer, inetAddr) + }) + } - this.raknet.on('raw', (buffer, inetAddr) => { - console.log('Raw packet', buffer, inetAddr) - }) } sendLogin() { @@ -104,7 +121,7 @@ class Client extends Connection { readPacket(packet) { // console.log('packet', packet) const des = this.deserializer.parsePacketBuffer(packet) - console.log('->',des) + console.log('->', des) const pakData = { name: des.data.name, params: des.data.params } // console.info('->', JSON.stringify(pakData, (k,v) => typeof v == 'bigint' ? v.toString() : v)) switch (des.data.name) { @@ -115,12 +132,12 @@ class Client extends Connection { this.onDisconnectRequest(des.data.params) break case 'crafting_data': - fs.writeFileSync('crafting.json', JSON.stringify(des.data.params, (k,v) => typeof v == 'bigint' ? v.toString() : v)) + fs.writeFileSync('crafting.json', JSON.stringify(des.data.params, (k, v) => typeof v == 'bigint' ? v.toString() : v)) break case 'start_game': - fs.writeFileSync('start_game.json', JSON.stringify(des.data.params, (k,v) => typeof v == 'bigint' ? v.toString() : v)) + fs.writeFileSync('start_game.json', JSON.stringify(des.data.params, (k, v) => typeof v == 'bigint' ? v.toString() : v)) default: - // console.log('Sending to listeners') + // console.log('Sending to listeners') } this.emit(des.data.name, des.data.params) diff --git a/src/connection.js b/src/connection.js index f266ebf..58a51c8 100644 --- a/src/connection.js +++ b/src/connection.js @@ -1,5 +1,5 @@ const BinaryStream = require('@jsprismarine/jsbinaryutils').default -const BatchPacket = require('./BatchPacket') +const BatchPacket = require('./datatypes/BatchPacket') const cipher = require('./transforms/encryption') const { EventEmitter } = require('events') const EncapsulatedPacket = require('@jsprismarine/raknet/protocol/encapsulated_packet') @@ -48,12 +48,7 @@ class Connection extends EventEmitter { sendDecryptedBatch(batch) { const buf = batch.encode() // send to raknet - const sendPacket = new EncapsulatedPacket(); - sendPacket.reliability = 0; - sendPacket.buffer = buf - - this.connection.addEncapsulatedToQueue(sendPacket) - this.connection.sendQueue() + this.sendMCPE(buf, true) } sendEncryptedBatch(batch) { @@ -62,15 +57,26 @@ class Connection extends EventEmitter { this.encrypt(buf) } + sendMCPE(buffer, immediate) { + if (this.worker) { + console.log('-> buf', buffer) + this.worker.postMessage({ type: 'queueEncapsulated', packet: buffer, immediate }) + } else { + const sendPacket = new EncapsulatedPacket(); + sendPacket.reliability = 0 + sendPacket.buffer = buffer + this.connection.addEncapsulatedToQueue(sendPacket) + if (immediate) this.connection.sendQueue() + } + } + // These are callbacks called from encryption.js onEncryptedPacket = (buf) => { console.log('ENC BUF', buf) const packet = Buffer.concat([Buffer.from([0xfe]), buf]) // add header - const sendPacket = new EncapsulatedPacket(); - sendPacket.reliability = 0 - sendPacket.buffer = packet + console.log('Sending wrapped encrypted batch', packet) - this.connection.addEncapsulatedToQueue(sendPacket) + this.sendMCPE(packet) } onDecryptedPacket = (buf) => { @@ -87,7 +93,7 @@ class Connection extends EventEmitter { handle(buffer) { // handle encapsulated if (buffer[0] == 0xfe) { // wrapper if (this.encryptionEnabled) { - console.log('Reading encrypted packet', buffer) + console.trace('Reading encrypted packet', buffer.toString('hex')) this.decrypt(buffer.slice(1)) } else { const stream = new BinaryStream(buffer) @@ -100,7 +106,7 @@ class Connection extends EventEmitter { } } } - console.log('[client] procesed ', buffer) + console.log('[client] handled incoming ', buffer) } } diff --git a/src/BatchPacket.js b/src/datatypes/BatchPacket.js similarity index 100% rename from src/BatchPacket.js rename to src/datatypes/BatchPacket.js