import asyncio
import csv
import sys
import time
from itertools import islice

import aiohttp


def get_urls(filename: str, limit: int | None = None) -> list[str]:
    """Get a list of URLs from FILENAME.

    FILENAME should be that of a CSV file with a header row containing a
    field 'Domain', denoting the site's domain.  The 'https://' scheme is
    prefixed to each domain.

    If LIMIT is not None, then stop after LIMIT URLs have been read.  A
    negative LIMIT is treated like None, i.e. every row is read.

    Return the list of URLs.  Raise KeyError if a row lacks the 'Domain'
    field.
    """
    # newline="" is what the csv module asks for so that it can do its own
    # newline handling; the explicit encoding keeps parsing independent of
    # the platform's locale default.
    with open(filename, newline="", encoding="utf-8") as f:
        rows = csv.DictReader(f)
        if limit is not None and limit >= 0:
            rows = islice(rows, limit)
        return [f"https://{row['Domain']}" for row in rows]


async def fetch(
    session: aiohttp.ClientSession,
    url: str,
    semaphore: asyncio.Semaphore,
) -> str:
    """Fetch a URL under the given SESSION.

    Use SEMAPHORE to limit the number of tasks that can make an HTTP
    request at a time.

    This function is the atomic task that executes concurrently with its
    brethren to ping various websites.

    Return a one-emoji status string:
      "😁" if the response status is OK,
      "😓" if a response arrived but its status is an error,
      "😴" if the request timed out,
      "🤮" on any other aiohttp client error.
    """
    try:
        async with semaphore, session.get(url) as response:
            # One progress dot per response received.
            print(".", end="", flush=True)
            return "😁" if response.ok else "😓"
    except TimeoutError:
        # Checked before ClientError: some aiohttp timeout exceptions
        # (e.g. ServerTimeoutError) inherit from both, and they should be
        # reported as timeouts rather than as generic client failures.
        return "😴"
    except aiohttp.ClientError:
        return "🤮"


async def ping(urls: list[str], max_concurrency: int | None = 1) -> list[str]:
    """Make a GET request to each member of URLS.

    If MAX_CONCURRENCY is None, this function imposes no limit of its own
    on how many sites are browsed at once.  Else, only browse
    MAX_CONCURRENCY sites at a time.

    A progress dot is printed as each response arrives.

    Return the list of status strings produced by `fetch', in the same
    order as URLS.  Raise ValueError if MAX_CONCURRENCY is less than 1.
    """
    if max_concurrency is not None and max_concurrency < 1:
        # A semaphore with zero permits would make every task wait forever.
        raise ValueError("max_concurrency must be a positive integer or None")

    # "No limit" is modelled as one permit per URL, so that `fetch' can
    # always be handed a real semaphore.
    permits = max_concurrency if max_concurrency is not None else max(len(urls), 1)
    semaphore = asyncio.Semaphore(permits)

    async with (
        aiohttp.ClientSession(
            max_field_size=8190 * 2,
            timeout=aiohttp.ClientTimeout(total=5),
        ) as session,
        asyncio.TaskGroup() as tg,
    ):
        tasks = [tg.create_task(fetch(session, url, semaphore)) for url in urls]

    # Leaving the TaskGroup block waits for every task, so the results are
    # only safe to read from here on.
    return [t.result() for t in tasks]


def main() -> None:
    """Ping the sites listed in 'majestic_million.csv' and report timing.

    The optional first command-line argument is the maximum number of
    sites to read from the CSV file.
    """
    limit: int | None = int(sys.argv[1]) if len(sys.argv) > 1 else None

    urls = get_urls("majestic_million.csv", limit)

    start_time = time.perf_counter()
    results = asyncio.run(ping(urls))
    # Stop the clock before printing so console output is not measured.
    elapsed_time = time.perf_counter() - start_time

    print()
    print(results)
    print(f"Execution time: {elapsed_time:.6f} seconds")


if __name__ == "__main__":
    main()