Compare commits
	
		
			1 Commits
		
	
	
		
			master
			...
			a608d89cdc
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						
						
							
						
						a608d89cdc
	
				 | 
					
					
						
@@ -17,6 +17,11 @@ import (
 | 
			
		||||
type DaemonCmd struct {
 | 
			
		||||
	Addr string `default:":63527"`
 | 
			
		||||
 | 
			
		||||
	Threads int `help:"number of threads to use"`
 | 
			
		||||
 | 
			
		||||
	OvercommitMemory float64 `help:"overcommit memory factor"`
 | 
			
		||||
	OvercommitCPU    float64 `help:"overcommit CPU factor"`
 | 
			
		||||
 | 
			
		||||
	Serve DaemonServeCmd `cmd:"" help:"start daemon"`
 | 
			
		||||
 | 
			
		||||
	Job  DaemonJobCmd  `cmd:"" aliases:"jobs" help:"manage jobs"`
 | 
			
		||||
@@ -32,6 +37,18 @@ func (cmd *DaemonServeCmd) Run(dm *DaemonCmd, g *Globals) (err error) {
 | 
			
		||||
	}
 | 
			
		||||
	defer d.Kill()
 | 
			
		||||
 | 
			
		||||
	if dm.Threads > 0 {
 | 
			
		||||
		d.Threads = dm.Threads
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if dm.OvercommitMemory > 0 {
 | 
			
		||||
		d.Resources.CPU.SetOvercommit(dm.OvercommitMemory)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if dm.OvercommitCPU > 0 {
 | 
			
		||||
		d.Resources.CPU.SetOvercommit(dm.OvercommitCPU)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	go d.Daemon()
 | 
			
		||||
	d.Listen(dm.Addr)
 | 
			
		||||
	return
 | 
			
		||||
 
 | 
			
		||||
@@ -29,7 +29,7 @@ type cmdenv struct {
 | 
			
		||||
 | 
			
		||||
	DB *sql.DB
 | 
			
		||||
 | 
			
		||||
	WG sync.WaitGroup
 | 
			
		||||
	WG *sync.WaitGroup
 | 
			
		||||
 | 
			
		||||
	KernelConfig string
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -6,9 +6,11 @@ import (
 | 
			
		||||
	"io"
 | 
			
		||||
	"net"
 | 
			
		||||
	"os/exec"
 | 
			
		||||
	"runtime"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/remeh/sizedwaitgroup"
 | 
			
		||||
	"github.com/rs/zerolog/log"
 | 
			
		||||
 | 
			
		||||
	"code.dumpstack.io/tools/out-of-tree/api"
 | 
			
		||||
@@ -18,6 +20,9 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type Daemon struct {
 | 
			
		||||
	Threads   int
 | 
			
		||||
	Resources *Resources
 | 
			
		||||
 | 
			
		||||
	db           *sql.DB
 | 
			
		||||
	kernelConfig string
 | 
			
		||||
 | 
			
		||||
@@ -27,7 +32,11 @@ type Daemon struct {
 | 
			
		||||
 | 
			
		||||
func Init(kernelConfig string) (d *Daemon, err error) {
 | 
			
		||||
	d = &Daemon{}
 | 
			
		||||
	d.Threads = runtime.NumCPU()
 | 
			
		||||
	d.Resources = NewResources()
 | 
			
		||||
 | 
			
		||||
	d.kernelConfig = kernelConfig
 | 
			
		||||
 | 
			
		||||
	d.wg.Add(1) // matches with db.Close()
 | 
			
		||||
	d.db, err = db.OpenDatabase(dotfiles.File("daemon/daemon.db"))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
@@ -50,28 +59,49 @@ func (d *Daemon) Daemon() {
 | 
			
		||||
		log.Fatal().Msg("db is not initialized")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	log.Info().Msg("start daemon loop")
 | 
			
		||||
	swg := sizedwaitgroup.New(d.Threads)
 | 
			
		||||
	log.Info().Int("threads", d.Threads).Msg("start")
 | 
			
		||||
 | 
			
		||||
	for !d.shutdown {
 | 
			
		||||
		d.wg.Add(1)
 | 
			
		||||
 | 
			
		||||
		jobs, err := db.Jobs(d.db)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
		if err != nil && !d.shutdown {
 | 
			
		||||
			log.Error().Err(err).Msg("")
 | 
			
		||||
			d.wg.Done()
 | 
			
		||||
			time.Sleep(time.Minute)
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		for _, job := range jobs {
 | 
			
		||||
			err = newPjob(job, d.db).Process()
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				log.Error().Err(err).Msgf("%v", job)
 | 
			
		||||
			if d.shutdown {
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			pj := newJobProcessor(job, d.db)
 | 
			
		||||
 | 
			
		||||
			if job.Status == api.StatusNew {
 | 
			
		||||
				pj.SetStatus(api.StatusWaiting)
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			if job.Status != api.StatusWaiting {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			swg.Add()
 | 
			
		||||
			go func(pj jobProcessor) {
 | 
			
		||||
				defer swg.Done()
 | 
			
		||||
				pj.Process(d.Resources)
 | 
			
		||||
				time.Sleep(time.Second)
 | 
			
		||||
			}(pj)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		d.wg.Done()
 | 
			
		||||
		time.Sleep(time.Second)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	swg.Wait()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func handler(conn net.Conn, e cmdenv) {
 | 
			
		||||
@@ -139,10 +169,10 @@ func (d *Daemon) Listen(addr string) {
 | 
			
		||||
 | 
			
		||||
		go io.Copy(logWriter{log: log.Logger}, stderr)
 | 
			
		||||
 | 
			
		||||
		log.Info().Msgf("start %v", git)
 | 
			
		||||
		log.Debug().Msgf("start %v", git)
 | 
			
		||||
		git.Start()
 | 
			
		||||
		defer func() {
 | 
			
		||||
			log.Info().Msgf("stop %v", git)
 | 
			
		||||
			log.Debug().Msgf("stop %v", git)
 | 
			
		||||
		}()
 | 
			
		||||
 | 
			
		||||
		err = git.Wait()
 | 
			
		||||
@@ -186,7 +216,8 @@ func (d *Daemon) Listen(addr string) {
 | 
			
		||||
		log.Fatal().Err(err).Msg("listen")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	log.Info().Msgf("listen on %v", addr)
 | 
			
		||||
	log.Info().Str("addr", ":9418").Msg("git")
 | 
			
		||||
	log.Info().Str("addr", addr).Msg("daemon")
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		conn, err := l.Accept()
 | 
			
		||||
@@ -197,7 +228,7 @@ func (d *Daemon) Listen(addr string) {
 | 
			
		||||
 | 
			
		||||
		e := cmdenv{
 | 
			
		||||
			DB:           d.db,
 | 
			
		||||
			WG:           d.wg,
 | 
			
		||||
			WG:           &d.wg,
 | 
			
		||||
			Conn:         conn,
 | 
			
		||||
			KernelConfig: d.kernelConfig,
 | 
			
		||||
		}
 | 
			
		||||
 
 | 
			
		||||
@@ -19,20 +19,20 @@ import (
 | 
			
		||||
	"code.dumpstack.io/tools/out-of-tree/qemu"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type pjob struct {
 | 
			
		||||
type jobProcessor struct {
 | 
			
		||||
	job api.Job
 | 
			
		||||
	log zerolog.Logger
 | 
			
		||||
	db  *sql.DB
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newPjob(job api.Job, db *sql.DB) (pj pjob) {
 | 
			
		||||
func newJobProcessor(job api.Job, db *sql.DB) (pj jobProcessor) {
 | 
			
		||||
	pj.job = job
 | 
			
		||||
	pj.db = db
 | 
			
		||||
	pj.log = log.With().Str("uuid", job.UUID).Logger()
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pj pjob) Update() (err error) {
 | 
			
		||||
func (pj jobProcessor) Update() (err error) {
 | 
			
		||||
	err = db.UpdateJob(pj.db, pj.job)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		pj.log.Error().Err(err).Msgf("update job %v", pj.job)
 | 
			
		||||
@@ -40,21 +40,38 @@ func (pj pjob) Update() (err error) {
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pj pjob) SetStatus(status api.Status) (err error) {
 | 
			
		||||
func (pj jobProcessor) SetStatus(status api.Status) (err error) {
 | 
			
		||||
	pj.log.Info().Msgf(`%v -> %v`, pj.job.Status, status)
 | 
			
		||||
	pj.job.Status = status
 | 
			
		||||
	err = pj.Update()
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (pj pjob) Process() (err error) {
 | 
			
		||||
	switch pj.job.Status {
 | 
			
		||||
	case api.StatusNew:
 | 
			
		||||
		pj.log.Info().Msgf(`%v`, pj.job.Status)
 | 
			
		||||
		pj.SetStatus(api.StatusWaiting)
 | 
			
		||||
func (pj *jobProcessor) Process(res *Resources) (err error) {
 | 
			
		||||
	if pj.job.Status != api.StatusWaiting {
 | 
			
		||||
		err = errors.New("job is not available to process")
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if pj.job.Artifact.Qemu.Cpus == 0 {
 | 
			
		||||
		pj.job.Artifact.Qemu.Cpus = qemu.DefaultCPUs
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if pj.job.Artifact.Qemu.Memory == 0 {
 | 
			
		||||
		pj.job.Artifact.Qemu.Memory = qemu.DefaultMemory
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	err = res.Allocate(pj.job)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	defer func() {
 | 
			
		||||
		res.Release(pj.job)
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	log.Info().Msgf("process job %v", pj.job.UUID)
 | 
			
		||||
 | 
			
		||||
	case api.StatusWaiting:
 | 
			
		||||
	pj.SetStatus(api.StatusRunning)
 | 
			
		||||
	defer func() {
 | 
			
		||||
		if err != nil {
 | 
			
		||||
@@ -149,6 +166,5 @@ func (pj pjob) Process() (err error) {
 | 
			
		||||
		err = errors.New("tests failed")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	}
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										206
									
								
								daemon/resources.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										206
									
								
								daemon/resources.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,206 @@
 | 
			
		||||
package daemon
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"errors"
 | 
			
		||||
	"runtime"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"syscall"
 | 
			
		||||
 | 
			
		||||
	"github.com/rs/zerolog/log"
 | 
			
		||||
 | 
			
		||||
	"code.dumpstack.io/tools/out-of-tree/api"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type Resources struct {
 | 
			
		||||
	initialized bool
 | 
			
		||||
 | 
			
		||||
	CPU *CPUResource
 | 
			
		||||
	RAM *RAMResources
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewResources() (r *Resources) {
 | 
			
		||||
	r = &Resources{}
 | 
			
		||||
	r.CPU = NewCPUResources()
 | 
			
		||||
	r.RAM = NewRAMResources()
 | 
			
		||||
	r.initialized = true
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *Resources) Allocate(job api.Job) (err error) {
 | 
			
		||||
	if !r.initialized {
 | 
			
		||||
		err = errors.New("resources not initialized")
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if job.Artifact.Qemu.Cpus == 0 {
 | 
			
		||||
		err = errors.New("no cpus requested")
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if job.Artifact.Qemu.Memory == 0 {
 | 
			
		||||
		err = errors.New("no memory requested")
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	origRam := r.RAM.GetSpent()
 | 
			
		||||
	origCPU := r.CPU.GetSpent()
 | 
			
		||||
 | 
			
		||||
	err = r.CPU.Allocate(job.Artifact.Qemu.Cpus)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	err = r.RAM.Allocate(job.Artifact.Qemu.Memory)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		r.CPU.Release(job.Artifact.Qemu.Cpus)
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	log.Debug().Msgf("allocated %d cpus, %d MB ram",
 | 
			
		||||
		r.CPU.GetSpent()-origCPU,
 | 
			
		||||
		r.RAM.GetSpent()-origRam)
 | 
			
		||||
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r *Resources) Release(job api.Job) {
 | 
			
		||||
	if !r.initialized {
 | 
			
		||||
		log.Error().Msg("resources not initialized")
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	r.CPU.Release(job.Artifact.Qemu.Cpus)
 | 
			
		||||
	r.RAM.Release(job.Artifact.Qemu.Memory)
 | 
			
		||||
 | 
			
		||||
	log.Debug().Msgf("released %d cpus, %d MB ram",
 | 
			
		||||
		job.Artifact.Qemu.Cpus,
 | 
			
		||||
		job.Artifact.Qemu.Memory)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type CPUResource struct {
 | 
			
		||||
	num        int
 | 
			
		||||
	overcommit float64
 | 
			
		||||
 | 
			
		||||
	mu    *sync.Mutex
 | 
			
		||||
	spent int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	Allocation = iota
 | 
			
		||||
	Release
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func NewCPUResources() (cpur *CPUResource) {
 | 
			
		||||
	cpur = &CPUResource{}
 | 
			
		||||
	cpur.mu = &sync.Mutex{}
 | 
			
		||||
	cpur.num = runtime.NumCPU()
 | 
			
		||||
	cpur.overcommit = 1
 | 
			
		||||
	log.Debug().Msgf("total cpus: %d", cpur.num)
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (cpur *CPUResource) SetOvercommit(oc float64) {
 | 
			
		||||
	log.Info().Int("cpus", cpur.num).
 | 
			
		||||
		Int("result", int(float64(cpur.num)*oc)).
 | 
			
		||||
		Msgf("%.02f", oc)
 | 
			
		||||
	cpur.overcommit = oc
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (cpur *CPUResource) GetSpent() int {
 | 
			
		||||
	cpur.mu.Lock()
 | 
			
		||||
	defer cpur.mu.Unlock()
 | 
			
		||||
	return cpur.spent
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var ErrNotEnoughCpu = errors.New("not enough cpu")
 | 
			
		||||
 | 
			
		||||
func (cpur *CPUResource) Allocate(cpu int) (err error) {
 | 
			
		||||
	cpur.mu.Lock()
 | 
			
		||||
	defer cpur.mu.Unlock()
 | 
			
		||||
 | 
			
		||||
	if cpur.spent+cpu > int(float64(cpur.num)*cpur.overcommit) {
 | 
			
		||||
		err = ErrNotEnoughCpu
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	cpur.spent += cpu
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (cpur *CPUResource) Release(cpu int) (err error) {
 | 
			
		||||
	cpur.mu.Lock()
 | 
			
		||||
	defer cpur.mu.Unlock()
 | 
			
		||||
 | 
			
		||||
	if cpur.spent < cpu {
 | 
			
		||||
		err = ErrFreeingMoreThanAllocated
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	cpur.spent -= cpu
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type RAMResources struct {
 | 
			
		||||
	mb         int
 | 
			
		||||
	overcommit float64
 | 
			
		||||
 | 
			
		||||
	mu    *sync.Mutex
 | 
			
		||||
	spent int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewRAMResources() (ramr *RAMResources) {
 | 
			
		||||
	ramr = &RAMResources{}
 | 
			
		||||
	ramr.mu = &sync.Mutex{}
 | 
			
		||||
	ramr.overcommit = 1
 | 
			
		||||
 | 
			
		||||
	var info syscall.Sysinfo_t
 | 
			
		||||
	syscall.Sysinfo(&info)
 | 
			
		||||
	ramr.mb = int(info.Totalram / 1024 / 1024)
 | 
			
		||||
	log.Debug().Msgf("total ram: %d MB", ramr.mb)
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ramr *RAMResources) SetOvercommit(oc float64) {
 | 
			
		||||
	log.Info().Int("ram", ramr.mb).
 | 
			
		||||
		Int("result", int(float64(ramr.mb)*oc)).
 | 
			
		||||
		Msgf("%.02f", oc)
 | 
			
		||||
	ramr.overcommit = oc
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ramr RAMResources) GetSpent() int {
 | 
			
		||||
	ramr.mu.Lock()
 | 
			
		||||
	defer ramr.mu.Unlock()
 | 
			
		||||
	return ramr.spent
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var ErrNotEnoughRam = errors.New("not enough ram")
 | 
			
		||||
 | 
			
		||||
func (ramr *RAMResources) Allocate(mb int) (err error) {
 | 
			
		||||
	ramr.mu.Lock()
 | 
			
		||||
	defer ramr.mu.Unlock()
 | 
			
		||||
 | 
			
		||||
	ocmem := int(float64(ramr.mb) * ramr.overcommit)
 | 
			
		||||
 | 
			
		||||
	if mb > ocmem-ramr.spent {
 | 
			
		||||
		err = ErrNotEnoughRam
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ramr.spent += mb
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var ErrFreeingMoreThanAllocated = errors.New("freeing more than allocated")
 | 
			
		||||
 | 
			
		||||
func (ramr *RAMResources) Release(mb int) (err error) {
 | 
			
		||||
	ramr.mu.Lock()
 | 
			
		||||
	defer ramr.mu.Unlock()
 | 
			
		||||
 | 
			
		||||
	if ramr.spent < mb {
 | 
			
		||||
		err = ErrFreeingMoreThanAllocated
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ramr.spent -= mb
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user