Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/FSharp.Core/mailbox.fs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ module AsyncHelpers =
[<Sealed>]
[<AutoSerializable(false)>]
type Mailbox<'Msg>(cancellationSupported: bool) =
let mutable isDisposed = false
let mutable inboxStore = null
let arrivals = Queue<'Msg>()
let syncRoot = arrivals
Expand Down Expand Up @@ -174,6 +175,8 @@ type Mailbox<'Msg>(cancellationSupported: bool) =

member x.Post msg =
lock syncRoot (fun () ->
if isDisposed then
raise (ObjectDisposedException(nameof Mailbox))

// Add the message to the arrivals queue
arrivals.Enqueue msg
Expand Down Expand Up @@ -331,6 +334,13 @@ type Mailbox<'Msg>(cancellationSupported: bool) =

interface System.IDisposable with
member _.Dispose() =
lock syncRoot (fun () ->
if isNotNull inboxStore then
inboxStore.Clear()

arrivals.Clear()
isDisposed <- true)

if isNotNull pulse then
(pulse :> IDisposable).Dispose()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
namespace FSharp.Core.UnitTests.Control

open System
open Xunit
open System.Threading
open System.Threading.Tasks
open Xunit

type Message =
| Increment of int
Expand Down Expand Up @@ -311,6 +312,49 @@ type MailboxProcessorType() =

finishedEv.Reset() |> ignore

[<Fact>]
member this.``After dispose is called, mailbox should stop receiving and processing messages``() = task {
let mutable isSkip = false
let mutable actualSkipMessagesCount = 0
let mutable actualMessagesCount = 0
let sleepDueTime = 100
let expectedMessagesCount = 2
use mre = new ManualResetEventSlim(false)
let mb =
MailboxProcessor.Start(fun b ->
let rec loop() =
async {
match! b.Receive() with
| Increment _ ->
if isSkip then
actualSkipMessagesCount <- actualSkipMessagesCount + 1
return! loop()
else
do! Async.Sleep sleepDueTime
if not isSkip then
actualMessagesCount <- actualMessagesCount + 1
if actualMessagesCount = expectedMessagesCount then mre.Set()
do! Async.Sleep sleepDueTime
return! loop()
| _ -> ()
}
loop()
)
let post() = Increment 1 |> mb.Post

[1..4] |> Seq.iter (fun x -> post())
do! task {
mre.Wait()
isSkip <- true
(mb :> IDisposable).Dispose()
Assert.Throws<ObjectDisposedException>(fun _ -> post()) |> ignore
}

Assert.Equal(expectedMessagesCount, actualMessagesCount)
Assert.Equal(0, actualSkipMessagesCount)
Assert.Equal(0, mb.CurrentQueueLength)
}

[<Fact>]
member this.Dispose() =

Expand Down