1
0
Fork 0

feat(daemon): query jobs by update time

timestamps
dump_stack() 2024-02-28 01:48:00 +00:00
parent c909c2a352
commit 1c8e1d068b
Signed by: dump_stack
GPG Key ID: C9905BA72B5E02BB
6 changed files with 31 additions and 43 deletions

View File

@ -47,6 +47,8 @@ const (
type Job struct { type Job struct {
ID int64 ID int64
UpdatedAt time.Time
// Job UUID // Job UUID
UUID string UUID string
// Group UUID // Group UUID
@ -83,11 +85,7 @@ type ListJobsParams struct {
// Status of the job // Status of the job
Status Status Status Status
// Time range (unix timestamps) UpdatedAfter int64
Time struct {
After int64
Before int64
}
} }
type Repo struct { type Repo struct {

View File

@ -33,8 +33,7 @@ type DaemonJobsListCmd struct {
Repo string `help:"repo name"` Repo string `help:"repo name"`
Commit string `help:"commit sha"` Commit string `help:"commit sha"`
Status string `help:"job status"` Status string `help:"job status"`
After time.Time `help:"created after" format:"2006-01-02 15:04:05"` After time.Time `help:"updated after" format:"2006-01-02 15:04:05"`
Before time.Time `help:"created before" format:"2006-01-02 15:04:05"`
} }
func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) { func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) {
@ -48,11 +47,7 @@ func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) {
} }
if !cmd.After.IsZero() { if !cmd.After.IsZero() {
params.Time.After = cmd.After.Unix() params.UpdatedAfter = cmd.After.Unix()
}
if !cmd.Before.IsZero() {
params.Time.Before = cmd.Before.Unix()
} }
jobs, err := c.Jobs(params) jobs, err := c.Jobs(params)

View File

@ -104,7 +104,7 @@ func listJobs(req *api.Req, resp *api.Resp, e cmdenv) (err error) {
return return
} }
jobs, err := db.Jobs(e.DB) jobs, err := db.Jobs(e.DB, "updated >= ?", params.UpdatedAfter)
if err != nil { if err != nil {
return return
} }
@ -123,18 +123,6 @@ func listJobs(req *api.Req, resp *api.Resp, e cmdenv) (err error) {
if params.Status != "" && j.Status != params.Status { if params.Status != "" && j.Status != params.Status {
continue continue
} }
if params.Time.After != 0 {
if time.Unix(params.Time.After, 0).
After(j.Created) {
continue
}
}
if params.Time.Before != 0 {
if time.Unix(params.Time.Before, 0).
Before(j.Created) {
continue
}
}
result = append(result, j) result = append(result, j)
} }

View File

