217 lines
4.9 KiB
JavaScript
Executable file
217 lines
4.9 KiB
JavaScript
Executable file
'use strict'
|
|
|
|
var transport = require('../../../spdy-transport')
|
|
var utils = transport.utils
|
|
|
|
var assert = require('assert')
|
|
var util = require('util')
|
|
var debug = require('debug')('spdy:scheduler')
|
|
var Readable = require('readable-stream').Readable
|
|
|
|
/*
|
|
* We create following structure in `pending`:
|
|
* [ [ id = 0 ], [ id = 1 ], [ id = 2 ], [ id = 0 ] ]
|
|
* chunks chunks chunks chunks
|
|
* chunks chunks
|
|
* chunks
|
|
*
|
|
* Then on the `.tick()` pass we pick one chunks from each item and remove the
|
|
* item if it is empty:
|
|
*
|
|
* [ [ id = 0 ], [ id = 2 ] ]
|
|
* chunks chunks
|
|
* chunks
|
|
*
|
|
* Writing out: chunks for 0, chunks for 1, chunks for 2, chunks for 0
|
|
*
|
|
* This way data is interleaved between the different streams.
|
|
*/
|
|
|
|
function Scheduler (options) {
|
|
Readable.call(this)
|
|
|
|
// Pretty big window by default
|
|
this.window = 0.25
|
|
|
|
if (options && options.window) { this.window = options.window }
|
|
|
|
this.sync = []
|
|
this.list = []
|
|
this.count = 0
|
|
this.pendingTick = false
|
|
}
|
|
util.inherits(Scheduler, Readable)
|
|
module.exports = Scheduler
|
|
|
|
// Just for testing, really
|
|
Scheduler.create = function create (options) {
|
|
return new Scheduler(options)
|
|
}
|
|
|
|
function insertCompare (a, b) {
|
|
return a.priority === b.priority
|
|
? a.stream - b.stream
|
|
: b.priority - a.priority
|
|
}
|
|
|
|
Scheduler.prototype.schedule = function schedule (data) {
|
|
var priority = data.priority
|
|
var stream = data.stream
|
|
var chunks = data.chunks
|
|
|
|
// Synchronous frames should not be interleaved
|
|
if (priority === false) {
|
|
debug('queue sync', chunks)
|
|
this.sync.push(data)
|
|
this.count += chunks.length
|
|
|
|
this._read()
|
|
return
|
|
}
|
|
|
|
debug('queue async priority=%d stream=%d', priority, stream, chunks)
|
|
var item = new SchedulerItem(stream, priority)
|
|
var index = utils.binaryLookup(this.list, item, insertCompare)
|
|
|
|
// Push new item
|
|
if (index >= this.list.length || insertCompare(this.list[index], item) !== 0) {
|
|
this.list.splice(index, 0, item)
|
|
} else { // Coalesce
|
|
item = this.list[index]
|
|
}
|
|
|
|
item.push(data)
|
|
|
|
this.count += chunks.length
|
|
|
|
this._read()
|
|
}
|
|
|
|
Scheduler.prototype._read = function _read () {
|
|
if (this.count === 0) {
|
|
return
|
|
}
|
|
|
|
if (this.pendingTick) {
|
|
return
|
|
}
|
|
this.pendingTick = true
|
|
|
|
var self = this
|
|
process.nextTick(function () {
|
|
self.pendingTick = false
|
|
self.tick()
|
|
})
|
|
}
|
|
|
|
Scheduler.prototype.tick = function tick () {
|
|
// No luck for async frames
|
|
if (!this.tickSync()) { return false }
|
|
|
|
return this.tickAsync()
|
|
}
|
|
|
|
Scheduler.prototype.tickSync = function tickSync () {
|
|
// Empty sync queue first
|
|
var sync = this.sync
|
|
var res = true
|
|
this.sync = []
|
|
for (var i = 0; i < sync.length; i++) {
|
|
var item = sync[i]
|
|
debug('tick sync pending=%d', this.count, item.chunks)
|
|
for (var j = 0; j < item.chunks.length; j++) {
|
|
this.count--
|
|
// TODO: handle stream backoff properly
|
|
try {
|
|
res = this.push(item.chunks[j])
|
|
} catch (err) {
|
|
this.emit('error', err)
|
|
return false
|
|
}
|
|
}
|
|
debug('after tick sync pending=%d', this.count)
|
|
|
|
// TODO(indutny): figure out the way to invoke callback on actual write
|
|
if (item.callback) {
|
|
item.callback(null)
|
|
}
|
|
}
|
|
return res
|
|
}
|
|
|
|
Scheduler.prototype.tickAsync = function tickAsync () {
|
|
var res = true
|
|
var list = this.list
|
|
if (list.length === 0) {
|
|
return res
|
|
}
|
|
|
|
var startPriority = list[0].priority
|
|
for (var index = 0; list.length > 0; index++) {
|
|
// Loop index
|
|
index %= list.length
|
|
if (startPriority - list[index].priority > this.window) { index = 0 }
|
|
debug('tick async index=%d start=%d', index, startPriority)
|
|
|
|
var current = list[index]
|
|
var item = current.shift()
|
|
|
|
if (current.isEmpty()) {
|
|
list.splice(index, 1)
|
|
if (index === 0 && list.length > 0) {
|
|
startPriority = list[0].priority
|
|
}
|
|
index--
|
|
}
|
|
|
|
debug('tick async pending=%d', this.count, item.chunks)
|
|
for (var i = 0; i < item.chunks.length; i++) {
|
|
this.count--
|
|
// TODO: handle stream backoff properly
|
|
try {
|
|
res = this.push(item.chunks[i])
|
|
} catch (err) {
|
|
this.emit('error', err)
|
|
return false
|
|
}
|
|
}
|
|
debug('after tick pending=%d', this.count)
|
|
|
|
// TODO(indutny): figure out the way to invoke callback on actual write
|
|
if (item.callback) {
|
|
item.callback(null)
|
|
}
|
|
if (!res) { break }
|
|
}
|
|
|
|
return res
|
|
}
|
|
|
|
Scheduler.prototype.dump = function dump () {
|
|
this.tickSync()
|
|
|
|
// Write everything out
|
|
while (!this.tickAsync()) {
|
|
// Intentional no-op
|
|
}
|
|
assert.strictEqual(this.count, 0)
|
|
}
|
|
|
|
function SchedulerItem (stream, priority) {
|
|
this.stream = stream
|
|
this.priority = priority
|
|
this.queue = []
|
|
}
|
|
|
|
SchedulerItem.prototype.push = function push (chunks) {
|
|
this.queue.push(chunks)
|
|
}
|
|
|
|
SchedulerItem.prototype.shift = function shift () {
|
|
return this.queue.shift()
|
|
}
|
|
|
|
SchedulerItem.prototype.isEmpty = function isEmpty () {
|
|
return this.queue.length === 0
|
|
}
|