From e633fd2e790bf550267aacd1bbbf542c93d507b4 Mon Sep 17 00:00:00 2001 From: Mikhail Klementev Date: Mon, 26 Feb 2024 08:55:27 +0000 Subject: [PATCH] feat(daemon): task groups --- api/api.go | 22 ++++++++++++++++++++-- client/client.go | 4 ++-- cmd/daemon.go | 18 ++++++++++++++++-- cmd/pew.go | 10 +++++++--- daemon/commands.go | 30 +++++++++++++++++++++++++++--- daemon/db/db_test.go | 19 ++++++++++++++----- daemon/db/job.go | 31 +++++++++++++++++++------------ daemon/db/job_test.go | 31 +++++++------------------------ daemon/process.go | 7 +++++-- 9 files changed, 117 insertions(+), 55 deletions(-) diff --git a/api/api.go b/api/api.go index 10fec86..66347d0 100644 --- a/api/api.go +++ b/api/api.go @@ -44,13 +44,16 @@ const ( ) type Job struct { - ID int64 + ID int64 + + // Job UUID UUID string + // Group UUID + Group string RepoName string Commit string - Params string Artifact artifact.Artifact Target distro.KernelInfo @@ -61,6 +64,21 @@ func (job *Job) GenUUID() { job.UUID = uuid.New().String() } +// ListJobsParams is the parameters for ListJobs command +type ListJobsParams struct { + // Group UUID + Group string + + // Repo name + Repo string + + // Commit hash + Commit string + + // Status of the job + Status Status +} + type Repo struct { ID int64 Name string diff --git a/client/client.go b/client/client.go index f4ad73c..cd3043f 100644 --- a/client/client.go +++ b/client/client.go @@ -85,8 +85,8 @@ func (c Client) request(cmd api.Command, data any) (resp api.Resp, err error) { return } -func (c Client) Jobs() (jobs []api.Job, err error) { - resp, _ := c.request(api.ListJobs, nil) +func (c Client) Jobs(params api.ListJobsParams) (jobs []api.Job, err error) { + resp, _ := c.request(api.ListJobs, ¶ms) err = resp.GetData(&jobs) if err != nil { diff --git a/cmd/daemon.go b/cmd/daemon.go index 580752f..4630ec7 100644 --- a/cmd/daemon.go +++ b/cmd/daemon.go @@ -10,6 +10,7 @@ import ( "github.com/rs/zerolog/log" + "code.dumpstack.io/tools/out-of-tree/api" "code.dumpstack.io/tools/out-of-tree/client" ) @@ -26,11 +27,24 @@ type DaemonJobCmd struct { Log DaemonJobsLogsCmd `cmd:"" help:"job logs"` } -type DaemonJobsListCmd struct{} +type DaemonJobsListCmd struct { + Group string `help:"group uuid"` + Repo string `help:"repo name"` + Commit string `help:"commit sha"` + Status string `help:"job status"` +} func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) { c := client.Client{RemoteAddr: g.RemoteAddr} - jobs, err := c.Jobs() + + params := api.ListJobsParams{ + Group: cmd.Group, + Repo: cmd.Repo, + Commit: cmd.Commit, + Status: api.Status(cmd.Status), + } + + jobs, err := c.Jobs(params) if err != nil { log.Error().Err(err).Msg("") return diff --git a/cmd/pew.go b/cmd/pew.go index 6e1ac39..000a4e5 100644 --- a/cmd/pew.go +++ b/cmd/pew.go @@ -16,6 +16,7 @@ import ( "time" "github.com/davecgh/go-spew/spew" + "github.com/google/uuid" "github.com/remeh/sizedwaitgroup" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -98,6 +99,9 @@ type PewCmd struct { useRemote bool remoteAddr string + + // UUID of the job set + groupUUID string } func (cmd *PewCmd) getRepoName(worktree string, ka artifact.Artifact) { @@ -149,6 +153,8 @@ func (cmd *PewCmd) syncRepo(worktree string, ka artifact.Artifact) (err error) { } func (cmd *PewCmd) Run(g *Globals) (err error) { + cmd.groupUUID = uuid.New().String() + log.Info().Str("group", cmd.groupUUID).Msg("") cmd.useRemote = g.Remote cmd.remoteAddr = g.RemoteAddr @@ -326,10 +332,8 @@ func (cmd PewCmd) remote(swg *sizedwaitgroup.SizedWaitGroup, Str("kernel", ki.KernelRelease). Logger() - log.Trace().Msgf("artifact: %v", spew.Sdump(ka)) - log.Trace().Msgf("kernelinfo: %v", spew.Sdump(ki)) - job := api.Job{} + job.Group = cmd.groupUUID job.RepoName = cmd.repoName job.Commit = cmd.commit diff --git a/daemon/commands.go b/daemon/commands.go index 1c353a5..b73322d 100644 --- a/daemon/commands.go +++ b/daemon/commands.go @@ -49,7 +49,7 @@ func command(req *api.Req, resp *api.Resp, e cmdenv) (err error) { case api.AddJob: err = addJob(req, resp, e) case api.ListJobs: - err = listJobs(resp, e) + err = listJobs(req, resp, e) case api.AddRepo: err = addRepo(req, resp, e) case api.ListRepos: @@ -96,13 +96,37 @@ func rawMode(req *api.Req, e cmdenv) (err error) { return } -func listJobs(resp *api.Resp, e cmdenv) (err error) { +func listJobs(req *api.Req, resp *api.Resp, e cmdenv) (err error) { + var params api.ListJobsParams + err = req.GetData(¶ms) + if err != nil { + return + } + jobs, err := db.Jobs(e.DB) if err != nil { return } - resp.SetData(&jobs) + var result []api.Job + for _, j := range jobs { + if params.Group != "" && j.Group != params.Group { + continue + } + if params.Repo != "" && j.RepoName != params.Repo { + continue + } + if params.Commit != "" && j.Commit != params.Commit { + continue + } + if params.Status != "" && j.Status != params.Status { + continue + } + + result = append(result, j) + } + + resp.SetData(&result) return } diff --git a/daemon/db/db_test.go b/daemon/db/db_test.go index 08b7060..aa91ec5 100644 --- a/daemon/db/db_test.go +++ b/daemon/db/db_test.go @@ -1,22 +1,31 @@ package db import ( + "database/sql" "os" "testing" "github.com/stretchr/testify/assert" ) -func TestOpenDatabase(t *testing.T) { +func tmpdb(t *testing.T) (file *os.File, db *sql.DB) { file, err := os.CreateTemp("", "temp-sqlite.db") assert.Nil(t, err) + // defer os.Remove(file.Name()) + + db, err = OpenDatabase(file.Name()) + assert.Nil(t, err) + // defer db.Close() + + return +} + +func TestOpenDatabase(t *testing.T) { + file, db := tmpdb(t) defer os.Remove(file.Name()) + db.Close() db, err := OpenDatabase(file.Name()) assert.Nil(t, err) db.Close() - - db, err = OpenDatabase(file.Name()) - assert.Nil(t, err) - db.Close() } diff --git a/daemon/db/job.go b/daemon/db/job.go index 66f88f7..44ed35d 100644 --- a/daemon/db/job.go +++ b/daemon/db/job.go @@ -12,9 +12,9 @@ func createJobTable(db *sql.DB) (err error) { CREATE TABLE IF NOT EXISTS job ( id INTEGER PRIMARY KEY, uuid TEXT, + group_uuid TEXT, repo TEXT, "commit" TEXT, - params TEXT, config TEXT, target TEXT, status TEXT DEFAULT "new" @@ -23,7 +23,8 @@ func createJobTable(db *sql.DB) (err error) { } func AddJob(db *sql.DB, job *api.Job) (err error) { - stmt, err := db.Prepare(`INSERT INTO job (uuid, repo, "commit", params, config, target) ` + + stmt, err := db.Prepare(`INSERT INTO job (uuid, group_uuid, repo, "commit", ` + + `config, target) ` + `VALUES ($1, $2, $3, $4, $5, $6);`) if err != nil { return @@ -34,7 +35,8 @@ func AddJob(db *sql.DB, job *api.Job) (err error) { config := api.Marshal(job.Artifact) target := api.Marshal(job.Target) - res, err := stmt.Exec(job.UUID, job.RepoName, job.Commit, job.Params, + res, err := stmt.Exec(job.UUID, job.Group, + job.RepoName, job.Commit, config, target, ) if err != nil { @@ -45,9 +47,9 @@ func AddJob(db *sql.DB, job *api.Job) (err error) { return } -func UpdateJob(db *sql.DB, job api.Job) (err error) { - stmt, err := db.Prepare(`UPDATE job SET uuid=$1, repo=$2, "commit"=$3, params=$4, ` + - `config=$5, target=$6, status=$7 WHERE id=$8`) +func UpdateJob(db *sql.DB, job *api.Job) (err error) { + stmt, err := db.Prepare(`UPDATE job SET uuid=$1, group_uuid=$2, repo=$3, ` + + `"commit"=$4, config=$5, target=$6, status=$7 WHERE id=$8`) if err != nil { return } @@ -56,14 +58,16 @@ func UpdateJob(db *sql.DB, job api.Job) (err error) { config := api.Marshal(job.Artifact) target := api.Marshal(job.Target) - _, err = stmt.Exec(job.UUID, job.RepoName, job.Commit, job.Params, + _, err = stmt.Exec(job.UUID, job.Group, + job.RepoName, job.Commit, config, target, job.Status, job.ID) return } func Jobs(db *sql.DB) (jobs []api.Job, err error) { - stmt, err := db.Prepare(`SELECT id, uuid, repo, "commit", params, config, target, status FROM job`) + stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, repo, "commit", ` + + `config, target, status FROM job`) if err != nil { return } @@ -79,7 +83,9 @@ func Jobs(db *sql.DB) (jobs []api.Job, err error) { for rows.Next() { var job api.Job var config, target []byte - err = rows.Scan(&job.ID, &job.UUID, &job.RepoName, &job.Commit, &job.Params, &config, &target, &job.Status) + err = rows.Scan(&job.ID, &job.UUID, &job.Group, + &job.RepoName, &job.Commit, + &config, &target, &job.Status) if err != nil { return } @@ -101,8 +107,9 @@ func Jobs(db *sql.DB) (jobs []api.Job, err error) { } func Job(db *sql.DB, uuid string) (job api.Job, err error) { - stmt, err := db.Prepare(`SELECT id, uuid, repo, "commit", ` + - `params, config, target, status ` + + stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, ` + + `repo, "commit", ` + + `config, target, status ` + `FROM job WHERE uuid=$1`) if err != nil { return @@ -110,7 +117,7 @@ func Job(db *sql.DB, uuid string) (job api.Job, err error) { defer stmt.Close() err = stmt.QueryRow(uuid).Scan(&job.ID, &job.UUID, - &job.RepoName, &job.Commit, &job.Params, + &job.Group, &job.RepoName, &job.Commit, &job.Artifact, &job.Target, &job.Status) if err != nil { return diff --git a/daemon/db/job_test.go b/daemon/db/job_test.go index 599703f..3f89287 100644 --- a/daemon/db/job_test.go +++ b/daemon/db/job_test.go @@ -1,49 +1,32 @@ package db import ( - "database/sql" "os" "testing" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "code.dumpstack.io/tools/out-of-tree/api" ) -func testCreateJobTable(t *testing.T) (file *os.File, db *sql.DB) { - file, err := os.CreateTemp("", "temp-sqlite.db") - assert.Nil(t, err) - // defer os.Remove(file.Name()) - - db, err = sql.Open("sqlite3", file.Name()) - assert.Nil(t, err) - // defer db.Close() - - db.SetMaxOpenConns(1) - - err = createJobTable(db) - assert.Nil(t, err) - - return -} - func TestJobTable(t *testing.T) { - file, db := testCreateJobTable(t) - defer db.Close() + file, db := tmpdb(t) defer os.Remove(file.Name()) + defer db.Close() job := api.Job{ RepoName: "testname", Commit: "test", - Params: "none", + Group: uuid.New().String(), } err := AddJob(db, &job) assert.Nil(t, err) - job.Params = "changed" + job.Group = uuid.New().String() - err = UpdateJob(db, job) + err = UpdateJob(db, &job) assert.Nil(t, err) jobs, err := Jobs(db) @@ -51,5 +34,5 @@ func TestJobTable(t *testing.T) { assert.Equal(t, 1, len(jobs)) - assert.Equal(t, job.Params, jobs[0].Params) + assert.Equal(t, job.Group, jobs[0].Group) } diff --git a/daemon/process.go b/daemon/process.go index d3d42dc..379d06c 100644 --- a/daemon/process.go +++ b/daemon/process.go @@ -28,12 +28,15 @@ type jobProcessor struct { func newJobProcessor(job api.Job, db *sql.DB) (pj jobProcessor) { pj.job = job pj.db = db - pj.log = log.With().Str("uuid", job.UUID).Logger() + pj.log = log.With(). + Str("uuid", job.UUID). + Str("group", job.Group). + Logger() return } func (pj jobProcessor) Update() (err error) { - err = db.UpdateJob(pj.db, pj.job) + err = db.UpdateJob(pj.db, &pj.job) if err != nil { pj.log.Error().Err(err).Msgf("update job %v", pj.job) }