2008/10/22

[Common Lisp] [*scratch*] Erlang のまねっこ

[*scratch*] と題して、適当なコードをはりつける試み(笑)

ハードと OS の進歩によって、そのうち普通のスレッドでも Erlang なみの並列処理ができるようになることを期待しつつ。

;; Everything below is read and defined in the QUEK package.
(in-package :quek)

;; Pull SBCL's *CURRENT-THREAD* into this package so it can be named
;; without a prefix and re-exported in the EXPORT form below.
(import 'sb-thread:*current-thread*)

;; Public API: SPAWN starts a thread, ! sends a message, @ receives one,
;; *EXIT* is the termination message, *CURRENT-THREAD* is re-exported.
(export '(spawn ! @ *exit* *current-thread*))

;; Registry from thread object (key) to its PROCESS instance (value); see
;; GET-PROCESS.  The table is key-weak (:weakness is an SBCL extension).
(defvar *processes* (make-hash-table :weakness :key))

;; Mutex held around every lookup/insert/scan of *PROCESSES*.
(defvar *processes-mutex* (sb-thread:make-mutex))

;; Unique uninterned symbol used as the "terminate" message; the PROCESS
;; method of ! recognises it with EQ.
(defvar *exit* (gensym "*exit*"))

;; Per-thread bookkeeping object: a mailbox plus the synchronisation
;; primitives that guard it, and the list of threads spawned by this one.
(defclass process ()
  (;; Name of the process; GET-PROCESS fills it with the thread's name.
   (name :initarg :name :accessor name-of)
   ;; Mailbox: list of pending messages, oldest first (! appends, @ pops).
   (mbox :initform nil :accessor mbox-of)
   ;; Condition variable notified when the mailbox changes or a timeout fires.
   (waitqueue :initform (sb-thread:make-waitqueue) :accessor waitqueue-of)
   ;; Mutex protecting MBOX and used together with WAITQUEUE.
   (mutex :initform (sb-thread:make-mutex) :accessor mutex-of)
   ;; Threads created via SPAWN% while this process was the current one.
   ;; (Slot name corrected from the misspelled CHILDRENT; the accessor
   ;; CHILDREN-OF is unchanged.)
   (children :initform nil :accessor children-of)))

(defgeneric kill-children (process)
  (:documentation
   "Send the *EXIT* message to every thread recorded in PROCESS's children list.")
  (:method ((process process))
    ;; Each child is a thread object; ! dispatches on it and forwards *EXIT*.
    (dolist (child (children-of process))
      (! child *exit*))))

(defun get-process (&optional (thread sb-thread:*current-thread*))
  "Return the PROCESS registered for THREAD (default: the current thread).
The lookup in *PROCESSES* runs while holding *PROCESSES-MUTEX*.  When no
entry is found, a new PROCESS named after the thread is created."
  (flet ((fresh-process ()
           (make-instance 'process
                          :name (sb-thread:thread-name thread))))
    (sb-thread:with-mutex (*processes-mutex*)
      ;; NOTE(review): SIF is defined outside this file; presumably an
      ;; anaphoric IF whose IT is a SETF-able alias for the tested place,
      ;; so the SETF below stores the new process in the table -- confirm.
      (sif (gethash thread *processes*)
           it
           (setf it (fresh-process))))))

(defun spawn% (function)
  "Start FUNCTION in a new thread and return the thread object.
The thread gets a generated name with the prefix quek.pid and is pushed onto
the children list of the calling thread's process."
  (let ((child (sb-thread:make-thread
                function
                :name (symbol-name (gensym "quek.pid")))))
    ;; Register the new thread with its parent so that an *EXIT* sent to
    ;; the parent can be forwarded by KILL-CHILDREN.
    (push child (children-of (get-process sb-thread:*current-thread*)))
    child))

(defmacro spawn (&body body)
  "Run BODY in a new thread: wraps BODY in a zero-argument LAMBDA and hands
it to SPAWN%.  The expansion evaluates to whatever SPAWN% returns."
  (list 'spawn% (list* 'lambda '() body)))

(defgeneric ! (receiver message)
  (:documentation
   "Send MESSAGE to RECEIVER.  Methods in this file accept a thread or a
PROCESS as RECEIVER; the message *EXIT* requests termination."))

(defmethod ! ((thread sb-thread:thread) message)
  "Send MESSAGE to the process associated with THREAD.
GET-PROCESS supplies (and, if necessary, creates) that process."
  (let ((target (get-process thread)))
    (! target message)))

(defmethod ! ((process process) message)
  "Deliver MESSAGE to PROCESS.
If MESSAGE is EQ to *EXIT*, forward *EXIT* to the children and terminate
every live thread registered for PROCESS.  Any other message is appended to
the mailbox and the process's waitqueue is notified."
  (cond
    ((eq message *exit*)
     ;; Propagate the exit to spawned threads before stopping this one.
     (kill-children process)
     (sb-thread:with-mutex (*processes-mutex*)
       ;; NOTE(review): `_' is a lambda shorthand defined outside this file;
       ;; _k/_v are presumably the table key (thread) and value (process)
       ;; passed by MAPHASH -- confirm.
       (maphash (_ (when (and (eq _v process)
                              (sb-thread:thread-alive-p _k))
                     (sb-thread:terminate-thread _k)))
                *processes*)))
    (t
     (sb-thread:with-mutex ((mutex-of process))
       ;; Append so that the mailbox stays in arrival order (oldest first).
       (let ((queued (mbox-of process)))
         (setf (mbox-of process) (append queued (list message))))
       ;; Wake a thread blocked on this process's waitqueue, if any.
       (sb-thread:condition-notify (waitqueue-of process))))))

(defun @ (&key timeout timeout-value)
  "Receive the next message from the calling thread's mailbox.
Blocks until a message is available.  When TIMEOUT (seconds, as accepted by
SLEEP) is given and no message has arrived by then, returns TIMEOUT-VALUE
instead.  Otherwise returns the oldest message, removing it from the mailbox."
  (with-accessors ((waitqueue waitqueue-of)
                   (mutex mutex-of)
                   (mbox mbox-of))
      (get-process)
    (let (timeout-p)
      (when timeout
        ;; Timer thread: after TIMEOUT seconds, flag the timeout under the
        ;; mailbox mutex and wake the waiting receiver.
        (spawn (sleep timeout)
               (sb-thread:with-mutex (mutex)
                 (setf timeout-p t)
                 (sb-thread:condition-notify waitqueue))))
      (sb-thread:with-mutex (mutex)
        ;; CONDITION-WAIT may return without our predicate being true: SBCL
        ;; permits spurious wakeups, and a timer thread left over from an
        ;; earlier call of @ can notify this waitqueue later.  Re-check the
        ;; predicate in a loop instead of waiting only once.
        (loop until (or mbox timeout-p)
              do (sb-thread:condition-wait waitqueue mutex))
        (if mbox
            (pop mbox)
            timeout-value)))))


;; Manual test (only read when the TEST feature is present): spawn a thread
;; that prints every received message until it receives QUIT.
#+test
(let ((thread (spawn (labels ((f (rev)
                                (case rev
                                  ;; CASE keys are not evaluated.  The former
                                  ;; key 'QUIT read as (QUOTE QUIT) and so also
                                  ;; matched the symbol QUOTE.
                                  (quit
                                   (print "quit!"))
                                  (t (print rev)
                                     (force-output)
                                     (f (@))))))
                       (f (@))))))
  (! thread 'hello)
  (sleep 1)
  (! thread 'world)
  (sleep 1)
  (! thread 'quit))


;; Manual test (only read when the TEST feature is present): the spawned
;; thread waits 0.1 s for a message and prints the timeout value; a message
;; is sent to it one second later.
#+test
(let ((worker (spawn
                (print "start...")
                (print (@ :timeout 0.1
                          :timeout-value "タイムアウトした"))
                (print "end...")
                (force-output))))
  (sleep 1)
  (! worker "おわり"))

;;(@ :timeout 0 :timeout-value "タイムアウトした")

0 件のコメント: