Skip to content

util

Checkpointer

Source code in src/ursa/util/__init__.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class Checkpointer:
    """Factory helpers that build ``SqliteSaver`` checkpointers backed by SQLite."""

    @classmethod
    def from_workspace(
        cls,
        workspace: Path,
        db_dir: str = "db",
        db_name: str = "checkpointer.db",
    ) -> SqliteSaver:
        """Make a checkpointer whose SQLite db lives under ``workspace/db_dir``.

        Args
        ====
        * workspace: Root directory of the agent workspace.
        * db_dir: Sub-directory (created if missing) holding the db file.
        * db_name: File name of the SQLite database.
        """
        (db_path := workspace / db_dir).mkdir(parents=True, exist_ok=True)
        # check_same_thread=False allows use from multiple threads; the saver
        # is responsible for serializing access to the connection.
        conn = sqlite3.connect(str(db_path / db_name), check_same_thread=False)
        return SqliteSaver(conn)

    @classmethod
    def from_path(
        cls, db_path: Path, db_name: str = "checkpointer.db"
    ) -> SqliteSaver:
        """Make checkpointer sqlite db.

        Args
        ====
        * db_path: Directory that will contain the SQLite database file.
        * db_name: File name of the SQLite database created inside ``db_path``.
        """
        # BUG FIX: the database file is created at ``db_path / db_name``, so the
        # directory that must exist is ``db_path`` itself, not its parent.
        # Previously only ``db_path.parent`` was created, so sqlite3.connect
        # failed whenever ``db_path`` did not already exist.
        db_path.mkdir(parents=True, exist_ok=True)
        conn = sqlite3.connect(str(db_path / db_name), check_same_thread=False)
        return SqliteSaver(conn)

from_path(db_path, db_name='checkpointer.db') classmethod

Make checkpointer sqlite db.

Args

  • db_path: The path to the SQLite database file (e.g. ./checkpoint.db) to be created.
Source code in src/ursa/util/__init__.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@classmethod
def from_path(
    cls, db_path: Path, db_name: str = "checkpointer.db"
) -> SqliteSaver:
    """Make checkpointer sqlite db.

    Args
    ====
    * db_path: Directory that will contain the SQLite database file.
    * db_name: File name of the SQLite database created inside ``db_path``.
    """
    # The db file is created at ``db_path / db_name``; ensure that directory
    # (not merely its parent) exists before connecting, otherwise
    # sqlite3.connect fails when ``db_path`` is missing.
    db_path.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(db_path / db_name), check_same_thread=False)
    return SqliteSaver(conn)

diff_renderer

DiffRenderer

Renderable diff—console.print(DiffRenderer(...))

Source code in src/ursa/util/diff_renderer.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class DiffRenderer:
    """Renderable diff—`console.print(DiffRenderer(...))`"""

    def __init__(self, content: str, updated: str, filename: str):
        """Prepare a unified diff of ``content`` -> ``updated``.

        Args:
            content: Original text.
            updated: Modified text.
            filename: Name used for the diff labels and lexer guessing.
        """
        # total lines in each version
        self._old_total = len(content.splitlines())
        self._new_total = len(updated.splitlines())

        # number of digits in the largest count (+2 for gutter padding)
        self._num_width = len(str(max(self._old_total, self._new_total))) + 2

        # get the diff
        # BUG FIX: the labels previously hard-coded "(unknown)" in f-strings
        # with no placeholders; use the provided filename instead.
        self._diff_lines = list(
            difflib.unified_diff(
                content.splitlines(),
                updated.splitlines(),
                fromfile=f"{filename} (original)",
                tofile=f"{filename} (modified)",
                lineterm="",
            )
        )

        # get syntax style; fall back to plain text if pygments can't guess
        try:
            self._lexer_name = Syntax.guess_lexer(filename, updated)
        except Exception:
            self._lexer_name = "text"

    def __rich_console__(
        self, console: Console, opts: ConsoleOptions
    ) -> RenderResult:
        """Yield one styled Text line per diff line (Rich renderable protocol)."""
        old_line = new_line = None
        width = console.width

        for raw in self._diff_lines:
            # grab line numbers from hunk header (e.g. "@@ -12,4 +12,6 @@")
            if m := _HUNK_RE.match(raw):
                old_line, new_line = map(int, m.groups())
                # build a marker row of dots aligned with the number gutter
                n = self._num_width
                tick_col = "." * (n - 1)
                indent_ticks = f" {tick_col} {tick_col}"
                # pad to the indent width
                full_indent = indent_ticks.ljust(2 * n + 3)
                yield Text(
                    f"{full_indent}{raw}".ljust(width), style="white on grey30"
                )
                continue

            # skip header lines
            if raw.startswith(("---", "+++")):
                continue

            # split the line into style + code, dropping the one-char diff prefix
            if raw.startswith("+"):
                style = _STYLE["add"]
                code = raw[1:]
            elif raw.startswith("-"):
                style = _STYLE["del"]
                code = raw[1:]
            else:
                style = _STYLE["ctx"]
                # BUG FIX: context lines carry a single leading space from
                # unified_diff; strip only that prefix (lstrip() also removed
                # the code's own indentation, misaligning context lines).
                code = raw[1:]

            # compute line numbers for each side of the gutter
            if raw.startswith("+"):
                old_num, new_num = None, new_line
                new_line += 1
            elif raw.startswith("-"):
                old_num, new_num = old_line, None
                old_line += 1
            else:
                old_num, new_num = old_line, new_line
                old_line += 1
                new_line += 1

            old_str = str(old_num) if old_num is not None else " "
            new_str = str(new_num) if new_num is not None else " "

            # Syntax-highlight the code part
            syntax = Syntax(
                code, self._lexer_name, line_numbers=False, word_wrap=False
            )
            text_code: Text = syntax.highlight(code)
            if text_code.plain.endswith("\n"):
                text_code = text_code[:-1]
            # apply background
            text_code.stylize(style.bg)

            # line numbers + code
            nums = Text(
                f"{old_str:>{self._num_width}}{new_str:>{self._num_width}} ",
                style=f"white {style.bg}",
            )
            diff_mark = Text(style.prefix, style=f"bright_white {style.bg}")
            line_text = nums + diff_mark + text_code

            # pad to console width so the background color spans the row
            pad_len = width - line_text.cell_len
            if pad_len > 0:
                line_text.append(" " * pad_len, style=style.bg)

            yield line_text

github_research

Fetch recent issues & PRs from GitHub repos for planning context.

