Read from API and Process Data
This page shows a simple pattern for API-driven scripts: fetch JSON, normalize records, summarize, and print a clear report. You will build one piece at a time and run python read_from_api_process_data.py after each step.
Create a file named read_from_api_process_data.py. This walkthrough uses Python standard library modules only (urllib, json, collections, datetime), so no package install is needed.
Scenario and Goal
Section titled “Scenario and Goal”. You want to read recent commit data from the GitHub API endpoint:
GET /repos/{owner}/{repo}/commits
Then process the response into useful outputs:
- Total commits fetched
- Commits per author
- Commits per day
- A short latest-commit list
This example uses public repositories only and no token. Unauthenticated requests have lower rate limits.
Step 1 — Fetch Commits JSON
Section titled “Step 1 — Fetch Commits JSON”. Goal: Request commit data from GitHub and confirm you received a JSON list.
Use urllib.request to avoid extra dependencies. Add a User-Agent header because many APIs expect one.
import json
from urllib.request import Request, urlopen

OWNER = "python"  # GitHub org or username
REPO = "cpython"  # repository name
PER_PAGE = 10  # how many commits to ask for (API may cap this)
def fetch_commits(owner: str, repo: str, per_page: int = 10):
    """Fetch recent commits for a public repo; return (HTTP status, parsed JSON)."""
    url = f"https://api.github.com/repos/{owner}/{repo}/commits?per_page={per_page}"
    # Build a GET request (URL + headers); no network call until urlopen.
    req = Request(url, headers={"User-Agent": "python-api-example"})
    # Send the request and read the response stream (10s max wait).
    with urlopen(req, timeout=10) as resp:
        status = resp.status  # HTTP result code from the server (200 = success).
        data = json.load(resp)  # Read response body and parse JSON into Python data.
    return status, data
if __name__ == "__main__":
    status, items = fetch_commits(OWNER, REPO, PER_PAGE)
    print("Step 1 OK — status:", status)
    print("Items fetched:", len(items))

# Check: Run the script. You should see a 200 status and a non-zero item count for active repositories.
Step 2 — Parse and Normalize One Commit
Section titled “Step 2 — Parse and Normalize One Commit”. Goal: Convert each raw API item into a small, consistent dict.
Add parse_commit and apply it to the fetched list. Keep only a few fields you actually need. Highlighted lines are the new parse_commit function.
import json
from urllib.request import Request, urlopen

OWNER = "python"  # GitHub org or username
REPO = "cpython"  # repository name
PER_PAGE = 10  # how many commits to ask for (API may cap this)
def fetch_commits(owner: str, repo: str, per_page: int = 10):
    """Fetch recent commits for a public repo; return (HTTP status, parsed JSON)."""
    url = f"https://api.github.com/repos/{owner}/{repo}/commits?per_page={per_page}"
    req = Request(url, headers={"User-Agent": "python-api-example"})  # Build a GET request (URL + headers); no network call until urlopen.
    with urlopen(req, timeout=10) as resp:  # Send the request and read the response stream (10s max wait).
        status = resp.status  # HTTP result code from the server (200 = success).
        data = json.load(resp)  # Read response body and parse JSON into Python data.
    return status, data
