Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
"1.2.0"
"1.3.0"
1 change: 1 addition & 0 deletions aether-tests.asd
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
(:file "cheap-heap")
(:file "event")
(:file "network")
(:file "courier")
(:file "process")
(:file "process-tree")
(:file "cast")
Expand Down
56 changes: 25 additions & 31 deletions src/courier.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -182,53 +182,47 @@
&key
(timeout 0)
(catch-RTS? t)
(peruse-inbox? t)
&allow-other-keys)
&body clauses)
"Peruses the mailbox at `ADDRESS' for a `MESSAGE' which matches one of the provided `CLAUSES'. Each clause has the form (MESSAGE-TYPE &BODY BODY). Clauses are processed according to the following Erlang-ian rules:

+ Each clause is processed in the order supplied.
+ If a clause is matched, no further clauses are processed.
+ When `PERUSE-INBOX?' is T, each clause (processed in order) searches the whole inbox(in latest-to-most-recent order) for a `MESSAGE-TYPE' match. When NIL, each clause just looks at the first message in the inbox for a `MESSAGE-TYPE' match.
+ If a waiting message of the appropriate type is found, it is bound to `MESSAGE' and `BODY' is processed.
+ Each message is processed in the order received.
+ For each message, each clause is processed in the order supplied.
+ If a clause is matched, no further clauses or messages are processed. In this case, the matching message is bound to `MESSAGE' and that clause's `BODY' is executed.

NOTES:

When `CATCH-RTS?' is T, we append a `MESSAGE-RTS' clause that throws an error.