Uses the gh CLI (https://cli.github.com/) which handles authentication transparently. Falls back gracefully when gh is not installed or when a repo URL does not point at GitHub.

fetch_repo_context(owner, repo, *, max_issues=10, max_prs=10, issue_state='all', pr_state='all')

Fetch recent issues and PRs for a single GitHub repo.

Returns a formatted text block suitable for inclusion in a planner prompt.

Source code in src/ursa/util/github_research.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def fetch_repo_context(
    owner: str,
    repo: str,
    *,
    max_issues: int = 10,
    max_prs: int = 10,
    issue_state: str = "all",
    pr_state: str = "all",
) -> str:
    """Fetch recent issues and PRs for a single GitHub repo.

    Returns a formatted text block suitable for inclusion in a planner prompt.
    """
    parts: list[str] = [f"## {owner}/{repo}"]

    # --- recent issues --------------------------------------------------
    try:
        raw_issues = _gh_api(
            f"/repos/{owner}/{repo}/issues?state={issue_state}"
            f"&per_page={max_issues}&sort=updated&direction=desc"
        )
        # The issues endpoint also returns PRs; keep only true issues.
        issues_only = [
            item for item in raw_issues if "pull_request" not in item
        ][:max_issues]
        if issues_only:
            parts.append(f"### Recent issues ({len(issues_only)})")
            for item in issues_only:
                parts.append(_format_issue(item))
        else:
            parts.append("### Recent issues: none")
    except Exception as exc:  # noqa: BLE001
        parts.append(f"### Issues: could not fetch ({exc})")

    # --- recent pull requests -------------------------------------------
    try:
        pulls = _gh_api(
            f"/repos/{owner}/{repo}/pulls?state={pr_state}"
            f"&per_page={max_prs}&sort=updated&direction=desc"
        )
        recent_pulls = pulls[:max_prs]
        if recent_pulls:
            parts.append(f"### Recent pull requests ({len(recent_pulls)})")
            for pull in recent_pulls:
                parts.append(_format_pr(pull))
        else:
            parts.append("### Recent pull requests: none")
    except Exception as exc:  # noqa: BLE001
        parts.append(f"### PRs: could not fetch ({exc})")

    return "\n".join(parts)

gather_github_context(repos, *, max_issues=10, max_prs=10)

Gather GitHub context for all repos that have GitHub URLs.

Parameters

repos: List of repo config dicts (each must have at least url and name). max_issues: Maximum recent issues to fetch per repo. max_prs: Maximum recent PRs to fetch per repo.

Returns

Formatted text block with issues/PRs across repos, or None if nothing was fetched (e.g. no GitHub URLs, gh not installed).

Source code in src/ursa/util/github_research.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
def gather_github_context(
    repos: list[dict],
    *,
    max_issues: int = 10,
    max_prs: int = 10,
) -> str | None:
    """Gather GitHub context for all repos that have GitHub URLs.

    Parameters
    ----------
    repos:
        List of repo config dicts (each must have at least ``url`` and ``name``).
    max_issues:
        Maximum recent issues to fetch per repo.
    max_prs:
        Maximum recent PRs to fetch per repo.

    Returns
    -------
    Formatted text block with issues/PRs across repos, or ``None`` if nothing
    was fetched (e.g. no GitHub URLs, ``gh`` not installed).
    """
    # Without the gh CLI there is nothing we can fetch.
    if not _gh_available():
        return None

    collected: list[str] = []
    for entry in repos:
        owner_repo = parse_github_owner_repo(entry.get("url", ""))
        if not owner_repo:
            # Not a GitHub URL -- nothing to gather for this repo.
            continue
        owner, name = owner_repo
        try:
            collected.append(
                fetch_repo_context(
                    owner, name, max_issues=max_issues, max_prs=max_prs
                )
            )
        except Exception:  # noqa: BLE001, S112
            # Network issue, auth issue, etc. -- skip silently
            continue

    return "\n\n".join(collected) if collected else None

parse_github_owner_repo(url)

Extract (owner, repo) from a GitHub clone URL.

Supports both HTTPS and SSH URLs. Returns None for non-GitHub URLs.

Source code in src/ursa/util/github_research.py
21
22
23
24
25
26
27
28
29
def parse_github_owner_repo(url: str) -> tuple[str, str] | None:
    """Extract ``(owner, repo)`` from a GitHub clone URL.

    Supports both HTTPS and SSH URLs.  Returns ``None`` for non-GitHub URLs.
    """
    match = _GH_URL_RE.search(url or "")
    if match is None:
        return None
    return match.group("owner"), match.group("repo")

has_optional_dep_group

has_optional_dep_group(dep)

Check whether an optional dependency group is installed.

Attempts to import each module.

Source code in src/ursa/util/has_optional_dep_group.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
def has_optional_dep_group(dep: str | list[str]) -> bool:
    """
    Check whether an optional dependency group is installed.

    Attempts to import each module.
    """
    if isinstance(dep, str):
        deps: Iterable[str] = [dep]
    else:
        deps = dep

    for d in deps:
        try:
            importlib.import_module(d)
        except Exception:
            return False
    return True

helperFunctions

run_tool_calls(ai_msg, tools)

Parameters:

Name Type Description Default
ai_msg AIMessage

The LLM's AIMessage containing tool calls.

required
tools ToolRegistry | Iterable[Runnable | Callable[..., Any]]

Either a dict {name: tool} or an iterable of tools (must have .name for mapping). Each tool can be a Runnable or a plain callable.

required

Returns:

Name Type Description
out list[BaseMessage]

list[BaseMessage] to feed back to the model

Source code in src/ursa/util/helperFunctions.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def run_tool_calls(
    ai_msg: AIMessage,
    tools: ToolRegistry | Iterable[Runnable | Callable[..., Any]],
) -> list[BaseMessage]:
    """
    Args:
        ai_msg: The LLM's AIMessage containing tool calls.
        tools: Either a dict {name: tool} or an iterable of tools (must have `.name`
               for mapping). Each tool can be a Runnable or a plain callable.

    Returns:
        out: list[BaseMessage] to feed back to the model
    """
    # Normalize `tools` into a name -> tool mapping.
    if isinstance(tools, dict):
        registry: ToolRegistry = tools  # type: ignore
    else:
        registry = {}
        for tool in tools:
            tool_name = getattr(tool, "name", None) or getattr(
                tool, "__name__", None
            )
            if not tool_name:
                raise ValueError(f"Tool {tool!r} has no discoverable name.")
            registry[tool_name] = tool  # type: ignore

    calls = extract_tool_calls(ai_msg)
    if not calls:
        return []

    messages: list[BaseMessage] = []
    for call in calls:
        name = call.get("name")
        args = call.get("args", {}) or {}
        call_id = call.get("id") or f"call_{uuid.uuid4().hex}"

        # 1) echo the AIMessage that produced this call
        messages.append(ai_msg)

        # 2) follow it with the ToolMessage holding the result (or error)
        if name not in registry:
            content = f"ERROR: unknown tool '{name}'."
        else:
            try:
                content = _stringify_output(_invoke_tool(registry[name], args))
            except Exception as e:
                content = f"ERROR: {type(e).__name__}: {e}"

        messages.append(
            ToolMessage(content=content, tool_call_id=call_id, name=name)
        )

    return messages

logo_generator

generate_logo_sync(*, problem_text, workspace, out_dir, filename=None, model=DEFAULT_IMAGE_MODEL, size=None, background='opaque', quality='high', n=1, overwrite=False, style='sticker', allow_text=False, palette=None, mode='logo', aspect='square', style_intensity='overt', aperture=DEFAULT_APERTURE, console=None, image_model_provider='openai', image_provider_kwargs=None)

Generate images.

Key change (diversity): - We no longer rely on a single prompt with n>1 siblings for scenes. - If mode='scene' and style='random' and n>1, we pick n distinct scene styles (horror/fantasy/etc) and generate 1 image per style/prompt.

Return value
  • Returns the "main" path (first generated image). Additional variants are saved alongside it.
Source code in src/ursa/util/logo_generator.py
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
def generate_logo_sync(
    *,
    problem_text: str,
    workspace: str,
    out_dir: str | Path,
    filename: str | None = None,
    model: str = DEFAULT_IMAGE_MODEL,
    size: str | None = None,
    background: str = "opaque",
    quality: str = "high",
    n: int = 1,
    overwrite: bool = False,
    style: str = "sticker",
    allow_text: bool = False,
    palette: str | None = None,
    mode: str = "logo",
    aspect: str = "square",
    style_intensity: str = "overt",
    aperture: float = DEFAULT_APERTURE,
    console: Optional[Console] = None,
    image_model_provider: str = "openai",
    image_provider_kwargs: Optional[dict] = None,
) -> Path:
    """
    Generate images.

    Key change (diversity):
      - We no longer rely on a single prompt with n>1 siblings for scenes.
      - If mode='scene' and style='random' and n>1, we pick n distinct scene styles
        (horror/fantasy/etc) and generate 1 image per style/prompt.

    Return value:
      - Returns the "main" path (first generated image). Additional variants are saved alongside it.

    Args:
        problem_text: Text describing the problem; fed into prompt crafting.
        workspace: Workspace identifier used in prompts and output file names.
        out_dir: Directory where generated images are written (created if missing).
        filename: Optional explicit file name; when set, the multi-style scene
            branch is skipped and single-style series naming is used.
        model: Image model name passed to the images API.
        size: Explicit image size; if None, derived from `aspect` and `mode`.
        background: Requested background; scenes are forced to "opaque".
        quality: Image quality setting forwarded to the API.
        n: Number of image variants to generate.
        overwrite: If False, files that already exist are not regenerated.
        style: Style slug, or "random" (or None) to sample styles.
        allow_text: Whether the prompt permits text inside the image.
        palette: Optional palette hint for the prompt.
        mode: "logo" or "scene".
        aspect: Aspect-ratio hint used when `size` is None.
        style_intensity: How strongly the style is applied in the prompt.
        aperture: Style-diversity knob forwarded to prompt crafting.
        console: Optional Rich console for rendering prompt panels.
        image_model_provider: NOTE(review): not referenced in this function body
            -- presumably reserved for non-OpenAI providers; confirm intent.
        image_provider_kwargs: Optional client kwargs; only api_key/base_url/
            organization are passed through.
    """
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # this is how we'll pass through a vision model and provider/url/endpoint
    client_kwargs = {}
    if image_provider_kwargs:
        # Only pass through safe/known kwargs
        for k in ("api_key", "base_url", "organization"):
            if k in image_provider_kwargs and image_provider_kwargs[k]:
                client_kwargs[k] = image_provider_kwargs[k]
    client = OpenAI(**client_kwargs)

    final_size = _normalize_size(size, aspect, mode)
    # Scenes tend to look odd with transparent backgrounds; force opaque.
    final_background = "opaque" if mode == "scene" else background

    # -------------------------
    # Multi-style scene generation (requested change)
    # -------------------------
    # Taken only when: scene mode, multiple variants requested, the feature
    # flag is on, style is unset/"random", and no explicit filename was given.
    if (
        mode == "scene"
        and n > 1
        and SCENE_MULTI_STYLE_DEFAULT
        and (style is None or style.strip().lower() == "random")
        and filename
        is None  # filename implies "single series"; keep single-style naming
    ):
        style_slugs = _choose_n_distinct_styles(n)
        out_paths = _compose_multi_scene_paths(out_dir, workspace, style_slugs)

        # If everything already exists and overwrite is False, skip regeneration.
        if not overwrite and all(p.exists() for p in out_paths):
            return out_paths[0]

        for idx, (style_slug, path) in enumerate(
            zip(style_slugs, out_paths), start=1
        ):
            # One prompt per style so each variant diverges visually.
            prompt, _ = _craft_logo_prompt(
                problem_text,
                workspace,
                style=style_slug,
                allow_text=allow_text,
                palette=palette,
                mode="scene",
                style_intensity=style_intensity,
                aperture=aperture,
            )

            extra_title = (
                f"[bold magenta]mode: scene[/bold magenta] [dim]•[/dim] "
                f"aspect: {aspect} [dim]•[/dim] variant {idx}/{n}"
            )
            _render_prompt_panel(
                console=console,
                style_slug=style_slug,
                workspace=workspace,
                prompt=prompt,
                extra_title=extra_title,
            )

            # Per-file skip: some variants may exist even if not all do.
            if path.exists() and not overwrite:
                continue

            kwargs = dict(
                model=model,
                prompt=prompt,
                size=final_size,
                n=1,
                quality=quality,
                background=final_background,
            )
            try:
                resp = client.images.generate(**kwargs)
            except Exception:
                # Some models ignore/forbid background=; retry without it
                kwargs.pop("background", None)
                resp = client.images.generate(**kwargs)

            # assumes the API returns base64 image data -- TODO confirm for
            # non-default models/providers
            path.write_bytes(base64.b64decode(resp.data[0].b64_json))

        return out_paths[0]

    # -------------------------
    # Default behavior (single-style series)
    #   - Also improved diversity: when n>1, we do n separate prompts (not siblings).
    # -------------------------
    # Build filenames for this series
    prompt0, style_slug0 = _craft_logo_prompt(
        problem_text,
        workspace,
        style=style,
        allow_text=allow_text,
        palette=palette,
        mode=mode,
        style_intensity=style_intensity,
        aperture=aperture,
    )

    main_path, alt_paths = _compose_filenames(
        out_dir, style_slug0, filename, n, mode=mode
    )

    # If everything exists and overwrite is False, return main
    if (
        not overwrite
        and main_path.exists()
        and all(p.exists() for p in alt_paths)
    ):
        return main_path

    # Generate 1 image per prompt (more divergence than n>1 siblings)
    paths = [main_path] + alt_paths
    for idx, path in enumerate(paths, start=1):
        # For n>1, rebuild prompt each time so pools + aperture actually matter
        prompt_i, style_slug_i = (
            (prompt0, style_slug0)
            if idx == 1
            else _craft_logo_prompt(
                problem_text,
                workspace,
                style=style,
                allow_text=allow_text,
                palette=palette,
                mode=mode,
                style_intensity=style_intensity,
                aperture=aperture,
            )
        )

        extra_title = (
            f"[bold magenta]mode: {mode}[/bold magenta] [dim]•[/dim] "
            f"aspect: {aspect} [dim]•[/dim] variant {idx}/{len(paths)}"
        )
        _render_prompt_panel(
            console=console,
            style_slug=style_slug_i,
            workspace=workspace,
            prompt=prompt_i,
            extra_title=extra_title,
        )

        if path.exists() and not overwrite:
            continue

        kwargs = dict(
            model=model,
            prompt=prompt_i,
            size=final_size,
            n=1,
            quality=quality,
            background=final_background,
        )
        try:
            resp = client.images.generate(**kwargs)
        except Exception:
            # Some models ignore/forbid background=; retry without it
            kwargs.pop("background", None)
            resp = client.images.generate(**kwargs)

        path.write_bytes(base64.b64decode(resp.data[0].b64_json))

    return main_path

mcp

memory_logger

AgentMemory

Simple wrapper around a persistent Chroma vector-store for agent-conversation memory.

Parameters

path : str | Path | None Where to keep the on-disk Chroma DB. If None, a default cache directory is used. collection_name : str Name of the Chroma collection. embedding_model : object | None The embedding model used to embed text chunks.

Notes
  • Requires langchain-chroma, and chromadb.
Source code in src/ursa/util/memory_logger.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
class AgentMemory:
    """
    Simple wrapper around a persistent Chroma vector-store for agent-conversation memory.

    Parameters
    ----------
    path : str | Path | None
        Where to keep the on-disk Chroma DB.  If *None*, a folder called
        ``agent_memory_db`` is created in the package’s base directory.
    collection_name : str
        Name of the Chroma collection.
    embedding_model : <TODO> | None
        the embedding model

    Notes
    -----
    * Requires `langchain-chroma`, and `chromadb`.
    """

    @classmethod
    def get_db_path(cls, path: Optional[str | Path]) -> Path:
        match path:
            case None:
                return Path.home() / ".cache" / "ursa" / "rag" / "db"
            case str():
                return Path(path)
            case Path():
                return path
            case _:
                raise TypeError(
                    f"Type of path is `{type(path)}` "
                    "but `Optional[str | Path]` was expected."
                )

    def __init__(
        self,
        embedding_model,
        path: Optional[str | Path] = None,
        collection_name: str = "agent_memory",
    ) -> None:
        self.path = self.get_db_path(path)
        self.collection_name = collection_name
        self.path.mkdir(parents=True, exist_ok=True)
        self.embeddings = embedding_model

        # If a DB already exists, load it; otherwise defer creation until `build_index`.
        self.vectorstore: Optional[Chroma] = None
        if any(self.path.iterdir()):
            self.vectorstore = Chroma(
                collection_name=self.collection_name,
                embedding_function=self.embeddings,
                persist_directory=str(self.path),
            )

    # --------------------------------------------------------------------- #
    # ❶ Build & index a brand-new database                                   #
    # --------------------------------------------------------------------- #
    def build_index(
        self,
        chunks: Sequence[str],
        metadatas: Optional[Sequence[dict[str, Any]]] = None,
    ) -> None:
        """
        Create a fresh vector store from ``chunks``.  Existing data (if any)
        are overwritten.

        Parameters
        ----------
        chunks : Sequence[str]
            Text snippets (already chunked) to embed.
        metadatas : Sequence[dict] | None
            Optional metadata dict for each chunk, same length as ``chunks``.
        """
        docs = [
            Document(
                page_content=text, metadata=metadatas[i] if metadatas else {}
            )
            for i, text in enumerate(chunks)
        ]

        # Create (or overwrite) the collection
        self.vectorstore = Chroma.from_documents(
            documents=docs,
            embedding=self.embeddings,
            collection_name=self.collection_name,
            persist_directory=str(self.path),
        )

    # --------------------------------------------------------------------- #
    # ❷ Add new chunks and re-index                                          #
    # --------------------------------------------------------------------- #
    def add_memories(
        self,
        new_chunks: Sequence[str],
        metadatas: Optional[Sequence[dict[str, Any]]] = None,
    ) -> None:
        """
        Append new text chunks to the existing store (must call `build_index`
        first if the DB is empty).

        Raises
        ------
        RuntimeError
            If the vector store is not yet initialised.
        """
        if self.vectorstore is None:
            self.build_index(new_chunks, metadatas)
            print("----- Vector store initialised -----")

        docs = []
        for i, text in enumerate(new_chunks):
            if len(text) > 0:  # only add non-empty documents
                docs.append(
                    Document(
                        page_content=text,
                        metadata=metadatas[i] if metadatas else {},
                    )
                )
        self.vectorstore.add_documents(docs)

    # --------------------------------------------------------------------- #
    # ❸ Retrieve relevant chunks (RAG query)                                 #
    # --------------------------------------------------------------------- #
    def retrieve(
        self,
        query: str,
        k: int = 4,
        with_scores: bool = False,
        **search_kwargs,
    ):
        """
        Return the *k* most similar chunks for `query`.

        Parameters
        ----------
        query : str
            Natural-language question or statement.
        k : int
            How many results to return.
        with_scores : bool
            If True, also return similarity scores.
        **search_kwargs
            Extra kwargs forwarded to Chroma’s ``similarity_search*`` helpers.

        Returns
        -------
        list[Document] | list[tuple[Document, float]]
        """
        if self.vectorstore is None:
            return ["None"]

        if with_scores:
            return self.vectorstore.similarity_search_with_score(
                query, k=k, **search_kwargs
            )
        return self.vectorstore.similarity_search(query, k=k, **search_kwargs)

add_memories(new_chunks, metadatas=None)

Append new text chunks to the existing store (must call build_index first if the DB is empty).

Raises

RuntimeError If the vector store is not yet initialised.

Source code in src/ursa/util/memory_logger.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def add_memories(
    self,
    new_chunks: Sequence[str],
    metadatas: Optional[Sequence[dict[str, Any]]] = None,
) -> None:
    """
    Append new text chunks to the existing store.  If the store has not
    been initialised yet, it is created from ``new_chunks`` instead.

    Parameters
    ----------
    new_chunks : Sequence[str]
        Text snippets to embed; empty strings are skipped when appending.
    metadatas : Sequence[dict] | None
        Optional metadata dict for each chunk, same length as ``new_chunks``.
    """
    if self.vectorstore is None:
        # BUG FIX: previously the chunks were indexed here and then added
        # again by the loop below, duplicating every document on first use.
        self.build_index(new_chunks, metadatas)
        print("----- Vector store initialised -----")
        return

    docs = []
    for i, text in enumerate(new_chunks):
        if len(text) > 0:  # only add non-empty documents
            docs.append(
                Document(
                    page_content=text,
                    metadata=metadatas[i] if metadatas else {},
                )
            )
    self.vectorstore.add_documents(docs)

build_index(chunks, metadatas=None)

Create a fresh vector store from chunks. Existing data (if any) are overwritten.

Parameters

chunks : Sequence[str] Text snippets (already chunked) to embed. metadatas : Sequence[dict] | None Optional metadata dict for each chunk, same length as chunks.

Source code in src/ursa/util/memory_logger.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def build_index(
    self,
    chunks: Sequence[str],
    metadatas: Optional[Sequence[dict[str, Any]]] = None,
) -> None:
    """
    Create a fresh vector store from ``chunks``, replacing any existing data.

    Parameters
    ----------
    chunks : Sequence[str]
        Text snippets (already chunked) to embed.
    metadatas : Sequence[dict] | None
        Optional metadata dict for each chunk, same length as ``chunks``.
    """
    # Wrap every chunk in a Document, attaching its metadata when provided.
    doc_list = []
    for idx, text in enumerate(chunks):
        meta = metadatas[idx] if metadatas else {}
        doc_list.append(Document(page_content=text, metadata=meta))

    # Create (or overwrite) the collection on disk.
    self.vectorstore = Chroma.from_documents(
        documents=doc_list,
        embedding=self.embeddings,
        collection_name=self.collection_name,
        persist_directory=str(self.path),
    )

retrieve(query, k=4, with_scores=False, **search_kwargs)

Return the k most similar chunks for query.

Parameters

query : str Natural-language question or statement. k : int How many results to return. with_scores : bool If True, also return similarity scores. **search_kwargs Extra kwargs forwarded to Chroma’s similarity_search* helpers.

Returns

list[Document] | list[tuple[Document, float]]

Source code in src/ursa/util/memory_logger.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def retrieve(
    self,
    query: str,
    k: int = 4,
    with_scores: bool = False,
    **search_kwargs,
):
    """
    Return the *k* most similar chunks for `query`.

    Parameters
    ----------
    query : str
        Natural-language question or statement.
    k : int
        How many results to return.
    with_scores : bool
        If True, also return similarity scores.
    **search_kwargs
        Extra kwargs forwarded to Chroma’s ``similarity_search*`` helpers.

    Returns
    -------
    list[Document] | list[tuple[Document, float]]
    """
    store = self.vectorstore
    if store is None:
        # Uninitialised store: sentinel list rather than an exception.
        return ["None"]

    search = (
        store.similarity_search_with_score
        if with_scores
        else store.similarity_search
    )
    return search(query, k=k, **search_kwargs)

delete_database(path=None)

Delete the persistent Chroma vector-store used for agent-conversation memory.

Parameters

path : str | Path | None Where the on-disk Chroma DB to delete is located. If None, the default database location used by AgentMemory is removed.

Source code in src/ursa/util/memory_logger.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def delete_database(path: Optional[str | Path] = None):
    """
    Delete the on-disk Chroma vector-store used for agent-conversation memory.

    Parameters
    ----------
    path : str | Path | None
        Location of the on-disk Chroma DB to delete.  If *None*, the default
        location (an ``agent_memory_db`` folder in the package's base
        directory) is used.
    """
    # Resolve the same location AgentMemory itself would use.
    db_path = AgentMemory.get_db_path(path)
    if os.path.exists(db_path):
        # Chroma persists multiple files, so remove the whole directory tree.
        shutil.rmtree(db_path)
        print(f"Database: {db_path} has been deleted.")
    else:
        print("No database found to delete.")

parse

extract_json(text)

Extract a JSON object or array from text that might contain markdown or other content.

The function attempts three strategies
  1. Extract JSON from a markdown code block labeled as JSON.
  2. Extract JSON from any markdown code block.
  3. Use bracket matching to extract a JSON substring starting with '{' or '['.

Returns:

Type Description
list[dict]

A Python object parsed from the JSON string (dict or list).

Raises:

Type Description
ValueError

If no valid JSON is found.

Source code in src/ursa/util/parse.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def extract_json(text: str) -> list[dict]:
    """
    Extract a JSON object or array from text that might contain markdown or other content.

    The function attempts three strategies:
        1. Extract JSON from a markdown code block labeled as JSON.
        2. Extract JSON from any markdown code block.
        3. Decode JSON directly at each '{' or '[' found in the text.

    Returns:
        A Python object parsed from the JSON string (dict or list).

    Raises:
        ValueError: If no valid JSON is found.
    """
    # Approach 1: Look for a markdown code block specifically labeled as JSON.
    labeled_block = re.search(
        r"```json\s*([\[{].*?[\]}])\s*```", text, re.DOTALL
    )
    if labeled_block:
        try:
            return json.loads(labeled_block.group(1).strip())
        except json.JSONDecodeError:
            # Fall back to the next approach if parsing fails.
            pass

    # Approach 2: Look for any code block delimited by triple backticks.
    generic_block = re.search(r"```(.*?)```", text, re.DOTALL)
    if generic_block:
        json_str = generic_block.group(1).strip()
        if json_str.startswith(("{", "[")):
            try:
                return json.loads(json_str)
            except json.JSONDecodeError:
                pass

    # Approach 3: try a real JSON decode at each '{' / '[' in the text.
    # json.JSONDecoder.raw_decode is string-aware, so brackets inside JSON
    # string values (e.g. {"a": "}"}) no longer break extraction the way
    # naive bracket counting did.  It also tolerates trailing non-JSON text.
    candidates = [m.start() for m in re.finditer(r"[\[{]", text)]
    if not candidates:
        raise ValueError("No JSON object or array found in the text.")

    decoder = json.JSONDecoder()
    for start in candidates:
        try:
            obj, _end = decoder.raw_decode(text, start)
            return obj
        except json.JSONDecodeError:
            continue  # try the next opening bracket

    raise ValueError("Extracted content is not valid JSON.")

extract_main_text_only(html, *, max_chars=250000)

Returns plain text with navigation/ads/scripts removed. Prefers trafilatura -> jusText -> BS4 paragraphs.

Source code in src/ursa/util/parse.py
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
def extract_main_text_only(html: str, *, max_chars: int = 250_000) -> str:
    """
    Returns plain text with navigation/ads/scripts removed.
    Prefers trafilatura -> jusText -> BS4 paragraphs; the first strategy
    yielding non-empty text wins.  Output is whitespace-normalized,
    line-deduplicated, and truncated to ``max_chars`` characters.
    """
    # 1) Trafilatura
    # You can tune config: with_metadata, include_comments, include_images, favor_recall, etc.
    cfg = trafilatura.settings.use_config()
    cfg.set("DEFAULT", "include_comments", "false")
    cfg.set("DEFAULT", "include_tables", "false")
    cfg.set("DEFAULT", "favor_recall", "false")  # be stricter; less noise
    try:
        # If you fetched HTML already, use extract() on string; otherwise, fetch_url(url)
        txt = trafilatura.extract(
            html,
            config=cfg,
            include_comments=False,
            include_tables=False,
            favor_recall=False,
        )
        if txt and txt.strip():
            txt = _normalize_ws(txt)
            txt = _dedupe_lines(txt)
            return txt[:max_chars]
    except Exception:  # noqa: BLE001, S110
        # Best-effort: any trafilatura failure falls through to jusText.
        pass

    # 2) jusText
    try:
        paragraphs = justext.justext(html, justext.get_stoplist("English"))
        body_paras = [p.text for p in paragraphs if not p.is_boilerplate]
        if body_paras:
            txt = _normalize_ws("\n\n".join(body_paras))
            txt = _dedupe_lines(txt)
            return txt[:max_chars]
    except Exception:  # noqa: BLE001, S110
        # Best-effort: fall through to the BeautifulSoup pass below.
        pass

    # 3) last-resort: BS4 paragraphs/headings only
    from bs4 import BeautifulSoup

    soup = BeautifulSoup(html, "html.parser")
    # Strip chrome/boilerplate containers before harvesting text.
    for tag in soup([
        "script",
        "style",
        "noscript",
        "header",
        "footer",
        "nav",
        "form",
        "aside",
    ]):
        tag.decompose()
    chunks = []
    # Collect only content-bearing elements, in document order.
    for el in soup.find_all(["h1", "h2", "h3", "p", "li", "figcaption"]):
        t = el.get_text(" ", strip=True)
        if t:
            chunks.append(t)
    txt = _normalize_ws("\n\n".join(chunks))
    txt = _dedupe_lines(txt)
    return txt[:max_chars]

read_text_file(path)

Reads in a file at a given path into a string

Parameters:

Name Type Description Default
path str | Path

string filename, with path, to read in

required
Source code in src/ursa/util/parse.py
665
666
667
668
669
670
671
672
673
674
675
676
677
def read_text_file(path: str | Path) -> str:
    """
    Reads in a file at a given path into a string

    Args:
        path: string filename, with path, to read in
    """
    try:
        with open(path, "r", encoding="utf-8") as file:
            return file.read()
    except UnicodeDecodeError:
        # If UTF-8 fails, it's likely binary
        raise ValueError(f"File appears to be binary: {path}")

resolve_pdf_from_osti_record(rec, *, headers=None, unpaywall_email=None, timeout=25)

Returns (pdf_url, landing_used, note) - pdf_url: direct downloadable PDF URL if found (or a strong candidate) - landing_used: landing page URL we parsed (if any) - note: brief trace of how we found it

Source code in src/ursa/util/parse.py
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
def resolve_pdf_from_osti_record(
    rec: dict[str, Any],
    *,
    headers: dict[str, str] | None = None,
    unpaywall_email: str | None = None,
    timeout: int = 25,
) -> tuple[str | None, str | None, str]:
    """
    Returns (pdf_url, landing_used, note)
      - pdf_url: direct downloadable PDF URL if found (or a strong candidate)
      - landing_used: landing page URL we parsed (if any)
      - note: brief trace of how we found it

    Resolution order: the record's 'fulltext' link, then its
    'citation_doe_pages' landing page.  (A DOI/Unpaywall fallback and the
    ``unpaywall_email`` parameter are currently disabled — see the
    commented-out step 3 below.)
    """
    headers = headers or {"User-Agent": "Mozilla/5.0"}
    # Accumulates a human-readable trace of every resolution attempt.
    note_parts: list[str] = []

    links = rec.get("links", []) or []
    # doi = rec.get("doi")

    # 1) Try 'fulltext' first (OSTI purl)
    fulltext = None
    for link in links:
        if link.get("rel") == "fulltext":
            fulltext = link.get("href")
            break

    if fulltext:
        note_parts.append("Tried links[fulltext] purl")
        try:
            # Follow redirects; stream to peek headers without loading whole body
            r = requests.get(
                fulltext,
                headers=headers,
                timeout=timeout,
                allow_redirects=True,
                stream=True,
            )
            r.raise_for_status()

            if _is_pdf_response(r):
                note_parts.append("fulltext resolved directly to PDF")
                # r.url is the post-redirect URL, which is the real PDF location.
                return (r.url, None, " | ".join(note_parts))

            # Not a PDF: parse page HTML for meta or obvious PDF anchors
            # (If server sent binary but CT lied, _is_pdf_response would have caught via CD or ext)
            r.close()
            soup = _get_soup(fulltext, timeout=timeout, headers=headers)
            candidate = _find_pdf_on_landing(soup, fulltext)
            if candidate:
                note_parts.append(
                    "found PDF via meta/anchor on fulltext landing"
                )
                return (candidate, fulltext, " | ".join(note_parts))
        except Exception as e:  # noqa: BLE001
            # Record the failure and fall through to the next strategy.
            note_parts.append(f"fulltext failed: {e}")

    # 2) Try DOE PAGES landing (citation_doe_pages)
    doe_pages = None
    for link in links:
        if link.get("rel") == "citation_doe_pages":
            doe_pages = link.get("href")
            break

    if doe_pages:
        note_parts.append("Tried links[citation_doe_pages] landing")
        try:
            soup = _get_soup(doe_pages, timeout=timeout, headers=headers)
            candidate = _find_pdf_on_landing(soup, doe_pages)
            if candidate:
                # Candidate may itself be a landing—check if it serves PDF
                try:
                    r2 = requests.get(
                        candidate,
                        headers=headers,
                        timeout=timeout,
                        allow_redirects=True,
                        stream=True,
                    )
                    r2.raise_for_status()
                    if _is_pdf_response(r2):
                        note_parts.append("citation_doe_pages → direct PDF")
                        return (r2.url, doe_pages, " | ".join(note_parts))
                    r2.close()
                except Exception:  # noqa: BLE001, S110
                    # Verification is best-effort; keep the unconfirmed candidate.
                    pass
                # If not clearly PDF, still return as a candidate (agent will fetch & parse)
                note_parts.append(
                    "citation_doe_pages → PDF-like candidate (not confirmed by headers)"
                )
                return (candidate, doe_pages, " | ".join(note_parts))
        except Exception as e:  # noqa: BLE001
            note_parts.append(f"citation_doe_pages failed: {e}")

    # # 3) Optional: DOI → Unpaywall OA
    # if doi and unpaywall_email:
    #     note_parts.append("Tried Unpaywall via DOI")
    #     pdf_from_ua = _resolve_pdf_via_unpaywall(doi, unpaywall_email)
    #     if pdf_from_ua:
    #         # May be direct PDF or landing; the caller will validate headers during download
    #         note_parts.append("Unpaywall returned candidate")
    #         return (pdf_from_ua, None, " | ".join(note_parts))

    # 4) Give up
    note_parts.append("No PDF found")
    return (None, None, " | ".join(note_parts))

plan_execute_utils

Shared utilities for plan_execute workflows.

This module contains common functionality used by both single-repo and multi-repo plan/execute workflows to reduce duplication and improve maintainability.

deep_merge_dicts(base, override)

Recursively merge override into base and return a new dict. - dict + dict => deep merge - otherwise => override wins

Source code in src/ursa/util/plan_execute_utils.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def deep_merge_dicts(base: dict, override: dict) -> dict:
    """
    Recursively merge override into base and return a new dict.
    - dict + dict => deep merge
    - otherwise => override wins

    Neither input is mutated; ``None`` is treated as an empty dict.
    """
    # A single shallow copy seeds the result (the original made three copies).
    out = dict(base or {})
    for key, value in (override or {}).items():
        if isinstance(value, dict) and isinstance(out.get(key), dict):
            out[key] = deep_merge_dicts(out[key], value)
        else:
            out[key] = value
    return out

fmt_elapsed(seconds)

Format elapsed seconds compactly, e.g. "42s", "3m07s", or "2h05m".

Source code in src/ursa/util/plan_execute_utils.py
530
531
532
533
534
535
536
537
538
539
def fmt_elapsed(seconds: float) -> str:
    """Format elapsed seconds compactly: "42s", "3m07s", or "2h05m".

    Fractional seconds are truncated toward zero.  Note the docstring
    previously claimed "h:mm:ss or m:ss", which did not match the output.
    """
    s = int(seconds)
    if s < 60:
        return f"{s}s"
    m, s = divmod(s, 60)
    if m < 60:
        return f"{m}m{s:02d}s"
    h, m = divmod(m, 60)
    return f"{h}h{m:02d}m"

generate_workspace_name(project='run')

Generate a workspace name using randomname, with timestamp fallback.

Source code in src/ursa/util/plan_execute_utils.py
60
61
62
63
64
65
66
def generate_workspace_name(project: str = "run") -> str:
    """Build a workspace name: `project` plus a random suffix (timestamp fallback)."""
    try:
        tag = randomname.get_name(adj=_RANDOMNAME_ADJ, noun=_RANDOMNAME_NOUN)
    except Exception:
        # randomname unavailable or failed: use a sortable timestamp instead.
        tag = time.strftime("%Y%m%d-%H%M%S")
    return f"{project}_{tag}"

hash_plan(plan_steps)

Generate a stable hash of plan steps for change detection.

Source code in src/ursa/util/plan_execute_utils.py
137
138
139
140
141
142
143
144
145
146
147
def hash_plan(plan_steps: list | tuple) -> str:
    """Generate a stable hash of plan steps for change detection."""
    serial = json.dumps(
        [
            step.model_dump() if hasattr(step, "model_dump") else step
            for step in plan_steps
        ],
        sort_keys=True,
        default=str,
    )
    return hashlib.sha256(serial.encode("utf-8")).hexdigest()

load_json_file(path, default)

Load JSON from a file path, returning default on missing/invalid JSON.

Source code in src/ursa/util/plan_execute_utils.py
85
86
87
88
89
90
91
92
93
def load_json_file(path: str | Path, default: Any):
    """Load JSON from a file path, returning default on missing/invalid JSON."""
    p = Path(path)
    if not p.exists():
        return default
    try:
        return json.loads(p.read_text())
    except Exception:
        return default

load_yaml_config(path)

Load a YAML config file and return as a SimpleNamespace.

Source code in src/ursa/util/plan_execute_utils.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def load_yaml_config(path: str) -> NS:
    """Load a YAML config file into a SimpleNamespace; exit(2) on any failure."""
    try:
        with open(path, encoding="utf-8") as fh:
            raw_cfg = yaml.safe_load(fh) or {}
            if not isinstance(raw_cfg, dict):
                raise ValueError("Top-level YAML must be a mapping/object.")
            return NS(**raw_cfg)
    except FileNotFoundError:
        print(f"Config file not found: {path}", file=sys.stderr)
        sys.exit(2)
    except Exception as exc:
        # Any other problem (parse error, non-mapping top level) is fatal too.
        print(f"Failed to load config {path}: {exc}", file=sys.stderr)
        sys.exit(2)

looks_like_secret_key(name)

Check if a parameter name looks like it contains sensitive data.

Source code in src/ursa/util/plan_execute_utils.py
165
166
167
168
def looks_like_secret_key(name: str) -> bool:
    """Return True when `name` resembles a credential-bearing parameter name."""
    lowered = name.lower()
    # Substring match against the module-level deny-list of secret markers.
    for fragment in _SECRET_KEY_SUBSTRS:
        if fragment in lowered:
            return True
    return False

mask_secret(value, keep_start=6, keep_end=4)

Mask a secret-like string, keeping only the beginning and end. Example: sk-proj-abc123456789xyz -> sk-pro...9xyz

Source code in src/ursa/util/plan_execute_utils.py
171
172
173
174
175
176
177
178
179
180
def mask_secret(value: str, keep_start: int = 6, keep_end: int = 4) -> str:
    """
    Mask a secret-like string, keeping only the beginning and end.
    Example: sk-proj-abc123456789xyz -> sk-pro...9xyz

    Non-string values are returned unchanged; strings too short to mask
    safely collapse to "...".
    """
    if not isinstance(value, str):
        return value
    # Too short to keep both ends without revealing most of the secret.
    if len(value) <= keep_start + keep_end + 3:
        return "..."
    return f"{value[:keep_start]}...{value[-keep_end:]}"

print_llm_init_banner(agent_name, provider, model_name, provider_extra, llm_kwargs, model_obj=None)

Print a Rich panel showing LLM initialization details.

Source code in src/ursa/util/plan_execute_utils.py
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
def print_llm_init_banner(
    agent_name: str | None,
    provider: str,
    model_name: str,
    provider_extra: dict,
    llm_kwargs: dict,
    model_obj=None,
) -> None:
    """Print a Rich panel showing LLM initialization details.

    Secrets in ``provider_extra`` / ``llm_kwargs`` are masked via
    sanitize_for_logging before display.  When ``model_obj`` is given,
    also prints a best-effort readback of its attributes and attempts a
    minimal test invocation.
    """
    who = agent_name or "llm"

    # Never log raw API keys or tokens.
    safe_provider_extra = sanitize_for_logging(provider_extra or {})
    safe_llm_kwargs = sanitize_for_logging(llm_kwargs or {})

    console.print(
        Panel.fit(
            Text.from_markup(
                f"[bold cyan]LLM init ({who})[/]\n"
                f"[bold]provider[/]: {provider}\n"
                f"[bold]model[/]: {model_name}\n\n"
                f"[bold]provider kwargs[/]: {json.dumps(safe_provider_extra, indent=2)}\n\n"
                f"[bold]llm kwargs (merged)[/]: {json.dumps(safe_llm_kwargs, indent=2)}"
            ),
            border_style="cyan",
        )
    )

    # Best-effort readback from the LangChain model object
    if model_obj is None:
        return

    # Collect scalar config attributes the model object actually exposes.
    readback = {}
    for attr in (
        "model_name",
        "model",
        "reasoning",
        "temperature",
        "max_completion_tokens",
        "max_tokens",
    ):
        if hasattr(model_obj, attr):
            val = getattr(model_obj, attr, None)
            if val is not None:
                readback[attr] = val

    # Also surface non-empty kwargs dicts, if present.
    for attr in ("model_kwargs", "kwargs"):
        if hasattr(model_obj, attr):
            val = getattr(model_obj, attr, {})
            if isinstance(val, dict) and val:
                readback[attr] = val

    if readback:
        safe_readback = sanitize_for_logging(readback)
        console.print(
            Panel.fit(
                Text.from_markup(
                    f"[dim]Model object readback:[/]\n{json.dumps(safe_readback, indent=2)}"
                ),
                border_style="dim",
            )
        )

    # Attempt a minimal test call.
    # NOTE(review): this issues a real model invocation, which may incur
    # cost/latency just to print a banner — confirm this is intended.
    effort = None
    try:
        from langchain_core.messages import HumanMessage as _HM

        effort = model_obj.invoke([_HM(content="test")])
    except Exception:
        # Best-effort: a failed probe should never break initialization.
        pass

    if effort:
        console.print("[dim]✓ Test invocation succeeded[/dim]")

resolve_llm_kwargs_for_agent(models_cfg, agent_name)

Given the YAML models: dict, compute merged kwargs for init_chat_model(...) for a specific agent ('planner' or 'executor').

Merge order (later wins): 1) {} (empty) 2) models.defaults.params (optional) 3) models.profiles[defaults.profile] (optional) 4) models.agents[agent_name].profile (optional; merges that profile on top) 5) models.agents[agent_name].params (optional)

