feat(daemon): task groups
This commit is contained in:
parent
2b4db95166
commit
dc945044a7
20
api/api.go
20
api/api.go
@ -45,12 +45,15 @@ const (
|
|||||||
|
|
||||||
type Job struct {
|
type Job struct {
|
||||||
ID int64
|
ID int64
|
||||||
|
|
||||||
|
// Job UUID
|
||||||
UUID string
|
UUID string
|
||||||
|
// Group UUID
|
||||||
|
Group string
|
||||||
|
|
||||||
RepoName string
|
RepoName string
|
||||||
Commit string
|
Commit string
|
||||||
|
|
||||||
Params string
|
|
||||||
Artifact artifact.Artifact
|
Artifact artifact.Artifact
|
||||||
Target distro.KernelInfo
|
Target distro.KernelInfo
|
||||||
|
|
||||||
@ -61,6 +64,21 @@ func (job *Job) GenUUID() {
|
|||||||
job.UUID = uuid.New().String()
|
job.UUID = uuid.New().String()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ListJobsParams is the parameters for ListJobs command
|
||||||
|
type ListJobsParams struct {
|
||||||
|
// Group UUID
|
||||||
|
Group string
|
||||||
|
|
||||||
|
// Repo name
|
||||||
|
Repo string
|
||||||
|
|
||||||
|
// Commit hash
|
||||||
|
Commit string
|
||||||
|
|
||||||
|
// Status of the job
|
||||||
|
Status Status
|
||||||
|
}
|
||||||
|
|
||||||
type Repo struct {
|
type Repo struct {
|
||||||
ID int64
|
ID int64
|
||||||
Name string
|
Name string
|
||||||
|
@ -85,8 +85,8 @@ func (c Client) request(cmd api.Command, data any) (resp api.Resp, err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c Client) Jobs() (jobs []api.Job, err error) {
|
func (c Client) Jobs(params api.ListJobsParams) (jobs []api.Job, err error) {
|
||||||
resp, _ := c.request(api.ListJobs, nil)
|
resp, _ := c.request(api.ListJobs, ¶ms)
|
||||||
|
|
||||||
err = resp.GetData(&jobs)
|
err = resp.GetData(&jobs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -10,6 +10,7 @@ import (
|
|||||||
|
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
|
|
||||||
|
"code.dumpstack.io/tools/out-of-tree/api"
|
||||||
"code.dumpstack.io/tools/out-of-tree/client"
|
"code.dumpstack.io/tools/out-of-tree/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -26,11 +27,24 @@ type DaemonJobCmd struct {
|
|||||||
Log DaemonJobsLogsCmd `cmd:"" help:"job logs"`
|
Log DaemonJobsLogsCmd `cmd:"" help:"job logs"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type DaemonJobsListCmd struct{}
|
type DaemonJobsListCmd struct {
|
||||||
|
Group string `help:"group uuid"`
|
||||||
|
Repo string `help:"repo name"`
|
||||||
|
Commit string `help:"commit sha"`
|
||||||
|
Status string `help:"job status"`
|
||||||
|
}
|
||||||
|
|
||||||
func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) {
|
func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) {
|
||||||
c := client.Client{RemoteAddr: g.RemoteAddr}
|
c := client.Client{RemoteAddr: g.RemoteAddr}
|
||||||
jobs, err := c.Jobs()
|
|
||||||
|
params := api.ListJobsParams{
|
||||||
|
Group: cmd.Group,
|
||||||
|
Repo: cmd.Repo,
|
||||||
|
Commit: cmd.Commit,
|
||||||
|
Status: api.Status(cmd.Status),
|
||||||
|
}
|
||||||
|
|
||||||
|
jobs, err := c.Jobs(params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msg("")
|
log.Error().Err(err).Msg("")
|
||||||
return
|
return
|
||||||
|
@ -16,6 +16,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/davecgh/go-spew/spew"
|
"github.com/davecgh/go-spew/spew"
|
||||||
|
"github.com/google/uuid"
|
||||||
"github.com/remeh/sizedwaitgroup"
|
"github.com/remeh/sizedwaitgroup"
|
||||||
"github.com/rs/zerolog"
|
"github.com/rs/zerolog"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
@ -98,6 +99,9 @@ type PewCmd struct {
|
|||||||
|
|
||||||
useRemote bool
|
useRemote bool
|
||||||
remoteAddr string
|
remoteAddr string
|
||||||
|
|
||||||
|
// UUID of the job set
|
||||||
|
groupUUID string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *PewCmd) getRepoName(worktree string, ka artifact.Artifact) {
|
func (cmd *PewCmd) getRepoName(worktree string, ka artifact.Artifact) {
|
||||||
@ -149,6 +153,7 @@ func (cmd *PewCmd) syncRepo(worktree string, ka artifact.Artifact) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *PewCmd) Run(g *Globals) (err error) {
|
func (cmd *PewCmd) Run(g *Globals) (err error) {
|
||||||
|
cmd.groupUUID = uuid.New().String()
|
||||||
cmd.useRemote = g.Remote
|
cmd.useRemote = g.Remote
|
||||||
cmd.remoteAddr = g.RemoteAddr
|
cmd.remoteAddr = g.RemoteAddr
|
||||||
|
|
||||||
@ -330,6 +335,7 @@ func (cmd PewCmd) remote(swg *sizedwaitgroup.SizedWaitGroup,
|
|||||||
log.Trace().Msgf("kernelinfo: %v", spew.Sdump(ki))
|
log.Trace().Msgf("kernelinfo: %v", spew.Sdump(ki))
|
||||||
|
|
||||||
job := api.Job{}
|
job := api.Job{}
|
||||||
|
job.Group = cmd.groupUUID
|
||||||
job.RepoName = cmd.repoName
|
job.RepoName = cmd.repoName
|
||||||
job.Commit = cmd.commit
|
job.Commit = cmd.commit
|
||||||
|
|
||||||
|
@ -49,7 +49,7 @@ func command(req *api.Req, resp *api.Resp, e cmdenv) (err error) {
|
|||||||
case api.AddJob:
|
case api.AddJob:
|
||||||
err = addJob(req, resp, e)
|
err = addJob(req, resp, e)
|
||||||
case api.ListJobs:
|
case api.ListJobs:
|
||||||
err = listJobs(resp, e)
|
err = listJobs(req, resp, e)
|
||||||
case api.AddRepo:
|
case api.AddRepo:
|
||||||
err = addRepo(req, resp, e)
|
err = addRepo(req, resp, e)
|
||||||
case api.ListRepos:
|
case api.ListRepos:
|
||||||
@ -96,13 +96,37 @@ func rawMode(req *api.Req, e cmdenv) (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func listJobs(resp *api.Resp, e cmdenv) (err error) {
|
func listJobs(req *api.Req, resp *api.Resp, e cmdenv) (err error) {
|
||||||
|
var params api.ListJobsParams
|
||||||
|
err = req.GetData(¶ms)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
jobs, err := db.Jobs(e.DB)
|
jobs, err := db.Jobs(e.DB)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
resp.SetData(&jobs)
|
var result []api.Job
|
||||||
|
for _, j := range jobs {
|
||||||
|
if params.Group != "" && j.Group != params.Group {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if params.Repo != "" && j.RepoName != params.Repo {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if params.Commit != "" && j.Commit != params.Commit {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if params.Status != "" && j.Status != params.Status {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
result = append(result, j)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp.SetData(&result)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,22 +1,31 @@
|
|||||||
package db
|
package db
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"database/sql"
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestOpenDatabase(t *testing.T) {
|
func tmpdb(t *testing.T) (file *os.File, db *sql.DB) {
|
||||||
file, err := os.CreateTemp("", "temp-sqlite.db")
|
file, err := os.CreateTemp("", "temp-sqlite.db")
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
// defer os.Remove(file.Name())
|
||||||
|
|
||||||
|
db, err = OpenDatabase(file.Name())
|
||||||
|
assert.Nil(t, err)
|
||||||
|
// defer db.Close()
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOpenDatabase(t *testing.T) {
|
||||||
|
file, db := tmpdb(t)
|
||||||
defer os.Remove(file.Name())
|
defer os.Remove(file.Name())
|
||||||
|
db.Close()
|
||||||
|
|
||||||
db, err := OpenDatabase(file.Name())
|
db, err := OpenDatabase(file.Name())
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
db.Close()
|
db.Close()
|
||||||
|
|
||||||
db, err = OpenDatabase(file.Name())
|
|
||||||
assert.Nil(t, err)
|
|
||||||
db.Close()
|
|
||||||
}
|
}
|
||||||
|
@ -12,9 +12,9 @@ func createJobTable(db *sql.DB) (err error) {
|
|||||||
CREATE TABLE IF NOT EXISTS job (
|
CREATE TABLE IF NOT EXISTS job (
|
||||||
id INTEGER PRIMARY KEY,
|
id INTEGER PRIMARY KEY,
|
||||||
uuid TEXT,
|
uuid TEXT,
|
||||||
|
group_uuid TEXT,
|
||||||
repo TEXT,
|
repo TEXT,
|
||||||
"commit" TEXT,
|
"commit" TEXT,
|
||||||
params TEXT,
|
|
||||||
config TEXT,
|
config TEXT,
|
||||||
target TEXT,
|
target TEXT,
|
||||||
status TEXT DEFAULT "new"
|
status TEXT DEFAULT "new"
|
||||||
@ -23,7 +23,8 @@ func createJobTable(db *sql.DB) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func AddJob(db *sql.DB, job *api.Job) (err error) {
|
func AddJob(db *sql.DB, job *api.Job) (err error) {
|
||||||
stmt, err := db.Prepare(`INSERT INTO job (uuid, repo, "commit", params, config, target) ` +
|
stmt, err := db.Prepare(`INSERT INTO job (uuid, group_uuid, repo, "commit", ` +
|
||||||
|
`config, target) ` +
|
||||||
`VALUES ($1, $2, $3, $4, $5, $6);`)
|
`VALUES ($1, $2, $3, $4, $5, $6);`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@ -34,7 +35,8 @@ func AddJob(db *sql.DB, job *api.Job) (err error) {
|
|||||||
config := api.Marshal(job.Artifact)
|
config := api.Marshal(job.Artifact)
|
||||||
target := api.Marshal(job.Target)
|
target := api.Marshal(job.Target)
|
||||||
|
|
||||||
res, err := stmt.Exec(job.UUID, job.RepoName, job.Commit, job.Params,
|
res, err := stmt.Exec(job.UUID, job.Group,
|
||||||
|
job.RepoName, job.Commit,
|
||||||
config, target,
|
config, target,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -45,9 +47,9 @@ func AddJob(db *sql.DB, job *api.Job) (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func UpdateJob(db *sql.DB, job api.Job) (err error) {
|
func UpdateJob(db *sql.DB, job *api.Job) (err error) {
|
||||||
stmt, err := db.Prepare(`UPDATE job SET uuid=$1, repo=$2, "commit"=$3, params=$4, ` +
|
stmt, err := db.Prepare(`UPDATE job SET uuid=$1, group_uuid=$2, repo=$3, ` +
|
||||||
`config=$5, target=$6, status=$7 WHERE id=$8`)
|
`"commit"=$4, config=$5, target=$6, status=$7 WHERE id=$8`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -56,14 +58,16 @@ func UpdateJob(db *sql.DB, job api.Job) (err error) {
|
|||||||
config := api.Marshal(job.Artifact)
|
config := api.Marshal(job.Artifact)
|
||||||
target := api.Marshal(job.Target)
|
target := api.Marshal(job.Target)
|
||||||
|
|
||||||
_, err = stmt.Exec(job.UUID, job.RepoName, job.Commit, job.Params,
|
_, err = stmt.Exec(job.UUID, job.Group,
|
||||||
|
job.RepoName, job.Commit,
|
||||||
config, target,
|
config, target,
|
||||||
job.Status, job.ID)
|
job.Status, job.ID)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func Jobs(db *sql.DB) (jobs []api.Job, err error) {
|
func Jobs(db *sql.DB) (jobs []api.Job, err error) {
|
||||||
stmt, err := db.Prepare(`SELECT id, uuid, repo, "commit", params, config, target, status FROM job`)
|
stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, repo, "commit", ` +
|
||||||
|
`config, target, status FROM job`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -79,7 +83,9 @@ func Jobs(db *sql.DB) (jobs []api.Job, err error) {
|
|||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var job api.Job
|
var job api.Job
|
||||||
var config, target []byte
|
var config, target []byte
|
||||||
err = rows.Scan(&job.ID, &job.UUID, &job.RepoName, &job.Commit, &job.Params, &config, &target, &job.Status)
|
err = rows.Scan(&job.ID, &job.UUID, &job.Group,
|
||||||
|
&job.RepoName, &job.Commit,
|
||||||
|
&config, &target, &job.Status)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -101,8 +107,9 @@ func Jobs(db *sql.DB) (jobs []api.Job, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func Job(db *sql.DB, uuid string) (job api.Job, err error) {
|
func Job(db *sql.DB, uuid string) (job api.Job, err error) {
|
||||||
stmt, err := db.Prepare(`SELECT id, uuid, repo, "commit", ` +
|
stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, ` +
|
||||||
`params, config, target, status ` +
|
`repo, "commit", ` +
|
||||||
|
`config, target, status ` +
|
||||||
`FROM job WHERE uuid=$1`)
|
`FROM job WHERE uuid=$1`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@ -110,7 +117,7 @@ func Job(db *sql.DB, uuid string) (job api.Job, err error) {
|
|||||||
defer stmt.Close()
|
defer stmt.Close()
|
||||||
|
|
||||||
err = stmt.QueryRow(uuid).Scan(&job.ID, &job.UUID,
|
err = stmt.QueryRow(uuid).Scan(&job.ID, &job.UUID,
|
||||||
&job.RepoName, &job.Commit, &job.Params,
|
&job.Group, &job.RepoName, &job.Commit,
|
||||||
&job.Artifact, &job.Target, &job.Status)
|
&job.Artifact, &job.Target, &job.Status)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
@ -1,49 +1,32 @@
|
|||||||
package db
|
package db
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"database/sql"
|
|
||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
"code.dumpstack.io/tools/out-of-tree/api"
|
"code.dumpstack.io/tools/out-of-tree/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
func testCreateJobTable(t *testing.T) (file *os.File, db *sql.DB) {
|
|
||||||
file, err := os.CreateTemp("", "temp-sqlite.db")
|
|
||||||
assert.Nil(t, err)
|
|
||||||
// defer os.Remove(file.Name())
|
|
||||||
|
|
||||||
db, err = sql.Open("sqlite3", file.Name())
|
|
||||||
assert.Nil(t, err)
|
|
||||||
// defer db.Close()
|
|
||||||
|
|
||||||
db.SetMaxOpenConns(1)
|
|
||||||
|
|
||||||
err = createJobTable(db)
|
|
||||||
assert.Nil(t, err)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestJobTable(t *testing.T) {
|
func TestJobTable(t *testing.T) {
|
||||||
file, db := testCreateJobTable(t)
|
file, db := tmpdb(t)
|
||||||
defer db.Close()
|
|
||||||
defer os.Remove(file.Name())
|
defer os.Remove(file.Name())
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
job := api.Job{
|
job := api.Job{
|
||||||
RepoName: "testname",
|
RepoName: "testname",
|
||||||
Commit: "test",
|
Commit: "test",
|
||||||
Params: "none",
|
Group: uuid.New().String(),
|
||||||
}
|
}
|
||||||
|
|
||||||
err := AddJob(db, &job)
|
err := AddJob(db, &job)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
job.Params = "changed"
|
job.Group = uuid.New().String()
|
||||||
|
|
||||||
err = UpdateJob(db, job)
|
err = UpdateJob(db, &job)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
jobs, err := Jobs(db)
|
jobs, err := Jobs(db)
|
||||||
@ -51,5 +34,5 @@ func TestJobTable(t *testing.T) {
|
|||||||
|
|
||||||
assert.Equal(t, 1, len(jobs))
|
assert.Equal(t, 1, len(jobs))
|
||||||
|
|
||||||
assert.Equal(t, job.Params, jobs[0].Params)
|
assert.Equal(t, job.Group, jobs[0].Group)
|
||||||
}
|
}
|
||||||
|
@ -28,12 +28,15 @@ type jobProcessor struct {
|
|||||||
func newJobProcessor(job api.Job, db *sql.DB) (pj jobProcessor) {
|
func newJobProcessor(job api.Job, db *sql.DB) (pj jobProcessor) {
|
||||||
pj.job = job
|
pj.job = job
|
||||||
pj.db = db
|
pj.db = db
|
||||||
pj.log = log.With().Str("uuid", job.UUID).Logger()
|
pj.log = log.With().
|
||||||
|
Str("uuid", job.UUID).
|
||||||
|
Str("group", job.Group).
|
||||||
|
Logger()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pj jobProcessor) Update() (err error) {
|
func (pj jobProcessor) Update() (err error) {
|
||||||
err = db.UpdateJob(pj.db, pj.job)
|
err = db.UpdateJob(pj.db, &pj.job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
pj.log.Error().Err(err).Msgf("update job %v", pj.job)
|
pj.log.Error().Err(err).Msgf("update job %v", pj.job)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user