Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/release-notes/.FSharp.Core/8.0.300.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

* Minor tweaks to inline specifications to support Visibility PR ([PR #15484](https://github.com/dotnet/fsharp/pull/15484), [#PR 16427](https://github.com/dotnet/fsharp/pull/15484)
* Optimize equality in generic contexts. ([PR #16615](https://github.com/dotnet/fsharp/pull/16615))
* Add ctor for MailboxProcessor with a flag denotes will be thrown exception when `Post` is called after MailboxProcessor" disposed. ([PR #13036](https://github.com/dotnet/fsharp/pull/13036))

### Fixed

* Preserve original stack traces in resumable state machines generated code if available. ([PR #16568](https://github.com/dotnet/fsharp/pull/16568))
* Fix receiving and processing mailbox after Dispose. ([PR #13036](https://github.com/dotnet/fsharp/pull/13036))
45 changes: 39 additions & 6 deletions src/FSharp.Core/mailbox.fs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ module AsyncHelpers =

[<Sealed>]
[<AutoSerializable(false)>]
type Mailbox<'Msg>(cancellationSupported: bool) =
type Mailbox<'Msg>(cancellationSupported: bool, isThrowExceptionAfterDisposed: bool) =
let mutable isDisposed = false
let mutable inboxStore = null
let arrivals = Queue<'Msg>()
let syncRoot = arrivals
Expand Down Expand Up @@ -174,9 +175,12 @@ type Mailbox<'Msg>(cancellationSupported: bool) =

member x.Post msg =
lock syncRoot (fun () ->

// Add the message to the arrivals queue
arrivals.Enqueue msg
if isDisposed then
if isThrowExceptionAfterDisposed then
raise (ObjectDisposedException(nameof Mailbox))
else
// Add the message to the arrivals queue
arrivals.Enqueue msg

// Cooperatively unblock any waiting reader. If there is no waiting
// reader we just leave the message in the incoming queue
Expand Down Expand Up @@ -331,6 +335,13 @@ type Mailbox<'Msg>(cancellationSupported: bool) =

interface System.IDisposable with
member _.Dispose() =
lock syncRoot (fun () ->
if isNotNull inboxStore then
inboxStore.Clear()

arrivals.Clear()
isDisposed <- true)

if isNotNull pulse then
(pulse :> IDisposable).Dispose()

Expand All @@ -347,15 +358,23 @@ type AsyncReplyChannel<'Reply>(replyf: 'Reply -> unit) =
[<Sealed>]
[<AutoSerializable(false)>]
[<CompiledName("FSharpMailboxProcessor`1")>]
type MailboxProcessor<'Msg>(body, ?cancellationToken) =
type MailboxProcessor<'Msg>(body, isThrowExceptionAfterDisposed, ?cancellationToken) =

let cancellationSupported = cancellationToken.IsSome
let cancellationToken = defaultArg cancellationToken Async.DefaultCancellationToken
let mailbox = new Mailbox<'Msg>(cancellationSupported)

let mailbox =
new Mailbox<'Msg>(cancellationSupported, isThrowExceptionAfterDisposed)

let mutable defaultTimeout = Threading.Timeout.Infinite
let mutable started = false
let errorEvent = new Event<Exception>()

new(body, ?cancellationToken: CancellationToken) =
match cancellationToken with
| None -> new MailboxProcessor<'Msg>(body, false)
| Some ct -> new MailboxProcessor<'Msg>(body, false, ct)

member _.CurrentQueueLength = mailbox.CurrentQueueLength // nb. unprotected access gives an approximation of the queue length

member _.DefaultTimeout
Expand Down Expand Up @@ -506,9 +525,23 @@ type MailboxProcessor<'Msg>(body, ?cancellationToken) =
mailboxProcessor.Start()
mailboxProcessor

static member Start(body, isThrowExceptionAfterDisposed, ?cancellationToken) =
let mailboxProcessor =
new MailboxProcessor<'Msg>(body, isThrowExceptionAfterDisposed, ?cancellationToken = cancellationToken)

mailboxProcessor.Start()
mailboxProcessor

static member StartImmediate(body, ?cancellationToken) =
let mailboxProcessor =
new MailboxProcessor<'Msg>(body, ?cancellationToken = cancellationToken)

mailboxProcessor.StartImmediate()
mailboxProcessor

static member StartImmediate(body, isThrowExceptionAfterDisposed, ?cancellationToken) =
let mailboxProcessor =
new MailboxProcessor<'Msg>(body, isThrowExceptionAfterDisposed, ?cancellationToken = cancellationToken)

mailboxProcessor.StartImmediate()
mailboxProcessor
61 changes: 61 additions & 0 deletions src/FSharp.Core/mailbox.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,27 @@ type MailboxProcessor<'Msg> =
/// <example-tbd></example-tbd>
new: body: (MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken: CancellationToken -> MailboxProcessor<'Msg>

/// <summary>Creates an agent. The <c>body</c> function is used to generate the asynchronous
/// computation executed by the agent. This function is not executed until
/// <c>Start</c> is called.</summary>
///
/// <param name="body">The function to produce an asynchronous computation that will be executed
/// as the read loop for the MailboxProcessor when Start is called.</param>
/// <param name="isThrowExceptionAfterDisposed">A flag denotes will be thrown exception
/// when <see cref="F:Microsoft.FSharp.Control.MailboxProcessor.Post"/> is called
/// after <see cref="F:Microsoft.FSharp.Control.MailboxProcessor"/> disposed.</param>
/// <param name="cancellationToken">An optional cancellation token for the <c>body</c>.
/// Defaults to <c>Async.DefaultCancellationToken</c>.</param>
///
/// <returns>The created MailboxProcessor.</returns>
///
/// <example-tbd></example-tbd>
new:
body: (MailboxProcessor<'Msg> -> Async<unit>) *
isThrowExceptionAfterDisposed: bool *
?cancellationToken: CancellationToken ->
MailboxProcessor<'Msg>

/// <summary>Creates and starts an agent. The <c>body</c> function is used to generate the asynchronous
/// computation executed by the agent.</summary>
///
Expand All @@ -57,6 +78,26 @@ type MailboxProcessor<'Msg> =
static member Start:
body: (MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken: CancellationToken -> MailboxProcessor<'Msg>

/// <summary>Creates and starts an agent. The <c>body</c> function is used to generate the asynchronous
/// computation executed by the agent.</summary>
///
/// <param name="body">The function to produce an asynchronous computation that will be executed
/// as the read loop for the MailboxProcessor when Start is called.</param>
/// <param name="isThrowExceptionAfterDisposed">A flag denotes will be thrown exception
/// when <see cref="F:Microsoft.FSharp.Control.MailboxProcessor.Post"/> is called
/// after <see cref="F:Microsoft.FSharp.Control.MailboxProcessor"/> disposed.</param>
/// <param name="cancellationToken">An optional cancellation token for the <c>body</c>.
/// Defaults to <c>Async.DefaultCancellationToken</c>.</param>
///
/// <returns>The created MailboxProcessor.</returns>
///
/// <example-tbd></example-tbd>
static member Start:
body: (MailboxProcessor<'Msg> -> Async<unit>) *
isThrowExceptionAfterDisposed: bool *
?cancellationToken: CancellationToken ->
MailboxProcessor<'Msg>

/// <summary>Creates and starts an agent immediately on the current operating system thread. The <c>body</c>
/// function is used to generate the asynchronous computation executed by the agent.</summary>
///
Expand All @@ -71,6 +112,26 @@ type MailboxProcessor<'Msg> =
static member StartImmediate:
body: (MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken: CancellationToken -> MailboxProcessor<'Msg>

/// <summary>Creates and starts an agent immediately on the current operating system thread. The <c>body</c>
/// function is used to generate the asynchronous computation executed by the agent.</summary>
///
/// <param name="body">The function to produce an asynchronous computation that will be executed
/// as the read loop for the MailboxProcessor when StartImmediately is called.</param>
/// <param name="isThrowExceptionAfterDisposed">A flag denotes will be thrown exception
/// when <see cref="F:Microsoft.FSharp.Control.MailboxProcessor.Post"/> is called
/// after <see cref="F:Microsoft.FSharp.Control.MailboxProcessor"/> disposed.</param>
/// <param name="cancellationToken">An optional cancellation token for the <c>body</c>.
/// Defaults to <c>Async.DefaultCancellationToken</c>.</param>
///
/// <returns>The created MailboxProcessor.</returns>
///
/// <example-tbd></example-tbd>
static member StartImmediate:
body: (MailboxProcessor<'Msg> -> Async<unit>) *
isThrowExceptionAfterDisposed: bool *
?cancellationToken: CancellationToken ->
MailboxProcessor<'Msg>

/// <summary>Posts a message to the message queue of the MailboxProcessor, asynchronously.</summary>
///
/// <param name="message">The message to post.</param>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
namespace FSharp.Core.UnitTests.Control

open System
open Xunit
open System.Threading
open System.Threading.Tasks
open Xunit

type Message =
| Increment of int
Expand Down Expand Up @@ -311,6 +312,93 @@ type MailboxProcessorType() =

finishedEv.Reset() |> ignore

[<Fact>]
member this.``After dispose is called, mailbox should stop receiving and processing messages``() = task {
let mutable isSkip = false
let mutable actualSkipMessagesCount = 0
let mutable actualMessagesCount = 0
let sleepDueTime = 100
let expectedMessagesCount = 2
use mre = new ManualResetEventSlim(false)
let mb =
MailboxProcessor.Start(fun b ->
let rec loop() =
async {
match! b.Receive() with
| Increment _ ->
if isSkip then
actualSkipMessagesCount <- actualSkipMessagesCount + 1
return! loop()
else
do! Async.Sleep sleepDueTime
if not isSkip then
actualMessagesCount <- actualMessagesCount + 1
if actualMessagesCount = expectedMessagesCount then mre.Set()
do! Async.Sleep sleepDueTime
return! loop()
| _ -> ()
}
loop()
)
let post() = Increment 1 |> mb.Post

[1..4] |> Seq.iter (fun x -> post())
do! task {
mre.Wait()
isSkip <- true
(mb :> IDisposable).Dispose()
post()
}

Assert.Equal(expectedMessagesCount, actualMessagesCount)
Assert.Equal(0, actualSkipMessagesCount)
Assert.Equal(0, mb.CurrentQueueLength)
}

[<Fact>]
member this.``After dispose is called, mailbox should stop receiving and processing messages with exception``() = task {
let mutable isSkip = false
let mutable actualSkipMessagesCount = 0
let mutable actualMessagesCount = 0
let sleepDueTime = 100
let expectedMessagesCount = 2
use mre = new ManualResetEventSlim(false)
let mb =
MailboxProcessor.Start((fun b ->
let rec loop() =
async {
match! b.Receive() with
| Increment _ ->
if isSkip then
actualSkipMessagesCount <- actualSkipMessagesCount + 1
return! loop()
else
do! Async.Sleep sleepDueTime
if not isSkip then
actualMessagesCount <- actualMessagesCount + 1
if actualMessagesCount = expectedMessagesCount then mre.Set()
do! Async.Sleep sleepDueTime
return! loop()
| _ -> ()
}
loop()),
true
)
let post() = Increment 1 |> mb.Post

[1..4] |> Seq.iter (fun x -> post())
do! task {
mre.Wait()
isSkip <- true
(mb :> IDisposable).Dispose()
Assert.Throws<ObjectDisposedException>(fun _ -> post()) |> ignore
}

Assert.Equal(expectedMessagesCount, actualMessagesCount)
Assert.Equal(0, actualSkipMessagesCount)
Assert.Equal(0, mb.CurrentQueueLength)
}

[<Fact>]
member this.Dispose() =

Expand Down