summaryrefslogtreecommitdiff
path: root/main.go
diff options
context:
space:
mode:
authordemo <demo@antix1>2026-05-26 15:28:35 -0400
committerdemo <demo@antix1>2026-05-26 15:28:35 -0400
commitad30fff0b7dbce0e46995ab41d6f5beba341cbff (patch)
treedc1f1b67c0cd39c1dd1918caa864c977d1e22f81 /main.go
parent18e3030a0185992a2b531aa13fe57c40eedf395e (diff)
feat: design worker-pool webcrawler
Diffstat (limited to 'main.go')
-rw-r--r--main.go106
1 files changed, 56 insertions, 50 deletions
diff --git a/main.go b/main.go
index c812a95..27d3e27 100644
--- a/main.go
+++ b/main.go
@@ -3,11 +3,11 @@
package main
import (
- "context"
"flag"
"fmt"
"log"
"net/url"
+ "sync"
)
func main() {
@@ -38,76 +38,82 @@ func main() {
log.Fatal(err)
}
- crawler(*startURL, *maxConcurrency, *maxURLs)
-
+ pool(*startURL, *maxConcurrency, *maxURLs)
}
-func crawler(startURL url.URL, maxConcurrency, maxURLs int) {
- worklist := make(chan []url.URL)
+func pool(startURL url.URL, maxConcurrency, maxURLs int) {
+ var wg sync.WaitGroup
+ urls := make(chan url.URL)
+ outlets := make([]<-chan []url.URL, maxConcurrency)
+
+ for i := range maxConcurrency {
+ outlets[i] = worker(&wg, i+1, urls)
+ }
+
+ out := fanIn(outlets...)
+
go func() {
- worklist <- []url.URL{startURL}
+ urls <- startURL
}()
- sem := make(chan struct{}, maxConcurrency)
- ctx, cancel := context.WithCancel(context.Background())
-
seen := make(map[url.URL]bool)
- i := 1
-
- // FIXME: unfortunately, this example leaks, but I don't know
- // how to fix that yet.
-loop:
- for list := range worklist {
- for _, u := range list {
- if maxURLs > 0 && len(seen) == maxURLs {
- break loop
- }
-
+ for batch := range out {
+ for _, u := range batch {
if !seen[u] {
- fmt.Printf("%d. %s\n", i, &u)
- i++
seen[u] = true
go func() {
- sem <- struct{}{}
- defer func() { <-sem }()
-
- more := getMoreURLs(ctx, u)
- if len(more) > 0 {
- worklist <- more
- }
+ urls <- u
}()
+
+ fmt.Printf("(%d) %s\n", len(seen), &u)
}
}
}
- // We broke the range loop, meaning there should be no more
- // pending getMoreURLs jobs anyway.
- cancel()
+ wg.Wait()
+}
- // For now, print out some diagnostics that prove that there
- // are still pending sends on the worklist channel.
- for batch := range worklist {
- fmt.Printf("%d\n", len(batch))
+func worker(wg *sync.WaitGroup, id int, urls <-chan url.URL) <-chan []url.URL {
+ out := make(chan []url.URL)
- for _, u := range batch {
- fmt.Printf("-- %s\n", &u)
+ wg.Go(func() {
+ defer func() { close(out) }()
+
+ for u := range urls {
+ //fmt.Printf("(%d) starting %s\n", id, &u)
+
+ doc, err := fetch(u)
+ if err != nil {
+ log.Print(err)
+ }
+
+ batch := findURLs(u, doc)
+ // fmt.Printf("(%d) finished %s\n", id, &u)
+
+ out <- batch
}
- }
+ })
+
+ return out
}
-func getMoreURLs(ctx context.Context, u url.URL) []url.URL {
- select {
- case <-ctx.Done():
- return nil
- default:
- }
+func fanIn(chans ...<-chan []url.URL) <-chan []url.URL {
+ out := make(chan []url.URL)
+ var wg sync.WaitGroup
- doc, err := fetch(u)
- if err != nil {
- log.Print(err)
- return nil
+ for _, ch := range chans {
+ wg.Go(func() {
+ for batch := range ch {
+ out <- batch
+ }
+ })
}
- return findURLs(u, doc)
+ go func() {
+ wg.Wait()
+ close(out)
+ }()
+
+ return out
}