Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 53 additions & 87 deletions cmd/dbos/cli_integration_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"bytes"
"context"
"database/sql"
_ "embed"
Expand All @@ -14,6 +13,7 @@ import (
"os/exec"
"path/filepath"
"runtime"
"syscall"
"testing"
"time"

Expand Down Expand Up @@ -96,17 +96,28 @@ func TestCLIWorkflow(t *testing.T) {
testProjectInitialization(t, cliPath)
})

t.Run("ApplicationLifecycle", func(t *testing.T) {
cmd := testApplicationLifecycle(t, cliPath)
t.Cleanup(func() {
if cmd.Process != nil {
/*
fmt.Println(cmd.Stderr)
fmt.Println(cmd.Stdout)
*/
cmd.Process.Kill()
}
})
// Start a test application using dbos start
cmd := exec.CommandContext(context.Background(), cliPath, "start")
cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL())
err = cmd.Start()
require.NoError(t, err, "Failed to start application")
// Wait for server to be ready
require.Eventually(t, func() bool {
resp, err := http.Get("http://localhost:" + testServerPort)
if err != nil {
return false
}
resp.Body.Close()
return resp.StatusCode == http.StatusOK
}, 10*time.Second, 500*time.Millisecond, "Server should start within 10 seconds")

t.Cleanup(func() {
fmt.Printf("Cleaning up application process %d\n", cmd.Process.Pid)
// fmt.Println(cmd.Stderr)
// fmt.Println(cmd.Stdout)
err := syscall.Kill(cmd.Process.Pid, syscall.SIGTERM)
require.NoError(t, err, "Failed to send interrupt signal to application process")
_ = cmd.Wait()
})

t.Run("WorkflowCommands", func(t *testing.T) {
Expand All @@ -120,7 +131,6 @@ func TestCLIWorkflow(t *testing.T) {

// testProjectInitialization verifies project initialization
func testProjectInitialization(t *testing.T, cliPath string) {

// Initialize project
cmd := exec.Command(cliPath, "init", testProjectName)
output, err := cmd.CombinedOutput()
Expand Down Expand Up @@ -164,71 +174,6 @@ func testProjectInitialization(t *testing.T, cliPath string) {
require.NoError(t, err, "go mod tidy failed: %s", string(modOutput))
}

// testApplicationLifecycle starts the application and triggers workflows
func testApplicationLifecycle(t *testing.T, cliPath string) *exec.Cmd {
// Should already be in project directory from previous test

// Start the application in background
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()

cmd := exec.CommandContext(ctx, cliPath, "start")
cmd.Env = append(os.Environ(), "DBOS_SYSTEM_DATABASE_URL="+getDatabaseURL())

// Capture output for debugging
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = &stderr

err := cmd.Start()
require.NoError(t, err, "Failed to start application")

// Wait for server to be ready
require.Eventually(t, func() bool {
resp, err := http.Get("http://localhost:" + testServerPort)
if err != nil {
return false
}
resp.Body.Close()
return resp.StatusCode == http.StatusOK
}, 10*time.Second, 500*time.Millisecond, "Server should start within 10 seconds")

// Trigger workflows via HTTP endpoints
t.Run("TriggerExampleWorkflow", func(t *testing.T) {
resp, err := http.Get("http://localhost:" + testServerPort + "/workflow")
require.NoError(t, err, "Failed to trigger workflow")
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)

assert.Equal(t, http.StatusOK, resp.StatusCode, "Workflow endpoint should return 200")
assert.Contains(t, string(body), "Workflow result", "Should contain workflow result")
})

t.Run("TriggerQueueWorkflow", func(t *testing.T) {
resp, err := http.Get("http://localhost:" + testServerPort + "/queue")
require.NoError(t, err, "Failed to trigger queue workflow")
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)

assert.Equal(t, http.StatusOK, resp.StatusCode, "Queue endpoint should return 200")

// Parse JSON response to get workflow ID
var response map[string]string
err = json.Unmarshal(body, &response)
require.NoError(t, err, "Should be valid JSON response")

workflowID, exists := response["workflow_id"]
assert.True(t, exists, "Response should contain workflow_id")
assert.NotEmpty(t, workflowID, "Workflow ID should not be empty")
})

return cmd
}

// testWorkflowCommands comprehensively tests all workflow CLI commands
func testWorkflowCommands(t *testing.T, cliPath string) {

Expand Down Expand Up @@ -256,8 +201,29 @@ func testWorkflowCommands(t *testing.T, cliPath string) {
// testListWorkflows tests various workflow listing scenarios
func testListWorkflows(t *testing.T, cliPath string) {
// Create some test workflows first to ensure we have data to filter
// The previous test functions have already created workflows that we can query
resp, err := http.Get("http://localhost:" + testServerPort + "/workflow")
require.NoError(t, err, "Failed to trigger workflow")
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode, "Workflow endpoint should return 200")
assert.Contains(t, string(body), "Workflow result", "Should contain workflow result")

resp, err = http.Get("http://localhost:" + testServerPort + "/queue")
require.NoError(t, err, "Failed to trigger queue workflow")
defer resp.Body.Close()
body, err = io.ReadAll(resp.Body)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode, "Queue endpoint should return 200")

// Parse JSON response to get workflow ID
var response map[string]string
err = json.Unmarshal(body, &response)
require.NoError(t, err, "Should be valid JSON response")

workflowID, exists := response["workflow_id"]
assert.True(t, exists, "Response should contain workflow_id")
assert.NotEmpty(t, workflowID, "Workflow ID should not be empty")
// Get the current time for time-based filtering
currentTime := time.Now()

Expand Down Expand Up @@ -724,14 +690,14 @@ func buildCLI(t *testing.T) string {
// Build output path in the cmd directory
cliPath := filepath.Join(cmdDir, "dbos-cli-test")

// Check if already built
if _, err := os.Stat(cliPath); os.IsNotExist(err) {
// Build the CLI from the cmd directory
buildCmd := exec.Command("go", "build", "-o", "dbos-cli-test", ".")
buildCmd.Dir = cmdDir
buildOutput, buildErr := buildCmd.CombinedOutput()
require.NoError(t, buildErr, "Failed to build CLI: %s", string(buildOutput))
}
// Delete any existing binary before building
os.Remove(cliPath)

// Build the CLI from the cmd directory
buildCmd := exec.Command("go", "build", "-o", "dbos-cli-test", ".")
buildCmd.Dir = cmdDir
buildOutput, buildErr := buildCmd.CombinedOutput()
require.NoError(t, buildErr, "Failed to build CLI: %s", string(buildOutput))

// Return absolute path
absPath, err := filepath.Abs(cliPath)
Expand Down
18 changes: 15 additions & 3 deletions cmd/dbos/cli_test_app.go.test
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/dbos-inc/dbos-transact-golang/dbos"
Expand Down Expand Up @@ -68,7 +70,7 @@ func QueueWorkflow(ctx dbos.DBOSContext, _ string) (string, error) {
}
handles[i] = handle
}
time.Sleep(10 * time.Second) // give some time for our tests to do wf management
time.Sleep(10 * time.Second) // give some time for our tests to do wf management
return fmt.Sprintf("Successfully enqueued %d steps", len(handles)), nil
}

Expand Down Expand Up @@ -114,6 +116,16 @@ func main() {
http.HandleFunc("/queue", queueHandler)
http.HandleFunc("/", healthHandler)

// Set up signal handling
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

go func() {
<-sigChan
fmt.Println("Received interrupt signal, shutting down...")
os.Exit(0)
}()

fmt.Println("Server starting on http://localhost:8080")
err = http.ListenAndServe(":8080", nil)
if err != nil {
Expand Down Expand Up @@ -156,5 +168,5 @@ func queueHandler(w http.ResponseWriter, r *http.Request) {
}

func healthHandler(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "healthy")
}
fmt.Fprintf(w, "healthy")
}
8 changes: 7 additions & 1 deletion cmd/dbos/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os/signal"
"runtime"
"syscall"
"time"

"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -73,7 +74,7 @@ func runStart(cmd *cobra.Command, args []string) error {
return fmt.Errorf("command failed: %w", err)
}
case sig := <-sigChan:
logger.Info("Received signal, stopping...", "signal", sig)
logger.Info("Received signal, stopping...", "signal", sig.String())

// Kill the process group on Unix-like systems
if runtime.GOOS != "windows" {
Expand All @@ -90,6 +91,11 @@ func runStart(cmd *cobra.Command, args []string) error {
if runtime.GOOS != "windows" {
syscall.Kill(-process.Process.Pid, syscall.SIGKILL)
}
case <-time.After(10 * time.Second):
// Force kill after timeout
if runtime.GOOS != "windows" {
syscall.Kill(-process.Process.Pid, syscall.SIGKILL)
}
}

os.Exit(0)
Expand Down
10 changes: 7 additions & 3 deletions dbos/conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,9 @@ func (c *Conductor) handleListWorkflowsRequest(data []byte, requestID string) er
var opts []ListWorkflowsOption
opts = append(opts, WithLoadInput(req.Body.LoadInput))
opts = append(opts, WithLoadOutput(req.Body.LoadOutput))
opts = append(opts, WithSortDesc())
if req.Body.SortDesc {
opts = append(opts, WithSortDesc())
}
if len(req.Body.WorkflowUUIDs) > 0 {
opts = append(opts, WithWorkflowIDs(req.Body.WorkflowUUIDs))
}
Expand Down Expand Up @@ -638,8 +640,7 @@ func (c *Conductor) handleListQueuedWorkflowsRequest(data []byte, requestID stri
var opts []ListWorkflowsOption
opts = append(opts, WithLoadInput(req.Body.LoadInput))
opts = append(opts, WithLoadOutput(false)) // Don't load output for queued workflows
opts = append(opts, WithSortDesc())
opts = append(opts, WithQueuesOnly()) // Only include workflows that are in queues
opts = append(opts, WithQueuesOnly()) // Only include workflows that are in queues

// Add status filter for queued workflows
queuedStatuses := make([]WorkflowStatusType, 0)
Expand All @@ -656,6 +657,9 @@ func (c *Conductor) handleListQueuedWorkflowsRequest(data []byte, requestID stri
}
opts = append(opts, WithStatus(queuedStatuses))

if req.Body.SortDesc {
opts = append(opts, WithSortDesc())
}
if req.Body.WorkflowName != nil {
opts = append(opts, WithName(*req.Body.WorkflowName))
}
Expand Down
Loading