Compare commits


No commits in common. "a608d89cdc042530cc430eb8682972650ef561c4" and "bb8344958eb19526cffe1ba46bda1bd7929be078" have entirely different histories.

7 changed files with 118 additions and 399 deletions

View File

@@ -17,11 +17,6 @@ import (
 type DaemonCmd struct {
     Addr string `default:":63527"`

-    Threads int `help:"number of threads to use"`
-
-    OvercommitMemory float64 `help:"overcommit memory factor"`
-    OvercommitCPU float64 `help:"overcommit CPU factor"`

     Serve DaemonServeCmd `cmd:"" help:"start daemon"`
     Job DaemonJobCmd `cmd:"" aliases:"jobs" help:"manage jobs"`
@ -37,18 +32,6 @@ func (cmd *DaemonServeCmd) Run(dm *DaemonCmd, g *Globals) (err error) {
} }
defer d.Kill() defer d.Kill()
if dm.Threads > 0 {
d.Threads = dm.Threads
}
if dm.OvercommitMemory > 0 {
d.Resources.CPU.SetOvercommit(dm.OvercommitMemory)
}
if dm.OvercommitCPU > 0 {
d.Resources.CPU.SetOvercommit(dm.OvercommitCPU)
}
go d.Daemon() go d.Daemon()
d.Listen(dm.Addr) d.Listen(dm.Addr)
return return

View File

@@ -91,8 +91,6 @@ type PewCmd struct {
     Kcfg config.KernelConfig `kong:"-" json:"-"`
     TimeoutDeadline time.Time `kong:"-" json:"-"`

-    Watch bool `help:"watch job status"`

     repoName string
     commit string
@@ -318,8 +316,6 @@ func (cmd PewCmd) watchJob(swg *sizedwaitgroup.SizedWaitGroup,
 func (cmd PewCmd) remote(swg *sizedwaitgroup.SizedWaitGroup,
     ka artifact.Artifact, ki distro.KernelInfo) {

-    defer swg.Done()

     slog := log.With().
         Str("distro_type", ki.Distro.ID.String()).
         Str("distro_release", ki.Distro.Release).
@@ -341,15 +337,14 @@ func (cmd PewCmd) remote(swg *sizedwaitgroup.SizedWaitGroup,
     slog = slog.With().Str("uuid", uuid).Logger()
     if err != nil {
         slog.Error().Err(err).Msg("cannot add job")
+        swg.Done() // FIXME
         return
     }
     slog.Info().Msg("add")

-    if cmd.Watch {
-        // FIXME dummy (almost)
-        go cmd.watchJob(swg, slog, uuid)
-    }
+    // FIXME dummy (almost)
+    go cmd.watchJob(swg, slog, uuid)
 }

 func (cmd PewCmd) testArtifact(swg *sizedwaitgroup.SizedWaitGroup,
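The added `swg.Done() // FIXME` on the error path exists because this version of remote() no longer has `defer swg.Done()` at the top: once the deferred call is gone, every early return has to release the wait-group slot by hand, which is easy to miss. A minimal self-contained sketch of the difference, using the standard library's sync.WaitGroup rather than the sizedwaitgroup used in this repository (illustration only; addJob is a hypothetical stand-in):

package main

import (
    "errors"
    "fmt"
    "sync"
)

// addJob stands in for any call that can fail early (hypothetical helper).
func addJob(id int) error {
    if id%2 == 0 {
        return errors.New("cannot add job")
    }
    return nil
}

// worker releases its WaitGroup slot with defer, so every return path
// (including the early error return) is covered automatically.
func worker(wg *sync.WaitGroup, id int) {
    defer wg.Done()

    if err := addJob(id); err != nil {
        fmt.Println(id, err) // early return: Done still runs via defer
        return
    }
    fmt.Println(id, "added")
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go worker(&wg, i)
    }
    wg.Wait() // a forgotten manual Done on an error path would hang here
}

Forgetting the manual Done on any return path leaves Wait() blocked forever, which is what the FIXME comments point at.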

View File

@@ -29,7 +29,7 @@ type cmdenv struct {
     DB *sql.DB
-    WG *sync.WaitGroup
+    WG sync.WaitGroup
     KernelConfig string
 }

View File

@@ -6,11 +6,9 @@ import (
     "io"
     "net"
     "os/exec"
-    "runtime"
     "sync"
     "time"

-    "github.com/remeh/sizedwaitgroup"
     "github.com/rs/zerolog/log"

     "code.dumpstack.io/tools/out-of-tree/api"
@@ -20,9 +18,6 @@ import (
 )

 type Daemon struct {
-    Threads int
-    Resources *Resources
-
     db *sql.DB
     kernelConfig string
@@ -32,11 +27,7 @@ type Daemon struct {
 func Init(kernelConfig string) (d *Daemon, err error) {
     d = &Daemon{}

-    d.Threads = runtime.NumCPU()
-    d.Resources = NewResources()
-
     d.kernelConfig = kernelConfig

     d.wg.Add(1) // matches with db.Close()
     d.db, err = db.OpenDatabase(dotfiles.File("daemon/daemon.db"))
     if err != nil {
@@ -59,49 +50,28 @@ func (d *Daemon) Daemon() {
         log.Fatal().Msg("db is not initialized")
     }

-    swg := sizedwaitgroup.New(d.Threads)
-    log.Info().Int("threads", d.Threads).Msg("start")
+    log.Info().Msg("start daemon loop")

     for !d.shutdown {
         d.wg.Add(1)

         jobs, err := db.Jobs(d.db)
-        if err != nil && !d.shutdown {
+        if err != nil {
             log.Error().Err(err).Msg("")
-            d.wg.Done()
             time.Sleep(time.Minute)
             continue
         }

         for _, job := range jobs {
-            if d.shutdown {
-                break
-            }
-
-            pj := newJobProcessor(job, d.db)
-
-            if job.Status == api.StatusNew {
-                pj.SetStatus(api.StatusWaiting)
-                continue
-            }
-
-            if job.Status != api.StatusWaiting {
-                continue
-            }
-
-            swg.Add()
-            go func(pj jobProcessor) {
-                defer swg.Done()
-                pj.Process(d.Resources)
-                time.Sleep(time.Second)
-            }(pj)
+            err = newPjob(job, d.db).Process()
+            if err != nil {
+                log.Error().Err(err).Msgf("%v", job)
+            }
         }

         d.wg.Done()
         time.Sleep(time.Second)
     }
-
-    swg.Wait()
 }

 func handler(conn net.Conn, e cmdenv) {
@@ -169,10 +139,10 @@ func (d *Daemon) Listen(addr string) {
     go io.Copy(logWriter{log: log.Logger}, stderr)

-    log.Debug().Msgf("start %v", git)
+    log.Info().Msgf("start %v", git)
     git.Start()
     defer func() {
-        log.Debug().Msgf("stop %v", git)
+        log.Info().Msgf("stop %v", git)
     }()

     err = git.Wait()
@@ -216,8 +186,7 @@ func (d *Daemon) Listen(addr string) {
         log.Fatal().Err(err).Msg("listen")
     }

-    log.Info().Str("addr", ":9418").Msg("git")
-    log.Info().Str("addr", addr).Msg("daemon")
+    log.Info().Msgf("listen on %v", addr)

     for {
         conn, err := l.Accept()
@@ -228,7 +197,7 @@ func (d *Daemon) Listen(addr string) {
         e := cmdenv{
             DB: d.db,
-            WG: &d.wg,
+            WG: d.wg,
             Conn: conn,
             KernelConfig: d.kernelConfig,
         }

View File

@@ -19,20 +19,20 @@ import (
     "code.dumpstack.io/tools/out-of-tree/qemu"
 )

-type jobProcessor struct {
+type pjob struct {
     job api.Job
     log zerolog.Logger
     db *sql.DB
 }

-func newJobProcessor(job api.Job, db *sql.DB) (pj jobProcessor) {
+func newPjob(job api.Job, db *sql.DB) (pj pjob) {
     pj.job = job
     pj.db = db
     pj.log = log.With().Str("uuid", job.UUID).Logger()
     return
 }

-func (pj jobProcessor) Update() (err error) {
+func (pj pjob) Update() (err error) {
     err = db.UpdateJob(pj.db, pj.job)
     if err != nil {
         pj.log.Error().Err(err).Msgf("update job %v", pj.job)
@@ -40,131 +40,115 @@ func (pj jobProcessor) Update() (err error) {
     return
 }

-func (pj jobProcessor) SetStatus(status api.Status) (err error) {
+func (pj pjob) SetStatus(status api.Status) (err error) {
     pj.log.Info().Msgf(`%v -> %v`, pj.job.Status, status)
     pj.job.Status = status
     err = pj.Update()
     return
 }

-func (pj *jobProcessor) Process(res *Resources) (err error) {
-    if pj.job.Status != api.StatusWaiting {
-        err = errors.New("job is not available to process")
-        return
-    }
-
-    if pj.job.Artifact.Qemu.Cpus == 0 {
-        pj.job.Artifact.Qemu.Cpus = qemu.DefaultCPUs
-    }
-    if pj.job.Artifact.Qemu.Memory == 0 {
-        pj.job.Artifact.Qemu.Memory = qemu.DefaultMemory
-    }
-
-    err = res.Allocate(pj.job)
-    if err != nil {
-        return
-    }
-    defer func() {
-        res.Release(pj.job)
-    }()
-
-    log.Info().Msgf("process job %v", pj.job.UUID)
-
-    pj.SetStatus(api.StatusRunning)
-
-    defer func() {
-        if err != nil {
-            pj.SetStatus(api.StatusFailure)
-        } else {
-            pj.SetStatus(api.StatusSuccess)
-        }
-    }()
-
-    var tmp string
-    tmp, err = os.MkdirTemp(dotfiles.Dir("tmp"), "")
-    if err != nil {
-        pj.log.Error().Err(err).Msg("mktemp")
-        return
-    }
-    defer os.RemoveAll(tmp)
-
-    tmprepo := filepath.Join(tmp, "repo")
-    pj.log.Debug().Msgf("temp repo: %v", tmprepo)
-
-    remote := fmt.Sprintf("git://localhost:9418/%s", pj.job.RepoName)
-    pj.log.Debug().Msgf("remote: %v", remote)
-
-    var raw []byte
-
-    cmd := exec.Command("git", "clone", remote, tmprepo)
-    raw, err = cmd.CombinedOutput()
-    pj.log.Trace().Msgf("%v\n%v", cmd, string(raw))
-    if err != nil {
-        pj.log.Error().Msgf("%v\n%v", cmd, string(raw))
-        return
-    }
-
-    cmd = exec.Command("git", "checkout", pj.job.Commit)
-    cmd.Dir = tmprepo
-    raw, err = cmd.CombinedOutput()
-    pj.log.Trace().Msgf("%v\n%v", cmd, string(raw))
-    if err != nil {
-        pj.log.Error().Msgf("%v\n%v", cmd, string(raw))
-        return
-    }
-
-    pj.job.Artifact.SourcePath = tmprepo
-
-    var result *artifact.Result
-    var dq *qemu.System
-
-    pj.job.Artifact.Process(pj.log, pj.job.Target, false, "", "", 0,
-        func(q *qemu.System, ka artifact.Artifact, ki distro.KernelInfo,
-            res *artifact.Result) {
-
-            result = res
-            dq = q
-        },
-    )
-
-    logdir := dotfiles.Dir("daemon/logs", pj.job.UUID)
-
-    err = os.WriteFile(filepath.Join(logdir, "build.log"),
-        []byte(result.Build.Output), 0644)
-    if err != nil {
-        pj.log.Error().Err(err).Msg("")
-    }
-
-    err = os.WriteFile(filepath.Join(logdir, "run.log"),
-        []byte(result.Run.Output), 0644)
-    if err != nil {
-        pj.log.Error().Err(err).Msg("")
-    }
-
-    err = os.WriteFile(filepath.Join(logdir, "test.log"),
-        []byte(result.Test.Output), 0644)
-    if err != nil {
-        pj.log.Error().Err(err).Msg("")
-    }
-
-    err = os.WriteFile(filepath.Join(logdir, "qemu.log"),
-        []byte(dq.Stdout), 0644)
-    if err != nil {
-        pj.log.Error().Err(err).Msg("")
-    }
-
-    pj.log.Info().Msgf("build %v, run %v, test %v",
-        result.Build.Ok, result.Run.Ok, result.Test.Ok)
-
-    if !result.Test.Ok {
-        err = errors.New("tests failed")
-    }
-
+func (pj pjob) Process() (err error) {
+    switch pj.job.Status {
+    case api.StatusNew:
+        pj.log.Info().Msgf(`%v`, pj.job.Status)
+        pj.SetStatus(api.StatusWaiting)
+        return
+
+    case api.StatusWaiting:
+        pj.SetStatus(api.StatusRunning)
+        defer func() {
+            if err != nil {
+                pj.SetStatus(api.StatusFailure)
+            } else {
+                pj.SetStatus(api.StatusSuccess)
+            }
+        }()
+
+        var tmp string
+        tmp, err = os.MkdirTemp(dotfiles.Dir("tmp"), "")
+        if err != nil {
+            pj.log.Error().Err(err).Msg("mktemp")
+            return
+        }
+        defer os.RemoveAll(tmp)
+
+        tmprepo := filepath.Join(tmp, "repo")
+        pj.log.Debug().Msgf("temp repo: %v", tmprepo)
+
+        remote := fmt.Sprintf("git://localhost:9418/%s", pj.job.RepoName)
+        pj.log.Debug().Msgf("remote: %v", remote)
+
+        var raw []byte
+
+        cmd := exec.Command("git", "clone", remote, tmprepo)
+        raw, err = cmd.CombinedOutput()
+        pj.log.Trace().Msgf("%v\n%v", cmd, string(raw))
+        if err != nil {
+            pj.log.Error().Msgf("%v\n%v", cmd, string(raw))
+            return
+        }
+
+        cmd = exec.Command("git", "checkout", pj.job.Commit)
+        cmd.Dir = tmprepo
+        raw, err = cmd.CombinedOutput()
+        pj.log.Trace().Msgf("%v\n%v", cmd, string(raw))
+        if err != nil {
+            pj.log.Error().Msgf("%v\n%v", cmd, string(raw))
+            return
+        }
+
+        pj.job.Artifact.SourcePath = tmprepo
+
+        var result *artifact.Result
+        var dq *qemu.System
+
+        pj.job.Artifact.Process(pj.log, pj.job.Target, false, "", "", 0,
+            func(q *qemu.System, ka artifact.Artifact, ki distro.KernelInfo,
+                res *artifact.Result) {
+
+                result = res
+                dq = q
+            },
+        )
+
+        logdir := dotfiles.Dir("daemon/logs", pj.job.UUID)
+
+        err = os.WriteFile(filepath.Join(logdir, "build.log"),
+            []byte(result.Build.Output), 0644)
+        if err != nil {
+            pj.log.Error().Err(err).Msg("")
+        }
+
+        err = os.WriteFile(filepath.Join(logdir, "run.log"),
+            []byte(result.Run.Output), 0644)
+        if err != nil {
+            pj.log.Error().Err(err).Msg("")
+        }
+
+        err = os.WriteFile(filepath.Join(logdir, "test.log"),
+            []byte(result.Test.Output), 0644)
+        if err != nil {
+            pj.log.Error().Err(err).Msg("")
+        }
+
+        err = os.WriteFile(filepath.Join(logdir, "qemu.log"),
+            []byte(dq.Stdout), 0644)
+        if err != nil {
+            pj.log.Error().Err(err).Msg("")
+        }
+
+        pj.log.Info().Msgf("build %v, run %v, test %v",
+            result.Build.Ok, result.Run.Ok, result.Test.Ok)
+
+        if !result.Test.Ok {
+            err = errors.New("tests failed")
+        }
+    }
+
     return
 }

View File

@@ -1,206 +0,0 @@
package daemon

import (
    "errors"
    "runtime"
    "sync"
    "syscall"

    "github.com/rs/zerolog/log"

    "code.dumpstack.io/tools/out-of-tree/api"
)

type Resources struct {
    initialized bool

    CPU *CPUResource
    RAM *RAMResources
}

func NewResources() (r *Resources) {
    r = &Resources{}
    r.CPU = NewCPUResources()
    r.RAM = NewRAMResources()
    r.initialized = true
    return
}

func (r *Resources) Allocate(job api.Job) (err error) {
    if !r.initialized {
        err = errors.New("resources not initialized")
        return
    }

    if job.Artifact.Qemu.Cpus == 0 {
        err = errors.New("no cpus requested")
        return
    }
    if job.Artifact.Qemu.Memory == 0 {
        err = errors.New("no memory requested")
        return
    }

    origRam := r.RAM.GetSpent()
    origCPU := r.CPU.GetSpent()

    err = r.CPU.Allocate(job.Artifact.Qemu.Cpus)
    if err != nil {
        return
    }

    err = r.RAM.Allocate(job.Artifact.Qemu.Memory)
    if err != nil {
        r.CPU.Release(job.Artifact.Qemu.Cpus)
        return
    }

    log.Debug().Msgf("allocated %d cpus, %d MB ram",
        r.CPU.GetSpent()-origCPU,
        r.RAM.GetSpent()-origRam)
    return
}

func (r *Resources) Release(job api.Job) {
    if !r.initialized {
        log.Error().Msg("resources not initialized")
        return
    }

    r.CPU.Release(job.Artifact.Qemu.Cpus)
    r.RAM.Release(job.Artifact.Qemu.Memory)

    log.Debug().Msgf("released %d cpus, %d MB ram",
        job.Artifact.Qemu.Cpus,
        job.Artifact.Qemu.Memory)
}

type CPUResource struct {
    num int
    overcommit float64

    mu *sync.Mutex
    spent int
}

const (
    Allocation = iota
    Release
)

func NewCPUResources() (cpur *CPUResource) {
    cpur = &CPUResource{}
    cpur.mu = &sync.Mutex{}
    cpur.num = runtime.NumCPU()
    cpur.overcommit = 1
    log.Debug().Msgf("total cpus: %d", cpur.num)
    return
}

func (cpur *CPUResource) SetOvercommit(oc float64) {
    log.Info().Int("cpus", cpur.num).
        Int("result", int(float64(cpur.num)*oc)).
        Msgf("%.02f", oc)
    cpur.overcommit = oc
}

func (cpur *CPUResource) GetSpent() int {
    cpur.mu.Lock()
    defer cpur.mu.Unlock()
    return cpur.spent
}

var ErrNotEnoughCpu = errors.New("not enough cpu")

func (cpur *CPUResource) Allocate(cpu int) (err error) {
    cpur.mu.Lock()
    defer cpur.mu.Unlock()

    if cpur.spent+cpu > int(float64(cpur.num)*cpur.overcommit) {
        err = ErrNotEnoughCpu
        return
    }

    cpur.spent += cpu
    return
}

func (cpur *CPUResource) Release(cpu int) (err error) {
    cpur.mu.Lock()
    defer cpur.mu.Unlock()

    if cpur.spent < cpu {
        err = ErrFreeingMoreThanAllocated
        return
    }

    cpur.spent -= cpu
    return
}

type RAMResources struct {
    mb int
    overcommit float64

    mu *sync.Mutex
    spent int
}

func NewRAMResources() (ramr *RAMResources) {
    ramr = &RAMResources{}
    ramr.mu = &sync.Mutex{}
    ramr.overcommit = 1

    var info syscall.Sysinfo_t
    syscall.Sysinfo(&info)
    ramr.mb = int(info.Totalram / 1024 / 1024)

    log.Debug().Msgf("total ram: %d MB", ramr.mb)
    return
}

func (ramr *RAMResources) SetOvercommit(oc float64) {
    log.Info().Int("ram", ramr.mb).
        Int("result", int(float64(ramr.mb)*oc)).
        Msgf("%.02f", oc)
    ramr.overcommit = oc
}

func (ramr RAMResources) GetSpent() int {
    ramr.mu.Lock()
    defer ramr.mu.Unlock()
    return ramr.spent
}

var ErrNotEnoughRam = errors.New("not enough ram")

func (ramr *RAMResources) Allocate(mb int) (err error) {
    ramr.mu.Lock()
    defer ramr.mu.Unlock()

    ocmem := int(float64(ramr.mb) * ramr.overcommit)
    if mb > ocmem-ramr.spent {
        err = ErrNotEnoughRam
        return
    }

    ramr.spent += mb
    return
}

var ErrFreeingMoreThanAllocated = errors.New("freeing more than allocated")

func (ramr *RAMResources) Release(mb int) (err error) {
    ramr.mu.Lock()
    defer ramr.mu.Unlock()

    if ramr.spent < mb {
        err = ErrFreeingMoreThanAllocated
        return
    }

    ramr.spent -= mb
    return
}
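For reference, the accounting rule the deleted allocator enforces is: effective capacity = total × overcommit, and an allocation fails once outstanding usage plus the request would exceed it (with 8 CPUs and overcommit 1.5, capacity is int(8*1.5) = 12). A minimal self-contained sketch of that pattern, with simplified types that are not the project's API:

package main

import (
    "errors"
    "fmt"
    "sync"
)

// pool is a simplified stand-in for the CPUResource/RAMResources types above:
// capacity is total*overcommit, and Allocate fails once the sum of
// outstanding allocations would exceed it.
type pool struct {
    total      int
    overcommit float64

    mu    sync.Mutex
    spent int
}

func (p *pool) Allocate(n int) error {
    p.mu.Lock()
    defer p.mu.Unlock()
    if p.spent+n > int(float64(p.total)*p.overcommit) {
        return errors.New("not enough resources")
    }
    p.spent += n
    return nil
}

func (p *pool) Release(n int) {
    p.mu.Lock()
    defer p.mu.Unlock()
    p.spent -= n
}

func main() {
    // 8 CPUs with overcommit 1.5 gives an effective capacity of int(8*1.5) = 12.
    cpus := pool{total: 8, overcommit: 1.5}

    fmt.Println(cpus.Allocate(8)) // <nil>: 8 of 12 in use
    fmt.Println(cpus.Allocate(4)) // <nil>: 12 of 12 in use
    fmt.Println(cpus.Allocate(1)) // error: would exceed capacity

    cpus.Release(4)
    fmt.Println(cpus.Allocate(1)) // <nil>: slot freed, 9 of 12 in use
}

An overcommit factor above 1 simply raises the cap past the physical count, which is what the removed OvercommitCPU and OvercommitMemory daemon options adjusted.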

View File

@@ -37,13 +37,6 @@ const (
     unsupported = "unsupported" // for test purposes
 )

-const (
-    DefaultCPUs = 1
-    DefaultMemory = 512 // megabytes
-    DefaultSSHRetries = 4
-    DefaultSSHRetryTimeout = time.Second / 4
-)
-
 // Kernel describe kernel parameters for qemu
 type Kernel struct {
     Name string
@@ -130,10 +123,11 @@ func NewSystem(arch arch, kernel Kernel, drivePath string) (q *System, err error
     }
     q.drivePath = drivePath

-    q.Cpus = DefaultCPUs
-    q.Memory = DefaultMemory
-    q.SSH.Retries = DefaultSSHRetries
-    q.SSH.RetryTimeout = DefaultSSHRetryTimeout
+    // Default values
+    q.Cpus = 1
+    q.Memory = 512 // megabytes
+    q.SSH.Retries = 4
+    q.SSH.RetryTimeout = time.Second / 4

     return
 }