diff options
Diffstat (limited to 'main.go')
| -rw-r--r-- | main.go | 99 |
1 files changed, 1 insertions, 98 deletions
@@ -9,7 +9,6 @@ import ( "net/url" "runtime/pprof" "strings" - "sync" "time" ) @@ -42,106 +41,10 @@ func main() { } getLeakProfile(func() { - pool(*startURL, *maxConcurrency, *maxURLs) + classic(*startURL, *maxConcurrency, *maxURLs) }) } -func pool(startURL url.URL, maxConcurrency, maxURLs int) { - var wg sync.WaitGroup - urls := make(chan url.URL) - - outlets := make([]<-chan []url.URL, maxConcurrency) - - for i := range maxConcurrency { - outlets[i] = worker(&wg, i+1, urls) - } - - batches := fanIn(outlets...) - - go func() { - urls <- startURL - }() - - deduplicate(batches, urls) - - fmt.Println("waiting") - wg.Wait() - fmt.Println("done") -} - -func deduplicate(batches <-chan []url.URL, urls chan url.URL) { - seen := make(map[url.URL]bool) - - for batch := range batches { - for _, u := range batch { - fmt.Println("(deduplicate) got url") - if !seen[u] { - seen[u] = true - fmt.Printf("(seen %d) %s\n", len(seen), &u) - - // Sending to urls channel has to be - // out of phase with deduplicate, - // because of the way the pipeline is - // set up: each channel is immediately - // synchronized with the next, - // creating a transitive effect where - // urls channel is absurdly - // synchronized with itself. So we - // have to do it this way. - go func() { - urls <- u - fmt.Println("(deduplicate) finished url") - }() - } - } - } -} - -func worker(wg *sync.WaitGroup, id int, urls <-chan url.URL) <-chan []url.URL { - out := make(chan []url.URL) - - wg.Go(func() { - defer close(out) - - for u := range urls { - fmt.Printf("(worker %d) got url\n", id) - - doc, err := fetch(u) - if err != nil { - log.Print(err) - } - - batch := findURLs(u, doc) - out <- batch - fmt.Printf("(worker %d) finished url\n", id) - } - }) - - return out -} - -func fanIn(chans ...<-chan []url.URL) <-chan []url.URL { - out := make(chan []url.URL) - var wg sync.WaitGroup - - for i, ch := range chans { - wg.Go(func() { - for batch := range ch { - fmt.Printf("(fanin %d) got batch\n", i+1) - out <- batch - fmt.Printf("(fanin %d) finished batch\n", i+1) - } - }) - } - - go func() { - wg.Wait() - close(out) - }() - - return out -} - // getLeakProfile runs a leaky program snippet, extracts the goroutine leak profile, // and writes it to stdout. func getLeakProfile(leakySnippet func()) { |
