Skip to content

Commit

Permalink
chore: rename and better comments
Browse files Browse the repository at this point in the history
Signed-off-by: gfanton <[email protected]>
  • Loading branch information
gfanton committed Jan 24, 2024
1 parent d768534 commit b180849
Showing 1 changed file with 58 additions and 49 deletions.
107 changes: 58 additions & 49 deletions contribs/gnodev/pkg/watcher/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,15 @@ type PackageWatcher struct {
PackagesUpdate <-chan PackageUpdateList
Errors <-chan error

ctx context.Context
stop context.CancelFunc

logger log.Logger
watcher *fsnotify.Watcher
pkgs []string
ctx context.Context
stop context.CancelFunc
pkgsDir []string
emitter events.Emitter
}

type PackageUpdateList []events.PackageUpdate

func (pkgsu PackageUpdateList) PackagesPath() []string {
pkgs := make([]string, len(pkgsu))
for i, pkg := range pkgsu {
pkgs[i] = pkg.Package
}
return pkgs
}

func (pkgsu PackageUpdateList) FilesPath() []string {
files := make([]string, 0)
for _, pkg := range pkgsu {
files = append(files, pkg.Files...)
}
return files
}

func NewPackageWatcher(logger log.Logger, emitter events.Emitter) (*PackageWatcher, error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
Expand All @@ -53,7 +36,7 @@ func NewPackageWatcher(logger log.Logger, emitter events.Emitter) (*PackageWatch

ctx, cancel := context.WithCancel(context.Background())
p := &PackageWatcher{
pkgs: []string{},
pkgsDir: []string{},
logger: logger,
watcher: watcher,
ctx: ctx,
Expand All @@ -70,6 +53,10 @@ func (p *PackageWatcher) Stop() {
p.stop()
}

// AddPackages adds new packages to the watcher.
// Packages are sorted by their length in descending order to facilitate easier
// and more efficient matching with corresponding paths. The longest paths are
// compared first.
func (p *PackageWatcher) AddPackages(pkgs ...gnomod.Pkg) error {
for _, pkg := range pkgs {
dir := pkg.Dir
Expand All @@ -79,17 +66,17 @@ func (p *PackageWatcher) AddPackages(pkgs ...gnomod.Pkg) error {
return fmt.Errorf("unable to get absolute path of %q: %w", dir, err)
}

// Find the correct insertion point using sorting search
index := sort.Search(len(p.pkgs), func(i int) bool {
return len(p.pkgs[i]) <= len(dir)
// Use binary search to find the correct insertion point
index := sort.Search(len(p.pkgsDir), func(i int) bool {
return len(p.pkgsDir[i]) <= len(dir) // Longest paths first
})

// Check if the string already exists at the insertion point
if index < len(p.pkgs) && (p.pkgs)[index] == dir {
continue // Skip as it's a duplicate
// Check for duplicates at the insertion point to avoid redundancy
if index < len(p.pkgsDir) && p.pkgsDir[index] == dir {
continue // Skip
}

// Add the pakcage to the watcher
// Add the package to the watcher and handle any errors
if err := p.watcher.Add(abs); err != nil {
return fmt.Errorf("unable to watch %q: %w", pkg.Dir, err)
}
Expand All @@ -99,15 +86,14 @@ func (p *PackageWatcher) AddPackages(pkgs ...gnomod.Pkg) error {
}

func (p *PackageWatcher) startWatching() {
const timeout = time.Millisecond * 500

cerrs := make(chan error, 1)
const timeout = time.Millisecond * 500 // Debounce interval

cwatch := make(chan PackageUpdateList)
cErrors := make(chan error, 1)
cPkgUpdate := make(chan PackageUpdateList)

go func() {
defer close(cerrs)
defer close(cwatch)
defer close(cErrors)
defer close(cPkgUpdate)

var debounceTimer <-chan time.Time
var pathList = []string{}
Expand All @@ -120,50 +106,59 @@ func (p *PackageWatcher) startWatching() {
case watchErr := <-p.watcher.Errors:
err = fmt.Errorf("watch error: %w", watchErr)
case <-debounceTimer:
// Process and emit package updates after the debounce interval
updates := p.generatePackagesUpdateList(pathList)
for _, update := range updates {
p.logger.Error("packages update",
"pkg", update.Package,
"files", update.Files,
)
}
panic(fmt.Sprintf("%+v", updates))

cwatch <- updates

// Notify that we have some packages update
// Send updates
cPkgUpdate <- updates
p.emitter.Emit(events.NewPackagesUpdateEvent(updates))

// Reset pathList and debounceTimer
// Reset the path list and debounce timer
pathList = []string{}
debounceTimer = nil
case evt := <-p.watcher.Events:
// Only handle write operations
if evt.Op != fsnotify.Write {
continue
}

pathList = append(pathList, evt.Name)

// Set up the debounce timer
debounceTimer = time.After(timeout)
}
}

cerrs <- err
cErrors <- err // Send any final error to the channel
}()

p.PackagesUpdate = cwatch
p.Errors = cerrs
// Set update channels
p.PackagesUpdate = cPkgUpdate
p.Errors = cErrors
}

func (p *PackageWatcher) generatePackagesUpdateList(paths []string) PackageUpdateList {
pkgsUpdate := []events.PackageUpdate{}

mpkgs := map[string]*events.PackageUpdate{} // pkg -> update
mpkgs := map[string]*events.PackageUpdate{} // Pkg -> Update
for _, path := range paths {
for _, pkg := range p.pkgs {
for _, pkg := range p.pkgsDir {
// Check if the path is inside the package directory
if !strings.HasPrefix(pkg, path) {
continue
}

if len(pkg) == len(path) {
continue // Skip if pkg == path
}

// Accumulate file updates for each package
pkgu, ok := mpkgs[pkg]
if !ok {
pkgsUpdate = append(pkgsUpdate, events.PackageUpdate{
Expand All @@ -173,13 +168,27 @@ func (p *PackageWatcher) generatePackagesUpdateList(paths []string) PackageUpdat
pkgu = &pkgsUpdate[len(pkgsUpdate)-1]
}

if len(pkg) == len(path) {
continue
}

pkgu.Files = append(pkgu.Files, path)
}
}

return pkgsUpdate
}

type PackageUpdateList []events.PackageUpdate

func (pkgsu PackageUpdateList) PackagesPath() []string {
pkgs := make([]string, len(pkgsu))
for i, pkg := range pkgsu {
pkgs[i] = pkg.Package
}
return pkgs
}

func (pkgsu PackageUpdateList) FilesPath() []string {
files := make([]string, 0)
for _, pkg := range pkgsu {
files = append(files, pkg.Files...)
}
return files
}

0 comments on commit b180849

Please sign in to comment.