From 92c39a88a5323a32f4ddeca31de40e1193d75e10 Mon Sep 17 00:00:00 2001 From: Mikhail Klementev Date: Mon, 7 Oct 2024 22:06:04 +0000 Subject: [PATCH] feat: realtime output --- artifact/artifact.go | 28 +++++++++---- artifact/process.go | 10 ++++- cmd/pew.go | 3 +- container/container.go | 45 +++++++++++++++++++++ daemon/process.go | 2 +- qemu/qemu-kernel.go | 90 +++++++++++++++++++++++++++++++----------- 6 files changed, 145 insertions(+), 33 deletions(-) diff --git a/artifact/artifact.go b/artifact/artifact.go index 5cac7c2..f8ec7b8 100644 --- a/artifact/artifact.go +++ b/artifact/artifact.go @@ -240,8 +240,9 @@ func (ka Artifact) Supported(ki distro.KernelInfo) (supported bool, err error) { return } +// TODO too many parameters func (ka Artifact) Process(slog zerolog.Logger, ki distro.KernelInfo, - outputOnSuccess, endless bool, cBinary, + outputOnSuccess, realtimeOutput, endless bool, cBinary, cEndlessStress string, cEndlessTimeout time.Duration, dump func(q *qemu.System, ka Artifact, ki distro.KernelInfo, result *Result)) { @@ -340,7 +341,7 @@ func (ka Artifact) Process(slog zerolog.Logger, ki distro.KernelInfo, slog.Error().Err(err).Msgf("build failure\n%v\n", result.Build.Output) return } else { - if outputOnSuccess { + if outputOnSuccess && !realtimeOutput { slog.Info().Msgf("build success\n%v\n", result.Build.Output) } else { slog.Info().Msg("build success") @@ -403,24 +404,37 @@ func (ka Artifact) Process(slog zerolog.Logger, ki distro.KernelInfo, } var qemuTestOutput string - q.SetOutputHandler(func(s string) { - qemuTestOutput += s + "\n" + q.SetQemuOutputHandler(func(s string) { + if realtimeOutput { + fmt.Printf("kmsg: %s\n", s) + } else { + qemuTestOutput += s + "\n" + } }) + if realtimeOutput { + q.SetCommandsOutputHandler(func(s string) { + fmt.Printf("test: %s\n", s) + }) + } + start := time.Now() - copyArtifactAndTest(slog, q, ka, &result, remoteTest, outputOnSuccess) + copyArtifactAndTest(slog, q, ka, &result, remoteTest, outputOnSuccess, realtimeOutput) slog.Debug().Str("duration", time.Since(start).String()). Msgf("test completed (success: %v)", result.Test.Ok) if result.Build.Ok { if !result.Run.Ok || !result.Test.Ok { slog.Error().Msgf("qemu output\n%v\n", qemuTestOutput) - } else if outputOnSuccess { + } else if outputOnSuccess && !realtimeOutput { slog.Info().Msgf("qemu output\n%v\n", qemuTestOutput) } } - q.CloseOutputHandler() + if realtimeOutput { + q.CloseCommandsOutputHandler() + } + q.CloseQemuOutputHandler() if !endless { return diff --git a/artifact/process.go b/artifact/process.go index b7d4b18..26cb729 100644 --- a/artifact/process.go +++ b/artifact/process.go @@ -157,9 +157,15 @@ func Build(flog zerolog.Logger, tmp string, ka Artifact, c.Args = append(c.Args, "--network", "none") + c.SetCommandsOutputHandler(func(s string) { + fmt.Printf("%s\n", s) + }) + output, err = c.Run(outdir, []string{ buildCommand + " && chmod -R 777 /work", }) + + c.CloseCommandsOutputHandler() } else { cmd := exec.Command("bash", "-c", "cd "+outdir+" && "+ buildCommand) @@ -281,7 +287,7 @@ func CopyFile(sourcePath, destinationPath string) (err error) { } func copyArtifactAndTest(slog zerolog.Logger, q *qemu.System, ka Artifact, - res *Result, remoteTest string, outputOnSuccess bool) (err error) { + res *Result, remoteTest string, outputOnSuccess, realtimeOutput bool) (err error) { // Copy all test files to the remote machine for _, f := range ka.TestFiles { @@ -346,7 +352,7 @@ func copyArtifactAndTest(slog zerolog.Logger, q *qemu.System, ka Artifact, return } - if outputOnSuccess { + if outputOnSuccess && !realtimeOutput { slog.Info().Msgf("test success\n%v\n", res.Test.Output) } else { slog.Info().Msg("test success") diff --git a/cmd/pew.go b/cmd/pew.go index 2ff2883..fae9c55 100644 --- a/cmd/pew.go +++ b/cmd/pew.go @@ -88,6 +88,7 @@ type PewCmd struct { IncludeInternalErrors bool `help:"count internal errors as part of the success rate"` OutputOnSuccess bool `help:"show output on success"` + RealtimeOutput bool `help:"show realtime output"` Endless bool `help:"endless tests"` EndlessTimeout time.Duration `help:"timeout between tests" default:"1m"` @@ -447,7 +448,7 @@ func (cmd PewCmd) testArtifact(swg *sizedwaitgroup.SizedWaitGroup, Str("kernel", ki.KernelRelease). Logger() - ka.Process(slog, ki, cmd.OutputOnSuccess, + ka.Process(slog, ki, cmd.OutputOnSuccess, cmd.RealtimeOutput, cmd.Endless, cmd.Binary, cmd.EndlessStress, cmd.EndlessTimeout, func(q *qemu.System, ka artifact.Artifact, ki distro.KernelInfo, result *artifact.Result) { dumpResult(q, ka, ki, result, cmd.Dist, cmd.Tag, cmd.Binary, cmd.DB) diff --git a/container/container.go b/container/container.go index 9e9d119..ebc6ced 100644 --- a/container/container.go +++ b/container/container.go @@ -14,6 +14,7 @@ import ( "path/filepath" "regexp" "strings" + "sync" "time" "github.com/cavaliergopher/grab/v3" @@ -176,6 +177,11 @@ type Container struct { Args []string Log zerolog.Logger + + commandsOutput struct { + listener chan string + mu sync.Mutex + } } func New(dist distro.Distro) (c Container, err error) { @@ -234,6 +240,43 @@ func NewFromKernelInfo(ki distro.KernelInfo) ( return } +// c.SetCommandsOutputHandler(func(s string) { fmt.Println(s) }) +// defer c.CloseCommandsOutputHandler() +func (c *Container) SetCommandsOutputHandler(handler func(s string)) { + c.commandsOutput.mu.Lock() + defer c.commandsOutput.mu.Unlock() + + c.commandsOutput.listener = make(chan string) + + go func(l chan string) { + for m := range l { + if m != "" { + handler(m) + } + } + }(c.commandsOutput.listener) +} + +func (c *Container) CloseCommandsOutputHandler() { + c.commandsOutput.mu.Lock() + defer c.commandsOutput.mu.Unlock() + + close(c.commandsOutput.listener) + c.commandsOutput.listener = nil +} + +func (c *Container) handleCommandsOutput(m string) { + if c.commandsOutput.listener == nil { + return + } + c.commandsOutput.mu.Lock() + defer c.commandsOutput.mu.Unlock() + + if c.commandsOutput.listener != nil { + c.commandsOutput.listener <- m + } +} + func (c Container) Name() string { return c.name } @@ -408,6 +451,7 @@ func (c Container) build(imagePath string) (output string, err error) { scanner := bufio.NewScanner(stdout) for scanner.Scan() { m := scanner.Text() + c.handleCommandsOutput(m) output += m + "\n" flog.Trace().Str("stdout", m).Msg("") } @@ -481,6 +525,7 @@ func (c Container) Run(workdir string, cmds []string) (out string, err error) { scanner := bufio.NewScanner(stdout) for scanner.Scan() { m := scanner.Text() + c.handleCommandsOutput(m) out += m + "\n" flog.Trace().Str("stdout", m).Msg("") } diff --git a/daemon/process.go b/daemon/process.go index b7572d4..d01f9e7 100644 --- a/daemon/process.go +++ b/daemon/process.go @@ -131,7 +131,7 @@ func (pj *jobProcessor) Process(res *Resources) (err error) { var result *artifact.Result var dq *qemu.System - pj.job.Artifact.Process(pj.log, pj.job.Target, false, false, "", "", 0, + pj.job.Artifact.Process(pj.log, pj.job.Target, false, false, false, "", "", 0, func(q *qemu.System, ka artifact.Artifact, ki distro.KernelInfo, res *artifact.Result) { diff --git a/qemu/qemu-kernel.go b/qemu/qemu-kernel.go index cd65b4c..037a8be 100644 --- a/qemu/qemu-kernel.go +++ b/qemu/qemu-kernel.go @@ -101,7 +101,12 @@ type System struct { Stdout, Stderr string - output struct { + qemuOutput struct { + listener chan string + mu sync.Mutex + } + + commandsOutput struct { listener chan string mu sync.Mutex } @@ -143,38 +148,77 @@ func NewSystem(arch arch, kernel Kernel, drivePath string) (q *System, err error return } -// q.SetOutputHandler(func(s string) { fmt.Println(s) }) -// defer q.CloseOutputHandler() -func (q *System) SetOutputHandler(handler func(s string)) { - q.output.mu.Lock() - defer q.output.mu.Unlock() +// q.SetQemuOutputHandler(func(s string) { fmt.Println(s) }) +// defer q.CloseQemuOutputHandler() +func (q *System) SetQemuOutputHandler(handler func(s string)) { + q.qemuOutput.mu.Lock() + defer q.qemuOutput.mu.Unlock() - q.output.listener = make(chan string) + q.qemuOutput.listener = make(chan string) go func(l chan string) { for m := range l { - handler(m) + if m != "" { + handler(m) + } } - }(q.output.listener) + }(q.qemuOutput.listener) } -func (q *System) CloseOutputHandler() { - q.output.mu.Lock() - defer q.output.mu.Unlock() +func (q *System) CloseQemuOutputHandler() { + q.qemuOutput.mu.Lock() + defer q.qemuOutput.mu.Unlock() - close(q.output.listener) - q.output.listener = nil + close(q.qemuOutput.listener) + q.qemuOutput.listener = nil } -func (q *System) handleOutput(m string) { - if q.output.listener == nil { +func (q *System) handleQemuOutput(m string) { + if q.qemuOutput.listener == nil { return } - q.output.mu.Lock() - defer q.output.mu.Unlock() + q.qemuOutput.mu.Lock() + defer q.qemuOutput.mu.Unlock() - if q.output.listener != nil { - q.output.listener <- m + if q.qemuOutput.listener != nil { + q.qemuOutput.listener <- m + } +} + +// q.SetCommandsOutputHandler(func(s string) { fmt.Println(s) }) +// defer q.CloseCommandsOutputHandler() +func (q *System) SetCommandsOutputHandler(handler func(s string)) { + q.commandsOutput.mu.Lock() + defer q.commandsOutput.mu.Unlock() + + q.commandsOutput.listener = make(chan string) + + go func(l chan string) { + for m := range l { + if m != "" { + handler(m) + } + } + }(q.commandsOutput.listener) +} + +func (q *System) CloseCommandsOutputHandler() { + q.commandsOutput.mu.Lock() + defer q.commandsOutput.mu.Unlock() + + close(q.commandsOutput.listener) + q.commandsOutput.listener = nil +} + +func (q *System) handleCommandsOutput(m string) { + if q.commandsOutput.listener == nil { + return + } + q.commandsOutput.mu.Lock() + defer q.commandsOutput.mu.Unlock() + + if q.commandsOutput.listener != nil { + q.commandsOutput.listener <- m } } @@ -353,7 +397,7 @@ func (q *System) Start() (err error) { scanner := bufio.NewScanner(q.pipe.stdout) for scanner.Scan() { m := scanner.Text() - q.handleOutput(m) + q.handleQemuOutput(m) q.Stdout += m + "\n" q.Log.Trace().Str("stdout", m).Msg("qemu") go q.checkOopsPanic(m) @@ -364,7 +408,7 @@ func (q *System) Start() (err error) { scanner := bufio.NewScanner(q.pipe.stderr) for scanner.Scan() { m := scanner.Text() - q.handleOutput(m) + q.handleQemuOutput(m) q.Stderr += m + "\n" q.Log.Trace().Str("stderr", m).Msg("qemu") } @@ -517,6 +561,7 @@ func (q System) Command(user, cmd string) (output string, err error) { scanner := bufio.NewScanner(stdout) for scanner.Scan() { m := scanner.Text() + q.handleCommandsOutput(m) output += m + "\n" flog.Trace().Str("stdout", m).Msg("qemu command") } @@ -530,6 +575,7 @@ func (q System) Command(user, cmd string) (output string, err error) { scanner := bufio.NewScanner(stderr) for scanner.Scan() { m := scanner.Text() + q.handleCommandsOutput(m) output += m + "\n" // Note: it prints stderr as stdout flog.Trace().Str("stdout", m).Msg("qemu command")