From 5bb4e3ff45c0914d933f5210b0aa01541ded3d65 Mon Sep 17 00:00:00 2001 From: Mikhail Klementev Date: Mon, 7 Oct 2024 22:06:04 +0000 Subject: [PATCH] feat: realtime output --- artifact/artifact.go | 30 ++++++++++---- artifact/preload.go | 2 +- artifact/process.go | 16 ++++++-- cmd/container.go | 9 +++++ cmd/debug.go | 3 +- cmd/kernel.go | 2 + cmd/pew.go | 3 +- container/container.go | 53 +++++++++++++++++++++++++ daemon/process.go | 2 +- qemu/qemu-kernel.go | 90 +++++++++++++++++++++++++++++++----------- 10 files changed, 173 insertions(+), 37 deletions(-) diff --git a/artifact/artifact.go b/artifact/artifact.go index 5cac7c2..18a69ea 100644 --- a/artifact/artifact.go +++ b/artifact/artifact.go @@ -240,8 +240,9 @@ func (ka Artifact) Supported(ki distro.KernelInfo) (supported bool, err error) { return } +// TODO too many parameters func (ka Artifact) Process(slog zerolog.Logger, ki distro.KernelInfo, - outputOnSuccess, endless bool, cBinary, + outputOnSuccess, realtimeOutput, endless bool, cBinary, cEndlessStress string, cEndlessTimeout time.Duration, dump func(q *qemu.System, ka Artifact, ki distro.KernelInfo, result *Result)) { @@ -333,14 +334,14 @@ func (ka Artifact) Process(slog zerolog.Logger, ki distro.KernelInfo, // TODO: build should return structure start := time.Now() result.BuildDir, result.BuildArtifact, result.Build.Output, err = - Build(slog, tmp, ka, ki, ka.Docker.Timeout.Duration) + Build(slog, tmp, ka, ki, ka.Docker.Timeout.Duration, realtimeOutput) slog.Debug().Str("duration", time.Since(start).String()). Msg("build done") if err != nil { slog.Error().Err(err).Msgf("build failure\n%v\n", result.Build.Output) return } else { - if outputOnSuccess { + if outputOnSuccess && !realtimeOutput { slog.Info().Msgf("build success\n%v\n", result.Build.Output) } else { slog.Info().Msg("build success") @@ -403,24 +404,37 @@ func (ka Artifact) Process(slog zerolog.Logger, ki distro.KernelInfo, } var qemuTestOutput string - q.SetOutputHandler(func(s string) { - qemuTestOutput += s + "\n" + q.SetQemuOutputHandler(func(s string) { + if realtimeOutput { + fmt.Printf("kmsg: %s\n", s) + } else { + qemuTestOutput += s + "\n" + } }) + if realtimeOutput { + q.SetCommandsOutputHandler(func(s string) { + fmt.Printf("test: %s\n", s) + }) + } + start := time.Now() - copyArtifactAndTest(slog, q, ka, &result, remoteTest, outputOnSuccess) + copyArtifactAndTest(slog, q, ka, &result, remoteTest, outputOnSuccess, realtimeOutput) slog.Debug().Str("duration", time.Since(start).String()). Msgf("test completed (success: %v)", result.Test.Ok) if result.Build.Ok { if !result.Run.Ok || !result.Test.Ok { slog.Error().Msgf("qemu output\n%v\n", qemuTestOutput) - } else if outputOnSuccess { + } else if outputOnSuccess && !realtimeOutput { slog.Info().Msgf("qemu output\n%v\n", qemuTestOutput) } } - q.CloseOutputHandler() + if realtimeOutput { + q.CloseCommandsOutputHandler() + } + q.CloseQemuOutputHandler() if !endless { return diff --git a/artifact/preload.go b/artifact/preload.go index f247ba7..92d8596 100644 --- a/artifact/preload.go +++ b/artifact/preload.go @@ -111,7 +111,7 @@ func buildPreload(workPath, tmp string, ki distro.KernelInfo, dockerTimeout = ka.Docker.Timeout.Duration } - _, af, _, err = Build(log.Logger, tmp, ka, ki, dockerTimeout) + _, af, _, err = Build(log.Logger, tmp, ka, ki, dockerTimeout, false) return } diff --git a/artifact/process.go b/artifact/process.go index b7d4b18..edd3f2f 100644 --- a/artifact/process.go +++ b/artifact/process.go @@ -103,7 +103,7 @@ func applyPatches(src string, ka Artifact) (err error) { } func Build(flog zerolog.Logger, tmp string, ka Artifact, - ki distro.KernelInfo, dockerTimeout time.Duration) ( + ki distro.KernelInfo, dockerTimeout time.Duration, realtimeOutput bool) ( outdir, outpath, output string, err error) { target := strings.Replace(ka.Name, " ", "_", -1) @@ -157,9 +157,19 @@ func Build(flog zerolog.Logger, tmp string, ka Artifact, c.Args = append(c.Args, "--network", "none") + if realtimeOutput { + c.SetCommandsOutputHandler(func(s string) { + fmt.Printf("%s\n", s) + }) + } + output, err = c.Run(outdir, []string{ buildCommand + " && chmod -R 777 /work", }) + + if realtimeOutput { + c.CloseCommandsOutputHandler() + } } else { cmd := exec.Command("bash", "-c", "cd "+outdir+" && "+ buildCommand) @@ -281,7 +291,7 @@ func CopyFile(sourcePath, destinationPath string) (err error) { } func copyArtifactAndTest(slog zerolog.Logger, q *qemu.System, ka Artifact, - res *Result, remoteTest string, outputOnSuccess bool) (err error) { + res *Result, remoteTest string, outputOnSuccess, realtimeOutput bool) (err error) { // Copy all test files to the remote machine for _, f := range ka.TestFiles { @@ -346,7 +356,7 @@ func copyArtifactAndTest(slog zerolog.Logger, q *qemu.System, ka Artifact, return } - if outputOnSuccess { + if outputOnSuccess && !realtimeOutput { slog.Info().Msgf("test success\n%v\n", res.Test.Output) } else { slog.Info().Msg("test success") diff --git a/cmd/container.go b/cmd/container.go index 0060eda..d38946b 100644 --- a/cmd/container.go +++ b/cmd/container.go @@ -24,6 +24,15 @@ type ContainerCmd struct { Update ContainerUpdateCmd `cmd:"" help:"update containers"` Save ContainerSaveCmd `cmd:"" help:"save containers"` Cleanup ContainerCleanupCmd `cmd:"" help:"cleanup containers"` + + RealtimeOutput RealtimeContainerOutputFlag `help:"show realtime output"` +} + +type RealtimeContainerOutputFlag bool + +func (f RealtimeContainerOutputFlag) AfterApply() (err error) { + container.Stdout = bool(f) + return } func (cmd ContainerCmd) Containers() (diis []container.Image, err error) { diff --git a/cmd/debug.go b/cmd/debug.go index dfb3b92..5db58ab 100644 --- a/cmd/debug.go +++ b/cmd/debug.go @@ -193,7 +193,8 @@ func (cmd *DebugCmd) Run(g *Globals) (err error) { return } } else { - buildDir, outFile, output, err = artifact.Build(log.Logger, tmp, ka, ki, g.Config.Docker.Timeout.Duration) + buildDir, outFile, output, err = artifact.Build( + log.Logger, tmp, ka, ki, g.Config.Docker.Timeout.Duration, false) if err != nil { log.Error().Err(err).Msg(output) return diff --git a/cmd/kernel.go b/cmd/kernel.go index 3286bde..c29adc2 100644 --- a/cmd/kernel.go +++ b/cmd/kernel.go @@ -39,6 +39,8 @@ type KernelCmd struct { ContainerTimeout time.Duration `help:"container timeout"` + RealtimeOutput RealtimeContainerOutputFlag `help:"show realtime output"` + List KernelListCmd `cmd:"" help:"list kernels"` ListRemote KernelListRemoteCmd `cmd:"" help:"list remote kernels"` Autogen KernelAutogenCmd `cmd:"" help:"generate kernels based on the current config"` diff --git a/cmd/pew.go b/cmd/pew.go index 2ff2883..fae9c55 100644 --- a/cmd/pew.go +++ b/cmd/pew.go @@ -88,6 +88,7 @@ type PewCmd struct { IncludeInternalErrors bool `help:"count internal errors as part of the success rate"` OutputOnSuccess bool `help:"show output on success"` + RealtimeOutput bool `help:"show realtime output"` Endless bool `help:"endless tests"` EndlessTimeout time.Duration `help:"timeout between tests" default:"1m"` @@ -447,7 +448,7 @@ func (cmd PewCmd) testArtifact(swg *sizedwaitgroup.SizedWaitGroup, Str("kernel", ki.KernelRelease). Logger() - ka.Process(slog, ki, cmd.OutputOnSuccess, + ka.Process(slog, ki, cmd.OutputOnSuccess, cmd.RealtimeOutput, cmd.Endless, cmd.Binary, cmd.EndlessStress, cmd.EndlessTimeout, func(q *qemu.System, ka artifact.Artifact, ki distro.KernelInfo, result *artifact.Result) { dumpResult(q, ka, ki, result, cmd.Dist, cmd.Tag, cmd.Binary, cmd.DB) diff --git a/container/container.go b/container/container.go index 9e9d119..c0e1dd7 100644 --- a/container/container.go +++ b/container/container.go @@ -14,6 +14,7 @@ import ( "path/filepath" "regexp" "strings" + "sync" "time" "github.com/cavaliergopher/grab/v3" @@ -45,6 +46,8 @@ var UsePrebuilt = true var Prune = true +var Stdout = false + type Image struct { Name string Distro distro.Distro @@ -176,6 +179,11 @@ type Container struct { Args []string Log zerolog.Logger + + commandsOutput struct { + listener chan string + mu sync.Mutex + } } func New(dist distro.Distro) (c Container, err error) { @@ -234,6 +242,43 @@ func NewFromKernelInfo(ki distro.KernelInfo) ( return } +// c.SetCommandsOutputHandler(func(s string) { fmt.Println(s) }) +// defer c.CloseCommandsOutputHandler() +func (c *Container) SetCommandsOutputHandler(handler func(s string)) { + c.commandsOutput.mu.Lock() + defer c.commandsOutput.mu.Unlock() + + c.commandsOutput.listener = make(chan string) + + go func(l chan string) { + for m := range l { + if m != "" { + handler(m) + } + } + }(c.commandsOutput.listener) +} + +func (c *Container) CloseCommandsOutputHandler() { + c.commandsOutput.mu.Lock() + defer c.commandsOutput.mu.Unlock() + + close(c.commandsOutput.listener) + c.commandsOutput.listener = nil +} + +func (c *Container) handleCommandsOutput(m string) { + if c.commandsOutput.listener == nil { + return + } + c.commandsOutput.mu.Lock() + defer c.commandsOutput.mu.Unlock() + + if c.commandsOutput.listener != nil { + c.commandsOutput.listener <- m + } +} + func (c Container) Name() string { return c.name } @@ -408,6 +453,10 @@ func (c Container) build(imagePath string) (output string, err error) { scanner := bufio.NewScanner(stdout) for scanner.Scan() { m := scanner.Text() + if Stdout { + fmt.Println(m) + } + c.handleCommandsOutput(m) output += m + "\n" flog.Trace().Str("stdout", m).Msg("") } @@ -481,6 +530,10 @@ func (c Container) Run(workdir string, cmds []string) (out string, err error) { scanner := bufio.NewScanner(stdout) for scanner.Scan() { m := scanner.Text() + if Stdout { + fmt.Println(m) + } + c.handleCommandsOutput(m) out += m + "\n" flog.Trace().Str("stdout", m).Msg("") } diff --git a/daemon/process.go b/daemon/process.go index b7572d4..d01f9e7 100644 --- a/daemon/process.go +++ b/daemon/process.go @@ -131,7 +131,7 @@ func (pj *jobProcessor) Process(res *Resources) (err error) { var result *artifact.Result var dq *qemu.System - pj.job.Artifact.Process(pj.log, pj.job.Target, false, false, "", "", 0, + pj.job.Artifact.Process(pj.log, pj.job.Target, false, false, false, "", "", 0, func(q *qemu.System, ka artifact.Artifact, ki distro.KernelInfo, res *artifact.Result) { diff --git a/qemu/qemu-kernel.go b/qemu/qemu-kernel.go index cd65b4c..037a8be 100644 --- a/qemu/qemu-kernel.go +++ b/qemu/qemu-kernel.go @@ -101,7 +101,12 @@ type System struct { Stdout, Stderr string - output struct { + qemuOutput struct { + listener chan string + mu sync.Mutex + } + + commandsOutput struct { listener chan string mu sync.Mutex } @@ -143,38 +148,77 @@ func NewSystem(arch arch, kernel Kernel, drivePath string) (q *System, err error return } -// q.SetOutputHandler(func(s string) { fmt.Println(s) }) -// defer q.CloseOutputHandler() -func (q *System) SetOutputHandler(handler func(s string)) { - q.output.mu.Lock() - defer q.output.mu.Unlock() +// q.SetQemuOutputHandler(func(s string) { fmt.Println(s) }) +// defer q.CloseQemuOutputHandler() +func (q *System) SetQemuOutputHandler(handler func(s string)) { + q.qemuOutput.mu.Lock() + defer q.qemuOutput.mu.Unlock() - q.output.listener = make(chan string) + q.qemuOutput.listener = make(chan string) go func(l chan string) { for m := range l { - handler(m) + if m != "" { + handler(m) + } } - }(q.output.listener) + }(q.qemuOutput.listener) } -func (q *System) CloseOutputHandler() { - q.output.mu.Lock() - defer q.output.mu.Unlock() +func (q *System) CloseQemuOutputHandler() { + q.qemuOutput.mu.Lock() + defer q.qemuOutput.mu.Unlock() - close(q.output.listener) - q.output.listener = nil + close(q.qemuOutput.listener) + q.qemuOutput.listener = nil } -func (q *System) handleOutput(m string) { - if q.output.listener == nil { +func (q *System) handleQemuOutput(m string) { + if q.qemuOutput.listener == nil { return } - q.output.mu.Lock() - defer q.output.mu.Unlock() + q.qemuOutput.mu.Lock() + defer q.qemuOutput.mu.Unlock() - if q.output.listener != nil { - q.output.listener <- m + if q.qemuOutput.listener != nil { + q.qemuOutput.listener <- m + } +} + +// q.SetCommandsOutputHandler(func(s string) { fmt.Println(s) }) +// defer q.CloseCommandsOutputHandler() +func (q *System) SetCommandsOutputHandler(handler func(s string)) { + q.commandsOutput.mu.Lock() + defer q.commandsOutput.mu.Unlock() + + q.commandsOutput.listener = make(chan string) + + go func(l chan string) { + for m := range l { + if m != "" { + handler(m) + } + } + }(q.commandsOutput.listener) +} + +func (q *System) CloseCommandsOutputHandler() { + q.commandsOutput.mu.Lock() + defer q.commandsOutput.mu.Unlock() + + close(q.commandsOutput.listener) + q.commandsOutput.listener = nil +} + +func (q *System) handleCommandsOutput(m string) { + if q.commandsOutput.listener == nil { + return + } + q.commandsOutput.mu.Lock() + defer q.commandsOutput.mu.Unlock() + + if q.commandsOutput.listener != nil { + q.commandsOutput.listener <- m } } @@ -353,7 +397,7 @@ func (q *System) Start() (err error) { scanner := bufio.NewScanner(q.pipe.stdout) for scanner.Scan() { m := scanner.Text() - q.handleOutput(m) + q.handleQemuOutput(m) q.Stdout += m + "\n" q.Log.Trace().Str("stdout", m).Msg("qemu") go q.checkOopsPanic(m) @@ -364,7 +408,7 @@ func (q *System) Start() (err error) { scanner := bufio.NewScanner(q.pipe.stderr) for scanner.Scan() { m := scanner.Text() - q.handleOutput(m) + q.handleQemuOutput(m) q.Stderr += m + "\n" q.Log.Trace().Str("stderr", m).Msg("qemu") } @@ -517,6 +561,7 @@ func (q System) Command(user, cmd string) (output string, err error) { scanner := bufio.NewScanner(stdout) for scanner.Scan() { m := scanner.Text() + q.handleCommandsOutput(m) output += m + "\n" flog.Trace().Str("stdout", m).Msg("qemu command") } @@ -530,6 +575,7 @@ func (q System) Command(user, cmd string) (output string, err error) { scanner := bufio.NewScanner(stderr) for scanner.Scan() { m := scanner.Text() + q.handleCommandsOutput(m) output += m + "\n" // Note: it prints stderr as stdout flog.Trace().Str("stdout", m).Msg("qemu command")