1
0

feat(daemon): parallel execution

This commit is contained in:
dump_stack() 2024-02-25 18:04:02 +00:00
parent 6a9bfb503f
commit 2b4db95166
Signed by: dump_stack
GPG Key ID: C9905BA72B5E02BB
7 changed files with 424 additions and 133 deletions

View File

@ -11,32 +11,15 @@ import (
"github.com/rs/zerolog/log"
"code.dumpstack.io/tools/out-of-tree/client"
"code.dumpstack.io/tools/out-of-tree/daemon"
)
type DaemonCmd struct {
type daemonCmd struct {
Addr string `default:":63527"`
Serve DaemonServeCmd `cmd:"" help:"start daemon"`
Job DaemonJobCmd `cmd:"" aliases:"jobs" help:"manage jobs"`
Repo DaemonRepoCmd `cmd:"" aliases:"repos" help:"manage repositories"`
}
type DaemonServeCmd struct{}
func (cmd *DaemonServeCmd) Run(dm *DaemonCmd, g *Globals) (err error) {
d, err := daemon.Init(g.Config.Kernels)
if err != nil {
log.Fatal().Err(err).Msg("")
}
defer d.Kill()
go d.Daemon()
d.Listen(dm.Addr)
return
}
type DaemonJobCmd struct {
List DaemonJobsListCmd `cmd:"" help:"list jobs"`
Status DaemonJobsStatusCmd `cmd:"" help:"show job status"`

47
cmd/daemon_linux.go Normal file
View File

@ -0,0 +1,47 @@
//go:build linux
// +build linux
package cmd
import (
"github.com/rs/zerolog/log"
"code.dumpstack.io/tools/out-of-tree/daemon"
)
type DaemonCmd struct {
daemonCmd
Threads int `help:"number of threads to use"`
OvercommitMemory float64 `help:"overcommit memory factor"`
OvercommitCPU float64 `help:"overcommit CPU factor"`
Serve DaemonServeCmd `cmd:"" help:"start daemon"`
}
type DaemonServeCmd struct{}
func (cmd *DaemonServeCmd) Run(dm *DaemonCmd, g *Globals) (err error) {
d, err := daemon.Init(g.Config.Kernels)
if err != nil {
log.Fatal().Err(err).Msg("")
}
defer d.Kill()
if dm.Threads > 0 {
d.Threads = dm.Threads
}
if dm.OvercommitMemory > 0 {
d.Resources.CPU.SetOvercommit(dm.OvercommitMemory)
}
if dm.OvercommitCPU > 0 {
d.Resources.CPU.SetOvercommit(dm.OvercommitCPU)
}
go d.Daemon()
d.Listen(dm.Addr)
return
}

8
cmd/daemon_macos.go Normal file
View File

@ -0,0 +1,8 @@
//go:build darwin
// +build darwin
package cmd
type DaemonCmd struct {
daemonCmd
}

View File

@ -29,7 +29,7 @@ type cmdenv struct {
DB *sql.DB
WG sync.WaitGroup
WG *sync.WaitGroup
KernelConfig string
}

View File

@ -6,9 +6,11 @@ import (
"io"
"net"
"os/exec"
"runtime"
"sync"
"time"
"github.com/remeh/sizedwaitgroup"
"github.com/rs/zerolog/log"
"code.dumpstack.io/tools/out-of-tree/api"
@ -18,6 +20,9 @@ import (
)
type Daemon struct {
Threads int
Resources *Resources
db *sql.DB
kernelConfig string
@ -27,7 +32,11 @@ type Daemon struct {
func Init(kernelConfig string) (d *Daemon, err error) {
d = &Daemon{}
d.Threads = runtime.NumCPU()
d.Resources = NewResources()
d.kernelConfig = kernelConfig
d.wg.Add(1) // matches with db.Close()
d.db, err = db.OpenDatabase(dotfiles.File("daemon/daemon.db"))
if err != nil {
@ -50,28 +59,49 @@ func (d *Daemon) Daemon() {
log.Fatal().Msg("db is not initialized")
}
log.Info().Msg("start daemon loop")
swg := sizedwaitgroup.New(d.Threads)
log.Info().Int("threads", d.Threads).Msg("start")
for !d.shutdown {
d.wg.Add(1)
jobs, err := db.Jobs(d.db)
if err != nil {
if err != nil && !d.shutdown {
log.Error().Err(err).Msg("")
d.wg.Done()
time.Sleep(time.Minute)
continue
}
for _, job := range jobs {
err = newPjob(job, d.db).Process()
if err != nil {
log.Error().Err(err).Msgf("%v", job)
if d.shutdown {
break
}
pj := newJobProcessor(job, d.db)
if job.Status == api.StatusNew {
pj.SetStatus(api.StatusWaiting)
continue
}
if job.Status != api.StatusWaiting {
continue
}
swg.Add()
go func(pj jobProcessor) {
defer swg.Done()
pj.Process(d.Resources)
time.Sleep(time.Second)
}(pj)
}
d.wg.Done()
time.Sleep(time.Second)
}
swg.Wait()
}
func handler(conn net.Conn, e cmdenv) {
@ -139,10 +169,10 @@ func (d *Daemon) Listen(addr string) {
go io.Copy(logWriter{log: log.Logger}, stderr)
log.Info().Msgf("start %v", git)
log.Debug().Msgf("start %v", git)
git.Start()
defer func() {
log.Info().Msgf("stop %v", git)
log.Debug().Msgf("stop %v", git)
}()
err = git.Wait()
@ -186,7 +216,8 @@ func (d *Daemon) Listen(addr string) {
log.Fatal().Err(err).Msg("listen")
}
log.Info().Msgf("listen on %v", addr)
log.Info().Str("addr", ":9418").Msg("git")
log.Info().Str("addr", addr).Msg("daemon")
for {
conn, err := l.Accept()
@ -197,7 +228,7 @@ func (d *Daemon) Listen(addr string) {
e := cmdenv{
DB: d.db,
WG: d.wg,
WG: &d.wg,
Conn: conn,
KernelConfig: d.kernelConfig,
}

View File

@ -19,20 +19,20 @@ import (
"code.dumpstack.io/tools/out-of-tree/qemu"
)
type pjob struct {
type jobProcessor struct {
job api.Job
log zerolog.Logger
db *sql.DB
}
func newPjob(job api.Job, db *sql.DB) (pj pjob) {
func newJobProcessor(job api.Job, db *sql.DB) (pj jobProcessor) {
pj.job = job
pj.db = db
pj.log = log.With().Str("uuid", job.UUID).Logger()
return
}
func (pj pjob) Update() (err error) {
func (pj jobProcessor) Update() (err error) {
err = db.UpdateJob(pj.db, pj.job)
if err != nil {
pj.log.Error().Err(err).Msgf("update job %v", pj.job)
@ -40,115 +40,131 @@ func (pj pjob) Update() (err error) {
return
}
func (pj pjob) SetStatus(status api.Status) (err error) {
func (pj jobProcessor) SetStatus(status api.Status) (err error) {
pj.log.Info().Msgf(`%v -> %v`, pj.job.Status, status)
pj.job.Status = status
err = pj.Update()
return
}
func (pj pjob) Process() (err error) {
switch pj.job.Status {
case api.StatusNew:
pj.log.Info().Msgf(`%v`, pj.job.Status)
pj.SetStatus(api.StatusWaiting)
func (pj *jobProcessor) Process(res *Resources) (err error) {
if pj.job.Status != api.StatusWaiting {
err = errors.New("job is not available to process")
return
case api.StatusWaiting:
pj.SetStatus(api.StatusRunning)
defer func() {
if err != nil {
pj.SetStatus(api.StatusFailure)
} else {
pj.SetStatus(api.StatusSuccess)
}
}()
var tmp string
tmp, err = os.MkdirTemp(dotfiles.Dir("tmp"), "")
if err != nil {
pj.log.Error().Err(err).Msg("mktemp")
return
}
defer os.RemoveAll(tmp)
tmprepo := filepath.Join(tmp, "repo")
pj.log.Debug().Msgf("temp repo: %v", tmprepo)
remote := fmt.Sprintf("git://localhost:9418/%s", pj.job.RepoName)
pj.log.Debug().Msgf("remote: %v", remote)
var raw []byte
cmd := exec.Command("git", "clone", remote, tmprepo)
raw, err = cmd.CombinedOutput()
pj.log.Trace().Msgf("%v\n%v", cmd, string(raw))
if err != nil {
pj.log.Error().Msgf("%v\n%v", cmd, string(raw))
return
}
cmd = exec.Command("git", "checkout", pj.job.Commit)
cmd.Dir = tmprepo
raw, err = cmd.CombinedOutput()
pj.log.Trace().Msgf("%v\n%v", cmd, string(raw))
if err != nil {
pj.log.Error().Msgf("%v\n%v", cmd, string(raw))
return
}
pj.job.Artifact.SourcePath = tmprepo
var result *artifact.Result
var dq *qemu.System
pj.job.Artifact.Process(pj.log, pj.job.Target, false, "", "", 0,
func(q *qemu.System, ka artifact.Artifact, ki distro.KernelInfo,
res *artifact.Result) {
result = res
dq = q
},
)
logdir := dotfiles.Dir("daemon/logs", pj.job.UUID)
err = os.WriteFile(filepath.Join(logdir, "build.log"),
[]byte(result.Build.Output), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
err = os.WriteFile(filepath.Join(logdir, "run.log"),
[]byte(result.Run.Output), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
err = os.WriteFile(filepath.Join(logdir, "test.log"),
[]byte(result.Test.Output), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
err = os.WriteFile(filepath.Join(logdir, "qemu.log"),
[]byte(dq.Stdout), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
pj.log.Info().Msgf("build %v, run %v, test %v",
result.Build.Ok, result.Run.Ok, result.Test.Ok)
if !result.Test.Ok {
err = errors.New("tests failed")
}
}
if pj.job.Artifact.Qemu.Cpus == 0 {
pj.job.Artifact.Qemu.Cpus = qemu.DefaultCPUs
}
if pj.job.Artifact.Qemu.Memory == 0 {
pj.job.Artifact.Qemu.Memory = qemu.DefaultMemory
}
err = res.Allocate(pj.job)
if err != nil {
return
}
defer func() {
res.Release(pj.job)
}()
log.Info().Msgf("process job %v", pj.job.UUID)
pj.SetStatus(api.StatusRunning)
defer func() {
if err != nil {
pj.SetStatus(api.StatusFailure)
} else {
pj.SetStatus(api.StatusSuccess)
}
}()
var tmp string
tmp, err = os.MkdirTemp(dotfiles.Dir("tmp"), "")
if err != nil {
pj.log.Error().Err(err).Msg("mktemp")
return
}
defer os.RemoveAll(tmp)
tmprepo := filepath.Join(tmp, "repo")
pj.log.Debug().Msgf("temp repo: %v", tmprepo)
remote := fmt.Sprintf("git://localhost:9418/%s", pj.job.RepoName)
pj.log.Debug().Msgf("remote: %v", remote)
var raw []byte
cmd := exec.Command("git", "clone", remote, tmprepo)
raw, err = cmd.CombinedOutput()
pj.log.Trace().Msgf("%v\n%v", cmd, string(raw))
if err != nil {
pj.log.Error().Msgf("%v\n%v", cmd, string(raw))
return
}
cmd = exec.Command("git", "checkout", pj.job.Commit)
cmd.Dir = tmprepo
raw, err = cmd.CombinedOutput()
pj.log.Trace().Msgf("%v\n%v", cmd, string(raw))
if err != nil {
pj.log.Error().Msgf("%v\n%v", cmd, string(raw))
return
}
pj.job.Artifact.SourcePath = tmprepo
var result *artifact.Result
var dq *qemu.System
pj.job.Artifact.Process(pj.log, pj.job.Target, false, "", "", 0,
func(q *qemu.System, ka artifact.Artifact, ki distro.KernelInfo,
res *artifact.Result) {
result = res
dq = q
},
)
logdir := dotfiles.Dir("daemon/logs", pj.job.UUID)
err = os.WriteFile(filepath.Join(logdir, "build.log"),
[]byte(result.Build.Output), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
err = os.WriteFile(filepath.Join(logdir, "run.log"),
[]byte(result.Run.Output), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
err = os.WriteFile(filepath.Join(logdir, "test.log"),
[]byte(result.Test.Output), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
err = os.WriteFile(filepath.Join(logdir, "qemu.log"),
[]byte(dq.Stdout), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
pj.log.Info().Msgf("build %v, run %v, test %v",
result.Build.Ok, result.Run.Ok, result.Test.Ok)
if !result.Test.Ok {
err = errors.New("tests failed")
}
return
}

206
daemon/resources.go Normal file
View File

@ -0,0 +1,206 @@
package daemon
import (
"errors"
"runtime"
"sync"
"syscall"
"github.com/rs/zerolog/log"
"code.dumpstack.io/tools/out-of-tree/api"
)
type Resources struct {
initialized bool
CPU *CPUResource
RAM *RAMResources
}
func NewResources() (r *Resources) {
r = &Resources{}
r.CPU = NewCPUResources()
r.RAM = NewRAMResources()
r.initialized = true
return
}
func (r *Resources) Allocate(job api.Job) (err error) {
if !r.initialized {
err = errors.New("resources not initialized")
return
}
if job.Artifact.Qemu.Cpus == 0 {
err = errors.New("no cpus requested")
return
}
if job.Artifact.Qemu.Memory == 0 {
err = errors.New("no memory requested")
return
}
origRam := r.RAM.GetSpent()
origCPU := r.CPU.GetSpent()
err = r.CPU.Allocate(job.Artifact.Qemu.Cpus)
if err != nil {
return
}
err = r.RAM.Allocate(job.Artifact.Qemu.Memory)
if err != nil {
r.CPU.Release(job.Artifact.Qemu.Cpus)
return
}
log.Debug().Msgf("allocated %d cpus, %d MB ram",
r.CPU.GetSpent()-origCPU,
r.RAM.GetSpent()-origRam)
return
}
func (r *Resources) Release(job api.Job) {
if !r.initialized {
log.Error().Msg("resources not initialized")
return
}
r.CPU.Release(job.Artifact.Qemu.Cpus)
r.RAM.Release(job.Artifact.Qemu.Memory)
log.Debug().Msgf("released %d cpus, %d MB ram",
job.Artifact.Qemu.Cpus,
job.Artifact.Qemu.Memory)
}
type CPUResource struct {
num int
overcommit float64
mu *sync.Mutex
spent int
}
const (
Allocation = iota
Release
)
func NewCPUResources() (cpur *CPUResource) {
cpur = &CPUResource{}
cpur.mu = &sync.Mutex{}
cpur.num = runtime.NumCPU()
cpur.overcommit = 1
log.Debug().Msgf("total cpus: %d", cpur.num)
return
}
func (cpur *CPUResource) SetOvercommit(oc float64) {
log.Info().Int("cpus", cpur.num).
Int("result", int(float64(cpur.num)*oc)).
Msgf("%.02f", oc)
cpur.overcommit = oc
}
func (cpur *CPUResource) GetSpent() int {
cpur.mu.Lock()
defer cpur.mu.Unlock()
return cpur.spent
}
var ErrNotEnoughCpu = errors.New("not enough cpu")
func (cpur *CPUResource) Allocate(cpu int) (err error) {
cpur.mu.Lock()
defer cpur.mu.Unlock()
if cpur.spent+cpu > int(float64(cpur.num)*cpur.overcommit) {
err = ErrNotEnoughCpu
return
}
cpur.spent += cpu
return
}
func (cpur *CPUResource) Release(cpu int) (err error) {
cpur.mu.Lock()
defer cpur.mu.Unlock()
if cpur.spent < cpu {
err = ErrFreeingMoreThanAllocated
return
}
cpur.spent -= cpu
return
}
type RAMResources struct {
mb int
overcommit float64
mu *sync.Mutex
spent int
}
func NewRAMResources() (ramr *RAMResources) {
ramr = &RAMResources{}
ramr.mu = &sync.Mutex{}
ramr.overcommit = 1
var info syscall.Sysinfo_t
syscall.Sysinfo(&info)
ramr.mb = int(info.Totalram / 1024 / 1024)
log.Debug().Msgf("total ram: %d MB", ramr.mb)
return
}
func (ramr *RAMResources) SetOvercommit(oc float64) {
log.Info().Int("ram", ramr.mb).
Int("result", int(float64(ramr.mb)*oc)).
Msgf("%.02f", oc)
ramr.overcommit = oc
}
func (ramr RAMResources) GetSpent() int {
ramr.mu.Lock()
defer ramr.mu.Unlock()
return ramr.spent
}
var ErrNotEnoughRam = errors.New("not enough ram")
func (ramr *RAMResources) Allocate(mb int) (err error) {
ramr.mu.Lock()
defer ramr.mu.Unlock()
ocmem := int(float64(ramr.mb) * ramr.overcommit)
if mb > ocmem-ramr.spent {
err = ErrNotEnoughRam
return
}
ramr.spent += mb
return
}
var ErrFreeingMoreThanAllocated = errors.New("freeing more than allocated")
func (ramr *RAMResources) Release(mb int) (err error) {
ramr.mu.Lock()
defer ramr.mu.Unlock()
if ramr.spent < mb {
err = ErrFreeingMoreThanAllocated
return
}
ramr.spent -= mb
return
}