Mavnn's blog

Stuff from my brain

Throttling the Level of Concurrency in F#

Async.Parallel |> Async.RunSynchronously is great for running a load of stuff in parallel in F#, as long as you don't mind them all running at the same time.
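
For reference, here is a minimal sketch of that unthrottled pattern (the values are purely illustrative). Every computation is started together, and Async.Parallel hands back the results in the same order as the input, regardless of which one finishes first.

```fsharp
// All five computations are started at once; nothing limits how many run together.
let squares =
    [ 1 .. 5 ]
    |> List.map (fun x -> async { return x * x })
    |> Async.Parallel
    |> Async.RunSynchronously
// squares = [| 1; 4; 9; 16; 25 |] (results come back in input order)
```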

Often, though, you want to map across a sequence and run a function on the elements in parallel, but with a limit on how many are processed concurrently. Whether you're doing something CPU-heavy, where there's no point running more computations than there are processors on the box, or you know you'll swamp a remote server if you dump all of your connections on it at once, this issue comes up surprisingly often.
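
For the CPU-bound case, newer versions of FSharp.Core (4.7 and later) accept an optional maxDegreeOfParallelism argument on Async.Parallel, which caps how many of the computations run at once. A sketch, assuming such a version is available; cpuHeavy is a made-up stand-in for real work, and Environment.ProcessorCount is used as the limit:

```fsharp
open System

// Hypothetical stand-in for a CPU-heavy function: a busy loop with a deterministic result.
let cpuHeavy (n : int) : int64 =
    let mutable acc = 0L
    for i in 1 .. 200_000 do
        acc <- acc + int64 (i % (n + 1))
    acc

// Assumes FSharp.Core 4.7 or later, where Async.Parallel takes an optional
// maxDegreeOfParallelism argument: at most ProcessorCount computations run at once.
let throttledResults =
    let jobs = [ for n in 1 .. 32 -> async { return cpuHeavy n } ]
    Async.Parallel(jobs, maxDegreeOfParallelism = Environment.ProcessorCount)
    |> Async.RunSynchronously
```

On older FSharp.Core versions that overload doesn't exist, which is where a hand-rolled throttle comes in.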

As a first stab, you might be tempted to do something like this (if you think like I do):

let inline doParallelWithThrottle limit f items =
    use sem = new System.Threading.Semaphore(limit, limit)
    items
    |> Seq.map (fun element -> async {
            // WaitOne blocks whichever thread-pool thread is running this computation
            sem.WaitOne() |> ignore
            let result = Async.RunSynchronously <| async { return f element }
            // Never reached if f throws, so that slot is lost for good
            sem.Release() |> ignore
            return result
        })
    |> Async.Parallel
    |> Async.RunSynchronously

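In a word: don't. Contention on the Semaphore makes this enormously inefficient with even a few hundred tasks: every computation that is waiting for a slot blocks a thread-pool thread inside WaitOne, so the pool quickly fills up with threads that are doing nothing but waiting.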
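
If you want to keep the shape of the first attempt, one way to avoid parking threads is to swap the blocking Semaphore for SemaphoreSlim and wait with WaitAsync, which the async workflow can await without holding a thread. A sketch under that assumption; throttleWithSemaphoreSlim is a hypothetical name, and f is assumed to be an ordinary synchronous function:

```fsharp
open System.Threading

// Hypothetical helper: same shape as the first attempt, but waiting for a slot
// is asynchronous, so queued computations do not hold thread-pool threads.
let throttleWithSemaphoreSlim (limit : int) (f : 'a -> 'b) (items : seq<'a>) : 'b[] =
    use sem = new SemaphoreSlim(limit, limit)
    items
    |> Seq.map (fun element -> async {
            // Suspends this computation until a slot is free, without blocking a thread
            do! sem.WaitAsync() |> Async.AwaitTask
            try
                return f element
            finally
                // Always hand the slot back, even if f throws
                sem.Release() |> ignore
        })
    |> Async.Parallel
    |> Async.RunSynchronously
```

Every item still gets its own async computation, but the ones waiting for a slot are queued continuations rather than blocked threads, and the try/finally returns the slot even when f throws.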

In the end, the simplest implementation I could come up with that didn't involve dragging in external dependencies was the following:

open System.Collections.Concurrent

type JobRequest<'T> =
    {
        Id : int
        WorkItem : 'T
    }

type WorkRequest<'T> =
    | Job of JobRequest<'T>
    | End

let inline doParallelWithThrottle<'a, 'b> limit f items =
    let itemArray = Seq.toArray items
    let itemCount = Array.length itemArray
    // Results keyed by input index, so the output can be put back in input order
    let resultMap = ConcurrentDictionary<int, 'b>()
    // Bounded to one item: block.Add waits until an agent has taken the previous request
    use block = new BlockingCollection<WorkRequest<'a>>(1)
    // One-shot signal that every agent has finished
    use completeBlock = new BlockingCollection<unit>(1)
    // Counts agents as they stop and signals completeBlock once all `limit` are done
    let monitor =
        MailboxProcessor.Start(fun inbox ->
            let rec inner complete =
                async {
                    do! inbox.Receive()
                    if complete + 1 = limit then
                        completeBlock.Add(())
                        return ()
                    else
                        return! inner <| complete + 1
                }
            inner 0)
    // Each agent pulls requests from `block` until it sees End. Only `limit` agents
    // exist, so at most `limit` calls to f run at once.
    // Note: if f throws, the MailboxProcessor swallows the exception (it goes to the
    // agent's Error event), the agent never posts to monitor, and this function never returns.
    let createAgent () =
        MailboxProcessor<unit>.Start(
            fun _ ->
                let rec inner () = async {
                        let! request = async { return block.Take() }
                        match request with
                        | Job job ->
                            let! result = async { return f (job.WorkItem) }
                            resultMap.AddOrUpdate(job.Id, result, fun _ _ -> result) |> ignore
                            return! inner ()
                        | End  ->
                            monitor.Post ()
                            return ()
                    }
                inner ()
        )
    let agents =
        [| for _ in 1..limit -> createAgent() |]
    itemArray
    |> Array.mapi (fun i item -> Job { Id = i; WorkItem = item })
    |> Array.iter (block.Add)

    // One End per agent, so every agent shuts down
    for _ in 1..limit do
        block.Add(End)

    completeBlock.Take()
    let results = Array.zeroCreate itemCount
    resultMap
    |> Seq.iter (fun kv -> results.[kv.Key] <- kv.Value)
    results
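
The code above caps concurrency by only ever creating limit consumers that pull from a shared collection. The same idea can be sketched without the End messages or the monitor agent by filling a queue of indices up front, so that an empty queue simply means the work is done. throttledMap is a hypothetical name, and this is a swapped-in variant rather than the code above:

```fsharp
open System.Collections.Concurrent

// Hypothetical helper: a fixed pool of `limit` async workers draining a
// pre-filled queue of indices. An empty queue means "no more work".
let throttledMap (limit : int) (f : 'a -> 'b) (items : seq<'a>) : 'b[] =
    if limit < 1 then invalidArg "limit" "limit must be at least 1"
    let input = Seq.toArray items
    let results = Array.zeroCreate<'b> input.Length
    // Indices of the work items; workers dequeue them until the queue is empty
    let pending = ConcurrentQueue<int>(seq { 0 .. input.Length - 1 })
    let rec worker () = async {
        match pending.TryDequeue() with
        | true, i ->
            // Each index is dequeued exactly once, so each slot is written by one worker
            results.[i] <- f input.[i]
            return! worker ()
        | _ -> return ()
    }
    // Only `limit` workers exist, so at most `limit` calls to f run at once
    Array.init limit (fun _ -> worker ())
    |> Async.Parallel
    |> Async.RunSynchronously
    |> ignore
    results
```

Each worker calls f synchronously and keeps its thread for as long as there is work, so this suits CPU-bound functions; for I/O-bound work you would want f to return an Async and bind it with let! inside the worker instead.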
