From a26c49208c571cc536e4be60db88bb12d8d5c213 Mon Sep 17 00:00:00 2001 From: demo Date: Tue, 26 May 2026 16:26:22 -0400 Subject: feat: reveal bug in the channel linkage topology --- main.go | 68 ++++++++++++++++++++++++++++++++++++----------------------------- 1 file changed, 38 insertions(+), 30 deletions(-) (limited to 'main.go') diff --git a/main.go b/main.go index 9c1a94d..bbd946a 100644 --- a/main.go +++ b/main.go @@ -3,7 +3,6 @@ package main import ( - "context" "flag" "fmt" "log" @@ -50,46 +49,58 @@ func main() { func pool(startURL url.URL, maxConcurrency, maxURLs int) { var wg sync.WaitGroup urls := make(chan url.URL) + outlets := make([]<-chan []url.URL, maxConcurrency) - ctx, cancel := context.WithCancel(context.Background()) for i := range maxConcurrency { - outlets[i] = worker(ctx, &wg, i+1, urls) + outlets[i] = worker(&wg, i+1, urls) } - out := fanIn(outlets...) + batches := fanIn(outlets...) + dedup := deduplicate(batches) - go func() { - urls <- startURL - }() + urls <- startURL - seen := make(map[url.URL]bool) - for batch := range out { - for _, u := range batch { - if !seen[u] { - seen[u] = true + for u := range dedup { + fmt.Printf("%s\n", &u) + urls <- u + } + + fmt.Println("waiting") + wg.Wait() + fmt.Println("done") +} - go func() { - urls <- u - }() +func deduplicate(batches <-chan []url.URL) <-chan url.URL { + out := make(chan url.URL) - fmt.Printf("(%d) %s\n", len(seen), &u) + go func() { + defer close(out) + + seen := make(map[url.URL]bool) + for batch := range batches { + for _, u := range batch { + fmt.Println("(deduplicate) got url") + if !seen[u] { + seen[u] = true + out <- u + fmt.Println("(deduplicate) finished url") + } } } - } + }() - cancel() - wg.Wait() + return out } -func worker(ctx context.Context, wg *sync.WaitGroup, id int, urls <-chan url.URL) <-chan []url.URL { +func worker(wg *sync.WaitGroup, id int, urls <-chan url.URL) <-chan []url.URL { out := make(chan []url.URL) wg.Go(func() { - defer func() { close(out) }() + defer close(out) for u := range urls { - //fmt.Printf("(%d) starting %s\n", id, &u) + fmt.Printf("(%d) got url\n", id) doc, err := fetch(u) if err != nil { @@ -97,13 +108,8 @@ func worker(ctx context.Context, wg *sync.WaitGroup, id int, urls <-chan url.URL } batch := findURLs(u, doc) - // fmt.Printf("(%d) finished %s\n", id, &u) - - select { - case out <- batch: - case <-ctx.Done(): - return - } + out <- batch + fmt.Printf("(%d) finished url\n", id) } }) @@ -114,10 +120,12 @@ func fanIn(chans ...<-chan []url.URL) <-chan []url.URL { out := make(chan []url.URL) var wg sync.WaitGroup - for _, ch := range chans { + for i, ch := range chans { wg.Go(func() { for batch := range ch { + fmt.Printf("(outlet %d) got batch\n", i) out <- batch + fmt.Printf("(outlet %d) finished batch\n", i) } }) } -- cgit v1.2.3