In reimplementing an EasyNetQ process manager one of the things I wanted to keep from the original project was an in memory message bus that could be used for testing without requiring a running RabbitMQ server. The code has ended up being pleasingly short and also uses a few techniques that seemed interesting, so I thought I'd document it here as part of the design process.
Please note we're not going for a full re-implementation of RabbitMQ in memory here, but this does give us enough to do some useful testing!
Author's note: since this post was written, this code was updated to be async. I've added the new version as appendix 2
What are we building?
In the main process manager library, I'm starting to hash out the underlying types which will drive the various abstractions in play. As a consumer of the library, you'll probably never have cause to use these types directly.
One of these is an interface class representing a message bus which routes on a combination of topic and .net type (i.e. how EasyNetQ routes by default). It looks like this:
Production code will wrap an instance of an EasyNetQ IBus here, but for testing we're going to build an in memory version.
Underlying concepts
What concepts are we going to have in play here? Well, there's going to be subscribers, who should have an action called when a relevant message is published. And we're going to want to be able to actually publish the messages.
It makes sense to model the message bus as an agent which can have commands sent to it (a MailboxProcessor in F# terms), so let's model the commands we want to be able to send first:
So, a subscriber knows what topic it is binding to (which might include wildcards, we'll get there in a moment), which type it is listening for, and an action to call when that type arrives. The agent will need to store a list of subscribers, so we wrap our generic Subscriber<'a> type in a non-generic interface (Subscriber).
The BusMessage type then reflects the three things that we might ask the agent to do: publish a message to current subscribers, add a subscriber, or shut down and reply when shutting down is complete.
Add the logic
We'll also need some logic for determining whether a topic published to match a topic which has been bound to by a subscriber. Topics in RabbitMQ are multipart strings with . separators - "one.two.three", and messages must be published to a specific topic. But when you bind a subscriber, you can bind with two types of wildcard. A * matches a "section" (so binding to "*.two" will receive messages published to "one.two" and "1.two"), while a # finishes a binding string and matches any number of sections (so binding to "one.#" will match "one.two", "one.2" and "one.two.three").
letprivatecompareSection(topicSection:string,bindingSection:string)=matchbindingSectionwith|"#"|"*"->true|_whenbindingSection=topicSection->true|_->falseletprivatetopicBindingMatchtopicOpt(binding:string)=matchtopicOptwith|Some(Topictopic)->lettopicSections=topic.Split'.'letbindingSections=binding.Split'.'ifbindingSections.[bindingSections.Length-1]="#"then// Seq.zip truncates the longer sequence of the two// provided - so here we ignore any sections beyond// the "#"Seq.ziptopicSectionsbindingSections|>Seq.forallcompareSectionelse// If there's no "#" at the end of the binding, there// can only be a match if there is exactly the same number// of sections; check that before zipping the sections// together to compareifbindingSections.Length=topicSections.LengththenSeq.ziptopicSectionsbindingSections|>Seq.forallcompareSectionelsefalse|None->// If there is no publish topic, the only binding which can match// is "#" as there are no sections to compare.binding="#"
Build the agent
We now have all of the logic our agent requires. Let's put into together into an Async recursive function listening for commands.
With the correct types to guide us, this function ends up almost trivial. If we receive a stop message, we reply to say we're stopped and then return unit, meaning we'll process no further messages.
If we receive a subscriber, we just add it to the list of subscribers and call back into the loop.
And finally, if there's a request to publish we check the message hasn't expired and then call of the subscribers that have the correct type and a matching binding (before calling back into the loop).
Wrap it all in the correct interface
Now we just need a type which implements the ProcessManagerBus interface and we're done. We want Dispose to stop the underlying agent, and the other methods are straight forward translations. The only real thing of note here is the line do agent.Error.Add raise. This is needed because by default exceptions thrown in MailboxProcessors kill the background thread the agent loop is running on, but are not propagated up to the overall process. That's not the behaviour we want here: if a subscriber throws, we want to know about the error.
And there you have it! An in memory message bus in 100 lines or less of F# code. For bonus points, here's a simple set of test cases for it so you can see what it looks like in action.
moduleEasyNetQ.ProcessManager.Tests.MemoryBusopenSystemopenEasyNetQ.ProcessManager.TypesopenEasyNetQ.ProcessManager.MemoryBusopenExpectotypeT1=T1ofstringtypeT2=T2ofstring[<Tests>]letmemoryBusTests=testList"memory bus tests"[testCase"Basic send/subscibe works"<|fun_->letreceivedMessage=refNoneletbus=newMemoryBus():>ProcessManagerBusletsubId=SubscriptionId"t1"bus.Subscribe<T1>subId(fun(T1m)->receivedMessage:=Somem)bus.Publish(T1"message")(TimeSpan.FromMinutes1.)bus.Dispose()Expect.equal(!receivedMessage)(Some"message")"Should match"testCase"Subscribe filters correctly by type"<|fun_->letreceivedMessage=refNoneletbus=newMemoryBus():>ProcessManagerBusletsubId=SubscriptionId"t1"bus.Subscribe<T2>subId(fun(T2m)->receivedMessage:=Somem)bus.Publish(T1"message")(TimeSpan.FromMinutes1.)bus.Dispose()Expect.equal(!receivedMessage)None"Should match"testCase"Can publish to topic"<|fun_->letreceivedMessage=refNoneletbus=newMemoryBus():>ProcessManagerBusletsubId=SubscriptionId"t1"bus.TopicSubscribe<T1>subId(Topic"one.two")(fun(T1m)->receivedMessage:=Somem)bus.TopicPublish(T1"message")(Topic"one.two")(TimeSpan.FromMinutes1.)bus.Dispose()Expect.equal(!receivedMessage)(Some"message")"Should match"testCase"Only receives from matching topic"<|fun_->letreceivedMessage=refNoneletbus=newMemoryBus():>ProcessManagerBusletsubId=SubscriptionId"t1"bus.TopicSubscribe<T1>subId(Topic"one.two")(fun(T1m)->receivedMessage:=Somem)bus.TopicPublish(T1"message")(Topic"two.one")(TimeSpan.FromMinutes1.)bus.Dispose()Expect.equal(!receivedMessage)None"Should match"testCase"Matching wildcard topic is matched"<|fun_->letreceivedMessage=refNoneletbus=newMemoryBus():>ProcessManagerBusletsubId=SubscriptionId"t1"bus.TopicSubscribe<T1>subId(Topic"*.two")(fun(T1m)->receivedMessage:=Somem)bus.TopicPublish(T1"message")(Topic"one.two")(TimeSpan.FromMinutes1.)bus.Dispose()Expect.equal(!receivedMessage)(Some"message")"Should match"]
Appendix 1
Just to round everything off, here's a listing of the complete implementation from beginning to end.