import asyncio
import csv
import sys
import time
import aiohttp
def get_urls(filename: str, limit: int | None = None) -> list[str]:
"""Get a list of URLs from FILENAME.
FILENAME should be that of a CSV file with a field 'Domain',
denoting the site URL.
The 'https://' schema is prefixed to each URL.
If LIMIT is not None, then stop after LIMIT URLs have been read.
Return the list of URLs.
"""
urls: list[str] = []
with open(filename) as f:
reader = csv.DictReader(f)
for i, row in enumerate(reader):
if limit is not None and i == limit:
break
urls.append(f"https://{row['Domain']}")
return urls
async def fetch(session: aiohttp.ClientSession, url: str, semaphore: asyncio.Semaphore) -> str:
    """Fetch a URL under the given SESSION and report the outcome.

    Use SEMAPHORE to limit the number of tasks that can make an HTTP
    request at a time.
    This function is the atomic task that executes concurrently with
    its brethren to ping various websites.

    Return an emoji describing the outcome:
        "😁" -- response received with status < 400
        "😓" -- response received with an error status (>= 400)
        "😴" -- the request timed out
        "🤮" -- any other client/connection error
    """
    try:
        async with semaphore, session.get(url) as response:
            print(".", end="", flush=True)
            if response.ok:
                return "😁"
            else:
                return "😓"
    # TimeoutError must be caught FIRST: some aiohttp timeout exceptions
    # (e.g. ServerTimeoutError) subclass both TimeoutError and
    # aiohttp.ClientError, and a timeout should classify as "😴",
    # not as a generic client error.
    except TimeoutError:
        return "😴"
    except aiohttp.ClientError:
        return "🤮"
async def ping(urls: list[str], max_concurrency: int) -> list[str]:
    """Make a GET request to every member of URLS concurrently.

    At most MAX_CONCURRENCY requests are in flight at any one time.
    Print a "." as each site gets browsed.

    Return one result emoji per URL (see fetch), in the same order
    as URLS.
    """
    semaphore = asyncio.Semaphore(max_concurrency)
    async with aiohttp.ClientSession(
        # Some sites send oversized headers; double the default field size.
        max_field_size=8190 * 2,
        timeout=aiohttp.ClientTimeout(5),
    ) as session:
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(fetch(session, url, semaphore)) for url in urls]
    # Collect results only AFTER the TaskGroup has exited, which is when
    # all tasks are guaranteed to be done. Calling task.result() inside
    # the TaskGroup block raised asyncio.InvalidStateError, because the
    # freshly created tasks had not run yet.
    return [task.result() for task in tasks]
def main() -> None:
    """Ping the top sites listed in majestic_million.csv and time it.

    Usage: script [LIMIT [MAX_CONCURRENCY]]
        LIMIT           cap on how many URLs to read (default: all)
        MAX_CONCURRENCY cap on simultaneous requests (default: len(urls))
    """
    limit: int | None = None
    if len(sys.argv) >= 2:
        limit = int(sys.argv[1])
    urls = get_urls("majestic_million.csv", limit)
    max_concurrency = len(urls)
    if len(sys.argv) == 3:
        max_concurrency = int(sys.argv[2])
    start_time = time.perf_counter()
    results = asyncio.run(ping(urls, max_concurrency))
    # Stop the clock before printing: dumping a large result list to the
    # terminal should not inflate the measured network time.
    end_time = time.perf_counter()
    print()
    print(results)
    elapsed_time = end_time - start_time
    print(f"Execution time: {elapsed_time:.6f} seconds")


if __name__ == "__main__":
    main()