Skip to content
This repository was archived by the owner on Mar 11, 2020. It is now read-only.

Commit 199be5d

Browse files
committed
chore: use stream registry map
1 parent 7310a84 commit 199be5d

File tree

6 files changed

+55
-284
lines changed

6 files changed

+55
-284
lines changed

src/connection.js

Lines changed: 42 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ const PeerId = require('peer-id')
44
const multiaddr = require('multiaddr')
55

66
const withIs = require('class-is')
7-
const Stream = require('./stream')
87

98
const assert = require('assert')
109
const errCode = require('err-code')
@@ -23,6 +22,7 @@ class Connection {
2322
* @param {PeerId} properties.remotePeer remote peer-id.
2423
* @param {function} properties.newStream new stream muxer function.
2524
* @param {function} properties.close close raw connection function.
25+
* @param {function} properties.getStreams get streams from muxer function.
2626
* @param {Object} properties.stat metadata of the connection.
2727
* @param {string} properties.stat.direction connection establishment direction ("inbound" or "outbound").
2828
* @param {Object} properties.stat.timeline connection relevant events timestamp.
@@ -31,13 +31,14 @@ class Connection {
3131
* @param {string} [properties.stat.multiplexer] connection multiplexing identifier.
3232
* @param {string} [properties.stat.encryption] connection encryption method identifier.
3333
*/
34-
constructor ({ localAddr, remoteAddr, localPeer, remotePeer, newStream, close, stat }) {
34+
constructor ({ localAddr, remoteAddr, localPeer, remotePeer, newStream, close, getStreams, stat }) {
3535
assert(multiaddr.isMultiaddr(localAddr), 'localAddr must be an instance of multiaddr')
3636
assert(multiaddr.isMultiaddr(remoteAddr), 'remoteAddr must be an instance of multiaddr')
3737
assert(PeerId.isPeerId(localPeer), 'localPeer must be an instance of peer-id')
3838
assert(PeerId.isPeerId(remotePeer), 'remotePeer must be an instance of peer-id')
3939
assert(typeof newStream === 'function', 'new stream must be a function')
4040
assert(typeof close === 'function', 'close must be a function')
41+
assert(typeof getStreams === 'function', 'getStreams must be a function')
4142
assert(stat, 'connection metadata object must be provided')
4243
assert(stat.direction === 'inbound' || stat.direction === 'outbound', 'direction must be "inbound" or "outbound"')
4344
assert(stat.timeline, 'connection timeline object must be provided in the stat object')
@@ -92,9 +93,14 @@ class Connection {
9293
this._close = close
9394

9495
/**
95-
* Connection streams
96+
* Reference to the getStreams function of the muxer
9697
*/
97-
this._streams = []
98+
this._getStreams = getStreams
99+
100+
/**
101+
* Connection streams registry
102+
*/
103+
this.streamRegistry = new Map()
98104

99105
/**
100106
* User provided tags
@@ -111,17 +117,24 @@ class Connection {
111117
}
112118

113119
/**
114-
* Get all the streams associated with the connection.
115-
* @return {Array<Stream>}
120+
* Get all the streams of the muxer.
121+
* @return {Array<*>}
122+
*/
123+
get streams () {
124+
return this._getStreams()
125+
}
126+
127+
/**
128+
* Get stream registry map with stream metadata indexed by id.
116129
*/
117-
getStreams () {
118-
return this._streams
130+
get registry () {
131+
return this.streamRegistry
119132
}
120133

121134
/**
122135
* Create a new stream from this connection
123136
* @param {string[]} protocols intended protocol for the stream
124-
* @return {Stream} new muxed+multistream-selected stream
137+
* @return {*} new muxed+multistream-selected stream
125138
*/
126139
async newStream (protocols) {
127140
if (this.stat.status === 'closing') {
@@ -134,40 +147,37 @@ class Connection {
134147

135148
if (!Array.isArray(protocols)) protocols = [protocols]
136149

137-
const { stream: duplexStream, protocol } = await this._newStream(protocols)
150+
const { stream: muxedStream, protocol } = await this._newStream(protocols)
138151

139-
return this.addStream({
140-
stream: duplexStream,
141-
protocol,
142-
direction: 'outbound'
143-
})
152+
this.addStream(muxedStream, protocol)
153+
return muxedStream
144154
}
145155

146156
/**
147-
* Add an inbound stream when it is opened.
148-
* @param {object} options
149-
* @param {*} options.stream an Iterable Duplex stream
150-
* @param {string} options.protocol the protocol the stream is using
151-
* @param {string} [options.direction = 'inbound'] stream establishment direction ("inbound" or "outbound")
152-
* @return {Stream} new stream within the connection
157+
* Add a stream when it is opened to the registry.
158+
* @param {*} muxedStream a muxed stream
159+
* @param {string} protocol the protocol the stream is using
160+
* @param {Array<String>} tags tags of the stream
161+
* @return {void}
153162
*/
154-
addStream ({ stream, protocol, direction = 'inbound' }) {
155-
assert(direction === 'inbound' || direction === 'outbound', 'direction must be "inbound" or "outbound"')
156-
157-
const s = new Stream({
158-
iterableDuplex: stream,
159-
conn: this,
160-
direction,
163+
addStream (muxedStream, protocol, tags = []) {
164+
// Add metadata for the stream
165+
this.streamRegistry.set(muxedStream.id, {
166+
tags,
161167
protocol
162168
})
169+
}
163170

164-
this._streams.push(s)
165-
166-
return s
171+
/**
172+
* Remove stream registry after it is closed.
173+
* @param {string} id identifier of the stream
174+
*/
175+
removeStream (id) {
176+
this.streamRegistry.delete(id)
167177
}
168178

169179
/**
170-
* Close the connection, as well as all its associated streams.
180+
* Close the connection.
171181
* @return {Promise}
172182
*/
173183
async close () {
@@ -184,9 +194,6 @@ class Connection {
184194
// Close raw connection
185195
this._closing = await this._close()
186196

187-
// All streams closed
188-
this._streams.map((stream) => stream.close())
189-
190197
this._stat.timeline.close = Date.now()
191198
this.stat.status = 'closed'
192199
}

src/stream.js

Lines changed: 0 additions & 125 deletions
This file was deleted.

src/tests.js

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@
33
'use strict'
44

55
const connectionSuite = require('../test/connection')
6-
const streamSuite = require('../test/stream')
76

87
module.exports = (test) => {
98
connectionSuite(test)
10-
streamSuite(test)
119
}

test/compliance.spec.js

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ describe('compliance tests', () => {
1717
PeerId.createFromJSON(peers[0]),
1818
PeerId.createFromJSON(peers[1])
1919
])
20+
const openStreams = []
21+
let streamId = 0
2022

2123
return new Connection({
2224
localPeer,
@@ -33,14 +35,21 @@ describe('compliance tests', () => {
3335
multiplexer: '/mplex/6.7.0'
3436
},
3537
newStream: (protocols) => {
38+
const id = streamId++
3639
const stream = pair()
40+
3741
stream.close = () => stream.sink([])
42+
stream.id = id
43+
44+
openStreams.push(stream)
45+
3846
return {
3947
stream,
4048
protocol: protocols[0]
4149
}
4250
},
43-
close: () => {}
51+
close: () => {},
52+
getStreams: () => openStreams
4453
})
4554
},
4655
async teardown () {

test/connection.js

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ module.exports = (test) => {
3535
expect(connection.stat.timeline.upgraded).to.exist()
3636
expect(connection.stat.timeline.close).to.not.exist()
3737
expect(connection.stat.direction).to.exist()
38-
expect(connection.getStreams()).to.eql([])
38+
expect(connection.streams).to.eql([])
3939
expect(connection.tags).to.eql([])
4040
})
4141

@@ -50,33 +50,21 @@ module.exports = (test) => {
5050
})
5151

5252
it('should return an empty array of streams', () => {
53-
const streams = connection.getStreams()
53+
const streams = connection.streams
5454

5555
expect(streams).to.eql([])
5656
})
5757

5858
it('should be able to create a new stream', async () => {
5959
const protocol = '/echo/0.0.1'
6060
const stream = await connection.newStream(protocol)
61-
const connStreams = await connection.getStreams()
61+
const connStreams = await connection.streams
6262

6363
expect(stream).to.exist()
6464
expect(connStreams).to.exist()
6565
expect(connStreams).to.have.lengthOf(1)
6666
expect(connStreams[0]).to.equal(stream)
6767
})
68-
69-
it('should be able to add an inbound stream', () => {
70-
const s = goodbye({ source: ['hey'], sink: collect })
71-
s.close = () => s.sink([])
72-
73-
connection.addStream({ stream: s, protocol: '/echo/0.0.1' })
74-
75-
const connStreams = connection.getStreams()
76-
expect(connStreams).to.exist()
77-
expect(connStreams).to.have.lengthOf(1)
78-
expect(connStreams[0].protocol).to.equal('/echo/0.0.1')
79-
})
8068
})
8169

8270
describe('close connection', () => {

0 commit comments

Comments
 (0)