Permits a clause with head `OTHERWISE' which is executed when no such waiting message is found.

Returns as a secondary value whether a message was processed. (An `OTHERWISE' clause also results in a secondary value of NIL.)"
Returns as a secondary value whether a message was processed. (An `OTHERWISE' clause also results in a secondary value of NIL.)

Defines an implicit NIL block from which the user can RETURN."
(assert (zerop timeout) () "Blocking RECEIVE-MESSAGE not currently supported. Consider SYNC-RECEIVE for PROCESS instances.")
(when catch-RTS?
(setf clauses (append clauses `((message-RTS (error "Got an RTS."))))))
(assert (zerop timeout) () "Blocking RECEIVE-MESSAGE not currently supported.")
(a:with-gensyms (block-name inbox found? q-deq-fn)
(flet ((process-clause (clause-head clause-body)
`(a:when-let ((,message
(funcall ,q-deq-fn ,inbox
(lambda (m) (typep m ',clause-head)))))
(return-from ,block-name
(values (progn ,@clause-body)
t)))))
`(block ,block-name
(a:with-gensyms (block-name inbox x)
(a:once-only (address)
`(let ((,inbox
(gethash (address-channel ,address)
(courier-inboxes *local-courier*))))
(policy-cond:policy-cond
((= 3 safety)
((= 3 safety)
(check-key-secret ,address))
((> 3 safety)
((> 3 safety)
nil))
(let ((,q-deq-fn (if ,peruse-inbox? #'q-deq-first #'q-deq-when)))
(multiple-value-bind (,inbox ,found?)
(gethash (address-channel ,address)
(courier-inboxes *local-courier*))
(declare (ignorable ,inbox))
(assert ,found? ()
"Address ~a not registered to this courier."
(address-channel ,address))
,@(loop :for (clause-head . clause-body) :in clauses
:unless (eql 'otherwise clause-head)
:collect (process-clause clause-head clause-body))
(values (progn ,@(cdr (find 'otherwise clauses :key #'car)))
nil)))))))
(block ,block-name
(doq (,message ,inbox)
(typecase ,message
,@(loop :for (clause-head . clause-body) :in clauses
:unless (eql 'otherwise clause-head)
:collect `(,clause-head
(q-deq-first ,inbox (lambda (,x) (typep ,x ',clause-head)))
(return-from ,block-name
(values (progn ,@clause-body) t))))))
(values (progn ,@(cdr (find 'otherwise clauses :key #'car)))
nil))))))

;;;
;;; event producers for message passing infrastructure
Expand Down
2 changes: 2 additions & 0 deletions src/process/dpu-helpers.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ NOTE: `MESSAGE-RTS' replies must be explicitly handled. Otherwise, the default
:next-command (caar (process-command-stack ,process-name))
:sync-channel ,sync-channel)
(multiple-value-bind (,retval ,sr-done?)
;; NOTE: this is implemented inefficiently. we could maintain an end-of-queue
;; pointer across successive unsuccessful receive attempts.
(receive-message (,sync-channel ,sync-message-place)
,@(loop :for (clause-head . clause-body) :in sync-clauses
:collect `(,clause-head
Expand Down
80 changes: 38 additions & 42 deletions src/process/process.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ IMPORTANT NOTE: Use #'SPAWN-PROCESS to generate a new PROCESS object."))
(error "Undefined command ~a for server of type ~a."
command (type-of server))))

(defgeneric %message-dispatch (node)
(defgeneric %message-dispatch (node message)
(:documentation "Use DEFINE-MESSAGE-DISPATCH to install methods here."))

(defun finish-handler ()
Expand Down Expand Up @@ -154,65 +154,61 @@ There is one exception: (CALL-NEXT-METHOD) is also a legal clause, and it refere
NOTES:
+ If no clause is matched, execution proceeds to the semantics specified by `DEFINE-PROCESS-UPKEEP'.
+ Automatically appends a `MESSAGE-RTS' clause which calls `HANDLE-MESSAGE-RTS' and results in an error. Because of this, we set `CATCH-RTS?' to NIL when processing clauses and building `RECEIVE-MESSAGE' blocks. Otherwise, it would be impossible to override the default handling of `MESSAGE-RTS'es. Additionally, this extra handler is _not_ inherited through (CALL-NEXT-METHOD).
+ `PROCESS-PERUSE-INBOX?' is passed along to `RECEIVE-MESSAGE', where it determines how we search for a message to handle.

WARNING: These actions are to be thought of as \"interrupts\". Accordingly, you will probably stall the underlying `PROCESS' if you perform some waiting action here, like the analogue of a `SYNC-RECEIVE'."
(a:with-gensyms (node message results trapped?)
`(defmethod %message-dispatch ((,node ,node-type))
WARNING: These actions are to be thought of as \"interrupts\". Accordingly, you will probably stall the underlying `PROCESS' if you perform some waiting action here, like the analogue of a `SYNC-RECEIVE'. See DEFINE-MESSAGE-SUBORDINATE for a workaround."
(a:with-gensyms (node message)
`(defmethod %message-dispatch ((,node ,node-type) ,message)
,@(mapcar
(lambda (clause)
(cond
((and (listp clause)
(= 1 (length clause))
(symbolp (first clause))
(string= "CALL-NEXT-METHOD" (symbol-name (first clause))))
`(multiple-value-bind (,results ,trapped?) (call-next-method)
(when ,trapped?
(return-from %message-dispatch (values ,results ,trapped?)))))
`(when (call-next-method)
(return-from %message-dispatch t)))
((and (listp clause)
(member (length clause) '(2 3)))
(destructuring-bind (message-type receiver . rest) clause
`(when (let ((,node-type ,node))
(declare (ignorable ,node-type))
,(or (first rest) t))
(receive-message ((process-key ,node) ,message
:catch-RTS? nil
:peruse-inbox? (process-peruse-inbox?
,node))
(,message-type
(when (process-debug? ,node)
(log-entry :time (now)
:entry-type ':handler-invoked
:source ,node
:message-id (message-message-id ,message)
:payload-type ',message-type
:log-level 0))
(return-from %message-dispatch
(values
(funcall ,receiver ,node ,message)
t)))))))
`(when (and
(typep ,message ',message-type)
(let ((,node-type ,node))
(declare (ignorable ,node-type))
,(or (first rest) t)))
(when (process-debug? ,node)
(log-entry :time (now)
:entry-type ':handler-invoked
:source ,node
:message-id (message-message-id ,message)
:payload-type ',message-type
:log-level 0))
(funcall ,receiver ,node ,message)
(return-from %message-dispatch t))))
(t
(error "Bad DEFINE-MESSAGE-DISPATCH clause: ~a" clause))))
clauses))))

(defun message-dispatch (node)
"Use DEFINE-MESSAGE-DISPATCH to install methods here."
(multiple-value-bind (results trapped?) (%message-dispatch node)
(declare (ignore results))
(when trapped?
(return-from message-dispatch t)))
(receive-message ((process-key node) message
:catch-RTS? nil ; we do this ourselves so we can log-entry
:peruse-inbox? (process-peruse-inbox? node))
(message-RTS
(when (process-debug? node)
(log-entry :time (now)
"Use DEFINE-MESSAGE-DISPATCH to install methods here."
(let* ((address (process-public-address node))
(inbox (gethash (address-channel address)
(courier-inboxes *local-courier*))))
(policy-cond:policy-cond
((= 3 safety) (check-key-secret address))
((> 3 safety) nil))
(doq (message inbox)
(when (%message-dispatch node message)
(q-deq-first inbox (lambda (x) (eq x message)))
(return-from message-dispatch t))
(when (and (process-debug? node)
(typep message 'message-RTS))
(log-entry :time (now)
:entry-type ':handler-invoked
:source node
:message-id (message-message-id message)
:payload-type (type-of message)))
(return-from message-dispatch
(values (funcall 'handle-message-RTS node message) t)))))
:payload-type (type-of message))
(handle-message-RTS node message)
(return-from message-dispatch t)))))

;; TODO: DEFINE-DPU-MACRO and DEFINE-DPU-FLET don't check syntactic sanity at
;; their runtime, they wait for DEFINE-PROCESS-UPKEEP to discover it.
Expand Down Expand Up @@ -302,7 +298,7 @@ NOTE: LOG-ENTRY is treated separately."

PROCESS is COMMAND is a KEYWORD, and COMMAND-ARGS is a DESTRUCTURING-BIND-LAMBDA-LIST.

Locally enables the use of the function PROCESS-DIE and the special form SYNC-RECEIVE."
Locally enables the use of the various functions and macro forms defined in dpu-helpers.lisp ."
(check-type command symbol)
(multiple-value-bind (body decls docstring) (a:parse-body body :documentation t)
(a:with-gensyms (command-place argument-list active?)
Expand Down
12 changes: 12 additions & 0 deletions src/queue.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,15 @@
(defun q-push (q el)
(push el (cdar q))
q)

(defmacro doq ((var q &optional return-form) &body body)
Comment thread
ecpeterson marked this conversation as resolved.
"Like DOLIST, but for queues."
(a:with-gensyms (sigil rest)
(a:once-only ((q q))
`(block nil
Comment thread
ecpeterson marked this conversation as resolved.
(let ((,sigil (cdr ,q)))
(loop :for ,var :in (cdar ,q)
:for ,rest :on (cdar ,q)
:when (eq ,rest ,sigil)
:return ,return-form
:do ,@body))))))
32 changes: 32 additions & 0 deletions tests/courier.lisp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
;;;; tests/courier.lisp
;;;;
;;;; Tests of courier.lisp functionality.

