
util

Checkpointer

Source code in src/ursa/util/__init__.py
class Checkpointer:
    @classmethod
    def from_workspace(
        cls,
        workspace: Path,
        db_dir: str = "db",
        db_name: str = "checkpointer.db",
    ) -> SqliteSaver:
        (db_path := workspace / db_dir).mkdir(parents=True, exist_ok=True)
        conn = sqlite3.connect(str(db_path / db_name), check_same_thread=False)
        return SqliteSaver(conn)

    @classmethod
    def from_path(cls, db_path: Path) -> SqliteSaver:
        """Make checkpointer sqlite db.

        Args
        ====
        * db_path: The path to the SQLite database file (e.g. ./checkpoint.db) to be created.
        """

        db_path.parent.mkdir(parents=True, exist_ok=True)
        conn = sqlite3.connect(str(db_path), check_same_thread=False)
        return SqliteSaver(conn)
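
A minimal usage sketch, assuming the package is installed so the class is importable as ursa.util.Checkpointer; the returned SqliteSaver is the LangGraph checkpointer object, typically passed to a graph's compile(checkpointer=...) call.

from pathlib import Path

from ursa.util import Checkpointer

# One SQLite file per workspace, created under <workspace>/db/checkpointer.db
saver = Checkpointer.from_workspace(Path("./my_workspace"))

# Or point at an explicit database file instead
saver = Checkpointer.from_path(Path("./checkpoints/run1.db"))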

from_path(db_path) classmethod

Make checkpointer sqlite db.

Args

  • db_path: The path to the SQLite database file (e.g. ./checkpoint.db) to be created.
Source code in src/ursa/util/__init__.py
@classmethod
def from_path(cls, db_path: Path) -> SqliteSaver:
    """Make checkpointer sqlite db.

    Args
    ====
    * db_path: The path to the SQLite database file (e.g. ./checkpoint.db) to be created.
    """

    db_path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(db_path), check_same_thread=False)
    return SqliteSaver(conn)

diff_renderer

DiffRenderer

Renderable diff—console.print(DiffRenderer(...))

Source code in src/ursa/util/diff_renderer.py
class DiffRenderer:
    """Renderable diff—`console.print(DiffRenderer(...))`"""

    def __init__(self, content: str, updated: str, filename: str):
        # total lines in each version
        self._old_total = len(content.splitlines())
        self._new_total = len(updated.splitlines())

        # number of digits in the largest count
        self._num_width = len(str(max(self._old_total, self._new_total))) + 2

        # get the diff
        self._diff_lines = list(
            difflib.unified_diff(
                content.splitlines(),
                updated.splitlines(),
                fromfile=f"{filename} (original)",
                tofile=f"{filename} (modified)",
                lineterm="",
            )
        )

        # get syntax style
        try:
            self._lexer_name = Syntax.guess_lexer(filename, updated)
        except Exception:
            self._lexer_name = "text"

    def __rich_console__(
        self, console: Console, opts: ConsoleOptions
    ) -> RenderResult:
        old_line = new_line = None
        width = console.width

        for raw in self._diff_lines:
            # grab line numbers from hunk header
            if m := _HUNK_RE.match(raw):
                old_line, new_line = map(int, m.groups())
                # build a marker
                n = self._num_width
                tick_col = "." * (n - 1)
                indent_ticks = f" {tick_col} {tick_col}"
                # pad to the indent width
                full_indent = indent_ticks.ljust(2 * n + 3)
                yield Text(
                    f"{full_indent}{raw}".ljust(width), style="white on grey30"
                )
                continue

            # skip header lines
            if raw.startswith(("---", "+++")):
                continue

            # split the line
            if raw.startswith("+"):
                style = _STYLE["add"]
                code = raw[1:]
            elif raw.startswith("-"):
                style = _STYLE["del"]
                code = raw[1:]
            else:
                style = _STYLE["ctx"]
                code = raw[1:] if raw.startswith(" ") else raw

            # compute line numbers
            if raw.startswith("+"):
                old_num, new_num = None, new_line
                new_line += 1
            elif raw.startswith("-"):
                old_num, new_num = old_line, None
                old_line += 1
            else:
                old_num, new_num = old_line, new_line
                old_line += 1
                new_line += 1

            old_str = str(old_num) if old_num is not None else " "
            new_str = str(new_num) if new_num is not None else " "

            # Syntax-highlight the code part
            syntax = Syntax(
                code, self._lexer_name, line_numbers=False, word_wrap=False
            )
            text_code: Text = syntax.highlight(code)
            if text_code.plain.endswith("\n"):
                text_code = text_code[:-1]
            # apply background
            text_code.stylize(style.bg)

            # line numbers + code
            nums = Text(
                f"{old_str:>{self._num_width}}{new_str:>{self._num_width}} ",
                style=f"white {style.bg}",
            )
            diff_mark = Text(style.prefix, style=f"bright_white {style.bg}")
            line_text = nums + diff_mark + text_code

            # pad to console width
            pad_len = width - line_text.cell_len
            if pad_len > 0:
                line_text.append(" " * pad_len, style=style.bg)

            yield line_text
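
A short usage sketch, assuming the module is importable as ursa.util.diff_renderer; because DiffRenderer implements __rich_console__, printing it with a rich Console renders the unified diff with line numbers and syntax highlighting.

from rich.console import Console

from ursa.util.diff_renderer import DiffRenderer

original = "def greet():\n    print('hello')\n"
updated = "def greet(name):\n    print(f'hello {name}')\n"

# Rendering happens lazily inside console.print via __rich_console__
Console().print(DiffRenderer(original, updated, "greet.py"))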

helperFunctions

run_tool_calls(ai_msg, tools)

Parameters:

  • ai_msg (AIMessage, required): The LLM's AIMessage containing tool calls.
  • tools (ToolRegistry | Iterable[Runnable | Callable[..., Any]], required): Either a dict {name: tool} or an iterable of tools (each must have .name for mapping). Each tool can be a Runnable or a plain callable.

Returns:

  • out (list[BaseMessage]): Messages to feed back to the model.

Source code in src/ursa/util/helperFunctions.py
def run_tool_calls(
    ai_msg: AIMessage,
    tools: ToolRegistry | Iterable[Runnable | Callable[..., Any]],
) -> list[BaseMessage]:
    """
    Args:
        ai_msg: The LLM's AIMessage containing tool calls.
        tools: Either a dict {name: tool} or an iterable of tools (must have `.name`
               for mapping). Each tool can be a Runnable or a plain callable.

    Returns:
        out: list[BaseMessage] to feed back to the model
    """
    # Build a name->tool map
    if isinstance(tools, dict):
        registry: ToolRegistry = tools  # type: ignore
    else:
        registry = {}
        for t in tools:
            name = getattr(t, "name", None) or getattr(t, "__name__", None)
            if not name:
                raise ValueError(f"Tool {t!r} has no discoverable name.")
            registry[name] = t  # type: ignore

    calls = extract_tool_calls(ai_msg)

    if not calls:
        return []

    out: list[BaseMessage] = []
    for call in calls:
        name = call.get("name")
        args = call.get("args", {}) or {}
        call_id = call.get("id") or f"call_{uuid.uuid4().hex}"

        # 1) the AIMessage that generated the call
        out.append(ai_msg)

        # 2) the ToolMessage with the execution result (or error)
        if name not in registry:
            content = f"ERROR: unknown tool '{name}'."
        else:
            try:
                result = _invoke_tool(registry[name], args)
                content = _stringify_output(result)
            except Exception as e:
                content = f"ERROR: {type(e).__name__}: {e}"

        out.append(
            ToolMessage(content=content, tool_call_id=call_id, name=name)
        )

    return out
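
A hedged example of the round trip, assuming LangChain's @tool decorator for the tool and an AIMessage built by hand in place of a real model response; the exact ToolMessage content depends on how _stringify_output renders the result.

from langchain_core.messages import AIMessage
from langchain_core.tools import tool

from ursa.util.helperFunctions import run_tool_calls

@tool
def add(a: int, b: int) -> int:
    """Add two integers."""
    return a + b

# An AIMessage as a chat model would emit it, carrying one tool call.
ai_msg = AIMessage(
    content="",
    tool_calls=[{"name": "add", "args": {"a": 2, "b": 3}, "id": "call_1"}],
)

# Returns [ai_msg, ToolMessage(...)] pairs ready to feed back to the model.
messages = run_tool_calls(ai_msg, [add])
print(messages[-1].content)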

logo_generator

generate_logo_sync(*, problem_text, workspace, out_dir, filename=None, model='gpt-image-1', size=None, background='opaque', quality='high', n=1, overwrite=False, style='sticker', allow_text=False, palette=None, mode='logo', aspect='square', style_intensity='overt', console=None, image_model_provider='openai', image_provider_kwargs=None)

Generate an image. Default behavior matches previous versions (logo/sticker). To create a cinematic illustration, set mode='scene' and consider aspect='wide'.

Source code in src/ursa/util/logo_generator.py
def generate_logo_sync(
    *,
    problem_text: str,
    workspace: str,
    out_dir: str | Path,
    filename: str | None = None,
    model: str = "gpt-image-1",
    size: str | None = None,
    background: str = "opaque",
    quality: str = "high",
    n: int = 1,
    overwrite: bool = False,
    style: str = "sticker",
    allow_text: bool = False,
    palette: str | None = None,
    mode: str = "logo",
    aspect: str = "square",
    style_intensity: str = "overt",
    console: Optional[Console] = None,
    image_model_provider: str = "openai",
    image_provider_kwargs: Optional[dict] = None,
) -> Path:
    """
    Generate an image. Default behavior matches previous versions (logo/sticker).
    To create a cinematic illustration, set mode='scene' and consider aspect='wide'.
    """
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    prompt, style_slug = _craft_logo_prompt(
        problem_text,
        workspace,
        style=style,
        allow_text=allow_text,
        palette=palette,
        mode=mode,
        style_intensity=style_intensity,
    )

    # Pretty console output
    extra_title = f"[bold magenta]mode: {mode}[/bold magenta] [dim]•[/dim] aspect: {aspect}"
    _render_prompt_panel(
        console=console,
        style_slug=style_slug,
        workspace=workspace,
        prompt=prompt,
        extra_title=extra_title,
    )

    main_path, alt_paths = _compose_filenames(out_dir, style_slug, filename, n)
    if main_path.exists() and not overwrite:
        return main_path

    # this is how we'll pass through a vision model and provider/url/endpoint
    client_kwargs = {}
    if image_provider_kwargs:
        # Only pass through safe/known kwargs
        for k in ("api_key", "base_url", "organization"):
            if k in image_provider_kwargs and image_provider_kwargs[k]:
                client_kwargs[k] = image_provider_kwargs[k]
    client = OpenAI(**client_kwargs)

    final_size = _normalize_size(size, aspect, mode)
    # Scenes tend to look odd with transparent backgrounds; force opaque.
    final_background = "opaque" if mode == "scene" else background

    kwargs = dict(
        model=model,
        prompt=prompt,
        size=final_size,
        n=n,
        quality=quality,
        background=final_background,
    )
    try:
        resp = client.images.generate(**kwargs)
    except Exception:
        # Some models ignore/forbid background=; retry without it
        kwargs.pop("background", None)
        resp = client.images.generate(**kwargs)

    main_path.write_bytes(base64.b64decode(resp.data[0].b64_json))
    for i, item in enumerate(resp.data[1:], start=0):
        if i < len(alt_paths):
            alt_paths[i].write_bytes(base64.b64decode(item.b64_json))
    return main_path
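
A usage sketch, assuming an OpenAI API key is available in the environment (the default image_model_provider is "openai"); the problem text, workspace, and output directory below are only illustrative.

from pathlib import Path

from ursa.util.logo_generator import generate_logo_sync

image_path = generate_logo_sync(
    problem_text="Optimize the cooling loop of a small modular reactor",
    workspace="smr_cooling_demo",
    out_dir=Path("artifacts/images"),
    mode="scene",   # cinematic illustration rather than the default sticker/logo
    aspect="wide",
    overwrite=False,
)
print(f"Image written to {image_path}")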

memory_logger

AgentMemory

Simple wrapper around a persistent Chroma vector-store for agent-conversation memory.

Parameters

  • path (str | Path | None): Where to keep the on-disk Chroma DB. If None, a folder called agent_memory_db is created in the package's base directory.
  • collection_name (str): Name of the Chroma collection.
  • embedding_model: The embedding model used to embed text.

Notes
  • Requires langchain-chroma and chromadb.
Source code in src/ursa/util/memory_logger.py
class AgentMemory:
    """
    Simple wrapper around a persistent Chroma vector-store for agent-conversation memory.

    Parameters
    ----------
    path : str | Path | None
        Where to keep the on-disk Chroma DB.  If *None*, a folder called
        ``agent_memory_db`` is created in the package’s base directory.
    collection_name : str
        Name of the Chroma collection.
    embedding_model : <TODO> | None
        the embedding model

    Notes
    -----
    * Requires `langchain-chroma`, and `chromadb`.
    """

    @classmethod
    def get_db_path(cls, path: Optional[str | Path]) -> Path:
        match path:
            case None:
                return Path.home() / ".cache" / "ursa" / "rag" / "db"
            case str():
                return Path(path)
            case Path():
                return path
            case _:
                raise TypeError(
                    f"Type of path is `{type(path)}` "
                    "but `Optional[str | Path]` was expected."
                )

    def __init__(
        self,
        embedding_model,
        path: Optional[str | Path] = None,
        collection_name: str = "agent_memory",
    ) -> None:
        self.path = self.get_db_path(path)
        self.collection_name = collection_name
        self.path.mkdir(parents=True, exist_ok=True)
        self.embeddings = embedding_model

        # If a DB already exists, load it; otherwise defer creation until `build_index`.
        self.vectorstore: Optional[Chroma] = None
        if any(self.path.iterdir()):
            self.vectorstore = Chroma(
                collection_name=self.collection_name,
                embedding_function=self.embeddings,
                persist_directory=str(self.path),
            )

    # --------------------------------------------------------------------- #
    # ❶ Build & index a brand-new database                                   #
    # --------------------------------------------------------------------- #
    def build_index(
        self,
        chunks: Sequence[str],
        metadatas: Optional[Sequence[dict[str, Any]]] = None,
    ) -> None:
        """
        Create a fresh vector store from ``chunks``.  Existing data (if any)
        are overwritten.

        Parameters
        ----------
        chunks : Sequence[str]
            Text snippets (already chunked) to embed.
        metadatas : Sequence[dict] | None
            Optional metadata dict for each chunk, same length as ``chunks``.
        """
        docs = [
            Document(
                page_content=text, metadata=metadatas[i] if metadatas else {}
            )
            for i, text in enumerate(chunks)
        ]

        # Create (or overwrite) the collection
        self.vectorstore = Chroma.from_documents(
            documents=docs,
            embedding=self.embeddings,
            collection_name=self.collection_name,
            persist_directory=str(self.path),
        )

    # --------------------------------------------------------------------- #
    # ❷ Add new chunks and re-index                                          #
    # --------------------------------------------------------------------- #
    def add_memories(
        self,
        new_chunks: Sequence[str],
        metadatas: Optional[Sequence[dict[str, Any]]] = None,
    ) -> None:
        """
        Append new text chunks to the existing store (must call `build_index`
        first if the DB is empty).

        Raises
        ------
        RuntimeError
            If the vector store is not yet initialised.
        """
        if self.vectorstore is None:
            self.build_index(new_chunks, metadatas)
            print("----- Vector store initialised -----")

        docs = []
        for i, text in enumerate(new_chunks):
            if len(text) > 0:  # only add non-empty documents
                docs.append(
                    Document(
                        page_content=text,
                        metadata=metadatas[i] if metadatas else {},
                    )
                )
        self.vectorstore.add_documents(docs)

    # --------------------------------------------------------------------- #
    # ❸ Retrieve relevant chunks (RAG query)                                 #
    # --------------------------------------------------------------------- #
    def retrieve(
        self,
        query: str,
        k: int = 4,
        with_scores: bool = False,
        **search_kwargs,
    ):
        """
        Return the *k* most similar chunks for `query`.

        Parameters
        ----------
        query : str
            Natural-language question or statement.
        k : int
            How many results to return.
        with_scores : bool
            If True, also return similarity scores.
        **search_kwargs
            Extra kwargs forwarded to Chroma’s ``similarity_search*`` helpers.

        Returns
        -------
        list[Document] | list[tuple[Document, float]]
        """
        if self.vectorstore is None:
            return ["None"]

        if with_scores:
            return self.vectorstore.similarity_search_with_score(
                query, k=k, **search_kwargs
            )
        return self.vectorstore.similarity_search(query, k=k, **search_kwargs)
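
A minimal end-to-end sketch, assuming an embedding model such as OpenAIEmbeddings from langchain-openai (any LangChain-compatible embedding model should work) and write access to the chosen path.

from langchain_openai import OpenAIEmbeddings

from ursa.util.memory_logger import AgentMemory

memory = AgentMemory(OpenAIEmbeddings(), path="./agent_memory_db")

# Build a fresh index, then append more chunks later.
memory.build_index(
    ["The user prefers SI units.", "The previous run converged in 42 iterations."],
    metadatas=[{"topic": "preferences"}, {"topic": "results"}],
)
memory.add_memories(["The user asked for summaries in Markdown."])

# Retrieve the most relevant chunks for a query.
for doc in memory.retrieve("Which units does the user prefer?", k=2):
    print(doc.page_content)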

add_memories(new_chunks, metadatas=None)

Append new text chunks to the existing store (must call build_index first if the DB is empty).

Raises

  • RuntimeError: If the vector store is not yet initialised.

Source code in src/ursa/util/memory_logger.py
def add_memories(
    self,
    new_chunks: Sequence[str],
    metadatas: Optional[Sequence[dict[str, Any]]] = None,
) -> None:
    """
    Append new text chunks to the existing store (must call `build_index`
    first if the DB is empty).

    Raises
    ------
    RuntimeError
        If the vector store is not yet initialised.
    """
    if self.vectorstore is None:
        self.build_index(new_chunks, metadatas)
        print("----- Vector store initialised -----")

    docs = []
    for i, text in enumerate(new_chunks):
        if len(text) > 0:  # only add non-empty documents
            docs.append(
                Document(
                    page_content=text,
                    metadata=metadatas[i] if metadatas else {},
                )
            )
    self.vectorstore.add_documents(docs)

build_index(chunks, metadatas=None)

Create a fresh vector store from chunks. Existing data (if any) are overwritten.

Parameters

  • chunks (Sequence[str]): Text snippets (already chunked) to embed.
  • metadatas (Sequence[dict] | None): Optional metadata dict for each chunk, same length as chunks.

Source code in src/ursa/util/memory_logger.py
def build_index(
    self,
    chunks: Sequence[str],
    metadatas: Optional[Sequence[dict[str, Any]]] = None,
) -> None:
    """
    Create a fresh vector store from ``chunks``.  Existing data (if any)
    are overwritten.

    Parameters
    ----------
    chunks : Sequence[str]
        Text snippets (already chunked) to embed.
    metadatas : Sequence[dict] | None
        Optional metadata dict for each chunk, same length as ``chunks``.
    """
    docs = [
        Document(
            page_content=text, metadata=metadatas[i] if metadatas else {}
        )
        for i, text in enumerate(chunks)
    ]

    # Create (or overwrite) the collection
    self.vectorstore = Chroma.from_documents(
        documents=docs,
        embedding=self.embeddings,
        collection_name=self.collection_name,
        persist_directory=str(self.path),
    )

retrieve(query, k=4, with_scores=False, **search_kwargs)

Return the k most similar chunks for query.

Parameters

  • query (str): Natural-language question or statement.
  • k (int): How many results to return.
  • with_scores (bool): If True, also return similarity scores.
  • **search_kwargs: Extra kwargs forwarded to Chroma’s similarity_search* helpers.

Returns

list[Document] | list[tuple[Document, float]]

Source code in src/ursa/util/memory_logger.py
def retrieve(
    self,
    query: str,
    k: int = 4,
    with_scores: bool = False,
    **search_kwargs,
):
    """
    Return the *k* most similar chunks for `query`.

    Parameters
    ----------
    query : str
        Natural-language question or statement.
    k : int
        How many results to return.
    with_scores : bool
        If True, also return similarity scores.
    **search_kwargs
        Extra kwargs forwarded to Chroma’s ``similarity_search*`` helpers.

    Returns
    -------
    list[Document] | list[tuple[Document, float]]
    """
    if self.vectorstore is None:
        return ["None"]

    if with_scores:
        return self.vectorstore.similarity_search_with_score(
            query, k=k, **search_kwargs
        )
    return self.vectorstore.similarity_search(query, k=k, **search_kwargs)

delete_database(path=None)

Delete the on-disk Chroma vector-store used for agent-conversation memory.

Parameters

  • path (str | Path | None): Path of the on-disk Chroma DB to delete. If None, the default location resolved by AgentMemory.get_db_path is used.

Source code in src/ursa/util/memory_logger.py
def delete_database(path: Optional[str | Path] = None):
    """
    Delete the on-disk Chroma vector-store used for agent-conversation memory.

    Parameters
    ----------
    path : str | Path | None
        Path of the on-disk Chroma DB to delete.  If *None*, the default
        location resolved by ``AgentMemory.get_db_path`` is used.
    """
    db_path = AgentMemory.get_db_path(path)
    if os.path.exists(db_path):
        shutil.rmtree(db_path)
        print(f"Database: {db_path} has been deleted.")
    else:
        print("No database found to delete.")

parse

extract_json(text)

Extract a JSON object or array from text that might contain markdown or other content.

The function attempts three strategies:
  1. Extract JSON from a markdown code block labeled as JSON.
  2. Extract JSON from any markdown code block.
  3. Use bracket matching to extract a JSON substring starting with '{' or '['.

Returns:

  • list[dict]: A Python object parsed from the JSON string (dict or list).

Raises:

  • ValueError: If no valid JSON is found.

Source code in src/ursa/util/parse.py
def extract_json(text: str) -> list[dict]:
    """
    Extract a JSON object or array from text that might contain markdown or other content.

    The function attempts three strategies:
        1. Extract JSON from a markdown code block labeled as JSON.
        2. Extract JSON from any markdown code block.
        3. Use bracket matching to extract a JSON substring starting with '{' or '['.

    Returns:
        A Python object parsed from the JSON string (dict or list).

    Raises:
        ValueError: If no valid JSON is found.
    """
    # Approach 1: Look for a markdown code block specifically labeled as JSON.
    labeled_block = re.search(
        r"```json\s*([\[{].*?[\]}])\s*```", text, re.DOTALL
    )
    if labeled_block:
        json_str = labeled_block.group(1).strip()
        try:
            return json.loads(json_str)
        except json.JSONDecodeError:
            # Fall back to the next approach if parsing fails.
            pass

    # Approach 2: Look for any code block delimited by triple backticks.
    generic_block = re.search(r"```(.*?)```", text, re.DOTALL)
    if generic_block:
        json_str = generic_block.group(1).strip()
        if json_str.startswith("{") or json_str.startswith("["):
            try:
                return json.loads(json_str)
            except json.JSONDecodeError:
                pass

    # Approach 3: Attempt to extract JSON using bracket matching.
    # Find the first occurrence of either '{' or '['.
    first_obj = text.find("{")
    first_arr = text.find("[")
    if first_obj == -1 and first_arr == -1:
        raise ValueError("No JSON object or array found in the text.")

    # Determine which bracket comes first.
    if first_obj == -1:
        start = first_arr
        open_bracket = "["
        close_bracket = "]"
    elif first_arr == -1:
        start = first_obj
        open_bracket = "{"
        close_bracket = "}"
    else:
        if first_obj < first_arr:
            start = first_obj
            open_bracket = "{"
            close_bracket = "}"
        else:
            start = first_arr
            open_bracket = "["
            close_bracket = "]"

    # Bracket matching: find the matching closing bracket.
    depth = 0
    end = None
    for i in range(start, len(text)):
        if text[i] == open_bracket:
            depth += 1
        elif text[i] == close_bracket:
            depth -= 1
            if depth == 0:
                end = i
                break

    if end is None:
        raise ValueError(
            "Could not find matching closing bracket for JSON content."
        )

    json_str = text[start : end + 1]
    try:
        return json.loads(json_str)
    except json.JSONDecodeError as e:
        raise ValueError("Extracted content is not valid JSON.") from e
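
A small example of the first strategy, assuming a typical LLM reply that wraps its JSON in a labeled code block:

from ursa.util.parse import extract_json

llm_reply = """Here is the plan:
```json
[{"name": "collect data", "requires_code": false}]
```"""

steps = extract_json(llm_reply)
print(steps[0]["name"])  # -> "collect data"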

extract_main_text_only(html, *, max_chars=250000)

Returns plain text with navigation/ads/scripts removed. Prefers trafilatura -> jusText -> BS4 paragraphs.

Source code in src/ursa/util/parse.py
def extract_main_text_only(html: str, *, max_chars: int = 250_000) -> str:
    """
    Returns plain text with navigation/ads/scripts removed.
    Prefers trafilatura -> jusText -> BS4 paragraphs.
    """
    # 1) Trafilatura
    # You can tune config: with_metadata, include_comments, include_images, favor_recall, etc.
    cfg = trafilatura.settings.use_config()
    cfg.set("DEFAULT", "include_comments", "false")
    cfg.set("DEFAULT", "include_tables", "false")
    cfg.set("DEFAULT", "favor_recall", "false")  # be stricter; less noise
    try:
        # If you fetched HTML already, use extract() on string; otherwise, fetch_url(url)
        txt = trafilatura.extract(
            html,
            config=cfg,
            include_comments=False,
            include_tables=False,
            favor_recall=False,
        )
        if txt and txt.strip():
            txt = _normalize_ws(txt)
            txt = _dedupe_lines(txt)
            return txt[:max_chars]
    except Exception:
        pass

    # 2) jusText
    try:
        paragraphs = justext.justext(html, justext.get_stoplist("English"))
        body_paras = [p.text for p in paragraphs if not p.is_boilerplate]
        if body_paras:
            txt = _normalize_ws("\n\n".join(body_paras))
            txt = _dedupe_lines(txt)
            return txt[:max_chars]
    except Exception:
        pass

    # 4) last-resort: BS4 paragraphs/headings only
    from bs4 import BeautifulSoup

    soup = BeautifulSoup(html, "html.parser")
    for tag in soup([
        "script",
        "style",
        "noscript",
        "header",
        "footer",
        "nav",
        "form",
        "aside",
    ]):
        tag.decompose()
    chunks = []
    for el in soup.find_all(["h1", "h2", "h3", "p", "li", "figcaption"]):
        t = el.get_text(" ", strip=True)
        if t:
            chunks.append(t)
    txt = _normalize_ws("\n\n".join(chunks))
    txt = _dedupe_lines(txt)
    return txt[:max_chars]
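
A usage sketch, assuming the HTML has already been fetched (here with requests, which the surrounding module already depends on); the URL is illustrative.

import requests

from ursa.util.parse import extract_main_text_only

html = requests.get("https://example.com/article", timeout=30).text
body = extract_main_text_only(html, max_chars=10_000)
print(body[:500])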

read_text_file(path)

Reads in a file at a given path into a string

Parameters:

  • path (str, required): Filename, including path, to read in.
Source code in src/ursa/util/parse.py
def read_text_file(path: str) -> str:
    """
    Reads in a file at a given path into a string

    Args:
        path: string filename, with path, to read in
    """
    with open(path, "r", encoding="utf-8") as file:
        file_contents = file.read()
    return file_contents

resolve_pdf_from_osti_record(rec, *, headers=None, unpaywall_email=None, timeout=25)

Returns (pdf_url, landing_used, note):
  • pdf_url: direct downloadable PDF URL if found (or a strong candidate)
  • landing_used: landing page URL we parsed (if any)
  • note: brief trace of how we found it

Source code in src/ursa/util/parse.py
def resolve_pdf_from_osti_record(
    rec: dict[str, Any],
    *,
    headers: Optional[dict[str, str]] = None,
    unpaywall_email: Optional[str] = None,
    timeout: int = 25,
) -> tuple[Optional[str], Optional[str], str]:
    """
    Returns (pdf_url, landing_used, note)
      - pdf_url: direct downloadable PDF URL if found (or a strong candidate)
      - landing_used: landing page URL we parsed (if any)
      - note: brief trace of how we found it
    """
    headers = headers or {"User-Agent": "Mozilla/5.0"}
    note_parts: list[str] = []

    links = rec.get("links", []) or []
    # doi = rec.get("doi")

    # 1) Try 'fulltext' first (OSTI purl)
    fulltext = None
    for link in links:
        if link.get("rel") == "fulltext":
            fulltext = link.get("href")
            break

    if fulltext:
        note_parts.append("Tried links[fulltext] purl")
        try:
            # Follow redirects; stream to peek headers without loading whole body
            r = requests.get(
                fulltext,
                headers=headers,
                timeout=timeout,
                allow_redirects=True,
                stream=True,
            )
            r.raise_for_status()

            if _is_pdf_response(r):
                note_parts.append("fulltext resolved directly to PDF")
                return (r.url, None, " | ".join(note_parts))

            # Not a PDF: parse page HTML for meta or obvious PDF anchors
            # (If server sent binary but CT lied, _is_pdf_response would have caught via CD or ext)
            r.close()
            soup = _get_soup(fulltext, timeout=timeout, headers=headers)
            candidate = _find_pdf_on_landing(soup, fulltext)
            if candidate:
                note_parts.append(
                    "found PDF via meta/anchor on fulltext landing"
                )
                return (candidate, fulltext, " | ".join(note_parts))
        except Exception as e:
            note_parts.append(f"fulltext failed: {e}")

    # 2) Try DOE PAGES landing (citation_doe_pages)
    doe_pages = None
    for link in links:
        if link.get("rel") == "citation_doe_pages":
            doe_pages = link.get("href")
            break

    if doe_pages:
        note_parts.append("Tried links[citation_doe_pages] landing")
        try:
            soup = _get_soup(doe_pages, timeout=timeout, headers=headers)
            candidate = _find_pdf_on_landing(soup, doe_pages)
            if candidate:
                # Candidate may itself be a landing—check if it serves PDF
                try:
                    r2 = requests.get(
                        candidate,
                        headers=headers,
                        timeout=timeout,
                        allow_redirects=True,
                        stream=True,
                    )
                    r2.raise_for_status()
                    if _is_pdf_response(r2):
                        note_parts.append("citation_doe_pages → direct PDF")
                        return (r2.url, doe_pages, " | ".join(note_parts))
                    r2.close()
                except Exception:
                    pass
                # If not clearly PDF, still return as a candidate (agent will fetch & parse)
                note_parts.append(
                    "citation_doe_pages → PDF-like candidate (not confirmed by headers)"
                )
                return (candidate, doe_pages, " | ".join(note_parts))
        except Exception as e:
            note_parts.append(f"citation_doe_pages failed: {e}")

    # # 3) Optional: DOI → Unpaywall OA
    # if doi and unpaywall_email:
    #     note_parts.append("Tried Unpaywall via DOI")
    #     pdf_from_ua = _resolve_pdf_via_unpaywall(doi, unpaywall_email)
    #     if pdf_from_ua:
    #         # May be direct PDF or landing; the caller will validate headers during download
    #         note_parts.append("Unpaywall returned candidate")
    #         return (pdf_from_ua, None, " | ".join(note_parts))

    # 4) Give up
    note_parts.append("No PDF found")
    return (None, None, " | ".join(note_parts))
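
A hedged example with a hand-built record; real records come from the OSTI API, and the purl URL below is purely illustrative.

from ursa.util.parse import resolve_pdf_from_osti_record

record = {
    "links": [
        {"rel": "fulltext", "href": "https://www.osti.gov/biblio/purl/0000000"},
    ],
}

pdf_url, landing_used, note = resolve_pdf_from_osti_record(record, timeout=30)
print(pdf_url or "no PDF found", "|", note)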

plan_renderer

render_plan_steps_rich(plan_steps, highlight_index=None)

Pretty table for a list of plan steps (strings or dicts), with an optional highlighted row.

Source code in src/ursa/util/plan_renderer.py
def render_plan_steps_rich(plan_steps, highlight_index: int | None = None):
    """Pretty table for a list of plan steps (strings or dicts), with an optional highlighted row."""

    console = get_console()

    if not plan_steps:
        return

    table = Table(
        title="Planned Steps",
        box=box.ROUNDED,
        show_lines=False,
        header_style="bold magenta",
        expand=True,
        row_styles=None,  # we'll control per-row styles manually
    )
    table.add_column("#", style="bold cyan", no_wrap=True)
    table.add_column("Name", style="bold", overflow="fold")
    table.add_column("Description", overflow="fold")
    table.add_column("Outputs", overflow="fold")
    table.add_column("Criteria", overflow="fold")
    table.add_column("Code?", justify="center", no_wrap=True)

    def bullets(items):
        if not items:
            return ""
        return "\n".join(f"• {x}" for x in items)

    def code_badge(needs_code: bool):
        return Text.from_markup(
            ":hammer_and_wrench: [bold green]Yes[/]"
            if needs_code
            else "[bold red]No[/]"
        )

    for i, step in enumerate(plan_steps, 1):
        # build cells
        if isinstance(step, dict):
            name = (
                step.get("name")
                or step.get("title")
                or step.get("id")
                or f"Step {i}"
            )
            desc = step.get("description") or ""
            outs = bullets(
                step.get("expected_outputs") or step.get("artifacts")
            )
            crit = bullets(step.get("success_criteria"))
            needs_code = bool(step.get("requires_code"))
        else:
            name, desc, outs, crit, needs_code = (
                f"Step {i}",
                str(step),
                "",
                "",
                False,
            )

        # style logic
        row_style = None
        idx0 = i - 1
        step_label = str(i)

        if highlight_index is not None:
            if idx0 < highlight_index:
                row_style = "dim"
            elif idx0 == highlight_index:
                row_style = "bold white on grey50"  # light gray
                # row_style = "bold black on bright_green"
                step_label = f"▶ {i}"  # pointer on current row

        table.add_row(
            step_label,
            str(name),
            str(desc),
            outs,
            crit,
            code_badge(needs_code),
            style=row_style,
        )

    console.print(table)
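
A short example mixing dict and string steps; highlight_index is zero-based, so 1 marks the second step as current and dims the ones before it.

from ursa.util.plan_renderer import render_plan_steps_rich

plan = [
    {
        "name": "Gather data",
        "description": "Download the raw measurements.",
        "expected_outputs": ["data/raw.csv"],
        "success_criteria": ["File exists and is non-empty"],
        "requires_code": True,
    },
    "Write a short summary of the findings",
]

render_plan_steps_rich(plan, highlight_index=1)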