Process Log Files
This page walks through how to process log files with Python: read input, extract fields, aggregate, and report. The script uses the standard library only — no third-party packages and no regular expressions. You add one piece at a time and run python process_logs.py after each step so nothing piles up in your head.
Create a file process_logs.py and an app.log in the same folder (use the sample below). Python 3.9+ is enough for the typing used here. For more on looping over files, see Loops.
Scenario and Goal
You have plain-text application logs. Each line looks like: date, time, severity, then the rest of the message. You want to:
- Count how many lines you see per severity (for example INFO, WARN, ERROR).
- For ERROR lines, count how many fall into each hour so you can spot spikes.
- Print a short summary to the terminal.
Real logs are often messy: skip lines that do not match the shape you expect instead of crashing.
Sample Log Format
Create app.log in the same folder as your script with content like this:
```
2024-03-19 10:15:01 INFO service=auth msg="Startup complete"
2024-03-19 10:45:22 ERROR service=auth msg="Login failed" user=guest
2024-03-19 10:47:01 WARN service=api msg="High latency"
2024-03-19 11:02:15 ERROR service=db msg="Connection timeout"
2024-03-19 11:15:00 INFO service=auth msg="User logged in" user=atif
```

Each line: YYYY-MM-DD HH:MM:SS LEVEL message... (fields separated by spaces; the message may contain spaces).
What you’ll build: A generator that reads lines without loading the whole file, a split-based parser, an analyze function that counts and buckets, and a small report printer. You’ll replace the if __name__ == "__main__": block in each step (or grow it as shown).
Step 1 — Read Lines From a File
Goal: Open app.log and stream lines one at a time. Prove it with a line count and a short preview.
pathlib.Path keeps paths readable. Iterating with for line in f avoids reading the entire file into memory at once (important for large logs).
The preview truncates long lines with first[:80] — string slicing, same idea as list slices in Language basics.
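That truncation idiom is easy to try on its own. A minimal sketch, with the logic wrapped in a helper function (the preview name mirrors the variable in Step 1 but the function itself is not part of the script):

```python
def preview(text: str, width: int = 80) -> str:
    """Return text unchanged if it fits, else cut it and mark the cut."""
    return text if len(text) <= width else text[:width] + "..."

print(preview("short line"))    # unchanged: short line
print(len(preview("x" * 200)))  # 83: the 80 kept characters plus "..."
```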
```python
from pathlib import Path

LOG_FILE = Path("app.log")

def read_lines(path: Path):
    """Yield stripped lines one at a time (memory-friendly for big files)."""
    # utf-8 is typical; errors="replace" avoids crashing on odd bytes
    with path.open("r", encoding="utf-8", errors="replace") as f:
        for line in f:
            yield line.strip()

if __name__ == "__main__":
    if not LOG_FILE.is_file():
        print(f"Missing {LOG_FILE}. Create it using the sample log above.")
        raise SystemExit(1)

    # One pass: count every line; keep only
    # the first line for preview (no list of all lines).
    n = 0
    first = None
    for line in read_lines(LOG_FILE):
        if first is None:
            first = line
        n += 1

    print("Step 1 OK:", n, "lines")

    # If the file had at least one line,
    # show a short preview (truncate long lines).
    if first:
        preview = first if len(first) <= 80 else first[:80] + "..."
        print("First line preview:", preview)
```

Check: Run python process_logs.py. You should see Step 1 OK: 5 lines (or your line count) and a preview of the first line.
Step 2 — Parse One Line With split
Goal: Turn a single log line into a small dict (timestamp, severity, message), or None if the line does not fit.
line.split(maxsplit=3) splits into at most four parts: date, time, severity, and everything else as the message (so spaces inside the message stay intact).
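To see what maxsplit buys you, compare a default split with a capped one on a line shaped like the sample log; a quick sketch:

```python
line = '2024-03-19 10:45:22 ERROR service=auth msg="Login failed" user=guest'

# A plain split breaks the message apart on every space...
print(len(line.split()))  # 7 pieces; the message is shredded

# ...while maxsplit=3 stops after three splits, keeping the tail whole.
parts = line.split(maxsplit=3)
print(parts[3])  # service=auth msg="Login failed" user=guest
```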
Add parse_line below read_lines (still above if __name__). Replace your __main__ block with the one below. Highlighted lines are what you add or change in this step (the rest matches Step 1).
```python
from pathlib import Path
from typing import Optional

LOG_FILE = Path("app.log")

def read_lines(path: Path):
    """Yield stripped lines one at a time (memory-friendly for big files)."""
    with path.open("r", encoding="utf-8", errors="replace") as f:
        for line in f:
            yield line.strip()

def parse_line(line: str) -> Optional[dict[str, str]]:
    """
    Expect: date time LEVEL rest...
    maxsplit=3 → four parts; the last part is the full message tail.
    """
    if not line:
        return None
    # Split on whitespace into: date, time, severity, and the remaining message.
    parts = line.split(maxsplit=3)
    if len(parts) < 4:
        # malformed / too short — skip later instead of crashing
        return None
    date_s, time_s, severity, message = parts
    return {
        "timestamp": f"{date_s} {time_s}",
        "severity": severity,
        "message": message,
    }

# One known-good line so you can test the parser without a file
SAMPLE_LINE = '2024-03-19 10:15:01 INFO service=auth msg="Startup complete"'

if __name__ == "__main__":
    print("Step 2 sample parse:", parse_line(SAMPLE_LINE))

    if not LOG_FILE.is_file():
        print(f"Missing {LOG_FILE}. Create it using the sample log above.")
        raise SystemExit(1)

    for line in read_lines(LOG_FILE):
        row = parse_line(line)
        print(line[:60], "->", row)
        break  # only show first parsed row from file
```

Check: Run the script. The sample should print a dict with timestamp, severity, and message. The first file line should show a similar shape.
Match your parser to a stable log layout; if the layout changes, update the parser (or move to structured logs — see below).
Step 3 — Count Severities and Bucket Errors by Hour
Goal: Walk the whole file, count each severity, and for ERROR rows count how many fall in each clock hour.
Add imports: Counter, defaultdict, and datetime. Add analyze after parse_line. Replace __main__ with the version below (you can remove SAMPLE_LINE and the single-line loop from Step 2, or leave SAMPLE_LINE for quick experiments).
Counter tallies severities. defaultdict(int) lets you do errors_by_hour[hour_bucket] += 1 without checking if the key exists. strptime / strftime turn the timestamp string into an hour bucket.
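Before wiring them into analyze, each of those tools can be tried in isolation; a quick sketch using values shaped like the ones in this script:

```python
from collections import Counter, defaultdict
from datetime import datetime

# Counter tallies repeated values in one pass.
counts = Counter(["INFO", "ERROR", "INFO"])
print(counts["INFO"])  # 2

# defaultdict(int) starts every missing key at 0, so += works immediately.
by_hour = defaultdict(int)
by_hour["2024-03-19 10:00"] += 1  # no KeyError on first use

# strptime parses the string; strftime re-formats it as an hour bucket.
ts = datetime.strptime("2024-03-19 10:45:22", "%Y-%m-%d %H:%M:%S")
print(ts.strftime("%Y-%m-%d %H:00"))  # 2024-03-19 10:00
```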
```python
from collections import Counter, defaultdict
from datetime import datetime
from pathlib import Path
from typing import Optional

LOG_FILE = Path("app.log")

def read_lines(path: Path):
    """Yield stripped lines one at a time (memory-friendly for big files)."""
    with path.open("r", encoding="utf-8", errors="replace") as f:
        for line in f:
            yield line.strip()

def parse_line(line: str) -> Optional[dict[str, str]]:
    if not line:
        return None
    parts = line.split(maxsplit=3)
    if len(parts) < 4:
        return None
    date_s, time_s, severity, message = parts
    return {
        "timestamp": f"{date_s} {time_s}",
        "severity": severity,
        "message": message,
    }

def analyze(path: Path):
    severity_counts: Counter[str] = Counter()
    errors_by_hour: dict[str, int] = defaultdict(int)
    errors: list[dict] = []

    for line in read_lines(path):
        row = parse_line(line)
        if row is None:
            continue  # skip garbage lines

        severity_counts[row["severity"]] += 1

        if row["severity"] == "ERROR":
            # Parse timestamp once, then bucket to the top of the hour
            ts = datetime.strptime(row["timestamp"], "%Y-%m-%d %H:%M:%S")
            hour_bucket = ts.strftime("%Y-%m-%d %H:00")
            errors_by_hour[hour_bucket] += 1
            errors.append(row)

    return severity_counts, errors_by_hour, errors

if __name__ == "__main__":
    if not LOG_FILE.is_file():
        print(f"Missing {LOG_FILE}. Create it using the sample log above.")
        raise SystemExit(1)

    counts, by_hour, err_rows = analyze(LOG_FILE)
    print("Step 3 OK — severity counts:", dict(counts))
    print("Errors by hour:", dict(by_hour))
    print("Total ERROR rows stored:", len(err_rows))
```

Check: With the sample app.log, you should see two ERROR lines split across hours 2024-03-19 10:00 and 2024-03-19 11:00, and counts for INFO, WARN, and ERROR.
Step 4 — Print a Readable Report
Goal: Format the aggregates for humans: severity totals, a simple per-hour bar for errors, and the last few error messages.
Add print_report after analyze. Replace __main__ to call print_report instead of printing raw dicts. The # bar uses min(n, 40) so one huge count does not flood the terminal.
```python
from collections import Counter, defaultdict
from datetime import datetime
from pathlib import Path
from typing import Optional

LOG_FILE = Path("app.log")

def read_lines(path: Path):
    """Yield stripped lines one at a time (memory-friendly for big files)."""
    with path.open("r", encoding="utf-8", errors="replace") as f:
        for line in f:
            yield line.strip()

def parse_line(line: str) -> Optional[dict[str, str]]:
    if not line:
        return None
    parts = line.split(maxsplit=3)
    if len(parts) < 4:
        return None
    date_s, time_s, severity, message = parts
    return {
        "timestamp": f"{date_s} {time_s}",
        "severity": severity,
        "message": message,
    }

def analyze(path: Path):
    severity_counts: Counter[str] = Counter()
    errors_by_hour: dict[str, int] = defaultdict(int)
    errors: list[dict] = []

    for line in read_lines(path):
        row = parse_line(line)
        if row is None:
            continue  # skip malformed lines

        severity_counts[row["severity"]] += 1

        if row["severity"] == "ERROR":
            ts = datetime.strptime(row["timestamp"], "%Y-%m-%d %H:%M:%S")
            hour_bucket = ts.strftime("%Y-%m-%d %H:00")
            errors_by_hour[hour_bucket] += 1
            errors.append(row)

    return severity_counts, errors_by_hour, errors

def print_report(severity_counts, errors_by_hour, errors) -> None:
    print("=== Counts by Severity ===")
    for severity, n in severity_counts.most_common():
        print(f"  {severity}: {n}")

    print("\n=== Errors by Hour ===")
    for hour in sorted(errors_by_hour):
        n = errors_by_hour[hour]
        bar = "#" * min(n, 40)  # cap width for huge counts
        print(f"  {hour} {bar} ({n})")

    print("\n=== Last Few Errors ===")
    for row in errors[-5:]:
        print(f"  [{row['timestamp']}] {row['message']}")

if __name__ == "__main__":
    if not LOG_FILE.is_file():
        print(f"Missing {LOG_FILE}. Create it using the sample log above.")
        raise SystemExit(1)
    counts, by_hour, err_rows = analyze(LOG_FILE)
    print_report(counts, by_hour, err_rows)
```

Check: Run the script. You should see three sections: counts by severity, error bars by hour, and the last few error lines.
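The capped bar behaves the same outside the script; a minimal sketch with the logic pulled into a helper (the bar function name is mine, the report builds the string inline):

```python
def bar(n: int, cap: int = 40) -> str:
    """Draw at most cap '#' characters, however large n gets."""
    return "#" * min(n, cap)

print(bar(3))          # ###
print(len(bar(5000)))  # 40, not 5000 characters of terminal flood
```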
Other Simple Log Formats
These patterns also avoid regex. Use whichever matches how the logs are actually written.
| Situation | Approach |
|---|---|
| One JSON object per line | json.loads(line) inside try / except json.JSONDecodeError; then read fields from the dict. |
| Mostly key=value tokens | Loop tokens with str.partition("=") and build a dict (strip quotes from values if needed). |
| Comma- or tab-separated exports | The csv module or a careful split if quoting is simple. |
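For the JSON-per-line case in the table, a minimal sketch; the field names (ts, level, msg) are assumptions about what a structured logger might emit:

```python
import json

lines = [
    '{"ts": "2024-03-19T10:45:22", "level": "ERROR", "msg": "Login failed"}',
    'not json at all',  # a malformed line to skip, not crash on
]

rows = []
for line in lines:
    try:
        rows.append(json.loads(line))
    except json.JSONDecodeError:
        continue  # same skip-instead-of-crash policy as parse_line

print(len(rows))         # 1
print(rows[0]["level"])  # ERROR
```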
Some logs are free-form or inconsistent; teams often prefer structured logging (JSON) so analysis stays simple. You do not need regex for the common cases above.
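The key=value row from the table can be sketched with str.partition; parse_kv is a hypothetical helper name, and note that a plain split still breaks quoted values that contain spaces:

```python
def parse_kv(tail: str) -> dict[str, str]:
    """Turn 'service=auth user=guest' style tokens into a dict; ignore bare words."""
    out: dict[str, str] = {}
    for token in tail.split():
        key, sep, value = token.partition("=")
        if sep:  # keep only tokens that actually contained '='
            out[key] = value.strip('"')
    return out

print(parse_kv('service=auth user=guest retry'))
# {'service': 'auth', 'user': 'guest'}
```

str.partition never raises: a token without = comes back with an empty separator, so bare words like retry are dropped instead of crashing the loop.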
Patterns Worth Keeping
- Memory: Iterate over the file object instead of read() or readlines() for large inputs.
- Robustness: Malformed lines → skip or log a count; do not assume every line is perfect.
- Aggregation: Counter and defaultdict are compact ways to tally and bucket in the standard library.
- Next steps: You can extend the same idea with Path.glob for multiple files, writing CSV or JSON output, or alerting when counts cross a threshold in a time window.
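The multi-file idea from the last bullet can be sketched with Path.glob; analyze_many and severity_counts are hypothetical names, and the counting here is a stripped-down stand-in for Step 3's analyze so the sketch runs on its own:

```python
from collections import Counter
from pathlib import Path

def severity_counts(path: Path) -> Counter:
    """Count the third whitespace-separated field (the severity) per line."""
    counts: Counter[str] = Counter()
    with path.open("r", encoding="utf-8", errors="replace") as f:
        for line in f:
            parts = line.split(maxsplit=3)
            if len(parts) >= 4:  # same skip rule as parse_line
                counts[parts[2]] += 1
    return counts

def analyze_many(folder: Path, pattern: str = "*.log") -> Counter:
    """Merge severity counts across every file matching pattern in folder."""
    total: Counter[str] = Counter()
    for log_path in sorted(folder.glob(pattern)):
        total.update(severity_counts(log_path))
    return total
```

Pointing analyze_many at a folder of rotated logs with a pattern like "app.log*" gives one merged tally across all of them.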