Source code in src/ursa/util/plan_execute_utils.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def resolve_llm_kwargs_for_agent(
    models_cfg: dict | None, agent_name: str | None
) -> dict:
    """
    Given the YAML `models:` dict, compute merged kwargs for init_chat_model(...)
    for a specific agent ('planner' or 'executor').

    Merge order (later wins):
      1) {} (empty)
      2) models.defaults.params (optional)
      3) models.profiles[defaults.profile] (optional)
      4) models.agents[agent_name].profile (optional; merges that profile on top)
      5) models.agents[agent_name].params (optional)
    """
    cfg = models_cfg or {}
    profiles = cfg.get("profiles") or {}
    defaults = cfg.get("defaults") or {}
    agents = cfg.get("agents") or {}

    # Layer 1+2: start from the global default params.
    merged = deep_merge_dicts({}, defaults.get("params") or {})

    # Layer 3: the globally-selected profile, if it exists.
    default_profile = defaults.get("profile")
    if default_profile and default_profile in profiles:
        merged = deep_merge_dicts(merged, profiles[default_profile])

    # Layers 4+5: agent-specific profile, then agent-specific params on top.
    if agent_name and isinstance(agents, dict) and agent_name in agents:
        agent_cfg = agents[agent_name]
        agent_profile = agent_cfg.get("profile")
        if agent_profile and agent_profile in profiles:
            merged = deep_merge_dicts(merged, profiles[agent_profile])
        merged = deep_merge_dicts(merged, agent_cfg.get("params") or {})

    return merged

