package main import ( "context" "fmt" "net/url" "sync" ) /* CHANNEL/GOROUTINE NOTES main goroutine: - manages urls channel. */ // workers launches a worker queue for crawling a given Web domain. func workers(startURL url.URL, maxConcurrency, maxURLs int) { worklist := make(chan []packet) // Unseen URLs. packets := make(chan packet) go func() { startPacket := packet{startURL, 0} worklist <- []packet{startPacket} }() var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) // Create maxConcurrency worker goroutines to demultiplex from // the urls channel (unseen links.) for i := range maxConcurrency { wg.Go(func() { for p := range packets { batch := getBatch(p.url) // Convert URLs to Packets. In the // process, bump up the depth by 1. ps := convertToPackets(batch, p.depth+1) select { case <-ctx.Done(): fmt.Printf("exiting early %d\n", i+1) return default: go func() { worklist <- ps }() } } fmt.Printf("terminating %d\n", i+1) }) } // The main goroutine deduplicates worklist items and sends // unseen ones to the crawlers in a fan-out fashion. seen := make(map[url.URL]int) // Used to prettify the running URL listing. count := 1 loop: for batch := range worklist { for _, p := range batch { // We're tracking _depth_ with the seen-map // now, so any unseen URL doesn't have any // depth-entry registered yet. if _, ok := seen[p.url]; !ok { fmt.Printf("%d. %s\n", count, p) count++ seen[p.url] = p.depth if len(seen) == maxURLs { break loop } packets <- p } } } // We're done writing to the packets channel, so close it. close(packets) // There are some in-flight workers as of this point, so // signal a cancel to them. cancel() wg.Wait() }