Skip to content

Commit 1fde953

Browse files
committed
fix: terminate websocket stream on end of reader
1 parent 2480d21 commit 1fde953

File tree

2 files changed

+50
-56
lines changed

2 files changed

+50
-56
lines changed

server/api_application.go

Lines changed: 48 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"path/filepath"
1313
"strings"
1414
GIT_MANAGER "swiftwave/m/git_manager"
15-
"sync"
1615

1716
"github.com/google/uuid"
1817
"github.com/gorilla/websocket"
@@ -417,7 +416,7 @@ func (server *Server) getApplicationBuildLog(c echo.Context) error {
417416
var applicationBuildLog ApplicationBuildLog
418417
logID := c.Param("log_id")
419418
applicationID := c.Param("id")
420-
tx := server.DB_CLIENT.Model(&ApplicationBuildLog{}).Select("logs").Where(map[string]interface{}{
419+
tx := server.DB_CLIENT.Model(&ApplicationBuildLog{}).Select("completed", "logs").Where(map[string]interface{}{
421420
"id": logID,
422421
"application_id": applicationID,
423422
}).Find(&applicationBuildLog)
@@ -437,56 +436,62 @@ func (server *Server) getApplicationBuildLog(c echo.Context) error {
437436
closed := false
438437

439438
// Listen to redis subscriber
440-
pubsub := server.REDIS_CLIENT.Subscribe(context.Background(), applicationBuildLog.GetRedisPubSubChannel())
439+
ctx := context.Background()
440+
channel := "log_update/" + logID
441+
pubsub := server.REDIS_CLIENT.Subscribe(ctx, channel)
441442
defer pubsub.Close()
442-
ch := pubsub.Channel()
443443

444-
// waitgroup for goroutines
445-
wg := sync.WaitGroup{}
446-
447-
// listen for close message from client
448-
wg.Add(1)
449-
go func() {
450-
for {
451-
messageType, _, err := ws.ReadMessage()
452-
if err != nil {
453-
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
454-
c.Logger().Error(err)
444+
if !applicationBuildLog.Completed {
445+
// listen for close message from client
446+
go func() {
447+
for {
448+
messageType, _, err := ws.ReadMessage()
449+
if err != nil {
450+
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
451+
c.Logger().Error(err)
452+
}
453+
closed = true
454+
return
455+
}
456+
if messageType == websocket.CloseMessage {
457+
// Peer has sent a close message, indicating they want to disconnect
458+
log.Println("Peer has sent a close message, indicating they want to disconnect")
459+
return
455460
}
456-
closed = true
457-
break
458-
}
459-
if messageType == websocket.CloseMessage {
460-
// Peer has sent a close message, indicating they want to disconnect
461-
log.Println("Peer has sent a close message, indicating they want to disconnect")
462-
break
463461
}
464-
}
465-
wg.Done()
466-
}()
462+
}()
463+
}
467464

468465
// send logs to client
469-
wg.Add(1)
470466
go func() {
471467
// Write initial logs
472468
err := ws.WriteMessage(websocket.TextMessage, []byte(applicationBuildLog.Logs))
473469
if err != nil {
474470
log.Println("failed to write initial logs to websocket")
475471
}
476-
// Listen to redis channel and send logs to client
477-
for msg := range ch {
478-
if closed {
479-
break
480-
}
481-
err := ws.WriteMessage(websocket.TextMessage, []byte(msg.Payload))
482-
if err != nil {
483-
log.Println("failed to write logs to websocket")
472+
if !applicationBuildLog.Completed {
473+
// Listen to redis channel and send logs to client
474+
for {
475+
if closed {
476+
return
477+
}
478+
msg, err := pubsub.ReceiveMessage(ctx)
479+
if err != nil {
480+
log.Println(err)
481+
return
482+
} else {
483+
if msg.Payload == "SWIFTWAVE_EOF_LOG" {
484+
return
485+
}
486+
err := ws.WriteMessage(websocket.TextMessage, []byte(msg.Payload))
487+
if err != nil {
488+
log.Println("failed to write logs to websocket")
489+
}
490+
}
484491
}
485492
}
486-
wg.Done()
487493
}()
488-
wg.Wait()
489-
return nil
494+
select {}
490495
}
491496

492497
// GET /ws/application/:id/logs/runtime
@@ -518,11 +523,7 @@ func (server *Server) getApplicationRuntimeLogs(c echo.Context) error {
518523

519524
closed := false
520525

521-
// waitgroup for goroutines
522-
wg := sync.WaitGroup{}
523-
524526
// listen for close message from client
525-
wg.Add(1)
526527
go func() {
527528
for {
528529
messageType, _, err := ws.ReadMessage()
@@ -531,42 +532,38 @@ func (server *Server) getApplicationRuntimeLogs(c echo.Context) error {
531532
c.Logger().Error(err)
532533
}
533534
closed = true
534-
break
535+
return
535536
}
536537
if messageType == websocket.CloseMessage {
537538
// Peer has sent a close message, indicating they want to disconnect
538539
log.Println("Peer has sent a close message, indicating they want to disconnect")
539-
break
540+
return
540541
}
541542
}
542-
wg.Done()
543543
}()
544544

545545
// send logs to client
546-
wg.Add(1)
547546
go func() {
548547
// Write
549548
for scanner.Scan() {
550549
if closed {
551-
break
550+
return
552551
}
553552
// Specific format for raw-stream logs
554553
// docs : https://docs.docker.com/engine/api/v1.42/#tag/Container/operation/ContainerAttach
555554
log_text := scanner.Bytes()
556555
if len(log_text) > 8 {
557556
log_text = log_text[8:]
558557
}
558+
// add new line
559+
log_text = append(log_text, []byte("\n")...)
559560
err = ws.WriteMessage(websocket.TextMessage, log_text)
560561
if err != nil {
561562
log.Println("failed to write logs to websocket")
562563
}
563564
}
564-
wg.Done()
565565
}()
566-
567-
// wait for goroutines to finish
568-
wg.Wait()
569-
return nil
566+
select {}
570567
}
571568

572569
// PUT /application/:id

server/helpers.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ func (s *Server) MarkBuildLogAsCompleted(log_id string) {
5454
if tx.Error != nil {
5555
return
5656
}
57+
s.REDIS_CLIENT.Publish(context.Background(), "log_update/"+log_id, "SWIFTWAVE_EOF_LOG")
5758
logRecord.Completed = true
5859
s.DB_CLIENT.Save(&logRecord)
5960
}
@@ -89,8 +90,4 @@ func (s ApplicationSource) GetSourceSummary() string {
8990

9091
func (s *Server) isProductionEnvironment() bool {
9192
return strings.Compare(s.ENVIRONMENT, "production") == 0
92-
}
93-
94-
func (a *ApplicationBuildLog) GetRedisPubSubChannel() string {
95-
return "log_update/" + a.ID
96-
}
93+
}

0 commit comments

Comments
 (0)