From ad30fff0b7dbce0e46995ab41d6f5beba341cbff Mon Sep 17 00:00:00 2001 From: demo Date: Tue, 26 May 2026 15:28:35 -0400 Subject: feat: design worker-pool webcrawler --- main.go | 106 ++++++++++++++++++++++++++++++++++------------------------------ 1 file changed, 56 insertions(+), 50 deletions(-) diff --git a/main.go b/main.go index c812a95..27d3e27 100644 --- a/main.go +++ b/main.go @@ -3,11 +3,11 @@ package main import ( - "context" "flag" "fmt" "log" "net/url" + "sync" ) func main() { @@ -38,76 +38,82 @@ func main() { log.Fatal(err) } - crawler(*startURL, *maxConcurrency, *maxURLs) - + pool(*startURL, *maxConcurrency, *maxURLs) } -func crawler(startURL url.URL, maxConcurrency, maxURLs int) { - worklist := make(chan []url.URL) +func pool(startURL url.URL, maxConcurrency, maxURLs int) { + var wg sync.WaitGroup + urls := make(chan url.URL) + outlets := make([]<-chan []url.URL, maxConcurrency) + + for i := range maxConcurrency { + outlets[i] = worker(&wg, i+1, urls) + } + + out := fanIn(outlets...) + go func() { - worklist <- []url.URL{startURL} + urls <- startURL }() - sem := make(chan struct{}, maxConcurrency) - ctx, cancel := context.WithCancel(context.Background()) - seen := make(map[url.URL]bool) - i := 1 - - // FIXME: unfortunately, this example leaks, but I don't know - // how to fix that yet. -loop: - for list := range worklist { - for _, u := range list { - if maxURLs > 0 && len(seen) == maxURLs { - break loop - } - + for batch := range out { + for _, u := range batch { if !seen[u] { - fmt.Printf("%d. %s\n", i, &u) - i++ seen[u] = true go func() { - sem <- struct{}{} - defer func() { <-sem }() - - more := getMoreURLs(ctx, u) - if len(more) > 0 { - worklist <- more - } + urls <- u }() + + fmt.Printf("(%d) %s\n", len(seen), &u) } } } - // We broke the range loop, meaning there should be no more - // pending getMoreURLs jobs anyway. - cancel() + wg.Wait() +} - // For now, print out some diagnostics that prove that there - // are still pending sends on the worklist channel. - for batch := range worklist { - fmt.Printf("%d\n", len(batch)) +func worker(wg *sync.WaitGroup, id int, urls <-chan url.URL) <-chan []url.URL { + out := make(chan []url.URL) - for _, u := range batch { - fmt.Printf("-- %s\n", &u) + wg.Go(func() { + defer func() { close(out) }() + + for u := range urls { + //fmt.Printf("(%d) starting %s\n", id, &u) + + doc, err := fetch(u) + if err != nil { + log.Print(err) + } + + batch := findURLs(u, doc) + // fmt.Printf("(%d) finished %s\n", id, &u) + + out <- batch } - } + }) + + return out } -func getMoreURLs(ctx context.Context, u url.URL) []url.URL { - select { - case <-ctx.Done(): - return nil - default: - } +func fanIn(chans ...<-chan []url.URL) <-chan []url.URL { + out := make(chan []url.URL) + var wg sync.WaitGroup - doc, err := fetch(u) - if err != nil { - log.Print(err) - return nil + for _, ch := range chans { + wg.Go(func() { + for batch := range ch { + out <- batch + } + }) } - return findURLs(u, doc) + go func() { + wg.Wait() + close(out) + }() + + return out } -- cgit v1.2.3