summaryrefslogtreecommitdiff
path: root/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'main.go')
-rw-r--r--main.go68
1 files changed, 38 insertions, 30 deletions
diff --git a/main.go b/main.go
index 9c1a94d..bbd946a 100644
--- a/main.go
+++ b/main.go
@@ -3,7 +3,6 @@
package main
import (
- "context"
"flag"
"fmt"
"log"
@@ -50,46 +49,58 @@ func main() {
func pool(startURL url.URL, maxConcurrency, maxURLs int) {
var wg sync.WaitGroup
urls := make(chan url.URL)
+
outlets := make([]<-chan []url.URL, maxConcurrency)
- ctx, cancel := context.WithCancel(context.Background())
for i := range maxConcurrency {
- outlets[i] = worker(ctx, &wg, i+1, urls)
+ outlets[i] = worker(&wg, i+1, urls)
}
- out := fanIn(outlets...)
+ batches := fanIn(outlets...)
+ dedup := deduplicate(batches)
- go func() {
- urls <- startURL
- }()
+ urls <- startURL
- seen := make(map[url.URL]bool)
- for batch := range out {
- for _, u := range batch {
- if !seen[u] {
- seen[u] = true
+ for u := range dedup {
+ fmt.Printf("%s\n", &u)
+ urls <- u
+ }
+
+ fmt.Println("waiting")
+ wg.Wait()
+ fmt.Println("done")
+}
- go func() {
- urls <- u
- }()
+func deduplicate(batches <-chan []url.URL) <-chan url.URL {
+ out := make(chan url.URL)
- fmt.Printf("(%d) %s\n", len(seen), &u)
+ go func() {
+ defer close(out)
+
+ seen := make(map[url.URL]bool)
+ for batch := range batches {
+ for _, u := range batch {
+ fmt.Println("(deduplicate) got url")
+ if !seen[u] {
+ seen[u] = true
+ out <- u
+ fmt.Println("(deduplicate) finished url")
+ }
}
}
- }
+ }()
- cancel()
- wg.Wait()
+ return out
}
-func worker(ctx context.Context, wg *sync.WaitGroup, id int, urls <-chan url.URL) <-chan []url.URL {
+func worker(wg *sync.WaitGroup, id int, urls <-chan url.URL) <-chan []url.URL {
out := make(chan []url.URL)
wg.Go(func() {
- defer func() { close(out) }()
+ defer close(out)
for u := range urls {
- //fmt.Printf("(%d) starting %s\n", id, &u)
+ fmt.Printf("(%d) got url\n", id)
doc, err := fetch(u)
if err != nil {
@@ -97,13 +108,8 @@ func worker(ctx context.Context, wg *sync.WaitGroup, id int, urls <-chan url.URL
}
batch := findURLs(u, doc)
- // fmt.Printf("(%d) finished %s\n", id, &u)
-
- select {
- case out <- batch:
- case <-ctx.Done():
- return
- }
+ out <- batch
+ fmt.Printf("(%d) finished url\n", id)
}
})
@@ -114,10 +120,12 @@ func fanIn(chans ...<-chan []url.URL) <-chan []url.URL {
out := make(chan []url.URL)
var wg sync.WaitGroup
- for _, ch := range chans {
+ for i, ch := range chans {
wg.Go(func() {
for batch := range ch {
+ fmt.Printf("(outlet %d) got batch\n", i)
out <- batch
+ fmt.Printf("(outlet %d) finished batch\n", i)
}
})
}