Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,25 @@ jobs:

- name: Run tests
run: pytest --tb=short -q

mypy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- uses: actions/setup-python@v5
with:
python-version: "3.12"
cache: pip
cache-dependency-path: |
requirements.txt
requirements-dev.txt

- name: Install dev dependencies
run: pip install -r requirements-dev.txt

- name: Run mypy (strict, production packages)
run: mypy -p api -p utils -p models

- name: Run mypy on tests (non-strict smoke)
run: mypy tests --config-file mypy-tests.ini --follow-imports skip
20 changes: 20 additions & 0 deletions api/_flask_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""Shared Flask handler return types for mypy."""

from typing import Any, Union, cast

from flask import Response, jsonify

# Narrow scope: handlers here return Response or (Response, status) only.
# Widen if adding (Response, int, headers) or plain-text tuples.
FlaskReturn = Union[Response, tuple[Response, int]]
Comment thread
clean6378-max-it marked this conversation as resolved.


def json_response(*args: Any, **kwargs: Any) -> Response:
"""Typed wrapper around :func:`flask.jsonify` for JSON bodies."""
return cast(Response, jsonify(*args, **kwargs))


def json_error(payload: str | dict[str, Any], status: int) -> tuple[Response, int]:
"""JSON error body with explicit HTTP status (avoids trailing `, 404` at call sites)."""
body: dict[str, Any] = {"error": payload} if isinstance(payload, str) else payload
return jsonify(body), status
100 changes: 51 additions & 49 deletions api/export_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
import os
import zipfile
from datetime import datetime
from typing import Any

from flask import Blueprint, current_app, jsonify, request, send_file
from flask import Blueprint, current_app, request, send_file

from api._flask_types import FlaskReturn, json_error, json_response
from models.export import ExportStateDict

from utils.export_state_store import (
EXPORT_STATE_FILE,
Expand All @@ -33,24 +37,24 @@
_STATE_FILE = EXPORT_STATE_FILE


def _state_lock():
def _state_lock() -> Any:
return export_state_lock(_STATE_FILE)


def _load_state_from_disk() -> dict:
def _load_state_from_disk() -> ExportStateDict:
return load_export_state_from_disk(_STATE_FILE)


def _atomic_write_state(state: dict) -> None:
def _atomic_write_state(state: ExportStateDict) -> None:
atomic_write_export_state(state, _STATE_FILE)


def _read_state() -> dict:
def _read_state() -> ExportStateDict:
with _state_lock():
return _load_state_from_disk()


def _write_state(sessions_map: dict, count: int) -> None:
def _write_state(sessions_map: dict[str, float], count: int) -> None:
"""Persist merge of *sessions_map* and update last-export metadata (*count* = this run only)."""
with _state_lock():
state = _load_state_from_disk()
Expand All @@ -61,10 +65,10 @@ def _write_state(sessions_map: dict, count: int) -> None:


@export_bp.route("/api/export/state")
def get_export_state():
def get_export_state() -> FlaskReturn:
state = _read_state()
n = state.get("exportedCount", 0)
return jsonify(
return json_response(
{
"last_export_time": state.get("lastExportTime"),
# Sessions exported in the last completed bulk export (not a lifetime total).
Expand All @@ -75,16 +79,16 @@ def get_export_state():


@export_bp.route("/api/export", methods=["POST"])
def bulk_export():
def bulk_export() -> FlaskReturn:
body = request.get_json(silent=True)
if body is None:
body = {}
if not isinstance(body, dict):
return jsonify({"error": "Invalid request body"}), 400
return json_error("Invalid request body", 400)

since = body.get("since", "all")
if since not in ("all", "last", "incremental"):
return jsonify({"error": "Invalid since mode", "since": since}), 400
return json_error({"error": "Invalid since mode", "since": since}, 400)

base = (
current_app.config.get("CLAUDE_PROJECTS_DIR")
Expand All @@ -94,14 +98,14 @@ def bulk_export():
rules = current_app.config.get("EXCLUSION_RULES") or []

state = _read_state()
last_export_sessions: dict = (
last_export_sessions: dict[str, float] = (
state.get("sessions", {}) if since == "incremental" else {}
)

buf = io.BytesIO()
count = 0
manifest = []
new_sessions_map: dict = {}
manifest: list[dict[str, Any]] = []
new_sessions_map: dict[str, float] = {}
latest_day = None

with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
Expand Down Expand Up @@ -226,15 +230,7 @@ def bulk_export():
_write_state(new_sessions_map, count)

if count == 0:
return (
jsonify(
{
"error": "Nothing to export",
"since": since,
}
),
422,
)
return json_error({"error": "Nothing to export", "since": since}, 422)

buf.seek(0)
date_tag = datetime.now().strftime("%Y-%m-%d")
Expand All @@ -251,12 +247,12 @@ def bulk_export():
buf,
mimetype="application/zip",
as_attachment=True,
download_name=f"claude-code-export{suffix}-{date_tag}.zip",
download_name=f"claude-code-export{suffix}-{date_tag}.zip", # type: ignore[call-arg]
)


@export_bp.route("/api/export/session/<path:project_name>/<session_id>")
def export_session(project_name, session_id):
def export_session(project_name: str, session_id: str) -> FlaskReturn:
import os
from utils.session_path import safe_join

Expand All @@ -267,36 +263,42 @@ def export_session(project_name, session_id):
try:
filepath = safe_join(base, project_name, f"{session_id}.jsonl")
except ValueError:
return jsonify({"error": "Invalid path"}), 400
return json_error("Invalid path", 400)

if not os.path.isfile(filepath):
return jsonify({"error": "Session not found"}), 404
return json_error("Session not found", 404)

fmt = request.args.get("format", "md")
session = parse_session(filepath)
rules = current_app.config.get("EXCLUSION_RULES") or []
if is_session_excluded(rules, session, project_name):
return jsonify({"error": "Session not found"}), 404
stats = compute_stats(session)
title_slug = slugify(session["title"], default="session")

if fmt == "json":
content = session_to_json(session, stats)
buf = io.BytesIO(content.encode("utf-8"))
try:
session = parse_session(filepath)
rules = current_app.config.get("EXCLUSION_RULES") or []
if is_session_excluded(rules, session, project_name):
return json_error("Session not found", 404)
stats = compute_stats(session)
title_slug = slugify(session["title"], default="session")

if fmt == "json":
content = session_to_json(session, stats)
buf = io.BytesIO(content.encode("utf-8"))
buf.seek(0)
return send_file(
buf,
mimetype="application/json",
as_attachment=True,
download_name=f"{title_slug}.json", # type: ignore[call-arg]
)

md = session_to_markdown(session, stats)
buf = io.BytesIO(md.encode("utf-8"))
buf.seek(0)
return send_file(
buf,
mimetype="application/json",
mimetype="text/markdown",
as_attachment=True,
download_name=f"{title_slug}.json",
download_name=f"{title_slug}.md", # type: ignore[call-arg]
)

md = session_to_markdown(session, stats)
buf = io.BytesIO(md.encode("utf-8"))
buf.seek(0)
return send_file(
buf,
mimetype="text/markdown",
as_attachment=True,
download_name=f"{title_slug}.md",
)
except Exception:
current_app.logger.exception(
"Failed to export session %s/%s", project_name, session_id
)
return json_error("Internal server error exporting session", 500)
58 changes: 40 additions & 18 deletions api/projects.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,46 @@
"""Project listing endpoints."""

from flask import Blueprint, current_app, jsonify
from flask import Blueprint, current_app

from api._flask_types import FlaskReturn, json_error, json_response
from models.project import ProjectSessionRowDict, SessionListItemDict
from models.session import SessionDict
from utils.session_path import get_claude_projects_dir, list_projects, list_sessions, safe_join
from utils.exclusion_rules import is_session_excluded

projects_bp = Blueprint("projects", __name__)


def _session_row_ok(s: SessionListItemDict, parsed: SessionDict) -> ProjectSessionRowDict:
meta = parsed["metadata"]
models = meta.get("models_used", [])
return {
"id": s["id"],
"path": s["path"],
"size_bytes": s["size_bytes"],
"modified": s["modified"],
"title": parsed["title"],
"models": sorted(models) if isinstance(models, set) else list(models),
"tokens": meta["total_input_tokens"] + meta["total_output_tokens"],
"tool_calls": meta["total_tool_calls"],
"first_timestamp": meta["first_timestamp"],
"last_timestamp": meta["last_timestamp"],
}


def _session_row_error(s: SessionListItemDict) -> ProjectSessionRowDict:
return {
"id": s["id"],
"path": s["path"],
"size_bytes": s["size_bytes"],
"modified": s["modified"],
"title": "Error parsing session",
"error": True,
}


@projects_bp.route("/api/projects")
def get_projects():
def get_projects() -> FlaskReturn:
base = current_app.config.get("CLAUDE_PROJECTS_DIR") or get_claude_projects_dir()
projects = list_projects(base)

Expand All @@ -36,44 +67,35 @@ def get_projects():
if latest_ts:
project["last_modified"] = latest_ts

return jsonify(projects)
return json_response(projects)


@projects_bp.route("/api/projects/<path:project_name>/sessions")
def get_project_sessions(project_name):
def get_project_sessions(project_name: str) -> FlaskReturn:
base = current_app.config.get("CLAUDE_PROJECTS_DIR") or get_claude_projects_dir()
try:
project_dir = safe_join(base, project_name)
except ValueError:
return jsonify([]), 400
return json_response([]), 400
sessions = list_sessions(project_dir)
# Add summary preview for each session
from utils.jsonl_parser import parse_session
rules = current_app.config.get("EXCLUSION_RULES") or []
result = []
result: list[ProjectSessionRowDict] = []
for s in sessions:
try:
parsed = parse_session(s["path"])
meta = parsed["metadata"]
# Skip untitled sessions (no real conversation)
if parsed["title"] == "Untitled Session":
continue
if is_session_excluded(rules, parsed, project_name):
continue
result.append({
**s,
"title": parsed["title"],
"models": meta["models_used"],
"tokens": meta["total_input_tokens"] + meta["total_output_tokens"],
"tool_calls": meta["total_tool_calls"],
"first_timestamp": meta["first_timestamp"],
"last_timestamp": meta["last_timestamp"],
})
result.append(_session_row_ok(s, parsed))
except Exception:
# Full detail (class, message, traceback) to the server log via
# logger.exception. The per-session card carries only `error: True`
# — the class-name+message string was a leak (issue #25). The
# operator looks at the server log for triage.
current_app.logger.exception("Failed to parse session %s", s["id"])
result.append({**s, "title": "Error parsing session", "error": True})
return jsonify(result)
result.append(_session_row_error(s))
return json_response(result)
30 changes: 25 additions & 5 deletions api/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,45 @@

from flask import Blueprint, current_app, jsonify, request

from api._flask_types import FlaskReturn, json_error, json_response
from models.search import SearchHitDict
from utils.session_path import get_claude_projects_dir, list_projects, list_sessions
from utils.jsonl_parser import parse_session
from utils.exclusion_rules import is_session_excluded

search_bp = Blueprint("search", __name__)

_DEFAULT_LIMIT = 50


def _parse_limit(raw: str | None, default: int = _DEFAULT_LIMIT) -> int:
"""Parse a positive integer limit from a query string value."""
if raw is None or raw.strip() == "":
return default
try:
value = int(raw)
except ValueError:
raise ValueError("Invalid limit: must be a positive integer") from None
if value < 1:
raise ValueError("Invalid limit: must be a positive integer")
return value
Comment thread
clean6378-max-it marked this conversation as resolved.


@search_bp.route("/api/search")
def search():
def search() -> FlaskReturn:
query = request.args.get("q", "").strip().lower()
if not query:
return jsonify([])
return json_response([])

max_results = int(request.args.get("limit", 50))
try:
max_results = _parse_limit(request.args.get("limit"))
except ValueError as e:
return json_error(str(e), 400)
base = current_app.config.get("CLAUDE_PROJECTS_DIR") or get_claude_projects_dir()
projects = list_projects(base)

rules = current_app.config.get("EXCLUSION_RULES") or []
results = []
results: list[SearchHitDict] = []
for project in projects:
sessions = list_sessions(project["path"])
for sess_info in sessions:
Expand Down Expand Up @@ -56,4 +76,4 @@ def search():
if len(results) >= max_results:
break

return jsonify(results)
return json_response(results)
Loading
Loading