@@ -3,6 +3,7 @@ package pubsub
33import (
44 "errors"
55 "github.com/google/uuid"
6+ "github.com/hashicorp/go-set"
67 "log"
78 "sync"
89)
@@ -11,15 +12,17 @@ func (l *localPubSub) CreateTopic(topic string) error {
1112 if l .closed {
1213 return errors .New ("pubsub client is closed" )
1314 }
14- if l .topics .Contains (topic ) {
15+ l .mutex .RLock ()
16+ isContains := l .topics .Contains (topic )
17+ l .mutex .RUnlock ()
18+ if isContains {
1519 return nil
1620 } else {
17- // lock
1821 l .mutex .Lock ()
19- defer l .mutex .Unlock ()
2022 // insert
2123 l .topics .Insert (topic )
2224 l .subscriptions [topic ] = make (map [string ]localPubSubSubscription )
25+ l .mutex .Unlock ()
2326 return nil
2427 }
2528}
@@ -28,56 +31,45 @@ func (l *localPubSub) RemoveTopic(topic string) error {
2831 if l .closed {
2932 return errors .New ("pubsub client is closed" )
3033 }
31- // lock
32- l . mutex . Lock ( )
33- defer l .mutex .Unlock ()
34+ l . mutex . RLock ()
35+ isContains := l . topics . Contains ( topic )
36+ l .mutex .RUnlock ()
3437 // check if topic exists
35- if ! l . topics . Contains ( topic ) {
38+ if ! isContains {
3639 return nil
3740 } else {
38- // close and remove all subscribers
39- for subscriptionRecord := range l .subscriptions [topic ] {
40- l .cancelSubscriptions (topic , subscriptionRecord )
41+ l .mutex .Lock ()
42+ // close all subscribers
43+ subscriptionRecords := l .subscriptions [topic ]
44+ for _ , subscription := range subscriptionRecords {
45+ m := subscription .Mutex
46+ m .Lock ()
47+ close (subscription .Channel )
48+ m .Unlock ()
4149 }
42- // delete subscribers
50+ // delete topic
4351 delete (l .subscriptions , topic )
52+ l .mutex .Unlock ()
4453 }
4554 return nil
4655}
4756
48- func (l * localPubSub ) cancelSubscriptions (topic string , subscriptionId string ) {
49- if l .closed {
50- return
51- }
52- // verify topic exists with ok
53- if _ , ok := l .subscriptions [topic ]; ! ok {
54- return
55- }
56- // verify subscription exists
57- if _ , ok := l.subscriptions [topic ][subscriptionId ]; ! ok {
58- return
59- }
60- // lock
61- mutex := l.subscriptions [topic ][subscriptionId ].Mutex
62- mutex .Lock ()
63- defer mutex .Unlock ()
64- // close channel
65- close (l.subscriptions [topic ][subscriptionId ].Channel )
66- // delete subscription
67- delete (l .subscriptions [topic ], subscriptionId )
68- }
69-
7057// Subscribe returns a subscription id and a channel to listen to
7158func (l * localPubSub ) Subscribe (topic string ) (string , <- chan string , error ) {
7259 if l .closed {
7360 return "" , nil , errors .New ("pubsub client is closed" )
7461 }
7562 // lock
76- l .mutex .Lock ()
77- defer l .mutex .Unlock ()
63+ l .mutex .RLock ()
64+ isContains := l .topics .Contains (topic )
65+ l .mutex .RUnlock ()
7866 // check if topic exists
79- if ! l .topics .Contains (topic ) {
80- return "" , nil , errors .New ("topic does not exist" )
67+ if ! isContains {
68+ l .mutex .Lock ()
69+ // insert topic
70+ l .topics .Insert (topic )
71+ l .subscriptions [topic ] = make (map [string ]localPubSubSubscription )
72+ l .mutex .Unlock ()
8173 }
8274 // create a new subscription id
8375 subscriptionId := topic + "_" + uuid .NewString ()
@@ -88,8 +80,12 @@ func (l *localPubSub) Subscribe(topic string) (string, <-chan string, error) {
8880 Mutex : & sync.RWMutex {},
8981 Channel : channel ,
9082 }
83+ // critical section
84+ l .mutex .Lock ()
9185 // add subscription record to subscriptions
9286 l.subscriptions [topic ][subscriptionId ] = subscriptionRecord
87+ // unlock
88+ l .mutex .Unlock ()
9389 // return subscription id and channel
9490 return subscriptionId , channel , nil
9591}
@@ -100,29 +96,32 @@ func (l *localPubSub) Unsubscribe(topic string, subscriptionId string) error {
10096 return errors .New ("pubsub client is closed" )
10197 }
10298 // lock main mutex
103- l .mutex .Lock ()
104- defer l .mutex .Unlock ()
99+ l .mutex .RLock ()
100+ isContains := l .topics .Contains (topic )
101+ l .mutex .RUnlock ()
105102 // check if topic exists
106- if ! l . topics . Contains ( topic ) {
107- return errors . New ( "topic does not exist" )
103+ if ! isContains {
104+ return nil
108105 }
109106 // check if subscription exists
107+ l .mutex .RLock ()
110108 if _ , ok := l.subscriptions [topic ][subscriptionId ]; ! ok {
111- return errors .New ("subscription does not exist" )
109+ l .mutex .RUnlock ()
110+ return nil
112111 }
113112 // fetch subscription record
114113 subscriptionRecord := l.subscriptions [topic ][subscriptionId ]
114+ l .mutex .RUnlock ()
115115 // lock
116116 mutex := subscriptionRecord .Mutex
117117 mutex .Lock ()
118- defer mutex .Unlock ()
119118 // cleanup channel
120- if _ , ok := <- subscriptionRecord .Channel ; ok {
121- <- subscriptionRecord .Channel
122- close (subscriptionRecord .Channel )
123- }
119+ close (subscriptionRecord .Channel )
120+ mutex .Unlock ()
121+ l .mutex .Lock ()
124122 // delete subscription
125123 delete (l .subscriptions [topic ], subscriptionId )
124+ l .mutex .Unlock ()
126125 return nil
127126}
128127
@@ -131,29 +130,33 @@ func (l *localPubSub) Publish(topic string, data string) error {
131130 if l .closed {
132131 return errors .New ("pubsub client is closed" )
133132 }
134- // lock main mutex
135- l . mutex . Lock ( )
136- defer l .mutex .Unlock ()
133+ l . mutex . RLock ()
134+ isContains := l . topics . Contains ( topic )
135+ l .mutex .RUnlock ()
137136 // check if topic exists
138- if ! l .topics .Contains (topic ) {
137+ if ! isContains {
138+ l .mutex .Lock ()
139139 // insert topic
140140 l .topics .Insert (topic )
141141 l .subscriptions [topic ] = make (map [string ]localPubSubSubscription )
142+ l .mutex .Unlock ()
142143 }
143144 // fetch all subscriptions
145+ l .mutex .RLock ()
144146 subscriptions := l .subscriptions [topic ]
147+ l .mutex .RUnlock ()
145148 // iterate over all subscriptions
146149 for _ , subscriptionRecord := range subscriptions {
147150 // lock subscription mutex
148151 mutex := subscriptionRecord .Mutex
149152 mutex .Lock ()
153+ channel := subscriptionRecord .Channel
150154 // clear channel if full
151- if len (subscriptionRecord . Channel ) == cap (subscriptionRecord . Channel ) {
152- <- subscriptionRecord . Channel
155+ if len (channel ) == cap (channel ) {
156+ <- channel
153157 }
154158 // send data
155- subscriptionRecord .Channel <- data
156- // unlock subscription mutex
159+ channel <- data
157160 mutex .Unlock ()
158161 }
159162 return nil
@@ -177,11 +180,17 @@ func (l *localPubSub) Close() error {
177180 }
178181 // set closed to true
179182 l .closed = true
180- // close and remove all subscribers
183+ // close
181184 for topic := range l .subscriptions {
182- for subscriptionRecord := range l .subscriptions [topic ] {
183- l .cancelSubscriptions (topic , subscriptionRecord )
185+ for _ , subscription := range l .subscriptions [topic ] {
186+ m := subscription .Mutex
187+ m .Lock ()
188+ close (subscription .Channel )
189+ m .Unlock ()
184190 }
185191 }
192+ // remove all topics
193+ l .topics = set.New [string ](0 )
194+ l .subscriptions = make (map [string ]map [string ]localPubSubSubscription )
186195 return nil
187196}
0 commit comments