package main import ( "context" "fmt" "net/url" "sync" ) /* CHANNEL/GOROUTINE NOTES main goroutine: - manages urls channel. */ // Packet accrues data as it passes through our concurrent // pipeline. Formerly the web crawler only transmitted [url.URL]'s, // but usingn a compound data type allows us to add URL // depth-tracking. type Packet struct { url url.URL depth int } // String implements the Stringer interface. We need this mainly // because a [url.URL]'s String method only works when that URL is a // pointer. func (p Packet) String() string { return fmt.Sprintf("[%d] %s", p.depth, &p.url) } // workers launches a worker queue for crawling a given Web domain. func workers(startURL url.URL, maxConcurrency, maxURLs int) { worklist := make(chan []Packet) // Unseen URLs. packets := make(chan Packet) go func() { startPacket := Packet{startURL, 0} worklist <- []Packet{startPacket} }() var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) // Create maxConcurrency worker goroutines to demultiplex from // the urls channel (unseen links.) for i := range maxConcurrency { wg.Go(func() { for p := range packets { batch := getBatch(p.url) // Convert URLs to Packets. In the // process, bump up the depth by 1. var ps []Packet for _, u := range batch { newPacket := Packet{u, p.depth + 1} ps = append(ps, newPacket) } select { case <-ctx.Done(): fmt.Printf("exiting early %d\n", i+1) return default: go func() { worklist <- ps }() } } fmt.Printf("terminating %d\n", i+1) }) } // The main goroutine deduplicates worklist items and sends // unseen ones to the crawlers in a fan-out fashion. seen := make(map[url.URL]int) // Used to prettify the running URL listing. count := 1 loop: for batch := range worklist { for _, p := range batch { // We're tracking _depth_ with the seen-map // now, so any unseen URL doesn't have any // depth-entry registered yet. if _, ok := seen[p.url]; !ok { fmt.Printf("%d. %s\n", count, p) count++ seen[p.url] = p.depth if len(seen) == maxURLs { break loop } packets <- p } } } // We're done writing to the packets channel, so close it. close(packets) // There are some in-flight workers as of this point, so // signal a cancel to them. cancel() wg.Wait() }