resolve_model_choice(model_choice, models_cfg)

Accepts strings like 'openai:gpt-5.2' or 'my_endpoint:openai/gpt-oss-120b'. Looks up per-provider settings from cfg.models.providers.

Returns: (model_provider, pure_model, provider_extra_kwargs_for_init)

Source code in src/ursa/util/plan_execute_utils.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def resolve_model_choice(model_choice: str, models_cfg: dict):
    """
    Accepts strings like 'openai:gpt-5.2' or 'my_endpoint:openai/gpt-oss-120b'.
    Looks up per-provider settings from cfg.models.providers.

    Returns: (model_provider, pure_model, provider_extra_kwargs_for_init)
    """
    if ":" in model_choice:
        alias, pure_model = model_choice.split(":", 1)
    else:
        alias = pure_model = model_choice

    prov = ((models_cfg or {}).get("providers", {})).get(alias, {})

    # LangChain integration name; defaults to the alias itself.
    model_provider = prov.get("model_provider", alias)

    # Auth: prefer the configured environment variable.
    api_key = (
        os.getenv(prov["api_key_env"]) if prov.get("api_key_env") else None
    )
    if not api_key and prov.get("token_loader"):
        # Dynamic token loading (omitted for brevity; can import if needed)
        pass

    provider_extra = {}
    base_url = prov.get("base_url")
    if base_url:
        provider_extra["base_url"] = base_url
    if api_key:
        provider_extra["api_key"] = api_key

    return model_provider, pure_model, provider_extra

