feat(daemon): timestamps
This commit is contained in:
		
							
								
								
									
										11
									
								
								api/api.go
									
									
									
									
									
								
							
							
						
						
									
										11
									
								
								api/api.go
									
									
									
									
									
								
							| @@ -6,6 +6,7 @@ import ( | |||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"net" | 	"net" | ||||||
| 	"reflect" | 	"reflect" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
| 	"code.dumpstack.io/tools/out-of-tree/artifact" | 	"code.dumpstack.io/tools/out-of-tree/artifact" | ||||||
| 	"code.dumpstack.io/tools/out-of-tree/distro" | 	"code.dumpstack.io/tools/out-of-tree/distro" | ||||||
| @@ -57,6 +58,10 @@ type Job struct { | |||||||
| 	Artifact artifact.Artifact | 	Artifact artifact.Artifact | ||||||
| 	Target   distro.KernelInfo | 	Target   distro.KernelInfo | ||||||
|  |  | ||||||
|  | 	Created  time.Time | ||||||
|  | 	Started  time.Time | ||||||
|  | 	Finished time.Time | ||||||
|  |  | ||||||
| 	Status Status | 	Status Status | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -77,6 +82,12 @@ type ListJobsParams struct { | |||||||
|  |  | ||||||
| 	// Status of the job | 	// Status of the job | ||||||
| 	Status Status | 	Status Status | ||||||
|  |  | ||||||
|  | 	// Time range (unix timestamps) | ||||||
|  | 	Time struct { | ||||||
|  | 		After  int64 | ||||||
|  | 		Before int64 | ||||||
|  | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| type Repo struct { | type Repo struct { | ||||||
|   | |||||||
| @@ -7,6 +7,7 @@ package cmd | |||||||
| import ( | import ( | ||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/rs/zerolog/log" | 	"github.com/rs/zerolog/log" | ||||||
|  |  | ||||||
| @@ -28,10 +29,12 @@ type DaemonJobCmd struct { | |||||||
| } | } | ||||||
|  |  | ||||||
| type DaemonJobsListCmd struct { | type DaemonJobsListCmd struct { | ||||||
| 	Group  string `help:"group uuid"` | 	Group  string    `help:"group uuid"` | ||||||
| 	Repo   string `help:"repo name"` | 	Repo   string    `help:"repo name"` | ||||||
| 	Commit string `help:"commit sha"` | 	Commit string    `help:"commit sha"` | ||||||
| 	Status string `help:"job status"` | 	Status string    `help:"job status"` | ||||||
|  | 	After  time.Time `help:"created after" format:"2006-01-02 15:04:05"` | ||||||
|  | 	Before time.Time `help:"created before" format:"2006-01-02 15:04:05"` | ||||||
| } | } | ||||||
|  |  | ||||||
| func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) { | func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) { | ||||||
| @@ -44,6 +47,14 @@ func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) { | |||||||
| 		Status: api.Status(cmd.Status), | 		Status: api.Status(cmd.Status), | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	if !cmd.After.IsZero() { | ||||||
|  | 		params.Time.After = cmd.After.Unix() | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if !cmd.Before.IsZero() { | ||||||
|  | 		params.Time.Before = cmd.Before.Unix() | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	jobs, err := c.Jobs(params) | 	jobs, err := c.Jobs(params) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Error().Err(err).Msg("") | 		log.Error().Err(err).Msg("") | ||||||
|   | |||||||
| @@ -10,6 +10,7 @@ import ( | |||||||
| 	"os/exec" | 	"os/exec" | ||||||
| 	"path/filepath" | 	"path/filepath" | ||||||
| 	"sync" | 	"sync" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/davecgh/go-spew/spew" | 	"github.com/davecgh/go-spew/spew" | ||||||
| 	"github.com/google/uuid" | 	"github.com/google/uuid" | ||||||
| @@ -122,6 +123,18 @@ func listJobs(req *api.Req, resp *api.Resp, e cmdenv) (err error) { | |||||||
| 		if params.Status != "" && j.Status != params.Status { | 		if params.Status != "" && j.Status != params.Status { | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
|  | 		if params.Time.After != 0 { | ||||||
|  | 			if time.Unix(params.Time.After, 0). | ||||||
|  | 				After(j.Created) { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		if params.Time.Before != 0 { | ||||||
|  | 			if time.Unix(params.Time.Before, 0). | ||||||
|  | 				Before(j.Created) { | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  |  | ||||||
| 		result = append(result, j) | 		result = append(result, j) | ||||||
| 	} | 	} | ||||||
| @@ -139,6 +152,8 @@ func addJob(req *api.Req, resp *api.Resp, e cmdenv) (err error) { | |||||||
|  |  | ||||||
| 	job.GenUUID() | 	job.GenUUID() | ||||||
|  |  | ||||||
|  | 	job.Created = time.Now() | ||||||
|  |  | ||||||
| 	var repos []api.Repo | 	var repos []api.Repo | ||||||
| 	repos, err = db.Repos(e.DB) | 	repos, err = db.Repos(e.DB) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
|   | |||||||
| @@ -62,6 +62,8 @@ func (d *Daemon) Daemon() { | |||||||
| 	swg := sizedwaitgroup.New(d.Threads) | 	swg := sizedwaitgroup.New(d.Threads) | ||||||
| 	log.Info().Int("threads", d.Threads).Msg("start") | 	log.Info().Int("threads", d.Threads).Msg("start") | ||||||
|  |  | ||||||
|  | 	first := true | ||||||
|  |  | ||||||
| 	for !d.shutdown { | 	for !d.shutdown { | ||||||
| 		d.wg.Add(1) | 		d.wg.Add(1) | ||||||
|  |  | ||||||
| @@ -80,6 +82,11 @@ func (d *Daemon) Daemon() { | |||||||
|  |  | ||||||
| 			pj := newJobProcessor(job, d.db) | 			pj := newJobProcessor(job, d.db) | ||||||
|  |  | ||||||
|  | 			if first && job.Status == api.StatusRunning { | ||||||
|  | 				pj.SetStatus(api.StatusWaiting) | ||||||
|  | 				continue | ||||||
|  | 			} | ||||||
|  |  | ||||||
| 			if job.Status == api.StatusNew { | 			if job.Status == api.StatusNew { | ||||||
| 				pj.SetStatus(api.StatusWaiting) | 				pj.SetStatus(api.StatusWaiting) | ||||||
| 				continue | 				continue | ||||||
| @@ -97,6 +104,8 @@ func (d *Daemon) Daemon() { | |||||||
| 			}(pj) | 			}(pj) | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | 		first = false | ||||||
|  |  | ||||||
| 		d.wg.Done() | 		d.wg.Done() | ||||||
| 		time.Sleep(time.Second) | 		time.Sleep(time.Second) | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -3,6 +3,7 @@ package db | |||||||
| import ( | import ( | ||||||
| 	"database/sql" | 	"database/sql" | ||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
| 	"code.dumpstack.io/tools/out-of-tree/api" | 	"code.dumpstack.io/tools/out-of-tree/api" | ||||||
| ) | ) | ||||||
| @@ -17,6 +18,9 @@ func createJobTable(db *sql.DB) (err error) { | |||||||
| 		"commit"	TEXT, | 		"commit"	TEXT, | ||||||
| 		config		TEXT, | 		config		TEXT, | ||||||
| 		target		TEXT, | 		target		TEXT, | ||||||
|  | 		created		INT, | ||||||
|  | 		started		INT, | ||||||
|  | 		finished	INT, | ||||||
| 		status		TEXT DEFAULT "new" | 		status		TEXT DEFAULT "new" | ||||||
| 	)`) | 	)`) | ||||||
| 	return | 	return | ||||||
| @@ -24,8 +28,8 @@ func createJobTable(db *sql.DB) (err error) { | |||||||
|  |  | ||||||
| func AddJob(db *sql.DB, job *api.Job) (err error) { | func AddJob(db *sql.DB, job *api.Job) (err error) { | ||||||
| 	stmt, err := db.Prepare(`INSERT INTO job (uuid, group_uuid, repo, "commit", ` + | 	stmt, err := db.Prepare(`INSERT INTO job (uuid, group_uuid, repo, "commit", ` + | ||||||
| 		`config, target) ` + | 		`config, target, created, started, finished) ` + | ||||||
| 		`VALUES ($1, $2, $3, $4, $5, $6);`) | 		`VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9);`) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| @@ -36,8 +40,9 @@ func AddJob(db *sql.DB, job *api.Job) (err error) { | |||||||
| 	target := api.Marshal(job.Target) | 	target := api.Marshal(job.Target) | ||||||
|  |  | ||||||
| 	res, err := stmt.Exec(job.UUID, job.Group, | 	res, err := stmt.Exec(job.UUID, job.Group, | ||||||
| 		job.RepoName, job.Commit, | 		job.RepoName, job.Commit, config, target, | ||||||
| 		config, target, | 		job.Created.Unix(), job.Started.Unix(), | ||||||
|  | 		job.Finished.Unix(), | ||||||
| 	) | 	) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return | 		return | ||||||
| @@ -48,8 +53,12 @@ func AddJob(db *sql.DB, job *api.Job) (err error) { | |||||||
| } | } | ||||||
|  |  | ||||||
| func UpdateJob(db *sql.DB, job *api.Job) (err error) { | func UpdateJob(db *sql.DB, job *api.Job) (err error) { | ||||||
| 	stmt, err := db.Prepare(`UPDATE job SET uuid=$1, group_uuid=$2, repo=$3, ` + | 	stmt, err := db.Prepare(`UPDATE job ` + | ||||||
| 		`"commit"=$4, config=$5, target=$6, status=$7 WHERE id=$8`) | 		`SET uuid=$1, group_uuid=$2, repo=$3, ` + | ||||||
|  | 		`"commit"=$4, config=$5, target=$6, ` + | ||||||
|  | 		`created=$7, started=$8, finished=$9, ` + | ||||||
|  | 		`status=$10 ` + | ||||||
|  | 		`WHERE id=$11`) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| @@ -61,13 +70,41 @@ func UpdateJob(db *sql.DB, job *api.Job) (err error) { | |||||||
| 	_, err = stmt.Exec(job.UUID, job.Group, | 	_, err = stmt.Exec(job.UUID, job.Group, | ||||||
| 		job.RepoName, job.Commit, | 		job.RepoName, job.Commit, | ||||||
| 		config, target, | 		config, target, | ||||||
| 		job.Status, job.ID) | 		job.Created.Unix(), job.Started.Unix(), | ||||||
|  | 		job.Finished.Unix(), job.Status, job.ID) | ||||||
|  | 	return | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func scanJob(scan func(dest ...any) error) (job api.Job, err error) { | ||||||
|  | 	var config, target []byte | ||||||
|  | 	var created, started, finished int64 | ||||||
|  | 	err = scan(&job.ID, &job.UUID, &job.Group, | ||||||
|  | 		&job.RepoName, &job.Commit, &config, &target, | ||||||
|  | 		&created, &started, &finished, &job.Status) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	err = json.Unmarshal(config, &job.Artifact) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	err = json.Unmarshal(target, &job.Target) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	job.Created = time.Unix(created, 0) | ||||||
|  | 	job.Started = time.Unix(started, 0) | ||||||
|  | 	job.Finished = time.Unix(finished, 0) | ||||||
| 	return | 	return | ||||||
| } | } | ||||||
|  |  | ||||||
| func Jobs(db *sql.DB) (jobs []api.Job, err error) { | func Jobs(db *sql.DB) (jobs []api.Job, err error) { | ||||||
| 	stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, repo, "commit", ` + | 	stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, ` + | ||||||
| 		`config, target, status FROM job`) | 		`repo, "commit", config, target, created, ` + | ||||||
|  | 		`started, finished, status FROM job`) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| @@ -82,24 +119,10 @@ func Jobs(db *sql.DB) (jobs []api.Job, err error) { | |||||||
|  |  | ||||||
| 	for rows.Next() { | 	for rows.Next() { | ||||||
| 		var job api.Job | 		var job api.Job | ||||||
| 		var config, target []byte | 		job, err = scanJob(rows.Scan) | ||||||
| 		err = rows.Scan(&job.ID, &job.UUID, &job.Group, |  | ||||||
| 			&job.RepoName, &job.Commit, |  | ||||||
| 			&config, &target, &job.Status) |  | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		err = json.Unmarshal(config, &job.Artifact) |  | ||||||
| 		if err != nil { |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		err = json.Unmarshal(target, &job.Target) |  | ||||||
| 		if err != nil { |  | ||||||
| 			return |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		jobs = append(jobs, job) | 		jobs = append(jobs, job) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -108,22 +131,15 @@ func Jobs(db *sql.DB) (jobs []api.Job, err error) { | |||||||
|  |  | ||||||
| func Job(db *sql.DB, uuid string) (job api.Job, err error) { | func Job(db *sql.DB, uuid string) (job api.Job, err error) { | ||||||
| 	stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, ` + | 	stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, ` + | ||||||
| 		`repo, "commit", ` + | 		`repo, "commit", config, target, ` + | ||||||
| 		`config, target, status ` + | 		`created, started, finished, status ` + | ||||||
| 		`FROM job WHERE uuid=$1`) | 		`FROM job WHERE uuid=$1`) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
| 	defer stmt.Close() | 	defer stmt.Close() | ||||||
|  |  | ||||||
| 	err = stmt.QueryRow(uuid).Scan(&job.ID, &job.UUID, | 	return scanJob(stmt.QueryRow(uuid).Scan) | ||||||
| 		&job.Group, &job.RepoName, &job.Commit, |  | ||||||
| 		&job.Artifact, &job.Target, &job.Status) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return |  | ||||||
| } | } | ||||||
|  |  | ||||||
| func JobStatus(db *sql.DB, uuid string) (st api.Status, err error) { | func JobStatus(db *sql.DB, uuid string) (st api.Status, err error) { | ||||||
|   | |||||||
| @@ -26,6 +26,8 @@ func TestJobTable(t *testing.T) { | |||||||
|  |  | ||||||
| 	job.Group = uuid.New().String() | 	job.Group = uuid.New().String() | ||||||
|  |  | ||||||
|  | 	job.Status = api.StatusSuccess | ||||||
|  |  | ||||||
| 	err = UpdateJob(db, &job) | 	err = UpdateJob(db, &job) | ||||||
| 	assert.Nil(t, err) | 	assert.Nil(t, err) | ||||||
|  |  | ||||||
| @@ -35,4 +37,14 @@ func TestJobTable(t *testing.T) { | |||||||
| 	assert.Equal(t, 1, len(jobs)) | 	assert.Equal(t, 1, len(jobs)) | ||||||
|  |  | ||||||
| 	assert.Equal(t, job.Group, jobs[0].Group) | 	assert.Equal(t, job.Group, jobs[0].Group) | ||||||
|  |  | ||||||
|  | 	job, err = Job(db, job.UUID) | ||||||
|  | 	assert.Nil(t, err) | ||||||
|  |  | ||||||
|  | 	assert.Equal(t, api.StatusSuccess, job.Status) | ||||||
|  |  | ||||||
|  | 	st, err := JobStatus(db, job.UUID) | ||||||
|  | 	assert.Nil(t, err) | ||||||
|  |  | ||||||
|  | 	assert.Equal(t, job.Status, st) | ||||||
| } | } | ||||||
|   | |||||||
| @@ -7,6 +7,7 @@ import ( | |||||||
| 	"os" | 	"os" | ||||||
| 	"os/exec" | 	"os/exec" | ||||||
| 	"path/filepath" | 	"path/filepath" | ||||||
|  | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/rs/zerolog" | 	"github.com/rs/zerolog" | ||||||
| 	"github.com/rs/zerolog/log" | 	"github.com/rs/zerolog/log" | ||||||
| @@ -76,7 +77,10 @@ func (pj *jobProcessor) Process(res *Resources) (err error) { | |||||||
| 	log.Info().Msgf("process job %v", pj.job.UUID) | 	log.Info().Msgf("process job %v", pj.job.UUID) | ||||||
|  |  | ||||||
| 	pj.SetStatus(api.StatusRunning) | 	pj.SetStatus(api.StatusRunning) | ||||||
|  | 	pj.job.Started = time.Now() | ||||||
|  |  | ||||||
| 	defer func() { | 	defer func() { | ||||||
|  | 		pj.job.Finished = time.Now() | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			pj.SetStatus(api.StatusFailure) | 			pj.SetStatus(api.StatusFailure) | ||||||
| 		} else { | 		} else { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user