From b8be5fbe2773370c24c0b387ad5244338709f375 Mon Sep 17 00:00:00 2001 From: Mikhail Klementev Date: Mon, 26 Feb 2024 08:55:27 +0000 Subject: [PATCH] feat(daemon): task grouping --- api/api.go | 7 +++++-- cmd/pew.go | 6 ++++++ daemon/db/db_test.go | 19 ++++++++++++++----- daemon/db/job.go | 31 +++++++++++++++++++------------ daemon/db/job_test.go | 24 +++--------------------- daemon/process.go | 7 +++++-- 6 files changed, 52 insertions(+), 42 deletions(-) diff --git a/api/api.go b/api/api.go index 10fec86..b55e983 100644 --- a/api/api.go +++ b/api/api.go @@ -44,13 +44,16 @@ const ( ) type Job struct { - ID int64 + ID int64 + + // Job UUID UUID string + // Group UUID + Group string RepoName string Commit string - Params string Artifact artifact.Artifact Target distro.KernelInfo diff --git a/cmd/pew.go b/cmd/pew.go index 6e1ac39..d25b083 100644 --- a/cmd/pew.go +++ b/cmd/pew.go @@ -16,6 +16,7 @@ import ( "time" "github.com/davecgh/go-spew/spew" + "github.com/google/uuid" "github.com/remeh/sizedwaitgroup" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -98,6 +99,9 @@ type PewCmd struct { useRemote bool remoteAddr string + + // UUID of the job set + groupUUID string } func (cmd *PewCmd) getRepoName(worktree string, ka artifact.Artifact) { @@ -149,6 +153,7 @@ func (cmd *PewCmd) syncRepo(worktree string, ka artifact.Artifact) (err error) { } func (cmd *PewCmd) Run(g *Globals) (err error) { + cmd.groupUUID = uuid.New().String() cmd.useRemote = g.Remote cmd.remoteAddr = g.RemoteAddr @@ -330,6 +335,7 @@ func (cmd PewCmd) remote(swg *sizedwaitgroup.SizedWaitGroup, log.Trace().Msgf("kernelinfo: %v", spew.Sdump(ki)) job := api.Job{} + job.Group = cmd.groupUUID job.RepoName = cmd.repoName job.Commit = cmd.commit diff --git a/daemon/db/db_test.go b/daemon/db/db_test.go index 08b7060..aa91ec5 100644 --- a/daemon/db/db_test.go +++ b/daemon/db/db_test.go @@ -1,22 +1,31 @@ package db import ( + "database/sql" "os" "testing" "github.com/stretchr/testify/assert" ) -func TestOpenDatabase(t *testing.T) { +func tmpdb(t *testing.T) (file *os.File, db *sql.DB) { file, err := os.CreateTemp("", "temp-sqlite.db") assert.Nil(t, err) + // defer os.Remove(file.Name()) + + db, err = OpenDatabase(file.Name()) + assert.Nil(t, err) + // defer db.Close() + + return +} + +func TestOpenDatabase(t *testing.T) { + file, db := tmpdb(t) defer os.Remove(file.Name()) + db.Close() db, err := OpenDatabase(file.Name()) assert.Nil(t, err) db.Close() - - db, err = OpenDatabase(file.Name()) - assert.Nil(t, err) - db.Close() } diff --git a/daemon/db/job.go b/daemon/db/job.go index 66f88f7..44ed35d 100644 --- a/daemon/db/job.go +++ b/daemon/db/job.go @@ -12,9 +12,9 @@ func createJobTable(db *sql.DB) (err error) { CREATE TABLE IF NOT EXISTS job ( id INTEGER PRIMARY KEY, uuid TEXT, + group_uuid TEXT, repo TEXT, "commit" TEXT, - params TEXT, config TEXT, target TEXT, status TEXT DEFAULT "new" @@ -23,7 +23,8 @@ func createJobTable(db *sql.DB) (err error) { } func AddJob(db *sql.DB, job *api.Job) (err error) { - stmt, err := db.Prepare(`INSERT INTO job (uuid, repo, "commit", params, config, target) ` + + stmt, err := db.Prepare(`INSERT INTO job (uuid, group_uuid, repo, "commit", ` + + `config, target) ` + `VALUES ($1, $2, $3, $4, $5, $6);`) if err != nil { return @@ -34,7 +35,8 @@ func AddJob(db *sql.DB, job *api.Job) (err error) { config := api.Marshal(job.Artifact) target := api.Marshal(job.Target) - res, err := stmt.Exec(job.UUID, job.RepoName, job.Commit, job.Params, + res, err := stmt.Exec(job.UUID, job.Group, + job.RepoName, job.Commit, config, target, ) if err != nil { @@ -45,9 +47,9 @@ func AddJob(db *sql.DB, job *api.Job) (err error) { return } -func UpdateJob(db *sql.DB, job api.Job) (err error) { - stmt, err := db.Prepare(`UPDATE job SET uuid=$1, repo=$2, "commit"=$3, params=$4, ` + - `config=$5, target=$6, status=$7 WHERE id=$8`) +func UpdateJob(db *sql.DB, job *api.Job) (err error) { + stmt, err := db.Prepare(`UPDATE job SET uuid=$1, group_uuid=$2, repo=$3, ` + + `"commit"=$4, config=$5, target=$6, status=$7 WHERE id=$8`) if err != nil { return } @@ -56,14 +58,16 @@ func UpdateJob(db *sql.DB, job api.Job) (err error) { config := api.Marshal(job.Artifact) target := api.Marshal(job.Target) - _, err = stmt.Exec(job.UUID, job.RepoName, job.Commit, job.Params, + _, err = stmt.Exec(job.UUID, job.Group, + job.RepoName, job.Commit, config, target, job.Status, job.ID) return } func Jobs(db *sql.DB) (jobs []api.Job, err error) { - stmt, err := db.Prepare(`SELECT id, uuid, repo, "commit", params, config, target, status FROM job`) + stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, repo, "commit", ` + + `config, target, status FROM job`) if err != nil { return } @@ -79,7 +83,9 @@ func Jobs(db *sql.DB) (jobs []api.Job, err error) { for rows.Next() { var job api.Job var config, target []byte - err = rows.Scan(&job.ID, &job.UUID, &job.RepoName, &job.Commit, &job.Params, &config, &target, &job.Status) + err = rows.Scan(&job.ID, &job.UUID, &job.Group, + &job.RepoName, &job.Commit, + &config, &target, &job.Status) if err != nil { return } @@ -101,8 +107,9 @@ func Jobs(db *sql.DB) (jobs []api.Job, err error) { } func Job(db *sql.DB, uuid string) (job api.Job, err error) { - stmt, err := db.Prepare(`SELECT id, uuid, repo, "commit", ` + - `params, config, target, status ` + + stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, ` + + `repo, "commit", ` + + `config, target, status ` + `FROM job WHERE uuid=$1`) if err != nil { return @@ -110,7 +117,7 @@ func Job(db *sql.DB, uuid string) (job api.Job, err error) { defer stmt.Close() err = stmt.QueryRow(uuid).Scan(&job.ID, &job.UUID, - &job.RepoName, &job.Commit, &job.Params, + &job.Group, &job.RepoName, &job.Commit, &job.Artifact, &job.Target, &job.Status) if err != nil { return diff --git a/daemon/db/job_test.go b/daemon/db/job_test.go index 599703f..dbe8646 100644 --- a/daemon/db/job_test.go +++ b/daemon/db/job_test.go @@ -1,7 +1,6 @@ package db import ( - "database/sql" "os" "testing" @@ -10,27 +9,10 @@ import ( "code.dumpstack.io/tools/out-of-tree/api" ) -func testCreateJobTable(t *testing.T) (file *os.File, db *sql.DB) { - file, err := os.CreateTemp("", "temp-sqlite.db") - assert.Nil(t, err) - // defer os.Remove(file.Name()) - - db, err = sql.Open("sqlite3", file.Name()) - assert.Nil(t, err) - // defer db.Close() - - db.SetMaxOpenConns(1) - - err = createJobTable(db) - assert.Nil(t, err) - - return -} - func TestJobTable(t *testing.T) { - file, db := testCreateJobTable(t) - defer db.Close() + file, db := tmpdb(t) defer os.Remove(file.Name()) + defer db.Close() job := api.Job{ RepoName: "testname", @@ -43,7 +25,7 @@ func TestJobTable(t *testing.T) { job.Params = "changed" - err = UpdateJob(db, job) + err = UpdateJob(db, &job) assert.Nil(t, err) jobs, err := Jobs(db) diff --git a/daemon/process.go b/daemon/process.go index d3d42dc..379d06c 100644 --- a/daemon/process.go +++ b/daemon/process.go @@ -28,12 +28,15 @@ type jobProcessor struct { func newJobProcessor(job api.Job, db *sql.DB) (pj jobProcessor) { pj.job = job pj.db = db - pj.log = log.With().Str("uuid", job.UUID).Logger() + pj.log = log.With(). + Str("uuid", job.UUID). + Str("group", job.Group). + Logger() return } func (pj jobProcessor) Update() (err error) { - err = db.UpdateJob(pj.db, pj.job) + err = db.UpdateJob(pj.db, &pj.job) if err != nil { pj.log.Error().Err(err).Msgf("update job %v", pj.job) }