Compare commits
No commits in common. "fc193afe92674da716b650d89d77d2f8412c59d3" and "e633fd2e790bf550267aacd1bbbf542c93d507b4" have entirely different histories.
fc193afe92
...
e633fd2e79
93
api/api.go
93
api/api.go
@ -1,18 +1,18 @@
|
|||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"encoding/json"
|
||||||
"encoding/gob"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"reflect"
|
"reflect"
|
||||||
"time"
|
|
||||||
|
|
||||||
"code.dumpstack.io/tools/out-of-tree/artifact"
|
"code.dumpstack.io/tools/out-of-tree/artifact"
|
||||||
"code.dumpstack.io/tools/out-of-tree/distro"
|
"code.dumpstack.io/tools/out-of-tree/distro"
|
||||||
|
|
||||||
|
"github.com/davecgh/go-spew/spew"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
var ErrInvalid = errors.New("")
|
var ErrInvalid = errors.New("")
|
||||||
@ -46,8 +46,6 @@ const (
|
|||||||
type Job struct {
|
type Job struct {
|
||||||
ID int64
|
ID int64
|
||||||
|
|
||||||
UpdatedAt time.Time
|
|
||||||
|
|
||||||
// Job UUID
|
// Job UUID
|
||||||
UUID string
|
UUID string
|
||||||
// Group UUID
|
// Group UUID
|
||||||
@ -59,10 +57,6 @@ type Job struct {
|
|||||||
Artifact artifact.Artifact
|
Artifact artifact.Artifact
|
||||||
Target distro.KernelInfo
|
Target distro.KernelInfo
|
||||||
|
|
||||||
Created time.Time
|
|
||||||
Started time.Time
|
|
||||||
Finished time.Time
|
|
||||||
|
|
||||||
Status Status
|
Status Status
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -83,8 +77,6 @@ type ListJobsParams struct {
|
|||||||
|
|
||||||
// Status of the job
|
// Status of the job
|
||||||
Status Status
|
Status Status
|
||||||
|
|
||||||
UpdatedAfter int64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Repo struct {
|
type Repo struct {
|
||||||
@ -105,12 +97,9 @@ type Req struct {
|
|||||||
Data []byte
|
Data []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Req) SetData(data any) (err error) {
|
func (r *Req) SetData(data any) {
|
||||||
r.Type = fmt.Sprintf("%v", reflect.TypeOf(data))
|
r.Type = fmt.Sprintf("%v", reflect.TypeOf(data))
|
||||||
var buf bytes.Buffer
|
r.Data = Marshal(data)
|
||||||
err = gob.NewEncoder(&buf).Encode(data)
|
|
||||||
r.Data = buf.Bytes()
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Req) GetData(data any) (err error) {
|
func (r *Req) GetData(data any) (err error) {
|
||||||
@ -124,16 +113,32 @@ func (r *Req) GetData(data any) (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
buf := bytes.NewBuffer(r.Data)
|
log.Trace().Msgf("unmarshal %v", string(r.Data))
|
||||||
return gob.NewDecoder(buf).Decode(data)
|
err = json.Unmarshal(r.Data, &data)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Req) Encode(conn net.Conn) (err error) {
|
func (r Req) Encode(conn net.Conn) {
|
||||||
return gob.NewEncoder(conn).Encode(r)
|
log.Debug().Msgf("encode %v", r.Command)
|
||||||
|
err := json.NewEncoder(conn).Encode(&r)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal().Msgf("encode %v", r.Command)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Req) Decode(conn net.Conn) (err error) {
|
func (r *Req) Decode(conn net.Conn) (err error) {
|
||||||
return gob.NewDecoder(conn).Decode(r)
|
err = json.NewDecoder(conn).Decode(r)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r Req) Marshal() (bytes []byte) {
|
||||||
|
return Marshal(r)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (Req) Unmarshal(data []byte) (r Req, err error) {
|
||||||
|
err = json.Unmarshal(data, &r)
|
||||||
|
log.Trace().Msgf("unmarshal %v", spew.Sdump(r))
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
type Resp struct {
|
type Resp struct {
|
||||||
@ -152,12 +157,9 @@ func NewResp() (resp Resp) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Resp) SetData(data any) (err error) {
|
func (r *Resp) SetData(data any) {
|
||||||
r.Type = fmt.Sprintf("%v", reflect.TypeOf(data))
|
r.Type = fmt.Sprintf("%v", reflect.TypeOf(data))
|
||||||
var buf bytes.Buffer
|
r.Data = Marshal(data)
|
||||||
err = gob.NewEncoder(&buf).Encode(data)
|
|
||||||
r.Data = buf.Bytes()
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Resp) GetData(data any) (err error) {
|
func (r *Resp) GetData(data any) (err error) {
|
||||||
@ -171,19 +173,48 @@ func (r *Resp) GetData(data any) (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
buf := bytes.NewBuffer(r.Data)
|
log.Trace().Msgf("unmarshal %v", string(r.Data))
|
||||||
return gob.NewDecoder(buf).Decode(data)
|
err = json.Unmarshal(r.Data, &data)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Resp) Encode(conn net.Conn) (err error) {
|
func (r *Resp) Encode(conn net.Conn) {
|
||||||
if r.Err != nil && r.Err != ErrInvalid && r.Error == "" {
|
if r.Err != nil && r.Err != ErrInvalid && r.Error == "" {
|
||||||
r.Error = fmt.Sprintf("%v", r.Err)
|
r.Error = fmt.Sprintf("%v", r.Err)
|
||||||
}
|
}
|
||||||
return gob.NewEncoder(conn).Encode(r)
|
log.Debug().Msgf("encode %v", r.UUID)
|
||||||
|
err := json.NewEncoder(conn).Encode(r)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal().Msgf("encode %v", r.UUID)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Resp) Decode(conn net.Conn) (err error) {
|
func (r *Resp) Decode(conn net.Conn) (err error) {
|
||||||
err = gob.NewDecoder(conn).Decode(r)
|
err = json.NewDecoder(conn).Decode(r)
|
||||||
r.Err = ErrInvalid
|
r.Err = ErrInvalid
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Resp) Marshal() (bytes []byte) {
|
||||||
|
if r.Err != nil && r.Err != ErrInvalid && r.Error == "" {
|
||||||
|
r.Error = fmt.Sprintf("%v", r.Err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return Marshal(r)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (Resp) Unmarshal(data []byte) (r Resp, err error) {
|
||||||
|
err = json.Unmarshal(data, &r)
|
||||||
|
log.Trace().Msgf("unmarshal %v", spew.Sdump(r))
|
||||||
|
r.Err = ErrInvalid
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func Marshal(data any) (bytes []byte) {
|
||||||
|
bytes, err := json.Marshal(data)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal().Err(err).Msgf("marshal %v", data)
|
||||||
|
}
|
||||||
|
log.Trace().Msgf("marshal %v", string(bytes))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
49
api/api_test.go
Normal file
49
api/api_test.go
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestReq(t *testing.T) {
|
||||||
|
req := Req{}
|
||||||
|
|
||||||
|
req.Command = ListRepos
|
||||||
|
req.SetData(&Job{ID: 999, RepoName: "test"})
|
||||||
|
|
||||||
|
bytes := req.Marshal()
|
||||||
|
|
||||||
|
req2, err := Req{}.Unmarshal(bytes)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, req, req2)
|
||||||
|
|
||||||
|
job := Job{}
|
||||||
|
err = req2.GetData(&job)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, req2.Type, "*api.Job")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestResp(t *testing.T) {
|
||||||
|
resp := Resp{}
|
||||||
|
|
||||||
|
resp.Error = "abracadabra"
|
||||||
|
resp.SetData(&[]Repo{{}, {}})
|
||||||
|
|
||||||
|
bytes := resp.Marshal()
|
||||||
|
|
||||||
|
resp2, err := Resp{}.Unmarshal(bytes)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
resp2.Err = nil // non-marshallable
|
||||||
|
|
||||||
|
assert.Equal(t, resp, resp2)
|
||||||
|
|
||||||
|
var repos []Repo
|
||||||
|
err = resp2.GetData(&repos)
|
||||||
|
assert.Nil(t, err)
|
||||||
|
|
||||||
|
assert.Equal(t, resp2.Type, "*[]api.Repo")
|
||||||
|
}
|
@ -7,7 +7,6 @@ package cmd
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
|
|
||||||
@ -33,7 +32,6 @@ type DaemonJobsListCmd struct {
|
|||||||
Repo string `help:"repo name"`
|
Repo string `help:"repo name"`
|
||||||
Commit string `help:"commit sha"`
|
Commit string `help:"commit sha"`
|
||||||
Status string `help:"job status"`
|
Status string `help:"job status"`
|
||||||
After time.Time `help:"updated after" format:"2006-01-02 15:04:05"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) {
|
func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) {
|
||||||
@ -46,10 +44,6 @@ func (cmd *DaemonJobsListCmd) Run(dm *DaemonCmd, g *Globals) (err error) {
|
|||||||
Status: api.Status(cmd.Status),
|
Status: api.Status(cmd.Status),
|
||||||
}
|
}
|
||||||
|
|
||||||
if !cmd.After.IsZero() {
|
|
||||||
params.UpdatedAfter = cmd.After.Unix()
|
|
||||||
}
|
|
||||||
|
|
||||||
jobs, err := c.Jobs(params)
|
jobs, err := c.Jobs(params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error().Err(err).Msg("")
|
log.Error().Err(err).Msg("")
|
||||||
|
@ -10,7 +10,6 @@ import (
|
|||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/davecgh/go-spew/spew"
|
"github.com/davecgh/go-spew/spew"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
@ -104,7 +103,7 @@ func listJobs(req *api.Req, resp *api.Resp, e cmdenv) (err error) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
jobs, err := db.Jobs(e.DB, "updated >= ?", params.UpdatedAfter)
|
jobs, err := db.Jobs(e.DB)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -140,8 +139,6 @@ func addJob(req *api.Req, resp *api.Resp, e cmdenv) (err error) {
|
|||||||
|
|
||||||
job.GenUUID()
|
job.GenUUID()
|
||||||
|
|
||||||
job.Created = time.Now()
|
|
||||||
|
|
||||||
var repos []api.Repo
|
var repos []api.Repo
|
||||||
repos, err = db.Repos(e.DB)
|
repos, err = db.Repos(e.DB)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -62,12 +62,10 @@ func (d *Daemon) Daemon() {
|
|||||||
swg := sizedwaitgroup.New(d.Threads)
|
swg := sizedwaitgroup.New(d.Threads)
|
||||||
log.Info().Int("threads", d.Threads).Msg("start")
|
log.Info().Int("threads", d.Threads).Msg("start")
|
||||||
|
|
||||||
first := true
|
|
||||||
|
|
||||||
for !d.shutdown {
|
for !d.shutdown {
|
||||||
d.wg.Add(1)
|
d.wg.Add(1)
|
||||||
|
|
||||||
jobs, err := db.Jobs(d.db, "")
|
jobs, err := db.Jobs(d.db)
|
||||||
if err != nil && !d.shutdown {
|
if err != nil && !d.shutdown {
|
||||||
log.Error().Err(err).Msg("")
|
log.Error().Err(err).Msg("")
|
||||||
d.wg.Done()
|
d.wg.Done()
|
||||||
@ -82,11 +80,6 @@ func (d *Daemon) Daemon() {
|
|||||||
|
|
||||||
pj := newJobProcessor(job, d.db)
|
pj := newJobProcessor(job, d.db)
|
||||||
|
|
||||||
if first && job.Status == api.StatusRunning {
|
|
||||||
pj.SetStatus(api.StatusWaiting)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if job.Status == api.StatusNew {
|
if job.Status == api.StatusNew {
|
||||||
pj.SetStatus(api.StatusWaiting)
|
pj.SetStatus(api.StatusWaiting)
|
||||||
continue
|
continue
|
||||||
@ -104,8 +97,6 @@ func (d *Daemon) Daemon() {
|
|||||||
}(pj)
|
}(pj)
|
||||||
}
|
}
|
||||||
|
|
||||||
first = false
|
|
||||||
|
|
||||||
d.wg.Done()
|
d.wg.Done()
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
}
|
}
|
||||||
|
138
daemon/db/job.go
138
daemon/db/job.go
@ -1,10 +1,8 @@
|
|||||||
package db
|
package db
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"encoding/gob"
|
"encoding/json"
|
||||||
"time"
|
|
||||||
|
|
||||||
"code.dumpstack.io/tools/out-of-tree/api"
|
"code.dumpstack.io/tools/out-of-tree/api"
|
||||||
)
|
)
|
||||||
@ -13,49 +11,33 @@ func createJobTable(db *sql.DB) (err error) {
|
|||||||
_, err = db.Exec(`
|
_, err = db.Exec(`
|
||||||
CREATE TABLE IF NOT EXISTS job (
|
CREATE TABLE IF NOT EXISTS job (
|
||||||
id INTEGER PRIMARY KEY,
|
id INTEGER PRIMARY KEY,
|
||||||
updated INT,
|
|
||||||
uuid TEXT,
|
uuid TEXT,
|
||||||
group_uuid TEXT,
|
group_uuid TEXT,
|
||||||
repo TEXT,
|
repo TEXT,
|
||||||
"commit" TEXT,
|
"commit" TEXT,
|
||||||
config TEXT,
|
config TEXT,
|
||||||
target TEXT,
|
target TEXT,
|
||||||
created INT,
|
|
||||||
started INT,
|
|
||||||
finished INT,
|
|
||||||
status TEXT DEFAULT "new"
|
status TEXT DEFAULT "new"
|
||||||
)`)
|
)`)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func AddJob(db *sql.DB, job *api.Job) (err error) {
|
func AddJob(db *sql.DB, job *api.Job) (err error) {
|
||||||
stmt, err := db.Prepare(`INSERT INTO job (updated, uuid, group_uuid, repo, "commit", ` +
|
stmt, err := db.Prepare(`INSERT INTO job (uuid, group_uuid, repo, "commit", ` +
|
||||||
`config, target, created, started, finished) ` +
|
`config, target) ` +
|
||||||
`VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);`)
|
`VALUES ($1, $2, $3, $4, $5, $6);`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
defer stmt.Close()
|
defer stmt.Close()
|
||||||
|
|
||||||
var abuf bytes.Buffer
|
config := api.Marshal(job.Artifact)
|
||||||
err = gob.NewEncoder(&abuf).Encode(job.Artifact)
|
target := api.Marshal(job.Target)
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
config := abuf.Bytes()
|
|
||||||
|
|
||||||
var tbuf bytes.Buffer
|
res, err := stmt.Exec(job.UUID, job.Group,
|
||||||
err = gob.NewEncoder(&tbuf).Encode(job.Target)
|
job.RepoName, job.Commit,
|
||||||
if err != nil {
|
config, target,
|
||||||
return
|
|
||||||
}
|
|
||||||
target := tbuf.Bytes()
|
|
||||||
|
|
||||||
res, err := stmt.Exec(time.Now().Unix(), job.UUID, job.Group,
|
|
||||||
job.RepoName, job.Commit, config, target,
|
|
||||||
job.Created.Unix(), job.Started.Unix(),
|
|
||||||
job.Finished.Unix(),
|
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@ -66,83 +48,33 @@ func AddJob(db *sql.DB, job *api.Job) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func UpdateJob(db *sql.DB, job *api.Job) (err error) {
|
func UpdateJob(db *sql.DB, job *api.Job) (err error) {
|
||||||
stmt, err := db.Prepare(`UPDATE job ` +
|
stmt, err := db.Prepare(`UPDATE job SET uuid=$1, group_uuid=$2, repo=$3, ` +
|
||||||
`SET updated=$1, uuid=$2, group_uuid=$3, repo=$4, ` +
|
`"commit"=$4, config=$5, target=$6, status=$7 WHERE id=$8`)
|
||||||
`"commit"=$5, config=$6, target=$7, ` +
|
|
||||||
`created=$8, started=$9, finished=$10, ` +
|
|
||||||
`status=$11 ` +
|
|
||||||
`WHERE id=$12`)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer stmt.Close()
|
defer stmt.Close()
|
||||||
|
|
||||||
var abuf bytes.Buffer
|
config := api.Marshal(job.Artifact)
|
||||||
err = gob.NewEncoder(&abuf).Encode(job.Artifact)
|
target := api.Marshal(job.Target)
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
config := abuf.Bytes()
|
|
||||||
|
|
||||||
var tbuf bytes.Buffer
|
_, err = stmt.Exec(job.UUID, job.Group,
|
||||||
err = gob.NewEncoder(&tbuf).Encode(job.Target)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
target := tbuf.Bytes()
|
|
||||||
|
|
||||||
_, err = stmt.Exec(time.Now().Unix(), job.UUID, job.Group,
|
|
||||||
job.RepoName, job.Commit,
|
job.RepoName, job.Commit,
|
||||||
config, target,
|
config, target,
|
||||||
job.Created.Unix(), job.Started.Unix(),
|
job.Status, job.ID)
|
||||||
job.Finished.Unix(), job.Status, job.ID)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func scanJob(scan func(dest ...any) error) (job api.Job, err error) {
|
func Jobs(db *sql.DB) (jobs []api.Job, err error) {
|
||||||
var config, target []byte
|
stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, repo, "commit", ` +
|
||||||
var updated, created, started, finished int64
|
`config, target, status FROM job`)
|
||||||
err = scan(&job.ID, &updated, &job.UUID, &job.Group,
|
|
||||||
&job.RepoName, &job.Commit, &config, &target,
|
|
||||||
&created, &started, &finished, &job.Status)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
abuf := bytes.NewBuffer(config)
|
|
||||||
err = gob.NewDecoder(abuf).Decode(&job.Artifact)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
tbuf := bytes.NewBuffer(target)
|
|
||||||
err = gob.NewDecoder(tbuf).Decode(&job.Target)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
job.UpdatedAt = time.Unix(updated, 0)
|
|
||||||
job.Created = time.Unix(created, 0)
|
|
||||||
job.Started = time.Unix(started, 0)
|
|
||||||
job.Finished = time.Unix(finished, 0)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func Jobs(db *sql.DB, where string, args ...any) (jobs []api.Job, err error) {
|
|
||||||
q := `SELECT id, updated, uuid, group_uuid, ` +
|
|
||||||
`repo, "commit", config, target, created, ` +
|
|
||||||
`started, finished, status FROM job`
|
|
||||||
if len(where) != 0 {
|
|
||||||
q += ` WHERE ` + where
|
|
||||||
}
|
|
||||||
stmt, err := db.Prepare(q)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
defer stmt.Close()
|
defer stmt.Close()
|
||||||
|
|
||||||
rows, err := stmt.Query(args...)
|
rows, err := stmt.Query()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -150,10 +82,24 @@ func Jobs(db *sql.DB, where string, args ...any) (jobs []api.Job, err error) {
|
|||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var job api.Job
|
var job api.Job
|
||||||
job, err = scanJob(rows.Scan)
|
var config, target []byte
|
||||||
|
err = rows.Scan(&job.ID, &job.UUID, &job.Group,
|
||||||
|
&job.RepoName, &job.Commit,
|
||||||
|
&config, &target, &job.Status)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = json.Unmarshal(config, &job.Artifact)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = json.Unmarshal(target, &job.Target)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
jobs = append(jobs, job)
|
jobs = append(jobs, job)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -161,17 +107,23 @@ func Jobs(db *sql.DB, where string, args ...any) (jobs []api.Job, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func Job(db *sql.DB, uuid string) (job api.Job, err error) {
|
func Job(db *sql.DB, uuid string) (job api.Job, err error) {
|
||||||
stmt, err := db.Prepare(`SELECT id, updated, uuid, ` +
|
stmt, err := db.Prepare(`SELECT id, uuid, group_uuid, ` +
|
||||||
`group_uuid, ` +
|
`repo, "commit", ` +
|
||||||
`repo, "commit", config, target, ` +
|
`config, target, status ` +
|
||||||
`created, started, finished, status ` +
|
|
||||||
`FROM job WHERE uuid=$1`)
|
`FROM job WHERE uuid=$1`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer stmt.Close()
|
defer stmt.Close()
|
||||||
|
|
||||||
return scanJob(stmt.QueryRow(uuid).Scan)
|
err = stmt.QueryRow(uuid).Scan(&job.ID, &job.UUID,
|
||||||
|
&job.Group, &job.RepoName, &job.Commit,
|
||||||
|
&job.Artifact, &job.Target, &job.Status)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func JobStatus(db *sql.DB, uuid string) (st api.Status, err error) {
|
func JobStatus(db *sql.DB, uuid string) (st api.Status, err error) {
|
||||||
|
@ -26,25 +26,13 @@ func TestJobTable(t *testing.T) {
|
|||||||
|
|
||||||
job.Group = uuid.New().String()
|
job.Group = uuid.New().String()
|
||||||
|
|
||||||
job.Status = api.StatusSuccess
|
|
||||||
|
|
||||||
err = UpdateJob(db, &job)
|
err = UpdateJob(db, &job)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
jobs, err := Jobs(db, "")
|
jobs, err := Jobs(db)
|
||||||
assert.Nil(t, err)
|
assert.Nil(t, err)
|
||||||
|
|
||||||
assert.Equal(t, 1, len(jobs))
|
assert.Equal(t, 1, len(jobs))
|
||||||
|
|
||||||
assert.Equal(t, job.Group, jobs[0].Group)
|
assert.Equal(t, job.Group, jobs[0].Group)
|
||||||
|
|
||||||
job, err = Job(db, job.UUID)
|
|
||||||
assert.Nil(t, err)
|
|
||||||
|
|
||||||
assert.Equal(t, api.StatusSuccess, job.Status)
|
|
||||||
|
|
||||||
st, err := JobStatus(db, job.UUID)
|
|
||||||
assert.Nil(t, err)
|
|
||||||
|
|
||||||
assert.Equal(t, job.Status, st)
|
|
||||||
}
|
}
|
||||||
|
@ -7,7 +7,6 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/rs/zerolog"
|
"github.com/rs/zerolog"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
@ -77,10 +76,7 @@ func (pj *jobProcessor) Process(res *Resources) (err error) {
|
|||||||
log.Info().Msgf("process job %v", pj.job.UUID)
|
log.Info().Msgf("process job %v", pj.job.UUID)
|
||||||
|
|
||||||
pj.SetStatus(api.StatusRunning)
|
pj.SetStatus(api.StatusRunning)
|
||||||
pj.job.Started = time.Now()
|
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
pj.job.Finished = time.Now()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
pj.SetStatus(api.StatusFailure)
|
pj.SetStatus(api.StatusFailure)
|
||||||
} else {
|
} else {
|
||||||
|
Loading…
Reference in New Issue
Block a user