@ -67,7 +67,7 @@ func (d *Daemon) Daemon() {
for !d.shutdown { for !d.shutdown {
d.wg.Add(1) d.wg.Add(1)
jobs, err := db.Jobs(d.db) jobs, err := db.Jobs(d.db, "")
if err != nil && !d.shutdown { if err != nil && !d.shutdown {
log.Error().Err(err).Msg("") log.Error().Err(err).Msg("")
d.wg.Done() d.wg.Done()

View File

@ -12,6 +12,7 @@ func createJobTable(db *sql.DB) (err error) {
_, err = db.Exec(` _, err = db.Exec(`
CREATE TABLE IF NOT EXISTS job ( CREATE TABLE IF NOT EXISTS job (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
updated INT,
uuid TEXT, uuid TEXT,
group_uuid TEXT, group_uuid TEXT,
repo TEXT, repo TEXT,
@ -27,9 +28,9 @@ func createJobTable(db *sql.DB) (err error) {
} }
func AddJob(db *sql.DB, job *api.Job) (err error) { func AddJob(db *sql.DB, job *api.Job) (err error) {
stmt, err := db.Prepare(`INSERT INTO job (uuid, group_uuid, repo, "commit", ` + stmt, err := db.Prepare(`INSERT INTO job (updated, uuid, group_uuid, repo, "commit", ` +
`config, target, created, started, finished) ` + `config, target, created, started, finished) ` +
`VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9);`) `VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`)
if err != nil { if err != nil {
return return
} }
@ -39,7 +40,7 @@ func AddJob(db *sql.DB, job *api.Job) (err error) {
config := api.Marshal(job.Artifact) config := api.Marshal(job.Artifact)
target := api.Marshal(job.Target) target := api.Marshal(job.Target)
res, err := stmt.Exec(job.UUID, job.Group, res, err := stmt.Exec(time.Now().Unix(), job.UUID, job.Group,
job.RepoName, job.Commit, config, target, job.RepoName, job.Commit, config, target,
job.Created.Unix(), job.Started.Unix(), job.Created.Unix(), job.Started.Unix(),
job.Finished.Unix(), job.Finished.Unix(),
@ -54,11 +55,11 @@ func AddJob(db *sql.DB, job *api.Job) (err error) {
func UpdateJob(db *sql.DB, job *api.Job) (err error) { func UpdateJob(db *sql.DB, job *api.Job) (err error) {
stmt, err := db.Prepare(`UPDATE job ` + stmt, err := db.Prepare(`UPDATE job ` +
`SET uuid=$1, group_uuid=$2, repo=$3, ` + `SET updated=$1, uuid=$2, group_uuid=$3, repo=$4, ` +
`"commit"=$4, config=$5, target=$6, ` + `"commit"=$5, config=$6, target=$7, ` +
`created=$7, started=$8, finished=$9, ` + `created=$8, started=$9, finished=$10, ` +
`status=$10 ` + `status=$11 ` +
`WHERE id=$11`) `WHERE id=$12`)
if err != nil { if err != nil {
return return
} }
@ -67,7 +68,7 @@ func UpdateJob(db *sql.DB, job *api.Job) (err error) {
config := api.Marshal(job.Artifact) config := api.Marshal(job.Artifact)
target := api.Marshal(job.Target) target := api.Marshal(job.Target)
_, err = stmt.Exec(job.UUID, job.Group, _, err = stmt.Exec(time.Now().Unix(), job.UUID, job.Group,
job.RepoName, job.Commit, job.RepoName, job.Commit,
config, target, config, target,
job.Created.Unix(), job.Started.Unix(), job.Created.Unix(), job.Started.Unix(),
@ -77,8 +78,8 @@ func UpdateJob(db *sql.DB, job *api.Job) (err error) {
func scanJob(scan func(dest ...any) error) (job api.Job, err error) { func scanJob(scan func(dest ...any) error) (job api.Job, err error) {
var config, target []byte var config, target []byte
var created, started, finished int64 var updated, created, started, finished int64
err = scan(&job.ID, &job.UUID, &job.Group, err = scan(&job.ID, &updated, &job.UUID, &job.Group,
&job.RepoName, &job.Commit, &config, &target, &job.RepoName, &job.Commit, &config, &target,
&created, &started, &finished, &job.Status) &created, &started, &finished, &job.Status)
if err != nil { if err != nil {
@ -95,23 +96,28 @@ func scanJob(scan func(dest ...any) error) (job api.Job, err error) {
return return
} }
job.UpdatedAt = time.Unix(updated, 0)
job.Created = time.Unix(created, 0) job.Created = time.Unix(created, 0)
job.Started = time.Unix(started, 0) job.Started = time.Unix(started, 0)
job.Finished = time.Unix(finished, 0) job.Finished = time.Unix(finished, 0)
return return
} }
func Jobs(db *sql.DB) (jobs []api.Job, err error) { func Jobs(db *sql.DB, where string, args ...any) (jobs []api.Job, err error) {
stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, ` + q := `SELECT id, updated, uuid, group_uuid, ` +
`repo, "commit", config, target, created, ` + `repo, "commit", config, target, created, ` +
`started, finished, status FROM job`) `started, finished, status FROM job`
if len(where) != 0 {
q += ` WHERE ` + where
}
stmt, err := db.Prepare(q)
if err != nil { if err != nil {
return return
} }
defer stmt.Close() defer stmt.Close()
rows, err := stmt.Query() rows, err := stmt.Query(args...)
if err != nil { if err != nil {
return return
} }
@ -130,7 +136,8 @@ func Jobs(db *sql.DB) (jobs []api.Job, err error) {
} }
func Job(db *sql.DB, uuid string) (job api.Job, err error) { func Job(db *sql.DB, uuid string) (job api.Job, err error) {
stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, ` + stmt, err := db.Prepare(`SELECT id, updated, uuid, ` +
`group_uuid, ` +
`repo, "commit", config, target, ` + `repo, "commit", config, target, ` +
`created, started, finished, status ` + `created, started, finished, status ` +
`FROM job WHERE uuid=$1`) `FROM job WHERE uuid=$1`)

View File

@ -31,7 +31,7 @@ func TestJobTable(t *testing.T) {
err = UpdateJob(db, &job) err = UpdateJob(db, &job)
assert.Nil(t, err) assert.Nil(t, err)
jobs, err := Jobs(db) jobs, err := Jobs(db, "")
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, 1, len(jobs)) assert.Equal(t, 1, len(jobs))