2007/05/05

Erlang で Comet

みかログさんでErlangでCometが書かれていますが、同様に Erlang で Comet です。こちらの方が随分長いソースになっていますが。

クライアント(ブラウザ、セッション)毎に受信プロセスを常駐させます。
送信されたメッセージは全ての受信プロセスに送信され、受信プロセスはクライアントが受信待ちの場合、クライアントへメッセージを返します。

受信プロセスにはタイムアウト管理プロセスがリンクしてあり、一定時間クライアントへの送信を行わない場合、受信プロセスは exit します。

クライアント1つにつき、受信プロセスとタイムアウト管理プロセス(あとおそらく、Yaws のリクエスト処理プロセス)が常駐します。プロセス大盤振る舞いです。



ファイルの構成は次のようになっています。


chat.erl

プログラム本体。Yaws の起動も行ないます。

www/index.yaws

チャットのページ

www/send.yaws

メッセージ送信用ページ

www/receive.yaws

メッセージ受信用ページ

www/js/prototype.js

Ajax 通信等で利用する prototype.js


そのうち、OTP 流に gen_server なんかを使うようにしましょう。そのうちに。

chat.erl

-module(chat).
-compile(export_all).

%%-define(LOG(X), error_logger:info_msg("~p = ~p~n", [??X, X])).
-define(LOG(X), ok).

-define(COOKIE_KEY, "chat").

-include_lib("yaws/include/yaws_api.hrl").

-record(receive_status, {pid, ref, waiting=false, messages=[], timeout_pid}).

%% Yaws と チャットサーバを開始します。
start() ->
LogDir = filename:absname("log"),
DocRoot = filename:absname("www"),
start(LogDir, DocRoot).

%% Yaws と チャットサーバを開始します。
start(LogDir, DocRoot) ->
pg2:create(receive_proc_group), % 受信プロセスグループを作成
start_yaws(LogDir, DocRoot). % Yaws 開始

%% Yaws を開始します。
start_yaws(LogDir, DocRoot) ->
SC = [{port, 9999}], % サーバ設定
GC = [{logdir, LogDir}], % グローバル設定
yaws:start_embedded(DocRoot, SC, GC). % 組み込みモードでスタート

