1
0

feat(daemon): task groups

This commit is contained in:
dump_stack() 2024-02-26 08:55:27 +00:00
parent 2b4db95166
commit e633fd2e79
Signed by: dump_stack
GPG Key ID: C9905BA72B5E02BB
9 changed files with 117 additions and 55 deletions

View File

@ -45,12 +45,15 @@ const (
type Job struct { type Job struct {
ID int64 ID int64
// Job UUID
UUID string UUID string
// Group UUID
Group string
RepoName string RepoName string
Commit string Commit string
Params string
Artifact artifact.Artifact Artifact artifact.Artifact
Target distro.KernelInfo Target distro.KernelInfo
@ -61,6 +64,21 @@ func (job *Job) GenUUID() {
job.UUID = uuid.New().String() job.UUID = uuid.New().String()
} }
// ListJobsParams is the parameters for ListJobs command
type ListJobsParams struct {
// Group UUID
Group string
// Repo name
Repo string
// Commit hash
Commit string
// Status of the job
Status Status
}
type Repo struct { type Repo struct {
ID int64 ID int64
Name string Name string

View File

@ -85,8 +85,8 @@ func (c Client) request(cmd api.Command, data any) (resp api.Resp, err error) {
return return
} }
func (c Client) Jobs() (jobs []api.Job, err error) { func (c Client) Jobs(params api.ListJobsParams) (jobs []api.Job, err error) {
resp, _ := c.request(api.ListJobs, nil) resp, _ := c.request(api.ListJobs, &params)
err = resp.GetData(&jobs) err = resp.GetData(&jobs)
if err != nil { if err != nil {

View File

@ -10,6 +10,7 @@ import (
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"code.dumpstack.io/tools/out-of-tree/api"
"code.dumpstack.io/tools/out-of-tree/client" "code.dumpstack.io/tools/out-of-tree/client"
) )
@ -26,11 +27,24 @@ type DaemonJobCmd struct {
Log DaemonJobsLogsCmd `cmd:"" help:"job logs"` Log DaemonJobsLogsCmd `cmd:"" help:"job logs"`
} }
type DaemonJobsListCmd struct{} type DaemonJobsListCmd struct {
Group string `help:"group uuid"`
Repo string `help:"repo name"`
Commit string `help:"commit sha"`
Status string `help:"job status"`
}
func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) { func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) {
c := client.Client{RemoteAddr: g.RemoteAddr} c := client.Client{RemoteAddr: g.RemoteAddr}
jobs, err := c.Jobs()
params := api.ListJobsParams{
Group: cmd.Group,
Repo: cmd.Repo,
Commit: cmd.Commit,
Status: api.Status(cmd.Status),
}
jobs, err := c.Jobs(params)
if err != nil { if err != nil {
log.Error().Err(err).Msg("") log.Error().Err(err).Msg("")
return return

View File

@ -16,6 +16,7 @@ import (
"time" "time"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/google/uuid"
"github.com/remeh/sizedwaitgroup" "github.com/remeh/sizedwaitgroup"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -98,6 +99,9 @@ type PewCmd struct {
useRemote bool useRemote bool
remoteAddr string remoteAddr string
// UUID of the job set
groupUUID string
} }
func (cmd *PewCmd) getRepoName(worktree string, ka artifact.Artifact) { func (cmd *PewCmd) getRepoName(worktree string, ka artifact.Artifact) {
@ -149,6 +153,8 @@ func (cmd *PewCmd) syncRepo(worktree string, ka artifact.Artifact) (err error) {
} }
func (cmd *PewCmd) Run(g *Globals) (err error) { func (cmd *PewCmd) Run(g *Globals) (err error) {
cmd.groupUUID = uuid.New().String()
log.Info().Str("group", cmd.groupUUID).Msg("")
cmd.useRemote = g.Remote cmd.useRemote = g.Remote
cmd.remoteAddr = g.RemoteAddr cmd.remoteAddr = g.RemoteAddr
@ -326,10 +332,8 @@ func (cmd PewCmd) remote(swg *sizedwaitgroup.SizedWaitGroup,
Str("kernel", ki.KernelRelease). Str("kernel", ki.KernelRelease).
Logger() Logger()
log.Trace().Msgf("artifact: %v", spew.Sdump(ka))
log.Trace().Msgf("kernelinfo: %v", spew.Sdump(ki))
job := api.Job{} job := api.Job{}
job.Group = cmd.groupUUID
job.RepoName = cmd.repoName job.RepoName = cmd.repoName
job.Commit = cmd.commit job.Commit = cmd.commit

View File

@ -49,7 +49,7 @@ func command(req *api.Req, resp *api.Resp, e cmdenv) (err error) {
case api.AddJob: case api.AddJob:
err = addJob(req, resp, e) err = addJob(req, resp, e)
case api.ListJobs: case api.ListJobs:
err = listJobs(resp, e) err = listJobs(req, resp, e)
case api.AddRepo: case api.AddRepo:
err = addRepo(req, resp, e) err = addRepo(req, resp, e)
case api.ListRepos: case api.ListRepos:
@ -96,13 +96,37 @@ func rawMode(req *api.Req, e cmdenv) (err error) {
return return
} }
func listJobs(resp *api.Resp, e cmdenv) (err error) { func listJobs(req *api.Req, resp *api.Resp, e cmdenv) (err error) {
var params api.ListJobsParams
err = req.GetData(&params)
if err != nil {
return
}
jobs, err := db.Jobs(e.DB) jobs, err := db.Jobs(e.DB)
if err != nil { if err != nil {
return return
} }
resp.SetData(&jobs) var result []api.Job
for _, j := range jobs {
if params.Group != "" && j.Group != params.Group {
continue
}
if params.Repo != "" && j.RepoName != params.Repo {
continue
}
if params.Commit != "" && j.Commit != params.Commit {
continue
}
if params.Status != "" && j.Status != params.Status {
continue
}
result = append(result, j)
}
resp.SetData(&result)
return return
} }

View File

@ -1,22 +1,31 @@
package db package db
import ( import (
"database/sql"
"os" "os"
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func TestOpenDatabase(t *testing.T) { func tmpdb(t *testing.T) (file *os.File, db *sql.DB) {
file, err := os.CreateTemp("", "temp-sqlite.db") file, err := os.CreateTemp("", "temp-sqlite.db")
assert.Nil(t, err) assert.Nil(t, err)
// defer os.Remove(file.Name())
db, err = OpenDatabase(file.Name())
assert.Nil(t, err)
// defer db.Close()
return
}
func TestOpenDatabase(t *testing.T) {
file, db := tmpdb(t)
defer os.Remove(file.Name()) defer os.Remove(file.Name())
db.Close()
db, err := OpenDatabase(file.Name()) db, err := OpenDatabase(file.Name())
assert.Nil(t, err) assert.Nil(t, err)
db.Close() db.Close()
db, err = OpenDatabase(file.Name())
assert.Nil(t, err)
db.Close()
} }

View File

@ -12,9 +12,9 @@ func createJobTable(db *sql.DB) (err error) {
CREATE TABLE IF NOT EXISTS job ( CREATE TABLE IF NOT EXISTS job (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
uuid TEXT, uuid TEXT,
group_uuid TEXT,
repo TEXT, repo TEXT,
"commit" TEXT, "commit" TEXT,
params TEXT,
config TEXT, config TEXT,
target TEXT, target TEXT,
status TEXT DEFAULT "new" status TEXT DEFAULT "new"
@ -23,7 +23,8 @@ func createJobTable(db *sql.DB) (err error) {
} }
func AddJob(db *sql.DB, job *api.Job) (err error) { func AddJob(db *sql.DB, job *api.Job) (err error) {
stmt, err := db.Prepare(`INSERT INTO job (uuid, repo, "commit", params, config, target) ` + stmt, err := db.Prepare(`INSERT INTO job (uuid, group_uuid, repo, "commit", ` +
`config, target) ` +
`VALUES ($1, $2, $3, $4, $5, $6);`) `VALUES ($1, $2, $3, $4, $5, $6);`)
if err != nil { if err != nil {
return return
@ -34,7 +35,8 @@ func AddJob(db *sql.DB, job *api.Job) (err error) {
config := api.Marshal(job.Artifact) config := api.Marshal(job.Artifact)
target := api.Marshal(job.Target) target := api.Marshal(job.Target)
res, err := stmt.Exec(job.UUID, job.RepoName, job.Commit, job.Params, res, err := stmt.Exec(job.UUID, job.Group,
job.RepoName, job.Commit,
config, target, config, target,
) )
if err != nil { if err != nil {
@ -45,9 +47,9 @@ func AddJob(db *sql.DB, job *api.Job) (err error) {
return return
} }
func UpdateJob(db *sql.DB, job api.Job) (err error) { func UpdateJob(db *sql.DB, job *api.Job) (err error) {
stmt, err := db.Prepare(`UPDATE job SET uuid=$1, repo=$2, "commit"=$3, params=$4, ` + stmt, err := db.Prepare(`UPDATE job SET uuid=$1, group_uuid=$2, repo=$3, ` +
`config=$5, target=$6, status=$7 WHERE id=$8`) `"commit"=$4, config=$5, target=$6, status=$7 WHERE id=$8`)
if err != nil { if err != nil {
return return
} }
@ -56,14 +58,16 @@ func UpdateJob(db *sql.DB, job api.Job) (err error) {
config := api.Marshal(job.Artifact) config := api.Marshal(job.Artifact)
target := api.Marshal(job.Target) target := api.Marshal(job.Target)
_, err = stmt.Exec(job.UUID, job.RepoName, job.Commit, job.Params, _, err = stmt.Exec(job.UUID, job.Group,
job.RepoName, job.Commit,
config, target, config, target,
job.Status, job.ID) job.Status, job.ID)
return return
} }
func Jobs(db *sql.DB) (jobs []api.Job, err error) { func Jobs(db *sql.DB) (jobs []api.Job, err error) {
stmt, err := db.Prepare(`SELECT id, uuid, repo, "commit", params, config, target, status FROM job`) stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, repo, "commit", ` +
`config, target, status FROM job`)
if err != nil { if err != nil {
return return
} }
@ -79,7 +83,9 @@ func Jobs(db *sql.DB) (jobs []api.Job, err error) {
for rows.Next() { for rows.Next() {
var job api.Job var job api.Job
var config, target []byte var config, target []byte
err = rows.Scan(&job.ID, &job.UUID, &job.RepoName, &job.Commit, &job.Params, &config, &target, &job.Status) err = rows.Scan(&job.ID, &job.UUID, &job.Group,
&job.RepoName, &job.Commit,
&config, &target, &job.Status)
if err != nil { if err != nil {
return return
} }
@ -101,8 +107,9 @@ func Jobs(db *sql.DB) (jobs []api.Job, err error) {
} }
func Job(db *sql.DB, uuid string) (job api.Job, err error) { func Job(db *sql.DB, uuid string) (job api.Job, err error) {
stmt, err := db.Prepare(`SELECT id, uuid, repo, "commit", ` + stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, ` +
`params, config, target, status ` + `repo, "commit", ` +
`config, target, status ` +
`FROM job WHERE uuid=$1`) `FROM job WHERE uuid=$1`)
if err != nil { if err != nil {
return return
@ -110,7 +117,7 @@ func Job(db *sql.DB, uuid string) (job api.Job, err error) {
defer stmt.Close() defer stmt.Close()
err = stmt.QueryRow(uuid).Scan(&job.ID, &job.UUID, err = stmt.QueryRow(uuid).Scan(&job.ID, &job.UUID,
&job.RepoName, &job.Commit, &job.Params, &job.Group, &job.RepoName, &job.Commit,
&job.Artifact, &job.Target, &job.Status) &job.Artifact, &job.Target, &job.Status)
if err != nil { if err != nil {
return return

View File

@ -1,49 +1,32 @@
package db package db
import ( import (
"database/sql"
"os" "os"
"testing" "testing"
"github.com/google/uuid"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"code.dumpstack.io/tools/out-of-tree/api" "code.dumpstack.io/tools/out-of-tree/api"
) )
func testCreateJobTable(t *testing.T) (file *os.File, db *sql.DB) {
file, err := os.CreateTemp("", "temp-sqlite.db")
assert.Nil(t, err)
// defer os.Remove(file.Name())
db, err = sql.Open("sqlite3", file.Name())
assert.Nil(t, err)
// defer db.Close()
db.SetMaxOpenConns(1)
err = createJobTable(db)
assert.Nil(t, err)
return
}
func TestJobTable(t *testing.T) { func TestJobTable(t *testing.T) {
file, db := testCreateJobTable(t) file, db := tmpdb(t)
defer db.Close()
defer os.Remove(file.Name()) defer os.Remove(file.Name())
defer db.Close()
job := api.Job{ job := api.Job{
RepoName: "testname", RepoName: "testname",
Commit: "test", Commit: "test",
Params: "none", Group: uuid.New().String(),
} }
err := AddJob(db, &job) err := AddJob(db, &job)
assert.Nil(t, err) assert.Nil(t, err)
job.Params = "changed" job.Group = uuid.New().String()
err = UpdateJob(db, job) err = UpdateJob(db, &job)
assert.Nil(t, err) assert.Nil(t, err)
jobs, err := Jobs(db) jobs, err := Jobs(db)
@ -51,5 +34,5 @@ func TestJobTable(t *testing.T) {
assert.Equal(t, 1, len(jobs)) assert.Equal(t, 1, len(jobs))
assert.Equal(t, job.Params, jobs[0].Params) assert.Equal(t, job.Group, jobs[0].Group)
} }

View File

@ -28,12 +28,15 @@ type jobProcessor struct {
func newJobProcessor(job api.Job, db *sql.DB) (pj jobProcessor) { func newJobProcessor(job api.Job, db *sql.DB) (pj jobProcessor) {
pj.job = job pj.job = job
pj.db = db pj.db = db
pj.log = log.With().Str("uuid", job.UUID).Logger() pj.log = log.With().
Str("uuid", job.UUID).
Str("group", job.Group).
Logger()
return return
} }
func (pj jobProcessor) Update() (err error) { func (pj jobProcessor) Update() (err error) {
err = db.UpdateJob(pj.db, pj.job) err = db.UpdateJob(pj.db, &pj.job)
if err != nil { if err != nil {
pj.log.Error().Err(err).Msgf("update job %v", pj.job) pj.log.Error().Err(err).Msgf("update job %v", pj.job)
} }