Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 65 additions & 18 deletions internal/temporalcli/commands.server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package temporalcli_test

import (
"context"
"fmt"
"net"
"os"
"path/filepath"
Expand Down Expand Up @@ -139,58 +140,104 @@ func startDevServerAndRunSimpleTest(t *testing.T, args []string, dialAddress str
}

func TestServer_StartDev_ConcurrentStarts(t *testing.T) {
startOne := func() {
h := NewCommandHarness(t)
defer h.Close()
h := NewCommandHarness(t)

startOne := func() error {
ctx, cancel := context.WithCancel(t.Context())
defer cancel()

// Start in background, then wait for client to be able to connect
port := strconv.Itoa(devserver.MustGetFreePort("127.0.0.1"))
httpPort := strconv.Itoa(devserver.MustGetFreePort("127.0.0.1"))
resCh := make(chan *CommandResult, 1)
go func() {
resCh <- h.Execute("server", "start-dev", "-p", port, "--http-port", httpPort, "--headless", "--log-level", "never")
resCh <- h.ExecuteWithContext(ctx, "server", "start-dev", "-p", port, "--http-port", httpPort, "--headless", "--log-level", "never")
}()

// Try to connect for a bit while checking for error
var cl client.Client
h.EventuallyWithT(func(t *assert.CollectT) {
var lastDialErr error
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
timeout := time.NewTimer(3 * time.Second)
defer timeout.Stop()

waitForServer:
for {
select {
case res := <-resCh:
require.NoError(t, res.Err)
require.Fail(t, "got early server result")
default:
return concurrentStartCommandResultError("got early server result", res)
case <-ticker.C:
var err error
cl, err = client.Dial(client.Options{HostPort: "127.0.0.1:" + port})
if err == nil {
break waitForServer
}
lastDialErr = err
case <-timeout.C:
break waitForServer
}
var err error
cl, err = client.Dial(client.Options{HostPort: "127.0.0.1:" + port, Logger: testLogger{t: h.t}})
assert.NoError(t, err)
}, 3*time.Second, 200*time.Millisecond)
}
if cl == nil {
cancel()
select {
case <-time.After(20 * time.Second):
return fmt.Errorf("server was not reachable after 3 seconds: %w; also did not clean up within 20 seconds", lastDialErr)
case res := <-resCh:
if res.Err != nil {
return fmt.Errorf(
"server was not reachable after 3 seconds: %w; cleanup failed: %w",
lastDialErr,
res.Err,
)
}
return fmt.Errorf("server was not reachable after 3 seconds: %w", lastDialErr)
}
}
defer cl.Close()

// Send an interrupt by cancelling context
h.CancelContext()
cancel()

select {
case <-time.After(20 * time.Second):
h.Fail("didn't cleanup after 20 seconds")
return fmt.Errorf("didn't cleanup after 20 seconds")
case res := <-resCh:
h.NoError(res.Err)
if res.Err != nil {
return concurrentStartCommandResultError("server returned error", res)
}
}
return nil
}

// Start 40 dev server instances, with 8 concurrent executions
// Start 40 dev server instances, with 6 concurrent executions.
instanceCounter := atomic.Int32{}
instanceCounter.Store(40)
errCh := make(chan error, 40)
wg := &sync.WaitGroup{}
for i := 0; i < 6; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for instanceCounter.Add(-1) >= 0 {
startOne()
if err := startOne(); err != nil {
errCh <- err
}
}
wg.Done()
}()
}
wg.Wait()
close(errCh)
for err := range errCh {
require.NoError(t, err)
}
}

func concurrentStartCommandResultError(msg string, res *CommandResult) error {
if res.Err != nil {
return fmt.Errorf("%s: %w (stdout: %q, stderr: %q)", msg, res.Err, res.Stdout.String(), res.Stderr.String())
}
return fmt.Errorf("%s (stdout: %q, stderr: %q)", msg, res.Stdout.String(), res.Stderr.String())
}

func TestServer_StartDev_WithSearchAttributes(t *testing.T) {
Expand Down
18 changes: 13 additions & 5 deletions internal/temporalcli/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,23 @@ type CommandResult struct {
}

func (h *CommandHarness) Execute(args ...string) *CommandResult {
ctx, cancel := context.WithCancel(h.Context)
h.t.Cleanup(cancel)
defer cancel()
return h.execute(ctx, &h.Stdin, args...)
}

func (h *CommandHarness) ExecuteWithContext(ctx context.Context, args ...string) *CommandResult {
var stdin bytes.Buffer
return h.execute(ctx, &stdin, args...)
}

func (h *CommandHarness) execute(ctx context.Context, stdin io.Reader, args ...string) *CommandResult {
// Copy options, update as needed
res := &CommandResult{}
options := h.Options
// Set stdio
options.Stdin = &h.Stdin
options.Stdin = stdin
options.Stdout = &res.Stdout
options.Stderr = &res.Stderr
// Set args
Expand All @@ -175,10 +187,6 @@ func (h *CommandHarness) Execute(args ...string) *CommandResult {
res.Err = err
}

// Run
ctx, cancel := context.WithCancel(h.Context)
h.t.Cleanup(cancel)
defer cancel()
h.t.Logf("Calling: %v", strings.Join(args, " "))
temporalcli.Execute(ctx, options)
if res.Stdout.Len() > 0 {
Expand Down