Compare commits
No commits in common. "a608d89cdc042530cc430eb8682972650ef561c4" and "bb8344958eb19526cffe1ba46bda1bd7929be078" have entirely different histories.
a608d89cdc
...
bb8344958e
@ -17,11 +17,6 @@ import (
|
||||
type DaemonCmd struct {
|
||||
Addr string `default:":63527"`
|
||||
|
||||
Threads int `help:"number of threads to use"`
|
||||
|
||||
OvercommitMemory float64 `help:"overcommit memory factor"`
|
||||
OvercommitCPU float64 `help:"overcommit CPU factor"`
|
||||
|
||||
Serve DaemonServeCmd `cmd:"" help:"start daemon"`
|
||||
|
||||
Job DaemonJobCmd `cmd:"" aliases:"jobs" help:"manage jobs"`
|
||||
@ -37,18 +32,6 @@ func (cmd *DaemonServeCmd) Run(dm *DaemonCmd, g *Globals) (err error) {
|
||||
}
|
||||
defer d.Kill()
|
||||
|
||||
if dm.Threads > 0 {
|
||||
d.Threads = dm.Threads
|
||||
}
|
||||
|
||||
if dm.OvercommitMemory > 0 {
|
||||
d.Resources.CPU.SetOvercommit(dm.OvercommitMemory)
|
||||
}
|
||||
|
||||
if dm.OvercommitCPU > 0 {
|
||||
d.Resources.CPU.SetOvercommit(dm.OvercommitCPU)
|
||||
}
|
||||
|
||||
go d.Daemon()
|
||||
d.Listen(dm.Addr)
|
||||
return
|
||||
|
@ -91,8 +91,6 @@ type PewCmd struct {
|
||||
Kcfg config.KernelConfig `kong:"-" json:"-"`
|
||||
TimeoutDeadline time.Time `kong:"-" json:"-"`
|
||||
|
||||
Watch bool `help:"watch job status"`
|
||||
|
||||
repoName string
|
||||
commit string
|
||||
|
||||
@ -318,8 +316,6 @@ func (cmd PewCmd) watchJob(swg *sizedwaitgroup.SizedWaitGroup,
|
||||
func (cmd PewCmd) remote(swg *sizedwaitgroup.SizedWaitGroup,
|
||||
ka artifact.Artifact, ki distro.KernelInfo) {
|
||||
|
||||
defer swg.Done()
|
||||
|
||||
slog := log.With().
|
||||
Str("distro_type", ki.Distro.ID.String()).
|
||||
Str("distro_release", ki.Distro.Release).
|
||||
@ -341,15 +337,14 @@ func (cmd PewCmd) remote(swg *sizedwaitgroup.SizedWaitGroup,
|
||||
slog = slog.With().Str("uuid", uuid).Logger()
|
||||
if err != nil {
|
||||
slog.Error().Err(err).Msg("cannot add job")
|
||||
swg.Done() // FIXME
|
||||
return
|
||||
}
|
||||
|
||||
slog.Info().Msg("add")
|
||||
|
||||
if cmd.Watch {
|
||||
// FIXME dummy (almost)
|
||||
go cmd.watchJob(swg, slog, uuid)
|
||||
}
|
||||
}
|
||||
|
||||
func (cmd PewCmd) testArtifact(swg *sizedwaitgroup.SizedWaitGroup,
|
||||
|
@ -29,7 +29,7 @@ type cmdenv struct {
|
||||
|
||||
DB *sql.DB
|
||||
|
||||
WG *sync.WaitGroup
|
||||
WG sync.WaitGroup
|
||||
|
||||
KernelConfig string
|
||||
}
|
||||
|
@ -6,11 +6,9 @@ import (
|
||||
"io"
|
||||
"net"
|
||||
"os/exec"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/remeh/sizedwaitgroup"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"code.dumpstack.io/tools/out-of-tree/api"
|
||||
@ -20,9 +18,6 @@ import (
|
||||
)
|
||||
|
||||
type Daemon struct {
|
||||
Threads int
|
||||
Resources *Resources
|
||||
|
||||
db *sql.DB
|
||||
kernelConfig string
|
||||
|
||||
@ -32,11 +27,7 @@ type Daemon struct {
|
||||
|
||||
func Init(kernelConfig string) (d *Daemon, err error) {
|
||||
d = &Daemon{}
|
||||
d.Threads = runtime.NumCPU()
|
||||
d.Resources = NewResources()
|
||||
|
||||
d.kernelConfig = kernelConfig
|
||||
|
||||
d.wg.Add(1) // matches with db.Close()
|
||||
d.db, err = db.OpenDatabase(dotfiles.File("daemon/daemon.db"))
|
||||
if err != nil {
|
||||
@ -59,49 +50,28 @@ func (d *Daemon) Daemon() {
|
||||
log.Fatal().Msg("db is not initialized")
|
||||
}
|
||||
|
||||
swg := sizedwaitgroup.New(d.Threads)
|
||||
log.Info().Int("threads", d.Threads).Msg("start")
|
||||
log.Info().Msg("start daemon loop")
|
||||
|
||||
for !d.shutdown {
|
||||
d.wg.Add(1)
|
||||
|
||||
jobs, err := db.Jobs(d.db)
|
||||
if err != nil && !d.shutdown {
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("")
|
||||
d.wg.Done()
|
||||
time.Sleep(time.Minute)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, job := range jobs {
|
||||
if d.shutdown {
|
||||
break
|
||||
err = newPjob(job, d.db).Process()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("%v", job)
|
||||
}
|
||||
|
||||
pj := newJobProcessor(job, d.db)
|
||||
|
||||
if job.Status == api.StatusNew {
|
||||
pj.SetStatus(api.StatusWaiting)
|
||||
continue
|
||||
}
|
||||
|
||||
if job.Status != api.StatusWaiting {
|
||||
continue
|
||||
}
|
||||
|
||||
swg.Add()
|
||||
go func(pj jobProcessor) {
|
||||
defer swg.Done()
|
||||
pj.Process(d.Resources)
|
||||
time.Sleep(time.Second)
|
||||
}(pj)
|
||||
}
|
||||
|
||||
d.wg.Done()
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
swg.Wait()
|
||||
}
|
||||
|
||||
func handler(conn net.Conn, e cmdenv) {
|
||||
@ -169,10 +139,10 @@ func (d *Daemon) Listen(addr string) {
|
||||
|
||||
go io.Copy(logWriter{log: log.Logger}, stderr)
|
||||
|
||||
log.Debug().Msgf("start %v", git)
|
||||
log.Info().Msgf("start %v", git)
|
||||
git.Start()
|
||||
defer func() {
|
||||
log.Debug().Msgf("stop %v", git)
|
||||
log.Info().Msgf("stop %v", git)
|
||||
}()
|
||||
|
||||
err = git.Wait()
|
||||
@ -216,8 +186,7 @@ func (d *Daemon) Listen(addr string) {
|
||||
log.Fatal().Err(err).Msg("listen")
|
||||
}
|
||||
|
||||
log.Info().Str("addr", ":9418").Msg("git")
|
||||
log.Info().Str("addr", addr).Msg("daemon")
|
||||
log.Info().Msgf("listen on %v", addr)
|
||||
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
@ -228,7 +197,7 @@ func (d *Daemon) Listen(addr string) {
|
||||
|
||||
e := cmdenv{
|
||||
DB: d.db,
|
||||
WG: &d.wg,
|
||||
WG: d.wg,
|
||||
Conn: conn,
|
||||
KernelConfig: d.kernelConfig,
|
||||
}
|
||||
|
@ -19,20 +19,20 @@ import (
|
||||
"code.dumpstack.io/tools/out-of-tree/qemu"
|
||||
)
|
||||
|
||||
type jobProcessor struct {
|
||||
type pjob struct {
|
||||
job api.Job
|
||||
log zerolog.Logger
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
func newJobProcessor(job api.Job, db *sql.DB) (pj jobProcessor) {
|
||||
func newPjob(job api.Job, db *sql.DB) (pj pjob) {
|
||||
pj.job = job
|
||||
pj.db = db
|
||||
pj.log = log.With().Str("uuid", job.UUID).Logger()
|
||||
return
|
||||
}
|
||||
|
||||
func (pj jobProcessor) Update() (err error) {
|
||||
func (pj pjob) Update() (err error) {
|
||||
err = db.UpdateJob(pj.db, pj.job)
|
||||
if err != nil {
|
||||
pj.log.Error().Err(err).Msgf("update job %v", pj.job)
|
||||
@ -40,38 +40,21 @@ func (pj jobProcessor) Update() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (pj jobProcessor) SetStatus(status api.Status) (err error) {
|
||||
func (pj pjob) SetStatus(status api.Status) (err error) {
|
||||
pj.log.Info().Msgf(`%v -> %v`, pj.job.Status, status)
|
||||
pj.job.Status = status
|
||||
err = pj.Update()
|
||||
return
|
||||
}
|
||||
|
||||
func (pj *jobProcessor) Process(res *Resources) (err error) {
|
||||
if pj.job.Status != api.StatusWaiting {
|
||||
err = errors.New("job is not available to process")
|
||||
func (pj pjob) Process() (err error) {
|
||||
switch pj.job.Status {
|
||||
case api.StatusNew:
|
||||
pj.log.Info().Msgf(`%v`, pj.job.Status)
|
||||
pj.SetStatus(api.StatusWaiting)
|
||||
return
|
||||
}
|
||||
|
||||
if pj.job.Artifact.Qemu.Cpus == 0 {
|
||||
pj.job.Artifact.Qemu.Cpus = qemu.DefaultCPUs
|
||||
}
|
||||
|
||||
if pj.job.Artifact.Qemu.Memory == 0 {
|
||||
pj.job.Artifact.Qemu.Memory = qemu.DefaultMemory
|
||||
}
|
||||
|
||||
err = res.Allocate(pj.job)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
res.Release(pj.job)
|
||||
}()
|
||||
|
||||
log.Info().Msgf("process job %v", pj.job.UUID)
|
||||
|
||||
case api.StatusWaiting:
|
||||
pj.SetStatus(api.StatusRunning)
|
||||
defer func() {
|
||||
if err != nil {
|
||||
@ -166,5 +149,6 @@ func (pj *jobProcessor) Process(res *Resources) (err error) {
|
||||
err = errors.New("tests failed")
|
||||
}
|
||||
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@ -1,206 +0,0 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"runtime"
|
||||
"sync"
|
||||
"syscall"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"code.dumpstack.io/tools/out-of-tree/api"
|
||||
)
|
||||
|
||||
type Resources struct {
|
||||
initialized bool
|
||||
|
||||
CPU *CPUResource
|
||||
RAM *RAMResources
|
||||
}
|
||||
|
||||
func NewResources() (r *Resources) {
|
||||
r = &Resources{}
|
||||
r.CPU = NewCPUResources()
|
||||
r.RAM = NewRAMResources()
|
||||
r.initialized = true
|
||||
return
|
||||
}
|
||||
|
||||
func (r *Resources) Allocate(job api.Job) (err error) {
|
||||
if !r.initialized {
|
||||
err = errors.New("resources not initialized")
|
||||
return
|
||||
}
|
||||
|
||||
if job.Artifact.Qemu.Cpus == 0 {
|
||||
err = errors.New("no cpus requested")
|
||||
return
|
||||
}
|
||||
|
||||
if job.Artifact.Qemu.Memory == 0 {
|
||||
err = errors.New("no memory requested")
|
||||
return
|
||||
}
|
||||
|
||||
origRam := r.RAM.GetSpent()
|
||||
origCPU := r.CPU.GetSpent()
|
||||
|
||||
err = r.CPU.Allocate(job.Artifact.Qemu.Cpus)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = r.RAM.Allocate(job.Artifact.Qemu.Memory)
|
||||
if err != nil {
|
||||
r.CPU.Release(job.Artifact.Qemu.Cpus)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug().Msgf("allocated %d cpus, %d MB ram",
|
||||
r.CPU.GetSpent()-origCPU,
|
||||
r.RAM.GetSpent()-origRam)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (r *Resources) Release(job api.Job) {
|
||||
if !r.initialized {
|
||||
log.Error().Msg("resources not initialized")
|
||||
return
|
||||
}
|
||||
|
||||
r.CPU.Release(job.Artifact.Qemu.Cpus)
|
||||
r.RAM.Release(job.Artifact.Qemu.Memory)
|
||||
|
||||
log.Debug().Msgf("released %d cpus, %d MB ram",
|
||||
job.Artifact.Qemu.Cpus,
|
||||
job.Artifact.Qemu.Memory)
|
||||
}
|
||||
|
||||
type CPUResource struct {
|
||||
num int
|
||||
overcommit float64
|
||||
|
||||
mu *sync.Mutex
|
||||
spent int
|
||||
}
|
||||
|
||||
const (
|
||||
Allocation = iota
|
||||
Release
|
||||
)
|
||||
|
||||
func NewCPUResources() (cpur *CPUResource) {
|
||||
cpur = &CPUResource{}
|
||||
cpur.mu = &sync.Mutex{}
|
||||
cpur.num = runtime.NumCPU()
|
||||
cpur.overcommit = 1
|
||||
log.Debug().Msgf("total cpus: %d", cpur.num)
|
||||
return
|
||||
}
|
||||
|
||||
func (cpur *CPUResource) SetOvercommit(oc float64) {
|
||||
log.Info().Int("cpus", cpur.num).
|
||||
Int("result", int(float64(cpur.num)*oc)).
|
||||
Msgf("%.02f", oc)
|
||||
cpur.overcommit = oc
|
||||
}
|
||||
|
||||
func (cpur *CPUResource) GetSpent() int {
|
||||
cpur.mu.Lock()
|
||||
defer cpur.mu.Unlock()
|
||||
return cpur.spent
|
||||
}
|
||||
|
||||
var ErrNotEnoughCpu = errors.New("not enough cpu")
|
||||
|
||||
func (cpur *CPUResource) Allocate(cpu int) (err error) {
|
||||
cpur.mu.Lock()
|
||||
defer cpur.mu.Unlock()
|
||||
|
||||
if cpur.spent+cpu > int(float64(cpur.num)*cpur.overcommit) {
|
||||
err = ErrNotEnoughCpu
|
||||
return
|
||||
}
|
||||
|
||||
cpur.spent += cpu
|
||||
return
|
||||
}
|
||||
|
||||
func (cpur *CPUResource) Release(cpu int) (err error) {
|
||||
cpur.mu.Lock()
|
||||
defer cpur.mu.Unlock()
|
||||
|
||||
if cpur.spent < cpu {
|
||||
err = ErrFreeingMoreThanAllocated
|
||||
return
|
||||
}
|
||||
|
||||
cpur.spent -= cpu
|
||||
return
|
||||
}
|
||||
|
||||
type RAMResources struct {
|
||||
mb int
|
||||
overcommit float64
|
||||
|
||||
mu *sync.Mutex
|
||||
spent int
|
||||
}
|
||||
|
||||
func NewRAMResources() (ramr *RAMResources) {
|
||||
ramr = &RAMResources{}
|
||||
ramr.mu = &sync.Mutex{}
|
||||
ramr.overcommit = 1
|
||||
|
||||
var info syscall.Sysinfo_t
|
||||
syscall.Sysinfo(&info)
|
||||
ramr.mb = int(info.Totalram / 1024 / 1024)
|
||||
log.Debug().Msgf("total ram: %d MB", ramr.mb)
|
||||
return
|
||||
}
|
||||
|
||||
func (ramr *RAMResources) SetOvercommit(oc float64) {
|
||||
log.Info().Int("ram", ramr.mb).
|
||||
Int("result", int(float64(ramr.mb)*oc)).
|
||||
Msgf("%.02f", oc)
|
||||
ramr.overcommit = oc
|
||||
}
|
||||
|
||||
func (ramr RAMResources) GetSpent() int {
|
||||
ramr.mu.Lock()
|
||||
defer ramr.mu.Unlock()
|
||||
return ramr.spent
|
||||
}
|
||||
|
||||
var ErrNotEnoughRam = errors.New("not enough ram")
|
||||
|
||||
func (ramr *RAMResources) Allocate(mb int) (err error) {
|
||||
ramr.mu.Lock()
|
||||
defer ramr.mu.Unlock()
|
||||
|
||||
ocmem := int(float64(ramr.mb) * ramr.overcommit)
|
||||
|
||||
if mb > ocmem-ramr.spent {
|
||||
err = ErrNotEnoughRam
|
||||
return
|
||||
}
|
||||
|
||||
ramr.spent += mb
|
||||
return
|
||||
}
|
||||
|
||||
var ErrFreeingMoreThanAllocated = errors.New("freeing more than allocated")
|
||||
|
||||
func (ramr *RAMResources) Release(mb int) (err error) {
|
||||
ramr.mu.Lock()
|
||||
defer ramr.mu.Unlock()
|
||||
|
||||
if ramr.spent < mb {
|
||||
err = ErrFreeingMoreThanAllocated
|
||||
return
|
||||
}
|
||||
|
||||
ramr.spent -= mb
|
||||
return
|
||||
}
|
@ -37,13 +37,6 @@ const (
|
||||
unsupported = "unsupported" // for test purposes
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultCPUs = 1
|
||||
DefaultMemory = 512 // megabytes
|
||||
DefaultSSHRetries = 4
|
||||
DefaultSSHRetryTimeout = time.Second / 4
|
||||
)
|
||||
|
||||
// Kernel describe kernel parameters for qemu
|
||||
type Kernel struct {
|
||||
Name string
|
||||
@ -130,10 +123,11 @@ func NewSystem(arch arch, kernel Kernel, drivePath string) (q *System, err error
|
||||
}
|
||||
q.drivePath = drivePath
|
||||
|
||||
q.Cpus = DefaultCPUs
|
||||
q.Memory = DefaultMemory
|
||||
q.SSH.Retries = DefaultSSHRetries
|
||||
q.SSH.RetryTimeout = DefaultSSHRetryTimeout
|
||||
// Default values
|
||||
q.Cpus = 1
|
||||
q.Memory = 512 // megabytes
|
||||
q.SSH.Retries = 4
|
||||
q.SSH.RetryTimeout = time.Second / 4
|
||||
|
||||
return
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user