From fb1bdcc58470ee7c56586cc604591f850ecd59ad Mon Sep 17 00:00:00 2001 From: demo Date: Tue, 26 May 2026 17:07:35 -0400 Subject: feat: restore original "print 45 and hang" behavior --- main.go | 75 +++++++++++++++++++++++++++++++++-------------------------------- 1 file changed, 38 insertions(+), 37 deletions(-) (limited to 'main.go') diff --git a/main.go b/main.go index 6c5d77f..418e2ef 100644 --- a/main.go +++ b/main.go @@ -48,84 +48,85 @@ func main() { func pool(startURL url.URL, maxConcurrency, maxURLs int) { var wg sync.WaitGroup - urls := make(chan *url.URL) + urls := make(chan url.URL) - outlets := make([]<-chan *[]url.URL, maxConcurrency) + outlets := make([]<-chan []url.URL, maxConcurrency) for i := range maxConcurrency { outlets[i] = worker(&wg, i+1, urls) } batches := fanIn(outlets...) - dedup := deduplicate(batches) + go deduplicate(batches, urls) - urls <- &startURL - - for u := range dedup { - fmt.Printf("%s\n", u) - urls <- u - } + urls <- startURL fmt.Println("waiting") wg.Wait() fmt.Println("done") } -func deduplicate(batches <-chan *[]url.URL) <-chan *url.URL { - out := make(chan *url.URL) - - go func() { - defer close(out) - - seen := make(map[url.URL]bool) - for batch := range batches { - for _, u := range *batch { - fmt.Println("(deduplicate) got url") - if !seen[u] { - seen[u] = true - out <- &u +func deduplicate(batches <-chan []url.URL, urls chan url.URL) { + seen := make(map[url.URL]bool) + + for batch := range batches { + for _, u := range batch { + fmt.Println("(deduplicate) got url") + if !seen[u] { + seen[u] = true + fmt.Printf("(seen %d) %s\n", len(seen), &u) + + // Sending to urls channel has to be + // out of phase with deduplicate, + // because of the way the pipeline is + // set up: each channel is immediately + // synchronized with the next, + // creating a transitive effect where + // urls channel is absurdly + // synchronized with itself. So we + // have to do it this way. + go func() { + urls <- u fmt.Println("(deduplicate) finished url") - } + }() } } - }() - - return out + } } -func worker(wg *sync.WaitGroup, id int, urls <-chan *url.URL) <-chan *[]url.URL { - out := make(chan *[]url.URL) +func worker(wg *sync.WaitGroup, id int, urls <-chan url.URL) <-chan []url.URL { + out := make(chan []url.URL) wg.Go(func() { defer close(out) for u := range urls { - fmt.Printf("(%d) got url\n", id) + fmt.Printf("(worker %d) got url\n", id) - doc, err := fetch(*u) + doc, err := fetch(u) if err != nil { log.Print(err) } - batch := findURLs(*u, doc) - out <- &batch - fmt.Printf("(%d) finished url\n", id) + batch := findURLs(u, doc) + out <- batch + fmt.Printf("(worker %d) finished url\n", id) } }) return out } -func fanIn(chans ...<-chan *[]url.URL) <-chan *[]url.URL { - out := make(chan *[]url.URL) +func fanIn(chans ...<-chan []url.URL) <-chan []url.URL { + out := make(chan []url.URL) var wg sync.WaitGroup for i, ch := range chans { wg.Go(func() { for batch := range ch { - fmt.Printf("(outlet %d) got batch\n", i) + fmt.Printf("(fanin %d) got batch\n", i+1) out <- batch - fmt.Printf("(outlet %d) finished batch\n", i) + fmt.Printf("(fanin %d) finished batch\n", i+1) } }) } -- cgit v1.2.3