%% 受信プロセス初期化
receive_init(S) ->
Pid = spawn_link(?MODULE, receive_timeout, []),
receive_loop(S#receive_status{timeout_pid=Pid}).

%% 受信プロセス
receive_loop(S) ->
receive
{ping, Ref, Pid} -> % 受信プロセスの生存確認
S2 = S,
Pid ! {pong, Ref};
{wait, Ref, Pid} -> % クライアントが受信待ち
?LOG({wait, Ref, Pid}),
S2 = S#receive_status{pid=Pid, ref=Ref, waiting=true };
{message, Message} -> % メッセージが来た
?LOG({message, Message}),
S2 = S#receive_status{messages=[Message|S#receive_status.messages]}
end,
?LOG(S2),
%% 受信待ち状態かつメッセージがあるときのみ送信します。
case S2 of
#receive_status{waiting=false} -> % 受信待ちではない
?MODULE:receive_loop(S2);
#receive_status{messages=[]} -> % メッセージがない
?MODULE:receive_loop(S2);
_ -> % 受信待ちでメッセージあり
S#receive_status.timeout_pid ! ok, % タイムアウトをリセット
S2#receive_status.pid ! {message, % メッセージ送信
S2#receive_status.ref,
S2#receive_status.messages},
?MODULE:receive_loop(
S2#receive_status{waiting=false, messages=[]})
end.

%% 受信プロセスのタイムアウト管理プロセス
receive_timeout() ->
receive
_ -> % タイムアウトをリセット
receive_timeout()
after
1000 * 60 * 10 -> % 10分で受信プロセスは終了
erlang:exit(timeout)
end.

%% send.yaws から呼ばれます。
send_message(A) ->
?LOG(send_message),
{ok, User} = yaws_api:queryvar(A, "user"),
{ok, Message} = yaws_api:queryvar(A, "message"),
M = timestamp() ++ "(" ++ User ++ ")" ++ Message,
Pids = pg2:get_members(receive_proc_group), % 受信待ちプロセスを取得
%%error_logger:info_msg("~p", [length(Pids)]),
lists:foreach(fun(P) -> P ! {message, M} end, Pids), % 1つずつ送信
%%error_logger:info_msg("finish!", []).
{html, "ok"}.

%% receive.yaws から呼ばれます。
receive_message(A) ->
?LOG(receive_message),
{Pid, Header} = get_receive_proc(A),
Ref = make_ref(),
Pid ! {wait, Ref, self()}, % 待ってるよ、と送信。
receive
{message, Ref, Messages} -> % メッセージ受信。
?LOG({message, Ref, Messages}),
receive_message_response(edit_response(Messages), Header)
after 10 * 60 * 1000 -> % 10分でタイムアウトします。
receive_message_response(
io_lib:format(
"({'st': 'ok', 'mes': '~s メッセージがありません。
'})",
[timestamp()]),
Header)
end.

%% レスポンスの編集を行います。
edit_response(Messages) ->
M = lists:foldl(fun(X, Acc) -> yaws_api:htmlize(X) ++ "
" ++ Acc end,
[], Messages),
f("(~s)", [json:encode({struct, [{st, "ok"}, {mes, M}]})]).

%% クッキーをセットしない場合のレスポンス
receive_message_response(Html, undefined) ->
{html,Html};
%% クッキーをセットする場合のレスポンス
receive_message_response(Html, Header) ->
[{html, Html}, Header].

%% 受信プロセスIDを取得します。
get_receive_proc(A) ->
H = A#arg.headers,
C = H#headers.cookie,
case yaws_api:find_cookie_val(?COOKIE_KEY, C) of
[] ->
create_new_receive_proc();
Cookie ->
case yaws_api:cookieval_to_opaque(Cookie) of
{ok, Pid} ->
case check_receive_proc(Pid) of
ok ->
{Pid, undefined};
_ ->
create_new_receive_proc()
end;
_ ->
create_new_receive_proc()
end
end.

%% 受信プロセスが生きていることを確認します。
check_receive_proc(Pid) ->
Ref = make_ref(),
Pid ! {ping, Ref, self()},
receive
{pong, Ref} ->
ok
after
1000 ->
error
end.

%% 受信プロセスを作成します。
create_new_receive_proc() ->
Pid = spawn(?MODULE, receive_init, [#receive_status{pid=self()}]),
pg2:join(receive_proc_group, Pid), % プロセスグループに参加
Cookie = yaws_api:new_cookie_session(Pid),
{Pid, yaws_api:setcookie(?COOKIE_KEY, Cookie, "/")}.


%% タイムスタンプも欲しいよね。
timestamp() ->
{{_Y, _M, _D}, {H, Mi, S}} = erlang:localtime(),
f("~b:~b:~b", [H, Mi, S]).

%% 文字列化の関数です。
f(Format, Args) ->
lists:flatten(io_lib:format(Format, Args)).


www/index.yaws

<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>チャット</title>
<style>
#screen {
border: 1px solid black;
height: 20em;
overflow: scroll;
}
</style>
<script src="js/prototype.js"></script>
<script>
function debug(x) {
$('debug').innerHTML = x;
}
function send() {
$('message').select();
var param = 'message=' + $F('message') + '&user=' + $F('user');
var ajax = new Ajax.Request(
"send.yaws",
{
method: 'get',
parameters: param
});
}
function receive_call_back(res) {
debug(res.responseText);
eval("var obj = " + res.responseText);
if (obj.st == 'ok') {
$('screen').innerHTML = obj.mes + $('screen').innerHTML;
receive();
} else {
alert("error: " + res.responseText);
}
}
function receive() {
var param = 'user=' + $F('user');
var ajax = new Ajax.Request(
"receive.yaws",
{
method: 'get',
parameters: param,
onComplete: receive_call_back
});
}
Event.observe(window, 'load', receive, false);
</script>
</head>
<body>
<h1>チャット</h1>
<form action="javascript:send()">
名前 <input type="text" id="user"><br>
メッセージ <input type="text" id="message" size="80">
<input type="submit" value="送信">
</form>
<div id="screen"></div>
<hr>
デバッグ
<div id="debug"></div>
</body>
</html>


www/send.yaws

<erl>
out(Arg) ->
chat:send_message(Arg).
</erl>


www/receive.yaws

<erl>
out(Arg) ->
chat:receive_message(Arg).
</erl>



受信待ちプロセスをたくさん作ったらどうなるか、と思ってやって(/usr/sbin/ab -t 1000 -n 1000 -c 1000 http://localhost:9999/receive.yaws)みたら emfile エラーが発生しました。

sudo vi limits.conf で次の2行を追加して、再起動しました(OS は Debian です)。

* soft nofile 50000
* hard nofile 50000

もう一度やってみると、全てのリクエストがコネクトして受信待ちになるまでにとても時間がかかりました。その間、CPU も DISK もそれほど使っているような感じではありませんでした。どこでひっかかっているのかしら。

それはともかく、プロセスを怒涛の羊のように作るプログラミングはなかなか楽しいです。

2 件のコメント:

Rui さんのコメント...

接続が遅い件、epollを使うようerlのオプションに+K trueをつけてみたらどうなるでしょうか。

Yoshinori Tahara さんのコメント...

+K true を付けてやってみました。特に変化はないようでした。ab で -n 10000 -c 10000 のときは最初の1000件は一瞬で完了するのですが、-n 1000 -c 1000 としたときはとても遅いです。ab の使い方がおかしいのか、多重度によって Erlang の方の振舞がかわるのか… rui さんアドバイスありがとうございました。