def parse_commit(item: dict) -> dict | None: sha = item.get("sha") # full commit hash from the API commit = item.get("commit", {}) # nested author/message metadata author = commit.get("author", {}) message = commit.get("message", "") date = author.get("date") # ISO-8601 timestamp string author_name = author.get("name")
if not sha or not date: return None # skip incomplete API rows
return { "sha_short": sha[:7], "author_name": author_name or "Unknown", "date": date[:10], # YYYY-MM-DD "message_first_line": message.splitlines()[0] if message else "(no message)", }
if __name__ == "__main__":
    status, items = fetch_commits(OWNER, REPO, PER_PAGE)
    parsed = []
    for item in items:  # each item is one commit object from JSON
        row = parse_commit(item)
        if row is not None:
            parsed.append(row)

    print("Step 2 OK — status:", status)
    print("Parsed commits:", len(parsed))
    if parsed:
        print("First parsed commit:", parsed[0])

# Check: You should see at least one parsed commit with sha_short, author_name, date, and message_first_line.
Step 3 — Summarize the Parsed Data
Section titled “Step 3 — Summarize the Parsed Data”. Goal: Build simple aggregates: commits by author and commits by day.
Use Counter and defaultdict(int) for compact summary logic. Highlighted lines are the new summarize function.
import json
from collections import Counter, defaultdict
from urllib.request import Request, urlopen

OWNER = "python"  # GitHub org or username
REPO = "cpython"  # repository name
PER_PAGE = 10  # how many commits to ask for (API may cap this)
def fetch_commits(owner: str, repo: str, per_page: int = 10):
    """Fetch recent commits for a public repo; return (HTTP status, parsed JSON)."""
    url = f"https://api.github.com/repos/{owner}/{repo}/commits?per_page={per_page}"
    req = Request(url, headers={"User-Agent": "python-api-example"})  # Build a GET request (URL + headers); no network call until urlopen.
    with urlopen(req, timeout=10) as resp:  # Send the request and read the response stream (10s max wait).
        status = resp.status  # HTTP result code from the server (200 = success).
        data = json.load(resp)  # Read response body and parse JSON into Python data.
    return status, data
def parse_commit(item: dict) -> dict | None: sha = item.get("sha") # full commit hash from the API commit = item.get("commit", {}) # nested author/message metadata author = commit.get("author", {}) message = commit.get("message", "") date = author.get("date") # ISO-8601 timestamp string author_name = author.get("name") if not sha or not date: return None # skip incomplete API rows return { "sha_short": sha[:7], "author_name": author_name or "Unknown", "date": date[:10], "message_first_line": message.splitlines()[0] if message else "(no message)", }
def summarize(rows: list[dict]):
    """Tally commits by author and by calendar day; return (by_author, by_day)."""
    commits_by_author: Counter[str] = Counter()
    commits_by_day: dict[str, int] = defaultdict(int)

    for row in rows:
        commits_by_author[row["author_name"]] += 1
        commits_by_day[row["date"]] += 1

    return commits_by_author, commits_by_day
if __name__ == "__main__":
    status, items = fetch_commits(OWNER, REPO, PER_PAGE)
    parsed = [row for row in (parse_commit(item) for item in items) if row is not None]  # drop None parses
    by_author, by_day = summarize(parsed)

    print("Step 3 OK — status:", status)
    print("Parsed commits:", len(parsed))
    print("Top authors:", by_author.most_common(3))
    print("By day:", dict(sorted(by_day.items())))

# Check: You should see top-author counts and one or more date buckets. Results vary by repository activity.
Step 4 — Print a Readable Report
Section titled “Step 4 — Print a Readable Report”. Goal: Format summary output in clear sections for quick scanning. Highlighted lines are the new print_report function.
import json
from collections import Counter, defaultdict
from urllib.request import Request, urlopen

OWNER = "python"  # GitHub org or username
REPO = "cpython"  # repository name
PER_PAGE = 10  # how many commits to ask for (API may cap this)
def fetch_commits(owner: str, repo: str, per_page: int = 10):
    """Fetch recent commits for a public repo; return (HTTP status, parsed JSON)."""
    url = f"https://api.github.com/repos/{owner}/{repo}/commits?per_page={per_page}"
    req = Request(url, headers={"User-Agent": "python-api-example"})  # Build a GET request (URL + headers); no network call until urlopen.
    with urlopen(req, timeout=10) as resp:  # Send the request and read the response stream (10s max wait).
        status = resp.status  # HTTP result code from the server (200 = success).
        data = json.load(resp)  # Read response body and parse JSON into Python data.
    return status, data
def parse_commit(item: dict) -> dict | None: sha = item.get("sha") # full commit hash from the API commit = item.get("commit", {}) # nested author/message metadata author = commit.get("author", {}) message = commit.get("message", "") date = author.get("date") # ISO-8601 timestamp string author_name = author.get("name") if not sha or not date: return None # skip incomplete API rows return { "sha_short": sha[:7], "author_name": author_name or "Unknown", "date": date[:10], "message_first_line": message.splitlines()[0] if message else "(no message)", }
def summarize(rows: list[dict]):
    """Tally commits by author and by calendar day; return (by_author, by_day)."""
    commits_by_author: Counter[str] = Counter()
    commits_by_day: dict[str, int] = defaultdict(int)
    for row in rows:
        commits_by_author[row["author_name"]] += 1
        commits_by_day[row["date"]] += 1
    return commits_by_author, commits_by_day
def print_report(status: int, rows: list[dict], by_author: Counter, by_day: dict[str, int]) -> None:
    """Print labeled report sections (summary, authors, days, latest) to stdout."""
    print("=== API Fetch Summary ===")
    print("HTTP status:", status)
    print("Parsed commits:", len(rows))

    print("\n=== Top Authors ===")
    for name, count in by_author.most_common(5):  # Counter returns highest counts first
        print(f" {name}: {count}")

    print("\n=== Commits by Day ===")
    for day in sorted(by_day):  # chronological order by date string
        print(f" {day}: {by_day[day]}")

    print("\n=== Latest Commits ===")
    for row in rows[:5]:  # API returns newest commits first
        print(f" {row['sha_short']} {row['author_name']} {row['message_first_line']}")
if __name__ == "__main__":
    status, items = fetch_commits(OWNER, REPO, PER_PAGE)
    parsed = [row for row in (parse_commit(item) for item in items) if row is not None]  # drop None parses
    by_author, by_day = summarize(parsed)
    print_report(status, parsed, by_author, by_day)

# Check: Run the script. You should see summary, top authors, per-day counts, and a short latest-commits list.
Notes for Real-World Usage
Section titled “Notes for Real-World Usage”. - Handle non-200 responses explicitly (for example 403 rate limits).
- Add retries/timeouts/backoff if this script is run in automation.
- For larger result sets, add pagination with the page= query parameter and loop until a page comes back empty.
- If you later add auth, read the token from an environment variable and send it in the Authorization header.