import asyncio
import csv
import sys
import time

import aiohttp


def get_urls(filename: str, limit: int | None = None) -> list[str]:
    """Get a list of URLs from FILENAME.

    FILENAME should be that of a CSV file with a field 'Domain',
    denoting the site URL. The 'https://' scheme is prefixed to each
    URL. If LIMIT is not None, then stop after LIMIT URLs have been
    read.

    Return the list of URLs.
    """
    urls: list[str] = []
    with open(filename) as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader):
            if limit is not None and i == limit:
                break
            urls.append(f"https://{row['Domain']}")
    return urls


async def fetch(
    session: aiohttp.ClientSession, url: str, semaphore: asyncio.Semaphore
) -> str:
    """GET URL and return an emoji describing how the request went."""
    try:
        # Acquire the semaphore before opening the connection, so at most
        # SEMAPHORE's initial value of requests are in flight at once.
        async with semaphore, session.get(url) as response:
            print(".", end="", flush=True)  # progress marker per response
            if response.ok:  # status < 400
                return "😁"
            else:
                return "😓"  # reachable, but an HTTP error status
    except aiohttp.ClientError:
        return "🤮"  # connection-level failure (DNS, TLS, refused, ...)
    except TimeoutError:  # asyncio.TimeoutError on Python 3.11+
        return "😴"  # exceeded the session's 5-second total timeout


async def ping(urls: list[str], max_concurrency: int = 1) -> list[str]:
    """Make a GET request to each member of URLS.

    At most MAX_CONCURRENCY requests are in flight at a time. Print a
    dot as each site responds; return one status emoji per URL, in the
    same order as URLS.
    """
    async with (
        # Some sites send header fields longer than aiohttp's 8190-byte
        # default, so double the limit; cap each request at 5 s total.
        aiohttp.ClientSession(
            max_field_size=8190 * 2, timeout=aiohttp.ClientTimeout(total=5)
        ) as session,
        asyncio.TaskGroup() as tg,
    ):
        semaphore = asyncio.Semaphore(max_concurrency)
        tasks = [tg.create_task(fetch(session, url, semaphore)) for url in urls]
    # The TaskGroup has awaited every task by the time its block exits,
    # so each result is ready to collect.
    return [t.result() for t in tasks]


def main():
    limit: int | None = None
    if len(sys.argv) > 1:
        limit = int(sys.argv[1])  # optional cap on how many URLs to ping
    urls = get_urls("majestic_million.csv", limit)
    start_time = time.perf_counter()
    results = asyncio.run(ping(urls))
    print()
    print(results)
    end_time = time.perf_counter()
    elapsed_time = end_time - start_time
    print(f"Execution time: {elapsed_time:.6f} seconds")


if __name__ == "__main__":
    main()
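
# Usage sketch: this assumes the Majestic Million CSV (which has a
# 'Domain' column) sits in the working directory, and that the script is
# saved as ping.py (a hypothetical name). Output values are illustrative
# of the shape, not real measurements:
#
#     $ python ping.py 100
#     ....................................................................
#     ['😁', '😁', '😓', '😴', ...]
#     Execution time: 123.456789 seconds
#
# Note that main() runs ping() with its default max_concurrency=1, i.e.
# strictly one request at a time; to see the effect of concurrency, call
# e.g. asyncio.run(ping(urls, max_concurrency=100)) instead.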