Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/dbos/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func init() {

func runMigrate(cmd *cobra.Command, args []string) error {
// Get database URL
dbURL, err := getDBURL(cmd)
dbURL, err := getDBURL()
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/dbos/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func runReset(cmd *cobra.Command, args []string) error {
}

// Get database URL
dbURL, err := getDBURL(cmd)
dbURL, err := getDBURL()
if err != nil {
return err
}
Expand Down
3 changes: 1 addition & 2 deletions cmd/dbos/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"os"

"github.com/dbos-inc/dbos-transact-golang/dbos"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)

Expand Down Expand Up @@ -43,7 +42,7 @@ func maskPassword(dbURL string) string {
}

// getDBURL resolves the database URL from flag, config, or environment variable
func getDBURL(_ *cobra.Command) (string, error) {
func getDBURL() (string, error) {
var resolvedURL string
var source string

Expand Down
12 changes: 6 additions & 6 deletions cmd/dbos/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func init() {

func runWorkflowList(cmd *cobra.Command, args []string) error {
// Get database URL
dbURL, err := getDBURL(cmd)
dbURL, err := getDBURL()
if err != nil {
return err
}
Expand Down Expand Up @@ -192,7 +192,7 @@ func runWorkflowGet(cmd *cobra.Command, args []string) error {
workflowID := args[0]

// Get database URL
dbURL, err := getDBURL(cmd)
dbURL, err := getDBURL()
if err != nil {
return err
}
Expand Down Expand Up @@ -227,7 +227,7 @@ func runWorkflowSteps(cmd *cobra.Command, args []string) error {
workflowID := args[0]

// Get database URL
dbURL, err := getDBURL(cmd)
dbURL, err := getDBURL()
if err != nil {
return err
}
Expand Down Expand Up @@ -259,7 +259,7 @@ func runWorkflowCancel(cmd *cobra.Command, args []string) error {
workflowID := args[0]

// Get database URL
dbURL, err := getDBURL(cmd)
dbURL, err := getDBURL()
if err != nil {
return err
}
Expand All @@ -285,7 +285,7 @@ func runWorkflowResume(cmd *cobra.Command, args []string) error {
workflowID := args[0]

// Get database URL
dbURL, err := getDBURL(cmd)
dbURL, err := getDBURL()
if err != nil {
return err
}
Expand Down Expand Up @@ -318,7 +318,7 @@ func runWorkflowFork(cmd *cobra.Command, args []string) error {
workflowID := args[0]

// Get database URL
dbURL, err := getDBURL(cmd)
dbURL, err := getDBURL()
if err != nil {
return err
}
Expand Down
8 changes: 3 additions & 5 deletions dbos/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -839,14 +839,11 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
return earlyReturnPollingHandle, nil
}

outcomeChan := make(chan workflowOutcome[any], 1)

// Create workflow state to track step execution
wfState := &workflowState{
workflowID: workflowID,
stepID: -1, // Steps are O-indexed
}

workflowCtx := WithValue(c, workflowStateKey, wfState)

// If the workflow has a timeout but no deadline, compute the deadline from the timeout.
Expand All @@ -863,7 +860,7 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
if !durableDeadline.IsZero() {
workflowCtx, _ = WithTimeout(workflowCtx, time.Until(durableDeadline))
// Register a cancel function that cancels the workflow in the DB as soon as the context is cancelled
dbosCancelFunction := func() {
workflowCancelFunction := func() {
c.logger.Info("Cancelling workflow", "workflow_id", workflowID)
err = retry(c, func() error {
return c.systemDB.cancelWorkflow(uncancellableCtx, workflowID)
Expand All @@ -873,10 +870,11 @@ func (c *dbosContext) RunWorkflow(_ DBOSContext, fn WorkflowFunc, input any, opt
}
close(cancelFuncCompleted)
}
stopFunc = context.AfterFunc(workflowCtx, dbosCancelFunction)
stopFunc = context.AfterFunc(workflowCtx, workflowCancelFunction)
}

// Run the function in a goroutine
outcomeChan := make(chan workflowOutcome[any], 1)
c.workflowsWg.Add(1)
go func() {
defer c.workflowsWg.Done()
Expand Down
Loading