From 1cb18bbac4522b1f9bcc58a5e51a6ea05269fa14 Mon Sep 17 00:00:00 2001 From: iTinkerBell Date: Tue, 17 Feb 2026 21:04:47 -0500 Subject: [PATCH 1/8] Add optional exclusion rules for sensitive projects/chats (issue #1) - Add --exclude-rules PATH to app.py and scripts/export.py; default ~/.cursor-chat-browser/exclusion-rules.txt when file exists - New utils/exclusion_rules.py: rule file supports keywords (AND/OR) and double-quoted exact phrases; UTF-8 text, comments with # - Filter workspaces and tabs in api/workspaces.py (browser) - Filter export in api/export_api.py and scripts/export.py - Add tests in tests/test_exclusion_rules.py Co-authored-by: Cursor --- api/export_api.py | 14 ++- api/workspaces.py | 52 +++++++-- app.py | 22 +++- scripts/export.py | 31 +++++- tests/__init__.py | 1 + tests/test_exclusion_rules.py | 177 ++++++++++++++++++++++++++++++ utils/exclusion_rules.py | 197 ++++++++++++++++++++++++++++++++++ 7 files changed, 484 insertions(+), 10 deletions(-) create mode 100644 tests/__init__.py create mode 100644 tests/test_exclusion_rules.py create mode 100644 utils/exclusion_rules.py diff --git a/api/export_api.py b/api/export_api.py index 8771db6..274e4ad 100644 --- a/api/export_api.py +++ b/api/export_api.py @@ -14,12 +14,13 @@ from datetime import datetime from pathlib import Path -from flask import Blueprint, Response, jsonify, request +from flask import Blueprint, Response, current_app, jsonify, request from utils.workspace_path import resolve_workspace_path from utils.path_helpers import normalize_file_path, get_workspace_folder_paths, to_epoch_ms from utils.text_extract import extract_text_from_bubble from utils.tool_parser import parse_tool_call +from utils.exclusion_rules import build_searchable_text, is_excluded_by_rules bp = Blueprint("export_api", __name__) @@ -155,6 +156,7 @@ def export_chats(): today = datetime.now().strftime("%Y-%m-%d") exported = [] + rules = current_app.config.get("EXCLUSION_RULES") or [] for row in composer_rows: composer_id = row["key"].split(":")[1] @@ -171,6 +173,16 @@ def export_chats(): ws_id = composer_id_to_ws.get(composer_id, "global") ws_slug = "other-chats" if ws_id == "global" else (ws_id_to_slug.get(ws_id) or _slug(ws_id[:12])) title = cd.get("name") or f"Chat {composer_id[:8]}" + model_config = cd.get("modelConfig") or {} + model_name = model_config.get("modelName") + model_names = [model_name] if model_name and model_name != "default" else None + searchable = build_searchable_text( + project_name=ws_slug, + chat_title=title, + model_names=model_names, + ) + if is_excluded_by_rules(rules, searchable): + continue title_slug = _slug(title) ts_ms = updated_at_ms or int(datetime.now().timestamp() * 1000) ts_str = datetime.fromtimestamp(ts_ms / 1000).strftime("%Y-%m-%dT%H-%M-%S") diff --git a/api/workspaces.py b/api/workspaces.py index d8d35fc..cbb818a 100644 --- a/api/workspaces.py +++ b/api/workspaces.py @@ -14,11 +14,12 @@ import sys from datetime import datetime, timezone -from flask import Blueprint, jsonify +from flask import Blueprint, current_app, jsonify from utils.workspace_path import resolve_workspace_path from utils.path_helpers import normalize_file_path, get_workspace_folder_paths, to_epoch_ms from utils.text_extract import extract_text_from_bubble, format_tool_action +from utils.exclusion_rules import build_searchable_text, is_excluded_by_rules bp = Blueprint("workspaces", __name__) @@ -352,6 +353,9 @@ def list_workspaces(): if global_db: global_db.close() + # Exclusion rules (optional) + rules = current_app.config.get("EXCLUSION_RULES") or [] + # Build project list — merge workspace entries sharing the same folder from urllib.parse import unquote as _unquote @@ -407,10 +411,19 @@ def list_workspaces(): except Exception: pass - # Merge conversations from all workspace IDs in the group + # Merge conversations from all workspace IDs in the group; apply exclusion rules convos = [] for ws_id in all_ws_ids: - convos.extend(conversation_map.get(ws_id, [])) + for c in conversation_map.get(ws_id, []): + searchable = build_searchable_text( + project_name=workspace_name, + chat_title=c.get("name"), + ) + if not is_excluded_by_rules(rules, searchable): + convos.append(c) + + if is_excluded_by_rules(rules, workspace_name): + continue projects.append({ "id": primary["name"], @@ -422,8 +435,11 @@ def list_workspaces(): **({"aliasIds": all_ws_ids} if len(all_ws_ids) > 1 else {}), }) - # Global (unmatched) conversations - global_convos = conversation_map.get("global", []) + # Global (unmatched) conversations; apply exclusion rules + global_convos = [ + c for c in conversation_map.get("global", []) + if not is_excluded_by_rules(rules, c.get("name") or "") + ] if global_convos: last_updated = max((c.get("lastUpdatedAt") or 0 for c in global_convos), default=0) projects.append({ @@ -558,6 +574,24 @@ def get_workspace_tabs(workspace_id): if not os.path.isfile(global_db_path): return jsonify({"error": "Global storage not found"}), 404 + # Workspace display name for exclusion rules + workspace_display_name = "Other chats" if workspace_id == "global" else workspace_id + if workspace_id != "global": + wj_path = os.path.join(workspace_path, workspace_id, "workspace.json") + try: + wd = _read_json_file(wj_path) + first_folder = wd.get("folder") or (wd.get("folders", [{}])[0] or {}).get("path") + if first_folder: + from urllib.parse import unquote as _unquote + parts = first_folder.replace("\\", "/").split("/") + fn = parts[-1] if parts else None + if fn: + workspace_display_name = _unquote(fn) + except Exception: + pass + + rules = current_app.config.get("EXCLUSION_RULES") or [] + global_db = sqlite3.connect(f"file:{global_db_path}?mode=ro", uri=True) global_db.row_factory = sqlite3.Row @@ -922,7 +956,13 @@ def get_workspace_tabs(workspace_id): if tab_meta: tab["metadata"] = tab_meta - response["tabs"].append(tab) + searchable = build_searchable_text( + project_name=workspace_display_name, + chat_title=title, + model_names=tab_meta.get("modelsUsed") if tab_meta else None, + ) + if not is_excluded_by_rules(rules, searchable): + response["tabs"].append(tab) except Exception as e: print(f"Error parsing composer data for {composer_id}: {e}") diff --git a/app.py b/app.py index b5a6760..55ee93c 100644 --- a/app.py +++ b/app.py @@ -4,6 +4,8 @@ from the Cursor editor's AI chat feature. """ +import os + from flask import Flask, render_template, send_from_directory from api.workspaces import bp as workspaces_bp @@ -13,12 +15,18 @@ from api.export_api import bp as export_bp from api.pdf import bp as pdf_bp from api.config_api import bp as config_bp +from utils.exclusion_rules import resolve_exclusion_rules_path, load_rules -def create_app(): +def create_app(exclusion_rules_path=None): app = Flask(__name__, static_folder="static", template_folder="templates") app.config["JSON_SORT_KEYS"] = False + # Exclusion rules: optional path (CLI or default ~/.cursor-chat-browser/exclusion-rules.txt) + resolved = resolve_exclusion_rules_path(exclusion_rules_path) + app.config["EXCLUSION_RULES_PATH"] = resolved + app.config["EXCLUSION_RULES"] = load_rules(resolved) if resolved and os.path.isfile(resolved) else [] + # Register API blueprints app.register_blueprint(workspaces_bp) app.register_blueprint(composers_bp) @@ -57,7 +65,17 @@ def favicon(): if __name__ == "__main__": import sys - app = create_app() + exclusion_path = None + argv = sys.argv[1:] + i = 0 + while i < len(argv): + if argv[i] in ("--exclude-rules", "-e") and i + 1 < len(argv): + exclusion_path = argv[i + 1] + i += 2 + continue + i += 1 + + app = create_app(exclusion_rules_path=exclusion_path) print("Cursor Chat Browser (Python) running at http://localhost:3000") # use_reloader=False avoids a Windows socket issue with Flask's stat reloader app.run( diff --git a/scripts/export.py b/scripts/export.py index 8a1c01e..3afe785 100644 --- a/scripts/export.py +++ b/scripts/export.py @@ -15,6 +15,18 @@ from datetime import datetime from pathlib import Path +# Ensure project root is on path when run as python scripts/export.py +_project_root = Path(__file__).resolve().parent.parent +if str(_project_root) not in sys.path: + sys.path.insert(0, str(_project_root)) + +from utils.exclusion_rules import ( + resolve_exclusion_rules_path, + load_rules, + build_searchable_text, + is_excluded_by_rules, +) + def get_default_workspace_path() -> str: home = str(Path.home()) @@ -178,13 +190,15 @@ def get_workspace_folder_paths(wd) -> list: --out DIR Output directory. Default: current working directory (.) --no-zip Write individual Markdown files instead of a zip archive. --no-composer Exclude composer logs (export only chat logs). + --exclude-rules P Path to exclusion rules file (sensitive projects/chats are omitted). + If omitted, uses ~/.cursor-chat-browser/exclusion-rules.txt if present. --help Show this help message and exit. """ def parse_args(): args = sys.argv[1:] - out = {"since": "all", "out_dir": ".", "include_composer": True, "zip": True} + out = {"since": "all", "out_dir": ".", "include_composer": True, "zip": True, "exclusion_rules_path": None} i = 0 while i < len(args): if args[i] in ("--help", "-h"): @@ -196,6 +210,9 @@ def parse_args(): elif args[i] == "--out" and i + 1 < len(args): i += 1 out["out_dir"] = args[i] + elif args[i] in ("--exclude-rules", "-e") and i + 1 < len(args): + i += 1 + out["exclusion_rules_path"] = args[i] elif args[i] == "--no-composer": out["include_composer"] = False elif args[i] == "--no-zip": @@ -209,6 +226,8 @@ def main(): since = opts["since"] out_dir = os.path.abspath(opts["out_dir"]) use_zip = opts["zip"] + exclusion_path = resolve_exclusion_rules_path(opts.get("exclusion_rules_path")) + exclusion_rules = load_rules(exclusion_path) if exclusion_path and os.path.isfile(exclusion_path or "") else [] workspace_path = resolve_workspace_path() global_path = os.path.normpath(os.path.join(workspace_path, "..", "globalStorage", "state.vscdb")) @@ -424,6 +443,16 @@ def assign_workspace(cd, cid): ws_id = assign_workspace(cd, composer_id) ws_slug = "other-chats" if ws_id == "global" else (workspace_id_to_slug.get(ws_id) or slug(ws_id[:12])) title = cd.get("name") or f"Chat {composer_id[:8]}" + model_config = cd.get("modelConfig") or {} + model_name = model_config.get("modelName") + model_names = [model_name] if model_name and model_name != "default" else None + searchable = build_searchable_text( + project_name=ws_slug, + chat_title=title, + model_names=model_names, + ) + if is_excluded_by_rules(exclusion_rules, searchable): + continue title_slug = slug(title) ts = updated_at or int(datetime.now().timestamp() * 1000) ts_str = datetime.fromtimestamp(ts / 1000).strftime("%Y-%m-%dT%H-%M-%S") diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..4fe4e36 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +# Tests for cursor-chat-browser-python diff --git a/tests/test_exclusion_rules.py b/tests/test_exclusion_rules.py new file mode 100644 index 0000000..283b6b1 --- /dev/null +++ b/tests/test_exclusion_rules.py @@ -0,0 +1,177 @@ +""" +Tests for exclusion rules (filtering sensitive projects/chats). +Run from project root: python -m pytest tests/test_exclusion_rules.py -v +or: python -m unittest tests.test_exclusion_rules -v +""" + +import os +import tempfile +import unittest + +# Ensure project root is on path when running tests +import sys +from pathlib import Path + +_root = Path(__file__).resolve().parent.parent +if str(_root) not in sys.path: + sys.path.insert(0, str(_root)) + +from utils.exclusion_rules import ( + load_rules, + is_excluded_by_rules, + build_searchable_text, + get_default_exclusion_rules_path, + resolve_exclusion_rules_path, +) + + +class TestBuildSearchableText(unittest.TestCase): + def test_empty(self): + self.assertEqual(build_searchable_text(), "") + + def test_project_only(self): + self.assertEqual( + build_searchable_text(project_name="my-project"), + "my-project", + ) + + def test_project_and_title(self): + t = build_searchable_text(project_name="proj", chat_title="Chat 1") + self.assertIn("proj", t) + self.assertIn("Chat 1", t) + + def test_model_names(self): + t = build_searchable_text( + project_name="p", + chat_title="t", + model_names=["gpt-4", "claude-3"], + ) + self.assertIn("gpt-4", t) + self.assertIn("claude-3", t) + + +class TestExclusionMatching(unittest.TestCase): + def test_no_rules(self): + self.assertFalse(is_excluded_by_rules([], "anything")) + self.assertFalse(is_excluded_by_rules([], "")) + + def test_single_word_rule(self): + rules = [[("word", "secret")]] + self.assertTrue(is_excluded_by_rules(rules, "this is secret stuff")) + self.assertTrue(is_excluded_by_rules(rules, "SECRET")) + self.assertFalse(is_excluded_by_rules(rules, "public")) + + def test_phrase_rule(self): + rules = [[("phrase", "project alpha")]] + self.assertTrue(is_excluded_by_rules(rules, "Confidential: project alpha internal")) + self.assertFalse(is_excluded_by_rules(rules, "project and alpha")) + + def test_or_rule(self): + # secret OR internal + rules = [[("word", "secret"), "OR", ("word", "internal")]] + self.assertTrue(is_excluded_by_rules(rules, "secret data")) + self.assertTrue(is_excluded_by_rules(rules, "internal only")) + self.assertTrue(is_excluded_by_rules(rules, "secret internal")) + self.assertFalse(is_excluded_by_rules(rules, "public data")) + + def test_and_rule(self): + # foo AND bar + rules = [[("word", "foo"), "AND", ("word", "bar")]] + self.assertTrue(is_excluded_by_rules(rules, "foo and bar")) + self.assertFalse(is_excluded_by_rules(rules, "foo only")) + self.assertFalse(is_excluded_by_rules(rules, "bar only")) + + def test_and_precedence_over_or(self): + # a OR b AND c => (a) OR (b AND c) + rules = [[("word", "a"), "OR", ("word", "b"), "AND", ("word", "c")]] + self.assertTrue(is_excluded_by_rules(rules, "a")) + self.assertFalse(is_excluded_by_rules(rules, "b")) + self.assertFalse(is_excluded_by_rules(rules, "c")) + self.assertTrue(is_excluded_by_rules(rules, "b and c")) + self.assertTrue(is_excluded_by_rules(rules, "a or b")) + + def test_any_rule_matches(self): + rules = [ + [("word", "x")], + [("word", "y")], + ] + self.assertTrue(is_excluded_by_rules(rules, "x")) + self.assertTrue(is_excluded_by_rules(rules, "y")) + self.assertFalse(is_excluded_by_rules(rules, "z")) + + +class TestLoadRules(unittest.TestCase): + def test_none_path(self): + self.assertEqual(load_rules(None), []) + + def test_missing_file(self): + self.assertEqual(load_rules("/nonexistent/path/rules.txt"), []) + + def test_empty_file(self): + with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f: + f.write("") + path = f.name + try: + self.assertEqual(load_rules(path), []) + finally: + os.unlink(path) + + def test_comments_and_blank(self): + with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f: + f.write("# comment\n\n \nsecret\n") + path = f.name + try: + rules = load_rules(path) + self.assertEqual(len(rules), 1) + self.assertEqual(rules[0], [("word", "secret")]) + finally: + os.unlink(path) + + def test_word_and_phrase(self): + with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False) as f: + f.write('secret OR "project alpha"\n') + path = f.name + try: + rules = load_rules(path) + self.assertEqual(len(rules), 1) + self.assertEqual(len(rules[0]), 3) # (word, secret), OR, (phrase, project alpha) + self.assertEqual(rules[0][0], ("word", "secret")) + self.assertEqual(rules[0][1], "OR") + self.assertEqual(rules[0][2], ("phrase", "project alpha")) + finally: + os.unlink(path) + + def test_utf8(self): + with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8", suffix=".txt", delete=False) as f: + f.write("секрет\n") + path = f.name + try: + rules = load_rules(path) + self.assertEqual(len(rules), 1) + self.assertTrue(is_excluded_by_rules(rules, "документ секрет")) + finally: + os.unlink(path) + + +class TestResolvePath(unittest.TestCase): + def test_default_none_when_no_file(self): + # Default path may or may not exist; we only care that when cli_path is None + # we get None if default file doesn't exist + result = resolve_exclusion_rules_path(None) + default_path = get_default_exclusion_rules_path() + if os.path.isfile(default_path): + self.assertEqual(result, default_path) + else: + self.assertIsNone(result) + + def test_cli_path_returned_when_given(self): + with tempfile.NamedTemporaryFile(suffix=".txt", delete=False) as f: + path = f.name + try: + self.assertEqual(resolve_exclusion_rules_path(path), path) + finally: + os.unlink(path) + + +if __name__ == "__main__": + unittest.main() diff --git a/utils/exclusion_rules.py b/utils/exclusion_rules.py new file mode 100644 index 0000000..7a50f98 --- /dev/null +++ b/utils/exclusion_rules.py @@ -0,0 +1,197 @@ +""" +Exclusion rules for filtering sensitive projects/chats. + +Rule file: UTF-8 text. Lines starting with # or empty are ignored. +Each other line is one rule. If ANY rule matches the combined searchable text +(project title, chat title, model names, content), the item is excluded. + +Rule syntax: + - Terms separated by AND or OR (case-insensitive). + - AND has higher precedence: "a OR b AND c" means (a) OR (b AND c). + - Term = single word (substring match, case-insensitive) or "exact phrase" (exact phrase match). + - One rule per line. + +Example exclusion-rules.txt: + # Exclude anything mentioning secret or internal + secret OR internal + "project alpha" AND confidential + password +""" + +from __future__ import annotations + +import os +import re +from pathlib import Path + + +# Default path when no --exclude-rules is given: ~/.cursor-chat-browser/exclusion-rules.txt +DEFAULT_EXCLUSION_RULES_FILENAME = "exclusion-rules.txt" + + +def get_default_exclusion_rules_path() -> str: + """Path to the default exclusion rules file in user config dir.""" + return os.path.join(str(Path.home()), ".cursor-chat-browser", DEFAULT_EXCLUSION_RULES_FILENAME) + + +def resolve_exclusion_rules_path(cli_path: str | None) -> str | None: + """ + Resolve the exclusion rules file path. + - If cli_path is given and the file exists, return it (absolute or cwd-relative). + - Else if the default file exists in ~/.cursor-chat-browser/, return that path. + - Else return None (no filtering). + """ + if cli_path: + p = os.path.abspath(os.path.expanduser(cli_path)) + if os.path.isfile(p): + return p + return p # still use it; loader will report missing file + default = get_default_exclusion_rules_path() + if os.path.isfile(default): + return default + return None + + +def _tokenize_rule(line: str) -> list[str]: + """ + Tokenize a rule line into terms and operators. + Returns a list of tokens: "AND", "OR", or term (keyword or "phrase"). + """ + tokens = [] + rest = line.strip() + while rest: + # Skip whitespace + m = re.match(r"\s+", rest) + if m: + rest = rest[m.end() :] + continue + # AND (word boundary) + if re.match(r"\bAND\b", rest, re.IGNORECASE): + tokens.append("AND") + rest = rest[3:].lstrip() + continue + # OR (word boundary) + if re.match(r"\bOR\b", rest, re.IGNORECASE): + tokens.append("OR") + rest = rest[2:].lstrip() + continue + # Double-quoted phrase + if rest.startswith('"'): + end = rest.find('"', 1) + if end == -1: + # Unclosed quote: treat remainder as one word term + tokens.append(("word", rest[1:].strip())) + break + tokens.append(("phrase", rest[1:end])) + rest = rest[end + 1 :].lstrip() + continue + # Single word (until space or end) + m = re.match(r"\S+", rest) + if m: + tokens.append(("word", m.group(0))) + rest = rest[m.end() :].lstrip() + continue + break + return tokens + + +def _term_matches(term: tuple[str, str], text: str) -> bool: + """Check if a term (word or phrase) matches in text (case-insensitive).""" + kind, value = term + if not value: + return False + text_lower = text.lower() + if kind == "word": + return value.lower() in text_lower + # phrase: exact substring (case-insensitive) + return value.lower() in text_lower + + +def _rule_matches(tokens: list, text: str) -> bool: + """ + Evaluate a tokenized rule against text. + AND has higher precedence: a OR b AND c => (a) OR (b AND c). + """ + if not tokens: + return False + # Split by OR into clauses; each clause is AND of terms + clauses = [] + current = [] + for t in tokens: + if t == "OR": + if current: + clauses.append(current) + current = [] + elif t == "AND": + # just skip; we collect terms, AND is implicit between them + continue + else: + current.append(t) + if current: + clauses.append(current) + + for clause in clauses: + if not clause: + continue + # Clause matches if all terms match (AND) + if all(_term_matches(term, text) for term in clause if isinstance(term, tuple)): + return True + return False + + +def load_rules(path: str | None) -> list[list]: + """ + Load and parse the exclusion rule file. + Returns a list of tokenized rules (each is a list of tokens). + If path is None or file is missing/unreadable, returns []. + """ + if not path or not os.path.isfile(path): + return [] + rules = [] + try: + with open(path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line or line.startswith("#"): + continue + tokens = _tokenize_rule(line) + if tokens: + rules.append(tokens) + except Exception: + return [] + return rules + + +def is_excluded_by_rules(rules: list[list], searchable_text: str) -> bool: + """ + Return True if searchable_text should be excluded (any rule matches). + searchable_text is typically a combination of project name, chat title, model names, etc. + """ + if not searchable_text or not rules: + return False + for tokenized in rules: + if _rule_matches(tokenized, searchable_text): + return True + return False + + +def build_searchable_text( + *, + project_name: str | None = None, + chat_title: str | None = None, + model_names: list[str] | None = None, + chat_content_snippet: str | None = None, +) -> str: + """Build a single string to run exclusion rules against (e.g. for a chat or project).""" + parts = [] + if project_name: + parts.append(project_name) + if chat_title: + parts.append(chat_title) + if model_names: + parts.extend(model_names) + if chat_content_snippet: + # Limit size to avoid huge strings; first N chars is enough for keyword/phrase match + snippet = chat_content_snippet + parts.append(snippet[:50_000] if len(snippet) > 50_000 else snippet) + return "\n".join(p for p in parts if p) From d4919e7f3ecd92db10c2e062716cb277e2f0febd Mon Sep 17 00:00:00 2001 From: iTinkerBell Date: Tue, 17 Feb 2026 22:56:36 -0500 Subject: [PATCH 2/8] Address CodeRabbit review comments on PR #2 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - utils/exclusion_rules.py: - Warn (via logger) when a CLI-provided rules file is missing - Log warning instead of bare except-and-swallow in load_rules - Simplify _term_matches: collapse redundant word/phrase branches, add TODO note for potential future phrase-boundary matching - Add docstrings to all public and private functions/module - app.py: - Remove redundant os.path.isfile guard before load_rules (it already handles None/missing paths internally) - Document startup-load behaviour in comment - scripts/export.py: - Remove redundant os.path.isfile guard before load_rules - Build workspace_id_to_display_name mapping (human-readable, URL-decoded folder name) so build_searchable_text receives the same unslugified name as the browser API — fixes phrase-match mismatches (e.g. 'my project' vs 'my-project') - api/workspaces.py: - Extract _get_workspace_display_name() helper to eliminate duplicated workspace.json/folder-name resolution logic shared between list_workspaces and get_workspace_tabs - Move workspace-level exclusion check before the per-conversation loop to avoid iterating all conversations for excluded workspaces - Move tab-level exclusion check right after title resolution (before expensive bubble processing and metadata aggregation), using modelConfig.modelName instead of post-build tab_meta['modelsUsed'] - api/export_api.py: - Reuse model_config/model_name extracted before exclusion check instead of re-extracting them again for frontmatter generation - Add docstring to export_chats() documenting the startup-load behaviour and the need for an app restart when the rules file changes - tests/test_exclusion_rules.py: - Add test_implicit_and_adjacent_terms: adjacent terms (no explicit AND) - Add test_unclosed_quote_treated_as_word: tokenizer edge case - Add test_quoted_logical_operator_is_literal: quoted 'AND'/'OR' as text Co-authored-by: Cursor --- api/export_api.py | 9 ++- api/workspaces.py | 84 +++++++++++++------------ app.py | 7 +-- scripts/export.py | 9 ++- tests/test_exclusion_rules.py | 42 +++++++++++++ utils/exclusion_rules.py | 113 ++++++++++++++++++++++------------ 6 files changed, 179 insertions(+), 85 deletions(-) diff --git a/api/export_api.py b/api/export_api.py index 274e4ad..fda5732 100644 --- a/api/export_api.py +++ b/api/export_api.py @@ -71,6 +71,13 @@ def get_export_state(): @bp.route("/api/export", methods=["POST"]) def export_chats(): + """Export chats as a zip archive. + + Exclusion rules (``EXCLUSION_RULES`` app config key) are evaluated against + each chat's project name, title, and model. Rules are loaded once at + application startup; an app restart is required to pick up changes to the + exclusion rules file. + """ try: body = request.get_json(silent=True) or {} since = "last" if body.get("since") == "last" else "all" @@ -276,8 +283,6 @@ def export_chats(): md += f"updated_at: {datetime.fromtimestamp(updated_at_ms / 1000).isoformat() if updated_at_ms else datetime.now().isoformat()}\n" md += f"workspace: {ws_slug}\n" md += f"message_count: {len(bubbles)}\n" - model_config = cd.get("modelConfig") or {} - model_name = model_config.get("modelName") if model_name: md += f"model: {model_name}\n" if total_response_ms: diff --git a/api/workspaces.py b/api/workspaces.py index cbb818a..b72715d 100644 --- a/api/workspaces.py +++ b/api/workspaces.py @@ -21,9 +21,35 @@ from utils.text_extract import extract_text_from_bubble, format_tool_action from utils.exclusion_rules import build_searchable_text, is_excluded_by_rules +from urllib.parse import unquote as _url_unquote + bp = Blueprint("workspaces", __name__) +def _get_workspace_display_name(workspace_path: str, workspace_id: str) -> str: + """ + Return a human-readable display name for a workspace. + + Reads the workspace's ``workspace.json`` to extract the last path segment + of the first configured folder, URL-decodes it, and returns it. Falls back + to ``"Other chats"`` for the virtual ``"global"`` workspace and to + *workspace_id* if the JSON cannot be read. + """ + if workspace_id == "global": + return "Other chats" + wj_path = os.path.join(workspace_path, workspace_id, "workspace.json") + try: + wd = _read_json_file(wj_path) + first_folder = wd.get("folder") or (wd.get("folders", [{}])[0] or {}).get("path") + if first_folder: + fn = first_folder.replace("\\", "/").split("/")[-1] + if fn: + return _url_unquote(fn) + except Exception: + pass + return workspace_id + + # --------------------------------------------------------------------------- # Shared helpers (duplicated in tabs route in the Node.js project) # --------------------------------------------------------------------------- @@ -357,7 +383,6 @@ def list_workspaces(): rules = current_app.config.get("EXCLUSION_RULES") or [] # Build project list — merge workspace entries sharing the same folder - from urllib.parse import unquote as _unquote # Group workspace entries by normalized folder path folder_to_entries: dict[str, list] = {} @@ -399,17 +424,13 @@ def list_workspaces(): except Exception: mtime = 0 - workspace_name = f"Project {primary['name'][:8]}" - try: - wd = _read_json_file(primary["workspaceJsonPath"]) - first_folder = wd.get("folder") or (wd.get("folders", [{}])[0] or {}).get("path") - if first_folder: - parts = first_folder.replace("\\", "/").split("/") - fn = parts[-1] if parts else None - if fn: - workspace_name = _unquote(fn) - except Exception: - pass + workspace_name = _get_workspace_display_name(workspace_path, primary["name"]) + if workspace_name == primary["name"]: + workspace_name = f"Project {primary['name'][:8]}" + + # Skip entire workspace before iterating conversations + if is_excluded_by_rules(rules, workspace_name): + continue # Merge conversations from all workspace IDs in the group; apply exclusion rules convos = [] @@ -422,9 +443,6 @@ def list_workspaces(): if not is_excluded_by_rules(rules, searchable): convos.append(c) - if is_excluded_by_rules(rules, workspace_name): - continue - projects.append({ "id": primary["name"], "name": workspace_name, @@ -574,22 +592,7 @@ def get_workspace_tabs(workspace_id): if not os.path.isfile(global_db_path): return jsonify({"error": "Global storage not found"}), 404 - # Workspace display name for exclusion rules - workspace_display_name = "Other chats" if workspace_id == "global" else workspace_id - if workspace_id != "global": - wj_path = os.path.join(workspace_path, workspace_id, "workspace.json") - try: - wd = _read_json_file(wj_path) - first_folder = wd.get("folder") or (wd.get("folders", [{}])[0] or {}).get("path") - if first_folder: - from urllib.parse import unquote as _unquote - parts = first_folder.replace("\\", "/").split("/") - fn = parts[-1] if parts else None - if fn: - workspace_display_name = _unquote(fn) - except Exception: - pass - + workspace_display_name = _get_workspace_display_name(workspace_path, workspace_id) rules = current_app.config.get("EXCLUSION_RULES") or [] global_db = sqlite3.connect(f"file:{global_db_path}?mode=ro", uri=True) @@ -832,6 +835,17 @@ def get_workspace_tabs(workspace_id): if len(title) == 100: title += "..." + # Early exclusion check — run before expensive metadata aggregation + _early_model_config = cd.get("modelConfig") or {} + _early_model_name = _early_model_config.get("modelName") + _early_model_names = [_early_model_name] if _early_model_name and _early_model_name != "default" else None + if is_excluded_by_rules(rules, build_searchable_text( + project_name=workspace_display_name, + chat_title=title, + model_names=_early_model_names, + )): + continue + # Code block diffs as extra bubbles diffs = code_block_diff_map.get(composer_id, []) for diff in diffs: @@ -956,13 +970,7 @@ def get_workspace_tabs(workspace_id): if tab_meta: tab["metadata"] = tab_meta - searchable = build_searchable_text( - project_name=workspace_display_name, - chat_title=title, - model_names=tab_meta.get("modelsUsed") if tab_meta else None, - ) - if not is_excluded_by_rules(rules, searchable): - response["tabs"].append(tab) + response["tabs"].append(tab) except Exception as e: print(f"Error parsing composer data for {composer_id}: {e}") diff --git a/app.py b/app.py index 55ee93c..8c1c497 100644 --- a/app.py +++ b/app.py @@ -4,8 +4,6 @@ from the Cursor editor's AI chat feature. """ -import os - from flask import Flask, render_template, send_from_directory from api.workspaces import bp as workspaces_bp @@ -22,10 +20,11 @@ def create_app(exclusion_rules_path=None): app = Flask(__name__, static_folder="static", template_folder="templates") app.config["JSON_SORT_KEYS"] = False - # Exclusion rules: optional path (CLI or default ~/.cursor-chat-browser/exclusion-rules.txt) + # Exclusion rules: optional path (CLI or default ~/.cursor-chat-browser/exclusion-rules.txt). + # Rules are loaded once at startup; an app restart is required to pick up changes to the file. resolved = resolve_exclusion_rules_path(exclusion_rules_path) app.config["EXCLUSION_RULES_PATH"] = resolved - app.config["EXCLUSION_RULES"] = load_rules(resolved) if resolved and os.path.isfile(resolved) else [] + app.config["EXCLUSION_RULES"] = load_rules(resolved) # Register API blueprints app.register_blueprint(workspaces_bp) diff --git a/scripts/export.py b/scripts/export.py index 3afe785..804ee19 100644 --- a/scripts/export.py +++ b/scripts/export.py @@ -226,8 +226,7 @@ def main(): since = opts["since"] out_dir = os.path.abspath(opts["out_dir"]) use_zip = opts["zip"] - exclusion_path = resolve_exclusion_rules_path(opts.get("exclusion_rules_path")) - exclusion_rules = load_rules(exclusion_path) if exclusion_path and os.path.isfile(exclusion_path or "") else [] + exclusion_rules = load_rules(resolve_exclusion_rules_path(opts.get("exclusion_rules_path"))) workspace_path = resolve_workspace_path() global_path = os.path.normpath(os.path.join(workspace_path, "..", "globalStorage", "state.vscdb")) @@ -266,15 +265,18 @@ def main(): workspace_path_to_id = {} project_name_to_ws = {} workspace_id_to_slug = {} + workspace_id_to_display_name: dict[str, str] = {} # human-readable, URL-decoded folder name for e in workspace_entries: try: with open(e["workspaceJsonPath"], "r", encoding="utf-8") as f: wd = json.load(f) first_folder = wd.get("folder") or (wd.get("folders", [{}])[0] or {}).get("path") if first_folder: + from urllib.parse import unquote as _unquote fn = re.sub(r"^file://", "", first_folder).replace("\\", "/").split("/")[-1] if fn: workspace_id_to_slug[e["name"]] = slug(fn) + workspace_id_to_display_name[e["name"]] = _unquote(fn) for folder in get_workspace_folder_paths(wd): norm = normalize_file_path(folder) workspace_path_to_id[norm] = e["name"] @@ -442,12 +444,13 @@ def assign_workspace(cd, cid): ws_id = assign_workspace(cd, composer_id) ws_slug = "other-chats" if ws_id == "global" else (workspace_id_to_slug.get(ws_id) or slug(ws_id[:12])) + ws_display_name = "Other chats" if ws_id == "global" else (workspace_id_to_display_name.get(ws_id) or ws_slug) title = cd.get("name") or f"Chat {composer_id[:8]}" model_config = cd.get("modelConfig") or {} model_name = model_config.get("modelName") model_names = [model_name] if model_name and model_name != "default" else None searchable = build_searchable_text( - project_name=ws_slug, + project_name=ws_display_name, chat_title=title, model_names=model_names, ) diff --git a/tests/test_exclusion_rules.py b/tests/test_exclusion_rules.py index 283b6b1..d4b949d 100644 --- a/tests/test_exclusion_rules.py +++ b/tests/test_exclusion_rules.py @@ -99,6 +99,48 @@ def test_any_rule_matches(self): self.assertTrue(is_excluded_by_rules(rules, "y")) self.assertFalse(is_excluded_by_rules(rules, "z")) + def test_implicit_and_adjacent_terms(self): + """Adjacent terms without an explicit AND operator are treated as AND.""" + rules = [[("word", "foo"), ("word", "bar")]] + self.assertTrue(is_excluded_by_rules(rules, "foo bar")) + self.assertTrue(is_excluded_by_rules(rules, "bar and foo")) + self.assertFalse(is_excluded_by_rules(rules, "foo only")) + self.assertFalse(is_excluded_by_rules(rules, "bar only")) + + def test_unclosed_quote_treated_as_word(self): + """An unclosed double-quote falls back to a plain word/substring match.""" + # Tokenizer produces ("word", "unclosed phrase") for `"unclosed phrase` + from utils.exclusion_rules import _tokenize_rule + tokens = _tokenize_rule('"unclosed phrase') + self.assertEqual(len(tokens), 1) + self.assertEqual(tokens[0][0], "word") + rules = [tokens] + self.assertTrue(is_excluded_by_rules(rules, "text with unclosed phrase inside")) + self.assertFalse(is_excluded_by_rules(rules, "something unrelated")) + + def test_quoted_logical_operator_is_literal(self): + """A quoted "AND" or "OR" is a literal term, not a boolean operator.""" + from utils.exclusion_rules import _tokenize_rule + # "AND" (quoted) should produce a phrase token, not the "AND" string + tokens_and = _tokenize_rule('"AND"') + self.assertEqual(len(tokens_and), 1) + self.assertIsInstance(tokens_and[0], tuple) + self.assertEqual(tokens_and[0][1], "AND") + + tokens_or = _tokenize_rule('"OR"') + self.assertEqual(len(tokens_or), 1) + self.assertIsInstance(tokens_or[0], tuple) + self.assertEqual(tokens_or[0][1], "OR") + + # The quoted term matches text containing the literal word + rules_and = [tokens_and] + self.assertTrue(is_excluded_by_rules(rules_and, "foo AND bar")) + self.assertFalse(is_excluded_by_rules(rules_and, "foo bar")) + + rules_or = [tokens_or] + self.assertTrue(is_excluded_by_rules(rules_or, "foo OR bar")) + self.assertFalse(is_excluded_by_rules(rules_or, "foo bar")) + class TestLoadRules(unittest.TestCase): def test_none_path(self): diff --git a/utils/exclusion_rules.py b/utils/exclusion_rules.py index 7a50f98..0945b33 100644 --- a/utils/exclusion_rules.py +++ b/utils/exclusion_rules.py @@ -16,46 +16,62 @@ secret OR internal "project alpha" AND confidential password + +Note: Rules are loaded once at startup (or at the start of a CLI export run). +Changes to the exclusion rules file require an application restart (or re-running +the CLI export) to take effect. """ from __future__ import annotations +import logging import os import re from pathlib import Path +_logger = logging.getLogger(__name__) # Default path when no --exclude-rules is given: ~/.cursor-chat-browser/exclusion-rules.txt DEFAULT_EXCLUSION_RULES_FILENAME = "exclusion-rules.txt" def get_default_exclusion_rules_path() -> str: - """Path to the default exclusion rules file in user config dir.""" + """Return the path to the default exclusion rules file in the user config directory.""" return os.path.join(str(Path.home()), ".cursor-chat-browser", DEFAULT_EXCLUSION_RULES_FILENAME) def resolve_exclusion_rules_path(cli_path: str | None) -> str | None: """ Resolve the exclusion rules file path. - - If cli_path is given and the file exists, return it (absolute or cwd-relative). - - Else if the default file exists in ~/.cursor-chat-browser/, return that path. - - Else return None (no filtering). + + - If *cli_path* is given: expand and return its absolute path. If the + file doesn't exist a warning is emitted so the user knows their rules + aren't being applied (the path is still returned so load_rules can + explain the absence). + - If *cli_path* is None and the default file + (``~/.cursor-chat-browser/exclusion-rules.txt``) exists, return that. + - Otherwise return None (no filtering). """ if cli_path: p = os.path.abspath(os.path.expanduser(cli_path)) - if os.path.isfile(p): - return p - return p # still use it; loader will report missing file + if not os.path.isfile(p): + _logger.warning( + "Exclusion rules file not found: %s — no filtering will be applied.", p + ) + return p default = get_default_exclusion_rules_path() if os.path.isfile(default): return default return None -def _tokenize_rule(line: str) -> list[str]: +def _tokenize_rule(line: str) -> list: """ Tokenize a rule line into terms and operators. - Returns a list of tokens: "AND", "OR", or term (keyword or "phrase"). + + Returns a list of tokens where each token is either the string ``"AND"``, + the string ``"OR"``, or a ``(kind, value)`` tuple where *kind* is + ``"word"`` or ``"phrase"``. """ tokens = [] rest = line.strip() @@ -63,14 +79,14 @@ def _tokenize_rule(line: str) -> list[str]: # Skip whitespace m = re.match(r"\s+", rest) if m: - rest = rest[m.end() :] + rest = rest[m.end():] continue - # AND (word boundary) + # AND keyword (word boundary, case-insensitive) if re.match(r"\bAND\b", rest, re.IGNORECASE): tokens.append("AND") rest = rest[3:].lstrip() continue - # OR (word boundary) + # OR keyword (word boundary, case-insensitive) if re.match(r"\bOR\b", rest, re.IGNORECASE): tokens.append("OR") rest = rest[2:].lstrip() @@ -79,51 +95,61 @@ def _tokenize_rule(line: str) -> list[str]: if rest.startswith('"'): end = rest.find('"', 1) if end == -1: - # Unclosed quote: treat remainder as one word term + # Unclosed quote: treat remainder as a word term tokens.append(("word", rest[1:].strip())) break tokens.append(("phrase", rest[1:end])) - rest = rest[end + 1 :].lstrip() + rest = rest[end + 1:].lstrip() continue - # Single word (until space or end) + # Unquoted word (until next whitespace) m = re.match(r"\S+", rest) if m: tokens.append(("word", m.group(0))) - rest = rest[m.end() :].lstrip() + rest = rest[m.end():].lstrip() continue break return tokens -def _term_matches(term: tuple[str, str], text: str) -> bool: - """Check if a term (word or phrase) matches in text (case-insensitive).""" - kind, value = term +def _term_matches(term: tuple, text: str) -> bool: + """ + Return True if *term* matches anywhere in *text* (case-insensitive). + + Both ``"word"`` and ``"phrase"`` terms use a case-insensitive substring + check. A ``"phrase"`` term matches when the quoted string appears as a + contiguous substring (spaces included). + + .. note:: + Future versions may tighten ``"phrase"`` matching to require exact + word-boundary anchoring (e.g. via a regex) for stricter phrase + semantics. + """ + _kind, value = term if not value: return False - text_lower = text.lower() - if kind == "word": - return value.lower() in text_lower - # phrase: exact substring (case-insensitive) - return value.lower() in text_lower + return value.lower() in text.lower() def _rule_matches(tokens: list, text: str) -> bool: """ - Evaluate a tokenized rule against text. - AND has higher precedence: a OR b AND c => (a) OR (b AND c). + Evaluate a tokenized rule against *text*. + + Operator precedence: AND binds tighter than OR, so + ``a OR b AND c`` is parsed as ``(a) OR (b AND c)``. + Adjacent terms without an explicit operator are treated as AND. """ if not tokens: return False - # Split by OR into clauses; each clause is AND of terms - clauses = [] - current = [] + # Split by OR into clauses; each clause is the AND of its terms + clauses: list[list] = [] + current: list = [] for t in tokens: if t == "OR": if current: clauses.append(current) current = [] elif t == "AND": - # just skip; we collect terms, AND is implicit between them + # Explicit AND: terms are already collected sequentially, skip token continue else: current.append(t) @@ -133,7 +159,7 @@ def _rule_matches(tokens: list, text: str) -> bool: for clause in clauses: if not clause: continue - # Clause matches if all terms match (AND) + # Clause matches when every term in it matches (implicit AND) if all(_term_matches(term, text) for term in clause if isinstance(term, tuple)): return True return False @@ -141,9 +167,11 @@ def _rule_matches(tokens: list, text: str) -> bool: def load_rules(path: str | None) -> list[list]: """ - Load and parse the exclusion rule file. - Returns a list of tokenized rules (each is a list of tokens). - If path is None or file is missing/unreadable, returns []. + Load and parse the exclusion rule file at *path*. + + Returns a list of tokenized rules (each rule is a list of tokens as + produced by :func:`_tokenize_rule`). Returns an empty list when *path* + is ``None``, the file doesn't exist, or the file cannot be read. """ if not path or not os.path.isfile(path): return [] @@ -158,14 +186,18 @@ def load_rules(path: str | None) -> list[list]: if tokens: rules.append(tokens) except Exception: + _logger.warning("Failed to read exclusion rules from %s", path, exc_info=True) return [] return rules def is_excluded_by_rules(rules: list[list], searchable_text: str) -> bool: """ - Return True if searchable_text should be excluded (any rule matches). - searchable_text is typically a combination of project name, chat title, model names, etc. + Return ``True`` if *searchable_text* matches any exclusion rule. + + *searchable_text* is typically a combination of project name, chat title, + model names, etc., joined by newlines via :func:`build_searchable_text`. + Returns ``False`` when *rules* is empty or *searchable_text* is empty. """ if not searchable_text or not rules: return False @@ -182,7 +214,13 @@ def build_searchable_text( model_names: list[str] | None = None, chat_content_snippet: str | None = None, ) -> str: - """Build a single string to run exclusion rules against (e.g. for a chat or project).""" + """ + Combine chat/project metadata into a single string for rule matching. + + All non-empty, non-None parts are joined with newlines. A + *chat_content_snippet* longer than 50 000 characters is truncated since + keyword/phrase presence can be detected from the first portion alone. + """ parts = [] if project_name: parts.append(project_name) @@ -191,7 +229,6 @@ def build_searchable_text( if model_names: parts.extend(model_names) if chat_content_snippet: - # Limit size to avoid huge strings; first N chars is enough for keyword/phrase match snippet = chat_content_snippet parts.append(snippet[:50_000] if len(snippet) > 50_000 else snippet) return "\n".join(p for p in parts if p) From 6c79375a16f4fe62abb71185cbd8458bf84c1c61 Mon Sep 17 00:00:00 2001 From: iTinkerBell Date: Wed, 18 Feb 2026 16:40:42 -0500 Subject: [PATCH 3/8] Address second-round CodeRabbit review comments on PR #2 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - api/export_api.py (major bug): build_searchable_text was receiving ws_slug (e.g. 'my-project') instead of the human-readable display name (e.g. 'my project'), causing quoted-phrase exclusion rules to match in the browser listing and CLI export but silently miss in the web-export endpoint. Added ws_id_to_display_name mapping alongside ws_id_to_slug and pass ws_display_name into build_searchable_text, consistent with all other call sites. - api/workspaces.py: global (unmatched) conversations were filtered with is_excluded_by_rules(rules, c.get('name') or '') — passing the raw name directly instead of going through build_searchable_text. Changed to build_searchable_text(project_name='Other chats', chat_title=c.get('name')) for consistency with how regular conversations are evaluated. - utils/exclusion_rules.py: all() on an empty iterable returns True, so a clause that somehow contained no tuple terms would falsely match. Collect tuple terms into a list first and only evaluate all(...) when the list is non-empty (latent safety fix). - tests/test_exclusion_rules.py: test_and_precedence_over_or used single-letter tokens ('a', 'b', 'c') that cause substring false-positives (e.g. 'a' hits inside 'and'). Replaced with non-overlapping multi-character tokens ('xx', 'yy', 'zz') so the test genuinely validates AND-over-OR precedence. - scripts/export.py: moved 'from urllib.parse import unquote as _unquote' out of the per-workspace loop body to module level (renamed _url_unquote to match export_api.py convention). Co-authored-by: Cursor --- api/export_api.py | 6 +++++- api/workspaces.py | 5 ++++- scripts/export.py | 4 ++-- tests/test_exclusion_rules.py | 16 +++++++++------- utils/exclusion_rules.py | 6 ++++-- 5 files changed, 24 insertions(+), 13 deletions(-) diff --git a/api/export_api.py b/api/export_api.py index fda5732..0757e45 100644 --- a/api/export_api.py +++ b/api/export_api.py @@ -102,8 +102,10 @@ def export_chats(): conn.row_factory = sqlite3.Row # Build workspace mapping + from urllib.parse import unquote as _url_unquote workspace_entries = [] ws_id_to_slug = {} + ws_id_to_display_name = {} # human-readable, URL-decoded folder name for name in os.listdir(workspace_path): full = os.path.join(workspace_path, name) wj = os.path.join(full, "workspace.json") @@ -117,6 +119,7 @@ def export_chats(): fn = first_folder.replace("\\", "/").split("/")[-1] if fn: ws_id_to_slug[name] = _slug(fn) + ws_id_to_display_name[name] = _url_unquote(fn) except Exception: pass @@ -179,12 +182,13 @@ def export_chats(): ws_id = composer_id_to_ws.get(composer_id, "global") ws_slug = "other-chats" if ws_id == "global" else (ws_id_to_slug.get(ws_id) or _slug(ws_id[:12])) + ws_display_name = "Other chats" if ws_id == "global" else (ws_id_to_display_name.get(ws_id) or ws_slug) title = cd.get("name") or f"Chat {composer_id[:8]}" model_config = cd.get("modelConfig") or {} model_name = model_config.get("modelName") model_names = [model_name] if model_name and model_name != "default" else None searchable = build_searchable_text( - project_name=ws_slug, + project_name=ws_display_name, chat_title=title, model_names=model_names, ) diff --git a/api/workspaces.py b/api/workspaces.py index b72715d..63c84d2 100644 --- a/api/workspaces.py +++ b/api/workspaces.py @@ -456,7 +456,10 @@ def list_workspaces(): # Global (unmatched) conversations; apply exclusion rules global_convos = [ c for c in conversation_map.get("global", []) - if not is_excluded_by_rules(rules, c.get("name") or "") + if not is_excluded_by_rules( + rules, + build_searchable_text(project_name="Other chats", chat_title=c.get("name")), + ) ] if global_convos: last_updated = max((c.get("lastUpdatedAt") or 0 for c in global_convos), default=0) diff --git a/scripts/export.py b/scripts/export.py index 804ee19..d612ddc 100644 --- a/scripts/export.py +++ b/scripts/export.py @@ -14,6 +14,7 @@ import zipfile from datetime import datetime from pathlib import Path +from urllib.parse import unquote as _url_unquote # Ensure project root is on path when run as python scripts/export.py _project_root = Path(__file__).resolve().parent.parent @@ -272,11 +273,10 @@ def main(): wd = json.load(f) first_folder = wd.get("folder") or (wd.get("folders", [{}])[0] or {}).get("path") if first_folder: - from urllib.parse import unquote as _unquote fn = re.sub(r"^file://", "", first_folder).replace("\\", "/").split("/")[-1] if fn: workspace_id_to_slug[e["name"]] = slug(fn) - workspace_id_to_display_name[e["name"]] = _unquote(fn) + workspace_id_to_display_name[e["name"]] = _url_unquote(fn) for folder in get_workspace_folder_paths(wd): norm = normalize_file_path(folder) workspace_path_to_id[norm] = e["name"] diff --git a/tests/test_exclusion_rules.py b/tests/test_exclusion_rules.py index d4b949d..03c7cb4 100644 --- a/tests/test_exclusion_rules.py +++ b/tests/test_exclusion_rules.py @@ -82,13 +82,15 @@ def test_and_rule(self): self.assertFalse(is_excluded_by_rules(rules, "bar only")) def test_and_precedence_over_or(self): - # a OR b AND c => (a) OR (b AND c) - rules = [[("word", "a"), "OR", ("word", "b"), "AND", ("word", "c")]] - self.assertTrue(is_excluded_by_rules(rules, "a")) - self.assertFalse(is_excluded_by_rules(rules, "b")) - self.assertFalse(is_excluded_by_rules(rules, "c")) - self.assertTrue(is_excluded_by_rules(rules, "b and c")) - self.assertTrue(is_excluded_by_rules(rules, "a or b")) + # xx OR yy AND zz => (xx) OR (yy AND zz) + # Uses multi-character non-overlapping tokens to avoid substring false-positives + # (e.g. single-letter "a" would falsely match inside the word "and"). + rules = [[("word", "xx"), "OR", ("word", "yy"), "AND", ("word", "zz")]] + self.assertTrue(is_excluded_by_rules(rules, "xx")) # first OR clause matches + self.assertFalse(is_excluded_by_rules(rules, "yy")) # second clause needs both yy AND zz + self.assertFalse(is_excluded_by_rules(rules, "zz")) # second clause needs both yy AND zz + self.assertTrue(is_excluded_by_rules(rules, "yy and zz")) # second clause matches + self.assertTrue(is_excluded_by_rules(rules, "xx or yy")) # first clause matches via xx def test_any_rule_matches(self): rules = [ diff --git a/utils/exclusion_rules.py b/utils/exclusion_rules.py index 0945b33..276cb3f 100644 --- a/utils/exclusion_rules.py +++ b/utils/exclusion_rules.py @@ -159,8 +159,10 @@ def _rule_matches(tokens: list, text: str) -> bool: for clause in clauses: if not clause: continue - # Clause matches when every term in it matches (implicit AND) - if all(_term_matches(term, text) for term in clause if isinstance(term, tuple)): + # Clause matches when every term in it matches (implicit AND). + # Collect tuple terms first to avoid all([]) == True on an empty sequence. + terms = [t for t in clause if isinstance(t, tuple)] + if terms and all(_term_matches(term, text) for term in terms): return True return False From 5d7014f254cf847a86c3c35105d03d5389b725c0 Mon Sep 17 00:00:00 2001 From: iTinkerBell Date: Thu, 19 Feb 2026 15:37:36 -0500 Subject: [PATCH 4/8] Fix exclusion filtering and multi-workspace project assignment. Apply exclusion rules across full searchable chat content for search/export and improve workspace resolution for multi-directory and indirection-based Cursor workspaces to prevent fallback project hashes and misbucketed chats. Co-authored-by: Cursor --- api/search.py | 153 ++++++++-- api/workspaces.py | 308 ++++++++++++++++++-- scripts/export.py | 102 +++++-- tests/test_export_exclusion_filtering.py | 170 +++++++++++ tests/test_invalid_workspace_aliases.py | 50 ++++ tests/test_search_exclusion_filtering.py | 235 +++++++++++++++ tests/test_workspace_assignment_fallback.py | 42 +++ tests/test_workspace_display_name.py | 38 +++ tests/test_workspace_name_inference.py | 86 ++++++ utils/exclusion_rules.py | 11 +- utils/path_helpers.py | 54 +++- 11 files changed, 1171 insertions(+), 78 deletions(-) create mode 100644 tests/test_export_exclusion_filtering.py create mode 100644 tests/test_invalid_workspace_aliases.py create mode 100644 tests/test_search_exclusion_filtering.py create mode 100644 tests/test_workspace_assignment_fallback.py create mode 100644 tests/test_workspace_display_name.py create mode 100644 tests/test_workspace_name_inference.py diff --git a/api/search.py b/api/search.py index 45c21b0..a4cfa6c 100644 --- a/api/search.py +++ b/api/search.py @@ -8,9 +8,11 @@ import re import sqlite3 from datetime import datetime +from urllib.parse import unquote as _url_unquote -from flask import Blueprint, jsonify, request +from flask import Blueprint, current_app, jsonify, request +from utils.exclusion_rules import build_searchable_text, is_excluded_by_rules from utils.workspace_path import resolve_workspace_path from utils.path_helpers import normalize_file_path, get_workspace_folder_paths, to_epoch_ms from utils.text_extract import extract_text_from_bubble @@ -18,11 +20,54 @@ bp = Blueprint("search", __name__) +def _json_dump_safe(value) -> str: + """Best-effort JSON string conversion for exclusion matching.""" + try: + return json.dumps(value, ensure_ascii=False, sort_keys=True) + except Exception: + return str(value) if value is not None else "" + + +def _workspace_display_name_from_folder(folder: str | None, fallback: str | None = None) -> str: + """Extract a human-readable workspace name from workspace folder path.""" + if folder: + raw = str(folder).strip() + cleaned = re.sub(r"^file://", "", raw).replace("\\", "/") + parts = cleaned.split("/") + leaf = parts[-1] if parts else "" + if leaf: + return _url_unquote(leaf) + return fallback or "Other chats" + + +def _build_exclusion_searchable( + *, + project_name: str | None, + chat_title: str | None, + model_names: list[str] | None = None, + content_parts: list[str] | None = None, + metadata_parts: list[str] | None = None, +) -> str: + """Build broad searchable text so exclusion rules cover visible output.""" + combined = [] + if content_parts: + combined.extend(p for p in content_parts if p) + if metadata_parts: + combined.extend(p for p in metadata_parts if p) + return build_searchable_text( + project_name=project_name, + chat_title=chat_title, + model_names=model_names, + chat_content_snippet="\n\n".join(combined) if combined else None, + ) + + @bp.route("/api/search") def search(): try: query = request.args.get("q", "").strip() search_type = request.args.get("type", "all") + rules = current_app.config.get("EXCLUSION_RULES") or [] if not query: return jsonify({"error": "No search query provided"}), 400 @@ -58,7 +103,7 @@ def search(): parts = first_folder.replace("\\", "/").split("/") fn = parts[-1] if parts else None if fn: - ws_id_to_name[name] = fn + ws_id_to_name[name] = _url_unquote(fn) except Exception: pass except Exception: @@ -114,11 +159,49 @@ def search(): if not headers: continue + title = cd.get("name") or "" + ws_id = composer_id_to_ws.get(composer_id, "global") + ws_name = ws_id_to_name.get(ws_id) + project_name = ws_name or ("Other chats" if ws_id == "global" else ws_id) + + model_config = cd.get("modelConfig") or {} + model_name = model_config.get("modelName") + model_names = [model_name] if model_name and model_name != "default" else None + + bubble_texts = [] + bubble_meta = [] + for header in headers: + bid = header.get("bubbleId") + bubble_entry = bubble_map.get(bid) + if not bubble_entry: + continue + text = bubble_entry.get("text") or "" + if text: + bubble_texts.append(text) + raw_bubble = bubble_entry.get("raw") + if raw_bubble: + bubble_meta.append(_json_dump_safe(raw_bubble)) + + exclusion_text = _build_exclusion_searchable( + project_name=project_name, + chat_title=title, + model_names=model_names, + content_parts=bubble_texts, + metadata_parts=[ + _json_dump_safe(model_config), + _json_dump_safe(cd.get("conversationSummary")), + _json_dump_safe(cd.get("usage")), + _json_dump_safe(cd.get("requestMetadata")), + _json_dump_safe(cd), + "\n".join(bubble_meta), + ], + ) + if is_excluded_by_rules(rules, exclusion_text): + continue + # Check if any bubble text matches has_match = False matching_text = "" - title = cd.get("name") or "" - # Check title if title and query_lower in title.lower(): has_match = True @@ -126,29 +209,22 @@ def search(): # Check bubble texts if not has_match: - for header in headers: - bid = header.get("bubbleId") - bubble_entry = bubble_map.get(bid) - if bubble_entry: - text = bubble_entry["text"] - if text and query_lower in text.lower(): - has_match = True - # Extract a snippet around the match - idx = text.lower().find(query_lower) - start = max(0, idx - 80) - end = min(len(text), idx + len(query) + 120) - matching_text = ("..." if start > 0 else "") + text[start:end] + ("..." if end < len(text) else "") - break + for text in bubble_texts: + if text and query_lower in text.lower(): + has_match = True + # Extract a snippet around the match + idx = text.lower().find(query_lower) + start = max(0, idx - 80) + end = min(len(text), idx + len(query) + 120) + matching_text = ("..." if start > 0 else "") + text[start:end] + ("..." if end < len(text) else "") + break if has_match: - ws_id = composer_id_to_ws.get(composer_id, "global") - ws_name = ws_id_to_name.get(ws_id) if not title: # Derive title from first bubble - for header in headers: - be = bubble_map.get(header.get("bubbleId")) - if be and be["text"]: - first_lines = [l for l in be["text"].split("\n") if l.strip()] + for text in bubble_texts: + if text: + first_lines = [l for l in text.split("\n") if l.strip()] if first_lines: title = first_lines[0][:100] break @@ -191,6 +267,7 @@ def search(): workspace_folder = wd.get("folder") except Exception: pass + workspace_name = _workspace_display_name_from_folder(workspace_folder, fallback=name) try: conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) @@ -203,10 +280,38 @@ def search(): if chat_row and chat_row[0]: data = json.loads(chat_row[0]) for tab in (data.get("tabs") or []): + ct = tab.get("chatTitle") or "" + tab_model_names = None + tab_meta = tab.get("metadata") + if isinstance(tab_meta, dict): + models_used = tab_meta.get("modelsUsed") + if isinstance(models_used, list): + tab_model_names = [str(m) for m in models_used if m] + elif tab_meta.get("model"): + tab_model_names = [str(tab_meta.get("model"))] + + tab_bubble_texts = [] + for bubble in (tab.get("bubbles") or []): + text = bubble.get("text") or "" + if text: + tab_bubble_texts.append(text) + + exclusion_text = _build_exclusion_searchable( + project_name=workspace_name, + chat_title=ct, + model_names=tab_model_names, + content_parts=tab_bubble_texts, + metadata_parts=[ + _json_dump_safe(tab), + _json_dump_safe(workspace_folder), + ], + ) + if is_excluded_by_rules(rules, exclusion_text): + continue + has_match = False matching_text = "" - ct = tab.get("chatTitle") or "" if ct.lower().find(query_lower) != -1: has_match = True matching_text = ct diff --git a/api/workspaces.py b/api/workspaces.py index 63c84d2..f5ac83d 100644 --- a/api/workspaces.py +++ b/api/workspaces.py @@ -13,16 +13,20 @@ import sqlite3 import sys from datetime import datetime, timezone +from urllib.parse import unquote, urlparse from flask import Blueprint, current_app, jsonify from utils.workspace_path import resolve_workspace_path -from utils.path_helpers import normalize_file_path, get_workspace_folder_paths, to_epoch_ms +from utils.path_helpers import ( + normalize_file_path, + get_workspace_folder_paths, + get_workspace_display_name, + to_epoch_ms, +) from utils.text_extract import extract_text_from_bubble, format_tool_action from utils.exclusion_rules import build_searchable_text, is_excluded_by_rules -from urllib.parse import unquote as _url_unquote - bp = Blueprint("workspaces", __name__) @@ -40,11 +44,9 @@ def _get_workspace_display_name(workspace_path: str, workspace_id: str) -> str: wj_path = os.path.join(workspace_path, workspace_id, "workspace.json") try: wd = _read_json_file(wj_path) - first_folder = wd.get("folder") or (wd.get("folders", [{}])[0] or {}).get("path") - if first_folder: - fn = first_folder.replace("\\", "/").split("/")[-1] - if fn: - return _url_unquote(fn) + name = get_workspace_display_name(wd) + if name: + return name except Exception: pass return workspace_id @@ -55,8 +57,157 @@ def _get_workspace_display_name(workspace_path: str, workspace_id: str) -> str: # --------------------------------------------------------------------------- def _read_json_file(path: str): + return _resolve_workspace_descriptor(path) + + +def _uri_or_path_to_fs_path(value: str, base_dir: str | None = None) -> str: + """Convert a file URI or plain path to a filesystem path.""" + raw = (value or "").strip() + if not raw: + return "" + + if raw.startswith("file://"): + parsed = urlparse(raw) + path = unquote(parsed.path or "") + if sys.platform == "win32" and path.startswith("/") and len(path) > 2 and path[2] == ":": + path = path[1:] + return os.path.normpath(path) + + expanded = os.path.expanduser(raw) + if base_dir and not os.path.isabs(expanded): + expanded = os.path.join(base_dir, expanded) + return os.path.normpath(expanded) + + +def _resolve_workspace_descriptor(path: str, depth: int = 0): + """ + Read and normalize a workspace descriptor. + + Handles indirection via {"workspace": ""} and resolves relative + folder paths in multi-root workspace files against the file's directory. + """ with open(path, "r", encoding="utf-8") as f: - return json.load(f) + data = json.load(f) + + # Cursor workspaceStorage entry may point to an external workspace file. + if ( + isinstance(data, dict) + and data.get("workspace") + and not data.get("folder") + and not data.get("folders") + and depth < 3 + ): + target = _uri_or_path_to_fs_path(str(data.get("workspace", "")), base_dir=os.path.dirname(path)) + if target and os.path.isfile(target): + return _resolve_workspace_descriptor(target, depth + 1) + + if not isinstance(data, dict): + return data + + out = dict(data) + base_dir = os.path.dirname(path) + folders = out.get("folders") + if isinstance(folders, list): + normalized = [] + for folder in folders: + if isinstance(folder, dict): + fd = dict(folder) + p = fd.get("path") + if isinstance(p, str) and p: + if not p.startswith("file://") and not os.path.isabs(p): + fd["path"] = os.path.normpath(os.path.join(base_dir, p)) + normalized.append(fd) + else: + normalized.append(folder) + out["folders"] = normalized + return out + + +def _basename_from_pathish(path_value: str | None) -> str | None: + """Extract a readable leaf folder name from file URI or filesystem path.""" + if not path_value: + return None + cleaned = re.sub(r"^file://", "", str(path_value).strip()) + cleaned = unquote(cleaned).replace("\\", "/").rstrip("/") + if not cleaned: + return None + parts = [p for p in cleaned.split("/") if p] + if not parts: + return None + leaf = parts[-1] + return leaf or None + + +def _infer_workspace_name_from_context(workspace_path: str, workspace_id: str) -> str | None: + """ + Infer workspace display name from projectLayouts of chats in this workspace. + + Useful when workspace.json only references a deleted/opaque workspace file. + """ + if workspace_id == "global": + return "Other chats" + + # Composer IDs from per-workspace state db + local_db_path = os.path.join(workspace_path, workspace_id, "state.vscdb") + if not os.path.isfile(local_db_path): + return None + composer_ids: list[str] = [] + try: + lconn = sqlite3.connect(f"file:{local_db_path}?mode=ro", uri=True) + row = lconn.execute( + "SELECT value FROM ItemTable WHERE [key] = 'composer.composerData'" + ).fetchone() + if row and row[0]: + data = json.loads(row[0]) + for c in (data.get("allComposers") or []): + cid = c.get("composerId") if isinstance(c, dict) else None + if cid: + composer_ids.append(cid) + lconn.close() + except Exception: + return None + if not composer_ids: + return None + + # Gather folder-name hints from global messageRequestContext.projectLayouts + gconn, _ = _open_global_db(workspace_path) + if not gconn: + return None + counts: dict[str, int] = {} + try: + for cid in composer_ids: + rows = gconn.execute( + "SELECT value FROM cursorDiskKV WHERE key LIKE ?", + (f"messageRequestContext:{cid}:%",), + ).fetchall() + for row in rows: + try: + ctx = json.loads(row["value"]) + except Exception: + continue + layouts = ctx.get("projectLayouts") + if not isinstance(layouts, list): + continue + for layout in layouts: + obj = None + if isinstance(layout, str): + try: + obj = json.loads(layout) + except Exception: + obj = None + elif isinstance(layout, dict): + obj = layout + if not isinstance(obj, dict): + continue + hint = _basename_from_pathish(obj.get("rootPath")) + if hint: + counts[hint] = counts.get(hint, 0) + 1 + finally: + gconn.close() + + if not counts: + return None + return max(counts.items(), key=lambda kv: kv[1])[0] def _get_project_from_file_path( @@ -117,10 +268,13 @@ def _determine_project_for_conversation( workspace_entries: list, bubble_map: dict, composer_id_to_workspace_id: dict | None = None, + invalid_workspace_ids: set[str] | None = None, ) -> str | None: # Primary: definitive per-workspace mapping if composer_id_to_workspace_id and composer_id in composer_id_to_workspace_id: - return composer_id_to_workspace_id[composer_id] + mapped = composer_id_to_workspace_id[composer_id] + if not invalid_workspace_ids or mapped not in invalid_workspace_ids: + return mapped # Try projectLayouts project_layouts = project_layouts_map.get(composer_id, []) @@ -244,6 +398,70 @@ def _collect_workspace_entries(workspace_path: str) -> list[dict]: return entries +def _collect_invalid_workspace_ids(workspace_entries: list[dict]) -> set[str]: + """Workspace IDs whose descriptors have no resolvable folder paths.""" + invalid: set[str] = set() + for entry in workspace_entries: + try: + wd = _read_json_file(entry["workspaceJsonPath"]) + folders = get_workspace_folder_paths(wd) + if not folders: + invalid.add(entry["name"]) + except Exception: + invalid.add(entry["name"]) + return invalid + + +def _infer_invalid_workspace_aliases( + composer_rows: list, + project_layouts_map: dict, + project_name_map: dict, + workspace_path_map: dict, + workspace_entries: list, + bubble_map: dict, + composer_id_to_ws: dict, + invalid_workspace_ids: set[str], +) -> dict[str, str]: + """ + Infer replacement workspace IDs for invalid workspace entries. + + For each composer mapped to an invalid workspace ID, compute an evidence- + based assignment (without trusting composer_id_to_ws). Use majority voting + to map each invalid workspace ID to the most likely valid workspace ID. + """ + votes: dict[str, dict[str, int]] = {} + for row in composer_rows: + cid = row["key"].split(":")[1] + mapped = composer_id_to_ws.get(cid) + if mapped not in invalid_workspace_ids: + continue + try: + cd = json.loads(row["value"]) + except Exception: + continue + inferred = _determine_project_for_conversation( + cd, + cid, + project_layouts_map, + project_name_map, + workspace_path_map, + workspace_entries, + bubble_map, + composer_id_to_workspace_id=None, + invalid_workspace_ids=None, + ) + if inferred and inferred not in invalid_workspace_ids: + votes.setdefault(mapped, {}) + votes[mapped][inferred] = votes[mapped].get(inferred, 0) + 1 + + aliases: dict[str, str] = {} + for invalid_id, counts in votes.items(): + if not counts: + continue + aliases[invalid_id] = max(counts.items(), key=lambda kv: kv[1])[0] + return aliases + + def _build_composer_id_to_workspace_id(workspace_path: str, workspace_entries: list) -> dict: """Build mapping: composerId -> workspaceId from per-workspace state.vscdb.""" mapping = {} @@ -290,6 +508,7 @@ def list_workspaces(): try: workspace_path = resolve_workspace_path() workspace_entries = _collect_workspace_entries(workspace_path) + invalid_workspace_ids = _collect_invalid_workspace_ids(workspace_entries) project_name_map = _create_project_name_to_workspace_id_map(workspace_entries) workspace_path_map = _create_workspace_path_to_id_map(workspace_entries) @@ -349,6 +568,16 @@ def list_workspaces(): pass # Process each composer + invalid_workspace_aliases = _infer_invalid_workspace_aliases( + composer_rows=composer_rows, + project_layouts_map=project_layouts_map, + project_name_map=project_name_map, + workspace_path_map=workspace_path_map, + workspace_entries=workspace_entries, + bubble_map=bubble_map, + composer_id_to_ws=composer_id_to_ws, + invalid_workspace_ids=invalid_workspace_ids, + ) for row in composer_rows: cid = row["key"].split(":")[1] try: @@ -356,8 +585,11 @@ def list_workspaces(): pid = _determine_project_for_conversation( cd, cid, project_layouts_map, project_name_map, workspace_path_map, - workspace_entries, bubble_map, composer_id_to_ws + workspace_entries, bubble_map, composer_id_to_ws, invalid_workspace_ids ) + mapped_ws = composer_id_to_ws.get(cid) + if not pid and mapped_ws in invalid_workspace_ids: + pid = invalid_workspace_aliases.get(mapped_ws) assigned = pid if pid else "global" headers = cd.get("fullConversationHeadersOnly") or [] @@ -391,7 +623,8 @@ def list_workspaces(): norm_folder = "" try: wd = _read_json_file(entry["workspaceJsonPath"]) - first_folder = wd.get("folder") or (wd.get("folders", [{}])[0] or {}).get("path") + folders = get_workspace_folder_paths(wd) + first_folder = folders[0] if folders else None if first_folder: norm_folder = normalize_file_path(first_folder) except Exception: @@ -426,7 +659,8 @@ def list_workspaces(): workspace_name = _get_workspace_display_name(workspace_path, primary["name"]) if workspace_name == primary["name"]: - workspace_name = f"Project {primary['name'][:8]}" + inferred = _infer_workspace_name_from_context(workspace_path, primary["name"]) + workspace_name = inferred or f"Project {primary['name'][:8]}" # Skip entire workspace before iterating conversations if is_excluded_by_rules(rules, workspace_name): @@ -443,6 +677,10 @@ def list_workspaces(): if not is_excluded_by_rules(rules, searchable): convos.append(c) + # Hide workspace shells that currently have no visible conversations. + if not convos: + continue + projects.append({ "id": primary["name"], "name": workspace_name, @@ -509,17 +747,20 @@ def get_workspace(workspace_id): folder = None workspace_name = workspace_id try: - from urllib.parse import unquote wd = _read_json_file(wj_path) - folder = wd.get("folder") - first_folder = folder or (wd.get("folders", [{}])[0] or {}).get("path") - if first_folder: - parts = first_folder.replace("\\", "/").split("/") - fn = parts[-1] if parts else None - if fn: - workspace_name = unquote(fn) + folder_paths = get_workspace_folder_paths(wd) + folder = folder_paths[0] if folder_paths else wd.get("folder") + derived_name = get_workspace_display_name(wd) + if derived_name: + workspace_name = derived_name + elif workspace_name == workspace_id: + inferred = _infer_workspace_name_from_context(workspace_path, workspace_id) + if inferred: + workspace_name = inferred except Exception: - pass + inferred = _infer_workspace_name_from_context(workspace_path, workspace_id) + if inferred: + workspace_name = inferred return jsonify({ "id": workspace_id, @@ -561,6 +802,7 @@ def get_workspace_tabs(workspace_id): response = {"tabs": []} workspace_entries = _collect_workspace_entries(workspace_path) + invalid_workspace_ids = _collect_invalid_workspace_ids(workspace_entries) project_name_map = _create_project_name_to_workspace_id_map(workspace_entries) workspace_path_map = _create_workspace_path_to_id_map(workspace_entries) composer_id_to_ws = _build_composer_id_to_workspace_id(workspace_path, workspace_entries) @@ -573,7 +815,8 @@ def get_workspace_tabs(workspace_id): wj_path = os.path.join(workspace_path, workspace_id, "workspace.json") try: wd = _read_json_file(wj_path) - first_folder = wd.get("folder") or (wd.get("folders", [{}])[0] or {}).get("path") + folders = get_workspace_folder_paths(wd) + first_folder = folders[0] if folders else None if first_folder: target_folder = normalize_file_path(first_folder) except Exception: @@ -582,7 +825,8 @@ def get_workspace_tabs(workspace_id): for entry in workspace_entries: try: wd2 = _read_json_file(entry["workspaceJsonPath"]) - f2 = wd2.get("folder") or (wd2.get("folders", [{}])[0] or {}).get("path") + folders2 = get_workspace_folder_paths(wd2) + f2 = folders2[0] if folders2 else None if f2 and normalize_file_path(f2) == target_folder: matching_ws_ids.add(entry["name"]) except Exception: @@ -671,6 +915,17 @@ def get_workspace_tabs(workspace_id): " AND value NOT LIKE '%fullConversationHeadersOnly\":[]%'" ).fetchall() + invalid_workspace_aliases = _infer_invalid_workspace_aliases( + composer_rows=composer_rows, + project_layouts_map=project_layouts_map, + project_name_map=project_name_map, + workspace_path_map=workspace_path_map, + workspace_entries=workspace_entries, + bubble_map=bubble_map, + composer_id_to_ws=composer_id_to_ws, + invalid_workspace_ids=invalid_workspace_ids, + ) + for row in composer_rows: composer_id = row["key"].split(":")[1] try: @@ -680,8 +935,11 @@ def get_workspace_tabs(workspace_id): pid = _determine_project_for_conversation( cd, composer_id, project_layouts_map, project_name_map, workspace_path_map, - workspace_entries, bubble_map, composer_id_to_ws + workspace_entries, bubble_map, composer_id_to_ws, invalid_workspace_ids ) + mapped_ws = composer_id_to_ws.get(composer_id) + if not pid and mapped_ws in invalid_workspace_ids: + pid = invalid_workspace_aliases.get(mapped_ws) assigned = pid if pid else "global" if assigned not in matching_ws_ids: diff --git a/scripts/export.py b/scripts/export.py index d612ddc..02fb842 100644 --- a/scripts/export.py +++ b/scripts/export.py @@ -29,6 +29,45 @@ ) +def _json_dump_safe(value) -> str: + """Best-effort JSON serialization for exclusion matching.""" + try: + return json.dumps(value, ensure_ascii=False, sort_keys=True) + except Exception: + return str(value) if value is not None else "" + + +def _load_manifest_entries(manifest_path: str) -> dict: + """Load manifest entries keyed by log_id from a JSONL file.""" + existing = {} + if not os.path.isfile(manifest_path): + return existing + try: + with open(manifest_path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + entry = json.loads(line) + log_id = entry.get("log_id") + if log_id: + existing[log_id] = entry + except Exception: + pass + except Exception: + pass + return existing + + +def _write_manifest_entries(manifest_path: str, entries_by_id: dict): + """Write manifest entries to JSONL.""" + os.makedirs(os.path.dirname(manifest_path), exist_ok=True) + with open(manifest_path, "w", encoding="utf-8") as f: + for entry in entries_by_id.values(): + f.write(json.dumps(entry) + "\n") + + def get_default_workspace_path() -> str: home = str(Path.home()) release = "" @@ -449,10 +488,38 @@ def assign_workspace(cd, cid): model_config = cd.get("modelConfig") or {} model_name = model_config.get("modelName") model_names = [model_name] if model_name and model_name != "default" else None + + # Build broad text for exclusion checks so any visible output term can match. + # Includes user/assistant bubble text plus raw metadata that can surface in exports. + bubble_texts = [] + bubble_meta_parts = [] + for h in headers: + b = bubble_map.get(h.get("bubbleId")) + if not b: + continue + text = extract_text_from_bubble(b) + if text: + bubble_texts.append(text) + bubble_meta_parts.append(_json_dump_safe(b)) + + code_diff_parts = [_json_dump_safe(d) for d in code_block_diff_map.get(composer_id, [])] searchable = build_searchable_text( project_name=ws_display_name, chat_title=title, model_names=model_names, + chat_content_snippet="\n\n".join( + p + for p in ( + bubble_texts + + bubble_meta_parts + + code_diff_parts + + [ + _json_dump_safe(model_config), + _json_dump_safe(cd), + ] + ) + if p + ), ) if is_excluded_by_rules(exclusion_rules, searchable): continue @@ -600,23 +667,9 @@ def assign_workspace(cd, cid): with open(e["out_path"], "w", encoding="utf-8") as f: f.write(e["content"]) - # Manifest + # Manifest in output directory manifest_path = os.path.join(out_dir, "manifest.jsonl") - existing = {} - if os.path.isfile(manifest_path): - try: - with open(manifest_path, "r", encoding="utf-8") as f: - for line in f: - line = line.strip() - if line: - try: - entry = json.loads(line) - if entry.get("log_id"): - existing[entry["log_id"]] = entry - except Exception: - pass - except Exception: - pass + existing = _load_manifest_entries(manifest_path) for e in exported: existing[e["id"]] = { @@ -624,10 +677,21 @@ def assign_workspace(cd, cid): "path": os.path.relpath(e["out_path"], out_dir), "updated_at": datetime.fromtimestamp(e["updatedAt"] / 1000).isoformat() if e["updatedAt"] else datetime.now().isoformat(), } + if existing: - with open(manifest_path, "w", encoding="utf-8") as f: - for entry in existing.values(): - f.write(json.dumps(entry) + "\n") + _write_manifest_entries(manifest_path, existing) + + # Canonical manifest in user state dir so tracking survives changing --out paths + global_manifest_path = os.path.join(state_dir, "manifest.jsonl") + global_existing = _load_manifest_entries(global_manifest_path) + for e in exported: + global_existing[e["id"]] = { + "log_id": e["id"], + "path": e["out_path"], + "updated_at": datetime.fromtimestamp(e["updatedAt"] / 1000).isoformat() if e["updatedAt"] else datetime.now().isoformat(), + } + if global_existing: + _write_manifest_entries(global_manifest_path, global_existing) print(f"Exported {count} chat(s) to {out_dir}") # Save state diff --git a/tests/test_export_exclusion_filtering.py b/tests/test_export_exclusion_filtering.py new file mode 100644 index 0000000..57950bc --- /dev/null +++ b/tests/test_export_exclusion_filtering.py @@ -0,0 +1,170 @@ +""" +Integration tests for CLI export exclusion filtering. + +Run: + python -m unittest tests.test_export_exclusion_filtering -v +""" + +import json +import os +import sqlite3 +import subprocess +import sys +import tempfile +import unittest +from pathlib import Path + + +REPO_ROOT = Path(__file__).resolve().parent.parent +EXPORT_SCRIPT = REPO_ROOT / "scripts" / "export.py" + + +class TestExportExclusionFiltering(unittest.TestCase): + def setUp(self): + self.tmp = tempfile.TemporaryDirectory() + self.base = Path(self.tmp.name) + self.fake_home = self.base / "home" + self.fake_home.mkdir(parents=True, exist_ok=True) + self.workspace_path = self.base / "workspaceStorage" + self.global_storage_path = self.base / "globalStorage" + self.workspace_path.mkdir(parents=True, exist_ok=True) + self.global_storage_path.mkdir(parents=True, exist_ok=True) + self.global_db_path = self.global_storage_path / "state.vscdb" + self._create_global_db() + + def tearDown(self): + self.tmp.cleanup() + + def _create_global_db(self): + conn = sqlite3.connect(self.global_db_path) + conn.execute("CREATE TABLE cursorDiskKV ([key] TEXT PRIMARY KEY, value TEXT)") + conn.commit() + conn.close() + + def _insert_bubble(self, composer_id: str, bubble_id: str, bubble_obj: dict): + conn = sqlite3.connect(self.global_db_path) + conn.execute( + "INSERT INTO cursorDiskKV ([key], value) VALUES (?, ?)", + (f"bubbleId:{composer_id}:{bubble_id}", json.dumps(bubble_obj)), + ) + conn.commit() + conn.close() + + def _insert_composer(self, composer_id: str, title: str, model_name: str, bubble_ids: list[str]): + payload = { + "name": title, + "modelConfig": {"modelName": model_name}, + "fullConversationHeadersOnly": [{"bubbleId": bid, "type": 1} for bid in bubble_ids], + "lastUpdatedAt": 1739300000000, + "createdAt": 1739200000000, + } + conn = sqlite3.connect(self.global_db_path) + conn.execute( + "INSERT INTO cursorDiskKV ([key], value) VALUES (?, ?)", + (f"composerData:{composer_id}", json.dumps(payload)), + ) + conn.commit() + conn.close() + + def _run_export(self, rules_text: str): + rules_file = self.base / "exclusion-rules.txt" + rules_file.write_text(rules_text, encoding="utf-8") + out_dir = self.base / "out" + env = dict(os.environ) + env["WORKSPACE_PATH"] = str(self.workspace_path) + env["HOME"] = str(self.fake_home) + env["USERPROFILE"] = str(self.fake_home) + + proc = subprocess.run( + [ + sys.executable, + str(EXPORT_SCRIPT), + "--since", + "all", + "--no-zip", + "--out", + str(out_dir), + "--exclude-rules", + str(rules_file), + ], + cwd=str(REPO_ROOT), + env=env, + capture_output=True, + text=True, + ) + self.assertEqual(proc.returncode, 0, msg=f"stdout:\n{proc.stdout}\nstderr:\n{proc.stderr}") + return out_dir + + def _collect_exported_markdown(self, out_dir: Path): + return sorted(out_dir.rglob("*.md")) + + def test_filters_by_chat_content_case_insensitive_substring(self): + # "kwd" rule must match and exclude content containing "kwds". + self._insert_bubble("cmp-kwd", "b-kwd-1", {"type": "user", "text": "Please summarize all kwds for Q1."}) + self._insert_bubble("cmp-safe", "b-safe-1", {"type": "user", "text": "Create a project roadmap for Q3."}) + self._insert_composer("cmp-kwd", "Finance thread", "gpt-4.1", ["b-kwd-1"]) + self._insert_composer("cmp-safe", "Roadmap notes", "gpt-4.1-mini", ["b-safe-1"]) + + out_dir = self._run_export("kwd\n") + md_files = self._collect_exported_markdown(out_dir) + + self.assertEqual(len(md_files), 1) + content = md_files[0].read_text(encoding="utf-8").lower() + self.assertIn("roadmap", content) + self.assertNotIn("kwd", content) + self.assertNotIn("kwds", content) + + def test_filters_by_metadata_model_name(self): + # Rule matches model metadata even when message text doesn't include the term. + self._insert_bubble("cmp-meta", "b-meta-1", {"type": "user", "text": "Debug API timeout behavior."}) + self._insert_bubble("cmp-safe", "b-safe-2", {"type": "assistant", "text": "Roadmap items are now listed."}) + self._insert_composer("cmp-meta", "API notes", "claude-3.5-sonnet", ["b-meta-1"]) + self._insert_composer("cmp-safe", "Roadmap", "gpt-4.1-mini", ["b-safe-2"]) + + out_dir = self._run_export("claude-3.5-sonnet\n") + md_files = self._collect_exported_markdown(out_dir) + + self.assertEqual(len(md_files), 1) + content = md_files[0].read_text(encoding="utf-8").lower() + self.assertIn("roadmap", content) + self.assertNotIn("claude-3.5-sonnet", content) + + def test_filters_when_term_appears_after_long_prefix(self): + # Regression: exclusion matching must scan beyond first 50k chars. + very_long_text = ("a" * 60000) + " kwds appear near the tail" + self._insert_bubble("cmp-long", "b-long-1", {"type": "assistant", "text": very_long_text}) + self._insert_bubble("cmp-safe", "b-safe-3", {"type": "assistant", "text": "General roadmap update."}) + self._insert_composer("cmp-long", "Long transcript", "gpt-4.1", ["b-long-1"]) + self._insert_composer("cmp-safe", "Roadmap", "gpt-4.1-mini", ["b-safe-3"]) + + out_dir = self._run_export("kwd\n") + md_files = self._collect_exported_markdown(out_dir) + + self.assertEqual(len(md_files), 1) + content = md_files[0].read_text(encoding="utf-8").lower() + self.assertIn("roadmap", content) + self.assertNotIn("kwd", content) + + def test_writes_manifest_to_global_state_dir(self): + self._insert_bubble("cmp-safe", "b-safe-4", {"type": "assistant", "text": "General roadmap update."}) + self._insert_composer("cmp-safe", "Roadmap", "gpt-4.1-mini", ["b-safe-4"]) + + out_dir = self._run_export("kwd\n") + local_manifest = out_dir / "manifest.jsonl" + global_manifest = self.fake_home / ".cursor-chat-browser" / "manifest.jsonl" + export_state = self.fake_home / ".cursor-chat-browser" / "export_state.json" + + self.assertTrue(local_manifest.is_file()) + self.assertTrue(global_manifest.is_file()) + self.assertTrue(export_state.is_file()) + + global_lines = [l for l in global_manifest.read_text(encoding="utf-8").splitlines() if l.strip()] + self.assertTrue(global_lines) + row = json.loads(global_lines[0]) + self.assertIn("log_id", row) + self.assertIn("path", row) + self.assertTrue(Path(row["path"]).is_absolute()) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_invalid_workspace_aliases.py b/tests/test_invalid_workspace_aliases.py new file mode 100644 index 0000000..eb0b294 --- /dev/null +++ b/tests/test_invalid_workspace_aliases.py @@ -0,0 +1,50 @@ +""" +Tests for invalid-workspace alias inference. +""" + +import json +import unittest + +from api.workspaces import _infer_invalid_workspace_aliases + + +class TestInvalidWorkspaceAliases(unittest.TestCase): + def test_majority_vote_alias_selection(self): + composer_rows = [ + {"key": "composerData:cid-1", "value": json.dumps({"fullConversationHeadersOnly": []})}, + {"key": "composerData:cid-2", "value": json.dumps({"fullConversationHeadersOnly": []})}, + {"key": "composerData:cid-3", "value": json.dumps({"fullConversationHeadersOnly": []})}, + ] + composer_id_to_ws = { + "cid-1": "invalid-ws", + "cid-2": "invalid-ws", + "cid-3": "invalid-ws", + } + + # Drive inference through project_layouts_map -> workspace_path_map + project_layouts_map = { + "cid-1": [r"d:\_Cpp_Digest\boostbacklog"], + "cid-2": [r"d:\_Cpp_Digest\boostbacklog"], + "cid-3": [r"d:\_Cpp_Digest\team-brain"], + } + workspace_path_map = { + r"d:\_cpp_digest\boostbacklog": "boost-ws", + r"d:\_cpp_digest\team-brain": "team-ws", + } + + aliases = _infer_invalid_workspace_aliases( + composer_rows=composer_rows, + project_layouts_map=project_layouts_map, + project_name_map={}, + workspace_path_map=workspace_path_map, + workspace_entries=[], + bubble_map={}, + composer_id_to_ws=composer_id_to_ws, + invalid_workspace_ids={"invalid-ws"}, + ) + + self.assertEqual(aliases.get("invalid-ws"), "boost-ws") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_search_exclusion_filtering.py b/tests/test_search_exclusion_filtering.py new file mode 100644 index 0000000..8aa3773 --- /dev/null +++ b/tests/test_search_exclusion_filtering.py @@ -0,0 +1,235 @@ +""" +Integration tests for exclusion filtering in /api/search output. + +Run: + python -m unittest tests.test_search_exclusion_filtering -v +""" + +import json +import os +import sqlite3 +import tempfile +import unittest +from pathlib import Path + +from flask import Flask + +# Ensure project root is importable when running directly. +_root = Path(__file__).resolve().parent.parent +if str(_root) not in os.sys.path: + os.sys.path.insert(0, str(_root)) + +from api.search import bp as search_bp +from utils.exclusion_rules import load_rules + + +class TestSearchExclusionFiltering(unittest.TestCase): + def setUp(self): + self._tmp = tempfile.TemporaryDirectory() + self.base_dir = self._tmp.name + self.workspace_path = os.path.join(self.base_dir, "workspaceStorage") + self.global_storage_path = os.path.join(self.base_dir, "globalStorage") + os.makedirs(self.workspace_path, exist_ok=True) + os.makedirs(self.global_storage_path, exist_ok=True) + + self.ws_kwd_id = "workspace-kwd" + self.ws_kwd_dir = os.path.join(self.workspace_path, self.ws_kwd_id) + os.makedirs(self.ws_kwd_dir, exist_ok=True) + with open(os.path.join(self.ws_kwd_dir, "workspace.json"), "w", encoding="utf-8") as f: + json.dump({"folder": "file:///d%3A/_hjb_cpp/gigs/options/kwds"}, f) + + self.ws_public_id = "workspace-public" + self.ws_public_dir = os.path.join(self.workspace_path, self.ws_public_id) + os.makedirs(self.ws_public_dir, exist_ok=True) + with open(os.path.join(self.ws_public_dir, "workspace.json"), "w", encoding="utf-8") as f: + json.dump({"folder": "file:///d%3A/_hjb_cpp/gigs/options/public-project"}, f) + + self._build_workspace_dbs() + self._build_global_db() + + self._old_workspace_path = os.environ.get("WORKSPACE_PATH") + os.environ["WORKSPACE_PATH"] = self.workspace_path + + app = Flask(__name__) + app.config["TESTING"] = True + app.config["EXCLUSION_RULES"] = [] + app.register_blueprint(search_bp) + self.client = app.test_client() + self.app = app + + def tearDown(self): + if self._old_workspace_path is None: + os.environ.pop("WORKSPACE_PATH", None) + else: + os.environ["WORKSPACE_PATH"] = self._old_workspace_path + self._tmp.cleanup() + + def _build_workspace_dbs(self): + db_path = os.path.join(self.ws_kwd_dir, "state.vscdb") + conn = sqlite3.connect(db_path) + conn.execute("CREATE TABLE ItemTable ([key] TEXT PRIMARY KEY, value TEXT)") + + # Used by /api/search to map composer IDs to workspace IDs. + conn.execute( + "INSERT INTO ItemTable ([key], value) VALUES (?, ?)", + ( + "composer.composerData", + json.dumps( + { + "allComposers": [ + {"composerId": "cmp-kwd"}, + ] + } + ), + ), + ) + + # Legacy chat storage (fallback path in /api/search). + legacy_chat = { + "tabs": [ + { + "tabId": "tab-kwd", + "chatTitle": "kwd Archive Thread", + "lastSendTime": "2026-02-11T15:00:00Z", + "metadata": {"model": "gpt-4.1"}, + "bubbles": [ + {"type": "user", "text": "Where is kwd 2026-001?"}, + {"type": "assistant", "text": "kwd metadata is attached."}, + ], + } + ] + } + conn.execute( + "INSERT INTO ItemTable ([key], value) VALUES (?, ?)", + ("workbench.panel.aichat.view.aichat.chatdata", json.dumps(legacy_chat)), + ) + + conn.commit() + conn.close() + + db_path_public = os.path.join(self.ws_public_dir, "state.vscdb") + conn_public = sqlite3.connect(db_path_public) + conn_public.execute("CREATE TABLE ItemTable ([key] TEXT PRIMARY KEY, value TEXT)") + conn_public.execute( + "INSERT INTO ItemTable ([key], value) VALUES (?, ?)", + ( + "composer.composerData", + json.dumps({"allComposers": [{"composerId": "cmp-roadmap"}]}), + ), + ) + conn_public.commit() + conn_public.close() + + def _build_global_db(self): + db_path = os.path.join(self.global_storage_path, "state.vscdb") + conn = sqlite3.connect(db_path) + conn.execute("CREATE TABLE cursorDiskKV ([key] TEXT PRIMARY KEY, value TEXT)") + + conn.execute( + "INSERT INTO cursorDiskKV ([key], value) VALUES (?, ?)", + ( + "bubbleId:cmp-kwd:b-kwd-1", + json.dumps({"type": "user", "text": "Please extract kwd PDF metadata."}), + ), + ) + conn.execute( + "INSERT INTO cursorDiskKV ([key], value) VALUES (?, ?)", + ( + "bubbleId:cmp-kwd:b-kwd-2", + json.dumps({"type": "assistant", "text": "kwd details parsed successfully."}), + ), + ) + conn.execute( + "INSERT INTO cursorDiskKV ([key], value) VALUES (?, ?)", + ( + "bubbleId:cmp-roadmap:b-roadmap-1", + json.dumps({"type": "user", "text": "Create a roadmap for Q3 delivery."}), + ), + ) + + conn.execute( + "INSERT INTO cursorDiskKV ([key], value) VALUES (?, ?)", + ( + "composerData:cmp-kwd", + json.dumps( + { + "name": "kwd PDF metadata extraction", + "modelConfig": {"modelName": "gpt-4.1"}, + "fullConversationHeadersOnly": [ + {"bubbleId": "b-kwd-1"}, + {"bubbleId": "b-kwd-2"}, + ], + "lastUpdatedAt": 1739270000000, + } + ), + ), + ) + conn.execute( + "INSERT INTO cursorDiskKV ([key], value) VALUES (?, ?)", + ( + "composerData:cmp-roadmap", + json.dumps( + { + "name": "Roadmap planning notes", + "modelConfig": {"modelName": "claude-3.5-sonnet"}, + "fullConversationHeadersOnly": [ + {"bubbleId": "b-roadmap-1"}, + ], + "lastUpdatedAt": 1739271000000, + } + ), + ), + ) + + conn.commit() + conn.close() + + def _set_rules(self, rules_text: str): + with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", delete=False, encoding="utf-8") as f: + f.write(rules_text) + path = f.name + try: + self.app.config["EXCLUSION_RULES"] = load_rules(path) + finally: + os.unlink(path) + + def _search(self, query: str, search_type: str = "all"): + resp = self.client.get(f"/api/search?q={query}&type={search_type}") + self.assertEqual(resp.status_code, 200) + payload = resp.get_json() + self.assertIsInstance(payload, dict) + self.assertIn("results", payload) + return payload["results"] + + def test_exact_exclusion_keywords_hide_matches_case_insensitive(self): + self._set_rules("kwd\n") + + results_lower = self._search("kwd", "all") + results_upper = self._search("kwd", "all") + + self.assertEqual(results_lower, []) + self.assertEqual(results_upper, []) + + def test_non_excluded_query_still_returns_visible_results(self): + self._set_rules("kwd\n") + + results = self._search("roadmap", "all") + + self.assertTrue(results) + self.assertTrue(any((r.get("chatTitle") or "").lower().find("roadmap") != -1 for r in results)) + self.assertTrue(all((r.get("chatTitle") or "").lower().find("kwd") == -1 for r in results)) + + def test_filtering_uses_workspace_title_and_metadata(self): + # Workspace folder resolves to ".../kwds" which must exclude kwd-workspace chat output. + self._set_rules("kwds\n") + results_by_workspace = self._search("archive", "all") + self.assertEqual(results_by_workspace, []) + + # Metadata match (model name) must also exclude the matching composer entry. + self._set_rules("gpt-4.1\n") + results_by_metadata = self._search("extraction", "all") + self.assertEqual(results_by_metadata, []) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_workspace_assignment_fallback.py b/tests/test_workspace_assignment_fallback.py new file mode 100644 index 0000000..e287171 --- /dev/null +++ b/tests/test_workspace_assignment_fallback.py @@ -0,0 +1,42 @@ +""" +Tests for conversation-to-workspace assignment fallback behavior. +""" + +import unittest + +from api.workspaces import _determine_project_for_conversation + + +class TestWorkspaceAssignmentFallback(unittest.TestCase): + def test_ignores_invalid_composer_to_workspace_mapping(self): + composer_data = { + "fullConversationHeadersOnly": [], + "newlyCreatedFiles": [], + "codeBlockData": {}, + } + composer_id = "cmp-123" + project_layouts_map = {"cmp-123": ["/d%3A/_Cpp_Digest/boostbacklog"]} + project_name_to_workspace_id = {"boostbacklog": "good-ws"} + workspace_path_to_id = {"d:\\_cpp_digest\\boostbacklog": "good-ws"} + workspace_entries = [] + bubble_map = {} + composer_id_to_workspace_id = {"cmp-123": "broken-ws"} + invalid_workspace_ids = {"broken-ws"} + + assigned = _determine_project_for_conversation( + composer_data=composer_data, + composer_id=composer_id, + project_layouts_map=project_layouts_map, + project_name_to_workspace_id=project_name_to_workspace_id, + workspace_path_to_id=workspace_path_to_id, + workspace_entries=workspace_entries, + bubble_map=bubble_map, + composer_id_to_workspace_id=composer_id_to_workspace_id, + invalid_workspace_ids=invalid_workspace_ids, + ) + + self.assertEqual(assigned, "good-ws") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_workspace_display_name.py b/tests/test_workspace_display_name.py new file mode 100644 index 0000000..894e537 --- /dev/null +++ b/tests/test_workspace_display_name.py @@ -0,0 +1,38 @@ +""" +Tests for workspace folder parsing and display-name extraction. +""" + +import unittest + +from utils.path_helpers import get_workspace_display_name, get_workspace_folder_paths + + +class TestWorkspaceFolderParsing(unittest.TestCase): + def test_get_workspace_folder_paths_handles_multi_root_uri_shape(self): + wd = { + "folders": [ + {"uri": {"scheme": "file", "path": "/d%3A/_Cpp_Digest/cppdigest-github-app"}}, + {"uri": {"scheme": "file", "path": "/d%3A/_Cpp_Digest/boostbacklog"}}, + ] + } + paths = get_workspace_folder_paths(wd) + self.assertEqual(len(paths), 2) + self.assertIn("/d%3A/_Cpp_Digest/cppdigest-github-app", paths) + self.assertIn("/d%3A/_Cpp_Digest/boostbacklog", paths) + + def test_get_workspace_display_name_prefers_first_valid_folder(self): + wd = { + "folders": [ + {"uri": {"scheme": "file", "path": "/d%3A/_Cpp_Digest/cppdigest-github-app"}}, + {"uri": {"scheme": "file", "path": "/d%3A/_Cpp_Digest/boostbacklog"}}, + ] + } + self.assertEqual(get_workspace_display_name(wd, fallback="workspace-id"), "cppdigest-github-app") + + def test_get_workspace_display_name_fallback_when_no_paths(self): + wd = {"folders": [{"uri": {"scheme": "file"}}]} + self.assertEqual(get_workspace_display_name(wd, fallback="workspace-id"), "workspace-id") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_workspace_name_inference.py b/tests/test_workspace_name_inference.py new file mode 100644 index 0000000..eba1a48 --- /dev/null +++ b/tests/test_workspace_name_inference.py @@ -0,0 +1,86 @@ +""" +Tests for fallback workspace-name inference from messageRequestContext. +""" + +import json +import os +import sqlite3 +import tempfile +import unittest + +from api.workspaces import _infer_workspace_name_from_context + + +class TestWorkspaceNameInference(unittest.TestCase): + def test_infers_name_from_project_layouts(self): + with tempfile.TemporaryDirectory() as tmp: + workspace_path = os.path.join(tmp, "workspaceStorage") + global_storage = os.path.join(tmp, "globalStorage") + ws_id = "deadbeef1234" + ws_dir = os.path.join(workspace_path, ws_id) + os.makedirs(ws_dir, exist_ok=True) + os.makedirs(global_storage, exist_ok=True) + + # Local workspace DB with composer IDs + local_db = os.path.join(ws_dir, "state.vscdb") + conn = sqlite3.connect(local_db) + conn.execute("CREATE TABLE ItemTable ([key] TEXT PRIMARY KEY, value TEXT)") + conn.execute( + "INSERT INTO ItemTable ([key], value) VALUES (?, ?)", + ( + "composer.composerData", + json.dumps( + { + "allComposers": [ + {"composerId": "cmp-1"}, + {"composerId": "cmp-2"}, + ] + } + ), + ), + ) + conn.commit() + conn.close() + + # Global DB with projectLayouts for those composers + global_db = os.path.join(global_storage, "state.vscdb") + gconn = sqlite3.connect(global_db) + gconn.execute("CREATE TABLE cursorDiskKV ([key] TEXT PRIMARY KEY, value TEXT)") + gconn.execute( + "INSERT INTO cursorDiskKV ([key], value) VALUES (?, ?)", + ( + "messageRequestContext:cmp-1:ctx-a", + json.dumps( + { + "projectLayouts": [ + json.dumps({"rootPath": "file:///d%3A/_Cpp_Digest/boostbacklog"}), + ] + } + ), + ), + ) + gconn.execute( + "INSERT INTO cursorDiskKV ([key], value) VALUES (?, ?)", + ( + "messageRequestContext:cmp-2:ctx-b", + json.dumps( + { + "projectLayouts": [ + json.dumps({"rootPath": "file:///d%3A/_Cpp_Digest/boostbacklog"}), + json.dumps({"rootPath": "file:///d%3A/_Cpp_Digest/cppdigest-github-app"}), + ] + } + ), + ), + ) + gconn.commit() + gconn.close() + + self.assertEqual( + _infer_workspace_name_from_context(workspace_path, ws_id), + "boostbacklog", + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/utils/exclusion_rules.py b/utils/exclusion_rules.py index 276cb3f..9d79a22 100644 --- a/utils/exclusion_rules.py +++ b/utils/exclusion_rules.py @@ -219,9 +219,11 @@ def build_searchable_text( """ Combine chat/project metadata into a single string for rule matching. - All non-empty, non-None parts are joined with newlines. A - *chat_content_snippet* longer than 50 000 characters is truncated since - keyword/phrase presence can be detected from the first portion alone. + All non-empty, non-None parts are joined with newlines. + + The full *chat_content_snippet* is preserved so exclusion matching can + catch terms anywhere in rendered output, including long transcripts and + tool outputs. """ parts = [] if project_name: @@ -231,6 +233,5 @@ def build_searchable_text( if model_names: parts.extend(model_names) if chat_content_snippet: - snippet = chat_content_snippet - parts.append(snippet[:50_000] if len(snippet) > 50_000 else snippet) + parts.append(chat_content_snippet) return "\n".join(p for p in parts if p) diff --git a/utils/path_helpers.py b/utils/path_helpers.py index 2cd8dd4..c0d40f1 100644 --- a/utils/path_helpers.py +++ b/utils/path_helpers.py @@ -3,6 +3,7 @@ import os import sys from datetime import datetime +from urllib.parse import unquote def expand_tilde_path(input_path: str) -> str: @@ -83,13 +84,56 @@ def to_epoch_ms(value) -> int: def get_workspace_folder_paths(workspace_data: dict) -> list: - """Extract folder paths from workspace.json data.""" + """Extract folder paths from workspace.json data. + + Supports legacy and newer multi-root entry shapes: + - {"folder": ""} + - {"folder": {"path": ""}} (defensive) + - {"folders": [{"path": ""}]} + - {"folders": [{"uri": {"path": ""}}]} + - {"folders": [""]} (defensive) + """ + + def _extract_path(entry) -> str | None: + if isinstance(entry, str): + return entry + if not isinstance(entry, dict): + return None + if isinstance(entry.get("path"), str): + return entry["path"] + uri = entry.get("uri") + if isinstance(uri, str): + return uri + if isinstance(uri, dict): + if isinstance(uri.get("path"), str): + return uri["path"] + if isinstance(uri.get("fsPath"), str): + return uri["fsPath"] + return None + paths = [] - if workspace_data.get("folder"): - paths.append(workspace_data["folder"]) + folder = workspace_data.get("folder") + folder_path = _extract_path(folder) + if folder_path: + paths.append(folder_path) + folders = workspace_data.get("folders") if isinstance(folders, list): for f in folders: - if isinstance(f, dict) and f.get("path"): - paths.append(f["path"]) + p = _extract_path(f) + if p: + paths.append(p) return paths + + +def get_workspace_display_name(workspace_data: dict, fallback: str | None = None) -> str: + """Return a user-friendly workspace name from workspace.json data.""" + for folder in get_workspace_folder_paths(workspace_data): + raw = str(folder).strip() + cleaned = raw.replace("\\", "/").rstrip("/") + leaf = cleaned.split("/")[-1] if cleaned else "" + if leaf: + decoded = unquote(leaf) + if decoded: + return decoded + return fallback or "" From d2686d1a5c13b9a90ef5b5feff697dad1c0b3772 Mon Sep 17 00:00:00 2001 From: iTinkerBell Date: Thu, 19 Feb 2026 15:55:09 -0500 Subject: [PATCH 5/8] Address CodeRabbit follow-up findings for exclusion filtering and tests. Include bubble message text in API export exclusion matching, strengthen case-insensitive search test coverage, remove redundant import, improve cross-platform path normalization in workspace tests, and narrow exclusion-rules file read exception handling. Co-authored-by: Cursor --- api/export_api.py | 9 +++++++++ tests/test_invalid_workspace_aliases.py | 11 ++++++----- tests/test_search_exclusion_filtering.py | 2 +- tests/test_workspace_assignment_fallback.py | 5 +++-- utils/exclusion_rules.py | 9 +++++++-- utils/path_helpers.py | 1 - 6 files changed, 26 insertions(+), 11 deletions(-) diff --git a/api/export_api.py b/api/export_api.py index 0757e45..1f7815e 100644 --- a/api/export_api.py +++ b/api/export_api.py @@ -187,10 +187,19 @@ def export_chats(): model_config = cd.get("modelConfig") or {} model_name = model_config.get("modelName") model_names = [model_name] if model_name and model_name != "default" else None + bubble_texts = [] + for h in headers: + b = bubble_map.get(h.get("bubbleId")) + if not b: + continue + bt = extract_text_from_bubble(b) + if bt: + bubble_texts.append(bt) searchable = build_searchable_text( project_name=ws_display_name, chat_title=title, model_names=model_names, + chat_content_snippet="\n\n".join(bubble_texts) if bubble_texts else None, ) if is_excluded_by_rules(rules, searchable): continue diff --git a/tests/test_invalid_workspace_aliases.py b/tests/test_invalid_workspace_aliases.py index eb0b294..1236f7f 100644 --- a/tests/test_invalid_workspace_aliases.py +++ b/tests/test_invalid_workspace_aliases.py @@ -6,6 +6,7 @@ import unittest from api.workspaces import _infer_invalid_workspace_aliases +from utils.path_helpers import normalize_file_path class TestInvalidWorkspaceAliases(unittest.TestCase): @@ -23,13 +24,13 @@ def test_majority_vote_alias_selection(self): # Drive inference through project_layouts_map -> workspace_path_map project_layouts_map = { - "cid-1": [r"d:\_Cpp_Digest\boostbacklog"], - "cid-2": [r"d:\_Cpp_Digest\boostbacklog"], - "cid-3": [r"d:\_Cpp_Digest\team-brain"], + "cid-1": [normalize_file_path(r"d:\_Cpp_Digest\boostbacklog")], + "cid-2": [normalize_file_path(r"d:\_Cpp_Digest\boostbacklog")], + "cid-3": [normalize_file_path(r"d:\_Cpp_Digest\team-brain")], } workspace_path_map = { - r"d:\_cpp_digest\boostbacklog": "boost-ws", - r"d:\_cpp_digest\team-brain": "team-ws", + normalize_file_path(r"d:\_cpp_digest\boostbacklog"): "boost-ws", + normalize_file_path(r"d:\_cpp_digest\team-brain"): "team-ws", } aliases = _infer_invalid_workspace_aliases( diff --git a/tests/test_search_exclusion_filtering.py b/tests/test_search_exclusion_filtering.py index 8aa3773..2b72bdd 100644 --- a/tests/test_search_exclusion_filtering.py +++ b/tests/test_search_exclusion_filtering.py @@ -205,7 +205,7 @@ def test_exact_exclusion_keywords_hide_matches_case_insensitive(self): self._set_rules("kwd\n") results_lower = self._search("kwd", "all") - results_upper = self._search("kwd", "all") + results_upper = self._search("KWD", "all") self.assertEqual(results_lower, []) self.assertEqual(results_upper, []) diff --git a/tests/test_workspace_assignment_fallback.py b/tests/test_workspace_assignment_fallback.py index e287171..119bf80 100644 --- a/tests/test_workspace_assignment_fallback.py +++ b/tests/test_workspace_assignment_fallback.py @@ -5,6 +5,7 @@ import unittest from api.workspaces import _determine_project_for_conversation +from utils.path_helpers import normalize_file_path class TestWorkspaceAssignmentFallback(unittest.TestCase): @@ -15,9 +16,9 @@ def test_ignores_invalid_composer_to_workspace_mapping(self): "codeBlockData": {}, } composer_id = "cmp-123" - project_layouts_map = {"cmp-123": ["/d%3A/_Cpp_Digest/boostbacklog"]} + project_layouts_map = {"cmp-123": [normalize_file_path("/d%3A/_Cpp_Digest/boostbacklog")]} project_name_to_workspace_id = {"boostbacklog": "good-ws"} - workspace_path_to_id = {"d:\\_cpp_digest\\boostbacklog": "good-ws"} + workspace_path_to_id = {normalize_file_path("d:\\_cpp_digest\\boostbacklog"): "good-ws"} workspace_entries = [] bubble_map = {} composer_id_to_workspace_id = {"cmp-123": "broken-ws"} diff --git a/utils/exclusion_rules.py b/utils/exclusion_rules.py index 9d79a22..2fe8531 100644 --- a/utils/exclusion_rules.py +++ b/utils/exclusion_rules.py @@ -187,8 +187,13 @@ def load_rules(path: str | None) -> list[list]: tokens = _tokenize_rule(line) if tokens: rules.append(tokens) - except Exception: - _logger.warning("Failed to read exclusion rules from %s", path, exc_info=True) + except (OSError, UnicodeDecodeError) as e: + _logger.warning( + "Failed to read exclusion rules from %s (%s)", + path, + e.__class__.__name__, + exc_info=True, + ) return [] return rules diff --git a/utils/path_helpers.py b/utils/path_helpers.py index c0d40f1..350bc74 100644 --- a/utils/path_helpers.py +++ b/utils/path_helpers.py @@ -28,7 +28,6 @@ def expand_tilde_path(input_path: str) -> str: def normalize_file_path(file_path: str) -> str: """Normalize a file path: strip file:// protocol, URL-decode, fix slashes.""" import re - from urllib.parse import unquote normalized = file_path # Remove file:// protocol From 5622e323b0851a54aa3577a510398214294f1fbb Mon Sep 17 00:00:00 2001 From: iTinkerBell Date: Thu, 19 Feb 2026 16:05:44 -0500 Subject: [PATCH 6/8] Address latest CodeRabbit follow-up nits for workspace folder handling. Fix safe first-folder extraction in export path, reuse shared workspace-folder parsing helper in CLI export to avoid logic drift, and switch test path setup to idiomatic sys.path usage. Co-authored-by: Cursor --- api/export_api.py | 3 ++- scripts/export.py | 14 ++++---------- tests/test_search_exclusion_filtering.py | 5 +++-- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/api/export_api.py b/api/export_api.py index 1f7815e..4d87fe0 100644 --- a/api/export_api.py +++ b/api/export_api.py @@ -114,7 +114,8 @@ def export_chats(): try: with open(wj, "r", encoding="utf-8") as f: wd = json.load(f) - first_folder = wd.get("folder") or (wd.get("folders", [{}])[0] or {}).get("path") + folders = get_workspace_folder_paths(wd) + first_folder = wd.get("folder") or (folders[0] if folders else None) if first_folder: fn = first_folder.replace("\\", "/").split("/")[-1] if fn: diff --git a/scripts/export.py b/scripts/export.py index 02fb842..3075fbf 100644 --- a/scripts/export.py +++ b/scripts/export.py @@ -27,6 +27,7 @@ build_searchable_text, is_excluded_by_rules, ) +from utils.path_helpers import get_workspace_folder_paths as _shared_get_workspace_folder_paths def _json_dump_safe(value) -> str: @@ -204,15 +205,7 @@ def extract_text_from_bubble(bubble) -> str: def get_workspace_folder_paths(wd) -> list: - paths = [] - if wd.get("folder"): - paths.append(wd["folder"]) - folders = wd.get("folders") - if isinstance(folders, list): - for f in folders: - if isinstance(f, dict) and f.get("path"): - paths.append(f["path"]) - return paths + return _shared_get_workspace_folder_paths(wd) HELP_TEXT = """\ @@ -310,7 +303,8 @@ def main(): try: with open(e["workspaceJsonPath"], "r", encoding="utf-8") as f: wd = json.load(f) - first_folder = wd.get("folder") or (wd.get("folders", [{}])[0] or {}).get("path") + folders = get_workspace_folder_paths(wd) + first_folder = wd.get("folder") or (folders[0] if folders else None) if first_folder: fn = re.sub(r"^file://", "", first_folder).replace("\\", "/").split("/")[-1] if fn: diff --git a/tests/test_search_exclusion_filtering.py b/tests/test_search_exclusion_filtering.py index 2b72bdd..96b9540 100644 --- a/tests/test_search_exclusion_filtering.py +++ b/tests/test_search_exclusion_filtering.py @@ -8,6 +8,7 @@ import json import os import sqlite3 +import sys import tempfile import unittest from pathlib import Path @@ -16,8 +17,8 @@ # Ensure project root is importable when running directly. _root = Path(__file__).resolve().parent.parent -if str(_root) not in os.sys.path: - os.sys.path.insert(0, str(_root)) +if str(_root) not in sys.path: + sys.path.insert(0, str(_root)) from api.search import bp as search_bp from utils.exclusion_rules import load_rules From b3fdfc28b48faa81b8d478d400b143e387d378a6 Mon Sep 17 00:00:00 2001 From: iTinkerBell Date: Thu, 19 Feb 2026 16:13:15 -0500 Subject: [PATCH 7/8] Handle non-string workspace folder values safely in export paths. Use parsed folder-path lists for workspace naming in API and CLI export with explicit string guards, and add debug logging for resilient manifest parsing while documenting intentional broad CLI searchable-text behavior. Co-authored-by: Cursor --- api/export_api.py | 4 ++-- scripts/export.py | 18 +++++++++++------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/api/export_api.py b/api/export_api.py index 4d87fe0..b2b3637 100644 --- a/api/export_api.py +++ b/api/export_api.py @@ -115,8 +115,8 @@ def export_chats(): with open(wj, "r", encoding="utf-8") as f: wd = json.load(f) folders = get_workspace_folder_paths(wd) - first_folder = wd.get("folder") or (folders[0] if folders else None) - if first_folder: + first_folder = folders[0] if folders else None + if isinstance(first_folder, str) and first_folder: fn = first_folder.replace("\\", "/").split("/")[-1] if fn: ws_id_to_slug[name] = _slug(fn) diff --git a/scripts/export.py b/scripts/export.py index 3075fbf..7cd84b5 100644 --- a/scripts/export.py +++ b/scripts/export.py @@ -7,6 +7,7 @@ """ import json +import logging import os import re import sqlite3 @@ -29,6 +30,8 @@ ) from utils.path_helpers import get_workspace_folder_paths as _shared_get_workspace_folder_paths +_logger = logging.getLogger(__name__) + def _json_dump_safe(value) -> str: """Best-effort JSON serialization for exclusion matching.""" @@ -54,10 +57,10 @@ def _load_manifest_entries(manifest_path: str) -> dict: log_id = entry.get("log_id") if log_id: existing[log_id] = entry - except Exception: - pass - except Exception: - pass + except Exception as e: + _logger.debug("Skipping malformed manifest line in %s: %s", manifest_path, e) + except Exception as e: + _logger.debug("Failed to read manifest %s: %s", manifest_path, e) return existing @@ -304,8 +307,8 @@ def main(): with open(e["workspaceJsonPath"], "r", encoding="utf-8") as f: wd = json.load(f) folders = get_workspace_folder_paths(wd) - first_folder = wd.get("folder") or (folders[0] if folders else None) - if first_folder: + first_folder = folders[0] if folders else None + if isinstance(first_folder, str) and first_folder: fn = re.sub(r"^file://", "", first_folder).replace("\\", "/").split("/")[-1] if fn: workspace_id_to_slug[e["name"]] = slug(fn) @@ -484,7 +487,8 @@ def assign_workspace(cd, cid): model_names = [model_name] if model_name and model_name != "default" else None # Build broad text for exclusion checks so any visible output term can match. - # Includes user/assistant bubble text plus raw metadata that can surface in exports. + # CLI export intentionally includes metadata/tool payload text in addition to + # bubble text because these fields are emitted into exported markdown. bubble_texts = [] bubble_meta_parts = [] for h in headers: From 1e1372e6ee9717c44ed56e2e7babf67fd41f6f8a Mon Sep 17 00:00:00 2001 From: iTinkerBell Date: Thu, 19 Feb 2026 16:27:48 -0500 Subject: [PATCH 8/8] Trigger CI rerun. Add an empty commit to retrigger GitHub hooks/checks. Co-authored-by: Cursor