1
0
out-of-tree/daemon/daemon.go

248 lines
4.6 KiB
Go
Raw Permalink Normal View History

2024-02-20 13:25:31 +00:00
package daemon
import (
"crypto/tls"
"database/sql"
"io"
"net"
"os/exec"
2024-02-25 18:04:02 +00:00
"runtime"
2024-02-20 13:25:31 +00:00
"sync"
"time"
2024-02-25 18:04:02 +00:00
"github.com/remeh/sizedwaitgroup"
2024-02-20 13:25:31 +00:00
"github.com/rs/zerolog/log"
"code.dumpstack.io/tools/out-of-tree/api"
"code.dumpstack.io/tools/out-of-tree/config/dotfiles"
"code.dumpstack.io/tools/out-of-tree/daemon/db"
"code.dumpstack.io/tools/out-of-tree/fs"
)
type Daemon struct {
2024-02-25 18:04:02 +00:00
Threads int
Resources *Resources
2024-02-20 13:25:31 +00:00
db *sql.DB
kernelConfig string
shutdown bool
wg sync.WaitGroup
}
func Init(kernelConfig string) (d *Daemon, err error) {
d = &Daemon{}
2024-02-25 18:04:02 +00:00
d.Threads = runtime.NumCPU()
d.Resources = NewResources()
2024-02-20 13:25:31 +00:00
d.kernelConfig = kernelConfig
2024-02-25 18:04:02 +00:00
2024-02-20 13:25:31 +00:00
d.wg.Add(1) // matches with db.Close()
d.db, err = db.OpenDatabase(dotfiles.File("daemon/daemon.db"))
if err != nil {
log.Error().Err(err).Msg("cannot open daemon.db")
}
log.Info().Msgf("database %s", dotfiles.File("daemon/daemon.db"))
return
}
func (d *Daemon) Kill() {
d.shutdown = true
d.db.Close()
d.wg.Done()
}
func (d *Daemon) Daemon() {
if d.db == nil {
log.Fatal().Msg("db is not initialized")
}
2024-02-25 18:04:02 +00:00
swg := sizedwaitgroup.New(d.Threads)
log.Info().Int("threads", d.Threads).Msg("start")
2024-02-20 13:25:31 +00:00
2024-02-27 02:00:07 +00:00
first := true
2024-02-20 13:25:31 +00:00
for !d.shutdown {
d.wg.Add(1)
jobs, err := db.Jobs(d.db, "")
2024-02-25 18:04:02 +00:00
if err != nil && !d.shutdown {
2024-02-20 13:25:31 +00:00
log.Error().Err(err).Msg("")
2024-02-25 18:04:02 +00:00
d.wg.Done()
2024-02-20 13:25:31 +00:00
time.Sleep(time.Minute)
continue
}
for _, job := range jobs {
2024-02-25 18:04:02 +00:00
if d.shutdown {
break
2024-02-20 13:25:31 +00:00
}
2024-02-25 18:04:02 +00:00
pj := newJobProcessor(job, d.db)
2024-02-27 02:00:07 +00:00
if first && job.Status == api.StatusRunning {
pj.SetStatus(api.StatusWaiting)
continue
}
2024-02-25 18:04:02 +00:00
if job.Status == api.StatusNew {
pj.SetStatus(api.StatusWaiting)
continue
}
if job.Status != api.StatusWaiting {
continue
}
swg.Add()
go func(pj jobProcessor) {
defer swg.Done()
pj.Process(d.Resources)
time.Sleep(time.Second)
}(pj)
2024-02-20 13:25:31 +00:00
}
2024-02-27 02:00:07 +00:00
first = false
2024-02-20 13:25:31 +00:00
d.wg.Done()
time.Sleep(time.Second)
}
2024-02-25 18:04:02 +00:00
swg.Wait()
2024-02-20 13:25:31 +00:00
}
func handler(conn net.Conn, e cmdenv) {
defer conn.Close()
resp := api.NewResp()
e.Log = log.With().
Str("resp_uuid", resp.UUID).
Str("remote_addr", conn.RemoteAddr().String()).
Logger()
e.Log.Info().Msg("")
var req api.Req
defer func() {
if req.Command != api.RawMode {
resp.Encode(conn)
} else {
log.Debug().Msg("raw mode, not encode response")
}
}()
err := req.Decode(conn)
if err != nil {
e.Log.Error().Err(err).Msg("cannot decode")
return
}
err = command(&req, &resp, e)
if err != nil {
e.Log.Error().Err(err).Msg("")
return
}
}
func (d *Daemon) Listen(addr string) {
if d.db == nil {
log.Fatal().Msg("db is not initialized")
}
go func() {
repodir := dotfiles.Dir("daemon/repos")
git := exec.Command("git", "daemon", "--port=9418", "--verbose",
"--reuseaddr",
"--export-all", "--base-path="+repodir,
"--enable=receive-pack",
"--enable=upload-pack",
repodir)
stdout, err := git.StdoutPipe()
if err != nil {
log.Fatal().Err(err).Msgf("%v", git)
return
}
go io.Copy(logWriter{log: log.Logger}, stdout)
stderr, err := git.StderrPipe()
if err != nil {
log.Fatal().Err(err).Msgf("%v", git)
return
}
go io.Copy(logWriter{log: log.Logger}, stderr)
2024-02-25 18:04:02 +00:00
log.Debug().Msgf("start %v", git)
2024-02-20 13:25:31 +00:00
git.Start()
defer func() {
2024-02-25 18:04:02 +00:00
log.Debug().Msgf("stop %v", git)
2024-02-20 13:25:31 +00:00
}()
err = git.Wait()
if err != nil {
log.Fatal().Err(err).Msgf("%v", git)
return
}
}()
if !fs.PathExists(dotfiles.File("daemon/cert.pem")) {
log.Info().Msg("No cert.pem, generating...")
cmd := exec.Command("openssl",
"req", "-batch", "-newkey", "rsa:2048",
"-new", "-nodes", "-x509",
"-subj", "/CN=*",
"-addext", "subjectAltName = DNS:*",
"-out", dotfiles.File("daemon/cert.pem"),
"-keyout", dotfiles.File("daemon/key.pem"))
out, err := cmd.Output()
if err != nil {
log.Error().Err(err).Msg(string(out))
return
}
}
log.Info().Msg("copy to client:")
log.Info().Msgf("cert: %s, key: %s",
dotfiles.File("daemon/cert.pem"),
dotfiles.File("daemon/key.pem"))
cert, err := tls.LoadX509KeyPair(dotfiles.File("daemon/cert.pem"),
dotfiles.File("daemon/key.pem"))
if err != nil {
log.Fatal().Err(err).Msg("LoadX509KeyPair")
}
tlscfg := &tls.Config{Certificates: []tls.Certificate{cert}}
l, err := tls.Listen("tcp", addr, tlscfg)
if err != nil {
log.Fatal().Err(err).Msg("listen")
}
2024-02-25 18:04:02 +00:00
log.Info().Str("addr", ":9418").Msg("git")
log.Info().Str("addr", addr).Msg("daemon")
2024-02-20 13:25:31 +00:00
for {
conn, err := l.Accept()
if err != nil {
log.Fatal().Err(err).Msg("accept")
}
log.Info().Msgf("accept %s", conn.RemoteAddr())
e := cmdenv{
DB: d.db,
2024-02-25 18:04:02 +00:00
WG: &d.wg,
2024-02-20 13:25:31 +00:00
Conn: conn,
KernelConfig: d.kernelConfig,
}
go handler(conn, e)
}
}