A new start

2018-11-24 14:43:59 +01:00
commit 3c32c8a37a
24054 changed files with 1376258 additions and 0 deletions


@@ -0,0 +1,3 @@
language: node_js
node_js:
- "0.10"


@@ -0,0 +1,24 @@
The MIT License (MIT)
Copyright (c) 2011 Dominic Tarr
Permission is hereby granted, free of charge,
to any person obtaining a copy of this software and
associated documentation files (the "Software"), to
deal in the Software without restriction, including
without limitation the rights to use, copy, modify,
merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom
the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.


@@ -0,0 +1,3 @@
{"foo": 1}
{"foo": 2}
{"foo": 3, "bar": "test"}


@@ -0,0 +1,15 @@
var es = require('..')
process.stdin
.pipe(es.map(function (data, callback) {
for (var i = 0; i < data.length; i++) {
if (data[i] == 0x61) {
data[i] = 0x41
}
}
callback(null, data)
}))
.pipe(process.stdout)
// echo abcdabcd | node map.js
// AbcdAbcd


@@ -0,0 +1,18 @@
var inspect = require('util').inspect
var es = require('..')
es.pipe( //pipe joins streams together
process.openStdin(), //open stdin
es.split(null, null, {trailing: false}), //split stream to break on newlines
es.map(function (data, callback) { //turn this async function into a stream
var obj = JSON.parse(data) //parse input into json
callback(null, inspect(obj) + '\n') //render it nicely
}),
process.stdout // pipe it to stdout !
)
// cat data | node pretty.js
// { foo: 1 }
// { foo: 2 }
// { foo: 3, bar: 'test' }


@@ -0,0 +1,12 @@
var es = require('..')
process.stdin
.pipe(es.split(null, null, {trailing: false})) // ignore trailing empty line
.on('data', function (data) {
console.log('data: ' + data)
})
// cat data | node split.js
// data: {"foo": 1}
// data: {"foo": 2}
// data: {"foo": 3, "bar": "test"}


@@ -0,0 +1,327 @@
//filter will reemit the data if cb(err,pass) pass is truthy
// reduce is more tricky
// maybe we want to group the reductions or emit progress updates occasionally
// the most basic reduce just emits one 'data' event after it has received 'end'
var Stream = require('stream').Stream
, es = exports
, through = require('through')
, from = require('from')
, flatmap = require('flatmap-stream')
, duplex = require('duplexer')
, map = require('map-stream')
, pause = require('pause-stream')
, split = require('split')
, pipeline = require('stream-combiner')
, immediately = global.setImmediate || process.nextTick;
es.Stream = Stream //re-export Stream from core
es.through = through
es.from = from
es.flatmap = flatmap
es.duplex = duplex
es.map = map
es.pause = pause
es.split = split
es.pipeline = es.connect = es.pipe = pipeline
// merge / concat
//
// combine multiple streams into a single stream.
// will emit end only once
es.concat = //actually this should be called concat
es.merge = function (/*streams...*/) {
var toMerge = [].slice.call(arguments)
if (toMerge.length === 1 && (toMerge[0] instanceof Array)) {
toMerge = toMerge[0] //handle array as arguments object
}
var stream = new Stream()
stream.setMaxListeners(0) // allow adding more than 11 streams
var endCount = 0
stream.writable = stream.readable = true
if (toMerge.length) {
toMerge.forEach(function (e) {
e.pipe(stream, {end: false})
var ended = false
e.on('end', function () {
if(ended) return
ended = true
endCount ++
if(endCount == toMerge.length)
stream.emit('end')
})
})
} else {
process.nextTick(function () {
stream.emit('end')
})
}
stream.write = function (data) {
this.emit('data', data)
}
stream.destroy = function () {
toMerge.forEach(function (e) {
if(e.destroy) e.destroy()
})
}
return stream
}
// writable stream, collects all events into an array
// and calls back when 'end' occurs
// mainly I'm using this to test the other functions
es.collect =
es.writeArray = function (done) {
if ('function' !== typeof done)
throw new Error('function writeArray (done): done must be function')
var a = new Stream ()
, array = [], isDone = false
a.write = function (l) {
array.push(l)
}
a.end = function () {
isDone = true
done(null, array)
}
a.writable = true
a.readable = false
a.destroy = function () {
a.writable = a.readable = false
if(isDone) return
done(new Error('destroyed before end'), array)
}
return a
}
//return a Stream that reads the properties of an object
//respecting pause() and resume()
es.readArray = function (array) {
var stream = new Stream()
, i = 0
, paused = false
, ended = false
stream.readable = true
stream.writable = false
if(!Array.isArray(array))
throw new Error('event-stream.read expects an array')
stream.resume = function () {
if(ended) return
paused = false
var l = array.length
while(i < l && !paused && !ended) {
stream.emit('data', array[i++])
}
if(i == l && !ended)
ended = true, stream.readable = false, stream.emit('end')
}
process.nextTick(stream.resume)
stream.pause = function () {
paused = true
}
stream.destroy = function () {
ended = true
stream.emit('close')
}
return stream
}
//
// readable (asyncFunction)
// return a stream that calls an async function while the stream is not paused.
//
// the function must take: (count, callback)
//
es.readable =
function (func, continueOnError) {
var stream = new Stream()
, i = 0
, paused = false
, ended = false
, reading = false
stream.readable = true
stream.writable = false
if('function' !== typeof func)
throw new Error('event-stream.readable expects async function')
stream.on('end', function () { ended = true })
function get (err, data) {
if(err) {
stream.emit('error', err)
if(!continueOnError) stream.emit('end')
} else if (arguments.length > 1)
stream.emit('data', data)
immediately(function () {
if(ended || paused || reading) return
try {
reading = true
func.call(stream, i++, function () {
reading = false
get.apply(null, arguments)
})
} catch (err) {
stream.emit('error', err)
}
})
}
stream.resume = function () {
paused = false
get()
}
process.nextTick(get)
stream.pause = function () {
paused = true
}
stream.destroy = function () {
stream.emit('end')
stream.emit('close')
ended = true
}
return stream
}
//
// map sync
//
es.mapSync = function (sync) {
return es.through(function write(data) {
var mappedData
try {
mappedData = sync(data)
} catch (err) {
return this.emit('error', err)
}
if (mappedData !== undefined)
this.emit('data', mappedData)
})
}
//
// log just print out what is coming through the stream, for debugging
//
es.log = function (name) {
return es.through(function (data) {
var args = [].slice.call(arguments)
if(name) console.error(name, data)
else console.error(data)
this.emit('data', data)
})
}
//
// child -- pipe through a child process
//
es.child = function (child) {
return es.duplex(child.stdin, child.stdout)
}
//
// parse
//
// must be used after es.split() to ensure that each chunk represents a line
// source.pipe(es.split()).pipe(es.parse())
es.parse = function (options) {
var emitError = !!(options ? options.error : false)
return es.through(function (data) {
var obj
try {
if(data) //ignore empty lines
obj = JSON.parse(data.toString())
} catch (err) {
if (emitError)
return this.emit('error', err)
return console.error(err, 'attempting to parse:', data)
}
//ignore lines that were only whitespace.
if(obj !== undefined)
this.emit('data', obj)
})
}
//
// stringify
//
es.stringify = function () {
var Buffer = require('buffer').Buffer
return es.mapSync(function (e){
return JSON.stringify(Buffer.isBuffer(e) ? e.toString() : e) + '\n'
})
}
//
// replace a string within a stream.
//
// warn: just concatenates the string and then does str.split().join().
// probably not optimal.
// for smallish responses, who cares?
// I need this for shadow-npm so it's only relatively small json files.
es.replace = function (from, to) {
return es.pipeline(es.split(from), es.join(to))
}
//
// join chunks with a joiner. just like Array#join
// also accepts a callback that is passed the chunks appended together
// this is still supported for legacy reasons.
//
es.join = function (str) {
//legacy api
if('function' === typeof str)
return es.wait(str)
var first = true
return es.through(function (data) {
if(!first)
this.emit('data', str)
first = false
this.emit('data', data)
return true
})
}
//
// wait. callback when 'end' is emitted, with all chunks appended as string.
//
es.wait = function (callback) {
var arr = []
return es.through(function (data) { arr.push(data) },
function () {
var body = Buffer.isBuffer(arr[0]) ? Buffer.concat(arr)
: arr.join('')
this.emit('data', body)
this.emit('end')
if(callback) callback(null, body)
})
}
es.pipeable = function () {
throw new Error('[EVENT-STREAM] es.pipeable is deprecated')
}


@@ -0,0 +1,87 @@
{
"_from": "event-stream@~3.3.x",
"_id": "event-stream@3.3.6",
"_inBundle": false,
"_integrity": "sha512-dGXNg4F/FgVzlApjzItL+7naHutA3fDqbV/zAZqDDlXTjiMnQmZKu+prImWKszeBM5UQeGvAl3u1wBiKeDh61g==",
"_location": "/gulp-token-replace/event-stream",
"_phantomChildren": {},
"_requested": {
"type": "range",
"registry": true,
"raw": "event-stream@~3.3.x",
"name": "event-stream",
"escapedName": "event-stream",
"rawSpec": "~3.3.x",
"saveSpec": null,
"fetchSpec": "~3.3.x"
},
"_requiredBy": [
"/gulp-token-replace"
],
"_resolved": "https://registry.npmjs.org/event-stream/-/event-stream-3.3.6.tgz",
"_shasum": "cac1230890e07e73ec9cacd038f60a5b66173eef",
"_spec": "event-stream@~3.3.x",
"_where": "/var/www/html/autocompletion/node_modules/gulp-token-replace",
"author": {
"name": "Dominic Tarr",
"email": "dominic.tarr@gmail.com",
"url": "http://bit.ly/dominictarr"
},
"bugs": {
"url": "https://github.com/dominictarr/event-stream/issues"
},
"bundleDependencies": false,
"dependencies": {
"duplexer": "^0.1.1",
"flatmap-stream": "^0.1.0",
"from": "^0.1.7",
"map-stream": "0.0.7",
"pause-stream": "^0.0.11",
"split": "^1.0.1",
"stream-combiner": "^0.2.2",
"through": "^2.3.8"
},
"deprecated": false,
"description": "construct pipes of streams of events",
"devDependencies": {
"asynct": "^1.1.0",
"it-is": "^1.0.3",
"stream-spec": "^0.3.6",
"tape": "^4.9.1",
"ubelt": "^3.2.2"
},
"homepage": "http://github.com/dominictarr/event-stream",
"license": "MIT",
"name": "event-stream",
"repository": {
"type": "git",
"url": "git://github.com/dominictarr/event-stream.git"
},
"scripts": {
"prepublish": "npm ls && npm test",
"test": "asynct test/",
"test_tap": "set -e; for t in test/*.js; do node $t; done"
},
"testling": {
"files": "test/*.js",
"browsers": {
"ie": [
8,
9
],
"firefox": [
13
],
"chrome": [
20
],
"safari": [
5.1
],
"opera": [
12
]
}
},
"version": "3.3.6"
}


@@ -0,0 +1,317 @@
# EventStream
[Streams](http://nodejs.org/api/stream.html "Stream") are node's best and most misunderstood idea, and EventStream is a toolkit to make creating and working with streams easy.
Normally, streams are only used for IO, but in `event-stream` we send all kinds of objects down the pipe. If your application's input and output are streams, shouldn't the throughput be a stream too?
The *EventStream* functions resemble the array functions, because Streams are like Arrays, but laid out in time, rather than in memory.
All the `event-stream` functions return instances of `Stream`.
`event-stream` creates [0.8 streams](https://github.com/joyent/node/blob/v0.8/doc/api/stream.markdown), which are compatible with [0.10 streams](http://nodejs.org/api/stream.html "Stream").
>NOTE: I shall use the term <em>"through stream"</em> to refer to a stream that is writable <em>and</em> readable.
>NOTE for Gulp users: Merge will not work for gulp 4. [merge-stream](https://npmjs.com/merge-stream) should be used.
### [simple example](https://github.com/dominictarr/event-stream/blob/master/examples/pretty.js):
``` js
//pretty.js
if(!module.parent) {
var es = require('event-stream')
var inspect = require('util').inspect
process.stdin //connect streams together with `pipe`
.pipe(es.split()) //split stream to break on newlines
.pipe(es.map(function (data, cb) { //turn this async function into a stream
cb(null
, inspect(JSON.parse(data))) //render it nicely
}))
.pipe(process.stdout) // pipe it to stdout !
}
```
run it ...
``` bash
curl -sS registry.npmjs.org/event-stream | node pretty.js
```
[node Stream documentation](http://nodejs.org/api/stream.html)
## through (write?, end?)
Re-emits data synchronously. Easy way to create synchronous through streams.
Pass in optional `write` and `end` methods. They will be called in the
context of the stream. Use `this.pause()` and `this.resume()` to manage flow.
Check `this.paused` to see current flow state. (write always returns `!this.paused`)
This function is the basis for most of the synchronous streams in `event-stream`.
``` js
es.through(function write(data) {
this.emit('data', data)
//this.pause()
},
function end () { //optional
this.emit('end')
})
```
## map (asyncFunction)
Create a through stream from an asynchronous function.
``` js
var es = require('event-stream')
es.map(function (data, callback) {
//transform data
// ...
callback(null, data)
})
```
Each map MUST call the callback. It may call back with data, with an error, or with no arguments:
* `callback()` drops this data.
  This makes the map work like `filter`.
  Note: `callback(null, null)` is not the same, and will emit `null`.
* `callback(null, newData)` turns data into newData.
* `callback(error)` emits an error for this item.
>Note: if a callback is not called, `map` will think that it is still being processed,
>every call must be answered or the stream will not know when to end.
>
>Also, if the callback is called more than once, every call but the first will be ignored.
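For instance, a minimal sketch that uses `callback()` with no arguments to filter out odd numbers:
``` js
var keepEven = es.map(function (data, callback) {
  if (data % 2 === 0) callback(null, data) //pass the chunk through
  else callback() //drop the chunk, like filter
})
```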
## mapSync (syncFunction)
Same as `map`, but takes a synchronous function: return the mapped value, or return `undefined` to drop the chunk. Based on `es.through`.
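A minimal sketch, doubling each chunk synchronously:
``` js
var doubler = es.mapSync(function (data) {
  return data * 2 //returning undefined would drop the chunk
})
```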
## split (matcher)
Break up a stream and reassemble it so that each line is a chunk. matcher may be a `String`, or a `RegExp`
Example, read every line in a file ...
``` js
fs.createReadStream(file, {flags: 'r'})
.pipe(es.split())
.pipe(es.map(function (line, cb) {
//do something with the line
cb(null, line)
}))
```
`split` takes the same arguments as `string.split` except it defaults to '\n' instead of ',', and the optional `limit` parameter is ignored.
[String#split](https://developer.mozilla.org/en/JavaScript/Reference/Global_Objects/String/split)
**NOTE** - Maintaining Line Breaks
If you want to process each line of the stream, transform the data, reassemble, and **KEEP** the line breaks, the example will look like this:
```javascript
fs.createReadStream(file, {flags: 'r'})
.pipe(es.split(/(\r?\n)/))
.pipe(es.map(function (line, cb) {
//do something with the line
cb(null, line)
}))
```
This technique is mentioned in the [underlying documentation](https://www.npmjs.com/package/split#keep-matched-splitter) for the split npm package.
## join (separator)
Create a through stream that emits `separator` between each chunk, just like Array#join.
(for legacy reasons, if you pass a callback instead of a string, join is a synonym for `es.wait`)
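For example, a small sketch joining chunks with a comma:
``` js
es.readArray(['a', 'b', 'c'])
  .pipe(es.join(','))
  .pipe(process.stdout) //prints: a,b,c
```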
## merge (stream1,...,streamN) or merge (streamArray)
> concat → merge
Merges streams into one and returns it.
Incoming data is emitted as soon as it arrives, with no ordering applied (for example: `data1 data1 data2 data1 data2`, where `data1` and `data2` are data from two streams).
It counts how many streams were passed to it, and emits end only when all the streams have emitted end.
```js
es.merge(
process.stdout,
process.stderr
).pipe(fs.createWriteStream('output.log'));
```
It can also take an Array of streams as input like this:
```js
es.merge([
fs.createReadStream('input1.txt'),
fs.createReadStream('input2.txt')
]).pipe(fs.createWriteStream('output.log'));
```
## replace (from, to)
Replace all occurrences of `from` with `to`. `from` may be a `String` or a `RegExp`.
Works just like `string.split(from).join(to)`, but streaming.
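A small sketch, expanding tabs to two spaces:
``` js
process.stdin
  .pipe(es.replace('\t', '  '))
  .pipe(process.stdout)
```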
## parse
Convenience function for parsing JSON chunks. For newline-separated JSON,
use it with `es.split`. By default it logs parsing errors with `console.error`;
for different behaviour, a transform created with `es.parse({error: true})` will
instead emit the exception thrown by `JSON.parse` as an `'error'` event, unmodified.
``` js
fs.createReadStream(filename)
.pipe(es.split()) //defaults to lines.
.pipe(es.parse())
```
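With `{error: true}`, a sketch of handling malformed lines on the stream instead:
``` js
fs.createReadStream(filename)
  .pipe(es.split())
  .pipe(es.parse({error: true}))
  .on('error', function (err) {
    //malformed JSON lines end up here instead of console.error
  })
```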
## stringify
Convert JavaScript objects into lines of text. The text will have whitespace escaped and a `\n` appended, so it will be compatible with `es.parse`.
``` js
objectStream
.pipe(es.stringify())
.pipe(fs.createWriteStream(filename))
```
## readable (asyncFunction)
Create a readable stream (that respects pause) from an async function.
While the stream is not paused,
the function will be polled with `(count, callback)`,
and `this` will be the readable stream.
``` js
es.readable(function (count, callback) {
if(streamHasEnded)
return this.emit('end')
//...
this.emit('data', data) //use this way to emit multiple chunks per call.
callback() // you MUST always call the callback eventually.
// the function will not be called again until you do this.
})
```
You can also pass the data and the error to the callback.
You may only call the callback once;
calling the same callback more than once will have no effect.
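A sketch of the callback form, emitting one chunk per call (`items` is an assumed array):
``` js
var items = ['a', 'b', 'c']
es.readable(function (count, callback) {
  if (count >= items.length) return this.emit('end')
  callback(null, items[count]) //emits items[count] as a 'data' event
})
```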
## readArray (array)
Create a readable stream from an Array.
Just emit each item as a data event, respecting `pause` and `resume`.
``` js
var es = require('event-stream')
, reader = es.readArray([1,2,3])
reader.pipe(...)
```
If you want the stream to behave like a 0.10 stream, you will need to wrap it using the [`Readable.wrap()`](http://nodejs.org/api/stream.html#stream_readable_wrap_stream) function. Example:
``` js
var s = new stream.Readable({objectMode: true}).wrap(es.readArray([1,2,3]));
```
## writeArray (callback)
Create a writable stream from a callback;
all `data` events are stored in an array, which is passed to the callback when the stream ends.
``` js
var es = require('event-stream')
, reader = es.readArray([1, 2, 3])
, writer = es.writeArray(function (err, array){
//array deepEqual [1, 2, 3]
})
reader.pipe(writer)
```
## pause ()
A stream that buffers all chunks when paused.
``` js
var ps = es.pause()
ps.pause() //buffer the stream, also do not allow 'end'
ps.resume() //allow chunks through
```
## duplex (writeStream, readStream)
Takes a writable stream and a readable stream and makes them appear as a readable writable stream.
It is assumed that the two streams are connected to each other in some way.
(This is used by `pipeline` and `child`.)
``` js
var grep = cp.exec('grep Stream')
es.duplex(grep.stdin, grep.stdout)
```
## child (child_process)
Create a through stream from a child process ...
``` js
var cp = require('child_process')
es.child(cp.exec('grep Stream')) // a through stream
```
## wait (callback)
Waits for the stream to emit 'end'.
Joins the chunks of a stream into a single string or buffer.
Takes an optional callback, which will be passed the
complete string/buffer when it receives the 'end' event.
Also emits a single 'data' event.
``` js
readStream.pipe(es.wait(function (err, body) {
// have complete text here.
}))
```
# Other Stream Modules
These modules are not included as a part of *EventStream* but may be
useful when working with streams.
## [reduce (syncFunction, initial)](https://github.com/parshap/node-stream-reduce)
Like `Array.prototype.reduce` but for streams. Given a sync reduce
function and an initial value it will return a through stream that emits
a single data event with the reduced value once the input stream ends.
``` js
var reduce = require("stream-reduce");
process.stdin.pipe(reduce(function(acc, data) {
return acc + data.length;
}, 0)).on("data", function(length) {
console.log("stdin size:", length);
});
```


@@ -0,0 +1,86 @@
var es = require('../')
, it = require('it-is').style('colour')
, d = require('ubelt')
function makeExamplePipe() {
return es.connect(
es.map(function (data, callback) {
callback(null, data * 2)
}),
es.map(function (data, callback) {
d.delay(callback)(null, data)
}),
es.map(function (data, callback) {
callback(null, data + 2)
}))
}
exports['simple pipe'] = function (test) {
var pipe = makeExamplePipe()
pipe.on('data', function (data) {
it(data).equal(18)
test.done()
})
pipe.write(8)
}
exports['read array then map'] = function (test) {
var readThis = d.map(3, 6, 100, d.id) //array of multiples of 3 < 100
, first = es.readArray(readThis)
, read = []
, pipe =
es.connect(
first,
es.map(function (data, callback) {
callback(null, {data: data})
}),
es.map(function (data, callback) {
callback(null, {data: data})
}),
es.writeArray(function (err, array) {
it(array).deepEqual(d.map(readThis, function (data) {
return {data: {data: data}}
}))
test.done()
})
)
}
exports ['connect returns a stream'] = function (test) {
var rw =
es.connect(
es.map(function (data, callback) {
callback(null, data * 2)
}),
es.map(function (data, callback) {
callback(null, data * 5)
})
)
it(rw).has({readable: true, writable: true})
var array = [190, 24, 6, 7, 40, 57, 4, 6]
, _array = []
, c =
es.connect(
es.readArray(array),
rw,
es.log('after rw:'),
es.writeArray(function (err, _array) {
it(_array).deepEqual(array.map(function (e) { return e * 10 }))
test.done()
})
)
}
require('./helper')(module)


@@ -0,0 +1,17 @@
'use strict';
var es = require('../')
, it = require('it-is')
exports ['flatmap'] = function (test) {
es.readArray([[1], [1, 2], [1, 2, 3]])
.pipe(es.flatmap(function(e, cb) {
cb(null, e + 1)
}))
.pipe(es.writeArray(function(error, array) {
test.deepEqual([2, 2, 3, 2, 3, 4], array)
test.end()
}))
}
require('./helper')(module)


@@ -0,0 +1,12 @@
var tape = require('tape')
module.exports = function (m) {
if(m.parent) return
//use forEach so each tape callback closes over its own `name`
//(a plain for..in loop would run the last export for every test)
Object.keys(m.exports).forEach(function (name) {
tape(name, function (t) {
console.log('start', name)
t.done = t.end
m.exports[name](t)
})
})
}


@@ -0,0 +1,29 @@
var es = require('../')
, it = require('it-is').style('colour')
, d = require('ubelt')
exports.merge = function (t) {
var odd = d.map(1, 3, 100, d.id) //array of odd numbers < 100
var even = d.map(2, 4, 100, d.id) //array of even numbers < 100
var r1 = es.readArray(even)
var r2 = es.readArray(odd)
var endCount = 0
var writer = es.writeArray(function (err, array){
if(err) throw err //unpossible
it(array.sort()).deepEqual(even.concat(odd).sort())
if (++endCount === 2) t.done()
})
var writer2 = es.writeArray(function (err, array){
if(err) throw err //unpossible
it(array.sort()).deepEqual(even.concat(odd).sort())
if (++endCount === 2) t.done()
})
es.merge(r1, r2).pipe(writer)
es.merge([r1, r2]).pipe(writer2)
}
require('./helper')(module)


@@ -0,0 +1,32 @@
var es = require('../')
, it = require('it-is').style('colour')
exports ['es.parse() writes parsing errors with console.error'] = function (test) {
var parseStream = es.parse()
var oldConsoleError = console.error
console.error = function () {
console.error = oldConsoleError
it(arguments.length > 0).ok()
test.done()
}
// bare word is not valid JSON
parseStream.write('A')
}
exports ['es.parse({error: true(thy)}) emits error events from parsing'] = function (test) {
var parseStream = es.parse({error: 1})
var expectedError
try {
JSON.parse('A')
} catch(e) {
expectedError = e
}
parseStream.on('error', function (e) {
it(e).deepEqual(expectedError)
process.nextTick(function () {
test.done()
})
}).write('A')
}


@@ -0,0 +1,39 @@
var es = require('../')
, it = require('it-is')
, d = require('ubelt')
exports ['gate buffers when shut'] = function (test) {
var hundy = d.map(1,100, d.id)
, gate = es.pause()
, ten = 10
es.connect(
es.readArray(hundy),
es.log('after readArray'),
gate,
//es.log('after gate'),
es.map(function (num, next) {
//stick a map in here to check that gate never emits when open
it(gate.paused).equal(false)
console.log('data', num)
if(!--ten) {
console.log('PAUSE')
gate.pause()//.resume()
d.delay(gate.resume.bind(gate), 10)()
ten = 10
}
next(null, num)
}),
es.writeArray(function (err, array) { //just realized that I should remove the error param. errors will be emitted
console.log('eonuhoenuoecbulc')
it(array).deepEqual(hundy)
test.done()
})
)
gate.resume()
}
require('./helper')(module)


@@ -0,0 +1,52 @@
var es = require('..')
exports['do not duplicate errors'] = function (test) {
var errors = 0;
var pipe = es.pipeline(
es.through(function(data) {
return this.emit('data', data);
}),
es.through(function(data) {
return this.emit('error', new Error(data));
})
)
pipe.on('error', function(err) {
errors++
console.log('error count', errors)
process.nextTick(function () {
return test.done();
})
})
return pipe.write('meh');
}
exports['3 pipe do not duplicate errors'] = function (test) {
var errors = 0;
var pipe = es.pipeline(
es.through(function(data) {
return this.emit('data', data);
}),
es.through(function(data) {
return this.emit('error', new Error(data));
}),
es.through()
)
pipe.on('error', function(err) {
errors++
console.log('error count', errors)
process.nextTick(function () {
return test.done();
})
})
return pipe.write('meh');
}
require('./helper')(module)


@@ -0,0 +1,89 @@
var es = require('../')
, it = require('it-is').style('colour')
, d = require('ubelt')
function readStream(stream, pauseAt, done) {
if(!done) done = pauseAt, pauseAt = -1
var array = []
stream.on('data', function (data) {
array.push(data)
if(!--pauseAt )
stream.pause(), done(null, array)
})
stream.on('error', done)
stream.on('end', function (data) {
done(null, array)
})
}
exports ['read an array'] = function (test) {
var readThis = d.map(3, 6, 100, d.id) //array of multiples of 3 < 100
var reader = es.readArray(readThis)
var writer = es.writeArray(function (err, array){
if(err) throw err //unpossible
it(array).deepEqual(readThis)
test.done()
})
reader.pipe(writer)
}
exports ['read an array and pause it.'] = function (test) {
var readThis = d.map(3, 6, 100, d.id) //array of multiples of 3 < 100
var reader = es.readArray(readThis)
readStream(reader, 10, function (err, data) {
if(err) throw err
it(data).deepEqual([3, 6, 9, 12, 15, 18, 21, 24, 27, 30])
readStream(reader, 10, function (err, data) {
it(data).deepEqual([33, 36, 39, 42, 45, 48, 51, 54, 57, 60])
test.done()
})
reader.resume()
})
}
exports ['reader is readable, but not writeable'] = function (test) {
var reader = es.readArray([1])
it(reader).has({
readable: true,
writable: false
})
test.done()
}
exports ['read one item per tick'] = function (test) {
var readThis = d.map(3, 6, 100, d.id) //array of multiples of 3 < 100
var drains = 0
var reader = es.readArray(readThis)
var tickMapper = es.map(function (data,callback) {
process.nextTick(function () {
callback(null, data)
})
//since tickMapper is returning false
//pipe should pause the writer until a drain occurs
return false
})
reader.pipe(tickMapper)
readStream(tickMapper, function (err, array) {
it(array).deepEqual(readThis)
it(array.length).deepEqual(readThis.length)
it(drains).equal(readThis.length)
test.done()
})
tickMapper.on('drain', function () {
drains ++
})
}
require('./helper')(module)


@@ -0,0 +1,197 @@
var es = require('../')
, it = require('it-is').style('colour')
, u = require('ubelt')
exports ['read an array'] = function (test) {
console.log('readable')
return test.end() //test disabled: the code below never runs
var readThis = u.map(3, 6, 100, u.id) //array of multiples of 3 < 100
console.log('readable')
var reader =
es.readable(function (i, callback) {
if(i >= readThis.length)
return this.emit('end')
console.log('readable')
callback(null, readThis[i])
})
var writer = es.writeArray(function (err, array){
if(err) throw err
it(array).deepEqual(readThis)
test.done()
})
reader.pipe(writer)
}
exports ['read an array - async'] = function (test) {
var readThis = u.map(3, 6, 100, u.id) //array of multiples of 3 < 100
var reader =
es.readable(function (i, callback) {
if(i >= readThis.length)
return this.emit('end')
u.delay(callback)(null, readThis[i])
})
var writer = es.writeArray(function (err, array){
if(err) throw err
it(array).deepEqual(readThis)
test.done()
})
reader.pipe(writer)
}
exports ['emit data then call next() also works'] = function (test) {
var readThis = u.map(3, 6, 100, u.id) //array of multiples of 3 < 100
var reader =
es.readable(function (i, next) {
if(i >= readThis.length)
return this.emit('end')
this.emit('data', readThis[i])
next()
})
var writer = es.writeArray(function (err, array){
if(err) throw err
it(array).deepEqual(readThis)
test.done()
})
reader.pipe(writer)
}
exports ['callback emits error, then stops'] = function (test) {
var err = new Error('INTENTIONAL ERROR')
, called = 0
var reader =
es.readable(function (i, callback) {
if(called++)
return
callback(err)
})
reader.on('error', function (_err){
it(_err).deepEqual(err)
u.delay(function() {
it(called).equal(1)
test.done()
}, 50)()
})
}
exports['readable does not call read concurrently'] = function (test) {
var current = 0
var source = es.readable(function(count, cb){
current ++
if(count > 100)
return this.emit('end')
u.delay(function(){
current --
it(current).equal(0)
cb(null, {ok: true, n: count});
})();
});
var destination = es.map(function(data, cb){
//console.info(data);
cb();
});
var all = es.connect(source, destination);
destination.on('end', test.done)
}
exports ['does not raise a warning: Recursive process.nextTick detected'] = function (test) {
var readThisDelayed;
u.delay(function () {
readThisDelayed = [1, 3, 5];
})();
es.readable(function (count, callback) {
if (readThisDelayed) {
var that = this;
readThisDelayed.forEach(function (item) {
that.emit('data', item);
});
this.emit('end');
test.done();
}
callback();
});
};
//
// emitting multiple errors is not supported by stream.
//
// I do not think that this is a good idea, at least, there should be an option to pipe to
// continue on error. it makes a lot of sense, if you are using Stream like I am, to be able to emit multiple errors.
// an error might not necessarily mean the end of the stream. it depends on the error, at least.
//
// I will start a thread on the mailing list. I'd rather that than use a custom `pipe` implementation.
//
// basically, I want to be able use pipe to transform objects, and if one object is invalid,
// the next might still be good, so I should get to choose if it's gonna stop.
// reinstate this test when this issue progresses.
//
// hmm. I could add this to es.connect by deregistering the error listener,
// but I would rather it be an option in core.
/*
exports ['emit multiple errors, with 2nd parameter (continueOnError)'] = function (test) {
var readThis = d.map(1, 100, d.id)
, errors = 0
var reader =
es.readable(function (i, callback) {
console.log(i, readThis.length)
if(i >= readThis.length)
return this.emit('end')
if(!(readThis[i] % 7))
return callback(readThis[i])
callback(null, readThis[i])
}, true)
var writer = es.writeArray(function (err, array) {
if(err) throw err
it(array).every(function (u){
it(u % 7).notEqual(0)
}).property('length', 80)
it(errors).equal(14)
test.done()
})
reader.on('error', function (u) {
errors ++
console.log(u)
if('number' !== typeof u)
throw u
it(u % 7).equal(0)
})
reader.pipe(writer)
}
*/
require('./helper')(module)


@@ -0,0 +1,76 @@
var es = require('../')
, it = require('it-is').style('colour')
, d = require('ubelt')
, spec = require('stream-spec')
var next = process.nextTick
var fizzbuzz = '12F4BF78FB11F1314FB1617F19BF2223FB26F2829FB3132F34BF3738FB41F4344FB4647F49BF5253FB56F5859FB6162F64BF6768FB71F7374FB7677F79BF8283FB86F8889FB9192F94BF9798FB'
, fizz7buzz = '12F4BFseven8FB11F1314FB161sevenF19BF2223FB26F2829FB3132F34BF3seven38FB41F4344FB464sevenF49BF5253FB56F5859FB6162F64BF6seven68FBseven1Fseven3seven4FBseven6sevensevenFseven9BF8283FB86F8889FB9192F94BF9seven98FB'
, fizzbuzzwhitespce = ' 12F4BF78FB11F1314FB1617F19BF2223FB26F2829FB3132F34BF3738FB41F4344FB4647F49BF5253FB56F5859FB6162F64BF6768FB71F7374FB7677F79BF8283FB86F8889FB9192F94BF9798FB '
exports ['fizz buzz'] = function (test) {
var readThis = d.map(1, 100, function (i) {
return (
! (i % 3 || i % 5) ? "FB" :
!(i % 3) ? "F" :
!(i % 5) ? "B" :
''+i
)
}) //array of multiples of 3 < 100
var reader = es.readArray(readThis)
var join = es.wait(function (err, string){
it(string).equal(fizzbuzz)
test.done()
})
reader.pipe(join)
}
exports ['fizz buzz replace'] = function (test) {
var split = es.split(/(1)/)
var replace = es.replace('7', 'seven')
// var x = spec(replace).through()
split
.pipe(replace)
.pipe(es.join(function (err, string) {
it(string).equal(fizz7buzz)
}))
replace.on('close', function () {
// x.validate()
test.done()
})
split.write(fizzbuzz)
split.end()
}
exports ['fizz buzz replace whitespace using regexp'] = function (test) {
var split = es.split(/(1)/)
var replaceLeading = es.replace(/^[\s]*/, '')
var replaceTrailing = es.replace(/[\s]*$/, '')
// var x = spec(replace).through()
split
.pipe(replaceLeading)
.pipe(replaceTrailing)
.pipe(es.join(function (err, string) {
it(string).equal(fizzbuzz)
}))
replaceTrailing.on('close', function () {
// x.validate()
test.done()
})
split.write(fizzbuzz)
split.end()
}
require('./helper')(module)


@@ -0,0 +1,343 @@
'use strict';
var es = require('../')
, it = require('it-is')
, u = require('ubelt')
, spec = require('stream-spec')
, Stream = require('stream')
, from = require('from')
, through = require('through')
//REFACTOR THIS TEST TO USE es.readArray and es.writeArray
function writeArray(array, stream) {
array.forEach( function (j) {
stream.write(j)
})
stream.end()
}
function readStream(stream, done) {
var array = []
stream.on('data', function (data) {
array.push(data)
})
stream.on('error', done)
stream.on('end', function (data) {
done(null, array)
})
}
//call sink on each write,
//and complete when finished.
function pauseStream (prob, delay) {
var pauseIf = (
'number' == typeof prob
? function () {
return Math.random() < prob
}
: 'function' == typeof prob
? prob
: 0.1
)
var delayer = (
!delay
? process.nextTick
: 'number' == typeof delay
? function (next) { setTimeout(next, delay) }
: delay
)
return es.through(function (data) {
if(!this.paused && pauseIf()) {
console.log('PAUSE STREAM PAUSING')
this.pause()
var self = this
delayer(function () {
console.log('PAUSE STREAM RESUMING')
self.resume()
})
}
console.log("emit ('data', " + data + ')')
this.emit('data', data)
})
}
exports ['simple map'] = function (test) {
var input = u.map(1, 1000, function () {
return Math.random()
})
var expected = input.map(function (v) {
return v * 2
})
var pause = pauseStream(0.1)
var fs = from(input)
var ts = es.writeArray(function (err, ar) {
it(ar).deepEqual(expected)
test.done()
})
var map = es.through(function (data) {
this.emit('data', data * 2)
})
spec(map).through().validateOnExit()
spec(pause).through().validateOnExit()
fs.pipe(map).pipe(pause).pipe(ts)
}
exports ['simple map applied to a stream'] = function (test) {
var input = [1,2,3,7,5,3,1,9,0,2,4,6]
//create event stream from
var doubler = es.map(function (data, cb) {
cb(null, data * 2)
})
spec(doubler).through().validateOnExit()
//a map is only a middle man, so it is both readable and writable
it(doubler).has({
readable: true,
writable: true,
})
readStream(doubler, function (err, output) {
it(output).deepEqual(input.map(function (j) {
return j * 2
}))
// process.nextTick(x.validate)
test.done()
})
writeArray(input, doubler)
}
exports['pipe two maps together'] = function (test) {
var input = [1,2,3,7,5,3,1,9,0,2,4,6]
//create event stream from
function dd (data, cb) {
cb(null, data * 2)
}
var doubler1 = es.map(dd), doubler2 = es.map(dd)
doubler1.pipe(doubler2)
spec(doubler1).through().validateOnExit()
spec(doubler2).through().validateOnExit()
readStream(doubler2, function (err, output) {
it(output).deepEqual(input.map(function (j) {
return j * 4
}))
test.done()
})
writeArray(input, doubler1)
}
//next:
//
// test pause, resume and drain.
//
// then make a pipe joiner:
//
// plumber (evStr1, evStr2, evStr3, evStr4, evStr5)
//
// will return a single stream that write goes to the first
exports ['map will not call end until the callback'] = function (test) {
var ticker = es.map(function (data, cb) {
process.nextTick(function () {
cb(null, data * 2)
})
})
spec(ticker).through().validateOnExit()
ticker.write('x')
ticker.end()
ticker.on('end', function () {
test.done()
})
}
exports ['emit error thrown'] = function (test) {
var err = new Error('INTENTIONAL ERROR')
, mapper =
es.map(function () {
throw err
})
mapper.on('error', function (_err) {
it(_err).equal(err)
test.done()
})
// onExit(spec(mapper).basic().validate)
//need spec that says stream may error.
mapper.write('hello')
}
exports ['emit error calledback'] = function (test) {
var err = new Error('INTENTIONAL ERROR')
, mapper =
es.map(function (data, callback) {
callback(err)
})
mapper.on('error', function (_err) {
it(_err).equal(err)
test.done()
})
mapper.write('hello')
}
exports ['do not emit drain if not paused'] = function (test) {
var map = es.map(function (data, callback) {
u.delay(callback)(null, 1)
return true
})
spec(map).through().pausable().validateOnExit()
map.on('drain', function () {
it(false).ok('should not emit drain unless the stream is paused')
})
it(map.write('hello')).equal(true)
it(map.write('hello')).equal(true)
it(map.write('hello')).equal(true)
setTimeout(function () {map.end()},10)
map.on('end', test.done)
}
exports ['emits drain if paused, when all '] = function (test) {
var active = 0
var drained = false
var map = es.map(function (data, callback) {
active ++
u.delay(function () {
active --
callback(null, 1)
})()
console.log('WRITE', false)
return false
})
spec(map).through().validateOnExit()
map.on('drain', function () {
drained = true
it(active).equal(0, 'should emit drain when all maps are done')
})
it(map.write('hello')).equal(false)
it(map.write('hello')).equal(false)
it(map.write('hello')).equal(false)
process.nextTick(function () {map.end()},10)
map.on('end', function () {
console.log('end')
it(drained).ok('should have emitted drain before end')
test.done()
})
}
exports ['map applied to a stream with filtering'] = function (test) {
var input = [1,2,3,7,5,3,1,9,0,2,4,6]
var doubler = es.map(function (data, callback) {
if (data % 2)
callback(null, data * 2)
else
callback()
})
readStream(doubler, function (err, output) {
it(output).deepEqual(input.filter(function (j) {
return j % 2
}).map(function (j) {
return j * 2
}))
test.done()
})
spec(doubler).through().validateOnExit()
writeArray(input, doubler)
}
exports ['simple mapSync applied to a stream'] = function (test) {
var input = [1,2,3,7,5,3,1,9,0,2,4,6]
var doubler = es.mapSync(function (data) {
return data * 2
})
readStream(doubler, function (err, output) {
it(output).deepEqual(input.map(function (j) {
return j * 2
}))
test.done()
})
spec(doubler).through().validateOnExit()
writeArray(input, doubler)
}
exports ['mapSync applied to a stream with filtering'] = function (test) {
var input = [1,2,3,7,5,3,1,9,0,2,4,6]
var doubler = es.mapSync(function (data) {
if (data % 2)
return data * 2
})
readStream(doubler, function (err, output) {
it(output).deepEqual(input.filter(function (j) {
return j % 2
}).map(function (j) {
return j * 2
}))
test.done()
})
spec(doubler).through().validateOnExit()
writeArray(input, doubler)
}
require('./helper')(module)


@@ -0,0 +1,86 @@
/*
assert that data is called many times
assert that end is called eventually
assert that when stream enters pause state,
on drain is emitted eventually.
*/
var es = require('..')
var it = require('it-is').style('colour')
var spec = require('stream-spec')
exports['simple stream'] = function (test) {
var stream = es.through()
var x = spec(stream).basic().pausable()
stream.write(1)
stream.write(1)
stream.pause()
stream.write(1)
stream.resume()
stream.write(1)
stream.end(2) //this will call write()
process.nextTick(function (){
x.validate()
test.done()
})
}
exports['throw on write when !writable'] = function (test) {
var stream = es.through()
var x = spec(stream).basic().pausable()
stream.write(1)
stream.write(1)
stream.end(2) //this will call write()
stream.write(1) //this will be throwing..., but the spec will catch it.
process.nextTick(function () {
x.validate()
test.done()
})
}
exports['end fast'] = function (test) {
var stream = es.through()
var x = spec(stream).basic().pausable()
stream.end() //this will call write()
process.nextTick(function () {
x.validate()
test.done()
})
}
/*
okay, that was easy enough, what's next?
say, after you call paused(), write should return false
until resume is called.
simple way to implement this:
write must return !paused
after pause() paused = true
after resume() paused = false
on resume, if !paused drain is emitted again.
after drain, !paused
there are lots of subtle ordering bugs in streams.
example, set !paused before emitting drain.
the stream api is stateful.
*/
require('./helper')(module)


@@ -0,0 +1,47 @@
var es = require('../')
, it = require('it-is').style('colour')
, d = require('ubelt')
, join = require('path').join
, fs = require('fs')
, Stream = require('stream').Stream
, spec = require('stream-spec')
exports ['es.split() works like String#split'] = function (test) {
var readme = join(__filename)
, expected = fs.readFileSync(readme, 'utf-8').split('\n')
, cs = es.split()
, actual = []
, ended = false
, x = spec(cs).through()
var a = new Stream ()
a.write = function (l) {
actual.push(l.trim())
}
a.end = function () {
ended = true
expected.forEach(function (v,k) {
//String.split will append an empty string ''
//if the string ends in a split pattern.
//es.split doesn't which was breaking this test.
//clearly, appending the empty string is correct.
//tests are passing though. which is the current job.
if(v)
it(actual[k]).like(v)
})
//give the stream time to close
process.nextTick(function () {
test.done()
x.validate()
})
}
a.writable = true
fs.createReadStream(readme, {flags: 'r'}).pipe(cs)
cs.pipe(a)
}
require('./helper')(module)


@@ -0,0 +1,15 @@
var es = require('../')
exports['handle buffer'] = function (t) {
es.stringify().on('data', function (d) {
t.equal(d.trim(), JSON.stringify('HELLO'))
t.end()
}).write(new Buffer('HELLO'))
}
require('./helper')(module)


@@ -0,0 +1,31 @@
var es = require('../')
, it = require('it-is').style('colour')
, d = require('ubelt')
exports ['write an array'] = function (test) {
var readThis = d.map(3, 6, 100, d.id) //array of multiples of 3 < 100
var writer = es.writeArray(function (err, array){
if(err) throw err //unpossible
it(array).deepEqual(readThis)
test.done()
})
d.each(readThis, writer.write.bind(writer))
writer.end()
}
exports ['writer is writable, but not readable'] = function (test) {
var reader = es.writeArray(function () {})
it(reader).has({
readable: false,
writable: true
})
test.done()
}
require('./helper')(module)


@@ -0,0 +1,3 @@
node_modules
node_modules/*
npm_debug.log


@@ -0,0 +1,4 @@
language: node_js
node_js:
- 0.6
- 0.8


@@ -0,0 +1,24 @@
The MIT License (MIT)
Copyright (c) 2011 Dominic Tarr
Permission is hereby granted, free of charge,
to any person obtaining a copy of this software and
associated documentation files (the "Software"), to
deal in the Software without restriction, including
without limitation the rights to use, copy, modify,
merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom
the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.


@@ -0,0 +1,26 @@
var inspect = require('util').inspect
if(!module.parent) {
var map = require('..') //load map-stream
var es = require('event-stream') //load event-stream
es.pipe( //pipe joins streams together
process.openStdin(), //open stdin
es.split(), //split stream to break on newlines
map(function (data, callback) { //turn this async function into a stream
var j
try {
j = JSON.parse(data) //try to parse input into json
} catch (err) {
return callback(null, data) //if it fails just pass it anyway
}
callback(null, inspect(j)) //render it nicely
}),
process.stdout // pipe it to stdout !
)
}
// run this
//
// curl -sS registry.npmjs.org/event-stream | node pretty.js
//


@@ -0,0 +1,144 @@
//filter will reemit the data if cb(err,pass) pass is truthy
// reduce is more tricky
// maybe we want to group the reductions or emit progress updates occasionally
// the most basic reduce just emits one 'data' event after it has received 'end'
var Stream = require('stream').Stream
//create an event stream and apply function to each .write
//emitting each response as data
//unless it's an empty callback
module.exports = function (mapper, opts) {
var stream = new Stream()
, inputs = 0
, outputs = 0
, ended = false
, paused = false
, destroyed = false
, lastWritten = 0
, inNext = false
opts = opts || {};
var errorEventName = opts.failures ? 'failure' : 'error';
// Items that are not ready to be written yet (because they would come out of
// order) get stuck in a queue for later.
var writeQueue = {}
stream.writable = true
stream.readable = true
function queueData (data, number) {
var nextToWrite = lastWritten + 1
if (number === nextToWrite) {
// If it's next, and its not undefined write it
if (data !== undefined) {
stream.emit.apply(stream, ['data', data])
}
lastWritten ++
nextToWrite ++
} else {
// Otherwise queue it for later.
writeQueue[number] = data
}
// If the next value is in the queue, write it
if (writeQueue.hasOwnProperty(nextToWrite)) {
var dataToWrite = writeQueue[nextToWrite]
delete writeQueue[nextToWrite]
return queueData(dataToWrite, nextToWrite)
}
outputs ++
if(inputs === outputs) {
if(paused) paused = false, stream.emit('drain') //written all the incoming events
if(ended) end()
}
}
function next (err, data, number) {
if(destroyed) return
inNext = true
if (!err || opts.failures) {
queueData(data, number)
}
if (err) {
stream.emit.apply(stream, [ errorEventName, err ]);
}
inNext = false;
}
// Wrap the mapper function by calling its callback with the order number of
// the item in the stream.
function wrappedMapper (input, number, callback) {
return mapper.call(null, input, function(err, data){
callback(err, data, number)
})
}
stream.write = function (data) {
if(ended) throw new Error('map stream is not writable')
inNext = false
inputs ++
try {
//catch sync errors and handle them like async errors
var written = wrappedMapper(data, inputs, next)
paused = (written === false)
return !paused
} catch (err) {
//if the callback has been called synchronously, and the error
//has occurred in a listener, throw it again.
if(inNext)
throw err
next(err)
return !paused
}
}
function end (data) {
//if end was called with args, write it,
ended = true //write will emit 'end' if ended is true
stream.writable = false
if(data !== undefined) {
return queueData(data, inputs)
} else if (inputs == outputs) { //wait for processing
stream.readable = false, stream.emit('end'), stream.destroy()
}
}
stream.end = function (data) {
if(ended) return
end(data)
}
stream.destroy = function () {
ended = destroyed = true
stream.writable = stream.readable = paused = false
process.nextTick(function () {
stream.emit('close')
})
}
stream.pause = function () {
paused = true
}
stream.resume = function () {
paused = false
}
return stream
}


@@ -0,0 +1,56 @@
{
"_from": "map-stream@0.0.7",
"_id": "map-stream@0.0.7",
"_inBundle": false,
"_integrity": "sha1-ih8HiW2CsQkmvTdEokIACfiJdKg=",
"_location": "/gulp-token-replace/map-stream",
"_phantomChildren": {},
"_requested": {
"type": "version",
"registry": true,
"raw": "map-stream@0.0.7",
"name": "map-stream",
"escapedName": "map-stream",
"rawSpec": "0.0.7",
"saveSpec": null,
"fetchSpec": "0.0.7"
},
"_requiredBy": [
"/gulp-token-replace/event-stream"
],
"_resolved": "https://registry.npmjs.org/map-stream/-/map-stream-0.0.7.tgz",
"_shasum": "8a1f07896d82b10926bd3744a2420009f88974a8",
"_spec": "map-stream@0.0.7",
"_where": "/var/www/html/autocompletion/node_modules/gulp-token-replace/node_modules/event-stream",
"author": {
"name": "Dominic Tarr",
"email": "dominic.tarr@gmail.com",
"url": "http://dominictarr.com"
},
"bugs": {
"url": "https://github.com/dominictarr/map-stream/issues"
},
"bundleDependencies": false,
"dependencies": {},
"deprecated": false,
"description": "construct pipes of streams of events",
"devDependencies": {
"asynct": "*",
"event-stream": "~2.1",
"from": "0.0.2",
"it-is": "1",
"stream-spec": "~0.2",
"ubelt": "~2.9"
},
"homepage": "http://github.com/dominictarr/map-stream",
"license": "MIT",
"name": "map-stream",
"repository": {
"type": "git",
"url": "git://github.com/dominictarr/map-stream.git"
},
"scripts": {
"test": "asynct test/"
},
"version": "0.0.7"
}


@@ -0,0 +1,37 @@
# MapStream
Refactored out of [event-stream](https://github.com/dominictarr/event-stream)
## map (asyncFunction[, options])
Create a through stream from an asynchronous function.
``` js
var map = require('map-stream')
map(function (data, callback) {
//transform data
// ...
callback(null, data)
})
```
Each map MUST call the callback. It may call back with data, with an error, or with no arguments:
* `callback()` drops this data.
  This makes the map work like `filter`.
  Note: `callback(null, null)` is not the same, and will emit `null`.
* `callback(null, newData)` turns data into newData.
* `callback(error)` emits an error for this item.
>Note: if a callback is not called, `map` will think that it is still being processed,
>every call must be answered or the stream will not know when to end.
>
>Also, if the callback is called more than once, every call but the first will be ignored.
## Options
* `failures` - `boolean`: continue mapping even if an error occurred. On error, `map-stream` will emit a `failure` event instead of `error`. (default: `false`)
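A sketch of the `failures` option (the `data.ok` check is illustrative):
``` js
var map = require('map-stream')
var mapper = map(function (data, callback) {
  callback(data.ok ? null : new Error('bad item'), data)
}, { failures: true })
mapper.on('failure', function (err) {
  //mapping continues; failed items are reported here
})
```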


@@ -0,0 +1,318 @@
'use strict';
var map = require('../')
, it = require('it-is')
, u = require('ubelt')
, spec = require('stream-spec')
, from = require('from')
, Stream = require('stream')
, es = require('event-stream')
//REFACTOR THIS TEST TO USE es.readArray and es.writeArray
function writeArray(array, stream) {
array.forEach( function (j) {
stream.write(j)
})
stream.end()
}
function readStream(stream, done) {
var array = []
stream.on('data', function (data) {
array.push(data)
})
stream.on('error', done)
stream.on('end', function (data) {
done(null, array)
})
}
//call sink on each write,
//and complete when finished.
function pauseStream (prob, delay) {
var pauseIf = (
'number' == typeof prob
? function () {
return Math.random() < prob
}
: 'function' == typeof prob
? prob
: 0.1
)
var delayer = (
!delay
? process.nextTick
: 'number' == typeof delay
? function (next) { setTimeout(next, delay) }
: delay
)
return es.through(function (data) {
if(!this.paused && pauseIf()) {
console.log('PAUSE STREAM PAUSING')
this.pause()
var self = this
delayer(function () {
console.log('PAUSE STREAM RESUMING')
self.resume()
})
}
console.log("emit ('data', " + data + ')')
this.emit('data', data)
})
}
exports ['simple map applied to a stream'] = function (test) {
var input = [1,2,3,7,5,3,1,9,0,2,4,6]
//create event stream from
var doubler = map(function (data, cb) {
cb(null, data * 2)
})
spec(doubler).through().validateOnExit()
//a map is only a middle man, so it is both readable and writable
it(doubler).has({
readable: true,
writable: true,
})
readStream(doubler, function (err, output) {
it(output).deepEqual(input.map(function (j) {
return j * 2
}))
// process.nextTick(x.validate)
test.done()
})
writeArray(input, doubler)
}
exports ['stream comes back in the correct order'] = function (test) {
var input = [3, 2, 1]
var delayer = map(function(data, cb){
setTimeout(function () {
cb(null, data)
}, 100 * data)
})
readStream(delayer, function (err, output) {
it(output).deepEqual(input)
test.done()
})
writeArray(input, delayer)
}
exports ['continues on error event with failures `true`'] = function (test) {
var input = [1, 2, 3]
var delayer = map(function(data, cb){
cb(new Error('Something gone wrong'), data)
}, { failures: true })
readStream(delayer, function (err, output) {
it(output).deepEqual(input)
test.done()
})
writeArray(input, delayer)
}
exports['pipe two maps together'] = function (test) {
var input = [1,2,3,7,5,3,1,9,0,2,4,6]
//create event stream from
function dd (data, cb) {
cb(null, data * 2)
}
var doubler1 = map(dd), doubler2 = map(dd)
doubler1.pipe(doubler2)
spec(doubler1).through().validateOnExit()
spec(doubler2).through().validateOnExit()
readStream(doubler2, function (err, output) {
it(output).deepEqual(input.map(function (j) {
return j * 4
}))
test.done()
})
writeArray(input, doubler1)
}
//next:
//
// test pause, resume and drain.
//
// then make a pipe joiner:
//
// plumber (evStr1, evStr2, evStr3, evStr4, evStr5)
//
// will return a single stream that write goes to the first
exports ['map will not call end until the callback'] = function (test) {
var ticker = map(function (data, cb) {
process.nextTick(function () {
cb(null, data * 2)
})
})
spec(ticker).through().validateOnExit()
ticker.write('x')
ticker.end()
ticker.on('end', function () {
test.done()
})
}
exports ['emit failures with opts.failures === `true`'] = function (test) {
var err = new Error('INTENTIONAL ERROR')
, mapper =
map(function () {
throw err
}, { failures: true })
mapper.on('failure', function (_err) {
it(_err).equal(err)
test.done()
})
mapper.write('hello')
}
exports ['emit error thrown'] = function (test) {
var err = new Error('INTENTIONAL ERROR')
, mapper =
map(function () {
throw err
})
mapper.on('error', function (_err) {
it(_err).equal(err)
test.done()
})
mapper.write('hello')
}
exports ['emit error calledback'] = function (test) {
var err = new Error('INTENTIONAL ERROR')
, mapper =
map(function (data, callback) {
callback(err)
})
mapper.on('error', function (_err) {
it(_err).equal(err)
test.done()
})
mapper.write('hello')
}
exports ['do not emit drain if not paused'] = function (test) {
var maps = map(function (data, callback) {
u.delay(callback)(null, 1)
return true
})
spec(maps).through().pausable().validateOnExit()
maps.on('drain', function () {
it(false).ok('should not emit drain unless the stream is paused')
})
it(maps.write('hello')).equal(true)
it(maps.write('hello')).equal(true)
it(maps.write('hello')).equal(true)
setTimeout(function () {maps.end()},10)
maps.on('end', test.done)
}
exports ['emits drain if paused, when all '] = function (test) {
var active = 0
var drained = false
var maps = map(function (data, callback) {
active ++
u.delay(function () {
active --
callback(null, 1)
})()
console.log('WRITE', false)
return false
})
spec(maps).through().validateOnExit()
maps.on('drain', function () {
drained = true
it(active).equal(0, 'should emit drain when all maps are done')
})
it(maps.write('hello')).equal(false)
it(maps.write('hello')).equal(false)
it(maps.write('hello')).equal(false)
process.nextTick(function () {maps.end()},10)
maps.on('end', function () {
console.log('end')
it(drained).ok('should have emitted drain before end')
test.done()
})
}
exports ['map applied to a stream with filtering'] = function (test) {
var input = [1,2,3,7,5,3,1,9,0,2,4,6]
var doubler = map(function (data, callback) {
if (data % 2)
callback(null, data * 2)
else
callback()
})
readStream(doubler, function (err, output) {
it(output).deepEqual(input.filter(function (j) {
return j % 2
}).map(function (j) {
return j * 2
}))
test.done()
})
spec(doubler).through().validateOnExit()
writeArray(input, doubler)
}


@@ -0,0 +1,3 @@
node_modules
node_modules/*
npm_debug.log


@@ -0,0 +1,3 @@
language: node_js
node_js:
- "0.10"


@@ -0,0 +1,22 @@
Copyright (c) 2011 Dominic Tarr
Permission is hereby granted, free of charge,
to any person obtaining a copy of this software and
associated documentation files (the "Software"), to
deal in the Software without restriction, including
without limitation the rights to use, copy, modify,
merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom
the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@@ -0,0 +1,26 @@
var inspect = require('util').inspect
var es = require('event-stream') //load event-stream
var split = require('../')
if(!module.parent) {
es.pipe( //pipe joins streams together
process.openStdin(), //open stdin
split(), //split stream to break on newlines
es.map(function (data, callback) {//turn this async function into a stream
var j
try {
j = JSON.parse(data) //try to parse input into json
} catch (err) {
return callback(null, data) //if it fails just pass it anyway
}
callback(null, inspect(j)) //render it nicely
}),
process.stdout // pipe it to stdout !
)
}
// run this
//
// curl -sS registry.npmjs.org/event-stream | node pretty.js
//

View File

@@ -0,0 +1,63 @@
//filter will re-emit the data if cb(err, pass) passes a truthy value
// reduce is more tricky
// maybe we want to group the reductions or emit progress updates occasionally
// the most basic reduce just emits one 'data' event after it has received 'end'
var through = require('through')
var Decoder = require('string_decoder').StringDecoder
module.exports = split
//TODO pass in a function to map across the lines.
function split (matcher, mapper, options) {
var decoder = new Decoder()
var soFar = ''
var maxLength = options && options.maxLength;
var trailing = options && options.trailing === false ? false : true
if('function' === typeof matcher)
mapper = matcher, matcher = null
if (!matcher)
matcher = /\r?\n/
function emit(stream, piece) {
if(mapper) {
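      //a mapper may transform the piece; returning undefined drops it, and a throw is emitted as an 'error' event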
try {
piece = mapper(piece)
}
catch (err) {
return stream.emit('error', err)
}
if('undefined' !== typeof piece)
stream.queue(piece)
}
else
stream.queue(piece)
}
function next (stream, buffer) {
var pieces = ((soFar != null ? soFar : '') + buffer).split(matcher)
soFar = pieces.pop()
if (maxLength && soFar.length > maxLength)
return stream.emit('error', new Error('maximum buffer reached'))
for (var i = 0; i < pieces.length; i++) {
var piece = pieces[i]
emit(stream, piece)
}
}
return through(function (b) {
next(this, decoder.write(b))
},
function () {
if(decoder.end)
next(this, decoder.end())
if(trailing && soFar != null)
emit(this, soFar)
this.queue(null)
})
}
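//a minimal sketch (hypothetical, not exported) of the "most basic reduce"
//described in the header comment: fold every chunk into an accumulator and
//emit a single 'data' event once 'end' has been received.
function basicReduce (fn, acc) {
  return through(function (data) {
    acc = fn(acc, data) //fold each incoming chunk into the accumulator
  }, function () {
    this.queue(acc) //emit the single reduced value
    this.queue(null) //then signal end
  })
}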

View File

@@ -0,0 +1,62 @@
{
"_from": "split@^1.0.1",
"_id": "split@1.0.1",
"_inBundle": false,
"_integrity": "sha512-mTyOoPbrivtXnwnIxZRFYRrPNtEFKlpB2fvjSnCQUiAA6qAZzqwna5envK4uk6OIeP17CsdF3rSBGYVBsU0Tkg==",
"_location": "/gulp-token-replace/split",
"_phantomChildren": {},
"_requested": {
"type": "range",
"registry": true,
"raw": "split@^1.0.1",
"name": "split",
"escapedName": "split",
"rawSpec": "^1.0.1",
"saveSpec": null,
"fetchSpec": "^1.0.1"
},
"_requiredBy": [
"/gulp-token-replace/event-stream"
],
"_resolved": "https://registry.npmjs.org/split/-/split-1.0.1.tgz",
"_shasum": "605bd9be303aa59fb35f9229fbea0ddec9ea07d9",
"_spec": "split@^1.0.1",
"_where": "/var/www/html/autocompletion/node_modules/gulp-token-replace/node_modules/event-stream",
"author": {
"name": "Dominic Tarr",
"email": "dominic.tarr@gmail.com",
"url": "http://bit.ly/dominictarr"
},
"bugs": {
"url": "https://github.com/dominictarr/split/issues"
},
"bundleDependencies": false,
"dependencies": {
"through": "2"
},
"deprecated": false,
"description": "split a Text Stream into a Line Stream",
"devDependencies": {
"asynct": "*",
"event-stream": "~3.0.2",
"it-is": "1",
"stream-spec": "~0.2",
"string-to-stream": "~1.0.0",
"ubelt": "~2.9"
},
"engines": {
"node": "*"
},
"homepage": "http://github.com/dominictarr/split",
"license": "MIT",
"name": "split",
"optionalDependencies": {},
"repository": {
"type": "git",
"url": "git://github.com/dominictarr/split.git"
},
"scripts": {
"test": "asynct test/"
},
"version": "1.0.1"
}

View File

@@ -0,0 +1,72 @@
# Split (matcher)
[![build status](https://secure.travis-ci.org/dominictarr/split.png)](http://travis-ci.org/dominictarr/split)
Break up a stream and reassemble it so that each line is a chunk. `matcher` may be a `String` or a `RegExp`.
Example: read every line in a file ...
``` js
fs.createReadStream(file)
.pipe(split())
.on('data', function (line) {
//each chunk now is a separate line!
})
```
`split` takes the same arguments as `string.split` except the matcher defaults to `/\r?\n/`, and the optional `limit` parameter is ignored.
[String#split](https://developer.mozilla.org/en/JavaScript/Reference/Global_Objects/String/split)
`split` takes an optional options object on its third argument.
``` js
split(matcher, mapper, options)
```
Valid options:
* maxLength - The maximum buffer length without seeing a newline or `matcher`,
if a single line exceeds this, the split stream will emit an error.
``` js
split(JSON.parse, null, { maxLength: 2})
```
* trailing - By default the last buffer not delimited by a newline or `matcher` will be emitted. To prevent this set `options.trailing` to `false`.
``` js
split(JSON.parse, null, { trailing: false })
```
## keep matched splitter
As with `String#split`, if you split by a regular expression with a matching group,
the matches will be retained in the collection.
```
stdin
.pipe(split(/(\r?\n)/))
... //lines + separators.
```
# NDJ - Newline Delimited JSON
`split` accepts a function which transforms each line.
``` js
fs.createReadStream(file)
.pipe(split(JSON.parse))
.on('data', function (obj) {
    //each chunk now is a JS object
})
.on('error', function (err) {
//syntax errors will land here
//note, this ends the stream.
})
```
# License
MIT

View File

@@ -0,0 +1,46 @@
var it = require('it-is').style('colour')
, split = require('..')
exports ['maximum buffer limit'] = function (test) {
var s = split(JSON.parse, null, {
maxLength: 2
})
, caughtError = false
, rows = []
s.on('error', function (err) {
caughtError = true
})
s.on('data', function (row) { rows.push(row) })
s.write('{"a":1}\n{"')
s.write('{ "')
it(caughtError).equal(true)
s.end()
test.done()
}
exports ['ignore trailing buffers'] = function (test) {
var s = split(JSON.parse, null, {
trailing: false
})
, caughtError = false
, rows = []
s.on('error', function (err) {
caughtError = true
})
s.on('data', function (row) { rows.push(row) })
s.write('{"a":1}\n{"')
s.write('{ "')
s.end()
it(caughtError).equal(false)
it(rows).deepEqual([ { a: 1 } ])
test.done()
}

View File

@@ -0,0 +1,34 @@
var it = require('it-is').style('colour')
, split = require('..')
exports ['split data with partitioned unicode character'] = function (test) {
var s = split(/,/g)
, caughtError = false
, rows = []
s.on('error', function (err) {
caughtError = true
})
s.on('data', function (row) { rows.push(row) })
  var x = 'テスト試験今日とても,よい天気で'
  var unicodeData = new Buffer(x); // Buffer.from(x) on modern Node
  // partition of 日: the StringDecoder must buffer the partial character across writes
  var piece1 = unicodeData.slice(0, 20);
  var piece2 = unicodeData.slice(20, unicodeData.length);
  s.write(piece1);
  s.write(piece2);
s.end()
it(caughtError).equal(false)
it(rows).deepEqual(['テスト試験今日とても', 'よい天気で']);
it(rows).deepEqual(x.split(','))
test.done()
}

View File

@@ -0,0 +1,137 @@
var es = require('event-stream')
, it = require('it-is').style('colour')
, d = require('ubelt')
, split = require('..')
, join = require('path').join
, fs = require('fs')
, Stream = require('stream').Stream
, Readable = require('stream').Readable
, spec = require('stream-spec')
, through = require('through')
, stringStream = require('string-to-stream')
exports ['split() works like String#split'] = function (test) {
var readme = join(__filename)
, expected = fs.readFileSync(readme, 'utf-8').split('\n')
, cs = split()
, actual = []
, ended = false
, x = spec(cs).through()
var a = new Stream ()
a.write = function (l) {
actual.push(l.trim())
}
a.end = function () {
ended = true
expected.forEach(function (v,k) {
      //String#split will append an empty string ''
      //if the string ends in a split pattern.
      //es.split doesn't, which was breaking this test.
      //clearly, appending the empty string is correct,
      //but keeping the tests passing is the current job.
if(v)
it(actual[k]).like(v)
})
//give the stream time to close
process.nextTick(function () {
test.done()
x.validate()
})
}
a.writable = true
fs.createReadStream(readme, {flags: 'r'}).pipe(cs)
cs.pipe(a)
}
exports ['split() takes mapper function'] = function (test) {
var readme = join(__filename)
, expected = fs.readFileSync(readme, 'utf-8').split('\n')
, cs = split(function (line) { return line.toUpperCase() })
, actual = []
, ended = false
, x = spec(cs).through()
var a = new Stream ()
a.write = function (l) {
actual.push(l.trim())
}
a.end = function () {
ended = true
expected.forEach(function (v,k) {
      //String#split will append an empty string ''
      //if the string ends in a split pattern.
      //es.split doesn't, which was breaking this test.
      //clearly, appending the empty string is correct,
      //but keeping the tests passing is the current job.
if(v)
it(actual[k]).equal(v.trim().toUpperCase())
})
//give the stream time to close
process.nextTick(function () {
test.done()
x.validate()
})
}
a.writable = true
fs.createReadStream(readme, {flags: 'r'}).pipe(cs)
cs.pipe(a)
}
exports ['split() works with empty string chunks'] = function (test) {
var str = ' foo'
, expected = str.split(/[\s]*/).reduce(splitBy(/[\s]*/), [])
, cs1 = split(/[\s]*/)
, cs2 = split(/[\s]*/)
, actual = []
, ended = false
, x = spec(cs1).through()
, y = spec(cs2).through()
var a = new Stream ()
a.write = function (l) {
actual.push(l.trim())
}
a.end = function () {
ended = true
expected.forEach(function (v,k) {
      //String#split will append an empty string ''
      //if the string ends in a split pattern.
      //es.split doesn't, which was breaking this test.
      //clearly, appending the empty string is correct,
      //but keeping the tests passing is the current job.
if(v)
it(actual[k]).like(v)
})
//give the stream time to close
process.nextTick(function () {
test.done()
x.validate()
y.validate()
})
}
a.writable = true
cs1.pipe(cs2)
cs2.pipe(a)
cs1.write(str)
cs1.end()
}
function splitBy (delimiter) {
return function (arr, piece) {
return arr.concat(piece.split(delimiter))
}
}

View File

@@ -0,0 +1,51 @@
var it = require('it-is').style('colour')
, split = require('..')
exports ['emit mapper exceptions as error events'] = function (test) {
var s = split(JSON.parse)
, caughtError = false
, rows = []
s.on('error', function (err) {
caughtError = true
})
s.on('data', function (row) { rows.push(row) })
s.write('{"a":1}\n{"')
it(caughtError).equal(false)
it(rows).deepEqual([ { a: 1 } ])
s.write('b":2}\n{"c":}\n')
it(caughtError).equal(true)
it(rows).deepEqual([ { a: 1 }, { b: 2 } ])
s.end()
test.done()
}
exports ['mapper error events on trailing chunks'] = function (test) {
var s = split(JSON.parse)
, caughtError = false
, rows = []
s.on('error', function (err) {
caughtError = true
})
s.on('data', function (row) { rows.push(row) })
s.write('{"a":1}\n{"')
it(caughtError).equal(false)
it(rows).deepEqual([ { a: 1 } ])
s.write('b":2}\n{"c":}')
it(caughtError).equal(false)
it(rows).deepEqual([ { a: 1 }, { b: 2 } ])
s.end()
it(caughtError).equal(true)
it(rows).deepEqual([ { a: 1 }, { b: 2 } ])
test.done()
}

View File

@@ -0,0 +1,3 @@
node_modules
node_modules/*
npm_debug.log

View File

@@ -0,0 +1,4 @@
language: node_js
node_js:
- 0.6
- 0.8

View File

@@ -0,0 +1,22 @@
Copyright (c) 2012 'Dominic Tarr'
Permission is hereby granted, free of charge,
to any person obtaining a copy of this software and
associated documentation files (the "Software"), to
deal in the Software without restriction, including
without limitation the rights to use, copy, modify,
merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom
the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@@ -0,0 +1,55 @@
# stream-combiner
[![npm version](https://img.shields.io/npm/v/stream-combiner.svg)](https://npmjs.org/package/stream-combiner)
[![Travis CI](https://travis-ci.org/dominictarr/stream-combiner.svg)](https://travis-ci.org/dominictarr/stream-combiner)
## Combine (stream1,...,streamN)
Turn a pipeline into a single stream. `Combine` returns a stream that writes to the first stream
and reads from the last stream.
Listening for 'error' will receive errors from all streams inside the pipe.
```js
var Combine = require('stream-combiner')
var es = require('event-stream')
Combine( // connect streams together with `pipe`
process.openStdin(), // open stdin
es.split(), // split stream to break on newlines
es.map(function (data, callback) { // turn this async function into a stream
var repr = util.inspect(JSON.parse(data)) // render it nicely
callback(null, repr)
}),
process.stdout // pipe it to stdout !
)
```
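A single `'error'` listener on the combined stream observes failures from anywhere in the pipeline; a minimal sketch, where `stream1` and `stream2` are hypothetical streams:
```js
Combine(stream1, stream2).on('error', function (err) {
  // fires for an 'error' emitted by any stream inside the pipe
  console.error(err.message)
})
```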
Can also be called with an array:
```js
var combinedStream = Combine([
stream1,
stream2,
]);
```
Or to combine gulp plugins:
```js
function coffeePipe() {
  return Combine(
    coffeescript(),
    coffeelint.reporter('fail').on('error', function(){
      gutil.beep()
      gulp.run('lint')
    })
  )
}
//usage:
gulp.src().pipe(coffeePipe());
```
## License
MIT

View File

@@ -0,0 +1,45 @@
var duplexer = require('duplexer')
var through = require('through')
module.exports = function () {
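  //accept either a single array of streams or the streams as separate arguments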
var streams
if(arguments.length == 1 && Array.isArray(arguments[0])) {
streams = arguments[0]
} else {
streams = [].slice.call(arguments)
}
if(streams.length == 0)
return through()
else if(streams.length == 1)
return streams[0]
var first = streams[0]
, last = streams[streams.length - 1]
, thepipe = duplexer(first, last)
//pipe all the streams together
function recurse (streams) {
if(streams.length < 2)
return
streams[0].pipe(streams[1])
recurse(streams.slice(1))
}
recurse(streams)
function onerror () {
var args = [].slice.call(arguments)
args.unshift('error')
thepipe.emit.apply(thepipe, args)
}
//es.duplex already re-emits the error from the first and last stream.
//add a listener for the inner streams in the pipeline.
for(var i = 1; i < streams.length - 1; i ++)
streams[i].on('error', onerror)
return thepipe
}

View File

@@ -0,0 +1,55 @@
{
"_from": "stream-combiner@^0.2.2",
"_id": "stream-combiner@0.2.2",
"_inBundle": false,
"_integrity": "sha1-rsjLrBd7Vrb0+kec7YwZEs7lKFg=",
"_location": "/gulp-token-replace/stream-combiner",
"_phantomChildren": {},
"_requested": {
"type": "range",
"registry": true,
"raw": "stream-combiner@^0.2.2",
"name": "stream-combiner",
"escapedName": "stream-combiner",
"rawSpec": "^0.2.2",
"saveSpec": null,
"fetchSpec": "^0.2.2"
},
"_requiredBy": [
"/gulp-token-replace/event-stream"
],
"_resolved": "http://registry.npmjs.org/stream-combiner/-/stream-combiner-0.2.2.tgz",
"_shasum": "aec8cbac177b56b6f4fa479ced8c1912cee52858",
"_spec": "stream-combiner@^0.2.2",
"_where": "/var/www/html/autocompletion/node_modules/gulp-token-replace/node_modules/event-stream",
"author": {
"name": "'Dominic Tarr'",
"email": "dominic.tarr@gmail.com",
"url": "http://dominictarr.com"
},
"bugs": {
"url": "https://github.com/dominictarr/stream-combiner/issues"
},
"bundleDependencies": false,
"dependencies": {
"duplexer": "~0.1.1",
"through": "~2.3.4"
},
"deprecated": false,
"description": "[![npm version](https://img.shields.io/npm/v/stream-combiner.svg)](https://npmjs.org/package/stream-combiner) [![Travis CI](https://travis-ci.org/dominictarr/stream-combiner.svg)](https://travis-ci.org/dominictarr/stream-combiner)",
"devDependencies": {
"event-stream": "~3.0.7",
"tape": "~2.3.0"
},
"homepage": "https://github.com/dominictarr/stream-combiner",
"license": "MIT",
"name": "stream-combiner",
"repository": {
"type": "git",
"url": "git://github.com/dominictarr/stream-combiner.git"
},
"scripts": {
"test": "set -e; for t in test/*.js; do node $t; done"
},
"version": "0.2.2"
}

View File

@@ -0,0 +1,65 @@
var es = require('event-stream')
var combine = require('..')
var test = require('tape')
test('do not duplicate errors', function (test) {
var errors = 0;
var pipe = combine(
es.through(function(data) {
return this.emit('data', data);
}),
es.through(function(data) {
return this.emit('error', new Error(data));
})
)
pipe.on('error', function(err) {
errors++
test.ok(errors, 'expected error count')
process.nextTick(function () {
return test.end();
})
})
return pipe.write('meh');
})
test('3 pipe do not duplicate errors', function (test) {
var errors = 0;
var pipe = combine(
es.through(function(data) {
return this.emit('data', data);
}),
es.through(function(data) {
return this.emit('error', new Error(data));
}),
es.through()
)
pipe.on('error', function(err) {
errors++
test.ok(errors, 'expected error count')
process.nextTick(function () {
return test.end();
})
})
return pipe.write('meh');
})
test('0 argument through stream', function (test) {
test.plan(3)
var pipe = combine()
, expected = [ 'beep', 'boop', 'robots' ]
pipe.pipe(es.through(function(data) {
test.equal(data, expected.shift())
}))
pipe.write('beep')
pipe.write('boop')
pipe.end('robots')
})