fix: fix race conditions

This commit is contained in:
Bruno Bernard 2025-11-12 13:09:38 +04:00
commit 234b7f81f8

View file

@ -28,6 +28,21 @@ func getCacheDir() string {
return filepath.Join(xdg.CacheHome, "bbrew")
}
type multiCloser []io.Closer
func (mc multiCloser) Close() error {
var errs []string
for _, c := range mc {
if err := c.Close(); err != nil {
errs = append(errs, err.Error())
}
}
if len(errs) > 0 {
return fmt.Errorf("errors while closing: %s", strings.Join(errs, "; "))
}
return nil
}
type BrewServiceInterface interface {
GetPrefixPath() (path string)
GetFormulae() (formulae *[]models.Formula)
@ -61,6 +76,8 @@ type BrewService struct {
// Unified package list
allPackages *[]models.Package
analyticsMutex sync.RWMutex
brewVersion string
prefixPath string
}
@ -214,75 +231,72 @@ func (s *BrewService) StreamPackages(forceDownload bool) (<-chan models.Package,
defer close(pkgChan)
defer close(errChan)
metadata := s.loadMetadataAsync()
var wg sync.WaitGroup
wg.Add(4)
if err := s.streamFormulae(forceDownload, metadata, pkgChan); err != nil {
errChan <- err
return
}
// Load analytics concurrently
go func() {
defer wg.Done()
_ = s.loadAnalytics()
}()
go func() {
defer wg.Done()
_ = s.loadCaskAnalytics()
}()
if err := s.streamCasks(forceDownload, metadata, pkgChan); err != nil {
errChan <- err
return
}
// Load installed packages concurrently
installedFormulae := make(map[string]models.Formula)
installedCasks := make(map[string]models.Cask)
var installedMutex sync.Mutex
metadata.wg.Wait()
go func() {
defer wg.Done()
if s.loadInstalled() == nil {
installedMutex.Lock()
for _, f := range *s.installed {
installedFormulae[f.Name] = f
}
installedMutex.Unlock()
}
}()
go func() {
defer wg.Done()
if s.loadInstalledCasks() == nil {
installedMutex.Lock()
for _, c := range *s.installedCasks {
installedCasks[c.Token] = c
}
installedMutex.Unlock()
}
}()
// Wait for all metadata to load before streaming
wg.Wait()
var streamWg sync.WaitGroup
streamWg.Add(2)
go func() {
defer streamWg.Done()
if err := s.streamFormulae(forceDownload, installedFormulae, pkgChan); err != nil {
errChan <- err
}
}()
go func() {
defer streamWg.Done()
if err := s.streamCasks(forceDownload, installedCasks, pkgChan); err != nil {
errChan <- err
}
}()
streamWg.Wait()
}()
return pkgChan, errChan
}
type packageMetadata struct {
formulae map[string]models.Formula
casks map[string]models.Cask
mu sync.Mutex
wg sync.WaitGroup
}
func (s *BrewService) loadMetadataAsync() *packageMetadata {
meta := &packageMetadata{
formulae: make(map[string]models.Formula),
casks: make(map[string]models.Cask),
}
meta.wg.Add(4)
go func() {
defer meta.wg.Done()
_ = s.loadAnalytics()
}()
go func() {
defer meta.wg.Done()
_ = s.loadCaskAnalytics()
}()
go func() {
defer meta.wg.Done()
if s.loadInstalled() == nil {
meta.mu.Lock()
for _, f := range *s.installed {
meta.formulae[f.Name] = f
}
meta.mu.Unlock()
}
}()
go func() {
defer meta.wg.Done()
if s.loadInstalledCasks() == nil {
meta.mu.Lock()
for _, c := range *s.installedCasks {
meta.casks[c.Token] = c
}
meta.mu.Unlock()
}
}()
return meta
}
func (s *BrewService) streamFormulae(forceDownload bool, meta *packageMetadata, pkgChan chan<- models.Package) error {
func (s *BrewService) streamFormulae(forceDownload bool, installed map[string]models.Formula, pkgChan chan<- models.Package) error {
reader, err := s.openReader(forceDownload, "formula.json", FormulaeAPIURL)
if err != nil {
return err
@ -290,7 +304,7 @@ func (s *BrewService) streamFormulae(forceDownload bool, meta *packageMetadata,
defer reader.Close()
dec := json.NewDecoder(reader)
if _, err := dec.Token(); err != nil {
if _, err := dec.Token(); err != nil { // Read opening bracket
return fmt.Errorf("failed to read JSON start: %w", err)
}
@ -300,21 +314,18 @@ func (s *BrewService) streamFormulae(forceDownload bool, meta *packageMetadata,
return fmt.Errorf("failed to decode formula: %w", err)
}
meta.mu.Lock()
if installed, exists := meta.formulae[formula.Name]; exists {
formula = installed
if inst, ok := installed[formula.Name]; ok {
formula = inst
}
pkg := models.NewPackageFromFormula(&formula)
s.enrichPackageWithAnalytics(&pkg, formula.Name, s.analytics)
meta.mu.Unlock()
pkgChan <- pkg
}
return nil
}
func (s *BrewService) streamCasks(forceDownload bool, meta *packageMetadata, pkgChan chan<- models.Package) error {
func (s *BrewService) streamCasks(forceDownload bool, installed map[string]models.Cask, pkgChan chan<- models.Package) error {
reader, err := s.openReader(forceDownload, "cask.json", CaskAPIURL)
if err != nil {
return err
@ -322,7 +333,7 @@ func (s *BrewService) streamCasks(forceDownload bool, meta *packageMetadata, pkg
defer reader.Close()
dec := json.NewDecoder(reader)
if _, err := dec.Token(); err != nil {
if _, err := dec.Token(); err != nil { // Read opening bracket
return fmt.Errorf("failed to read JSON start: %w", err)
}
@ -332,23 +343,20 @@ func (s *BrewService) streamCasks(forceDownload bool, meta *packageMetadata, pkg
return fmt.Errorf("failed to decode cask: %w", err)
}
meta.mu.Lock()
if installed, exists := meta.casks[cask.Token]; exists {
cask = installed
if inst, ok := installed[cask.Token]; ok {
cask = inst
}
pkg := models.NewPackageFromCask(&cask)
s.enrichPackageWithAnalytics(&pkg, cask.Token, s.caskAnalytics)
meta.mu.Unlock()
pkgChan <- pkg
}
return nil
}
func (s *BrewService) openReader(forceDownload bool, filename, url string) (io.ReadCloser, error) {
cacheFile := filepath.Join(getCacheDir(), filename)
if !forceDownload {
cacheFile := filepath.Join(getCacheDir(), filename)
if file, err := os.Open(cacheFile); err == nil {
return file, nil
}
@ -358,10 +366,32 @@ func (s *BrewService) openReader(forceDownload bool, filename, url string) (io.R
if err != nil {
return nil, fmt.Errorf("failed to fetch %s: %w", filename, err)
}
return resp.Body, nil
// Ensure cache directory exists
if err := os.MkdirAll(getCacheDir(), 0750); err != nil {
return resp.Body, fmt.Errorf("could not create cache directory: %w", err)
}
file, err := os.Create(cacheFile)
if err != nil {
return resp.Body, fmt.Errorf("could not create cache file: %w", err) // Return body if we can't cache
}
// Use a struct that holds both closers
type readCloser struct {
io.Reader
io.Closer
}
return readCloser{
Reader: io.TeeReader(resp.Body, file),
Closer: multiCloser{resp.Body, file},
}, nil
}
func (s *BrewService) enrichPackageWithAnalytics(pkg *models.Package, key string, analytics map[string]models.AnalyticsItem) {
s.analyticsMutex.RLock()
defer s.analyticsMutex.RUnlock()
if a, exists := analytics[key]; exists && a.Number > 0 {
downloads, _ := strconv.Atoi(strings.ReplaceAll(a.Count, ",", ""))
pkg.Analytics90dRank = a.Number
@ -583,7 +613,9 @@ func (s *BrewService) loadAnalytics() (err error) {
analyticsByFormula[f.Formula] = f
}
s.analyticsMutex.Lock()
s.analytics = analyticsByFormula
s.analyticsMutex.Unlock()
return nil
}
@ -610,7 +642,9 @@ func (s *BrewService) loadCaskAnalytics() (err error) {
}
}
s.analyticsMutex.Lock()
s.caskAnalytics = analyticsByCask
s.analyticsMutex.Unlock()
return nil
}