diff options
| -rw-r--r-- | classic.go | 49 | ||||
| -rw-r--r-- | main.go | 99 |
2 files changed, 50 insertions, 98 deletions
diff --git a/classic.go b/classic.go new file mode 100644 index 0000000..9adb5bf --- /dev/null +++ b/classic.go @@ -0,0 +1,49 @@ +package main + +import ( + "fmt" + "log" + "net/url" +) + +func classic(startURL url.URL, maxConcurrency, maxURLs int) { + worklist := make(chan []url.URL) + var numPendingSends int + + numPendingSends++ + go func() { + worklist <- []url.URL{startURL} + }() + + // Crawl the web concurrently. + seen := make(map[url.URL]bool) + count := 1 + + for ; numPendingSends > 0; numPendingSends-- { + batch := <-worklist + for _, u := range batch { + if !seen[u] { + fmt.Printf("%d. %s\n", count, &u) + count++ + + seen[u] = true + + numPendingSends++ + go func() { + worklist <- getBatch(u) + }() + } + } + } +} + +func getBatch(u url.URL) []url.URL { + doc, err := fetch(u) + if err != nil { + log.Print(err) + } + + batch := findURLs(u, doc) + + return batch +} @@ -9,7 +9,6 @@ import ( "net/url" "runtime/pprof" "strings" - "sync" "time" ) @@ -42,106 +41,10 @@ func main() { } getLeakProfile(func() { - pool(*startURL, *maxConcurrency, *maxURLs) + classic(*startURL, *maxConcurrency, *maxURLs) }) } -func pool(startURL url.URL, maxConcurrency, maxURLs int) { - var wg sync.WaitGroup - urls := make(chan url.URL) - - outlets := make([]<-chan []url.URL, maxConcurrency) - - for i := range maxConcurrency { - outlets[i] = worker(&wg, i+1, urls) - } - - batches := fanIn(outlets...) - - go func() { - urls <- startURL - }() - - deduplicate(batches, urls) - - fmt.Println("waiting") - wg.Wait() - fmt.Println("done") -} - -func deduplicate(batches <-chan []url.URL, urls chan url.URL) { - seen := make(map[url.URL]bool) - - for batch := range batches { - for _, u := range batch { - fmt.Println("(deduplicate) got url") - if !seen[u] { - seen[u] = true - fmt.Printf("(seen %d) %s\n", len(seen), &u) - - // Sending to urls channel has to be - // out of phase with deduplicate, - // because of the way the pipeline is - // set up: each channel is immediately - // synchronized with the next, - // creating a transitive effect where - // urls channel is absurdly - // synchronized with itself. So we - // have to do it this way. - go func() { - urls <- u - fmt.Println("(deduplicate) finished url") - }() - } - } - } -} - -func worker(wg *sync.WaitGroup, id int, urls <-chan url.URL) <-chan []url.URL { - out := make(chan []url.URL) - - wg.Go(func() { - defer close(out) - - for u := range urls { - fmt.Printf("(worker %d) got url\n", id) - - doc, err := fetch(u) - if err != nil { - log.Print(err) - } - - batch := findURLs(u, doc) - out <- batch - fmt.Printf("(worker %d) finished url\n", id) - } - }) - - return out -} - -func fanIn(chans ...<-chan []url.URL) <-chan []url.URL { - out := make(chan []url.URL) - var wg sync.WaitGroup - - for i, ch := range chans { - wg.Go(func() { - for batch := range ch { - fmt.Printf("(fanin %d) got batch\n", i+1) - out <- batch - fmt.Printf("(fanin %d) finished batch\n", i+1) - } - }) - } - - go func() { - wg.Wait() - close(out) - }() - - return out -} - // getLeakProfile runs a leaky program snippet, extracts the goroutine leak profile, // and writes it to stdout. func getLeakProfile(leakySnippet func()) { |
