readStream.pipe( duplexStream ).pipe(writableStream) してみた

Re:node.jsでストリーミング的な - .blog
これ書いた後に

HTTP_Request.pipe(MyDuplexStream).pipe(HTTP_Response);

させたくなった。
実際には、HTTP_Request.on('data')時のデータをそのまま HTTP_Responseに流すわけじゃないのでデータを変換するストリームの実装になる。

MyDuplexStream の実装

readableStram な
  • this.readable = true する
  • this.emit('data', data) する
  • this.emit('end') する
  • util.inherits(MyDuplexStream, require('stream').Stream) する(ことで、pipeを継承する)
writableStream な
  • this.writable = true する
  • this.write, this.destroy, this.end を実装する
#!/usr/bin/env node
var stream = require('stream')
,   util   = require('util')
,   http   = require('http')
,   path   = require('path')
,   fs     = require('fs')
,   url    = require('url')
;

var config = {
    port : 3000
  , statics : __dirname
};

function FunctorStream (req, res) {
    this.readable = true;
    this.writable = true;

    var rs =
    this.readStream = fs.createReadStream(path.join(
        config.statics
      , decodeURIComponent(url.parse(req.url).pathname)
    ));
    this.writeStream = res;

    rs.on(  'error', this.onError.bind(this));
    rs.once('data',  this.onceData.bind(this));
    rs.on(  'data',  this.onData.bind(this));
    rs.on(  'end',   this.emit.bind(this, 'end'));

    res.on('drain',  this.emit.bind(this, 'drain'));
}
util.inherits(FunctorStream, stream.Stream);
(function (fp) {
    fp.onError = function (err) {
        this.writeStream.writeHead(500, {
            'Content-Type': 'text/plain'
        });
        this.writeStream.end(err.toString());
        console.error(err);
    };
    ('write end').split(' ').forEach(function (writeableMethod) {
        fp[writeableMethod] = function () {}; // dummy
    });
    ('pause resume').split(' ').forEach(function (readableMethod) {
        fp[readableMethod] = function () { this.readStream[readableMethod] };
    });
    fp.destroy = function () {
        this.writable = false;
        this.readable = false;
    };
    fp.onceData = function () {
        this.writeStream.writeHead(200, {
            'Content-Type': 'video/mp4'
          , 'Transfer-Encoding': 'chunked'
        });
    };
    fp.onData = function (chunk) {
        this.emit('data', chunk);
    };
})(FunctorStream.prototype);

http.createServer(function (req, res) {
    req.pipe(new FunctorStream(req, res)).pipe(res);
}).listen(config.port);

console.log('server start to listen on port "%d"', config.port);

現実的じゃないですね