sanitize_for_logging(obj)

Recursively sanitize secrets from config objects for safe logging.

Source code in src/ursa/util/plan_execute_utils.py
183
184
185
186
187
188
189
190
191
192
193
194
195
def sanitize_for_logging(obj: Any) -> Any:
    """Recursively mask secret-looking values in dicts/lists for safe logging."""
    if isinstance(obj, dict):

        def _clean(key, value):
            # Secret-looking keys get masked; others recurse.
            if looks_like_secret_key(str(key)):
                return mask_secret(value) if isinstance(value, str) else "..."
            return sanitize_for_logging(value)

        return {k: _clean(k, v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [sanitize_for_logging(item) for item in obj]
    # Scalars (and any other type) pass through unchanged.
    return obj

save_json_file(path, payload, *, indent=2, ensure_parent=True)

Write JSON payload to disk with optional parent directory creation.

Source code in src/ursa/util/plan_execute_utils.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def save_json_file(
    path: str | Path,
    payload: Any,
    *,
    indent: int = 2,
    ensure_parent: bool = True,
) -> None:
    """Write JSON payload to disk with optional parent directory creation."""
    p = Path(path)
    if ensure_parent:
        p.parent.mkdir(parents=True, exist_ok=True)
    p.write_text(json.dumps(payload, indent=indent))

setup_llm(model_choice, models_cfg=None, agent_name=None)

Build a LangChain chat model via init_chat_model(...), optionally applying YAML-driven params from models.profiles, models.defaults, models.agents.

Source code in src/ursa/util/plan_execute_utils.py
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
def setup_llm(
    model_choice: str,
    models_cfg: dict | None = None,
    agent_name: str | None = None,
):
    """
    Build a LangChain chat model via init_chat_model(...), optionally applying
    YAML-driven params from models.profiles, models.defaults, models.agents.
    """
    cfg = models_cfg or {}

    provider, pure_model, provider_extra = resolve_model_choice(
        model_choice, cfg
    )

    # Merge order: hardcoded defaults (backward compat) < YAML-resolved kwargs.
    llm_kwargs = deep_merge_dicts(
        {"max_completion_tokens": 10000, "max_retries": 2},
        resolve_llm_kwargs_for_agent(cfg, agent_name),
    )

    model = init_chat_model(
        model=pure_model,
        model_provider=provider,
        **llm_kwargs,
        **(provider_extra or {}),
    )

    # Announce what was initialized (secrets are masked inside the banner).
    print_llm_init_banner(
        agent_name=agent_name,
        provider=provider,
        model_name=pure_model,
        provider_extra=provider_extra,
        llm_kwargs=llm_kwargs,
        model_obj=model,
    )

    return model

setup_workspace(user_specified_workspace, project='run', model_name='openai:gpt-5-mini')

Set up a workspace directory for a plan/execute run. Returns the workspace path as a string.

Source code in src/ursa/util/plan_execute_utils.py
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
def setup_workspace(
    user_specified_workspace: str | None,
    project: str = "run",
    model_name: str = "openai:gpt-5-mini",
) -> str:
    """
    Set up a workspace directory for a plan/execute run.
    Returns the workspace path as a string.
    """
    workspace = (
        user_specified_workspace
        if user_specified_workspace is not None
        else generate_workspace_name(project)
    )

    Path(workspace).mkdir(parents=True, exist_ok=True)

    # Pick an emoji for the model family (purely cosmetic).
    if model_name.startswith("openai"):
        model_emoji = "🤖"
    elif "llama" in model_name.lower():
        model_emoji = "🦙"
    else:
        model_emoji = "🧠"

    # Print the panel with model info
    console.print(
        Panel.fit(
            f":rocket:  [bold bright_blue]{workspace}[/bold bright_blue]  :rocket:\n"
            f"{model_emoji}  [bold cyan]{model_name}[/bold cyan]",
            title="[bold green]ACTIVE WORKSPACE[/bold green]",
            border_style="bright_magenta",
            padding=(1, 4),
        )
    )

    return workspace

snapshot_sqlite_db(src_path, dst_path)

Make a consistent copy of the SQLite database at src_path into dst_path, using the sqlite3 backup API. Safe with WAL; no need to copy -wal/-shm.

Source code in src/ursa/util/plan_execute_utils.py
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
def snapshot_sqlite_db(src_path: Path, dst_path: Path) -> None:
    """
    Make a consistent copy of the SQLite database at src_path into dst_path,
    using the sqlite3 backup API. Safe with WAL; no need to copy -wal/-shm.

    Raises:
        FileNotFoundError: If src_path does not exist.
    """
    from contextlib import closing  # local import keeps the block self-contained

    if not src_path.exists():
        raise FileNotFoundError(f"Source database not found: {src_path}")

    dst_path.parent.mkdir(parents=True, exist_ok=True)
    # Open the source read-only so the snapshot can never mutate it.
    src_uri = f"file:{Path(src_path).resolve().as_posix()}?mode=ro"
    # closing() guarantees both connections are closed even if backup fails,
    # replacing the original hand-rolled try/finally with nested swallows.
    with (
        closing(sqlite3.connect(src_uri, uri=True)) as src,
        closing(sqlite3.connect(str(dst_path))) as dst,
    ):
        with dst:  # commit the backup transaction on success
            src.backup(dst)

timed_input_with_countdown(prompt, timeout)

Read a line with a per-second countdown. Returns: - the user's input (str) if provided, - None if timeout expires, - None if non-interactive or timeout<=0.

Source code in src/ursa/util/plan_execute_utils.py
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
def timed_input_with_countdown(prompt: str, timeout: int) -> str | None:
    """
    Read a line with a per-second countdown. Returns:
      - the user's input (str) if provided,
      - None if timeout expires,
      - None if non-interactive or timeout<=0.
    """
    try:
        is_tty = sys.stdin.isatty()
    except Exception:
        is_tty = False

    if not is_tty:
        # Non-interactive: default immediately
        return None
    if timeout <= 0:
        # Timeout disabled: default immediately
        return None

    deadline = time.time() + timeout
    print(prompt, end="", flush=True)

    try:
        while True:
            remaining = int(deadline - time.time())
            if remaining <= 0:
                print()
                return None

            # Poll stdin with a 1-second timeout
            ready, _, _ = select.select([sys.stdin], [], [], 1.0)
            if ready:
                line = sys.stdin.readline()
                return line.rstrip("\n") if line else None

            # Update countdown display (clear to EOL to avoid ghost text)
            print(f"\r{prompt}({remaining}s) \x1b[K", end="", flush=True)

    except Exception:
        print()
        return None

plan_renderer

render_plan_steps_rich(plan_steps, highlight_index=None)

Pretty table for a list of plan steps (strings or dicts), with an optional highlighted row.

Source code in src/ursa/util/plan_renderer.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def render_plan_steps_rich(plan_steps, highlight_index: int | None = None):
    """Pretty table for a list of plan steps (strings or dicts), with an optional highlighted row.

    Args:
        plan_steps: Sequence of PlanStep objects, dicts, or plain strings.
        highlight_index: 0-based index of the current step; earlier rows are
            dimmed and the current row is highlighted with a pointer. None
            renders all rows normally.
    """

    console = get_console()

    if not plan_steps:
        return

    table = Table(
        title="Planned Steps",
        box=box.ROUNDED,
        show_lines=False,
        header_style="bold magenta",
        expand=True,
        row_styles=None,  # we'll control per-row styles manually
    )
    table.add_column("#", style="bold cyan", no_wrap=True)
    table.add_column("Name", style="bold", overflow="fold")
    table.add_column("Description", overflow="fold")
    table.add_column("Outputs", overflow="fold")
    table.add_column("Criteria", overflow="fold")
    table.add_column("Code?", justify="center", no_wrap=True)

    def bullets(items):
        """Render an iterable as a bulleted list.

        A bare string becomes a single bullet; previously a string fallback
        (e.g. "None") was iterated character-by-character, producing
        "• N / • o / • n / • e".
        """
        if not items:
            return ""
        if isinstance(items, str):
            items = [items]
        return "\n".join(f"• {x}" for x in items)

    def code_badge(needs_code: bool):
        return Text.from_markup(
            ":hammer_and_wrench: [bold green]Yes[/]"
            if needs_code
            else "[bold red]No[/]"
        )

    for i, step in enumerate(plan_steps, 1):
        # build cells
        if isinstance(step, PlanStep):
            name = step.name
            desc = step.description
            outs = bullets(step.expected_outputs)
            crit = bullets(step.success_criteria)
            needs_code = bool(step.requires_code)
        elif isinstance(step, dict):
            name = step.get("name", "No Name")
            desc = step.get("description", "No Description")
            outs = bullets(step.get("expected_outputs", "None"))
            crit = bullets(step.get("success_criteria", "None listed"))
            needs_code = bool(step.get("requires_code", False))
        else:
            # Fallback: treat an arbitrary object as a description-only step.
            name, desc, outs, crit, needs_code = (
                f"Step {i}",
                str(step),
                "",
                "",
                False,
            )

        # style logic
        row_style = None
        idx0 = i - 1
        step_label = str(i)

        if highlight_index is not None:
            if idx0 < highlight_index:
                row_style = "dim"  # completed steps fade out
            elif idx0 == highlight_index:
                row_style = "bold white on grey50"  # light gray
                step_label = f"▶ {i}"  # pointer on current row

        table.add_row(
            step_label,
            str(name),
            str(desc),
            outs,
            crit,
            code_badge(needs_code),
            style=row_style,
        )

    console.print(table)

traced

types

AsciiStr = Annotated[str, StringConstraints(strip_whitespace=True, strict=True, pattern='^[\\x20-\\x7E\\t\\n\\r\\f\\v]+$')] module-attribute

Limit strings to "text" ASCII characters (letters, digits, symbols, whitespace)