From c909c2a35294c5adc02555d6d5ba0abba6f96558 Mon Sep 17 00:00:00 2001 From: Mikhail Klementev Date: Tue, 27 Feb 2024 02:00:07 +0000 Subject: [PATCH] feat(daemon): timestamps --- api/api.go | 11 ++++++ cmd/daemon.go | 19 +++++++--- daemon/commands.go | 15 ++++++++ daemon/daemon.go | 9 +++++ daemon/db/job.go | 84 +++++++++++++++++++++++++------------------ daemon/db/job_test.go | 12 +++++++ daemon/process.go | 4 +++ 7 files changed, 116 insertions(+), 38 deletions(-) diff --git a/api/api.go b/api/api.go index 66347d0..a4cd85f 100644 --- a/api/api.go +++ b/api/api.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "reflect" + "time" "code.dumpstack.io/tools/out-of-tree/artifact" "code.dumpstack.io/tools/out-of-tree/distro" @@ -57,6 +58,10 @@ type Job struct { Artifact artifact.Artifact Target distro.KernelInfo + Created time.Time + Started time.Time + Finished time.Time + Status Status } @@ -77,6 +82,12 @@ type ListJobsParams struct { // Status of the job Status Status + + // Time range (unix timestamps) + Time struct { + After int64 + Before int64 + } } type Repo struct { diff --git a/cmd/daemon.go b/cmd/daemon.go index 4630ec7..e2de06b 100644 --- a/cmd/daemon.go +++ b/cmd/daemon.go @@ -7,6 +7,7 @@ package cmd import ( "encoding/json" "fmt" + "time" "github.com/rs/zerolog/log" @@ -28,10 +29,12 @@ type DaemonJobCmd struct { } type DaemonJobsListCmd struct { - Group string `help:"group uuid"` - Repo string `help:"repo name"` - Commit string `help:"commit sha"` - Status string `help:"job status"` + Group string `help:"group uuid"` + Repo string `help:"repo name"` + Commit string `help:"commit sha"` + Status string `help:"job status"` + After time.Time `help:"created after" format:"2006-01-02 15:04:05"` + Before time.Time `help:"created before" format:"2006-01-02 15:04:05"` } func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) { @@ -44,6 +47,14 @@ func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) { Status: api.Status(cmd.Status), } + if !cmd.After.IsZero() { + params.Time.After = cmd.After.Unix() + } + + if !cmd.Before.IsZero() { + params.Time.Before = cmd.Before.Unix() + } + jobs, err := c.Jobs(params) if err != nil { log.Error().Err(err).Msg("") diff --git a/daemon/commands.go b/daemon/commands.go index b73322d..743abcf 100644 --- a/daemon/commands.go +++ b/daemon/commands.go @@ -10,6 +10,7 @@ import ( "os/exec" "path/filepath" "sync" + "time" "github.com/davecgh/go-spew/spew" "github.com/google/uuid" @@ -122,6 +123,18 @@ func listJobs(req *api.Req, resp *api.Resp, e cmdenv) (err error) { if params.Status != "" && j.Status != params.Status { continue } + if params.Time.After != 0 { + if time.Unix(params.Time.After, 0). + After(j.Created) { + continue + } + } + if params.Time.Before != 0 { + if time.Unix(params.Time.Before, 0). + Before(j.Created) { + continue + } + } result = append(result, j) } @@ -139,6 +152,8 @@ func addJob(req *api.Req, resp *api.Resp, e cmdenv) (err error) { job.GenUUID() + job.Created = time.Now() + var repos []api.Repo repos, err = db.Repos(e.DB) if err != nil { diff --git a/daemon/daemon.go b/daemon/daemon.go index a0811b9..c66aedd 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -62,6 +62,8 @@ func (d *Daemon) Daemon() { swg := sizedwaitgroup.New(d.Threads) log.Info().Int("threads", d.Threads).Msg("start") + first := true + for !d.shutdown { d.wg.Add(1) @@ -80,6 +82,11 @@ func (d *Daemon) Daemon() { pj := newJobProcessor(job, d.db) + if first && job.Status == api.StatusRunning { + pj.SetStatus(api.StatusWaiting) + continue + } + if job.Status == api.StatusNew { pj.SetStatus(api.StatusWaiting) continue @@ -97,6 +104,8 @@ func (d *Daemon) Daemon() { }(pj) } + first = false + d.wg.Done() time.Sleep(time.Second) } diff --git a/daemon/db/job.go b/daemon/db/job.go index 44ed35d..5bf07b8 100644 --- a/daemon/db/job.go +++ b/daemon/db/job.go @@ -3,6 +3,7 @@ package db import ( "database/sql" "encoding/json" + "time" "code.dumpstack.io/tools/out-of-tree/api" ) @@ -17,6 +18,9 @@ func createJobTable(db *sql.DB) (err error) { "commit" TEXT, config TEXT, target TEXT, + created INT, + started INT, + finished INT, status TEXT DEFAULT "new" )`) return @@ -24,8 +28,8 @@ func createJobTable(db *sql.DB) (err error) { func AddJob(db *sql.DB, job *api.Job) (err error) { stmt, err := db.Prepare(`INSERT INTO job (uuid, group_uuid, repo, "commit", ` + - `config, target) ` + - `VALUES ($1, $2, $3, $4, $5, $6);`) + `config, target, created, started, finished) ` + + `VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9);`) if err != nil { return } @@ -36,8 +40,9 @@ func AddJob(db *sql.DB, job *api.Job) (err error) { target := api.Marshal(job.Target) res, err := stmt.Exec(job.UUID, job.Group, - job.RepoName, job.Commit, - config, target, + job.RepoName, job.Commit, config, target, + job.Created.Unix(), job.Started.Unix(), + job.Finished.Unix(), ) if err != nil { return @@ -48,8 +53,12 @@ func AddJob(db *sql.DB, job *api.Job) (err error) { } func UpdateJob(db *sql.DB, job *api.Job) (err error) { - stmt, err := db.Prepare(`UPDATE job SET uuid=$1, group_uuid=$2, repo=$3, ` + - `"commit"=$4, config=$5, target=$6, status=$7 WHERE id=$8`) + stmt, err := db.Prepare(`UPDATE job ` + + `SET uuid=$1, group_uuid=$2, repo=$3, ` + + `"commit"=$4, config=$5, target=$6, ` + + `created=$7, started=$8, finished=$9, ` + + `status=$10 ` + + `WHERE id=$11`) if err != nil { return } @@ -61,13 +70,41 @@ func UpdateJob(db *sql.DB, job *api.Job) (err error) { _, err = stmt.Exec(job.UUID, job.Group, job.RepoName, job.Commit, config, target, - job.Status, job.ID) + job.Created.Unix(), job.Started.Unix(), + job.Finished.Unix(), job.Status, job.ID) + return +} + +func scanJob(scan func(dest ...any) error) (job api.Job, err error) { + var config, target []byte + var created, started, finished int64 + err = scan(&job.ID, &job.UUID, &job.Group, + &job.RepoName, &job.Commit, &config, &target, + &created, &started, &finished, &job.Status) + if err != nil { + return + } + + err = json.Unmarshal(config, &job.Artifact) + if err != nil { + return + } + + err = json.Unmarshal(target, &job.Target) + if err != nil { + return + } + + job.Created = time.Unix(created, 0) + job.Started = time.Unix(started, 0) + job.Finished = time.Unix(finished, 0) return } func Jobs(db *sql.DB) (jobs []api.Job, err error) { - stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, repo, "commit", ` + - `config, target, status FROM job`) + stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, ` + + `repo, "commit", config, target, created, ` + + `started, finished, status FROM job`) if err != nil { return } @@ -82,24 +119,10 @@ func Jobs(db *sql.DB) (jobs []api.Job, err error) { for rows.Next() { var job api.Job - var config, target []byte - err = rows.Scan(&job.ID, &job.UUID, &job.Group, - &job.RepoName, &job.Commit, - &config, &target, &job.Status) + job, err = scanJob(rows.Scan) if err != nil { return } - - err = json.Unmarshal(config, &job.Artifact) - if err != nil { - return - } - - err = json.Unmarshal(target, &job.Target) - if err != nil { - return - } - jobs = append(jobs, job) } @@ -108,22 +131,15 @@ func Jobs(db *sql.DB) (jobs []api.Job, err error) { func Job(db *sql.DB, uuid string) (job api.Job, err error) { stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, ` + - `repo, "commit", ` + - `config, target, status ` + + `repo, "commit", config, target, ` + + `created, started, finished, status ` + `FROM job WHERE uuid=$1`) if err != nil { return } defer stmt.Close() - err = stmt.QueryRow(uuid).Scan(&job.ID, &job.UUID, - &job.Group, &job.RepoName, &job.Commit, - &job.Artifact, &job.Target, &job.Status) - if err != nil { - return - } - - return + return scanJob(stmt.QueryRow(uuid).Scan) } func JobStatus(db *sql.DB, uuid string) (st api.Status, err error) { diff --git a/daemon/db/job_test.go b/daemon/db/job_test.go index 3f89287..0f08539 100644 --- a/daemon/db/job_test.go +++ b/daemon/db/job_test.go @@ -26,6 +26,8 @@ func TestJobTable(t *testing.T) { job.Group = uuid.New().String() + job.Status = api.StatusSuccess + err = UpdateJob(db, &job) assert.Nil(t, err) @@ -35,4 +37,14 @@ func TestJobTable(t *testing.T) { assert.Equal(t, 1, len(jobs)) assert.Equal(t, job.Group, jobs[0].Group) + + job, err = Job(db, job.UUID) + assert.Nil(t, err) + + assert.Equal(t, api.StatusSuccess, job.Status) + + st, err := JobStatus(db, job.UUID) + assert.Nil(t, err) + + assert.Equal(t, job.Status, st) } diff --git a/daemon/process.go b/daemon/process.go index 379d06c..bb0b4de 100644 --- a/daemon/process.go +++ b/daemon/process.go @@ -7,6 +7,7 @@ import ( "os" "os/exec" "path/filepath" + "time" "github.com/rs/zerolog" "github.com/rs/zerolog/log" @@ -76,7 +77,10 @@ func (pj *jobProcessor) Process(res *Resources) (err error) { log.Info().Msgf("process job %v", pj.job.UUID) pj.SetStatus(api.StatusRunning) + pj.job.Started = time.Now() + defer func() { + pj.job.Finished = time.Now() if err != nil { pj.SetStatus(api.StatusFailure) } else {