feat: realtime output
This commit is contained in:
		| @@ -240,8 +240,9 @@ func (ka Artifact) Supported(ki distro.KernelInfo) (supported bool, err error) { | ||||
| 	return | ||||
| } | ||||
|  | ||||
| // TODO too many parameters | ||||
| func (ka Artifact) Process(slog zerolog.Logger, ki distro.KernelInfo, | ||||
| 	outputOnSuccess, endless bool, cBinary, | ||||
| 	outputOnSuccess, realtimeOutput, endless bool, cBinary, | ||||
| 	cEndlessStress string, cEndlessTimeout time.Duration, | ||||
| 	dump func(q *qemu.System, ka Artifact, ki distro.KernelInfo, | ||||
| 		result *Result)) { | ||||
| @@ -333,14 +334,14 @@ func (ka Artifact) Process(slog zerolog.Logger, ki distro.KernelInfo, | ||||
| 		// TODO: build should return structure | ||||
| 		start := time.Now() | ||||
| 		result.BuildDir, result.BuildArtifact, result.Build.Output, err = | ||||
| 			Build(slog, tmp, ka, ki, ka.Docker.Timeout.Duration) | ||||
| 			Build(slog, tmp, ka, ki, ka.Docker.Timeout.Duration, realtimeOutput) | ||||
| 		slog.Debug().Str("duration", time.Since(start).String()). | ||||
| 			Msg("build done") | ||||
| 		if err != nil { | ||||
| 			slog.Error().Err(err).Msgf("build failure\n%v\n", result.Build.Output) | ||||
| 			return | ||||
| 		} else { | ||||
| 			if outputOnSuccess { | ||||
| 			if outputOnSuccess && !realtimeOutput { | ||||
| 				slog.Info().Msgf("build success\n%v\n", result.Build.Output) | ||||
| 			} else { | ||||
| 				slog.Info().Msg("build success") | ||||
| @@ -403,24 +404,37 @@ func (ka Artifact) Process(slog zerolog.Logger, ki distro.KernelInfo, | ||||
| 	} | ||||
|  | ||||
| 	var qemuTestOutput string | ||||
| 	q.SetOutputHandler(func(s string) { | ||||
| 		qemuTestOutput += s + "\n" | ||||
| 	q.SetQemuOutputHandler(func(s string) { | ||||
| 		if realtimeOutput { | ||||
| 			fmt.Printf("kmsg: %s\n", s) | ||||
| 		} else { | ||||
| 			qemuTestOutput += s + "\n" | ||||
| 		} | ||||
| 	}) | ||||
|  | ||||
| 	if realtimeOutput { | ||||
| 		q.SetCommandsOutputHandler(func(s string) { | ||||
| 			fmt.Printf("test: %s\n", s) | ||||
| 		}) | ||||
| 	} | ||||
|  | ||||
| 	start := time.Now() | ||||
| 	copyArtifactAndTest(slog, q, ka, &result, remoteTest, outputOnSuccess) | ||||
| 	copyArtifactAndTest(slog, q, ka, &result, remoteTest, outputOnSuccess, realtimeOutput) | ||||
| 	slog.Debug().Str("duration", time.Since(start).String()). | ||||
| 		Msgf("test completed (success: %v)", result.Test.Ok) | ||||
|  | ||||
| 	if result.Build.Ok { | ||||
| 		if !result.Run.Ok || !result.Test.Ok { | ||||
| 			slog.Error().Msgf("qemu output\n%v\n", qemuTestOutput) | ||||
| 		} else if outputOnSuccess { | ||||
| 		} else if outputOnSuccess && !realtimeOutput { | ||||
| 			slog.Info().Msgf("qemu output\n%v\n", qemuTestOutput) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	q.CloseOutputHandler() | ||||
| 	if realtimeOutput { | ||||
| 		q.CloseCommandsOutputHandler() | ||||
| 	} | ||||
| 	q.CloseQemuOutputHandler() | ||||
|  | ||||
| 	if !endless { | ||||
| 		return | ||||
|   | ||||
| @@ -111,7 +111,7 @@ func buildPreload(workPath, tmp string, ki distro.KernelInfo, | ||||
| 		dockerTimeout = ka.Docker.Timeout.Duration | ||||
| 	} | ||||
|  | ||||
| 	_, af, _, err = Build(log.Logger, tmp, ka, ki, dockerTimeout) | ||||
| 	_, af, _, err = Build(log.Logger, tmp, ka, ki, dockerTimeout, false) | ||||
| 	return | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -103,7 +103,7 @@ func applyPatches(src string, ka Artifact) (err error) { | ||||
| } | ||||
|  | ||||
| func Build(flog zerolog.Logger, tmp string, ka Artifact, | ||||
| 	ki distro.KernelInfo, dockerTimeout time.Duration) ( | ||||
| 	ki distro.KernelInfo, dockerTimeout time.Duration, realtimeOutput bool) ( | ||||
| 	outdir, outpath, output string, err error) { | ||||
|  | ||||
| 	target := strings.Replace(ka.Name, " ", "_", -1) | ||||
| @@ -157,9 +157,19 @@ func Build(flog zerolog.Logger, tmp string, ka Artifact, | ||||
|  | ||||
| 		c.Args = append(c.Args, "--network", "none") | ||||
|  | ||||
| 		if realtimeOutput { | ||||
| 			c.SetCommandsOutputHandler(func(s string) { | ||||
| 				fmt.Printf("%s\n", s) | ||||
| 			}) | ||||
| 		} | ||||
|  | ||||
| 		output, err = c.Run(outdir, []string{ | ||||
| 			buildCommand + " && chmod -R 777 /work", | ||||
| 		}) | ||||
|  | ||||
| 		if realtimeOutput { | ||||
| 			c.CloseCommandsOutputHandler() | ||||
| 		} | ||||
| 	} else { | ||||
| 		cmd := exec.Command("bash", "-c", "cd "+outdir+" && "+ | ||||
| 			buildCommand) | ||||
| @@ -281,7 +291,7 @@ func CopyFile(sourcePath, destinationPath string) (err error) { | ||||
| } | ||||
|  | ||||
| func copyArtifactAndTest(slog zerolog.Logger, q *qemu.System, ka Artifact, | ||||
| 	res *Result, remoteTest string, outputOnSuccess bool) (err error) { | ||||
| 	res *Result, remoteTest string, outputOnSuccess, realtimeOutput bool) (err error) { | ||||
|  | ||||
| 	// Copy all test files to the remote machine | ||||
| 	for _, f := range ka.TestFiles { | ||||
| @@ -346,7 +356,7 @@ func copyArtifactAndTest(slog zerolog.Logger, q *qemu.System, ka Artifact, | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	if outputOnSuccess { | ||||
| 	if outputOnSuccess && !realtimeOutput { | ||||
| 		slog.Info().Msgf("test success\n%v\n", res.Test.Output) | ||||
| 	} else { | ||||
| 		slog.Info().Msg("test success") | ||||
|   | ||||
| @@ -24,6 +24,15 @@ type ContainerCmd struct { | ||||
| 	Update  ContainerUpdateCmd  `cmd:"" help:"update containers"` | ||||
| 	Save    ContainerSaveCmd    `cmd:"" help:"save containers"` | ||||
| 	Cleanup ContainerCleanupCmd `cmd:"" help:"cleanup containers"` | ||||
|  | ||||
| 	RealtimeOutput RealtimeContainerOutputFlag `help:"show realtime output"` | ||||
| } | ||||
|  | ||||
| type RealtimeContainerOutputFlag bool | ||||
|  | ||||
| func (f RealtimeContainerOutputFlag) AfterApply() (err error) { | ||||
| 	container.Stdout = bool(f) | ||||
| 	return | ||||
| } | ||||
|  | ||||
| func (cmd ContainerCmd) Containers() (diis []container.Image, err error) { | ||||
|   | ||||
| @@ -193,7 +193,8 @@ func (cmd *DebugCmd) Run(g *Globals) (err error) { | ||||
| 			return | ||||
| 		} | ||||
| 	} else { | ||||
| 		buildDir, outFile, output, err = artifact.Build(log.Logger, tmp, ka, ki, g.Config.Docker.Timeout.Duration) | ||||
| 		buildDir, outFile, output, err = artifact.Build( | ||||
| 			log.Logger, tmp, ka, ki, g.Config.Docker.Timeout.Duration, false) | ||||
| 		if err != nil { | ||||
| 			log.Error().Err(err).Msg(output) | ||||
| 			return | ||||
|   | ||||
| @@ -39,6 +39,8 @@ type KernelCmd struct { | ||||
|  | ||||
| 	ContainerTimeout time.Duration `help:"container timeout"` | ||||
|  | ||||
| 	RealtimeOutput RealtimeContainerOutputFlag `help:"show realtime output"` | ||||
|  | ||||
| 	List        KernelListCmd        `cmd:"" help:"list kernels"` | ||||
| 	ListRemote  KernelListRemoteCmd  `cmd:"" help:"list remote kernels"` | ||||
| 	Autogen     KernelAutogenCmd     `cmd:"" help:"generate kernels based on the current config"` | ||||
|   | ||||
| @@ -88,6 +88,7 @@ type PewCmd struct { | ||||
| 	IncludeInternalErrors bool    `help:"count internal errors as part of the success rate"` | ||||
|  | ||||
| 	OutputOnSuccess bool `help:"show output on success"` | ||||
| 	RealtimeOutput  bool `help:"show realtime output"` | ||||
|  | ||||
| 	Endless        bool          `help:"endless tests"` | ||||
| 	EndlessTimeout time.Duration `help:"timeout between tests" default:"1m"` | ||||
| @@ -447,7 +448,7 @@ func (cmd PewCmd) testArtifact(swg *sizedwaitgroup.SizedWaitGroup, | ||||
| 		Str("kernel", ki.KernelRelease). | ||||
| 		Logger() | ||||
|  | ||||
| 	ka.Process(slog, ki, cmd.OutputOnSuccess, | ||||
| 	ka.Process(slog, ki, cmd.OutputOnSuccess, cmd.RealtimeOutput, | ||||
| 		cmd.Endless, cmd.Binary, cmd.EndlessStress, cmd.EndlessTimeout, | ||||
| 		func(q *qemu.System, ka artifact.Artifact, ki distro.KernelInfo, result *artifact.Result) { | ||||
| 			dumpResult(q, ka, ki, result, cmd.Dist, cmd.Tag, cmd.Binary, cmd.DB) | ||||
|   | ||||
| @@ -14,6 +14,7 @@ import ( | ||||
| 	"path/filepath" | ||||
| 	"regexp" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/cavaliergopher/grab/v3" | ||||
| @@ -45,6 +46,8 @@ var UsePrebuilt = true | ||||
|  | ||||
| var Prune = true | ||||
|  | ||||
| var Stdout = false | ||||
|  | ||||
| type Image struct { | ||||
| 	Name   string | ||||
| 	Distro distro.Distro | ||||
| @@ -176,6 +179,11 @@ type Container struct { | ||||
| 	Args []string | ||||
|  | ||||
| 	Log zerolog.Logger | ||||
|  | ||||
| 	commandsOutput struct { | ||||
| 		listener chan string | ||||
| 		mu       sync.Mutex | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func New(dist distro.Distro) (c Container, err error) { | ||||
| @@ -234,6 +242,43 @@ func NewFromKernelInfo(ki distro.KernelInfo) ( | ||||
| 	return | ||||
| } | ||||
|  | ||||
| // c.SetCommandsOutputHandler(func(s string) { fmt.Println(s) }) | ||||
| // defer c.CloseCommandsOutputHandler() | ||||
| func (c *Container) SetCommandsOutputHandler(handler func(s string)) { | ||||
| 	c.commandsOutput.mu.Lock() | ||||
| 	defer c.commandsOutput.mu.Unlock() | ||||
|  | ||||
| 	c.commandsOutput.listener = make(chan string) | ||||
|  | ||||
| 	go func(l chan string) { | ||||
| 		for m := range l { | ||||
| 			if m != "" { | ||||
| 				handler(m) | ||||
| 			} | ||||
| 		} | ||||
| 	}(c.commandsOutput.listener) | ||||
| } | ||||
|  | ||||
| func (c *Container) CloseCommandsOutputHandler() { | ||||
| 	c.commandsOutput.mu.Lock() | ||||
| 	defer c.commandsOutput.mu.Unlock() | ||||
|  | ||||
| 	close(c.commandsOutput.listener) | ||||
| 	c.commandsOutput.listener = nil | ||||
| } | ||||
|  | ||||
| func (c *Container) handleCommandsOutput(m string) { | ||||
| 	if c.commandsOutput.listener == nil { | ||||
| 		return | ||||
| 	} | ||||
| 	c.commandsOutput.mu.Lock() | ||||
| 	defer c.commandsOutput.mu.Unlock() | ||||
|  | ||||
| 	if c.commandsOutput.listener != nil { | ||||
| 		c.commandsOutput.listener <- m | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (c Container) Name() string { | ||||
| 	return c.name | ||||
| } | ||||
| @@ -408,6 +453,10 @@ func (c Container) build(imagePath string) (output string, err error) { | ||||
| 		scanner := bufio.NewScanner(stdout) | ||||
| 		for scanner.Scan() { | ||||
| 			m := scanner.Text() | ||||
| 			if Stdout { | ||||
| 				fmt.Println(m) | ||||
| 			} | ||||
| 			c.handleCommandsOutput(m) | ||||
| 			output += m + "\n" | ||||
| 			flog.Trace().Str("stdout", m).Msg("") | ||||
| 		} | ||||
| @@ -481,6 +530,10 @@ func (c Container) Run(workdir string, cmds []string) (out string, err error) { | ||||
| 		scanner := bufio.NewScanner(stdout) | ||||
| 		for scanner.Scan() { | ||||
| 			m := scanner.Text() | ||||
| 			if Stdout { | ||||
| 				fmt.Println(m) | ||||
| 			} | ||||
| 			c.handleCommandsOutput(m) | ||||
| 			out += m + "\n" | ||||
| 			flog.Trace().Str("stdout", m).Msg("") | ||||
| 		} | ||||
|   | ||||
| @@ -131,7 +131,7 @@ func (pj *jobProcessor) Process(res *Resources) (err error) { | ||||
| 	var result *artifact.Result | ||||
| 	var dq *qemu.System | ||||
|  | ||||
| 	pj.job.Artifact.Process(pj.log, pj.job.Target, false, false, "", "", 0, | ||||
| 	pj.job.Artifact.Process(pj.log, pj.job.Target, false, false, false, "", "", 0, | ||||
| 		func(q *qemu.System, ka artifact.Artifact, ki distro.KernelInfo, | ||||
| 			res *artifact.Result) { | ||||
|  | ||||
|   | ||||
| @@ -101,7 +101,12 @@ type System struct { | ||||
|  | ||||
| 	Stdout, Stderr string | ||||
|  | ||||
| 	output struct { | ||||
| 	qemuOutput struct { | ||||
| 		listener chan string | ||||
| 		mu       sync.Mutex | ||||
| 	} | ||||
|  | ||||
| 	commandsOutput struct { | ||||
| 		listener chan string | ||||
| 		mu       sync.Mutex | ||||
| 	} | ||||
| @@ -143,38 +148,77 @@ func NewSystem(arch arch, kernel Kernel, drivePath string) (q *System, err error | ||||
| 	return | ||||
| } | ||||
|  | ||||
| // q.SetOutputHandler(func(s string) { fmt.Println(s) }) | ||||
| // defer q.CloseOutputHandler() | ||||
| func (q *System) SetOutputHandler(handler func(s string)) { | ||||
| 	q.output.mu.Lock() | ||||
| 	defer q.output.mu.Unlock() | ||||
| // q.SetQemuOutputHandler(func(s string) { fmt.Println(s) }) | ||||
| // defer q.CloseQemuOutputHandler() | ||||
| func (q *System) SetQemuOutputHandler(handler func(s string)) { | ||||
| 	q.qemuOutput.mu.Lock() | ||||
| 	defer q.qemuOutput.mu.Unlock() | ||||
|  | ||||
| 	q.output.listener = make(chan string) | ||||
| 	q.qemuOutput.listener = make(chan string) | ||||
|  | ||||
| 	go func(l chan string) { | ||||
| 		for m := range l { | ||||
| 			handler(m) | ||||
| 			if m != "" { | ||||
| 				handler(m) | ||||
| 			} | ||||
| 		} | ||||
| 	}(q.output.listener) | ||||
| 	}(q.qemuOutput.listener) | ||||
| } | ||||
|  | ||||
| func (q *System) CloseOutputHandler() { | ||||
| 	q.output.mu.Lock() | ||||
| 	defer q.output.mu.Unlock() | ||||
| func (q *System) CloseQemuOutputHandler() { | ||||
| 	q.qemuOutput.mu.Lock() | ||||
| 	defer q.qemuOutput.mu.Unlock() | ||||
|  | ||||
| 	close(q.output.listener) | ||||
| 	q.output.listener = nil | ||||
| 	close(q.qemuOutput.listener) | ||||
| 	q.qemuOutput.listener = nil | ||||
| } | ||||
|  | ||||
| func (q *System) handleOutput(m string) { | ||||
| 	if q.output.listener == nil { | ||||
| func (q *System) handleQemuOutput(m string) { | ||||
| 	if q.qemuOutput.listener == nil { | ||||
| 		return | ||||
| 	} | ||||
| 	q.output.mu.Lock() | ||||
| 	defer q.output.mu.Unlock() | ||||
| 	q.qemuOutput.mu.Lock() | ||||
| 	defer q.qemuOutput.mu.Unlock() | ||||
|  | ||||
| 	if q.output.listener != nil { | ||||
| 		q.output.listener <- m | ||||
| 	if q.qemuOutput.listener != nil { | ||||
| 		q.qemuOutput.listener <- m | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // q.SetCommandsOutputHandler(func(s string) { fmt.Println(s) }) | ||||
| // defer q.CloseCommandsOutputHandler() | ||||
| func (q *System) SetCommandsOutputHandler(handler func(s string)) { | ||||
| 	q.commandsOutput.mu.Lock() | ||||
| 	defer q.commandsOutput.mu.Unlock() | ||||
|  | ||||
| 	q.commandsOutput.listener = make(chan string) | ||||
|  | ||||
| 	go func(l chan string) { | ||||
| 		for m := range l { | ||||
| 			if m != "" { | ||||
| 				handler(m) | ||||
| 			} | ||||
| 		} | ||||
| 	}(q.commandsOutput.listener) | ||||
| } | ||||
|  | ||||
| func (q *System) CloseCommandsOutputHandler() { | ||||
| 	q.commandsOutput.mu.Lock() | ||||
| 	defer q.commandsOutput.mu.Unlock() | ||||
|  | ||||
| 	close(q.commandsOutput.listener) | ||||
| 	q.commandsOutput.listener = nil | ||||
| } | ||||
|  | ||||
| func (q *System) handleCommandsOutput(m string) { | ||||
| 	if q.commandsOutput.listener == nil { | ||||
| 		return | ||||
| 	} | ||||
| 	q.commandsOutput.mu.Lock() | ||||
| 	defer q.commandsOutput.mu.Unlock() | ||||
|  | ||||
| 	if q.commandsOutput.listener != nil { | ||||
| 		q.commandsOutput.listener <- m | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -353,7 +397,7 @@ func (q *System) Start() (err error) { | ||||
| 		scanner := bufio.NewScanner(q.pipe.stdout) | ||||
| 		for scanner.Scan() { | ||||
| 			m := scanner.Text() | ||||
| 			q.handleOutput(m) | ||||
| 			q.handleQemuOutput(m) | ||||
| 			q.Stdout += m + "\n" | ||||
| 			q.Log.Trace().Str("stdout", m).Msg("qemu") | ||||
| 			go q.checkOopsPanic(m) | ||||
| @@ -364,7 +408,7 @@ func (q *System) Start() (err error) { | ||||
| 		scanner := bufio.NewScanner(q.pipe.stderr) | ||||
| 		for scanner.Scan() { | ||||
| 			m := scanner.Text() | ||||
| 			q.handleOutput(m) | ||||
| 			q.handleQemuOutput(m) | ||||
| 			q.Stderr += m + "\n" | ||||
| 			q.Log.Trace().Str("stderr", m).Msg("qemu") | ||||
| 		} | ||||
| @@ -517,6 +561,7 @@ func (q System) Command(user, cmd string) (output string, err error) { | ||||
| 		scanner := bufio.NewScanner(stdout) | ||||
| 		for scanner.Scan() { | ||||
| 			m := scanner.Text() | ||||
| 			q.handleCommandsOutput(m) | ||||
| 			output += m + "\n" | ||||
| 			flog.Trace().Str("stdout", m).Msg("qemu command") | ||||
| 		} | ||||
| @@ -530,6 +575,7 @@ func (q System) Command(user, cmd string) (output string, err error) { | ||||
| 		scanner := bufio.NewScanner(stderr) | ||||
| 		for scanner.Scan() { | ||||
| 			m := scanner.Text() | ||||
| 			q.handleCommandsOutput(m) | ||||
| 			output += m + "\n" | ||||
| 			// Note: it prints stderr as stdout | ||||
| 			flog.Trace().Str("stdout", m).Msg("qemu command") | ||||
|   | ||||
		Reference in New Issue
	
	Block a user