1
0

Compare commits

...

3 Commits

8 changed files with 165 additions and 163 deletions

View File

@ -1,18 +1,18 @@
package api
import (
"encoding/json"
"bytes"
"encoding/gob"
"errors"
"fmt"
"net"
"reflect"
"time"
"code.dumpstack.io/tools/out-of-tree/artifact"
"code.dumpstack.io/tools/out-of-tree/distro"
"github.com/davecgh/go-spew/spew"
"github.com/google/uuid"
"github.com/rs/zerolog/log"
)
var ErrInvalid = errors.New("")
@ -46,6 +46,8 @@ const (
type Job struct {
ID int64
UpdatedAt time.Time
// Job UUID
UUID string
// Group UUID
@ -57,6 +59,10 @@ type Job struct {
Artifact artifact.Artifact
Target distro.KernelInfo
Created time.Time
Started time.Time
Finished time.Time
Status Status
}
@ -77,6 +83,8 @@ type ListJobsParams struct {
// Status of the job
Status Status
UpdatedAfter int64
}
type Repo struct {
@ -97,9 +105,12 @@ type Req struct {
Data []byte
}
func (r *Req) SetData(data any) {
func (r *Req) SetData(data any) (err error) {
r.Type = fmt.Sprintf("%v", reflect.TypeOf(data))
r.Data = Marshal(data)
var buf bytes.Buffer
err = gob.NewEncoder(&buf).Encode(data)
r.Data = buf.Bytes()
return
}
func (r *Req) GetData(data any) (err error) {
@ -113,32 +124,16 @@ func (r *Req) GetData(data any) (err error) {
return
}
log.Trace().Msgf("unmarshal %v", string(r.Data))
err = json.Unmarshal(r.Data, &data)
return
buf := bytes.NewBuffer(r.Data)
return gob.NewDecoder(buf).Decode(data)
}
func (r Req) Encode(conn net.Conn) {
log.Debug().Msgf("encode %v", r.Command)
err := json.NewEncoder(conn).Encode(&r)
if err != nil {
log.Fatal().Msgf("encode %v", r.Command)
}
func (r *Req) Encode(conn net.Conn) (err error) {
return gob.NewEncoder(conn).Encode(r)
}
func (r *Req) Decode(conn net.Conn) (err error) {
err = json.NewDecoder(conn).Decode(r)
return
}
func (r Req) Marshal() (bytes []byte) {
return Marshal(r)
}
func (Req) Unmarshal(data []byte) (r Req, err error) {
err = json.Unmarshal(data, &r)
log.Trace().Msgf("unmarshal %v", spew.Sdump(r))
return
return gob.NewDecoder(conn).Decode(r)
}
type Resp struct {
@ -157,9 +152,12 @@ func NewResp() (resp Resp) {
return
}
func (r *Resp) SetData(data any) {
func (r *Resp) SetData(data any) (err error) {
r.Type = fmt.Sprintf("%v", reflect.TypeOf(data))
r.Data = Marshal(data)
var buf bytes.Buffer
err = gob.NewEncoder(&buf).Encode(data)
r.Data = buf.Bytes()
return
}
func (r *Resp) GetData(data any) (err error) {
@ -173,48 +171,19 @@ func (r *Resp) GetData(data any) (err error) {
return
}
log.Trace().Msgf("unmarshal %v", string(r.Data))
err = json.Unmarshal(r.Data, &data)
return
buf := bytes.NewBuffer(r.Data)
return gob.NewDecoder(buf).Decode(data)
}
func (r *Resp) Encode(conn net.Conn) {
func (r *Resp) Encode(conn net.Conn) (err error) {
if r.Err != nil && r.Err != ErrInvalid && r.Error == "" {
r.Error = fmt.Sprintf("%v", r.Err)
}
log.Debug().Msgf("encode %v", r.UUID)
err := json.NewEncoder(conn).Encode(r)
if err != nil {
log.Fatal().Msgf("encode %v", r.UUID)
}
return gob.NewEncoder(conn).Encode(r)
}
func (r *Resp) Decode(conn net.Conn) (err error) {
err = json.NewDecoder(conn).Decode(r)
err = gob.NewDecoder(conn).Decode(r)
r.Err = ErrInvalid
return
}
func (r *Resp) Marshal() (bytes []byte) {
if r.Err != nil && r.Err != ErrInvalid && r.Error == "" {
r.Error = fmt.Sprintf("%v", r.Err)
}
return Marshal(r)
}
func (Resp) Unmarshal(data []byte) (r Resp, err error) {
err = json.Unmarshal(data, &r)
log.Trace().Msgf("unmarshal %v", spew.Sdump(r))
r.Err = ErrInvalid
return
}
func Marshal(data any) (bytes []byte) {
bytes, err := json.Marshal(data)
if err != nil {
log.Fatal().Err(err).Msgf("marshal %v", data)
}
log.Trace().Msgf("marshal %v", string(bytes))
return
}

View File

@ -1,49 +0,0 @@
package api
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestReq(t *testing.T) {
req := Req{}
req.Command = ListRepos
req.SetData(&Job{ID: 999, RepoName: "test"})
bytes := req.Marshal()
req2, err := Req{}.Unmarshal(bytes)
assert.Nil(t, err)
assert.Equal(t, req, req2)
job := Job{}
err = req2.GetData(&job)
assert.Nil(t, err)
assert.Equal(t, req2.Type, "*api.Job")
}
func TestResp(t *testing.T) {
resp := Resp{}
resp.Error = "abracadabra"
resp.SetData(&[]Repo{{}, {}})
bytes := resp.Marshal()
resp2, err := Resp{}.Unmarshal(bytes)
assert.Nil(t, err)
resp2.Err = nil // non-marshallable
assert.Equal(t, resp, resp2)
var repos []Repo
err = resp2.GetData(&repos)
assert.Nil(t, err)
assert.Equal(t, resp2.Type, "*[]api.Repo")
}

View File

@ -7,6 +7,7 @@ package cmd
import (
"encoding/json"
"fmt"
"time"
"github.com/rs/zerolog/log"
@ -28,10 +29,11 @@ type DaemonJobCmd struct {
}
type DaemonJobsListCmd struct {
Group string `help:"group uuid"`
Repo string `help:"repo name"`
Commit string `help:"commit sha"`
Status string `help:"job status"`
Group string `help:"group uuid"`
Repo string `help:"repo name"`
Commit string `help:"commit sha"`
Status string `help:"job status"`
After time.Time `help:"updated after" format:"2006-01-02 15:04:05"`
}
func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) {
@ -44,6 +46,10 @@ func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) {
Status: api.Status(cmd.Status),
}
if !cmd.After.IsZero() {
params.UpdatedAfter = cmd.After.Unix()
}
jobs, err := c.Jobs(params)
if err != nil {
log.Error().Err(err).Msg("")

View File

@ -10,6 +10,7 @@ import (
"os/exec"
"path/filepath"
"sync"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/google/uuid"
@ -103,7 +104,7 @@ func listJobs(req *api.Req, resp *api.Resp, e cmdenv) (err error) {
return
}
jobs, err := db.Jobs(e.DB)
jobs, err := db.Jobs(e.DB, "updated >= ?", params.UpdatedAfter)
if err != nil {
return
}
@ -139,6 +140,8 @@ func addJob(req *api.Req, resp *api.Resp, e cmdenv) (err error) {
job.GenUUID()
job.Created = time.Now()
var repos []api.Repo
repos, err = db.Repos(e.DB)
if err != nil {

View File

@ -62,10 +62,12 @@ func (d *Daemon) Daemon() {
swg := sizedwaitgroup.New(d.Threads)
log.Info().Int("threads", d.Threads).Msg("start")
first := true
for !d.shutdown {
d.wg.Add(1)
jobs, err := db.Jobs(d.db)
jobs, err := db.Jobs(d.db, "")
if err != nil && !d.shutdown {
log.Error().Err(err).Msg("")
d.wg.Done()
@ -80,6 +82,11 @@ func (d *Daemon) Daemon() {
pj := newJobProcessor(job, d.db)
if first && job.Status == api.StatusRunning {
pj.SetStatus(api.StatusWaiting)
continue
}
if job.Status == api.StatusNew {
pj.SetStatus(api.StatusWaiting)
continue
@ -97,6 +104,8 @@ func (d *Daemon) Daemon() {
}(pj)
}
first = false
d.wg.Done()
time.Sleep(time.Second)
}

View File

@ -1,8 +1,10 @@
package db
import (
"bytes"
"database/sql"
"encoding/json"
"encoding/gob"
"time"
"code.dumpstack.io/tools/out-of-tree/api"
)
@ -11,33 +13,49 @@ func createJobTable(db *sql.DB) (err error) {
_, err = db.Exec(`
CREATE TABLE IF NOT EXISTS job (
id INTEGER PRIMARY KEY,
updated INT,
uuid TEXT,
group_uuid TEXT,
repo TEXT,
"commit" TEXT,
config TEXT,
target TEXT,
created INT,
started INT,
finished INT,
status TEXT DEFAULT "new"
)`)
return
}
func AddJob(db *sql.DB, job *api.Job) (err error) {
stmt, err := db.Prepare(`INSERT INTO job (uuid, group_uuid, repo, "commit", ` +
`config, target) ` +
`VALUES ($1, $2, $3, $4, $5, $6);`)
stmt, err := db.Prepare(`INSERT INTO job (updated, uuid, group_uuid, repo, "commit", ` +
`config, target, created, started, finished) ` +
`VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`)
if err != nil {
return
}
defer stmt.Close()
config := api.Marshal(job.Artifact)
target := api.Marshal(job.Target)
var abuf bytes.Buffer
err = gob.NewEncoder(&abuf).Encode(job.Artifact)
if err != nil {
return
}
config := abuf.Bytes()
res, err := stmt.Exec(job.UUID, job.Group,
job.RepoName, job.Commit,
config, target,
var tbuf bytes.Buffer
err = gob.NewEncoder(&tbuf).Encode(job.Target)
if err != nil {
return
}
target := tbuf.Bytes()
res, err := stmt.Exec(time.Now().Unix(), job.UUID, job.Group,
job.RepoName, job.Commit, config, target,
job.Created.Unix(), job.Started.Unix(),
job.Finished.Unix(),
)
if err != nil {
return
@ -48,33 +66,83 @@ func AddJob(db *sql.DB, job *api.Job) (err error) {
}
func UpdateJob(db *sql.DB, job *api.Job) (err error) {
stmt, err := db.Prepare(`UPDATE job SET uuid=$1, group_uuid=$2, repo=$3, ` +
`"commit"=$4, config=$5, target=$6, status=$7 WHERE id=$8`)
stmt, err := db.Prepare(`UPDATE job ` +
`SET updated=$1, uuid=$2, group_uuid=$3, repo=$4, ` +
`"commit"=$5, config=$6, target=$7, ` +
`created=$8, started=$9, finished=$10, ` +
`status=$11 ` +
`WHERE id=$12`)
if err != nil {
return
}
defer stmt.Close()
config := api.Marshal(job.Artifact)
target := api.Marshal(job.Target)
var abuf bytes.Buffer
err = gob.NewEncoder(&abuf).Encode(job.Artifact)
if err != nil {
return
}
config := abuf.Bytes()
_, err = stmt.Exec(job.UUID, job.Group,
var tbuf bytes.Buffer
err = gob.NewEncoder(&tbuf).Encode(job.Target)
if err != nil {
return
}
target := tbuf.Bytes()
_, err = stmt.Exec(time.Now().Unix(), job.UUID, job.Group,
job.RepoName, job.Commit,
config, target,
job.Status, job.ID)
job.Created.Unix(), job.Started.Unix(),
job.Finished.Unix(), job.Status, job.ID)
return
}
func Jobs(db *sql.DB) (jobs []api.Job, err error) {
stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, repo, "commit", ` +
`config, target, status FROM job`)
func scanJob(scan func(dest ...any) error) (job api.Job, err error) {
var config, target []byte
var updated, created, started, finished int64
err = scan(&job.ID, &updated, &job.UUID, &job.Group,
&job.RepoName, &job.Commit, &config, &target,
&created, &started, &finished, &job.Status)
if err != nil {
return
}
abuf := bytes.NewBuffer(config)
err = gob.NewDecoder(abuf).Decode(&job.Artifact)
if err != nil {
return
}
tbuf := bytes.NewBuffer(target)
err = gob.NewDecoder(tbuf).Decode(&job.Target)
if err != nil {
return
}
job.UpdatedAt = time.Unix(updated, 0)
job.Created = time.Unix(created, 0)
job.Started = time.Unix(started, 0)
job.Finished = time.Unix(finished, 0)
return
}
func Jobs(db *sql.DB, where string, args ...any) (jobs []api.Job, err error) {
q := `SELECT id, updated, uuid, group_uuid, ` +
`repo, "commit", config, target, created, ` +
`started, finished, status FROM job`
if len(where) != 0 {
q += ` WHERE ` + where
}
stmt, err := db.Prepare(q)
if err != nil {
return
}
defer stmt.Close()
rows, err := stmt.Query()
rows, err := stmt.Query(args...)
if err != nil {
return
}
@ -82,24 +150,10 @@ func Jobs(db *sql.DB) (jobs []api.Job, err error) {
for rows.Next() {
var job api.Job
var config, target []byte
err = rows.Scan(&job.ID, &job.UUID, &job.Group,
&job.RepoName, &job.Commit,
&config, &target, &job.Status)
job, err = scanJob(rows.Scan)
if err != nil {
return
}
err = json.Unmarshal(config, &job.Artifact)
if err != nil {
return
}
err = json.Unmarshal(target, &job.Target)
if err != nil {
return
}
jobs = append(jobs, job)
}
@ -107,23 +161,17 @@ func Jobs(db *sql.DB) (jobs []api.Job, err error) {
}
func Job(db *sql.DB, uuid string) (job api.Job, err error) {
stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, ` +
`repo, "commit", ` +
`config, target, status ` +
stmt, err := db.Prepare(`SELECT id, updated, uuid, ` +
`group_uuid, ` +
`repo, "commit", config, target, ` +
`created, started, finished, status ` +
`FROM job WHERE uuid=$1`)
if err != nil {
return
}
defer stmt.Close()
err = stmt.QueryRow(uuid).Scan(&job.ID, &job.UUID,
&job.Group, &job.RepoName, &job.Commit,
&job.Artifact, &job.Target, &job.Status)
if err != nil {
return
}
return
return scanJob(stmt.QueryRow(uuid).Scan)
}
func JobStatus(db *sql.DB, uuid string) (st api.Status, err error) {

View File

@ -26,13 +26,25 @@ func TestJobTable(t *testing.T) {
job.Group = uuid.New().String()
job.Status = api.StatusSuccess
err = UpdateJob(db, &job)
assert.Nil(t, err)
jobs, err := Jobs(db)
jobs, err := Jobs(db, "")
assert.Nil(t, err)
assert.Equal(t, 1, len(jobs))
assert.Equal(t, job.Group, jobs[0].Group)
job, err = Job(db, job.UUID)
assert.Nil(t, err)
assert.Equal(t, api.StatusSuccess, job.Status)
st, err := JobStatus(db, job.UUID)
assert.Nil(t, err)
assert.Equal(t, job.Status, st)
}

View File

@ -7,6 +7,7 @@ import (
"os"
"os/exec"
"path/filepath"
"time"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
@ -76,7 +77,10 @@ func (pj *jobProcessor) Process(res *Resources) (err error) {
log.Info().Msgf("process job %v", pj.job.UUID)
pj.SetStatus(api.StatusRunning)
pj.job.Started = time.Now()
defer func() {
pj.job.Finished = time.Now()
if err != nil {
pj.SetStatus(api.StatusFailure)
} else {