1
0

feat(daemon): parallel execution

This commit is contained in:
dump_stack() 2024-02-25 18:04:02 +00:00
parent 6a9bfb503f
commit a608d89cdc
Signed by: dump_stack
GPG Key ID: C9905BA72B5E02BB
5 changed files with 385 additions and 115 deletions

View File

@ -17,6 +17,11 @@ import (
type DaemonCmd struct { type DaemonCmd struct {
Addr string `default:":63527"` Addr string `default:":63527"`
Threads int `help:"number of threads to use"`
OvercommitMemory float64 `help:"overcommit memory factor"`
OvercommitCPU float64 `help:"overcommit CPU factor"`
Serve DaemonServeCmd `cmd:"" help:"start daemon"` Serve DaemonServeCmd `cmd:"" help:"start daemon"`
Job DaemonJobCmd `cmd:"" aliases:"jobs" help:"manage jobs"` Job DaemonJobCmd `cmd:"" aliases:"jobs" help:"manage jobs"`
@ -32,6 +37,18 @@ func (cmd *DaemonServeCmd) Run(dm *DaemonCmd, g *Globals) (err error) {
} }
defer d.Kill() defer d.Kill()
if dm.Threads > 0 {
d.Threads = dm.Threads
}
if dm.OvercommitMemory > 0 {
d.Resources.CPU.SetOvercommit(dm.OvercommitMemory)
}
if dm.OvercommitCPU > 0 {
d.Resources.CPU.SetOvercommit(dm.OvercommitCPU)
}
go d.Daemon() go d.Daemon()
d.Listen(dm.Addr) d.Listen(dm.Addr)
return return

View File

@ -29,7 +29,7 @@ type cmdenv struct {
DB *sql.DB DB *sql.DB
WG sync.WaitGroup WG *sync.WaitGroup
KernelConfig string KernelConfig string
} }

View File

@ -6,9 +6,11 @@ import (
"io" "io"
"net" "net"
"os/exec" "os/exec"
"runtime"
"sync" "sync"
"time" "time"
"github.com/remeh/sizedwaitgroup"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"code.dumpstack.io/tools/out-of-tree/api" "code.dumpstack.io/tools/out-of-tree/api"
@ -18,6 +20,9 @@ import (
) )
type Daemon struct { type Daemon struct {
Threads int
Resources *Resources
db *sql.DB db *sql.DB
kernelConfig string kernelConfig string
@ -27,7 +32,11 @@ type Daemon struct {
func Init(kernelConfig string) (d *Daemon, err error) { func Init(kernelConfig string) (d *Daemon, err error) {
d = &Daemon{} d = &Daemon{}
d.Threads = runtime.NumCPU()
d.Resources = NewResources()
d.kernelConfig = kernelConfig d.kernelConfig = kernelConfig
d.wg.Add(1) // matches with db.Close() d.wg.Add(1) // matches with db.Close()
d.db, err = db.OpenDatabase(dotfiles.File("daemon/daemon.db")) d.db, err = db.OpenDatabase(dotfiles.File("daemon/daemon.db"))
if err != nil { if err != nil {
@ -50,28 +59,49 @@ func (d *Daemon) Daemon() {
log.Fatal().Msg("db is not initialized") log.Fatal().Msg("db is not initialized")
} }
log.Info().Msg("start daemon loop") swg := sizedwaitgroup.New(d.Threads)
log.Info().Int("threads", d.Threads).Msg("start")
for !d.shutdown { for !d.shutdown {
d.wg.Add(1) d.wg.Add(1)
jobs, err := db.Jobs(d.db) jobs, err := db.Jobs(d.db)
if err != nil { if err != nil && !d.shutdown {
log.Error().Err(err).Msg("") log.Error().Err(err).Msg("")
d.wg.Done()
time.Sleep(time.Minute) time.Sleep(time.Minute)
continue continue
} }
for _, job := range jobs { for _, job := range jobs {
err = newPjob(job, d.db).Process() if d.shutdown {
if err != nil { break
log.Error().Err(err).Msgf("%v", job)
} }
pj := newJobProcessor(job, d.db)
if job.Status == api.StatusNew {
pj.SetStatus(api.StatusWaiting)
continue
}
if job.Status != api.StatusWaiting {
continue
}
swg.Add()
go func(pj jobProcessor) {
defer swg.Done()
pj.Process(d.Resources)
time.Sleep(time.Second)
}(pj)
} }
d.wg.Done() d.wg.Done()
time.Sleep(time.Second) time.Sleep(time.Second)
} }
swg.Wait()
} }
func handler(conn net.Conn, e cmdenv) { func handler(conn net.Conn, e cmdenv) {
@ -139,10 +169,10 @@ func (d *Daemon) Listen(addr string) {
go io.Copy(logWriter{log: log.Logger}, stderr) go io.Copy(logWriter{log: log.Logger}, stderr)
log.Info().Msgf("start %v", git) log.Debug().Msgf("start %v", git)
git.Start() git.Start()
defer func() { defer func() {
log.Info().Msgf("stop %v", git) log.Debug().Msgf("stop %v", git)
}() }()
err = git.Wait() err = git.Wait()
@ -186,7 +216,8 @@ func (d *Daemon) Listen(addr string) {
log.Fatal().Err(err).Msg("listen") log.Fatal().Err(err).Msg("listen")
} }
log.Info().Msgf("listen on %v", addr) log.Info().Str("addr", ":9418").Msg("git")
log.Info().Str("addr", addr).Msg("daemon")
for { for {
conn, err := l.Accept() conn, err := l.Accept()
@ -197,7 +228,7 @@ func (d *Daemon) Listen(addr string) {
e := cmdenv{ e := cmdenv{
DB: d.db, DB: d.db,
WG: d.wg, WG: &d.wg,
Conn: conn, Conn: conn,
KernelConfig: d.kernelConfig, KernelConfig: d.kernelConfig,
} }

View File

@ -19,20 +19,20 @@ import (
"code.dumpstack.io/tools/out-of-tree/qemu" "code.dumpstack.io/tools/out-of-tree/qemu"
) )
type pjob struct { type jobProcessor struct {
job api.Job job api.Job
log zerolog.Logger log zerolog.Logger
db *sql.DB db *sql.DB
} }
func newPjob(job api.Job, db *sql.DB) (pj pjob) { func newJobProcessor(job api.Job, db *sql.DB) (pj jobProcessor) {
pj.job = job pj.job = job
pj.db = db pj.db = db
pj.log = log.With().Str("uuid", job.UUID).Logger() pj.log = log.With().Str("uuid", job.UUID).Logger()
return return
} }
func (pj pjob) Update() (err error) { func (pj jobProcessor) Update() (err error) {
err = db.UpdateJob(pj.db, pj.job) err = db.UpdateJob(pj.db, pj.job)
if err != nil { if err != nil {
pj.log.Error().Err(err).Msgf("update job %v", pj.job) pj.log.Error().Err(err).Msgf("update job %v", pj.job)
@ -40,115 +40,131 @@ func (pj pjob) Update() (err error) {
return return
} }
func (pj pjob) SetStatus(status api.Status) (err error) { func (pj jobProcessor) SetStatus(status api.Status) (err error) {
pj.log.Info().Msgf(`%v -> %v`, pj.job.Status, status) pj.log.Info().Msgf(`%v -> %v`, pj.job.Status, status)
pj.job.Status = status pj.job.Status = status
err = pj.Update() err = pj.Update()
return return
} }
func (pj pjob) Process() (err error) { func (pj *jobProcessor) Process(res *Resources) (err error) {
switch pj.job.Status { if pj.job.Status != api.StatusWaiting {
case api.StatusNew: err = errors.New("job is not available to process")
pj.log.Info().Msgf(`%v`, pj.job.Status)
pj.SetStatus(api.StatusWaiting)
return return
case api.StatusWaiting:
pj.SetStatus(api.StatusRunning)
defer func() {
if err != nil {
pj.SetStatus(api.StatusFailure)
} else {
pj.SetStatus(api.StatusSuccess)
}
}()
var tmp string
tmp, err = os.MkdirTemp(dotfiles.Dir("tmp"), "")
if err != nil {
pj.log.Error().Err(err).Msg("mktemp")
return
}
defer os.RemoveAll(tmp)
tmprepo := filepath.Join(tmp, "repo")
pj.log.Debug().Msgf("temp repo: %v", tmprepo)
remote := fmt.Sprintf("git://localhost:9418/%s", pj.job.RepoName)
pj.log.Debug().Msgf("remote: %v", remote)
var raw []byte
cmd := exec.Command("git", "clone", remote, tmprepo)
raw, err = cmd.CombinedOutput()
pj.log.Trace().Msgf("%v\n%v", cmd, string(raw))
if err != nil {
pj.log.Error().Msgf("%v\n%v", cmd, string(raw))
return
}
cmd = exec.Command("git", "checkout", pj.job.Commit)
cmd.Dir = tmprepo
raw, err = cmd.CombinedOutput()
pj.log.Trace().Msgf("%v\n%v", cmd, string(raw))
if err != nil {
pj.log.Error().Msgf("%v\n%v", cmd, string(raw))
return
}
pj.job.Artifact.SourcePath = tmprepo
var result *artifact.Result
var dq *qemu.System
pj.job.Artifact.Process(pj.log, pj.job.Target, false, "", "", 0,
func(q *qemu.System, ka artifact.Artifact, ki distro.KernelInfo,
res *artifact.Result) {
result = res
dq = q
},
)
logdir := dotfiles.Dir("daemon/logs", pj.job.UUID)
err = os.WriteFile(filepath.Join(logdir, "build.log"),
[]byte(result.Build.Output), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
err = os.WriteFile(filepath.Join(logdir, "run.log"),
[]byte(result.Run.Output), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
err = os.WriteFile(filepath.Join(logdir, "test.log"),
[]byte(result.Test.Output), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
err = os.WriteFile(filepath.Join(logdir, "qemu.log"),
[]byte(dq.Stdout), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
pj.log.Info().Msgf("build %v, run %v, test %v",
result.Build.Ok, result.Run.Ok, result.Test.Ok)
if !result.Test.Ok {
err = errors.New("tests failed")
}
} }
if pj.job.Artifact.Qemu.Cpus == 0 {
pj.job.Artifact.Qemu.Cpus = qemu.DefaultCPUs
}
if pj.job.Artifact.Qemu.Memory == 0 {
pj.job.Artifact.Qemu.Memory = qemu.DefaultMemory
}
err = res.Allocate(pj.job)
if err != nil {
return
}
defer func() {
res.Release(pj.job)
}()
log.Info().Msgf("process job %v", pj.job.UUID)
pj.SetStatus(api.StatusRunning)
defer func() {
if err != nil {
pj.SetStatus(api.StatusFailure)
} else {
pj.SetStatus(api.StatusSuccess)
}
}()
var tmp string
tmp, err = os.MkdirTemp(dotfiles.Dir("tmp"), "")
if err != nil {
pj.log.Error().Err(err).Msg("mktemp")
return
}
defer os.RemoveAll(tmp)
tmprepo := filepath.Join(tmp, "repo")
pj.log.Debug().Msgf("temp repo: %v", tmprepo)
remote := fmt.Sprintf("git://localhost:9418/%s", pj.job.RepoName)
pj.log.Debug().Msgf("remote: %v", remote)
var raw []byte
cmd := exec.Command("git", "clone", remote, tmprepo)
raw, err = cmd.CombinedOutput()
pj.log.Trace().Msgf("%v\n%v", cmd, string(raw))
if err != nil {
pj.log.Error().Msgf("%v\n%v", cmd, string(raw))
return
}
cmd = exec.Command("git", "checkout", pj.job.Commit)
cmd.Dir = tmprepo
raw, err = cmd.CombinedOutput()
pj.log.Trace().Msgf("%v\n%v", cmd, string(raw))
if err != nil {
pj.log.Error().Msgf("%v\n%v", cmd, string(raw))
return
}
pj.job.Artifact.SourcePath = tmprepo
var result *artifact.Result
var dq *qemu.System
pj.job.Artifact.Process(pj.log, pj.job.Target, false, "", "", 0,
func(q *qemu.System, ka artifact.Artifact, ki distro.KernelInfo,
res *artifact.Result) {
result = res
dq = q
},
)
logdir := dotfiles.Dir("daemon/logs", pj.job.UUID)
err = os.WriteFile(filepath.Join(logdir, "build.log"),
[]byte(result.Build.Output), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
err = os.WriteFile(filepath.Join(logdir, "run.log"),
[]byte(result.Run.Output), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
err = os.WriteFile(filepath.Join(logdir, "test.log"),
[]byte(result.Test.Output), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
err = os.WriteFile(filepath.Join(logdir, "qemu.log"),
[]byte(dq.Stdout), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
pj.log.Info().Msgf("build %v, run %v, test %v",
result.Build.Ok, result.Run.Ok, result.Test.Ok)
if !result.Test.Ok {
err = errors.New("tests failed")
}
return return
} }

206
daemon/resources.go Normal file
View File

@ -0,0 +1,206 @@
package daemon
import (
"errors"
"runtime"
"sync"
"syscall"
"github.com/rs/zerolog/log"
"code.dumpstack.io/tools/out-of-tree/api"
)
type Resources struct {
initialized bool
CPU *CPUResource
RAM *RAMResources
}
func NewResources() (r *Resources) {
r = &Resources{}
r.CPU = NewCPUResources()
r.RAM = NewRAMResources()
r.initialized = true
return
}
func (r *Resources) Allocate(job api.Job) (err error) {
if !r.initialized {
err = errors.New("resources not initialized")
return
}
if job.Artifact.Qemu.Cpus == 0 {
err = errors.New("no cpus requested")
return
}
if job.Artifact.Qemu.Memory == 0 {
err = errors.New("no memory requested")
return
}
origRam := r.RAM.GetSpent()
origCPU := r.CPU.GetSpent()
err = r.CPU.Allocate(job.Artifact.Qemu.Cpus)
if err != nil {
return
}
err = r.RAM.Allocate(job.Artifact.Qemu.Memory)
if err != nil {
r.CPU.Release(job.Artifact.Qemu.Cpus)
return
}
log.Debug().Msgf("allocated %d cpus, %d MB ram",
r.CPU.GetSpent()-origCPU,
r.RAM.GetSpent()-origRam)
return
}
func (r *Resources) Release(job api.Job) {
if !r.initialized {
log.Error().Msg("resources not initialized")
return
}
r.CPU.Release(job.Artifact.Qemu.Cpus)
r.RAM.Release(job.Artifact.Qemu.Memory)
log.Debug().Msgf("released %d cpus, %d MB ram",
job.Artifact.Qemu.Cpus,
job.Artifact.Qemu.Memory)
}
type CPUResource struct {
num int
overcommit float64
mu *sync.Mutex
spent int
}
const (
Allocation = iota
Release
)
func NewCPUResources() (cpur *CPUResource) {
cpur = &CPUResource{}
cpur.mu = &sync.Mutex{}
cpur.num = runtime.NumCPU()
cpur.overcommit = 1
log.Debug().Msgf("total cpus: %d", cpur.num)
return
}
func (cpur *CPUResource) SetOvercommit(oc float64) {
log.Info().Int("cpus", cpur.num).
Int("result", int(float64(cpur.num)*oc)).
Msgf("%.02f", oc)
cpur.overcommit = oc
}
func (cpur *CPUResource) GetSpent() int {
cpur.mu.Lock()
defer cpur.mu.Unlock()
return cpur.spent
}
var ErrNotEnoughCpu = errors.New("not enough cpu")
func (cpur *CPUResource) Allocate(cpu int) (err error) {
cpur.mu.Lock()
defer cpur.mu.Unlock()
if cpur.spent+cpu > int(float64(cpur.num)*cpur.overcommit) {
err = ErrNotEnoughCpu
return
}
cpur.spent += cpu
return
}
func (cpur *CPUResource) Release(cpu int) (err error) {
cpur.mu.Lock()
defer cpur.mu.Unlock()
if cpur.spent < cpu {
err = ErrFreeingMoreThanAllocated
return
}
cpur.spent -= cpu
return
}
type RAMResources struct {
mb int
overcommit float64
mu *sync.Mutex
spent int
}
func NewRAMResources() (ramr *RAMResources) {
ramr = &RAMResources{}
ramr.mu = &sync.Mutex{}
ramr.overcommit = 1
var info syscall.Sysinfo_t
syscall.Sysinfo(&info)
ramr.mb = int(info.Totalram / 1024 / 1024)
log.Debug().Msgf("total ram: %d MB", ramr.mb)
return
}
func (ramr *RAMResources) SetOvercommit(oc float64) {
log.Info().Int("ram", ramr.mb).
Int("result", int(float64(ramr.mb)*oc)).
Msgf("%.02f", oc)
ramr.overcommit = oc
}
func (ramr RAMResources) GetSpent() int {
ramr.mu.Lock()
defer ramr.mu.Unlock()
return ramr.spent
}
var ErrNotEnoughRam = errors.New("not enough ram")
func (ramr *RAMResources) Allocate(mb int) (err error) {
ramr.mu.Lock()
defer ramr.mu.Unlock()
ocmem := int(float64(ramr.mb) * ramr.overcommit)
if mb > ocmem-ramr.spent {
err = ErrNotEnoughRam
return
}
ramr.spent += mb
return
}
var ErrFreeingMoreThanAllocated = errors.New("freeing more than allocated")
func (ramr *RAMResources) Release(mb int) (err error) {
ramr.mu.Lock()
defer ramr.mu.Unlock()
if ramr.spent < mb {
err = ErrFreeingMoreThanAllocated
return
}
ramr.spent -= mb
return
}