Node.js でパラレルダウンロードする時

Node.jsならパラレルダウンロードですよね!

※ 先日の NDS25 で LT用に何かと思ったんですけど、何も思いつかなかったんで、今頃ネタ出し。
※ 今回の NDS25 では初見の方も多かったので、その方たち向けの自己紹介エントリーを兼用してます。


さて、 http.request で 同一ドメインに接続できる上限というのは設定されていて、デフォルトで 5つ。
恐ろしいことに、この上限を超えたリクエストは無視されます。
ただし、この上限数は変更できます。

var url, n, parsedUrl, http, agent;

url       = 'http://hogehoge.org/';
n         = 8; // 変更したい上限数
parsedUrl = require('url').parse(url);
http      = require('http');
agent     = http.getAgent(parsedUrl.host, parsedUrl.port || 80);

agent.maxSocket = n;

のようにすればいいんですが、動画のようなファイルを並列でダウンロードしたい場合に上限を増やしてDLするとメモリがこわいです。

追記:@neko_gata_s さんの指摘にあるように v0.5.3以上では、上限を超えたリクエストはagent.requests にプールされて処理されてました。
ので、v0.5.3以前限定の回避方法になります


ということで、どうにか上限を超えたリクエストをさばく戦略を考えます。

考えますと言いつつ、結局は agent が使用しているソケット数を監視して

  • ソケットの接続数が上限以下の場合、リクエストを投げる
  • ソケットの接続数が上限に達している場合、ソケットの接続数が上限を下回るまで、リクエストを待機させて、下回った段階でリクエストを投げる

ようにすればいいかなと。

んじゃ、実際どうするか

  • setInterval を使って定期的に接続数を監視して、接続数が減った段階でリクエストを投げる
  • ソケット切れイベントに合わせた(リクエストを投げる)リスナーを使う

まず、最初の例。こっちのほうが分かりやすいですね。

var help, watcher;

help = function () {
    var req = http.request(requestOptions); // requestOptions は端折ります。

    req.on('response', function (res) {
        ...処理
    });

    if (requestBody) req.write(requestBody); // requestBody も端折ります。
    req.end();
};

watcher = function () {
    var agent, intervalID;
    agent      = http.getAgent(requestOptions.host, requestOptions.port || 80);
    intervalID = setInterval(function () {
        if (agent.sockets.length < agent.maxSocket) {
            clearinterval(intervalID);
            help();
            return;
        }
    }, 10);
};

watcher();

ソケット切れイベントにあわせて発火させる例。ちょっとめんどくさい。

var agents, agentId, help;
agents = {};
agents.onSocketsLengthChange = function (agent) {
    var help;
    while (agent.sockets.length < agent.maxSocket && agent.waitRequests.length > 0) {
        help = agent.waitRequests.shift();
        help();
    }

    agent.sockets.forEach(function (socket) {
        if (! socket.hasOnCloseListener) {
            socket.hasOnCloseListener = 1;
            socket.on('close', funcion (had_error) {
                if (had_error) console.log('! failed: socket closing, and data is not transmitted.');
                agents.onSocketsLengthChange(agent);
            });
        }
    });
};

help = function () { // 上記と同じ };

agentId = [ requestOptions.host, (requestOptions.port || 80) ].join(':');
if (! agents[agentId]) {
    agents[agentId] = http.getAgent(requestOptions.host, requestOptions.port || 80);
    agents[agentId].waitRequests = [];
}

agents[agentId].waitRequests.push(help);
agents.onSocketsLengthChange(agents[agentId]);

どっちのほうがいいのかな? というか

みなさんはどうしてますか?


追記: 実際に動かしてみたものは「http.agent の agent.maxSockets の上限を避ける その2 — Gist」にあります。