1
0

Compare commits

..

No commits in common. "fc193afe92674da716b650d89d77d2f8412c59d3" and "e633fd2e790bf550267aacd1bbbf542c93d507b4" have entirely different histories.

8 changed files with 163 additions and 165 deletions

View File

@ -1,18 +1,18 @@
package api package api
import ( import (
"bytes" "encoding/json"
"encoding/gob"
"errors" "errors"
"fmt" "fmt"
"net" "net"
"reflect" "reflect"
"time"
"code.dumpstack.io/tools/out-of-tree/artifact" "code.dumpstack.io/tools/out-of-tree/artifact"
"code.dumpstack.io/tools/out-of-tree/distro" "code.dumpstack.io/tools/out-of-tree/distro"
"github.com/davecgh/go-spew/spew"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/rs/zerolog/log"
) )
var ErrInvalid = errors.New("") var ErrInvalid = errors.New("")
@ -46,8 +46,6 @@ const (
type Job struct { type Job struct {
ID int64 ID int64
UpdatedAt time.Time
// Job UUID // Job UUID
UUID string UUID string
// Group UUID // Group UUID
@ -59,10 +57,6 @@ type Job struct {
Artifact artifact.Artifact Artifact artifact.Artifact
Target distro.KernelInfo Target distro.KernelInfo
Created time.Time
Started time.Time
Finished time.Time
Status Status Status Status
} }
@ -83,8 +77,6 @@ type ListJobsParams struct {
// Status of the job // Status of the job
Status Status Status Status
UpdatedAfter int64
} }
type Repo struct { type Repo struct {
@ -105,12 +97,9 @@ type Req struct {
Data []byte Data []byte
} }
func (r *Req) SetData(data any) (err error) { func (r *Req) SetData(data any) {
r.Type = fmt.Sprintf("%v", reflect.TypeOf(data)) r.Type = fmt.Sprintf("%v", reflect.TypeOf(data))
var buf bytes.Buffer r.Data = Marshal(data)
err = gob.NewEncoder(&buf).Encode(data)
r.Data = buf.Bytes()
return
} }
func (r *Req) GetData(data any) (err error) { func (r *Req) GetData(data any) (err error) {
@ -124,16 +113,32 @@ func (r *Req) GetData(data any) (err error) {
return return
} }
buf := bytes.NewBuffer(r.Data) log.Trace().Msgf("unmarshal %v", string(r.Data))
return gob.NewDecoder(buf).Decode(data) err = json.Unmarshal(r.Data, &data)
return
} }
func (r *Req) Encode(conn net.Conn) (err error) { func (r Req) Encode(conn net.Conn) {
return gob.NewEncoder(conn).Encode(r) log.Debug().Msgf("encode %v", r.Command)
err := json.NewEncoder(conn).Encode(&r)
if err != nil {
log.Fatal().Msgf("encode %v", r.Command)
}
} }
func (r *Req) Decode(conn net.Conn) (err error) { func (r *Req) Decode(conn net.Conn) (err error) {
return gob.NewDecoder(conn).Decode(r) err = json.NewDecoder(conn).Decode(r)
return
}
func (r Req) Marshal() (bytes []byte) {
return Marshal(r)
}
func (Req) Unmarshal(data []byte) (r Req, err error) {
err = json.Unmarshal(data, &r)
log.Trace().Msgf("unmarshal %v", spew.Sdump(r))
return
} }
type Resp struct { type Resp struct {
@ -152,12 +157,9 @@ func NewResp() (resp Resp) {
return return
} }
func (r *Resp) SetData(data any) (err error) { func (r *Resp) SetData(data any) {
r.Type = fmt.Sprintf("%v", reflect.TypeOf(data)) r.Type = fmt.Sprintf("%v", reflect.TypeOf(data))
var buf bytes.Buffer r.Data = Marshal(data)
err = gob.NewEncoder(&buf).Encode(data)
r.Data = buf.Bytes()
return
} }
func (r *Resp) GetData(data any) (err error) { func (r *Resp) GetData(data any) (err error) {
@ -171,19 +173,48 @@ func (r *Resp) GetData(data any) (err error) {
return return
} }
buf := bytes.NewBuffer(r.Data) log.Trace().Msgf("unmarshal %v", string(r.Data))
return gob.NewDecoder(buf).Decode(data) err = json.Unmarshal(r.Data, &data)
return
} }
func (r *Resp) Encode(conn net.Conn) (err error) { func (r *Resp) Encode(conn net.Conn) {
if r.Err != nil && r.Err != ErrInvalid && r.Error == "" { if r.Err != nil && r.Err != ErrInvalid && r.Error == "" {
r.Error = fmt.Sprintf("%v", r.Err) r.Error = fmt.Sprintf("%v", r.Err)
} }
return gob.NewEncoder(conn).Encode(r) log.Debug().Msgf("encode %v", r.UUID)
err := json.NewEncoder(conn).Encode(r)
if err != nil {
log.Fatal().Msgf("encode %v", r.UUID)
}
} }
func (r *Resp) Decode(conn net.Conn) (err error) { func (r *Resp) Decode(conn net.Conn) (err error) {
err = gob.NewDecoder(conn).Decode(r) err = json.NewDecoder(conn).Decode(r)
r.Err = ErrInvalid r.Err = ErrInvalid
return return
} }
func (r *Resp) Marshal() (bytes []byte) {
if r.Err != nil && r.Err != ErrInvalid && r.Error == "" {
r.Error = fmt.Sprintf("%v", r.Err)
}
return Marshal(r)
}
func (Resp) Unmarshal(data []byte) (r Resp, err error) {
err = json.Unmarshal(data, &r)
log.Trace().Msgf("unmarshal %v", spew.Sdump(r))
r.Err = ErrInvalid
return
}
func Marshal(data any) (bytes []byte) {
bytes, err := json.Marshal(data)
if err != nil {
log.Fatal().Err(err).Msgf("marshal %v", data)
}
log.Trace().Msgf("marshal %v", string(bytes))
return
}

49
api/api_test.go Normal file
View File

@ -0,0 +1,49 @@
package api
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestReq(t *testing.T) {
req := Req{}
req.Command = ListRepos
req.SetData(&Job{ID: 999, RepoName: "test"})
bytes := req.Marshal()
req2, err := Req{}.Unmarshal(bytes)
assert.Nil(t, err)
assert.Equal(t, req, req2)
job := Job{}
err = req2.GetData(&job)
assert.Nil(t, err)
assert.Equal(t, req2.Type, "*api.Job")
}
func TestResp(t *testing.T) {
resp := Resp{}
resp.Error = "abracadabra"
resp.SetData(&[]Repo{{}, {}})
bytes := resp.Marshal()
resp2, err := Resp{}.Unmarshal(bytes)
assert.Nil(t, err)
resp2.Err = nil // non-marshallable
assert.Equal(t, resp, resp2)
var repos []Repo
err = resp2.GetData(&repos)
assert.Nil(t, err)
assert.Equal(t, resp2.Type, "*[]api.Repo")
}

View File

@ -7,7 +7,6 @@ package cmd
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"time"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -33,7 +32,6 @@ type DaemonJobsListCmd struct {
Repo string `help:"repo name"` Repo string `help:"repo name"`
Commit string `help:"commit sha"` Commit string `help:"commit sha"`
Status string `help:"job status"` Status string `help:"job status"`
After time.Time `help:"updated after" format:"2006-01-02 15:04:05"`
} }
func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) { func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) {
@ -46,10 +44,6 @@ func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) {
Status: api.Status(cmd.Status), Status: api.Status(cmd.Status),
} }
if !cmd.After.IsZero() {
params.UpdatedAfter = cmd.After.Unix()
}
jobs, err := c.Jobs(params) jobs, err := c.Jobs(params)
if err != nil { if err != nil {
log.Error().Err(err).Msg("") log.Error().Err(err).Msg("")

View File

@ -10,7 +10,6 @@ import (
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"sync" "sync"
"time"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/google/uuid" "github.com/google/uuid"
@ -104,7 +103,7 @@ func listJobs(req *api.Req, resp *api.Resp, e cmdenv) (err error) {
return return
} }
jobs, err := db.Jobs(e.DB, "updated >= ?", params.UpdatedAfter) jobs, err := db.Jobs(e.DB)
if err != nil { if err != nil {
return return
} }
@ -140,8 +139,6 @@ func addJob(req *api.Req, resp *api.Resp, e cmdenv) (err error) {
job.GenUUID() job.GenUUID()
job.Created = time.Now()
var repos []api.Repo var repos []api.Repo
repos, err = db.Repos(e.DB) repos, err = db.Repos(e.DB)
if err != nil { if err != nil {

View File

@ -62,12 +62,10 @@ func (d *Daemon) Daemon() {
swg := sizedwaitgroup.New(d.Threads) swg := sizedwaitgroup.New(d.Threads)
log.Info().Int("threads", d.Threads).Msg("start") log.Info().Int("threads", d.Threads).Msg("start")
first := true
for !d.shutdown { for !d.shutdown {
d.wg.Add(1) d.wg.Add(1)
jobs, err := db.Jobs(d.db, "") jobs, err := db.Jobs(d.db)
if err != nil && !d.shutdown { if err != nil && !d.shutdown {
log.Error().Err(err).Msg("") log.Error().Err(err).Msg("")
d.wg.Done() d.wg.Done()
@ -82,11 +80,6 @@ func (d *Daemon) Daemon() {
pj := newJobProcessor(job, d.db) pj := newJobProcessor(job, d.db)
if first && job.Status == api.StatusRunning {
pj.SetStatus(api.StatusWaiting)
continue
}
if job.Status == api.StatusNew { if job.Status == api.StatusNew {
pj.SetStatus(api.StatusWaiting) pj.SetStatus(api.StatusWaiting)
continue continue
@ -104,8 +97,6 @@ func (d *Daemon) Daemon() {
}(pj) }(pj)
} }
first = false
d.wg.Done() d.wg.Done()
time.Sleep(time.Second) time.Sleep(time.Second)
} }

View File

@ -1,10 +1,8 @@
package db package db
import ( import (
"bytes"
"database/sql" "database/sql"
"encoding/gob" "encoding/json"
"time"
"code.dumpstack.io/tools/out-of-tree/api" "code.dumpstack.io/tools/out-of-tree/api"
) )
@ -13,49 +11,33 @@ func createJobTable(db *sql.DB) (err error) {
_, err = db.Exec(` _, err = db.Exec(`
CREATE TABLE IF NOT EXISTS job ( CREATE TABLE IF NOT EXISTS job (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
updated INT,
uuid TEXT, uuid TEXT,
group_uuid TEXT, group_uuid TEXT,
repo TEXT, repo TEXT,
"commit" TEXT, "commit" TEXT,
config TEXT, config TEXT,
target TEXT, target TEXT,
created INT,
started INT,
finished INT,
status TEXT DEFAULT "new" status TEXT DEFAULT "new"
)`) )`)
return return
} }
func AddJob(db *sql.DB, job *api.Job) (err error) { func AddJob(db *sql.DB, job *api.Job) (err error) {
stmt, err := db.Prepare(`INSERT INTO job (updated, uuid, group_uuid, repo, "commit", ` + stmt, err := db.Prepare(`INSERT INTO job (uuid, group_uuid, repo, "commit", ` +
`config, target, created, started, finished) ` + `config, target) ` +
`VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`) `VALUES ($1, $2, $3, $4, $5, $6);`)
if err != nil { if err != nil {
return return
} }
defer stmt.Close() defer stmt.Close()
var abuf bytes.Buffer config := api.Marshal(job.Artifact)
err = gob.NewEncoder(&abuf).Encode(job.Artifact) target := api.Marshal(job.Target)
if err != nil {
return
}
config := abuf.Bytes()
var tbuf bytes.Buffer res, err := stmt.Exec(job.UUID, job.Group,
err = gob.NewEncoder(&tbuf).Encode(job.Target) job.RepoName, job.Commit,
if err != nil { config, target,
return
}
target := tbuf.Bytes()
res, err := stmt.Exec(time.Now().Unix(), job.UUID, job.Group,
job.RepoName, job.Commit, config, target,
job.Created.Unix(), job.Started.Unix(),
job.Finished.Unix(),
) )
if err != nil { if err != nil {
return return
@ -66,83 +48,33 @@ func AddJob(db *sql.DB, job *api.Job) (err error) {
} }
func UpdateJob(db *sql.DB, job *api.Job) (err error) { func UpdateJob(db *sql.DB, job *api.Job) (err error) {
stmt, err := db.Prepare(`UPDATE job ` + stmt, err := db.Prepare(`UPDATE job SET uuid=$1, group_uuid=$2, repo=$3, ` +
`SET updated=$1, uuid=$2, group_uuid=$3, repo=$4, ` + `"commit"=$4, config=$5, target=$6, status=$7 WHERE id=$8`)
`"commit"=$5, config=$6, target=$7, ` +
`created=$8, started=$9, finished=$10, ` +
`status=$11 ` +
`WHERE id=$12`)
if err != nil { if err != nil {
return return
} }
defer stmt.Close() defer stmt.Close()
var abuf bytes.Buffer config := api.Marshal(job.Artifact)
err = gob.NewEncoder(&abuf).Encode(job.Artifact) target := api.Marshal(job.Target)
if err != nil {
return
}
config := abuf.Bytes()
var tbuf bytes.Buffer _, err = stmt.Exec(job.UUID, job.Group,
err = gob.NewEncoder(&tbuf).Encode(job.Target)
if err != nil {
return
}
target := tbuf.Bytes()
_, err = stmt.Exec(time.Now().Unix(), job.UUID, job.Group,
job.RepoName, job.Commit, job.RepoName, job.Commit,
config, target, config, target,
job.Created.Unix(), job.Started.Unix(), job.Status, job.ID)
job.Finished.Unix(), job.Status, job.ID)
return return
} }
func scanJob(scan func(dest ...any) error) (job api.Job, err error) { func Jobs(db *sql.DB) (jobs []api.Job, err error) {
var config, target []byte stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, repo, "commit", ` +
var updated, created, started, finished int64 `config, target, status FROM job`)
err = scan(&job.ID, &updated, &job.UUID, &job.Group,
&job.RepoName, &job.Commit, &config, &target,
&created, &started, &finished, &job.Status)
if err != nil {
return
}
abuf := bytes.NewBuffer(config)
err = gob.NewDecoder(abuf).Decode(&job.Artifact)
if err != nil {
return
}
tbuf := bytes.NewBuffer(target)
err = gob.NewDecoder(tbuf).Decode(&job.Target)
if err != nil {
return
}
job.UpdatedAt = time.Unix(updated, 0)
job.Created = time.Unix(created, 0)
job.Started = time.Unix(started, 0)
job.Finished = time.Unix(finished, 0)
return
}
func Jobs(db *sql.DB, where string, args ...any) (jobs []api.Job, err error) {
q := `SELECT id, updated, uuid, group_uuid, ` +
`repo, "commit", config, target, created, ` +
`started, finished, status FROM job`
if len(where) != 0 {
q += ` WHERE ` + where
}
stmt, err := db.Prepare(q)
if err != nil { if err != nil {
return return
} }
defer stmt.Close() defer stmt.Close()
rows, err := stmt.Query(args...) rows, err := stmt.Query()
if err != nil { if err != nil {
return return
} }
@ -150,10 +82,24 @@ func Jobs(db *sql.DB, where string, args ...any) (jobs []api.Job, err error) {
for rows.Next() { for rows.Next() {
var job api.Job var job api.Job
job, err = scanJob(rows.Scan) var config, target []byte
err = rows.Scan(&job.ID, &job.UUID, &job.Group,
&job.RepoName, &job.Commit,
&config, &target, &job.Status)
if err != nil { if err != nil {
return return
} }
err = json.Unmarshal(config, &job.Artifact)
if err != nil {
return
}
err = json.Unmarshal(target, &job.Target)
if err != nil {
return
}
jobs = append(jobs, job) jobs = append(jobs, job)
} }
@ -161,17 +107,23 @@ func Jobs(db *sql.DB, where string, args ...any) (jobs []api.Job, err error) {
} }
func Job(db *sql.DB, uuid string) (job api.Job, err error) { func Job(db *sql.DB, uuid string) (job api.Job, err error) {
stmt, err := db.Prepare(`SELECT id, updated, uuid, ` + stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, ` +
`group_uuid, ` + `repo, "commit", ` +
`repo, "commit", config, target, ` + `config, target, status ` +
`created, started, finished, status ` +
`FROM job WHERE uuid=$1`) `FROM job WHERE uuid=$1`)
if err != nil { if err != nil {
return return
} }
defer stmt.Close() defer stmt.Close()
return scanJob(stmt.QueryRow(uuid).Scan) err = stmt.QueryRow(uuid).Scan(&job.ID, &job.UUID,
&job.Group, &job.RepoName, &job.Commit,
&job.Artifact, &job.Target, &job.Status)
if err != nil {
return
}
return
} }
func JobStatus(db *sql.DB, uuid string) (st api.Status, err error) { func JobStatus(db *sql.DB, uuid string) (st api.Status, err error) {

View File

@ -26,25 +26,13 @@ func TestJobTable(t *testing.T) {
job.Group = uuid.New().String() job.Group = uuid.New().String()
job.Status = api.StatusSuccess
err = UpdateJob(db, &job) err = UpdateJob(db, &job)
assert.Nil(t, err) assert.Nil(t, err)
jobs, err := Jobs(db, "") jobs, err := Jobs(db)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, 1, len(jobs)) assert.Equal(t, 1, len(jobs))
assert.Equal(t, job.Group, jobs[0].Group) assert.Equal(t, job.Group, jobs[0].Group)
job, err = Job(db, job.UUID)
assert.Nil(t, err)
assert.Equal(t, api.StatusSuccess, job.Status)
st, err := JobStatus(db, job.UUID)
assert.Nil(t, err)
assert.Equal(t, job.Status, st)
} }

View File

@ -7,7 +7,6 @@ import (
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"time"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -77,10 +76,7 @@ func (pj *jobProcessor) Process(res *Resources) (err error) {
log.Info().Msgf("process job %v", pj.job.UUID) log.Info().Msgf("process job %v", pj.job.UUID)
pj.SetStatus(api.StatusRunning) pj.SetStatus(api.StatusRunning)
pj.job.Started = time.Now()
defer func() { defer func() {
pj.job.Finished = time.Now()
if err != nil { if err != nil {
pj.SetStatus(api.StatusFailure) pj.SetStatus(api.StatusFailure)
} else { } else {