feat: initial daemon implementation
This commit is contained in:
275
daemon/commands.go
Normal file
275
daemon/commands.go
Normal file
@ -0,0 +1,275 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/google/uuid"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"code.dumpstack.io/tools/out-of-tree/api"
|
||||
"code.dumpstack.io/tools/out-of-tree/config"
|
||||
"code.dumpstack.io/tools/out-of-tree/config/dotfiles"
|
||||
"code.dumpstack.io/tools/out-of-tree/daemon/db"
|
||||
)
|
||||
|
||||
type cmdenv struct {
|
||||
Conn net.Conn
|
||||
|
||||
Log zerolog.Logger
|
||||
|
||||
DB *sql.DB
|
||||
|
||||
WG sync.WaitGroup
|
||||
|
||||
KernelConfig string
|
||||
}
|
||||
|
||||
func command(req *api.Req, resp *api.Resp, e cmdenv) (err error) {
|
||||
e.Log.Trace().Msgf("%v", spew.Sdump(req))
|
||||
defer e.Log.Trace().Msgf("%v", spew.Sdump(resp))
|
||||
|
||||
e.WG.Add(1)
|
||||
defer e.WG.Done()
|
||||
|
||||
e.Log.Debug().Msgf("%v", req.Command)
|
||||
|
||||
switch req.Command {
|
||||
case api.RawMode:
|
||||
err = rawMode(req, e)
|
||||
case api.AddJob:
|
||||
err = addJob(req, resp, e)
|
||||
case api.ListJobs:
|
||||
err = listJobs(resp, e)
|
||||
case api.AddRepo:
|
||||
err = addRepo(req, resp, e)
|
||||
case api.ListRepos:
|
||||
err = listRepos(resp, e)
|
||||
case api.Kernels:
|
||||
err = kernels(resp, e)
|
||||
case api.JobStatus:
|
||||
err = jobStatus(req, resp, e)
|
||||
case api.JobLogs:
|
||||
err = jobLogs(req, resp, e)
|
||||
default:
|
||||
err = errors.New("unknown command")
|
||||
}
|
||||
|
||||
resp.Err = err
|
||||
return
|
||||
}
|
||||
|
||||
type logWriter struct {
|
||||
log zerolog.Logger
|
||||
}
|
||||
|
||||
func (lw logWriter) Write(p []byte) (n int, err error) {
|
||||
n = len(p)
|
||||
//lw.log.Trace().Msgf("%v", strconv.Quote(string(p)))
|
||||
return
|
||||
}
|
||||
|
||||
func rawMode(req *api.Req, e cmdenv) (err error) {
|
||||
uuid := uuid.New().String()
|
||||
|
||||
lwsend := logWriter{log.With().Str("uuid", uuid).Str("git", "send").Logger()}
|
||||
lwrecv := logWriter{log.With().Str("uuid", uuid).Str("git", "recv").Logger()}
|
||||
|
||||
conn, err := net.Dial("tcp", ":9418")
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("dial")
|
||||
return
|
||||
}
|
||||
|
||||
go io.Copy(e.Conn, io.TeeReader(conn, lwrecv))
|
||||
io.Copy(conn, io.TeeReader(e.Conn, lwsend))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func listJobs(resp *api.Resp, e cmdenv) (err error) {
|
||||
jobs, err := db.Jobs(e.DB)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
resp.SetData(&jobs)
|
||||
return
|
||||
}
|
||||
|
||||
func addJob(req *api.Req, resp *api.Resp, e cmdenv) (err error) {
|
||||
var job api.Job
|
||||
err = req.GetData(&job)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
job.GenUUID()
|
||||
|
||||
var repos []api.Repo
|
||||
repos, err = db.Repos(e.DB)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var found bool
|
||||
for _, r := range repos {
|
||||
if job.RepoName == r.Name {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
err = errors.New("repo does not exist")
|
||||
return
|
||||
}
|
||||
|
||||
if job.RepoName == "" {
|
||||
err = errors.New("repo name cannot be empty")
|
||||
return
|
||||
}
|
||||
|
||||
if job.Commit == "" {
|
||||
err = errors.New("invalid commit")
|
||||
return
|
||||
}
|
||||
|
||||
err = db.AddJob(e.DB, &job)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
resp.SetData(&job.UUID)
|
||||
return
|
||||
}
|
||||
|
||||
func listRepos(resp *api.Resp, e cmdenv) (err error) {
|
||||
repos, err := db.Repos(e.DB)
|
||||
|
||||
if err != nil {
|
||||
e.Log.Error().Err(err).Msg("")
|
||||
return
|
||||
}
|
||||
|
||||
for i := range repos {
|
||||
repos[i].Path = dotfiles.Dir("daemon/repos",
|
||||
repos[i].Name)
|
||||
}
|
||||
|
||||
log.Trace().Msgf("%v", spew.Sdump(repos))
|
||||
resp.SetData(&repos)
|
||||
return
|
||||
}
|
||||
|
||||
func addRepo(req *api.Req, resp *api.Resp, e cmdenv) (err error) {
|
||||
var repo api.Repo
|
||||
err = req.GetData(&repo)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var repos []api.Repo
|
||||
repos, err = db.Repos(e.DB)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, r := range repos {
|
||||
log.Debug().Msgf("%v, %v", r, repo.Name)
|
||||
if repo.Name == r.Name {
|
||||
err = fmt.Errorf("repo already exist")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
cmd := exec.Command("git", "init", "--bare")
|
||||
|
||||
cmd.Dir = dotfiles.Dir("daemon/repos", repo.Name)
|
||||
|
||||
var out []byte
|
||||
out, err = cmd.Output()
|
||||
e.Log.Debug().Msgf("%v -> %v\n%v", cmd, err, string(out))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = db.AddRepo(e.DB, &repo)
|
||||
return
|
||||
}
|
||||
|
||||
func kernels(resp *api.Resp, e cmdenv) (err error) {
|
||||
kcfg, err := config.ReadKernelConfig(e.KernelConfig)
|
||||
if err != nil {
|
||||
e.Log.Error().Err(err).Msg("read kernels config")
|
||||
return
|
||||
}
|
||||
|
||||
e.Log.Info().Msgf("send back %d kernels", len(kcfg.Kernels))
|
||||
resp.SetData(&kcfg.Kernels)
|
||||
return
|
||||
}
|
||||
|
||||
func jobLogs(req *api.Req, resp *api.Resp, e cmdenv) (err error) {
|
||||
var uuid string
|
||||
err = req.GetData(&uuid)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
logdir := filepath.Join(dotfiles.File("daemon/logs"), uuid)
|
||||
if _, err = os.Stat(logdir); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
files, err := os.ReadDir(logdir)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var logs []api.JobLog
|
||||
|
||||
for _, f := range files {
|
||||
if f.IsDir() {
|
||||
continue
|
||||
}
|
||||
|
||||
logfile := filepath.Join(logdir, f.Name())
|
||||
|
||||
var buf []byte
|
||||
buf, err = os.ReadFile(logfile)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
logs = append(logs, api.JobLog{
|
||||
Name: f.Name(),
|
||||
Text: string(buf),
|
||||
})
|
||||
}
|
||||
|
||||
resp.SetData(&logs)
|
||||
return
|
||||
}
|
||||
|
||||
func jobStatus(req *api.Req, resp *api.Resp, e cmdenv) (err error) {
|
||||
var uuid string
|
||||
err = req.GetData(&uuid)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
st, err := db.JobStatus(e.DB, uuid)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
resp.SetData(&st)
|
||||
return
|
||||
}
|
207
daemon/daemon.go
Normal file
207
daemon/daemon.go
Normal file
@ -0,0 +1,207 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"database/sql"
|
||||
"io"
|
||||
"net"
|
||||
"os/exec"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"code.dumpstack.io/tools/out-of-tree/api"
|
||||
"code.dumpstack.io/tools/out-of-tree/config/dotfiles"
|
||||
"code.dumpstack.io/tools/out-of-tree/daemon/db"
|
||||
"code.dumpstack.io/tools/out-of-tree/fs"
|
||||
)
|
||||
|
||||
type Daemon struct {
|
||||
db *sql.DB
|
||||
kernelConfig string
|
||||
|
||||
shutdown bool
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func Init(kernelConfig string) (d *Daemon, err error) {
|
||||
d = &Daemon{}
|
||||
d.kernelConfig = kernelConfig
|
||||
d.wg.Add(1) // matches with db.Close()
|
||||
d.db, err = db.OpenDatabase(dotfiles.File("daemon/daemon.db"))
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("cannot open daemon.db")
|
||||
}
|
||||
|
||||
log.Info().Msgf("database %s", dotfiles.File("daemon/daemon.db"))
|
||||
return
|
||||
}
|
||||
|
||||
func (d *Daemon) Kill() {
|
||||
d.shutdown = true
|
||||
|
||||
d.db.Close()
|
||||
d.wg.Done()
|
||||
}
|
||||
|
||||
func (d *Daemon) Daemon() {
|
||||
if d.db == nil {
|
||||
log.Fatal().Msg("db is not initialized")
|
||||
}
|
||||
|
||||
log.Info().Msg("start daemon loop")
|
||||
|
||||
for !d.shutdown {
|
||||
d.wg.Add(1)
|
||||
|
||||
jobs, err := db.Jobs(d.db)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("")
|
||||
time.Sleep(time.Minute)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, job := range jobs {
|
||||
err = newPjob(job, d.db).Process()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("%v", job)
|
||||
}
|
||||
}
|
||||
|
||||
d.wg.Done()
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func handler(conn net.Conn, e cmdenv) {
|
||||
defer conn.Close()
|
||||
|
||||
resp := api.NewResp()
|
||||
|
||||
e.Log = log.With().
|
||||
Str("resp_uuid", resp.UUID).
|
||||
Str("remote_addr", conn.RemoteAddr().String()).
|
||||
Logger()
|
||||
|
||||
e.Log.Info().Msg("")
|
||||
|
||||
var req api.Req
|
||||
|
||||
defer func() {
|
||||
if req.Command != api.RawMode {
|
||||
resp.Encode(conn)
|
||||
} else {
|
||||
log.Debug().Msg("raw mode, not encode response")
|
||||
}
|
||||
}()
|
||||
|
||||
err := req.Decode(conn)
|
||||
if err != nil {
|
||||
e.Log.Error().Err(err).Msg("cannot decode")
|
||||
return
|
||||
}
|
||||
|
||||
err = command(&req, &resp, e)
|
||||
if err != nil {
|
||||
e.Log.Error().Err(err).Msg("")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Daemon) Listen(addr string) {
|
||||
if d.db == nil {
|
||||
log.Fatal().Msg("db is not initialized")
|
||||
}
|
||||
|
||||
go func() {
|
||||
repodir := dotfiles.Dir("daemon/repos")
|
||||
git := exec.Command("git", "daemon", "--port=9418", "--verbose",
|
||||
"--reuseaddr",
|
||||
"--export-all", "--base-path="+repodir,
|
||||
"--enable=receive-pack",
|
||||
"--enable=upload-pack",
|
||||
repodir)
|
||||
|
||||
stdout, err := git.StdoutPipe()
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msgf("%v", git)
|
||||
return
|
||||
}
|
||||
|
||||
go io.Copy(logWriter{log: log.Logger}, stdout)
|
||||
|
||||
stderr, err := git.StderrPipe()
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msgf("%v", git)
|
||||
return
|
||||
}
|
||||
|
||||
go io.Copy(logWriter{log: log.Logger}, stderr)
|
||||
|
||||
log.Info().Msgf("start %v", git)
|
||||
git.Start()
|
||||
defer func() {
|
||||
log.Info().Msgf("stop %v", git)
|
||||
}()
|
||||
|
||||
err = git.Wait()
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msgf("%v", git)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
if !fs.PathExists(dotfiles.File("daemon/cert.pem")) {
|
||||
log.Info().Msg("No cert.pem, generating...")
|
||||
cmd := exec.Command("openssl",
|
||||
"req", "-batch", "-newkey", "rsa:2048",
|
||||
"-new", "-nodes", "-x509",
|
||||
"-subj", "/CN=*",
|
||||
"-addext", "subjectAltName = DNS:*",
|
||||
"-out", dotfiles.File("daemon/cert.pem"),
|
||||
"-keyout", dotfiles.File("daemon/key.pem"))
|
||||
|
||||
out, err := cmd.Output()
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg(string(out))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
log.Info().Msg("copy to client:")
|
||||
log.Info().Msgf("cert: %s, key: %s",
|
||||
dotfiles.File("daemon/cert.pem"),
|
||||
dotfiles.File("daemon/key.pem"))
|
||||
|
||||
cert, err := tls.LoadX509KeyPair(dotfiles.File("daemon/cert.pem"),
|
||||
dotfiles.File("daemon/key.pem"))
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("LoadX509KeyPair")
|
||||
}
|
||||
tlscfg := &tls.Config{Certificates: []tls.Certificate{cert}}
|
||||
|
||||
l, err := tls.Listen("tcp", addr, tlscfg)
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("listen")
|
||||
}
|
||||
|
||||
log.Info().Msgf("listen on %v", addr)
|
||||
|
||||
for {
|
||||
conn, err := l.Accept()
|
||||
if err != nil {
|
||||
log.Fatal().Err(err).Msg("accept")
|
||||
}
|
||||
log.Info().Msgf("accept %s", conn.RemoteAddr())
|
||||
|
||||
e := cmdenv{
|
||||
DB: d.db,
|
||||
WG: d.wg,
|
||||
Conn: conn,
|
||||
KernelConfig: d.kernelConfig,
|
||||
}
|
||||
|
||||
go handler(conn, e)
|
||||
}
|
||||
}
|
15
daemon/daemon_test.go
Normal file
15
daemon/daemon_test.go
Normal file
@ -0,0 +1,15 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
func init() {
|
||||
log.Logger = zerolog.New(zerolog.ConsoleWriter{
|
||||
Out: os.Stdout,
|
||||
NoColor: true,
|
||||
})
|
||||
}
|
123
daemon/db/db.go
Normal file
123
daemon/db/db.go
Normal file
@ -0,0 +1,123 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
// Change on ANY database update
|
||||
const currentDatabaseVersion = 1
|
||||
|
||||
const versionField = "db_version"
|
||||
|
||||
func createMetadataTable(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(`
|
||||
CREATE TABLE IF NOT EXISTS metadata (
|
||||
id INTEGER PRIMARY KEY,
|
||||
key TEXT UNIQUE,
|
||||
value TEXT
|
||||
)`)
|
||||
return
|
||||
}
|
||||
|
||||
func metaChkValue(db *sql.DB, key string) (exist bool, err error) {
|
||||
sql := "SELECT EXISTS(SELECT id FROM metadata WHERE key = $1)"
|
||||
stmt, err := db.Prepare(sql)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
err = stmt.QueryRow(key).Scan(&exist)
|
||||
return
|
||||
}
|
||||
|
||||
func metaGetValue(db *sql.DB, key string) (value string, err error) {
|
||||
stmt, err := db.Prepare("SELECT value FROM metadata " +
|
||||
"WHERE key = $1")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
err = stmt.QueryRow(key).Scan(&value)
|
||||
return
|
||||
}
|
||||
|
||||
func metaSetValue(db *sql.DB, key, value string) (err error) {
|
||||
stmt, err := db.Prepare("INSERT OR REPLACE INTO metadata " +
|
||||
"(key, value) VALUES ($1, $2)")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
_, err = stmt.Exec(key, value)
|
||||
return
|
||||
}
|
||||
|
||||
func getVersion(db *sql.DB) (version int, err error) {
|
||||
s, err := metaGetValue(db, versionField)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
version, err = strconv.Atoi(s)
|
||||
return
|
||||
}
|
||||
|
||||
func createSchema(db *sql.DB) (err error) {
|
||||
err = createMetadataTable(db)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = createJobTable(db)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = createRepoTable(db)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func OpenDatabase(path string) (db *sql.DB, err error) {
|
||||
db, err = sql.Open("sqlite3", path)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
db.SetMaxOpenConns(1)
|
||||
|
||||
exists, _ := metaChkValue(db, versionField)
|
||||
if !exists {
|
||||
err = createSchema(db)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = metaSetValue(db, versionField,
|
||||
strconv.Itoa(currentDatabaseVersion))
|
||||
return
|
||||
}
|
||||
|
||||
version, err := getVersion(db)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if version != currentDatabaseVersion {
|
||||
err = fmt.Errorf("database is not supported (%d instead of %d)",
|
||||
version, currentDatabaseVersion)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
22
daemon/db/db_test.go
Normal file
22
daemon/db/db_test.go
Normal file
@ -0,0 +1,22 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestOpenDatabase(t *testing.T) {
|
||||
file, err := os.CreateTemp("", "temp-sqlite.db")
|
||||
assert.Nil(t, err)
|
||||
defer os.Remove(file.Name())
|
||||
|
||||
db, err := OpenDatabase(file.Name())
|
||||
assert.Nil(t, err)
|
||||
db.Close()
|
||||
|
||||
db, err = OpenDatabase(file.Name())
|
||||
assert.Nil(t, err)
|
||||
db.Close()
|
||||
}
|
136
daemon/db/job.go
Normal file
136
daemon/db/job.go
Normal file
@ -0,0 +1,136 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
|
||||
"code.dumpstack.io/tools/out-of-tree/api"
|
||||
)
|
||||
|
||||
func createJobTable(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(`
|
||||
CREATE TABLE IF NOT EXISTS job (
|
||||
id INTEGER PRIMARY KEY,
|
||||
uuid TEXT,
|
||||
repo TEXT,
|
||||
"commit" TEXT,
|
||||
params TEXT,
|
||||
config TEXT,
|
||||
target TEXT,
|
||||
status TEXT DEFAULT "new"
|
||||
)`)
|
||||
return
|
||||
}
|
||||
|
||||
func AddJob(db *sql.DB, job *api.Job) (err error) {
|
||||
stmt, err := db.Prepare(`INSERT INTO job (uuid, repo, "commit", params, config, target) ` +
|
||||
`VALUES ($1, $2, $3, $4, $5, $6);`)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
defer stmt.Close()
|
||||
|
||||
config := api.Marshal(job.Artifact)
|
||||
target := api.Marshal(job.Target)
|
||||
|
||||
res, err := stmt.Exec(job.UUID, job.RepoName, job.Commit, job.Params,
|
||||
config, target,
|
||||
)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
job.ID, err = res.LastInsertId()
|
||||
return
|
||||
}
|
||||
|
||||
func UpdateJob(db *sql.DB, job api.Job) (err error) {
|
||||
stmt, err := db.Prepare(`UPDATE job SET uuid=$1, repo=$2, "commit"=$3, params=$4, ` +
|
||||
`config=$5, target=$6, status=$7 WHERE id=$8`)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
config := api.Marshal(job.Artifact)
|
||||
target := api.Marshal(job.Target)
|
||||
|
||||
_, err = stmt.Exec(job.UUID, job.RepoName, job.Commit, job.Params,
|
||||
config, target,
|
||||
job.Status, job.ID)
|
||||
return
|
||||
}
|
||||
|
||||
func Jobs(db *sql.DB) (jobs []api.Job, err error) {
|
||||
stmt, err := db.Prepare(`SELECT id, uuid, repo, "commit", params, config, target, status FROM job`)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
defer stmt.Close()
|
||||
|
||||
rows, err := stmt.Query()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var job api.Job
|
||||
var config, target []byte
|
||||
err = rows.Scan(&job.ID, &job.UUID, &job.RepoName, &job.Commit, &job.Params, &config, &target, &job.Status)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = json.Unmarshal(config, &job.Artifact)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = json.Unmarshal(target, &job.Target)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
jobs = append(jobs, job)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func Job(db *sql.DB, uuid string) (job api.Job, err error) {
|
||||
stmt, err := db.Prepare(`SELECT id, uuid, repo, "commit", ` +
|
||||
`params, config, target, status ` +
|
||||
`FROM job WHERE uuid=$1`)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
err = stmt.QueryRow(uuid).Scan(&job.ID, &job.UUID,
|
||||
&job.RepoName, &job.Commit, &job.Params,
|
||||
&job.Artifact, &job.Target, &job.Status)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func JobStatus(db *sql.DB, uuid string) (st api.Status, err error) {
|
||||
stmt, err := db.Prepare(`SELECT status FROM job ` +
|
||||
`WHERE uuid=$1`)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
err = stmt.QueryRow(uuid).Scan(&st)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
55
daemon/db/job_test.go
Normal file
55
daemon/db/job_test.go
Normal file
@ -0,0 +1,55 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"code.dumpstack.io/tools/out-of-tree/api"
|
||||
)
|
||||
|
||||
func testCreateJobTable(t *testing.T) (file *os.File, db *sql.DB) {
|
||||
file, err := os.CreateTemp("", "temp-sqlite.db")
|
||||
assert.Nil(t, err)
|
||||
// defer os.Remove(file.Name())
|
||||
|
||||
db, err = sql.Open("sqlite3", file.Name())
|
||||
assert.Nil(t, err)
|
||||
// defer db.Close()
|
||||
|
||||
db.SetMaxOpenConns(1)
|
||||
|
||||
err = createJobTable(db)
|
||||
assert.Nil(t, err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func TestJobTable(t *testing.T) {
|
||||
file, db := testCreateJobTable(t)
|
||||
defer db.Close()
|
||||
defer os.Remove(file.Name())
|
||||
|
||||
job := api.Job{
|
||||
RepoName: "testname",
|
||||
Commit: "test",
|
||||
Params: "none",
|
||||
}
|
||||
|
||||
err := AddJob(db, &job)
|
||||
assert.Nil(t, err)
|
||||
|
||||
job.Params = "changed"
|
||||
|
||||
err = UpdateJob(db, job)
|
||||
assert.Nil(t, err)
|
||||
|
||||
jobs, err := Jobs(db)
|
||||
assert.Nil(t, err)
|
||||
|
||||
assert.Equal(t, 1, len(jobs))
|
||||
|
||||
assert.Equal(t, job.Params, jobs[0].Params)
|
||||
}
|
61
daemon/db/repo.go
Normal file
61
daemon/db/repo.go
Normal file
@ -0,0 +1,61 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"code.dumpstack.io/tools/out-of-tree/api"
|
||||
)
|
||||
|
||||
func createRepoTable(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(`
|
||||
CREATE TABLE IF NOT EXISTS repo (
|
||||
id INTEGER PRIMARY KEY,
|
||||
name TEXT UNIQUE
|
||||
)`)
|
||||
return
|
||||
}
|
||||
|
||||
func AddRepo(db *sql.DB, repo *api.Repo) (err error) {
|
||||
stmt, err := db.Prepare(`INSERT INTO repo (name) ` +
|
||||
`VALUES ($1);`)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
defer stmt.Close()
|
||||
|
||||
res, err := stmt.Exec(repo.Name)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
repo.ID, err = res.LastInsertId()
|
||||
return
|
||||
}
|
||||
|
||||
func Repos(db *sql.DB) (repos []api.Repo, err error) {
|
||||
stmt, err := db.Prepare(`SELECT id, name FROM repo`)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
defer stmt.Close()
|
||||
|
||||
rows, err := stmt.Query()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var repo api.Repo
|
||||
err = rows.Scan(&repo.ID, &repo.Name)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
repos = append(repos, repo)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
46
daemon/db/repo_test.go
Normal file
46
daemon/db/repo_test.go
Normal file
@ -0,0 +1,46 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"code.dumpstack.io/tools/out-of-tree/api"
|
||||
)
|
||||
|
||||
func testCreateRepoTable(t *testing.T) (file *os.File, db *sql.DB) {
|
||||
file, err := os.CreateTemp("", "temp-sqlite.db")
|
||||
assert.Nil(t, err)
|
||||
// defer os.Remove(tempDB.Name())
|
||||
|
||||
db, err = sql.Open("sqlite3", file.Name())
|
||||
assert.Nil(t, err)
|
||||
// defer db.Close()
|
||||
|
||||
db.SetMaxOpenConns(1)
|
||||
|
||||
err = createRepoTable(db)
|
||||
assert.Nil(t, err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func TestRepoTable(t *testing.T) {
|
||||
file, db := testCreateRepoTable(t)
|
||||
defer db.Close()
|
||||
defer os.Remove(file.Name())
|
||||
|
||||
repo := api.Repo{Name: "testname"}
|
||||
|
||||
err := AddRepo(db, &repo)
|
||||
assert.Nil(t, err)
|
||||
|
||||
repos, err := Repos(db)
|
||||
assert.Nil(t, err)
|
||||
|
||||
assert.Equal(t, 1, len(repos))
|
||||
|
||||
assert.Equal(t, repo, repos[0])
|
||||
}
|
154
daemon/process.go
Normal file
154
daemon/process.go
Normal file
@ -0,0 +1,154 @@
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"code.dumpstack.io/tools/out-of-tree/api"
|
||||
"code.dumpstack.io/tools/out-of-tree/artifact"
|
||||
"code.dumpstack.io/tools/out-of-tree/config/dotfiles"
|
||||
"code.dumpstack.io/tools/out-of-tree/daemon/db"
|
||||
"code.dumpstack.io/tools/out-of-tree/distro"
|
||||
"code.dumpstack.io/tools/out-of-tree/qemu"
|
||||
)
|
||||
|
||||
type pjob struct {
|
||||
job api.Job
|
||||
log zerolog.Logger
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
func newPjob(job api.Job, db *sql.DB) (pj pjob) {
|
||||
pj.job = job
|
||||
pj.db = db
|
||||
pj.log = log.With().Str("uuid", job.UUID).Logger()
|
||||
return
|
||||
}
|
||||
|
||||
func (pj pjob) Update() (err error) {
|
||||
err = db.UpdateJob(pj.db, pj.job)
|
||||
if err != nil {
|
||||
pj.log.Error().Err(err).Msgf("update job %v", pj.job)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (pj pjob) SetStatus(status api.Status) (err error) {
|
||||
pj.log.Info().Msgf(`%v -> %v`, pj.job.Status, status)
|
||||
pj.job.Status = status
|
||||
err = pj.Update()
|
||||
return
|
||||
}
|
||||
|
||||
func (pj pjob) Process() (err error) {
|
||||
switch pj.job.Status {
|
||||
case api.StatusNew:
|
||||
pj.log.Info().Msgf(`%v`, pj.job.Status)
|
||||
pj.SetStatus(api.StatusWaiting)
|
||||
return
|
||||
|
||||
case api.StatusWaiting:
|
||||
pj.SetStatus(api.StatusRunning)
|
||||
defer func() {
|
||||
if err != nil {
|
||||
pj.SetStatus(api.StatusFailure)
|
||||
} else {
|
||||
pj.SetStatus(api.StatusSuccess)
|
||||
}
|
||||
}()
|
||||
|
||||
var tmp string
|
||||
tmp, err = os.MkdirTemp(dotfiles.Dir("tmp"), "")
|
||||
if err != nil {
|
||||
pj.log.Error().Err(err).Msg("mktemp")
|
||||
return
|
||||
}
|
||||
defer os.RemoveAll(tmp)
|
||||
|
||||
tmprepo := filepath.Join(tmp, "repo")
|
||||
|
||||
pj.log.Debug().Msgf("temp repo: %v", tmprepo)
|
||||
|
||||
remote := fmt.Sprintf("git://localhost:9418/%s", pj.job.RepoName)
|
||||
|
||||
pj.log.Debug().Msgf("remote: %v", remote)
|
||||
|
||||
var raw []byte
|
||||
|
||||
cmd := exec.Command("git", "clone", remote, tmprepo)
|
||||
|
||||
raw, err = cmd.CombinedOutput()
|
||||
pj.log.Trace().Msgf("%v\n%v", cmd, string(raw))
|
||||
if err != nil {
|
||||
pj.log.Error().Msgf("%v\n%v", cmd, string(raw))
|
||||
return
|
||||
}
|
||||
|
||||
cmd = exec.Command("git", "checkout", pj.job.Commit)
|
||||
|
||||
cmd.Dir = tmprepo
|
||||
|
||||
raw, err = cmd.CombinedOutput()
|
||||
pj.log.Trace().Msgf("%v\n%v", cmd, string(raw))
|
||||
if err != nil {
|
||||
pj.log.Error().Msgf("%v\n%v", cmd, string(raw))
|
||||
return
|
||||
}
|
||||
|
||||
pj.job.Artifact.SourcePath = tmprepo
|
||||
|
||||
var result *artifact.Result
|
||||
var dq *qemu.System
|
||||
|
||||
pj.job.Artifact.Process(pj.log, pj.job.Target, false, "", "", 0,
|
||||
func(q *qemu.System, ka artifact.Artifact, ki distro.KernelInfo,
|
||||
res *artifact.Result) {
|
||||
|
||||
result = res
|
||||
dq = q
|
||||
},
|
||||
)
|
||||
|
||||
logdir := dotfiles.Dir("daemon/logs", pj.job.UUID)
|
||||
|
||||
err = os.WriteFile(filepath.Join(logdir, "build.log"),
|
||||
[]byte(result.Build.Output), 0644)
|
||||
if err != nil {
|
||||
pj.log.Error().Err(err).Msg("")
|
||||
}
|
||||
|
||||
err = os.WriteFile(filepath.Join(logdir, "run.log"),
|
||||
[]byte(result.Run.Output), 0644)
|
||||
if err != nil {
|
||||
pj.log.Error().Err(err).Msg("")
|
||||
}
|
||||
|
||||
err = os.WriteFile(filepath.Join(logdir, "test.log"),
|
||||
[]byte(result.Test.Output), 0644)
|
||||
if err != nil {
|
||||
pj.log.Error().Err(err).Msg("")
|
||||
}
|
||||
|
||||
err = os.WriteFile(filepath.Join(logdir, "qemu.log"),
|
||||
[]byte(dq.Stdout), 0644)
|
||||
if err != nil {
|
||||
pj.log.Error().Err(err).Msg("")
|
||||
}
|
||||
|
||||
pj.log.Info().Msgf("build %v, run %v, test %v",
|
||||
result.Build.Ok, result.Run.Ok, result.Test.Ok)
|
||||
|
||||
if !result.Test.Ok {
|
||||
err = errors.New("tests failed")
|
||||
}
|
||||
|
||||
}
|
||||
return
|
||||
}
|
Reference in New Issue
Block a user