1
0

feat: parallel download of deb packages

This commit is contained in:
dump_stack() 2023-05-17 09:59:31 +00:00
parent 72f52d3200
commit a68ceacb43
Signed by: dump_stack
GPG Key ID: BE44DA8C062D87DC
2 changed files with 79 additions and 49 deletions

View File

@ -68,9 +68,7 @@ jobs:
# Run the out-of-tree Debian package download command, log its output and
# return True only when the command exited with status 0.
# (Old side of the diff: `distro debian get-deb --ignore-cached`;
# new side: `distro debian fetch`.)
def get_kernels() -> bool: def get_kernels() -> bool:
status, output = getstatusoutput( status, output = getstatusoutput(
"./out-of-tree --log-level=warn " "./out-of-tree distro debian fetch --max=16"
"distro debian get-deb "
"--ignore-cached --max=16"
) )
logging.info(output) logging.info(output)
return status == 0 return status == 0

122
distro.go
View File

@ -1,11 +1,14 @@
package main package main
import ( import (
"context"
"os" "os"
"path/filepath" "path/filepath"
"regexp" "regexp"
"time"
"github.com/cavaliergopher/grab/v3" "github.com/cavaliergopher/grab/v3"
"github.com/remeh/sizedwaitgroup"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"code.dumpstack.io/tools/out-of-tree/cache" "code.dumpstack.io/tools/out-of-tree/cache"
@ -21,7 +24,7 @@ type DistroCmd struct {
// DebianCmd groups the Debian-specific subcommands: populating the cache
// and downloading deb packages (GetDeb on the old side of the diff,
// Fetch on the new side).
type DebianCmd struct { type DebianCmd struct {
Cache DebianCacheCmd `cmd:"" help:"populate cache"` Cache DebianCacheCmd `cmd:"" help:"populate cache"`
GetDeb DebianGetDebCmd `cmd:"" help:"download deb packages"` Fetch DebianFetchCmd `cmd:"" help:"download deb packages"`
} }
type DebianCacheCmd struct { type DebianCacheCmd struct {
@ -47,16 +50,81 @@ func (cmd *DebianCacheCmd) Run() (err error) {
return return
} }
// DebianFetchCmd holds the command-line options and the internal state of
// the "distro debian fetch" subcommand (DebianGetDebCmd before this change).
type DebianGetDebCmd struct { type DebianFetchCmd struct {
Path string `help:"path to download directory" type:"existingdir" default:"./"` Path string `help:"path to download directory" type:"existingdir" default:"./"`
Regexp string `help:"match deb pkg names by regexp" default:".*"` Regexp string `help:"match deb pkg names by regexp" default:".*"`
IgnoreCached bool `help:"ignore packages found on remote mirror"` IgnoreMirror bool `help:"ignore check if packages on the mirror"`
// Max is decremented by fetch after every successful download and is
// checked by Run before each new goroutine is started.
Max int `help:"do not download more than X" default:"100500"` Max int `help:"do not download more than X" default:"100500"`
Threads int `help:"parallel download threads" default:"8"`
Timeout time.Duration `help:"timeout for each download" default:"1m"`
// swg is created in Run with sizedwaitgroup.New(cmd.Threads); each fetch
// goroutine calls Done on it when it returns.
swg sizedwaitgroup.SizedWaitGroup
// hasResults is set to true by fetch once a package has been downloaded
// and moved into Path; Run treats false as "nothing was downloaded".
hasResults bool
} }
// fetch downloads a single deb package into cmd.Path. Run starts it as a
// goroutine after cmd.swg.Add(), and fetch releases its slot through the
// deferred cmd.swg.Done().
//
// The package is skipped when the mirror check (unless IgnoreMirror is set)
// reports it as found, or when the target file already exists. Otherwise it
// is downloaded into a temporary directory under cmd.Path, with the request
// bound to a context that expires after cmd.Timeout, and renamed into place.
//
// NOTE(review): cmd.hasResults and cmd.Max are written at the end of this
// function without any synchronization, while several fetch goroutines run
// concurrently and Run reads cmd.Max in its loop. This looks like a data
// race; confirm with `go test -race` / `go run -race` and guard the fields
// with a mutex or atomics.
func (cmd DebianGetDebCmd) Run() (err error) { func (cmd *DebianFetchCmd) fetch(pkg snapshot.Package) {
flog := log.With().
Str("pkg", pkg.Deb.Name).
Logger()
defer cmd.swg.Done()
if !cmd.IgnoreMirror {
flog.Debug().Msg("check mirror")
// The error from cache.PackageURL is discarded, so only the found flag
// decides whether the package is skipped.
found, _ := cache.PackageURL(config.Debian, pkg.Deb.URL)
if found {
flog.Info().Msg("found on the mirror")
return
}
}
target := filepath.Join(cmd.Path, filepath.Base(pkg.Deb.URL))
if fs.PathExists(target) {
flog.Debug().Msg("already exists")
return
}
// Each download gets its own temporary directory so a partially written
// file never appears under its final name.
tmp, err := os.MkdirTemp(cmd.Path, "tmp-")
if err != nil {
// NOTE(review): zerolog's Fatal level exits the process from Msg, so the
// return below is presumably never reached and other in-flight downloads
// are aborted — confirm this is intended inside a worker goroutine.
flog.Fatal().Err(err).Msg("mkdir")
return
}
defer os.RemoveAll(tmp)
flog.Info().Msg("fetch")
flog.Debug().Msg(pkg.Deb.URL)
ctx, cancel := context.WithTimeout(context.Background(), cmd.Timeout)
defer cancel()
req, err := grab.NewRequest(tmp, pkg.Deb.URL)
if err != nil {
flog.Warn().Err(err).Msg("cannot create request")
return
}
req = req.WithContext(ctx)
resp := grab.DefaultClient.Do(req)
// NOTE(review): every download error is logged as "request cancelled",
// not only context timeouts; the real cause is only in the attached err.
if err := resp.Err(); err != nil {
flog.Warn().Err(err).Msg("request cancelled")
return
}
err = os.Rename(resp.Filename, target)
if err != nil {
// NOTE(review): if Fatal exits the process here, the deferred
// os.RemoveAll(tmp) would not run and the tmp- directory would be left
// behind — verify.
flog.Fatal().Err(err).Msg("mv")
}
// Shared state updated from concurrent goroutines without a lock; see the
// data-race note in the function comment.
cmd.hasResults = true
cmd.Max--
}
func (cmd *DebianFetchCmd) Run() (err error) {
re, err := regexp.Compile(cmd.Regexp) re, err := regexp.Compile(cmd.Regexp)
if err != nil { if err != nil {
log.Fatal().Err(err).Msg("regexp") log.Fatal().Err(err).Msg("regexp")
@ -79,54 +147,18 @@ func (cmd DebianGetDebCmd) Run() (err error) {
} }
} }
tmp, err := os.MkdirTemp(cmd.Path, "tmp-") cmd.swg = sizedwaitgroup.New(cmd.Threads)
if err != nil {
return
}
defer os.RemoveAll(tmp)
hasresults := false
for _, pkg := range packages { for _, pkg := range packages {
if cmd.Max <= 0 { if cmd.Max <= 0 {
break break
} }
if cmd.IgnoreCached { cmd.swg.Add()
log.Debug().Msgf("check cache for %s", pkg.Deb.Name) go cmd.fetch(pkg)
found, _ := cache.PackageURL(config.Debian, pkg.Deb.URL)
if found {
log.Debug().Msgf("%s already cached", pkg.Deb.Name)
continue
}
} }
cmd.swg.Wait()
target := filepath.Join(cmd.Path, filepath.Base(pkg.Deb.URL)) if !cmd.hasResults {
if fs.PathExists(target) {
log.Info().Msgf("%s already exists", pkg.Deb.URL)
continue
}
log.Info().Msgf("downloading %s", pkg.Deb.URL)
resp, err := grab.Get(tmp, pkg.Deb.URL)
if err != nil {
err = nil
log.Warn().Err(err).Msg("download")
continue
}
err = os.Rename(resp.Filename, target)
if err != nil {
log.Fatal().Err(err).Msg("mv")
}
hasresults = true
cmd.Max--
}
if !hasresults {
log.Fatal().Msg("no packages found to download") log.Fatal().Msg("no packages found to download")
} }
return return