package main import ( "context" "fmt" "net/url" "sync" ) /* main goroutine: - manages urls channel. */ type Packet struct { url url.URL depth int } func (p Packet) String() string { return fmt.Sprintf("[%d] %s", p.depth, &p.url) } func workers(startURL url.URL, maxConcurrency, maxURLs int) { worklist := make(chan []Packet) // Unseen URLs. packets := make(chan Packet) go func() { startPacket := Packet{startURL, 0} worklist <- []Packet{startPacket} }() var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) // Create maxConcurrency worker goroutines to demultiplex from // the urls channel (unseen links.) for i := range maxConcurrency { wg.Go(func() { for p := range packets { batch := getBatch(p.url) var ps []Packet for _, u := range batch { newPacket := Packet{u, p.depth + 1} ps = append(ps, newPacket) } select { case <-ctx.Done(): fmt.Printf("exiting early %d\n", i+1) return default: go func() { worklist <- ps }() } } fmt.Printf("terminating %d\n", i+1) }) } // The main goroutine deduplicates worklist items and sends // unseen ones to the crawlers in a fan-out fashion. seen := make(map[url.URL]int) count := 1 loop: for batch := range worklist { for _, p := range batch { if _, ok := seen[p.url]; !ok { fmt.Printf("%d. %s\n", count, p) count++ seen[p.url] = p.depth if len(seen) == maxURLs { break loop } packets <- p } } } close(packets) cancel() wg.Wait() }