FizzBuzz問題をstreamで解く

動機的なもの

別に Stream である必要はないんだけど、Nodejs でデータを扱うなら Stream を通したいですよね! pipe 使いたいですよね!!

(countStream).pipe(fizzBuzzStream).pipe(process.stdout);

コード的には上記のように書きたいので、とりあえず機能ごとに各ストリームに分離する。

  • 1, 2, 3 ... リミットの数 とカウントして その数を流すストリーム
  • 数字を 'Fizz', 'Buzz', 'FizzBuzz' に変換するストリーム
  • 文字列をコンソールに書きだすストリーム
さて、

FizzBuzzStream の実装を自前でやると面倒くさいです。なので、npmライブラリの through を使うと吉です。
through 自体は文字通り throughストリームを作るライブラリ。

fizzbuzz-stream.js

var require('through');
module.exports = function () {
    return through(
        function _write (data) {
            var num = Number(data);
            this.emit('data', num % 15 === 0 ? 'FizzBuzz' :
                              num %  3 === 0 ? 'Fizz' :
                              num %  5 === 0 ? 'Buzz' : data
                     );
        }
      , function _end () { this.emit('end') }
    );
};

countStreamを作るモジュールは記事の最後に記載しますが、ここでは準備されているものとして、 FizzBuzz問題を解いてみます。

fizzbuzz.js

var createCountStream    = require( __dirname + '/count-stream');
var createFizzBuzzStream = require( __dirname + '/fizzbuzz-stream');

// process.stdout.write は改行コードを吐かないので細工する
// fizzbuzzStream の方で改行コードをくっつけて流せばいいんだけど、
// なんか気味悪い感じするので...
process.stdout.write = function () {
    var w = process.stdout.write.bind(process.stdout);
    return function processStdoutWrite (s) { w(s + "\n") };
}();

var count = createCountStream(16);
count
  .pipe( createFizzBuzzStream())
    .pipe( process.stdout);
count.resume();

/* 結果
 * 1
 * 2
 * Fizz
 * 4
 * Buzz
 * Fizz
 * 7
 * 8
 * Fizz
 * Buzz
 * 11
 * Fizz
 * 13
 * 14
 * FizzBuzz
 * 16
*/

fizzbuzz-stream.js を見ればわかることだけど、このストリームは、単純に値を変換しているだけのストリームなので、FizzBuzz 以外の要件でも _write 関数の中身を書き換えるだけで、色々応用出来るのが嬉しいですね。


count-stream.js

var stream = require('stream');
var util   = require('util');

function CountStream (max) {
    stream.Stream.call(this);

    if (typeof max !== 'number'   ||
        parseInt(max, 10) !== max ||
        max <= 0
    ) {
        throw new TypeError('"max" must be "Integer Number" and over "0"');
    }

    this.max      = max;
    this.count    = 0;
    this.readable = true;

    this.once('close', function () {
        this.readable = false;
        this.emit('end');
    }.bind(this));
}
util.inherits(CountStream, stream.Stream);

CountStream.prototype.resume = function () {
    var that = this;
    var count = function () {
        process.nextTick(function () {
            if (that.paused || ! that.readable) return;
            if (that.count >= that.max) return that.destroy();

            that.emit('data', (that.count += 1).toString());
            count();
        });
    };

    delete this.paused;
    count();
};
CountStream.prototype.pause = function () {
    if (! this.readable) return;
    this.paused = true;
};
CountStream.prototype.destroy = function () {
    if (! this.readable) return;
    this.emit('close');
};

module.exports.CountStream = CountStream;
module.exports = function (count) {
    return new CountStream(count);
};