// URLs implements a breadth-first search webcrawler based on the // example given in section 8.6 of The Go Programming Language. package main import ( "flag" "fmt" "log" "net/url" "runtime/pprof" "strings" "sync" "time" ) func main() { maxConcurrency := flag.Int("c", 0, "Maximum number of concurrent queue pushes") startRawURL := flag.String("url", "", "Entry-point URL") maxURLs := flag.Int("max", 0, "Maximum number of URLs to collect (omitted or 0 means no limit)") flag.Parse() if *maxConcurrency == 0 { log.Fatal("Missing -c argument") } if *maxConcurrency < 1 { log.Fatalf("Invalid -c argument: %d", *maxConcurrency) } if *startRawURL == "" { log.Fatal("Missing -url argument") } if *maxURLs < 0 { log.Fatalf("Invalid -max argument: %d", *maxURLs) } startURL, err := url.Parse(*startRawURL) if err != nil { log.Fatal(err) } getLeakProfile(func() { pool(*startURL, *maxConcurrency, *maxURLs) }) } func pool(startURL url.URL, maxConcurrency, maxURLs int) { var wg sync.WaitGroup urls := make(chan url.URL) outlets := make([]<-chan []url.URL, maxConcurrency) for i := range maxConcurrency { outlets[i] = worker(&wg, i+1, urls) } batches := fanIn(outlets...) go deduplicate(batches, urls) urls <- startURL fmt.Println("waiting") wg.Wait() fmt.Println("done") } func deduplicate(batches <-chan []url.URL, urls chan url.URL) { seen := make(map[url.URL]bool) for batch := range batches { for _, u := range batch { fmt.Println("(deduplicate) got url") if !seen[u] { seen[u] = true fmt.Printf("(seen %d) %s\n", len(seen), &u) // Sending to urls channel has to be // out of phase with deduplicate, // because of the way the pipeline is // set up: each channel is immediately // synchronized with the next, // creating a transitive effect where // urls channel is absurdly // synchronized with itself. So we // have to do it this way. go func() { urls <- u fmt.Println("(deduplicate) finished url") }() } } } } func worker(wg *sync.WaitGroup, id int, urls <-chan url.URL) <-chan []url.URL { out := make(chan []url.URL) wg.Go(func() { defer close(out) for u := range urls { fmt.Printf("(worker %d) got url\n", id) doc, err := fetch(u) if err != nil { log.Print(err) } batch := findURLs(u, doc) out <- batch fmt.Printf("(worker %d) finished url\n", id) } }) return out } func fanIn(chans ...<-chan []url.URL) <-chan []url.URL { out := make(chan []url.URL) var wg sync.WaitGroup for i, ch := range chans { wg.Go(func() { for batch := range ch { fmt.Printf("(fanin %d) got batch\n", i+1) out <- batch fmt.Printf("(fanin %d) finished batch\n", i+1) } }) } go func() { wg.Wait() close(out) }() return out } // getLeakProfile runs a leaky program snippet, extracts the goroutine leak profile, // and writes it to stdout. func getLeakProfile(leakySnippet func()) { prof := pprof.Lookup("goroutineleak") defer func() { time.Sleep(2 * time.Second) var content strings.Builder prof.WriteTo(&content, 2) // Ignore non leaked goroutines leaks := strings.SplitSeq(content.String(), "\n\n") for leak := range leaks { if strings.Contains(leak, "(leaked)") { fmt.Println(leak + "\n") } } }() leakySnippet() }