From a608d89cdc042530cc430eb8682972650ef561c4 Mon Sep 17 00:00:00 2001 From: Mikhail Klementev Date: Sun, 25 Feb 2024 18:04:02 +0000 Subject: [PATCH] feat(daemon): parallel execution --- cmd/daemon.go | 17 ++++ daemon/commands.go | 2 +- daemon/daemon.go | 49 ++++++++-- daemon/process.go | 226 ++++++++++++++++++++++++-------------------- daemon/resources.go | 206 ++++++++++++++++++++++++++++++++++++++++ 5 files changed, 385 insertions(+), 115 deletions(-) create mode 100644 daemon/resources.go diff --git a/cmd/daemon.go b/cmd/daemon.go index 03a8049..f80db26 100644 --- a/cmd/daemon.go +++ b/cmd/daemon.go @@ -17,6 +17,11 @@ import ( type DaemonCmd struct { Addr string `default:":63527"` + Threads int `help:"number of threads to use"` + + OvercommitMemory float64 `help:"overcommit memory factor"` + OvercommitCPU float64 `help:"overcommit CPU factor"` + Serve DaemonServeCmd `cmd:"" help:"start daemon"` Job DaemonJobCmd `cmd:"" aliases:"jobs" help:"manage jobs"` @@ -32,6 +37,18 @@ func (cmd *DaemonServeCmd) Run(dm *DaemonCmd, g *Globals) (err error) { } defer d.Kill() + if dm.Threads > 0 { + d.Threads = dm.Threads + } + + if dm.OvercommitMemory > 0 { + d.Resources.CPU.SetOvercommit(dm.OvercommitMemory) + } + + if dm.OvercommitCPU > 0 { + d.Resources.CPU.SetOvercommit(dm.OvercommitCPU) + } + go d.Daemon() d.Listen(dm.Addr) return diff --git a/daemon/commands.go b/daemon/commands.go index 348efa6..1c353a5 100644 --- a/daemon/commands.go +++ b/daemon/commands.go @@ -29,7 +29,7 @@ type cmdenv struct { DB *sql.DB - WG sync.WaitGroup + WG *sync.WaitGroup KernelConfig string } diff --git a/daemon/daemon.go b/daemon/daemon.go index a24c3ad..a0811b9 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -6,9 +6,11 @@ import ( "io" "net" "os/exec" + "runtime" "sync" "time" + "github.com/remeh/sizedwaitgroup" "github.com/rs/zerolog/log" "code.dumpstack.io/tools/out-of-tree/api" @@ -18,6 +20,9 @@ import ( ) type Daemon struct { + Threads int + Resources *Resources + db *sql.DB kernelConfig string @@ -27,7 +32,11 @@ type Daemon struct { func Init(kernelConfig string) (d *Daemon, err error) { d = &Daemon{} + d.Threads = runtime.NumCPU() + d.Resources = NewResources() + d.kernelConfig = kernelConfig + d.wg.Add(1) // matches with db.Close() d.db, err = db.OpenDatabase(dotfiles.File("daemon/daemon.db")) if err != nil { @@ -50,28 +59,49 @@ func (d *Daemon) Daemon() { log.Fatal().Msg("db is not initialized") } - log.Info().Msg("start daemon loop") + swg := sizedwaitgroup.New(d.Threads) + log.Info().Int("threads", d.Threads).Msg("start") for !d.shutdown { d.wg.Add(1) jobs, err := db.Jobs(d.db) - if err != nil { + if err != nil && !d.shutdown { log.Error().Err(err).Msg("") + d.wg.Done() time.Sleep(time.Minute) continue } for _, job := range jobs { - err = newPjob(job, d.db).Process() - if err != nil { - log.Error().Err(err).Msgf("%v", job) + if d.shutdown { + break } + + pj := newJobProcessor(job, d.db) + + if job.Status == api.StatusNew { + pj.SetStatus(api.StatusWaiting) + continue + } + + if job.Status != api.StatusWaiting { + continue + } + + swg.Add() + go func(pj jobProcessor) { + defer swg.Done() + pj.Process(d.Resources) + time.Sleep(time.Second) + }(pj) } d.wg.Done() time.Sleep(time.Second) } + + swg.Wait() } func handler(conn net.Conn, e cmdenv) { @@ -139,10 +169,10 @@ func (d *Daemon) Listen(addr string) { go io.Copy(logWriter{log: log.Logger}, stderr) - log.Info().Msgf("start %v", git) + log.Debug().Msgf("start %v", git) git.Start() defer func() { - log.Info().Msgf("stop %v", git) + log.Debug().Msgf("stop %v", git) }() err = git.Wait() @@ -186,7 +216,8 @@ func (d *Daemon) Listen(addr string) { log.Fatal().Err(err).Msg("listen") } - log.Info().Msgf("listen on %v", addr) + log.Info().Str("addr", ":9418").Msg("git") + log.Info().Str("addr", addr).Msg("daemon") for { conn, err := l.Accept() @@ -197,7 +228,7 @@ func (d *Daemon) Listen(addr string) { e := cmdenv{ DB: d.db, - WG: d.wg, + WG: &d.wg, Conn: conn, KernelConfig: d.kernelConfig, } diff --git a/daemon/process.go b/daemon/process.go index c9e30b5..d3d42dc 100644 --- a/daemon/process.go +++ b/daemon/process.go @@ -19,20 +19,20 @@ import ( "code.dumpstack.io/tools/out-of-tree/qemu" ) -type pjob struct { +type jobProcessor struct { job api.Job log zerolog.Logger db *sql.DB } -func newPjob(job api.Job, db *sql.DB) (pj pjob) { +func newJobProcessor(job api.Job, db *sql.DB) (pj jobProcessor) { pj.job = job pj.db = db pj.log = log.With().Str("uuid", job.UUID).Logger() return } -func (pj pjob) Update() (err error) { +func (pj jobProcessor) Update() (err error) { err = db.UpdateJob(pj.db, pj.job) if err != nil { pj.log.Error().Err(err).Msgf("update job %v", pj.job) @@ -40,115 +40,131 @@ func (pj pjob) Update() (err error) { return } -func (pj pjob) SetStatus(status api.Status) (err error) { +func (pj jobProcessor) SetStatus(status api.Status) (err error) { pj.log.Info().Msgf(`%v -> %v`, pj.job.Status, status) pj.job.Status = status err = pj.Update() return } -func (pj pjob) Process() (err error) { - switch pj.job.Status { - case api.StatusNew: - pj.log.Info().Msgf(`%v`, pj.job.Status) - pj.SetStatus(api.StatusWaiting) +func (pj *jobProcessor) Process(res *Resources) (err error) { + if pj.job.Status != api.StatusWaiting { + err = errors.New("job is not available to process") return - - case api.StatusWaiting: - pj.SetStatus(api.StatusRunning) - defer func() { - if err != nil { - pj.SetStatus(api.StatusFailure) - } else { - pj.SetStatus(api.StatusSuccess) - } - }() - - var tmp string - tmp, err = os.MkdirTemp(dotfiles.Dir("tmp"), "") - if err != nil { - pj.log.Error().Err(err).Msg("mktemp") - return - } - defer os.RemoveAll(tmp) - - tmprepo := filepath.Join(tmp, "repo") - - pj.log.Debug().Msgf("temp repo: %v", tmprepo) - - remote := fmt.Sprintf("git://localhost:9418/%s", pj.job.RepoName) - - pj.log.Debug().Msgf("remote: %v", remote) - - var raw []byte - - cmd := exec.Command("git", "clone", remote, tmprepo) - - raw, err = cmd.CombinedOutput() - pj.log.Trace().Msgf("%v\n%v", cmd, string(raw)) - if err != nil { - pj.log.Error().Msgf("%v\n%v", cmd, string(raw)) - return - } - - cmd = exec.Command("git", "checkout", pj.job.Commit) - - cmd.Dir = tmprepo - - raw, err = cmd.CombinedOutput() - pj.log.Trace().Msgf("%v\n%v", cmd, string(raw)) - if err != nil { - pj.log.Error().Msgf("%v\n%v", cmd, string(raw)) - return - } - - pj.job.Artifact.SourcePath = tmprepo - - var result *artifact.Result - var dq *qemu.System - - pj.job.Artifact.Process(pj.log, pj.job.Target, false, "", "", 0, - func(q *qemu.System, ka artifact.Artifact, ki distro.KernelInfo, - res *artifact.Result) { - - result = res - dq = q - }, - ) - - logdir := dotfiles.Dir("daemon/logs", pj.job.UUID) - - err = os.WriteFile(filepath.Join(logdir, "build.log"), - []byte(result.Build.Output), 0644) - if err != nil { - pj.log.Error().Err(err).Msg("") - } - - err = os.WriteFile(filepath.Join(logdir, "run.log"), - []byte(result.Run.Output), 0644) - if err != nil { - pj.log.Error().Err(err).Msg("") - } - - err = os.WriteFile(filepath.Join(logdir, "test.log"), - []byte(result.Test.Output), 0644) - if err != nil { - pj.log.Error().Err(err).Msg("") - } - - err = os.WriteFile(filepath.Join(logdir, "qemu.log"), - []byte(dq.Stdout), 0644) - if err != nil { - pj.log.Error().Err(err).Msg("") - } - - pj.log.Info().Msgf("build %v, run %v, test %v", - result.Build.Ok, result.Run.Ok, result.Test.Ok) - - if !result.Test.Ok { - err = errors.New("tests failed") - } - } + + if pj.job.Artifact.Qemu.Cpus == 0 { + pj.job.Artifact.Qemu.Cpus = qemu.DefaultCPUs + } + + if pj.job.Artifact.Qemu.Memory == 0 { + pj.job.Artifact.Qemu.Memory = qemu.DefaultMemory + } + + err = res.Allocate(pj.job) + if err != nil { + return + } + + defer func() { + res.Release(pj.job) + }() + + log.Info().Msgf("process job %v", pj.job.UUID) + + pj.SetStatus(api.StatusRunning) + defer func() { + if err != nil { + pj.SetStatus(api.StatusFailure) + } else { + pj.SetStatus(api.StatusSuccess) + } + }() + + var tmp string + tmp, err = os.MkdirTemp(dotfiles.Dir("tmp"), "") + if err != nil { + pj.log.Error().Err(err).Msg("mktemp") + return + } + defer os.RemoveAll(tmp) + + tmprepo := filepath.Join(tmp, "repo") + + pj.log.Debug().Msgf("temp repo: %v", tmprepo) + + remote := fmt.Sprintf("git://localhost:9418/%s", pj.job.RepoName) + + pj.log.Debug().Msgf("remote: %v", remote) + + var raw []byte + + cmd := exec.Command("git", "clone", remote, tmprepo) + + raw, err = cmd.CombinedOutput() + pj.log.Trace().Msgf("%v\n%v", cmd, string(raw)) + if err != nil { + pj.log.Error().Msgf("%v\n%v", cmd, string(raw)) + return + } + + cmd = exec.Command("git", "checkout", pj.job.Commit) + + cmd.Dir = tmprepo + + raw, err = cmd.CombinedOutput() + pj.log.Trace().Msgf("%v\n%v", cmd, string(raw)) + if err != nil { + pj.log.Error().Msgf("%v\n%v", cmd, string(raw)) + return + } + + pj.job.Artifact.SourcePath = tmprepo + + var result *artifact.Result + var dq *qemu.System + + pj.job.Artifact.Process(pj.log, pj.job.Target, false, "", "", 0, + func(q *qemu.System, ka artifact.Artifact, ki distro.KernelInfo, + res *artifact.Result) { + + result = res + dq = q + }, + ) + + logdir := dotfiles.Dir("daemon/logs", pj.job.UUID) + + err = os.WriteFile(filepath.Join(logdir, "build.log"), + []byte(result.Build.Output), 0644) + if err != nil { + pj.log.Error().Err(err).Msg("") + } + + err = os.WriteFile(filepath.Join(logdir, "run.log"), + []byte(result.Run.Output), 0644) + if err != nil { + pj.log.Error().Err(err).Msg("") + } + + err = os.WriteFile(filepath.Join(logdir, "test.log"), + []byte(result.Test.Output), 0644) + if err != nil { + pj.log.Error().Err(err).Msg("") + } + + err = os.WriteFile(filepath.Join(logdir, "qemu.log"), + []byte(dq.Stdout), 0644) + if err != nil { + pj.log.Error().Err(err).Msg("") + } + + pj.log.Info().Msgf("build %v, run %v, test %v", + result.Build.Ok, result.Run.Ok, result.Test.Ok) + + if !result.Test.Ok { + err = errors.New("tests failed") + } + return } diff --git a/daemon/resources.go b/daemon/resources.go new file mode 100644 index 0000000..82635c9 --- /dev/null +++ b/daemon/resources.go @@ -0,0 +1,206 @@ +package daemon + +import ( + "errors" + "runtime" + "sync" + "syscall" + + "github.com/rs/zerolog/log" + + "code.dumpstack.io/tools/out-of-tree/api" +) + +type Resources struct { + initialized bool + + CPU *CPUResource + RAM *RAMResources +} + +func NewResources() (r *Resources) { + r = &Resources{} + r.CPU = NewCPUResources() + r.RAM = NewRAMResources() + r.initialized = true + return +} + +func (r *Resources) Allocate(job api.Job) (err error) { + if !r.initialized { + err = errors.New("resources not initialized") + return + } + + if job.Artifact.Qemu.Cpus == 0 { + err = errors.New("no cpus requested") + return + } + + if job.Artifact.Qemu.Memory == 0 { + err = errors.New("no memory requested") + return + } + + origRam := r.RAM.GetSpent() + origCPU := r.CPU.GetSpent() + + err = r.CPU.Allocate(job.Artifact.Qemu.Cpus) + if err != nil { + return + } + + err = r.RAM.Allocate(job.Artifact.Qemu.Memory) + if err != nil { + r.CPU.Release(job.Artifact.Qemu.Cpus) + return + } + + log.Debug().Msgf("allocated %d cpus, %d MB ram", + r.CPU.GetSpent()-origCPU, + r.RAM.GetSpent()-origRam) + + return +} + +func (r *Resources) Release(job api.Job) { + if !r.initialized { + log.Error().Msg("resources not initialized") + return + } + + r.CPU.Release(job.Artifact.Qemu.Cpus) + r.RAM.Release(job.Artifact.Qemu.Memory) + + log.Debug().Msgf("released %d cpus, %d MB ram", + job.Artifact.Qemu.Cpus, + job.Artifact.Qemu.Memory) +} + +type CPUResource struct { + num int + overcommit float64 + + mu *sync.Mutex + spent int +} + +const ( + Allocation = iota + Release +) + +func NewCPUResources() (cpur *CPUResource) { + cpur = &CPUResource{} + cpur.mu = &sync.Mutex{} + cpur.num = runtime.NumCPU() + cpur.overcommit = 1 + log.Debug().Msgf("total cpus: %d", cpur.num) + return +} + +func (cpur *CPUResource) SetOvercommit(oc float64) { + log.Info().Int("cpus", cpur.num). + Int("result", int(float64(cpur.num)*oc)). + Msgf("%.02f", oc) + cpur.overcommit = oc +} + +func (cpur *CPUResource) GetSpent() int { + cpur.mu.Lock() + defer cpur.mu.Unlock() + return cpur.spent +} + +var ErrNotEnoughCpu = errors.New("not enough cpu") + +func (cpur *CPUResource) Allocate(cpu int) (err error) { + cpur.mu.Lock() + defer cpur.mu.Unlock() + + if cpur.spent+cpu > int(float64(cpur.num)*cpur.overcommit) { + err = ErrNotEnoughCpu + return + } + + cpur.spent += cpu + return +} + +func (cpur *CPUResource) Release(cpu int) (err error) { + cpur.mu.Lock() + defer cpur.mu.Unlock() + + if cpur.spent < cpu { + err = ErrFreeingMoreThanAllocated + return + } + + cpur.spent -= cpu + return +} + +type RAMResources struct { + mb int + overcommit float64 + + mu *sync.Mutex + spent int +} + +func NewRAMResources() (ramr *RAMResources) { + ramr = &RAMResources{} + ramr.mu = &sync.Mutex{} + ramr.overcommit = 1 + + var info syscall.Sysinfo_t + syscall.Sysinfo(&info) + ramr.mb = int(info.Totalram / 1024 / 1024) + log.Debug().Msgf("total ram: %d MB", ramr.mb) + return +} + +func (ramr *RAMResources) SetOvercommit(oc float64) { + log.Info().Int("ram", ramr.mb). + Int("result", int(float64(ramr.mb)*oc)). + Msgf("%.02f", oc) + ramr.overcommit = oc +} + +func (ramr RAMResources) GetSpent() int { + ramr.mu.Lock() + defer ramr.mu.Unlock() + return ramr.spent +} + +var ErrNotEnoughRam = errors.New("not enough ram") + +func (ramr *RAMResources) Allocate(mb int) (err error) { + ramr.mu.Lock() + defer ramr.mu.Unlock() + + ocmem := int(float64(ramr.mb) * ramr.overcommit) + + if mb > ocmem-ramr.spent { + err = ErrNotEnoughRam + return + } + + ramr.spent += mb + return +} + +var ErrFreeingMoreThanAllocated = errors.New("freeing more than allocated") + +func (ramr *RAMResources) Release(mb int) (err error) { + ramr.mu.Lock() + defer ramr.mu.Unlock() + + if ramr.spent < mb { + err = ErrFreeingMoreThanAllocated + return + } + + ramr.spent -= mb + return +}