summaryrefslogtreecommitdiff
path: root/main.py
blob: 44662b22b8d1de4c7716acbf7acd5d2a67111dde (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import asyncio
import csv
import itertools
import sys
import time

import aiohttp


def get_urls(filename: str, limit: int | None = None) -> list[str]:
    """Get a list of URLs from FILENAME.

    FILENAME should be that of a CSV file with a field 'Domain',
    denoting the site URL.

    The 'https://' schema is prefixed to each URL.

    If LIMIT is not None, then stop after LIMIT URLs have been read.

    Return the list of URLs.

    """

    urls: list[str] = []

    with open(filename) as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader):
            if limit is not None and i == limit:
                break

            urls.append(f"https://{row['Domain']}")

        return urls


async def fetch(session: aiohttp.ClientSession, url: str) -> str:
    """Make a GET request to URL under the given SESSION.

    This function is the atomic task that executes concurrently with
    its brethren to ping various websites.

    Print a progress dot for every response received.

    Return an emoji summarizing the outcome:
      "😁" - response received with a non-error status,
      "😓" - response received with an error status (>= 400),
      "🤮" - connection/client error,
      "😴" - timeout.

    """

    try:
        async with session.get(url) as response:
            print(".", end="", flush=True)
            # response.ok is True for any status below 400.
            if response.ok:
                return "😁"
            else:
                return "😓"
    except aiohttp.ClientError:
        return "🤮"
    except TimeoutError:
        return "😴"


async def ping(urls: list[str], max_concurrency: int) -> list[str]:
    """Concurrently make a GET request to every member of URLS.

    At most MAX_CONCURRENCY requests are in flight at any one time.

    Return one result string per URL (see fetch), in the same order
    as URLS.

    """

    async with (
        aiohttp.ClientSession(max_field_size=8190 * 2, timeout=aiohttp.ClientTimeout(5)) as session,
        asyncio.TaskGroup() as tg,
    ):
        semaphore = asyncio.Semaphore(max_concurrency)

        # Wrap the fetch-call with the semaphore to cap concurrency.
        async def rate_limited(url: str) -> str:
            async with semaphore:
                return await fetch(session, url)

        # NOTE: url must be passed as an argument, not captured by
        # closure.  The creation loop never awaits, so the tasks only
        # start running after it finishes; a closure over the loop
        # variable would make every task fetch the *last* URL
        # (late-binding closure pitfall).
        tasks: list[asyncio.Task[str]] = [
            tg.create_task(rate_limited(url)) for url in urls
        ]

    # TaskGroup's __aexit__ has awaited all tasks, so the results are
    # ready here.
    return [t.result() for t in tasks]


def main():
    """Entry point: read URLs, ping them all, and report timing.

    Usage: main.py [LIMIT [MAX_CONCURRENCY]]

    LIMIT caps how many URLs are read from the CSV; MAX_CONCURRENCY
    caps simultaneous requests (defaults to one per URL).

    """

    argv = sys.argv

    # Optional first argument: how many URLs to read.
    limit: int | None = int(argv[1]) if len(argv) >= 2 else None

    urls = get_urls("majestic_million.csv", limit)

    # Optional second argument: concurrency cap; default is fully
    # concurrent (one in-flight request per URL).
    max_concurrency = int(argv[2]) if len(argv) == 3 else len(urls)

    start_time = time.perf_counter()
    results = asyncio.run(ping(urls, max_concurrency))
    print()
    print(results)
    end_time = time.perf_counter()

    print(f"Execution time: {end_time - start_time:.6f} seconds")


# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()