(in-package #:aether-tests)

(defstruct (message-fast (:include message))
"Message type which gets processed quickly.")

(defstruct (message-slow (:include message))
"Message type which gets processed slowly.")

(deftest test-message-processing-ordering ()
"Test that clause ordering does not affect message queue traversal order."
(flet ((pull-message (address)
(let (first-message)
(receive-message (address message)
;; process fast clause before slow clause
(message-fast
(setf first-message message))
(message-slow
(setf first-message message)))
first-message)))
(let* ((*local-courier* (make-courier))
(address (register)))
(with-simulation (simulation (*local-courier*))
;; insert slow message before fast message
(send-message address (make-message-slow))
(send-message address (make-message-fast))
;; check that *first* message is processed before *fast* message
(is (typep (pull-message address) 'message-slow))
(is (typep (pull-message address) 'message-fast))))))
5 changes: 3 additions & 2 deletions tests/examples/data-frame.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ This assumes that the return-value has already been set up, contributes the fact
(alexandria:maxf max-command-depth (length (process-command-stack server)))
(receive-message (listen-address done-message)
(message-rpc-done
(return (values (message-rpc-done-result done-message)
max-data-depth max-command-depth))))))))
(return-from factorial-await
(values (message-rpc-done-result done-message)
max-data-depth max-command-depth))))))))
;; try non-tco factorial
(let ((old-clock clock))
(multiple-value-bind (result max-data-depth max-command-depth)
Expand Down