1717package mclock
1818
1919import (
20+ "container/heap"
2021 "sync"
2122 "time"
2223)
@@ -32,18 +33,24 @@ import (
3233// the timeout using a channel or semaphore.
3334type Simulated struct {
3435 now AbsTime
35- scheduled [] * simTimer
36+ scheduled simTimerHeap
3637 mu sync.RWMutex
3738 cond * sync.Cond
38- lastId uint64
3939}
4040
41- // simTimer implements Timer on the virtual clock.
41+ // simTimer implements ChanTimer on the virtual clock.
4242type simTimer struct {
43- do func ()
44- at AbsTime
45- id uint64
46- s * Simulated
43+ at AbsTime
44+ index int // position in s.scheduled
45+ s * Simulated
46+ do func ()
47+ ch <- chan AbsTime
48+ }
49+
50+ func (s * Simulated ) init () {
51+ if s .cond == nil {
52+ s .cond = sync .NewCond (& s .mu )
53+ }
4754}
4855
4956// Run moves the clock by the given duration, executing all timers before that duration.
@@ -53,14 +60,9 @@ func (s *Simulated) Run(d time.Duration) {
5360
5461 end := s .now + AbsTime (d )
5562 var do []func ()
56- for len (s .scheduled ) > 0 {
57- ev := s .scheduled [0 ]
58- if ev .at > end {
59- break
60- }
61- s .now = ev .at
63+ for len (s .scheduled ) > 0 && s .scheduled [0 ].at <= end {
64+ ev := heap .Pop (& s .scheduled ).(* simTimer )
6265 do = append (do , ev .do )
63- s .scheduled = s .scheduled [1 :]
6466 }
6567 s .now = end
6668 s .mu .Unlock ()
@@ -102,61 +104,106 @@ func (s *Simulated) Sleep(d time.Duration) {
102104 <- s .After (d )
103105}
104106
107+ // NewTimer creates a timer which fires when the clock has advanced by d.
108+ func (s * Simulated ) NewTimer (d time.Duration ) ChanTimer {
109+ s .mu .Lock ()
110+ defer s .mu .Unlock ()
111+
112+ ch := make (chan AbsTime , 1 )
113+ var timer * simTimer
114+ timer = s .schedule (d , func () { ch <- timer .at })
115+ timer .ch = ch
116+ return timer
117+ }
118+
105119// After returns a channel which receives the current time after the clock
106120// has advanced by d.
107- func (s * Simulated ) After (d time.Duration ) <- chan time.Time {
108- after := make (chan time.Time , 1 )
109- s .AfterFunc (d , func () {
110- after <- (time.Time {}).Add (time .Duration (s .now ))
111- })
112- return after
121+ func (s * Simulated ) After (d time.Duration ) <- chan AbsTime {
122+ return s .NewTimer (d ).C ()
113123}
114124
115125// AfterFunc runs fn after the clock has advanced by d. Unlike with the system
116126// clock, fn runs on the goroutine that calls Run.
117127func (s * Simulated ) AfterFunc (d time.Duration , fn func ()) Timer {
118128 s .mu .Lock ()
119129 defer s .mu .Unlock ()
130+
131+ return s .schedule (d , fn )
132+ }
133+
134+ func (s * Simulated ) schedule (d time.Duration , fn func ()) * simTimer {
120135 s .init ()
121136
122137 at := s .now + AbsTime (d )
123- s .lastId ++
124- id := s .lastId
125- l , h := 0 , len (s .scheduled )
126- ll := h
127- for l != h {
128- m := (l + h ) / 2
129- if (at < s .scheduled [m ].at ) || ((at == s .scheduled [m ].at ) && (id < s .scheduled [m ].id )) {
130- h = m
131- } else {
132- l = m + 1
133- }
134- }
135138 ev := & simTimer {do : fn , at : at , s : s }
136- s .scheduled = append (s .scheduled , nil )
137- copy (s .scheduled [l + 1 :], s .scheduled [l :ll ])
138- s .scheduled [l ] = ev
139+ heap .Push (& s .scheduled , ev )
139140 s .cond .Broadcast ()
140141 return ev
141142}
142143
143144func (ev * simTimer ) Stop () bool {
144- s := ev .s
145- s .mu .Lock ()
146- defer s .mu .Unlock ()
145+ ev .s .mu .Lock ()
146+ defer ev .s .mu .Unlock ()
147147
148- for i := 0 ; i < len (s .scheduled ); i ++ {
149- if s .scheduled [i ] == ev {
150- s .scheduled = append (s .scheduled [:i ], s .scheduled [i + 1 :]... )
151- s .cond .Broadcast ()
152- return true
153- }
148+ if ev .index < 0 {
149+ return false
154150 }
155- return false
151+ heap .Remove (& ev .s .scheduled , ev .index )
152+ ev .s .cond .Broadcast ()
153+ ev .index = - 1
154+ return true
156155}
157156
158- func (s * Simulated ) init ( ) {
159- if s . cond == nil {
160- s . cond = sync . NewCond ( & s . mu )
157+ func (ev * simTimer ) Reset ( d time. Duration ) {
158+ if ev . ch == nil {
159+ panic ( "mclock: Reset() on timer created by AfterFunc" )
161160 }
161+
162+ ev .s .mu .Lock ()
163+ defer ev .s .mu .Unlock ()
164+ ev .at = ev .s .now .Add (d )
165+ if ev .index < 0 {
166+ heap .Push (& ev .s .scheduled , ev ) // already expired
167+ } else {
168+ heap .Fix (& ev .s .scheduled , ev .index ) // hasn't fired yet, reschedule
169+ }
170+ ev .s .cond .Broadcast ()
171+ }
172+
173+ func (ev * simTimer ) C () <- chan AbsTime {
174+ if ev .ch == nil {
175+ panic ("mclock: C() on timer created by AfterFunc" )
176+ }
177+ return ev .ch
178+ }
179+
180+ type simTimerHeap []* simTimer
181+
182+ func (h * simTimerHeap ) Len () int {
183+ return len (* h )
184+ }
185+
186+ func (h * simTimerHeap ) Less (i , j int ) bool {
187+ return (* h )[i ].at < (* h )[j ].at
188+ }
189+
190+ func (h * simTimerHeap ) Swap (i , j int ) {
191+ (* h )[i ], (* h )[j ] = (* h )[j ], (* h )[i ]
192+ (* h )[i ].index = i
193+ (* h )[j ].index = j
194+ }
195+
196+ func (h * simTimerHeap ) Push (x interface {}) {
197+ t := x .(* simTimer )
198+ t .index = len (* h )
199+ * h = append (* h , t )
200+ }
201+
202+ func (h * simTimerHeap ) Pop () interface {} {
203+ end := len (* h ) - 1
204+ t := (* h )[end ]
205+ t .index = - 1
206+ (* h )[end ] = nil
207+ * h = (* h )[:end ]
208+ return t
162209}
0 commit comments