Proto Cache: Implementing Basic Pub Sub

Today’s Updates

In our last post we saw some of the features of the ace.core.defun and ace.core.thread libraries by creating a thread-safe cache of the Any protocol buffer object. Today we are going to update the proto-cache repository to implement publisher/subscriber features. This will allow a publisher to publish a feed of Any messages and a subscriber to subscribe to such a  feed. 

It is expected (but not required) that the reader has read the previous post Proto Cache: A Caching Story. That post details some of the functions and objects you will see in today’s code.

Note: This is a basic implementation, not one ready for production use. This will serve as our working project going forward.

Code Updates

Proto-cache.asd

We want subscribers to be able to get new versions of an Any protocol buffer message. On the web, the usual way to receive messages is over HTTP. We use the Drakma HTTP client. You can see we added :drakma to the depends-on list in the defsystem.

Proto-cache.lisp

There are three major regions to this code. The first region is the global objects that make up the cache. The second is the definition of a new class, pub-sub-details. Finally the actual publisher-subscriber functions are at the bottom of the page.

Global objects:

The global objects section looks much like it did in our previous post. We update the *cache* hash-table to use equal as its test function and we are going to make the keys to this cache be username strings.

Pub-sub-details class:

The global objects section looks much like it did in our previous post. We update the *cache* hash-table to use equal as its test function and we are going to make the keys to this cache be username strings.

The pub-sub-details class contains the data we need to keep track of the publisher and subscriber features:

  • subscriber-list: This will be a list of the HTTP endpoints to send the Any messages to after the Any message is updated. Currently, we only allow for an HTTP message string. Future implementations should allow for security functionality on those endpoints.
  • current-any: The current Any message that the publisher has supplied.
  • mutex: A fr-mutex to protect the current-any slot. This should be read-held to get the current-any and it should be read-held to set a new current-any message.
  • password: The password for the subscriber held as a string. 

We shouldn’t be saving the password being as a string in the pub-sub-details class. At a minimum we should be salting and hashing this value. In the future we should implement an account system for readers and subscribers giving access to reading and updating the pub-sub-details. As this is only instructional and not production-ready code, I feel okay leaving it as is for the moment.

We create a make-pub-sub-details function that will create a pub-sub-details object with a given password. The register function doesn’t allow the user to set an Any message at creation time, and none of the other slots are useful to the publisher.

We create an accessor method to set the any-message value slot. We also create an :after method to send the Any message to any listening subscribers by iterating through the subscriber list and calling a drakma:http-request. We wrap this in unwind-protect so an IO failure doesn’t stop other subscribers from getting the message.

Finally we add a setter function for the subscriber list.

Function definitions:

Register-publisher:

This function is the registration point for a new publisher. It is almost the same as set-in-cache from our previous post except it checks that an entry in the cache for the soon-to-be-registered publisher doesn’t already exist. It would be bad to let a new publisher overwrite an existing publisher.

Register-subscriber:

Here we use a new macro, ace.core.etc:clet from the etc package in ace.core.

(defun register-subscriber (publisher address)
  "Register a new subscriber to a publisher."
  (ace:clet ((ps-struct
               (act:with-frmutex-read (*cache-mutex*)
                 (gethash publisher *cache*)))
             (ps-mutex (mutex ps-struct)))
    (act:with-frmutex-write (ps-mutex)
      (push address (subscriber-list ps-struct)))))

In the code below we search the cache for a user entry, if the entry is found then ps-struct will be non-nil and we can evaluate the body adding the subscriber to the list. If the subscriber is not found we return nil.

Update-publisher-any:

(defun update-publisher-any (username password any)
  "Updates the google:any message for a publisher
   with a specified username and password.
   The actual subscriber calls happen in a separate thread
   but 'T is returned to the user to indicate the any
   was truly updated."
  (ace:clet ((ps-class
              (act:with-frmutex-read (*cache-mutex*)
                (gethash username *cache*)))
             (correct-password (string= (password ps-class)
                                        password)))
    (declare (ignore correct-password))
    (act:make-thread
     (lambda (ps-class)
       (setf (current-any ps-class) any))
     :arguments (list ps-class))
    t))

In the update-publisher-any code we use clet to verify that the publisher exists and the password is found. We ignore the correct-password entry though.

We don’t want the publisher to be thread-blocked while we send the new message to all of the subscribers so we update the current-any in a separate thread. To do this we use the ace.core.thread function make-thread. A keen reader will see for SBCL this calls the sbcl make-thread function, otherwise it calls bordeaux-threads make-thread function.

If we are able to find a publisher with the correct password we return T to show success.

Conclusion

In today’s post we have made a basic publisher-subscriber library that will send an Any protocol buffer message to a list of subscribers. We have detailed some new functions that we used in ace.core. We have also listed some of the problems with this library. The code has evolved substantially from the previous post but it still has a long way to go before being production-ready.

Thank you for your reading!


Ron Gut, Carl Gay, and Ben Kuehnert gave comments and edits to this post.

One thought on “Proto Cache: Implementing Basic Pub Sub

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s