1
0

Compare commits

...

3 Commits

7 changed files with 404 additions and 123 deletions

View File

@ -17,6 +17,11 @@ import (
type DaemonCmd struct {
Addr string `default:":63527"`
Threads int `help:"number of threads to use"`
OvercommitMemory float64 `help:"overcommit memory factor"`
OvercommitCPU float64 `help:"overcommit CPU factor"`
Serve DaemonServeCmd `cmd:"" help:"start daemon"`
Job DaemonJobCmd `cmd:"" aliases:"jobs" help:"manage jobs"`
@ -32,6 +37,18 @@ func (cmd *DaemonServeCmd) Run(dm *DaemonCmd, g *Globals) (err error) {
}
defer d.Kill()
if dm.Threads > 0 {
d.Threads = dm.Threads
}
if dm.OvercommitMemory > 0 {
d.Resources.CPU.SetOvercommit(dm.OvercommitMemory)
}
if dm.OvercommitCPU > 0 {
d.Resources.CPU.SetOvercommit(dm.OvercommitCPU)
}
go d.Daemon()
d.Listen(dm.Addr)
return

View File

@ -91,6 +91,8 @@ type PewCmd struct {
Kcfg config.KernelConfig `kong:"-" json:"-"`
TimeoutDeadline time.Time `kong:"-" json:"-"`
Watch bool `help:"watch job status"`
repoName string
commit string
@ -316,6 +318,8 @@ func (cmd PewCmd) watchJob(swg *sizedwaitgroup.SizedWaitGroup,
func (cmd PewCmd) remote(swg *sizedwaitgroup.SizedWaitGroup,
ka artifact.Artifact, ki distro.KernelInfo) {
defer swg.Done()
slog := log.With().
Str("distro_type", ki.Distro.ID.String()).
Str("distro_release", ki.Distro.Release).
@ -337,14 +341,15 @@ func (cmd PewCmd) remote(swg *sizedwaitgroup.SizedWaitGroup,
slog = slog.With().Str("uuid", uuid).Logger()
if err != nil {
slog.Error().Err(err).Msg("cannot add job")
swg.Done() // FIXME
return
}
slog.Info().Msg("add")
// FIXME dummy (almost)
go cmd.watchJob(swg, slog, uuid)
if cmd.Watch {
// FIXME dummy (almost)
go cmd.watchJob(swg, slog, uuid)
}
}
func (cmd PewCmd) testArtifact(swg *sizedwaitgroup.SizedWaitGroup,

View File

@ -29,7 +29,7 @@ type cmdenv struct {
DB *sql.DB
WG sync.WaitGroup
WG *sync.WaitGroup
KernelConfig string
}

View File

@ -6,9 +6,11 @@ import (
"io"
"net"
"os/exec"
"runtime"
"sync"
"time"
"github.com/remeh/sizedwaitgroup"
"github.com/rs/zerolog/log"
"code.dumpstack.io/tools/out-of-tree/api"
@ -18,6 +20,9 @@ import (
)
type Daemon struct {
Threads int
Resources *Resources
db *sql.DB
kernelConfig string
@ -27,7 +32,11 @@ type Daemon struct {
func Init(kernelConfig string) (d *Daemon, err error) {
d = &Daemon{}
d.Threads = runtime.NumCPU()
d.Resources = NewResources()
d.kernelConfig = kernelConfig
d.wg.Add(1) // matches with db.Close()
d.db, err = db.OpenDatabase(dotfiles.File("daemon/daemon.db"))
if err != nil {
@ -50,28 +59,49 @@ func (d *Daemon) Daemon() {
log.Fatal().Msg("db is not initialized")
}
log.Info().Msg("start daemon loop")
swg := sizedwaitgroup.New(d.Threads)
log.Info().Int("threads", d.Threads).Msg("start")
for !d.shutdown {
d.wg.Add(1)
jobs, err := db.Jobs(d.db)
if err != nil {
if err != nil && !d.shutdown {
log.Error().Err(err).Msg("")
d.wg.Done()
time.Sleep(time.Minute)
continue
}
for _, job := range jobs {
err = newPjob(job, d.db).Process()
if err != nil {
log.Error().Err(err).Msgf("%v", job)
if d.shutdown {
break
}
pj := newJobProcessor(job, d.db)
if job.Status == api.StatusNew {
pj.SetStatus(api.StatusWaiting)
continue
}
if job.Status != api.StatusWaiting {
continue
}
swg.Add()
go func(pj jobProcessor) {
defer swg.Done()
pj.Process(d.Resources)
time.Sleep(time.Second)
}(pj)
}
d.wg.Done()
time.Sleep(time.Second)
}
swg.Wait()
}
func handler(conn net.Conn, e cmdenv) {
@ -139,10 +169,10 @@ func (d *Daemon) Listen(addr string) {
go io.Copy(logWriter{log: log.Logger}, stderr)
log.Info().Msgf("start %v", git)
log.Debug().Msgf("start %v", git)
git.Start()
defer func() {
log.Info().Msgf("stop %v", git)
log.Debug().Msgf("stop %v", git)
}()
err = git.Wait()
@ -186,7 +216,8 @@ func (d *Daemon) Listen(addr string) {
log.Fatal().Err(err).Msg("listen")
}
log.Info().Msgf("listen on %v", addr)
log.Info().Str("addr", ":9418").Msg("git")
log.Info().Str("addr", addr).Msg("daemon")
for {
conn, err := l.Accept()
@ -197,7 +228,7 @@ func (d *Daemon) Listen(addr string) {
e := cmdenv{
DB: d.db,
WG: d.wg,
WG: &d.wg,
Conn: conn,
KernelConfig: d.kernelConfig,
}

View File

@ -19,20 +19,20 @@ import (
"code.dumpstack.io/tools/out-of-tree/qemu"
)
type pjob struct {
type jobProcessor struct {
job api.Job
log zerolog.Logger
db *sql.DB
}
func newPjob(job api.Job, db *sql.DB) (pj pjob) {
func newJobProcessor(job api.Job, db *sql.DB) (pj jobProcessor) {
pj.job = job
pj.db = db
pj.log = log.With().Str("uuid", job.UUID).Logger()
return
}
func (pj pjob) Update() (err error) {
func (pj jobProcessor) Update() (err error) {
err = db.UpdateJob(pj.db, pj.job)
if err != nil {
pj.log.Error().Err(err).Msgf("update job %v", pj.job)
@ -40,115 +40,131 @@ func (pj pjob) Update() (err error) {
return
}
func (pj pjob) SetStatus(status api.Status) (err error) {
func (pj jobProcessor) SetStatus(status api.Status) (err error) {
pj.log.Info().Msgf(`%v -> %v`, pj.job.Status, status)
pj.job.Status = status
err = pj.Update()
return
}
func (pj pjob) Process() (err error) {
switch pj.job.Status {
case api.StatusNew:
pj.log.Info().Msgf(`%v`, pj.job.Status)
pj.SetStatus(api.StatusWaiting)
func (pj *jobProcessor) Process(res *Resources) (err error) {
if pj.job.Status != api.StatusWaiting {
err = errors.New("job is not available to process")
return
case api.StatusWaiting:
pj.SetStatus(api.StatusRunning)
defer func() {
if err != nil {
pj.SetStatus(api.StatusFailure)
} else {
pj.SetStatus(api.StatusSuccess)
}
}()
var tmp string
tmp, err = os.MkdirTemp(dotfiles.Dir("tmp"), "")
if err != nil {
pj.log.Error().Err(err).Msg("mktemp")
return
}
defer os.RemoveAll(tmp)
tmprepo := filepath.Join(tmp, "repo")
pj.log.Debug().Msgf("temp repo: %v", tmprepo)
remote := fmt.Sprintf("git://localhost:9418/%s", pj.job.RepoName)
pj.log.Debug().Msgf("remote: %v", remote)
var raw []byte
cmd := exec.Command("git", "clone", remote, tmprepo)
raw, err = cmd.CombinedOutput()
pj.log.Trace().Msgf("%v\n%v", cmd, string(raw))
if err != nil {
pj.log.Error().Msgf("%v\n%v", cmd, string(raw))
return
}
cmd = exec.Command("git", "checkout", pj.job.Commit)
cmd.Dir = tmprepo
raw, err = cmd.CombinedOutput()
pj.log.Trace().Msgf("%v\n%v", cmd, string(raw))
if err != nil {
pj.log.Error().Msgf("%v\n%v", cmd, string(raw))
return
}
pj.job.Artifact.SourcePath = tmprepo
var result *artifact.Result
var dq *qemu.System
pj.job.Artifact.Process(pj.log, pj.job.Target, false, "", "", 0,
func(q *qemu.System, ka artifact.Artifact, ki distro.KernelInfo,
res *artifact.Result) {
result = res
dq = q
},
)
logdir := dotfiles.Dir("daemon/logs", pj.job.UUID)
err = os.WriteFile(filepath.Join(logdir, "build.log"),
[]byte(result.Build.Output), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
err = os.WriteFile(filepath.Join(logdir, "run.log"),
[]byte(result.Run.Output), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
err = os.WriteFile(filepath.Join(logdir, "test.log"),
[]byte(result.Test.Output), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
err = os.WriteFile(filepath.Join(logdir, "qemu.log"),
[]byte(dq.Stdout), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
pj.log.Info().Msgf("build %v, run %v, test %v",
result.Build.Ok, result.Run.Ok, result.Test.Ok)
if !result.Test.Ok {
err = errors.New("tests failed")
}
}
if pj.job.Artifact.Qemu.Cpus == 0 {
pj.job.Artifact.Qemu.Cpus = qemu.DefaultCPUs
}
if pj.job.Artifact.Qemu.Memory == 0 {
pj.job.Artifact.Qemu.Memory = qemu.DefaultMemory
}
err = res.Allocate(pj.job)
if err != nil {
return
}
defer func() {
res.Release(pj.job)
}()
log.Info().Msgf("process job %v", pj.job.UUID)
pj.SetStatus(api.StatusRunning)
defer func() {
if err != nil {
pj.SetStatus(api.StatusFailure)
} else {
pj.SetStatus(api.StatusSuccess)
}
}()
var tmp string
tmp, err = os.MkdirTemp(dotfiles.Dir("tmp"), "")
if err != nil {
pj.log.Error().Err(err).Msg("mktemp")
return
}
defer os.RemoveAll(tmp)
tmprepo := filepath.Join(tmp, "repo")
pj.log.Debug().Msgf("temp repo: %v", tmprepo)
remote := fmt.Sprintf("git://localhost:9418/%s", pj.job.RepoName)
pj.log.Debug().Msgf("remote: %v", remote)
var raw []byte
cmd := exec.Command("git", "clone", remote, tmprepo)
raw, err = cmd.CombinedOutput()
pj.log.Trace().Msgf("%v\n%v", cmd, string(raw))
if err != nil {
pj.log.Error().Msgf("%v\n%v", cmd, string(raw))
return
}
cmd = exec.Command("git", "checkout", pj.job.Commit)
cmd.Dir = tmprepo
raw, err = cmd.CombinedOutput()
pj.log.Trace().Msgf("%v\n%v", cmd, string(raw))
if err != nil {
pj.log.Error().Msgf("%v\n%v", cmd, string(raw))
return
}
pj.job.Artifact.SourcePath = tmprepo
var result *artifact.Result
var dq *qemu.System
pj.job.Artifact.Process(pj.log, pj.job.Target, false, "", "", 0,
func(q *qemu.System, ka artifact.Artifact, ki distro.KernelInfo,
res *artifact.Result) {
result = res
dq = q
},
)
logdir := dotfiles.Dir("daemon/logs", pj.job.UUID)
err = os.WriteFile(filepath.Join(logdir, "build.log"),
[]byte(result.Build.Output), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
err = os.WriteFile(filepath.Join(logdir, "run.log"),
[]byte(result.Run.Output), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
err = os.WriteFile(filepath.Join(logdir, "test.log"),
[]byte(result.Test.Output), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
err = os.WriteFile(filepath.Join(logdir, "qemu.log"),
[]byte(dq.Stdout), 0644)
if err != nil {
pj.log.Error().Err(err).Msg("")
}
pj.log.Info().Msgf("build %v, run %v, test %v",
result.Build.Ok, result.Run.Ok, result.Test.Ok)
if !result.Test.Ok {
err = errors.New("tests failed")
}
return
}

206
daemon/resources.go Normal file
View File

@ -0,0 +1,206 @@
package daemon
import (
"errors"
"runtime"
"sync"
"syscall"
"github.com/rs/zerolog/log"
"code.dumpstack.io/tools/out-of-tree/api"
)
type Resources struct {
initialized bool
CPU *CPUResource
RAM *RAMResources
}
func NewResources() (r *Resources) {
r = &Resources{}
r.CPU = NewCPUResources()
r.RAM = NewRAMResources()
r.initialized = true
return
}
func (r *Resources) Allocate(job api.Job) (err error) {
if !r.initialized {
err = errors.New("resources not initialized")
return
}
if job.Artifact.Qemu.Cpus == 0 {
err = errors.New("no cpus requested")
return
}
if job.Artifact.Qemu.Memory == 0 {
err = errors.New("no memory requested")
return
}
origRam := r.RAM.GetSpent()
origCPU := r.CPU.GetSpent()
err = r.CPU.Allocate(job.Artifact.Qemu.Cpus)
if err != nil {
return
}
err = r.RAM.Allocate(job.Artifact.Qemu.Memory)
if err != nil {
r.CPU.Release(job.Artifact.Qemu.Cpus)
return
}
log.Debug().Msgf("allocated %d cpus, %d MB ram",
r.CPU.GetSpent()-origCPU,
r.RAM.GetSpent()-origRam)
return
}
func (r *Resources) Release(job api.Job) {
if !r.initialized {
log.Error().Msg("resources not initialized")
return
}
r.CPU.Release(job.Artifact.Qemu.Cpus)
r.RAM.Release(job.Artifact.Qemu.Memory)
log.Debug().Msgf("released %d cpus, %d MB ram",
job.Artifact.Qemu.Cpus,
job.Artifact.Qemu.Memory)
}
type CPUResource struct {
num int
overcommit float64
mu *sync.Mutex
spent int
}
const (
Allocation = iota
Release
)
func NewCPUResources() (cpur *CPUResource) {
cpur = &CPUResource{}
cpur.mu = &sync.Mutex{}
cpur.num = runtime.NumCPU()
cpur.overcommit = 1
log.Debug().Msgf("total cpus: %d", cpur.num)
return
}
func (cpur *CPUResource) SetOvercommit(oc float64) {
log.Info().Int("cpus", cpur.num).
Int("result", int(float64(cpur.num)*oc)).
Msgf("%.02f", oc)
cpur.overcommit = oc
}
func (cpur *CPUResource) GetSpent() int {
cpur.mu.Lock()
defer cpur.mu.Unlock()
return cpur.spent
}
var ErrNotEnoughCpu = errors.New("not enough cpu")
func (cpur *CPUResource) Allocate(cpu int) (err error) {
cpur.mu.Lock()
defer cpur.mu.Unlock()
if cpur.spent+cpu > int(float64(cpur.num)*cpur.overcommit) {
err = ErrNotEnoughCpu
return
}
cpur.spent += cpu
return
}
func (cpur *CPUResource) Release(cpu int) (err error) {
cpur.mu.Lock()
defer cpur.mu.Unlock()
if cpur.spent < cpu {
err = ErrFreeingMoreThanAllocated
return
}
cpur.spent -= cpu
return
}
type RAMResources struct {
mb int
overcommit float64
mu *sync.Mutex
spent int
}
func NewRAMResources() (ramr *RAMResources) {
ramr = &RAMResources{}
ramr.mu = &sync.Mutex{}
ramr.overcommit = 1
var info syscall.Sysinfo_t
syscall.Sysinfo(&info)
ramr.mb = int(info.Totalram / 1024 / 1024)
log.Debug().Msgf("total ram: %d MB", ramr.mb)
return
}
func (ramr *RAMResources) SetOvercommit(oc float64) {
log.Info().Int("ram", ramr.mb).
Int("result", int(float64(ramr.mb)*oc)).
Msgf("%.02f", oc)
ramr.overcommit = oc
}
func (ramr RAMResources) GetSpent() int {
ramr.mu.Lock()
defer ramr.mu.Unlock()
return ramr.spent
}
var ErrNotEnoughRam = errors.New("not enough ram")
func (ramr *RAMResources) Allocate(mb int) (err error) {
ramr.mu.Lock()
defer ramr.mu.Unlock()
ocmem := int(float64(ramr.mb) * ramr.overcommit)
if mb > ocmem-ramr.spent {
err = ErrNotEnoughRam
return
}
ramr.spent += mb
return
}
var ErrFreeingMoreThanAllocated = errors.New("freeing more than allocated")
func (ramr *RAMResources) Release(mb int) (err error) {
ramr.mu.Lock()
defer ramr.mu.Unlock()
if ramr.spent < mb {
err = ErrFreeingMoreThanAllocated
return
}
ramr.spent -= mb
return
}

View File

@ -37,6 +37,13 @@ const (
unsupported = "unsupported" // for test purposes
)
const (
DefaultCPUs = 1
DefaultMemory = 512 // megabytes
DefaultSSHRetries = 4
DefaultSSHRetryTimeout = time.Second / 4
)
// Kernel describe kernel parameters for qemu
type Kernel struct {
Name string
@ -123,11 +130,10 @@ func NewSystem(arch arch, kernel Kernel, drivePath string) (q *System, err error
}
q.drivePath = drivePath
// Default values
q.Cpus = 1
q.Memory = 512 // megabytes
q.SSH.Retries = 4
q.SSH.RetryTimeout = time.Second / 4
q.Cpus = DefaultCPUs
q.Memory = DefaultMemory
q.SSH.Retries = DefaultSSHRetries
q.SSH.RetryTimeout = DefaultSSHRetryTimeout
return
}