If you are implementing more than one service and need asynchronous communication between them, or if you need bullet-proof asynchronous communication with 3rd party services, the outbox pattern might be a handy tool. Let's look at how we can implement one in the beloved F#!
Introduction
Let’s consider a simple use case to show when the outbox pattern can be useful. If you already know a thing or two about it, feel free to skip the introduction. If you are eager to see the full source code, here’s the link: https://github.com/marcingolenia/fsharp-outbox.
I am not a fan of UML, but it is just fine for learning and explanations, so let’s use a sequence diagram to explain the problem step by step. The basic flow:
- User places an order in the app.
- The order is created and saved in the database.
- An “order placed” event is published.
- The user is informed in the app: “thank you for placing the order”.
- The event is consumed and an invoice is prepared.
- The invoice is saved.
- Happy end.
Let’s say that invoicing is done by another service and another team. You agreed on the published language “order placed” to exchange the data. Let’s keep things simple and set aside doubts you might have, like “What about payments?” or “Why is the invoice not being sent?”. I want to explain the outbox pattern, not to build a perfect business use case. So you might plan to build something like this:
And it should work in most cases, unless…
The problem
Unless the application dies just after saving the order in the database, or the message broker is not reachable (network problems or any other reason):
You might try to do things in a different order. What if we publish the message first and then save the changes in the database? As you can guess, it is now possible to emit the message without saving the order, because the database can’t be reached:
Now we have to deal with an invoice for a non-existent order. Of course, we can do our best to reduce the risk by introducing retries, but even with 100 retries you will eventually hit the problem.
The solution
As you may guess (because of the title of the blog post), the solution here is to use the outbox pattern. You can also read a little bit about it on the microservices.io page [1] and in the related book [8]. If you do, please pay attention to the term “high-level domain events”. What the hell is that? More important domain events? No: these are events that have to be distinguished from domain events, as they directly cause side effects in different domains. I encourage you to name them differently. Kamil Grzybek, in his article about the outbox pattern in C# [3], comes up with “domain notification”. I like it, let’s stick to this name.
The idea behind the outbox is to create outbox storage in the same database in which you mutate the state and to use a transaction to achieve atomicity. In other words: save the new order and save the “order placed” domain notification; if one of these operations fails, don’t commit the transaction, so changes (if any) are rolled back. This mechanism is reflected in the pattern name: “Transactional Outbox”. Then another process picks up the pending notification and distributes it in the desired way. I have made one more sequence diagram that shows this:
Keep in mind one important thing: the outbox pattern helps to achieve “at-least-once delivery”. In corner cases, you may end up with duplicated messages (when a notification message is published and the outbox storage goes down before the notification is marked as processed). If you need exactly-once delivery, you might google a little bit for the inbox pattern (or wait for my post in the future 😉). But first make sure you can’t go with an Idempotent Receiver; this guy will make your life simpler. You can read a general description here [4] and a more detailed one in the Enterprise Integration Patterns book [9].
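To make the Idempotent Receiver idea concrete, here is a minimal sketch. It assumes every message carries a unique identifier; the OrderPlaced shape, the MessageId field, and the idempotent helper are hypothetical names made up for this illustration, and the in-memory HashSet only stands in for a durable store of processed ids.

```fsharp
open System.Collections.Generic

// Hypothetical message shape, used only for this illustration.
type OrderPlaced = { MessageId: int64; OrderId: int64 }

// Wraps a handler so that each MessageId is handled at most once.
// Assumption: an in-memory HashSet replaces a durable "processed ids" store.
let idempotent (handle: OrderPlaced -> unit) =
    let seen = HashSet<int64>()
    fun (message: OrderPlaced) ->
        // HashSet.Add returns false when the id is already present.
        if seen.Add message.MessageId then handle message

let invoices = ResizeArray<int64>()
let prepareInvoice = idempotent (fun message -> invoices.Add message.OrderId)

// The broker delivers the same message twice (at-least-once delivery).
let delivered = { MessageId = 1L; OrderId = 42L }
prepareInvoice delivered
prepareInvoice delivered

printfn "Invoices prepared: %d" invoices.Count
```

In a real consumer the processed ids would have to be stored together with the invoice (same database, same transaction); otherwise a crash between handling the message and remembering its id brings the duplicate back.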
Time to get serious and write some F# 💪
Outbox in F#
We will need 2 functions to implement Outbox and a type to represent an outbox message.
type OutboxMessage =
    { Id: int64
      OccuredOn: DateTime
      Payload: string
      Type: string }
The Type: string field will help us deal with serialization/deserialization; the rest is obvious. The commit and execute functions:
// A type abbreviation has to declare its type parameter explicitly.
type Commit<'a> = (unit -> int64) -> (OutboxMessage list -> Async<unit>) -> 'a list -> Async<unit>
type Execute = Async<seq<OutboxMessage>> -> (OutboxMessage -> Async<unit>) -> (obj -> Async<unit>) -> Async<unit>
- Commit:
  - (unit -> int64) is a function that generates identifiers for new outbox messages.
  - (OutboxMessage list -> Async<unit>) is a function that stores the list of outbox messages.
  - 'a list is the list of notifications (thank you F# for type inference, I love you).
  - Async<unit>: we don’t need anything from the function as a result, it is just a side effect.
- Execute:
  - Async<seq<OutboxMessage>> is an asynchronously retrieved sequence of outbox messages.
  - (OutboxMessage -> Async<unit>) is a function that marks the message as processed.
  - (obj -> Async<unit>) is a function that publishes the message.
  - Async<unit>: again, we don’t need anything from the function as a result, it is just a side effect.
Now forget about the types; the anatomy is behind us, so let us write:
let commit generateId save notifications =
    ...
let execute read setProcessed publish =
Much easier, isn’t it? That is why I always argue (especially with my C#-biased friends) that naming is more important than type annotations (some of them need to be convinced to use var). I have good news for you: that was the hardest part. Let us complete the commit function:
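open System
open Newtonsoft.Json

let commit generateId save notifications =
    // Turn every notification into an outbox message: serialized payload plus its type name.
    let outboxMessages =
        notifications
        |> List.map (fun notification ->
            { Id = generateId ()
              OccuredOn = DateTime.UtcNow
              Payload = JsonConvert.SerializeObject notification
              Type = notification.GetType().FullName })
    // Saving is the only side effect; the caller decides which transaction it runs in.
    async { do! save outboxMessages }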
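A minimal sketch of how commit can be exercised with in-memory dependencies and partial application. This is not the code from the repository: System.Text.Json replaces Newtonsoft's JsonConvert only to keep the sketch free of external packages, and OrderPlaced, generateId, saveInMemory, and commitToMemory are hypothetical names.

```fsharp
open System
open System.Text.Json

type OutboxMessage =
    { Id: int64
      OccuredOn: DateTime
      Payload: string
      Type: string }

// Same shape as commit above; System.Text.Json is a stand-in for JsonConvert.
let commit generateId save (notifications: obj list) =
    let outboxMessages =
        notifications
        |> List.map (fun notification ->
            { Id = generateId ()
              OccuredOn = DateTime.UtcNow
              Payload = JsonSerializer.Serialize(notification, notification.GetType())
              Type = notification.GetType().FullName })
    async { do! save outboxMessages }

// Hypothetical notification and in-memory dependencies.
type OrderPlaced = { OrderId: int64 }

let counter = ref 0L
let generateId () =
    counter.Value <- counter.Value + 1L
    counter.Value

let stored = ResizeArray<OutboxMessage>()
let saveInMemory (messages: OutboxMessage list) = async { stored.AddRange messages }

// Partial application: fix the dependencies once, pass the notifications later.
let commitToMemory = commit generateId saveInMemory

commitToMemory [ box { OrderId = 1L }; box { OrderId = 2L } ]
|> Async.RunSynchronously

printfn "Stored %d outbox messages" stored.Count
```

Swapping saveInMemory for a database-backed function changes nothing in commit itself, which is the point of passing the dependencies as plain functions.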
Do I need to explain anything here? I don’t think so, but just ask in the comments if something bothers you. Let’s look at how execute can be implemented:
open System.Reflection

let execute read setProcessed publish =
    async {
        let! (messages: OutboxMessage seq) = read
        // Notification types are resolved from the assembly that contains Marker.
        let notificationAssembly = Assembly.GetAssembly(typeof<Marker>)
        let processes =
            messages
            |> Seq.map (fun message ->
                async {
                    // Publish first, mark as processed second: a crash in between causes a redelivery, not a loss.
                    do! publish (JsonConvert.DeserializeObject(message.Payload, notificationAssembly.GetType(message.Type)))
                    do! setProcessed message
                })
        do! processes |> Async.Sequential |> Async.Ignore
    }
What is Marker? Marker is an empty interface that helps us with assembly scanning and type resolution. Just add this:
type Marker = interface end
to the project with your domain notifications. Keep in mind that a notification can be anything: a record, a class, a struct, whatever. Some developers like to enforce a convention, for instance that you must inherit from an INotification interface. I don’t like that, I see no benefit in it, but it’s just my opinion. You may also ask why I decided to process the messages sequentially. Well… if we cannot publish the messages for some time for any reason, the pending outbox messages will keep piling up. Firing thousands of messages in parallel can be a bad idea. If you are sure you can handle the massive load, change it to Async.Parallel, no problem. On GitHub [11] I have introduced a ParallelizationThreshold parameter so you can tune the behavior elegantly.
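One possible way to make the sequential/parallel choice tunable is sketched below. It is an assumption of mine and not necessarily how ParallelizationThreshold works in the repository: the hypothetical runBounded helper caps the number of jobs in flight using the maxDegreeOfParallelism argument of Async.Parallel and falls back to Async.Sequential when the cap is 1.

```fsharp
open System.Collections.Concurrent

// Hypothetical helper: runs the jobs with at most maxParallel of them in flight.
// maxParallel <= 1 means strictly sequential processing.
let runBounded (maxParallel: int) (jobs: seq<Async<unit>>) =
    async {
        if maxParallel <= 1 then
            do! jobs |> Async.Sequential |> Async.Ignore
        else
            do! Async.Parallel(jobs, maxParallel) |> Async.Ignore
    }

// Stand-in for "publish and mark as processed": record the message id.
let published = ConcurrentQueue<int>()
let jobs = [ 1 .. 10 ] |> List.map (fun id -> async { published.Enqueue id })

// Sequential run: messages are handled one by one, in order.
runBounded 1 jobs |> Async.RunSynchronously
let sequentialOrder = Seq.toList published

// Bounded parallel run: order is no longer guaranteed, but at most 4 jobs run at once.
runBounded 4 jobs |> Async.RunSynchronously

printfn "Handled %d messages in total" published.Count
```

The trade-off stays the same as described above: a higher cap drains a large backlog faster, at the price of more concurrent load on the broker and the database and no ordering guarantee.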
Full source code can be found on GitHub [11]. The transactional outbox pattern is ready. We only need 2 types (including empty interface) and 2 functions - as simple as that. The rest are just dependencies and hosting. Let’s look into that.
Persistence dependencies
I have decided to use Postgres. As you can see I have decoupled the persistence from the outbox, so feel free to replace it with anything you want. First, let’s create a schema for our outbox. It may look like this:
create table outbox_messages (
    id bigint constraint pk_outbox_messages primary key,
    occured_on timestamp not null,
    type varchar(255) not null,
    payload json not null
);

create table outbox_messages_processed (
    id bigint constraint pk_outbox_messages_processed primary key,
    occured_on timestamp not null,
    type varchar(255) not null,
    payload json not null,
    processed_on timestamp not null
);
You could store processed and unprocessed messages in one table with a nullable processed_on column that holds the processing date once a message has been processed. I will stick to a very inspirational document [10] in which Hugh Darwen (who has been working on the relational model since the beginning) puts forward alternative approaches based on specialized relations. Let’s do that. One nice benefit of this approach is that we gain some separation of concerns: one table for holding things to process and one table that is an archive. We can apply tailor-made indexes, partitioning, etc.
With a small Dapper wrapper (you can find it on GitHub) the functions might look as follows:
let save createConnection outboxMessages =
    let cmd = "INSERT INTO outbox_messages(id, occured_on, payload, type) VALUES (@Id, @OccuredOn, @Payload::jsonb, @Type)"
    async {
        use! connection = createConnection ()
        do! connection |> sqlExecute cmd outboxMessages
    }

let read createConnection =
    let cmd = "SELECT id, occured_on as OccuredOn, payload, type FROM outbox_messages"
    async {
        use! connection = createConnection ()
        return! connection |> sqlQuery<OutboxMessage> cmd
    }

let moveToProcessed createConnection outboxMessage =
    // Delete and insert happen in one statement, so the row is either moved or left untouched.
    let cmd = "
        WITH moved_rows AS (
            DELETE FROM outbox_messages deleted
            WHERE Id = @id
            RETURNING deleted.*
        )
        INSERT INTO outbox_messages_processed(id, occured_on, payload, type, processed_on)
        SELECT id, occured_on, payload, type, now() at time zone 'utc' FROM moved_rows;"
    async {
        use! connection = createConnection ()
        do! connection |> sqlExecute cmd {| id = outboxMessage.Id |}
    }
The first two functions are basic stuff. Let me comment on the last one. The RETURNING clause lets us delete and insert in a single SQL statement, which keeps the move atomic: if the record cannot be inserted, it won’t be deleted. It works the same way in MS SQL Server (but with the OUTPUT clause); for other RDBMSs you have to check yourself.
Note that, once createConnection is partially applied, the signatures of these functions match the signatures of the outbox save, read, and setProcessed functions.
Publisher dependency
For publishing messages, I’ve decided to pull in a library that makes the underlying message broker just a matter of configuration, even though I decided to use RabbitMQ. I’ve picked Rebus [6] just because I wanted to get my hands on something new to me. In the past, I’ve also used MassTransit, which is also great. To be honest, I like Rebus more now: the configuration is easier and it seems that (by chance or not) it fits F# better than MassTransit (in the team we were not able to write consumers without implementing the IConsumer<> interface). Let me present some helper functions that deal with message publishing and subscription:
namespace RebusMessaging

open System
open System.Threading.Tasks
open Rebus.Activation
open Rebus.Bus
open Rebus.Logging
open Rebus.Config

module Messaging =
    let private queueName = "mcode.fun"

    let private toTask asyncA =
        asyncA |> Async.StartAsTask :> Task

    let publish (bus: IBus) message =
        bus.Publish message |> Async.AwaitTask

    let configure (endpoint: string)
                  (connectionName: string)
                  (activator: BuiltinHandlerActivator)
                  =
        Configure.With(activator)
            .Transport(fun transport -> transport.UseRabbitMq(endpoint, queueName)
                                            .ClientConnectionName(connectionName) |> ignore)
            .Logging(fun logConfig -> logConfig.Console(LogLevel.Info))
            .Start()

    let configureOneWay (endpoint: string)
                        (connectionName: string)
                        (activator: BuiltinHandlerActivator)
                        =
        Configure.With(activator)
            .Transport(fun transport -> transport.UseRabbitMqAsOneWayClient(endpoint)
                                            .ClientConnectionName(connectionName) |> ignore)
            .Logging(fun logConfig -> logConfig.Console(LogLevel.Info))
            .Start()

    let registerHandler
        (handler: 'a -> Async<Unit>)
        (activator: BuiltinHandlerActivator) =
        activator.Handle<'a>(fun message -> handler message |> toTask)

    let markerNeighbourTypes<'marker> =
        (typeof<'marker>.DeclaringType).GetNestedTypes()
        |> Array.filter(fun type_ -> type_.IsAbstract = false)

    let turnSubscriptionsOn (types: Type[]) (bus: IBus) =
        async {
            types |> Array.iter(fun type_ -> bus.Subscribe type_ |> ignore)
        }
Let’s talk about the code now.
- toTask simply takes an asynchronous computation, starts it as a hot task, and upcasts it to the non-generic Task type. It is handy for registering event handlers (see registerHandler).
- publish and registerHandler are simple wrappers that make the Rebus method calls sexier in F# code.
- configure and configureOneWay deal with configuration. Here you could implement a simple if-statement to connect to different message brokers. I’ve focused on RabbitMQ, so there is no if. configureOneWay is almost identical but does not require a queue, as it is intended only for publishing messages. We will create two separate connections: one for handling incoming messages, the second for publishing. This is a good practice that positively influences performance [5]. The connection name is not required, but I decided to use it anyway: the RabbitMQ management plugin shows the connection name if it is specified, so you can easily tell which is which.
- registerHandler is another wrapper that helps us register handlers. We won’t be using any DI container, so let’s use BuiltinHandlerActivator, a nice Rebus thing that fits well into the world of partial application and function-based handlers (as opposed to classes that implement an IHandler interface). This beats MassTransit in the F# world; as far as I know, it doesn’t offer anything like that out of the box.
- markerNeighbourTypes and turnSubscriptionsOn allow us to easily subscribe to messages of the desired types. The first function can be replaced by any other function that returns an array of types; you can also remove it completely and subscribe to explicit types if you wish. I decided to use reflection and establish a mini-convention, so I can forget about adding subscriptions as long as the notification types are kept in the same module. Good stuff for small things. You will see that I use these functions together: I simply pass markerNeighbourTypes to turnSubscriptionsOn.
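The reflection mini-convention can be tried out in isolation. The sketch below reuses the markerNeighbourTypes function from the listing; the Notifications module and its two record types are hypothetical. It relies on the fact that an F# module compiles to a static class, so the types declared in it are nested types of that class, and the empty Marker interface is filtered out because interfaces are abstract.

```fsharp
// Hypothetical module holding the domain notifications and the marker.
module Notifications =
    type Marker = interface end
    type OrderPlaced = { OrderId: int64 }
    type OrderCancelled = { OrderId: int64; Reason: string }

// Same function as in the Messaging module above.
let markerNeighbourTypes<'marker> =
    (typeof<'marker>.DeclaringType).GetNestedTypes()
    |> Array.filter (fun type_ -> type_.IsAbstract = false)

// Every non-abstract type declared next to Marker is picked up.
let names =
    markerNeighbourTypes<Notifications.Marker>
    |> Array.map (fun type_ -> type_.Name)
    |> Array.sort

printfn "%A" names
```

Adding a third record to the Notifications module would make it show up in the array without touching the subscription code, which is the whole convention.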
Nice stuff, but will it work?
Of course it will! It was working as soon as I finished writing the code. The test was green ;) Here it is:
[<Fact>]
let ``GIVEN pending outbox messages WHEN execute THEN messages are published to the broker and can be consumed from the broker`` () =
    // Arrange
    let expectedNotification1 = { Id = (generateId()); SomeText = "Whatever1"; Amount = 11.11M }
    let expectedNotification2 = { Id = (generateId()); SomeText = "Whatever2"; Amount = 22.22M }
    let (tcs1, tcs2) = (TaskCompletionSource<WhateverHappened>(), TaskCompletionSource<WhateverHappened>())
    let handler (message: WhateverHappened) = async {
        message.Id |> function
        | _ when expectedNotification1.Id = message.Id -> tcs1.SetResult message
        | _ when expectedNotification2.Id = message.Id -> tcs2.SetResult message
        | _ -> failwith $"This shouldn't have happened, %s{nameof WhateverHappened} with unexpected Id: %d{message.Id} was received."
    }
    use activator = new BuiltinHandlerActivator() |> Messaging.registerHandler handler
    use bus = Messaging.configure "amqp://localhost" "two-way-connection-tests" activator
    bus |> Messaging.turnSubscriptionsOn Messaging.markerNeighbourTypes<Marker> |> Async.RunSynchronously
    Outbox.commit generateId (save DbConnection.create) [expectedNotification1; expectedNotification2] |> Async.RunSynchronously
    // Act
    Outbox.execute (read DbConnection.create)
                   (moveToProcessed DbConnection.create)
                   (Messaging.publish bus)
    |> Async.RunSynchronously
    // Assert
    let actualNotification1 = tcs1.Task |> Async.AwaitTask |> Async.RunSynchronously
    let actualNotification2 = tcs2.Task |> Async.AwaitTask |> Async.RunSynchronously
    actualNotification1 |> should equal expectedNotification1
    actualNotification2 |> should equal expectedNotification2
Let me guide you through the test (but I hope that you already understand everything! I put quite an effort into making all of my tests easy to understand).
- First I create 2 domain notifications of the WhateverHappened type.
- Then I prepare two TaskCompletionSource instances, which allow me to elegantly wait for the messages delivered from the bus (no Thread.Sleep or Task.Delay crap).
- handler is my subscription handler that completes the matching TaskCompletionSource once it gets the message from the bus.
- Then I create the Rebus BuiltinHandlerActivator, which will call my handler, and start the bus.
- Time to commit the domain notifications to the outbox (the Outbox.commit call that ends the Arrange part). Normally you would partially apply the generateId and save functions and just pass the domain notifications to the function call in the application layer (aka imperative host).
- Now we trigger the outbox action and pass the dependencies. Normally this would be done by another process (a background job).
- In the assert part we synchronously get the notifications from the TaskCompletionSource instances.
- And check that what we gave (published) is what we got (received from the subscription).
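The TaskCompletionSource trick works without any broker, so here is a minimal stand-alone sketch of it. The handler and the simulated delivery are hypothetical stand-ins for the bus, and the timeout passed to Async.RunSynchronously is my addition (the test above waits without one) so that a lost message fails the wait instead of hanging it forever.

```fsharp
open System.Threading.Tasks

let received = TaskCompletionSource<string>()

// Stand-in for a bus handler: it completes the source when a message arrives.
let handler (message: string) = async { received.SetResult message }

// Simulate the broker delivering a message a bit later, on another thread.
async {
    do! Async.Sleep 50
    do! handler "order placed"
}
|> Async.Start

// Waits exactly as long as needed; raises TimeoutException after 5 seconds.
let message =
    Async.RunSynchronously(received.Task |> Async.AwaitTask, timeout = 5000)

printfn "Received: %s" message
```

Compared to a fixed Thread.Sleep, the wait ends the moment the handler fires, so the test is both faster and less flaky.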
Keep in mind that the test requires the Postgres database and RabbitMQ to be up and running. In the repo there is a docker-compose file, so you can set this up with a single command. When you run the test you should see some side effects in the RabbitMQ management plugin, as follows:
Of course, the test should be green.
Hosting: Polling publisher
So… let’s do the boring stuff in Giraffe (because almost everything runs on the web nowadays) with a job scheduler, Quartz.NET (in the past I’ve used Hangfire for this, but in C#, and it worked very well, so you may try it instead). The combination of a job scheduler and the Outbox.execute function is also a pattern, named “Polling publisher” [2][8]. This will be a simple app that:
- exposes an endpoint that you can call to commit a domain notification,
- subscribes to the WhateverHappened notification,
- handles WhateverHappened by printing it to the console,
- publishes the committed notifications using a Quartz.NET job and .NET hosted services,
- creates two connections, one for publishing and one for subscribing, so you will have an overview of how to create an “all-in-one” app, a consume-only app, or a publish-only app.
All that with a composition root, partial application, and F#.
Domain notification handler
This is the easiest part here. It looks just like this:
module Handlers =

    let printWhateverHappenedWithSmiley (notification: WhateverHappened) =
        async {
            printfn "%A" notification
            printfn ":)"
        }
Cool, isn’t it? No IHandler<> implementation :) I like that one.
Giraffe HttpHandler with endpoint for Outbox.Commit
No drama here. If you have ever written something in Giraffe and read my previous post about composition root and partial application, you should be bored.
module HttpHandlers =
    let whateverHappened (commit: obj list -> Async<unit>)
                         generateId
                         : HttpHandler =
        fun (next: HttpFunc) (ctx: HttpContext) ->
            task {
                let whateverHappened: WhateverHappened = {
                    Id = generateId()
                    SomeText = "Hi there!"
                    Amount = 100.20M
                }
                do! commit [whateverHappened]
                return! text $"Thank you. %A{whateverHappened} was scheduled" next ctx
            }

    let handlers (root: CompositionRoot.Dependencies) =
        choose [
            route "/whatever" >=> (whateverHappened root.OutboxCommit root.GenerateId)
            route "/" >=> text "Hello :)" ]
The commit: obj list -> Async<unit> dependency is something we already have (Outbox.commit), and generateId is a simple function that returns int64 (you can see its implementation in the repo; simple stuff with the IdGen library [7]). Normally you would call some workflow from the app layer, fetch some kind of aggregate from somewhere (the database), apply some domain operations, and then save it together with the commit function call in the same TransactionScope. I decided to make my life easier here, so I commit the notification directly from the endpoint.
Glueing stuff together - CompositionRoot
Let’s gather the dependencies now and compose the dependency tree:
namespace WebHost

open Outbox
open DapperFSharp
open Rebus.Activation
open Rebus.Bus
open RebusMessaging

module CompositionRoot =

    type Dependencies = {
        OutboxCommit: obj list -> Async<unit>
        OutboxExecute: Async<unit>
        MessageBus: IBus
        GenerateId: unit -> int64
    }

    let compose =
        let pubBus = Messaging.configureOneWay
                         "amqp://localhost"
                         "pubConnection"
                         (new BuiltinHandlerActivator())
        let subBus = Messaging.configure
                         "amqp://localhost"
                         "subConnection"
                         (new BuiltinHandlerActivator() |> Messaging.registerHandler Handlers.printWhateverHappenedWithSmiley)
        let dbConnection = "Host=localhost;User Id=postgres;Password=Secret!Passw0rd;Database=outbox;Port=5432"
        {
            OutboxCommit = Outbox.commit
                               IdGenerator.generateId
                               (PostgresPersistence.save (createSqlConnection dbConnection))
            OutboxExecute = Outbox.execute
                                (PostgresPersistence.read (createSqlConnection dbConnection))
                                (PostgresPersistence.moveToProcessed (createSqlConnection dbConnection))
                                (Messaging.publish pubBus)
            MessageBus = subBus
            GenerateId = IdGenerator.generateId
        }
We pass the pubBus to Outbox.execute, so we have one connection for publishing. We don’t need it anywhere else (you shouldn’t, even in a big commercial app): Outbox.execute will be the single place that publishes messages. The subBus is needed so that we can subscribe to messages at app startup. Note that we add handlers to the subBus as well. The nice helper function we wrote allows us to pipe further handlers elegantly if we need more.
Finally, we compose the Outbox functions and the tiny, shiny GenerateId function. Keep in mind that it would be even better to split the composition root into 2 levels (so we can test stuff without publishing events).
Program and EntryPoint
We are close here to what Giraffe provides in its docs. The main differences are that we build the composition root here, we subscribe to messages, and we add a hosted service with a Quartz job: the polling publisher that uses the Outbox.execute function.
module App =
    let configureApp (compositionRoot: CompositionRoot.Dependencies)
                     (app : IApplicationBuilder) =
        app.UseGiraffe (HttpHandlers.handlers compositionRoot)

    let configureServices (compositionRoot: CompositionRoot.Dependencies)
                          (services: IServiceCollection)
                          =
        services
            .AddGiraffe()
            .AddHostedService(fun _ -> QuartzHosting.Service compositionRoot.OutboxExecute)
        |> ignore

    [<EntryPoint>]
    let main _ =
        let root = CompositionRoot.compose
        Messaging.turnSubscriptionsOn
            Messaging.markerNeighbourTypes<Marker>
            root.MessageBus |> Async.RunSynchronously
        Host.CreateDefaultBuilder()
            .ConfigureWebHostDefaults(fun webHostBuilder ->
                webHostBuilder
                    .Configure(configureApp root)
                    .ConfigureServices(configureServices root)
                    |> ignore)
            .Build()
            .Run()
        0
QuartzHosting Service
Let me show you now how I have implemented Hosted Service with Quartz scheduler.
namespace WebHost

open System.Threading.Tasks
open Microsoft.Extensions.Hosting
open FSharp.Control.Tasks.V2.ContextInsensitive
open Quartz
open Quartz.Impl
open Quartz.Spi

module QuartzHosting =

    type JobFactory(outboxExecute: Async<unit>) =
        interface IJobFactory with
            member _.NewJob(bundle, _) =
                match bundle.JobDetail.JobType with
                | _type when _type = typeof<PollingPublisher.Job> -> PollingPublisher.Job(outboxExecute) :> IJob
                | _ -> failwith "Not supported Job"
            member _.ReturnJob _ = ()

    type Service(outboxExecute: Async<unit>) =
        let mutable scheduler: IScheduler = null
        interface IHostedService with

            member _.StartAsync(cancellation) =
                printfn $"Starting Quartz Hosting Service"
                task {
                    let! schedulerConfig = StdSchedulerFactory().GetScheduler()
                    schedulerConfig.JobFactory <- JobFactory(outboxExecute)
                    let! _ = schedulerConfig.ScheduleJob(
                                 PollingPublisher.job,
                                 PollingPublisher.trigger,
                                 cancellation)
                    do! schedulerConfig.Start(cancellation)
                    scheduler <- schedulerConfig
                } :> Task

            member _.StopAsync(cancellation) =
                printfn $"Stopping Quartz Hosting Service"
                scheduler.Shutdown(cancellation)
Let’s go through this step by step:
- Quartz easily integrates with many DI containers. We have none. You can use the built-in JobFactory, but it assumes that the job has a default empty constructor. That is why I implemented IJobFactory and rolled my own. Consider moving JobFactory to its own file if you need more Quartz jobs. The factory needs the outboxExecute dependency, so let’s pass it through the constructor. We will get it from the composition root (see configureServices in the Program and EntryPoint section).
- Service is the actual hosted service. We store the scheduler reference, which we shut down when the service stops. Since StartAsync expects C# Tasks, I decided to use the task computation expression to avoid repetitive conversions between async models.
- In StartAsync we grab the scheduler instance from the factory and initialize it (its ThreadPool, JobStore, and DataSources).
- Then we specify our JobFactory and schedule our actual job (I will show the implementation in the next section).
- Finally we start the scheduler, assign the reference, and upcast the resulting Task<unit> to Task to make the interface method and the compiler happy.
At this point, you should already have an idea of how to add more Quartz jobs to the Quartz hosted service if you need them. Let’s see what a Quartz job may look like in F#.
Polling Publisher Quartz Job
To create the job we need 3 things.
- A trigger. Quartz comes with a handy set of methods that, through method chaining, let you configure different triggers. It can also accept a Cron expression if you prefer.
- A class that implements the Quartz IJob interface and its Execute method. We only need to fire the Outbox.execute function here, so we end up with a one-line method body. [<DisallowConcurrentExecution>] is very important for us: Quartz will make sure that only one execution of the job runs at a time (even in case of delays etc.). Thanks to this we avoid table locks, and the polling publisher won’t run into races.
- A job again :) This is the actual instruction that we have to pass to the scheduler, as it expects IJobDetail (not IJob). I am not sure why we can’t just pass IJob to the scheduler and give some details there. This could be improved in Quartz.NET.
The resulting code is short (keep in mind that I have hardcoded some configuration; you may want to parametrize it using configuration files):
namespace WebHost

open System.Threading.Tasks
open Quartz

module PollingPublisher =
    let trigger =
        TriggerBuilder
            .Create()
            .WithSimpleSchedule(fun scheduler ->
                scheduler.WithIntervalInSeconds(5)
                         .RepeatForever() |> ignore)
            .Build()

    [<DisallowConcurrentExecution>]
    type Job(outboxExecute: Async<unit>) =
        interface IJob with
            member _.Execute _ =
                outboxExecute |> Async.StartAsTask :> Task

    let job =
        JobBuilder
            .Create<Job>()
            .WithIdentity("PollingPublisher")
            .Build()
And that’s it! We have all the pieces. That was a long way; let me remind you that there is a repository with the source code, a tested outbox, and CI set up [11].
Testing
I tried this implementation in different ways: I left it running for a couple of hours while hitting the endpoint from time to time, I filled the queue with pending messages and only then connected the app, and I tried to publish several dozen messages at once. Everything works. When I stop the app, the connections and channels close nicely. But if you find something, please let me know and I will try to improve my solution, or simply submit a PR.
Summary
I hope we filled a gap in the F# world.
- I wasn’t able to find any implementation of transactional outbox pattern in F#.
- I was not able to find any example of Quartz.Net in F# and Giraffe.
- I was not able to find easy examples with Rebus and F# (I found the only example of Rebus with Saga in the console app).
But maybe I am a bad googler? If so, I am happy that we’ve added more results to the search engine ;) What is nice is that we glued all that stuff here! Go and spread the news to the world that F# is a kickass language and you can do such things easily!
What I am concerned about is that almost everything wants to integrate with DI containers. This is nice for C# devs, especially in times of the Microsoft.Extensions.DependencyInjection packages, but for us functional lovers this is a pain in the ass. Especially in Quartz, I was forced to implement some kind of JobFactory… that wasn’t hard, but it was a strange experience… maybe I am just needlessly skeptical. All in all, I found a way to do it more “functionally”, but I still had to play with classes and interfaces. What is funny: I think that implementing an interface in F# is better thought out than in C#. You know, in C# you write “:” and you can’t skip the “I” in the interface name, because then you don’t know whether you inherit or implement an interface when you read the code. This problem does not exist in F#.
I like Rebus. Maybe MassTransit has more stars on GitHub (there is also Brighter now), but this lib is just great. It is easier to start with and plays nicely with F#. The only drawback is that I do not understand why the hell we need an activator for a publish-only bus??? I suspect that not all types of transport support that kind of communication, so mookid8000 decided to keep it that way. All in all, you can just pass a new instance there and carry on. Remember: Rebus is cool and you should consider it.
References:
Websites:
[1] microservices.io: outbox pattern - Chris Richardson
[2] microservices.io: polling publisher - Chris Richardson
[3] Outbox pattern in C# - Kamil Grzybek
[4] Enterprise Integration Patterns: Idempotent Receiver - Gregor Hohpe, Bobby Woolf
[5] RabbitMq best practices
[6] Rebus library
[7] IdGen library
Books and documents:
[8] Microservices Patterns - Chris Richardson
[9] Enterprise Integration Patterns - Gregor Hohpe, Bobby Woolf
[10] How To Handle Missing Information Without Using NULL - Hugh Darwen
Source Code:
[11] F# Outbox on GitHub