new (stream.Transform) してみた

夏コミが終わったので、node.js のバージョンを v0.10.15 にした。
ので、Stream2 を使えるようになりました。
なので、早速(というか今更)Stream2 を触り始めてる

stream.Transform

stream.Readable と stream.Writable は Stream1 からあるので(API変わってるけど)、今回は Transformを使ってみる

まず簡単に

  • ReadableStream: 100ミリ秒毎に 10 > 9 ... 0 とカウントダウンしていく
  • WritableStream: 標準ストリームにプリントする
  • TransformStream: ReadableStream が発行するデータ(カウント)を (10 - カウント)に変換して出力する

を実装する


(transf01.js)

var util   = require('util')
var stream = require('stream')

var rs = stream.Readable()
rs.count = 10
rs._read = function () {
    this.count >= 0
        ? setTimeout(this.countdown.bind(this), 100)
        : this.push(null)
}
rs.countdown = function () {
    this.push(String(this.count--))
}


function Transf () {
    stream.Transform.call(this)
}
util.inherits(Transf, stream.Transform)

Transf.prototype._transform = function (chunk, enc, done) {
    this.push(String((10 - Number(chunk)) + '\n'))
    done()
}


process.stdout.on('error', process.exit)
process.on('exit', function () {
    console.error('process.exit')
})

//rs.pipe(process.stdout)
rs.pipe(new Transf).pipe(process.stdout)

結果

0
1
2
3
4
5
6
7
8
9
10
process.exit

この例だと MyTransform.ptototpe._flush を使ってないので、_flush を使ってみる。

  • ReadableStream: http.ServerRequest
  • WritableStream: http.ServerResponse
  • TransformStream: リクエストヘッダとリクエストデータをパースし、JSON形式に変換するストリーム

(transf02.js)

var util   = require('util')
var stream = require('stream')

function Transf () {
    stream.Transform.call(this)

    this.body = ''
    this.headers = null

    this.once('pipe', this.oncePipe.bind(this))
}
util.inherits(Transf, stream.Transform)

Transf.prototype.oncePipe = function (req) {
    req.setEncoding('utf8')
    this.headers = req.headers
}

Transf.prototype._transform = function (chunk, enc, done) {
    this.body += chunk
    done()
}
Transf.prototype._flush = function (done) { // 
    try {
        this.push(JSON.stringify({
            headers: this.headers
          , body:    JSON.parse(this.body)
        }, null, 4))
    } catch (err) {
        console.error(err)
        this.emit('error', err)
    }

    done()
}


var http = require('http')
http.createServer(function (req, res) {
    var transf = new Transf
    req.pipe(transf).pipe(res)

    transf.on('error', function (err) {
        res.statusCode = 500
        res.end(err.name + ': ' + err.message)
    })
}).listen(1337)

結果

$ curl localhost:1337 -d '{"foo": "bar", "hoge": {"hello": "world"}}'
...
{
    "headers": {
        "user-agent": "curl/7.21.2 (x86_64-apple-darwin10.6.0) libcurl/7.21.2 OpenSSL/1.0.0d zlib/1.2.5 libidn/1.20",
        "host": "localhost:1337",
        "accept": "*/*",
        "content-length": "42",
        "content-type": "application/x-www-form-urlencoded"
    },
    "body": {
        "foo": "bar",
        "hoge": {
            "hello": "world"
        }
    }
}

# 文法違反のリクエスト送ってみる
$ curl localhost:1337 -d '["foo": "bar"]'
...
SyntaxError: Unexpected token :

"_transform は writable.write の直前にフックする"、"_flush は writable.end の直前にフックする" とイメージしてみたけど、あってるんだろうか?

もうちょっとドキュメント読んでみないとまずい印象