summaryrefslogtreecommitdiff
path: root/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'main.go')
-rw-r--r--main.go75
1 files changed, 38 insertions, 37 deletions
diff --git a/main.go b/main.go
index 6c5d77f..418e2ef 100644
--- a/main.go
+++ b/main.go
@@ -48,84 +48,85 @@ func main() {
func pool(startURL url.URL, maxConcurrency, maxURLs int) {
var wg sync.WaitGroup
- urls := make(chan *url.URL)
+ urls := make(chan url.URL)
- outlets := make([]<-chan *[]url.URL, maxConcurrency)
+ outlets := make([]<-chan []url.URL, maxConcurrency)
for i := range maxConcurrency {
outlets[i] = worker(&wg, i+1, urls)
}
batches := fanIn(outlets...)
- dedup := deduplicate(batches)
+ go deduplicate(batches, urls)
- urls <- &startURL
-
- for u := range dedup {
- fmt.Printf("%s\n", u)
- urls <- u
- }
+ urls <- startURL
fmt.Println("waiting")
wg.Wait()
fmt.Println("done")
}
-func deduplicate(batches <-chan *[]url.URL) <-chan *url.URL {
- out := make(chan *url.URL)
-
- go func() {
- defer close(out)
-
- seen := make(map[url.URL]bool)
- for batch := range batches {
- for _, u := range *batch {
- fmt.Println("(deduplicate) got url")
- if !seen[u] {
- seen[u] = true
- out <- &u
+func deduplicate(batches <-chan []url.URL, urls chan url.URL) {
+ seen := make(map[url.URL]bool)
+
+ for batch := range batches {
+ for _, u := range batch {
+ fmt.Println("(deduplicate) got url")
+ if !seen[u] {
+ seen[u] = true
+ fmt.Printf("(seen %d) %s\n", len(seen), &u)
+
+ // Sending to urls channel has to be
+ // out of phase with deduplicate,
+ // because of the way the pipeline is
+ // set up: each channel is immediately
+ // synchronized with the next,
+ // creating a transitive effect where
+ // urls channel is absurdly
+ // synchronized with itself. So we
+ // have to do it this way.
+ go func() {
+ urls <- u
fmt.Println("(deduplicate) finished url")
- }
+ }()
}
}
- }()
-
- return out
+ }
}
-func worker(wg *sync.WaitGroup, id int, urls <-chan *url.URL) <-chan *[]url.URL {
- out := make(chan *[]url.URL)
+func worker(wg *sync.WaitGroup, id int, urls <-chan url.URL) <-chan []url.URL {
+ out := make(chan []url.URL)
wg.Go(func() {
defer close(out)
for u := range urls {
- fmt.Printf("(%d) got url\n", id)
+ fmt.Printf("(worker %d) got url\n", id)
- doc, err := fetch(*u)
+ doc, err := fetch(u)
if err != nil {
log.Print(err)
}
- batch := findURLs(*u, doc)
- out <- &batch
- fmt.Printf("(%d) finished url\n", id)
+ batch := findURLs(u, doc)
+ out <- batch
+ fmt.Printf("(worker %d) finished url\n", id)
}
})
return out
}
-func fanIn(chans ...<-chan *[]url.URL) <-chan *[]url.URL {
- out := make(chan *[]url.URL)
+func fanIn(chans ...<-chan []url.URL) <-chan []url.URL {
+ out := make(chan []url.URL)
var wg sync.WaitGroup
for i, ch := range chans {
wg.Go(func() {
for batch := range ch {
- fmt.Printf("(outlet %d) got batch\n", i)
+ fmt.Printf("(fanin %d) got batch\n", i+1)
out <- batch
- fmt.Printf("(outlet %d) finished batch\n", i)
+ fmt.Printf("(fanin %d) finished batch\n", i+1)
}
})
}