Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions lib/core/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ module.exports = {
kPendingIdx: Symbol('pending index'),
kError: Symbol('error'),
kClients: Symbol('clients'),
kHttp1OnlyClients: Symbol('http1-only clients'),
kGetDispatcherEntry: Symbol('get dispatcher entry'),
kSetDispatcherEntry: Symbol('set dispatcher entry'),
kDeleteDispatcherEntry: Symbol('delete dispatcher entry'),
kHasDispatcherForOrigin: Symbol('has dispatcher for origin'),
kForEachDispatcherEntry: Symbol('for each dispatcher entry'),
kClient: Symbol('client'),
kParser: Symbol('parser'),
kOnDestroyed: Symbol('destroy callbacks'),
Expand Down
122 changes: 89 additions & 33 deletions lib/dispatcher/agent.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,22 @@
'use strict'

const { InvalidArgumentError, MaxOriginsReachedError } = require('../core/errors')
const { kBusy, kClients, kConnected, kRunning, kClose, kDestroy, kDispatch, kUrl } = require('../core/symbols')
const {
kBusy,
kClients,
kConnected,
kHttp1OnlyClients,
kRunning,
kClose,
kDestroy,
kDispatch,
kUrl,
kGetDispatcherEntry,
kSetDispatcherEntry,
kDeleteDispatcherEntry,
kHasDispatcherForOrigin,
kForEachDispatcherEntry
} = require('../core/symbols')
const DispatcherBase = require('./dispatcher-base')
const Pool = require('./pool')
const Client = require('./client')
Expand All @@ -21,6 +36,10 @@ function defaultFactory (origin, opts) {
: new Pool(origin, opts)
}

function shouldUseHttp1OnlyClients (allowH2) {
return allowH2 === false
}

class Agent extends DispatcherBase {
constructor ({ factory = defaultFactory, maxOrigins = Infinity, connect, ...options } = {}) {
if (typeof factory !== 'function') {
Expand All @@ -44,6 +63,7 @@ class Agent extends DispatcherBase {
this[kOptions] = { ...util.deepClone(options), maxOrigins, connect }
this[kFactory] = factory
this[kClients] = new Map()
this[kHttp1OnlyClients] = new Map()
this[kOrigins] = new Set()

this[kOnDrain] = (origin, targets) => {
Expand All @@ -65,12 +85,45 @@ class Agent extends DispatcherBase {

get [kRunning] () {
let ret = 0
for (const dispatcher of this[kClients].values()) {

this[kForEachDispatcherEntry](({ dispatcher }) => {
ret += dispatcher[kRunning]
}
})

return ret
}

[kGetDispatcherEntry] (origin, { allowH2 } = {}) {
return (shouldUseHttp1OnlyClients(allowH2) ? this[kHttp1OnlyClients] : this[kClients]).get(origin)
}

[kSetDispatcherEntry] (origin, { allowH2 } = {}, entry) {
;(shouldUseHttp1OnlyClients(allowH2) ? this[kHttp1OnlyClients] : this[kClients]).set(origin, entry)
this[kOrigins].add(origin)
}

[kDeleteDispatcherEntry] (origin, { allowH2 } = {}) {
;(shouldUseHttp1OnlyClients(allowH2) ? this[kHttp1OnlyClients] : this[kClients]).delete(origin)

if (!this[kHasDispatcherForOrigin](origin)) {
this[kOrigins].delete(origin)
}
}

[kHasDispatcherForOrigin] (origin) {
return this[kClients].has(origin) || this[kHttp1OnlyClients].has(origin)
}

[kForEachDispatcherEntry] (callback) {
for (const [origin, entry] of this[kClients]) {
callback(entry, { origin })
}

for (const [origin, entry] of this[kHttp1OnlyClients]) {
callback(entry, { origin, allowH2: false })
}
}

[kDispatch] (opts, handler) {
let origin
if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) {
Expand All @@ -80,48 +133,42 @@ class Agent extends DispatcherBase {
}

const allowH2 = opts.allowH2 ?? this[kOptions].allowH2
const key = allowH2 === false ? `${origin}#http1-only` : origin
const registry = { allowH2 }

if (this[kOrigins].size >= this[kOptions].maxOrigins && !this[kOrigins].has(origin)) {
throw new MaxOriginsReachedError()
}

let dispatcher = this[kClients].get(key)
const result = this[kGetDispatcherEntry](origin, registry)
let dispatcher = result && result.dispatcher
if (!dispatcher) {
dispatcher = this[kFactory](opts.origin, allowH2 === false
? { ...this[kOptions], allowH2: false }
: this[kOptions])

const closeClientIfUnused = () => {
if (this[kClients].get(key) !== dispatcher) {
const result = this[kGetDispatcherEntry](origin, registry)
if (!result || result.dispatcher !== dispatcher) {
return
}

if (dispatcher[kConnected] > 0 || dispatcher[kBusy]) {
return
}

this[kClients].delete(key)
this[kDeleteDispatcherEntry](origin, registry)
if (!dispatcher.destroyed) {
dispatcher.close()
}

let hasOrigin = false
for (const client of this[kClients].values()) {
if (client[kUrl].origin === dispatcher[kUrl].origin) {
hasOrigin = true
break
}
}

if (!hasOrigin) {
this[kOrigins].delete(dispatcher[kUrl].origin)
}
}

dispatcher
dispatcher = this[kFactory](opts.origin, allowH2 === false
? { ...this[kOptions], allowH2: false }
: this[kOptions])
.on('drain', this[kOnDrain])
.on('connect', this[kOnConnect])
.on('connect', (origin, targets) => {
const result = this[kGetDispatcherEntry](origin, registry)
if (result) {
result.count += 1
}
this[kOnConnect](origin, targets)
})
.on('disconnect', (origin, targets, err) => {
closeClientIfUnused()
this[kOnDisconnect](origin, targets, err)
Expand All @@ -131,40 +178,49 @@ class Agent extends DispatcherBase {
this[kOnConnectionError](origin, targets, err)
})

this[kClients].set(key, dispatcher)
this[kOrigins].add(origin)
this[kSetDispatcherEntry](origin, registry, { count: 0, dispatcher, origin })
}

return dispatcher.dispatch(opts, handler)
}

[kClose] () {
const closePromises = []
for (const dispatcher of this[kClients].values()) {

this[kForEachDispatcherEntry](({ dispatcher }) => {
closePromises.push(dispatcher.close())
}
})

this[kClients].clear()
this[kHttp1OnlyClients].clear()
this[kOrigins].clear()

return Promise.all(closePromises)
}

[kDestroy] (err) {
const destroyPromises = []
for (const dispatcher of this[kClients].values()) {

this[kForEachDispatcherEntry](({ dispatcher }) => {
destroyPromises.push(dispatcher.destroy(err))
}
})

this[kClients].clear()
this[kHttp1OnlyClients].clear()
this[kOrigins].clear()

return Promise.all(destroyPromises)
}

get stats () {
const allClientStats = {}
for (const dispatcher of this[kClients].values()) {

this[kForEachDispatcherEntry](({ dispatcher }) => {
if (dispatcher.stats) {
allClientStats[dispatcher[kUrl].origin] = dispatcher.stats
}
}
})

return allClientStats
}
}
Expand Down
Loading
Loading