1
0

Compare commits

...

3 Commits

8 changed files with 165 additions and 163 deletions

View File

@ -1,18 +1,18 @@
package api package api
import ( import (
"encoding/json" "bytes"
"encoding/gob"
"errors" "errors"
"fmt" "fmt"
"net" "net"
"reflect" "reflect"
"time"
"code.dumpstack.io/tools/out-of-tree/artifact" "code.dumpstack.io/tools/out-of-tree/artifact"
"code.dumpstack.io/tools/out-of-tree/distro" "code.dumpstack.io/tools/out-of-tree/distro"
"github.com/davecgh/go-spew/spew"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/rs/zerolog/log"
) )
var ErrInvalid = errors.New("") var ErrInvalid = errors.New("")
@ -46,6 +46,8 @@ const (
type Job struct { type Job struct {
ID int64 ID int64
UpdatedAt time.Time
// Job UUID // Job UUID
UUID string UUID string
// Group UUID // Group UUID
@ -57,6 +59,10 @@ type Job struct {
Artifact artifact.Artifact Artifact artifact.Artifact
Target distro.KernelInfo Target distro.KernelInfo
Created time.Time
Started time.Time
Finished time.Time
Status Status Status Status
} }
@ -77,6 +83,8 @@ type ListJobsParams struct {
// Status of the job // Status of the job
Status Status Status Status
UpdatedAfter int64
} }
type Repo struct { type Repo struct {
@ -97,9 +105,12 @@ type Req struct {
Data []byte Data []byte
} }
func (r *Req) SetData(data any) { func (r *Req) SetData(data any) (err error) {
r.Type = fmt.Sprintf("%v", reflect.TypeOf(data)) r.Type = fmt.Sprintf("%v", reflect.TypeOf(data))
r.Data = Marshal(data) var buf bytes.Buffer
err = gob.NewEncoder(&buf).Encode(data)
r.Data = buf.Bytes()
return
} }
func (r *Req) GetData(data any) (err error) { func (r *Req) GetData(data any) (err error) {
@ -113,32 +124,16 @@ func (r *Req) GetData(data any) (err error) {
return return
} }
log.Trace().Msgf("unmarshal %v", string(r.Data)) buf := bytes.NewBuffer(r.Data)
err = json.Unmarshal(r.Data, &data) return gob.NewDecoder(buf).Decode(data)
return
} }
func (r Req) Encode(conn net.Conn) { func (r *Req) Encode(conn net.Conn) (err error) {
log.Debug().Msgf("encode %v", r.Command) return gob.NewEncoder(conn).Encode(r)
err := json.NewEncoder(conn).Encode(&r)
if err != nil {
log.Fatal().Msgf("encode %v", r.Command)
}
} }
func (r *Req) Decode(conn net.Conn) (err error) { func (r *Req) Decode(conn net.Conn) (err error) {
err = json.NewDecoder(conn).Decode(r) return gob.NewDecoder(conn).Decode(r)
return
}
func (r Req) Marshal() (bytes []byte) {
return Marshal(r)
}
func (Req) Unmarshal(data []byte) (r Req, err error) {
err = json.Unmarshal(data, &r)
log.Trace().Msgf("unmarshal %v", spew.Sdump(r))
return
} }
type Resp struct { type Resp struct {
@ -157,9 +152,12 @@ func NewResp() (resp Resp) {
return return
} }
func (r *Resp) SetData(data any) { func (r *Resp) SetData(data any) (err error) {
r.Type = fmt.Sprintf("%v", reflect.TypeOf(data)) r.Type = fmt.Sprintf("%v", reflect.TypeOf(data))
r.Data = Marshal(data) var buf bytes.Buffer
err = gob.NewEncoder(&buf).Encode(data)
r.Data = buf.Bytes()
return
} }
func (r *Resp) GetData(data any) (err error) { func (r *Resp) GetData(data any) (err error) {
@ -173,48 +171,19 @@ func (r *Resp) GetData(data any) (err error) {
return return
} }
log.Trace().Msgf("unmarshal %v", string(r.Data)) buf := bytes.NewBuffer(r.Data)
err = json.Unmarshal(r.Data, &data) return gob.NewDecoder(buf).Decode(data)
return
} }
func (r *Resp) Encode(conn net.Conn) { func (r *Resp) Encode(conn net.Conn) (err error) {
if r.Err != nil && r.Err != ErrInvalid && r.Error == "" { if r.Err != nil && r.Err != ErrInvalid && r.Error == "" {
r.Error = fmt.Sprintf("%v", r.Err) r.Error = fmt.Sprintf("%v", r.Err)
} }
log.Debug().Msgf("encode %v", r.UUID) return gob.NewEncoder(conn).Encode(r)
err := json.NewEncoder(conn).Encode(r)
if err != nil {
log.Fatal().Msgf("encode %v", r.UUID)
}
} }
func (r *Resp) Decode(conn net.Conn) (err error) { func (r *Resp) Decode(conn net.Conn) (err error) {
err = json.NewDecoder(conn).Decode(r) err = gob.NewDecoder(conn).Decode(r)
r.Err = ErrInvalid r.Err = ErrInvalid
return return
} }
func (r *Resp) Marshal() (bytes []byte) {
if r.Err != nil && r.Err != ErrInvalid && r.Error == "" {
r.Error = fmt.Sprintf("%v", r.Err)
}
return Marshal(r)
}
func (Resp) Unmarshal(data []byte) (r Resp, err error) {
err = json.Unmarshal(data, &r)
log.Trace().Msgf("unmarshal %v", spew.Sdump(r))
r.Err = ErrInvalid
return
}
func Marshal(data any) (bytes []byte) {
bytes, err := json.Marshal(data)
if err != nil {
log.Fatal().Err(err).Msgf("marshal %v", data)
}
log.Trace().Msgf("marshal %v", string(bytes))
return
}

View File

@ -1,49 +0,0 @@
package api
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestReq(t *testing.T) {
req := Req{}
req.Command = ListRepos
req.SetData(&Job{ID: 999, RepoName: "test"})
bytes := req.Marshal()
req2, err := Req{}.Unmarshal(bytes)
assert.Nil(t, err)
assert.Equal(t, req, req2)
job := Job{}
err = req2.GetData(&job)
assert.Nil(t, err)
assert.Equal(t, req2.Type, "*api.Job")
}
func TestResp(t *testing.T) {
resp := Resp{}
resp.Error = "abracadabra"
resp.SetData(&[]Repo{{}, {}})
bytes := resp.Marshal()
resp2, err := Resp{}.Unmarshal(bytes)
assert.Nil(t, err)
resp2.Err = nil // non-marshallable
assert.Equal(t, resp, resp2)
var repos []Repo
err = resp2.GetData(&repos)
assert.Nil(t, err)
assert.Equal(t, resp2.Type, "*[]api.Repo")
}

View File

@ -7,6 +7,7 @@ package cmd
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"time"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -28,10 +29,11 @@ type DaemonJobCmd struct {
} }
type DaemonJobsListCmd struct { type DaemonJobsListCmd struct {
Group string `help:"group uuid"` Group string `help:"group uuid"`
Repo string `help:"repo name"` Repo string `help:"repo name"`
Commit string `help:"commit sha"` Commit string `help:"commit sha"`
Status string `help:"job status"` Status string `help:"job status"`
After time.Time `help:"updated after" format:"2006-01-02 15:04:05"`
} }
func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) { func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) {
@ -44,6 +46,10 @@ func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) {
Status: api.Status(cmd.Status), Status: api.Status(cmd.Status),
} }
if !cmd.After.IsZero() {
params.UpdatedAfter = cmd.After.Unix()
}
jobs, err := c.Jobs(params) jobs, err := c.Jobs(params)
if err != nil { if err != nil {
log.Error().Err(err).Msg("") log.Error().Err(err).Msg("")

View File

@ -10,6 +10,7 @@ import (
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"sync" "sync"
"time"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/google/uuid" "github.com/google/uuid"
@ -103,7 +104,7 @@ func listJobs(req *api.Req, resp *api.Resp, e cmdenv) (err error) {
return return
} }
jobs, err := db.Jobs(e.DB) jobs, err := db.Jobs(e.DB, "updated >= ?", params.UpdatedAfter)
if err != nil { if err != nil {
return return
} }
@ -139,6 +140,8 @@ func addJob(req *api.Req, resp *api.Resp, e cmdenv) (err error) {
job.GenUUID() job.GenUUID()
job.Created = time.Now()
var repos []api.Repo var repos []api.Repo
repos, err = db.Repos(e.DB) repos, err = db.Repos(e.DB)
if err != nil { if err != nil {

View File

@ -62,10 +62,12 @@ func (d *Daemon) Daemon() {
swg := sizedwaitgroup.New(d.Threads) swg := sizedwaitgroup.New(d.Threads)
log.Info().Int("threads", d.Threads).Msg("start") log.Info().Int("threads", d.Threads).Msg("start")
first := true
for !d.shutdown { for !d.shutdown {
d.wg.Add(1) d.wg.Add(1)
jobs, err := db.Jobs(d.db) jobs, err := db.Jobs(d.db, "")
if err != nil && !d.shutdown { if err != nil && !d.shutdown {
log.Error().Err(err).Msg("") log.Error().Err(err).Msg("")
d.wg.Done() d.wg.Done()
@ -80,6 +82,11 @@ func (d *Daemon) Daemon() {
pj := newJobProcessor(job, d.db) pj := newJobProcessor(job, d.db)
if first && job.Status == api.StatusRunning {
pj.SetStatus(api.StatusWaiting)
continue
}
if job.Status == api.StatusNew { if job.Status == api.StatusNew {
pj.SetStatus(api.StatusWaiting) pj.SetStatus(api.StatusWaiting)
continue continue
@ -97,6 +104,8 @@ func (d *Daemon) Daemon() {
}(pj) }(pj)
} }
first = false
d.wg.Done() d.wg.Done()
time.Sleep(time.Second) time.Sleep(time.Second)
} }

View File

@ -1,8 +1,10 @@
package db package db
import ( import (
"bytes"
"database/sql" "database/sql"
"encoding/json" "encoding/gob"
"time"
"code.dumpstack.io/tools/out-of-tree/api" "code.dumpstack.io/tools/out-of-tree/api"
) )
@ -11,33 +13,49 @@ func createJobTable(db *sql.DB) (err error) {
_, err = db.Exec(` _, err = db.Exec(`
CREATE TABLE IF NOT EXISTS job ( CREATE TABLE IF NOT EXISTS job (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
updated INT,
uuid TEXT, uuid TEXT,
group_uuid TEXT, group_uuid TEXT,
repo TEXT, repo TEXT,
"commit" TEXT, "commit" TEXT,
config TEXT, config TEXT,
target TEXT, target TEXT,
created INT,
started INT,
finished INT,
status TEXT DEFAULT "new" status TEXT DEFAULT "new"
)`) )`)
return return
} }
func AddJob(db *sql.DB, job *api.Job) (err error) { func AddJob(db *sql.DB, job *api.Job) (err error) {
stmt, err := db.Prepare(`INSERT INTO job (uuid, group_uuid, repo, "commit", ` + stmt, err := db.Prepare(`INSERT INTO job (updated, uuid, group_uuid, repo, "commit", ` +
`config, target) ` + `config, target, created, started, finished) ` +
`VALUES ($1, $2, $3, $4, $5, $6);`) `VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`)
if err != nil { if err != nil {
return return
} }
defer stmt.Close() defer stmt.Close()
config := api.Marshal(job.Artifact) var abuf bytes.Buffer
target := api.Marshal(job.Target) err = gob.NewEncoder(&abuf).Encode(job.Artifact)
if err != nil {
return
}
config := abuf.Bytes()
res, err := stmt.Exec(job.UUID, job.Group, var tbuf bytes.Buffer
job.RepoName, job.Commit, err = gob.NewEncoder(&tbuf).Encode(job.Target)
config, target, if err != nil {
return
}
target := tbuf.Bytes()
res, err := stmt.Exec(time.Now().Unix(), job.UUID, job.Group,
job.RepoName, job.Commit, config, target,
job.Created.Unix(), job.Started.Unix(),
job.Finished.Unix(),
) )
if err != nil { if err != nil {
return return
@ -48,33 +66,83 @@ func AddJob(db *sql.DB, job *api.Job) (err error) {
} }
func UpdateJob(db *sql.DB, job *api.Job) (err error) { func UpdateJob(db *sql.DB, job *api.Job) (err error) {
stmt, err := db.Prepare(`UPDATE job SET uuid=$1, group_uuid=$2, repo=$3, ` + stmt, err := db.Prepare(`UPDATE job ` +
`"commit"=$4, config=$5, target=$6, status=$7 WHERE id=$8`) `SET updated=$1, uuid=$2, group_uuid=$3, repo=$4, ` +
`"commit"=$5, config=$6, target=$7, ` +
`created=$8, started=$9, finished=$10, ` +
`status=$11 ` +
`WHERE id=$12`)
if err != nil { if err != nil {
return return
} }
defer stmt.Close() defer stmt.Close()
config := api.Marshal(job.Artifact) var abuf bytes.Buffer
target := api.Marshal(job.Target) err = gob.NewEncoder(&abuf).Encode(job.Artifact)
if err != nil {
return
}
config := abuf.Bytes()
_, err = stmt.Exec(job.UUID, job.Group, var tbuf bytes.Buffer
err = gob.NewEncoder(&tbuf).Encode(job.Target)
if err != nil {
return
}
target := tbuf.Bytes()
_, err = stmt.Exec(time.Now().Unix(), job.UUID, job.Group,
job.RepoName, job.Commit, job.RepoName, job.Commit,
config, target, config, target,
job.Status, job.ID) job.Created.Unix(), job.Started.Unix(),
job.Finished.Unix(), job.Status, job.ID)
return return
} }
func Jobs(db *sql.DB) (jobs []api.Job, err error) { func scanJob(scan func(dest ...any) error) (job api.Job, err error) {
stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, repo, "commit", ` + var config, target []byte
`config, target, status FROM job`) var updated, created, started, finished int64
err = scan(&job.ID, &updated, &job.UUID, &job.Group,
&job.RepoName, &job.Commit, &config, &target,
&created, &started, &finished, &job.Status)
if err != nil {
return
}
abuf := bytes.NewBuffer(config)
err = gob.NewDecoder(abuf).Decode(&job.Artifact)
if err != nil {
return
}
tbuf := bytes.NewBuffer(target)
err = gob.NewDecoder(tbuf).Decode(&job.Target)
if err != nil {
return
}
job.UpdatedAt = time.Unix(updated, 0)
job.Created = time.Unix(created, 0)
job.Started = time.Unix(started, 0)
job.Finished = time.Unix(finished, 0)
return
}
func Jobs(db *sql.DB, where string, args ...any) (jobs []api.Job, err error) {
q := `SELECT id, updated, uuid, group_uuid, ` +
`repo, "commit", config, target, created, ` +
`started, finished, status FROM job`
if len(where) != 0 {
q += ` WHERE ` + where
}
stmt, err := db.Prepare(q)
if err != nil { if err != nil {
return return
} }
defer stmt.Close() defer stmt.Close()
rows, err := stmt.Query() rows, err := stmt.Query(args...)
if err != nil { if err != nil {
return return
} }
@ -82,24 +150,10 @@ func Jobs(db *sql.DB) (jobs []api.Job, err error) {
for rows.Next() { for rows.Next() {
var job api.Job var job api.Job
var config, target []byte job, err = scanJob(rows.Scan)
err = rows.Scan(&job.ID, &job.UUID, &job.Group,
&job.RepoName, &job.Commit,
&config, &target, &job.Status)
if err != nil { if err != nil {
return return
} }
err = json.Unmarshal(config, &job.Artifact)
if err != nil {
return
}
err = json.Unmarshal(target, &job.Target)
if err != nil {
return
}
jobs = append(jobs, job) jobs = append(jobs, job)
} }
@ -107,23 +161,17 @@ func Jobs(db *sql.DB) (jobs []api.Job, err error) {
} }
func Job(db *sql.DB, uuid string) (job api.Job, err error) { func Job(db *sql.DB, uuid string) (job api.Job, err error) {
stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, ` + stmt, err := db.Prepare(`SELECT id, updated, uuid, ` +
`repo, "commit", ` + `group_uuid, ` +
`config, target, status ` + `repo, "commit", config, target, ` +
`created, started, finished, status ` +
`FROM job WHERE uuid=$1`) `FROM job WHERE uuid=$1`)
if err != nil { if err != nil {
return return
} }
defer stmt.Close() defer stmt.Close()
err = stmt.QueryRow(uuid).Scan(&job.ID, &job.UUID, return scanJob(stmt.QueryRow(uuid).Scan)
&job.Group, &job.RepoName, &job.Commit,
&job.Artifact, &job.Target, &job.Status)
if err != nil {
return
}
return
} }
func JobStatus(db *sql.DB, uuid string) (st api.Status, err error) { func JobStatus(db *sql.DB, uuid string) (st api.Status, err error) {

View File

@ -26,13 +26,25 @@ func TestJobTable(t *testing.T) {
job.Group = uuid.New().String() job.Group = uuid.New().String()
job.Status = api.StatusSuccess
err = UpdateJob(db, &job) err = UpdateJob(db, &job)
assert.Nil(t, err) assert.Nil(t, err)
jobs, err := Jobs(db) jobs, err := Jobs(db, "")
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, 1, len(jobs)) assert.Equal(t, 1, len(jobs))
assert.Equal(t, job.Group, jobs[0].Group) assert.Equal(t, job.Group, jobs[0].Group)
job, err = Job(db, job.UUID)
assert.Nil(t, err)
assert.Equal(t, api.StatusSuccess, job.Status)
st, err := JobStatus(db, job.UUID)
assert.Nil(t, err)
assert.Equal(t, job.Status, st)
} }

View File

@ -7,6 +7,7 @@ import (
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"time"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -76,7 +77,10 @@ func (pj *jobProcessor) Process(res *Resources) (err error) {
log.Info().Msgf("process job %v", pj.job.UUID) log.Info().Msgf("process job %v", pj.job.UUID)
pj.SetStatus(api.StatusRunning) pj.SetStatus(api.StatusRunning)
pj.job.Started = time.Now()
defer func() { defer func() {
pj.job.Finished = time.Now()
if err != nil { if err != nil {
pj.SetStatus(api.StatusFailure) pj.SetStatus(api.StatusFailure)
} else { } else {