readStream.pipe( duplexStream ).pipe(writableStream) してみた
Re:node.jsでストリーミング的な - .blog
これ書いた後に
HTTP_Request.pipe(MyDuplexStream).pipe(HTTP_Response);
させたくなった。
実際には、HTTP_Request.on('data')時のデータをそのまま HTTP_Responseに流すわけじゃないのでデータを変換するストリームの実装になる。
MyDuplexStream の実装
readableStram な
- this.readable = true する
- this.emit('data', data) する
- this.emit('end') する
- util.inherits(MyDuplexStream, require('stream').Stream) する(ことで、pipeを継承する)
writableStream な
- this.writable = true する
- this.write, this.destroy, this.end を実装する
#!/usr/bin/env node var stream = require('stream') , util = require('util') , http = require('http') , path = require('path') , fs = require('fs') , url = require('url') ; var config = { port : 3000 , statics : __dirname }; function FunctorStream (req, res) { this.readable = true; this.writable = true; var rs = this.readStream = fs.createReadStream(path.join( config.statics , decodeURIComponent(url.parse(req.url).pathname) )); this.writeStream = res; rs.on( 'error', this.onError.bind(this)); rs.once('data', this.onceData.bind(this)); rs.on( 'data', this.onData.bind(this)); rs.on( 'end', this.emit.bind(this, 'end')); res.on('drain', this.emit.bind(this, 'drain')); } util.inherits(FunctorStream, stream.Stream); (function (fp) { fp.onError = function (err) { this.writeStream.writeHead(500, { 'Content-Type': 'text/plain' }); this.writeStream.end(err.toString()); console.error(err); }; ('write end').split(' ').forEach(function (writeableMethod) { fp[writeableMethod] = function () {}; // dummy }); ('pause resume').split(' ').forEach(function (readableMethod) { fp[readableMethod] = function () { this.readStream[readableMethod] }; }); fp.destroy = function () { this.writable = false; this.readable = false; }; fp.onceData = function () { this.writeStream.writeHead(200, { 'Content-Type': 'video/mp4' , 'Transfer-Encoding': 'chunked' }); }; fp.onData = function (chunk) { this.emit('data', chunk); }; })(FunctorStream.prototype); http.createServer(function (req, res) { req.pipe(new FunctorStream(req, res)).pipe(res); }).listen(config.port); console.log('server start to listen on port "%d"', config.port);
現実的じゃないですね