'use strict' var util = require('util') var transport = require('../spdy-transport') var debug = { server: require('debug')('spdy:connection:server'), client: require('debug')('spdy:connection:client') } var EventEmitter = require('events').EventEmitter var Stream = transport.Stream function Connection (socket, options) { EventEmitter.call(this) var state = {} this._spdyState = state // NOTE: There's a big trick here. Connection is used as a `this` argument // to the wrapped `connection` event listener. // socket end doesn't necessarly mean connection drop this.httpAllowHalfOpen = true state.timeout = new transport.utils.Timeout(this) // Protocol info state.protocol = transport.protocol[options.protocol] state.version = null state.constants = state.protocol.constants state.pair = null state.isServer = options.isServer // Root of priority tree (i.e. stream id = 0) state.priorityRoot = new transport.Priority({ defaultWeight: state.constants.DEFAULT_WEIGHT, maxCount: transport.protocol.base.constants.MAX_PRIORITY_STREAMS }) // Defaults state.maxStreams = options.maxStreams || state.constants.MAX_CONCURRENT_STREAMS state.autoSpdy31 = options.protocol.name !== 'h2' && options.autoSpdy31 state.acceptPush = options.acceptPush === undefined ? !state.isServer : options.acceptPush if (options.maxChunk === false) { state.maxChunk = Infinity } else if (options.maxChunk === undefined) { state.maxChunk = transport.protocol.base.constants.DEFAULT_MAX_CHUNK } else { state.maxChunk = options.maxChunk } // Connection-level flow control var windowSize = options.windowSize || 1 << 20 state.window = new transport.Window({ id: 0, isServer: state.isServer, recv: { size: state.constants.DEFAULT_WINDOW, max: state.constants.MAX_INITIAL_WINDOW_SIZE }, send: { size: state.constants.DEFAULT_WINDOW, max: state.constants.MAX_INITIAL_WINDOW_SIZE } }) // It starts with DEFAULT_WINDOW, update must be sent to change it on client state.window.recv.setMax(windowSize) // Boilerplate for Stream constructor state.streamWindow = new transport.Window({ id: -1, isServer: state.isServer, recv: { size: windowSize, max: state.constants.MAX_INITIAL_WINDOW_SIZE }, send: { size: state.constants.DEFAULT_WINDOW, max: state.constants.MAX_INITIAL_WINDOW_SIZE } }) // Various state info state.pool = state.protocol.compressionPool.create(options.headerCompression) state.counters = { push: 0, stream: 0 } // Init streams list state.stream = { map: {}, count: 0, nextId: state.isServer ? 2 : 1, lastId: { both: 0, received: 0 } } state.ping = { nextId: state.isServer ? 2 : 1, map: {} } state.goaway = false // Debug state.debug = state.isServer ? debug.server : debug.client // X-Forwarded feature state.xForward = null // Create parser and hole for framer state.parser = state.protocol.parser.create({ // NOTE: needed to distinguish ping from ping ACK in SPDY isServer: state.isServer, window: state.window }) state.framer = state.protocol.framer.create({ window: state.window, timeout: state.timeout }) // SPDY has PUSH enabled on servers if (state.protocol.name === 'spdy') { state.framer.enablePush(state.isServer) } if (!state.isServer) { state.parser.skipPreface() } this.socket = socket this._init() } util.inherits(Connection, EventEmitter) exports.Connection = Connection Connection.create = function create (socket, options) { return new Connection(socket, options) } Connection.prototype._init = function init () { var self = this var state = this._spdyState var pool = state.pool // Initialize session window state.window.recv.on('drain', function () { self._onSessionWindowDrain() }) // Initialize parser state.parser.on('data', function (frame) { self._handleFrame(frame) }) state.parser.once('version', function (version) { self._onVersion(version) }) // Propagate parser errors state.parser.on('error', function (err) { self._onParserError(err) }) // Propagate framer errors state.framer.on('error', function (err) { self.emit('error', err) }) this.socket.pipe(state.parser) state.framer.pipe(this.socket) // Allow high-level api to catch socket errors this.socket.on('error', function onSocketError (e) { self.emit('error', e) }) this.socket.once('close', function onclose (hadError) { var err if (hadError) { err = new Error('socket hang up') err.code = 'ECONNRESET' } self.destroyStreams(err) self.emit('close') if (state.pair) { pool.put(state.pair) } state.framer.resume() }) // Reset timeout on close this.once('close', function () { self.setTimeout(0) }) function _onWindowOverflow () { self._onWindowOverflow() } state.window.recv.on('overflow', _onWindowOverflow) state.window.send.on('overflow', _onWindowOverflow) // Do not allow half-open connections this.socket.allowHalfOpen = false } Connection.prototype._onVersion = function _onVersion (version) { var state = this._spdyState var prev = state.version var parser = state.parser var framer = state.framer var pool = state.pool state.version = version state.debug('id=0 version=%d', version) // Ignore transition to 3.1 if (!prev) { state.pair = pool.get(version) parser.setCompression(state.pair) framer.setCompression(state.pair) } framer.setVersion(version) if (!state.isServer) { framer.prefaceFrame() if (state.xForward !== null) { framer.xForwardedFor({ host: state.xForward }) } } // Send preface+settings frame (once) framer.settingsFrame({ max_header_list_size: state.constants.DEFAULT_MAX_HEADER_LIST_SIZE, max_concurrent_streams: state.maxStreams, enable_push: state.acceptPush ? 1 : 0, initial_window_size: state.window.recv.max }) // Update session window if (state.version >= 3.1 || (state.isServer && state.autoSpdy31)) { this._onSessionWindowDrain() } this.emit('version', version) } Connection.prototype._onParserError = function _onParserError (err) { var state = this._spdyState // Prevent further errors state.parser.kill() // Send GOAWAY if (err instanceof transport.protocol.base.utils.ProtocolError) { this._goaway({ lastId: state.stream.lastId.both, code: err.code, extra: err.message, send: true }) } this.emit('error', err) } Connection.prototype._handleFrame = function _handleFrame (frame) { var state = this._spdyState state.debug('id=0 frame', frame) state.timeout.reset() // For testing purposes this.emit('frame', frame) var stream // Session window update if (frame.type === 'WINDOW_UPDATE' && frame.id === 0) { if (state.version < 3.1 && state.autoSpdy31) { state.debug('id=0 switch version to 3.1') state.version = 3.1 this.emit('version', 3.1) } state.window.send.update(frame.delta) return } if (state.isServer && frame.type === 'PUSH_PROMISE') { state.debug('id=0 server PUSH_PROMISE') this._goaway({ lastId: state.stream.lastId.both, code: 'PROTOCOL_ERROR', send: true }) return } if (!stream && frame.id !== undefined) { // Load created one stream = state.stream.map[frame.id] // Fail if not found if (!stream && frame.type !== 'HEADERS' && frame.type !== 'PRIORITY' && frame.type !== 'RST') { // Other side should destroy the stream upon receiving GOAWAY if (this._isGoaway(frame.id)) { return } state.debug('id=0 stream=%d not found', frame.id) state.framer.rstFrame({ id: frame.id, code: 'INVALID_STREAM' }) return } } // Create new stream if (!stream && frame.type === 'HEADERS') { this._handleHeaders(frame) return } if (stream) { stream._handleFrame(frame) } else if (frame.type === 'SETTINGS') { this._handleSettings(frame.settings) } else if (frame.type === 'ACK_SETTINGS') { // TODO(indutny): handle it one day } else if (frame.type === 'PING') { this._handlePing(frame) } else if (frame.type === 'GOAWAY') { this._handleGoaway(frame) } else if (frame.type === 'X_FORWARDED_FOR') { // Set X-Forwarded-For only once if (state.xForward === null) { state.xForward = frame.host } } else if (frame.type === 'PRIORITY') { // TODO(indutny): handle this } else { state.debug('id=0 unknown frame type: %s', frame.type) } } Connection.prototype._onWindowOverflow = function _onWindowOverflow () { var state = this._spdyState state.debug('id=0 window overflow') this._goaway({ lastId: state.stream.lastId.both, code: 'FLOW_CONTROL_ERROR', send: true }) } Connection.prototype._isGoaway = function _isGoaway (id) { var state = this._spdyState if (state.goaway !== false && state.goaway < id) { return true } return false } Connection.prototype._getId = function _getId () { var state = this._spdyState var id = state.stream.nextId state.stream.nextId += 2 return id } Connection.prototype._createStream = function _createStream (uri) { var state = this._spdyState var id = uri.id if (id === undefined) { id = this._getId() } var isGoaway = this._isGoaway(id) if (uri.push && !state.acceptPush) { state.debug('id=0 push disabled promisedId=%d', id) // Fatal error this._goaway({ lastId: state.stream.lastId.both, code: 'PROTOCOL_ERROR', send: true }) isGoaway = true } var stream = new Stream(this, { id: id, request: uri.request !== false, method: uri.method, path: uri.path, host: uri.host, priority: uri.priority, headers: uri.headers, parent: uri.parent, readable: !isGoaway && uri.readable, writable: !isGoaway && uri.writable }) var self = this // Just an empty stream for API consistency if (isGoaway) { return stream } state.stream.lastId.both = Math.max(state.stream.lastId.both, id) state.debug('id=0 add stream=%d', stream.id) state.stream.map[stream.id] = stream state.stream.count++ state.counters.stream++ if (stream.parent !== null) { state.counters.push++ } stream.once('close', function () { self._removeStream(stream) }) return stream } Connection.prototype._handleHeaders = function _handleHeaders (frame) { var state = this._spdyState // Must be HEADERS frame after stream close if (frame.id <= state.stream.lastId.received) { return } // Someone is using our ids! if ((frame.id + state.stream.nextId) % 2 === 0) { state.framer.rstFrame({ id: frame.id, code: 'PROTOCOL_ERROR' }) return } var stream = this._createStream({ id: frame.id, request: false, method: frame.headers[':method'], path: frame.headers[':path'], host: frame.headers[':authority'], priority: frame.priority, headers: frame.headers, writable: frame.writable }) // GOAWAY if (this._isGoaway(stream.id)) { return } state.stream.lastId.received = Math.max( state.stream.lastId.received, stream.id ) // TODO(indutny) handle stream limit if (!this.emit('stream', stream)) { // No listeners was set - abort the stream stream.abort() return } // Create fake frame to simulate end of the data if (frame.fin) { stream._handleFrame({ type: 'FIN', fin: true }) } return stream } Connection.prototype._onSessionWindowDrain = function _onSessionWindowDrain () { var state = this._spdyState if (state.version < 3.1 && !(state.isServer && state.autoSpdy31)) { return } var delta = state.window.recv.getDelta() if (delta === 0) { return } state.debug('id=0 session window drain, update by %d', delta) state.framer.windowUpdateFrame({ id: 0, delta: delta }) state.window.recv.update(delta) } Connection.prototype.start = function start (version) { this._spdyState.parser.setVersion(version) } // Mostly for testing Connection.prototype.getVersion = function getVersion () { return this._spdyState.version } Connection.prototype._handleSettings = function _handleSettings (settings) { var state = this._spdyState state.framer.ackSettingsFrame() this._setDefaultWindow(settings) if (settings.max_frame_size) { state.framer.setMaxFrameSize(settings.max_frame_size) } // TODO(indutny): handle max_header_list_size if (settings.header_table_size) { try { state.pair.compress.updateTableSize(settings.header_table_size) } catch (e) { this._goaway({ lastId: 0, code: 'PROTOCOL_ERROR', send: true }) return } } // HTTP2 clients needs to enable PUSH streams explicitly if (state.protocol.name !== 'spdy') { if (settings.enable_push === undefined) { state.framer.enablePush(state.isServer) } else { state.framer.enablePush(settings.enable_push === 1) } } // TODO(indutny): handle max_concurrent_streams } Connection.prototype._setDefaultWindow = function _setDefaultWindow (settings) { if (settings.initial_window_size === undefined) { return } var state = this._spdyState // Update defaults var window = state.streamWindow window.send.setMax(settings.initial_window_size) // Update existing streams Object.keys(state.stream.map).forEach(function (id) { var stream = state.stream.map[id] var window = stream._spdyState.window window.send.updateMax(settings.initial_window_size) }) } Connection.prototype._handlePing = function handlePing (frame) { var self = this var state = this._spdyState // Handle incoming PING if (!frame.ack) { state.framer.pingFrame({ opaque: frame.opaque, ack: true }) self.emit('ping', frame.opaque) return } // Handle reply PING var hex = frame.opaque.toString('hex') if (!state.ping.map[hex]) { return } var ping = state.ping.map[hex] delete state.ping.map[hex] if (ping.cb) { ping.cb(null) } } Connection.prototype._handleGoaway = function handleGoaway (frame) { this._goaway({ lastId: frame.lastId, code: frame.code, send: false }) } Connection.prototype.ping = function ping (callback) { var state = this._spdyState // HTTP2 is using 8-byte opaque var opaque = Buffer.alloc(state.constants.PING_OPAQUE_SIZE) opaque.fill(0) opaque.writeUInt32BE(state.ping.nextId, opaque.length - 4) state.ping.nextId += 2 state.ping.map[opaque.toString('hex')] = { cb: callback } state.framer.pingFrame({ opaque: opaque, ack: false }) } Connection.prototype.getCounter = function getCounter (name) { return this._spdyState.counters[name] } Connection.prototype.reserveStream = function reserveStream (uri, callback) { var stream = this._createStream(uri) // GOAWAY if (this._isGoaway(stream.id)) { var err = new Error('Can\'t send request after GOAWAY') process.nextTick(function () { if (callback) { callback(err) } else { stream.emit('error', err) } }) return stream } if (callback) { process.nextTick(function () { callback(null, stream) }) } return stream } Connection.prototype.request = function request (uri, callback) { var stream = this.reserveStream(uri, function (err) { if (err) { if (callback) { callback(err) } else { stream.emit('error', err) } return } if (stream._wasSent()) { if (callback) { callback(null, stream) } return } stream.send(function (err) { if (err) { if (callback) { return callback(err) } else { return stream.emit('error', err) } } if (callback) { callback(null, stream) } }) }) return stream } Connection.prototype._removeStream = function _removeStream (stream) { var state = this._spdyState state.debug('id=0 remove stream=%d', stream.id) delete state.stream.map[stream.id] state.stream.count-- if (state.stream.count === 0) { this.emit('_streamDrain') } } Connection.prototype._goaway = function _goaway (params) { var state = this._spdyState var self = this state.goaway = params.lastId state.debug('id=0 goaway from=%d', state.goaway) Object.keys(state.stream.map).forEach(function (id) { var stream = state.stream.map[id] // Abort every stream started after GOAWAY if (stream.id <= params.lastId) { return } stream.abort() stream.emit('error', new Error('New stream after GOAWAY')) }) function finish () { // Destroy socket if there are no streams if (state.stream.count === 0 || params.code !== 'OK') { // No further frames should be processed state.parser.kill() process.nextTick(function () { var err = new Error('Fatal error: ' + params.code) self._onStreamDrain(err) }) return } self.on('_streamDrain', self._onStreamDrain) } if (params.send) { // Make sure that GOAWAY frame is sent before dumping framer state.framer.goawayFrame({ lastId: params.lastId, code: params.code, extra: params.extra }, finish) } else { finish() } } Connection.prototype._onStreamDrain = function _onStreamDrain (error) { var state = this._spdyState state.debug('id=0 _onStreamDrain') state.framer.dump() state.framer.unpipe(this.socket) state.framer.resume() if (this.socket.destroySoon) { this.socket.destroySoon() } this.emit('close', error) } Connection.prototype.end = function end (callback) { var state = this._spdyState if (callback) { this.once('close', callback) } this._goaway({ lastId: state.stream.lastId.both, code: 'OK', send: true }) } Connection.prototype.destroyStreams = function destroyStreams (err) { var state = this._spdyState Object.keys(state.stream.map).forEach(function (id) { var stream = state.stream.map[id] stream.destroy() if (err) { stream.emit('error', err) } }) } Connection.prototype.isServer = function isServer () { return this._spdyState.isServer } Connection.prototype.getXForwardedFor = function getXForwardFor () { return this._spdyState.xForward } Connection.prototype.sendXForwardedFor = function sendXForwardedFor (host) { var state = this._spdyState if (state.version !== null) { state.framer.xForwardedFor({ host: host }) } else { state.xForward = host } } Connection.prototype.pushPromise = function pushPromise (parent, uri, callback) { var state = this._spdyState var stream = this._createStream({ request: false, parent: parent, method: uri.method, path: uri.path, host: uri.host, priority: uri.priority, headers: uri.headers, readable: false }) var err // TODO(indutny): deduplicate this logic somehow if (this._isGoaway(stream.id)) { err = new Error('Can\'t send PUSH_PROMISE after GOAWAY') process.nextTick(function () { if (callback) { callback(err) } else { stream.emit('error', err) } }) return stream } if (uri.push && !state.acceptPush) { err = new Error( 'Can\'t send PUSH_PROMISE, other side won\'t accept it') process.nextTick(function () { if (callback) { callback(err) } else { stream.emit('error', err) } }) return stream } stream._sendPush(uri.status, uri.response, function (err) { if (!callback) { if (err) { stream.emit('error', err) } return } if (err) { return callback(err) } callback(null, stream) }) return stream } Connection.prototype.setTimeout = function setTimeout (delay, callback) { var state = this._spdyState state.timeout.set(delay, callback) }