
util

diff_renderer

DiffRenderer

Renderable diff—console.print(DiffRenderer(...))

Source code in src/ursa/util/diff_renderer.py
class DiffRenderer:
    """Renderable diff—`console.print(DiffRenderer(...))`"""

    def __init__(self, content: str, updated: str, filename: str):
        # total lines in each version
        self._old_total = len(content.splitlines())
        self._new_total = len(updated.splitlines())

        # number of digits in the largest count
        self._num_width = len(str(max(self._old_total, self._new_total))) + 2

        # get the diff
        self._diff_lines = list(
            difflib.unified_diff(
                content.splitlines(),
                updated.splitlines(),
                fromfile=f"{filename} (original)",
                tofile=f"{filename} (modified)",
                lineterm="",
            )
        )

        # get syntax style
        try:
            self._lexer_name = Syntax.guess_lexer(filename, updated)
        except Exception:
            self._lexer_name = "text"

    def __rich_console__(
        self, console: Console, opts: ConsoleOptions
    ) -> RenderResult:
        old_line = new_line = None
        width = console.width

        for raw in self._diff_lines:
            # grab line numbers from hunk header
            if m := _HUNK_RE.match(raw):
                old_line, new_line = map(int, m.groups())
                # build a marker
                n = self._num_width
                tick_col = "." * (n - 1)
                indent_ticks = f" {tick_col} {tick_col}"
                # pad to the indent width
                full_indent = indent_ticks.ljust(2 * n + 3)
                yield Text(
                    f"{full_indent}{raw}".ljust(width), style="white on grey30"
                )
                continue

            # skip header lines
            if raw.startswith(("---", "+++")):
                continue

            # split the line
            if raw.startswith("+"):
                style = _STYLE["add"]
                code = raw[1:]
            elif raw.startswith("-"):
                style = _STYLE["del"]
                code = raw[1:]
            else:
                style = _STYLE["ctx"]
                code = raw[1:] if raw.startswith(" ") else raw

            # compute line numbers
            if raw.startswith("+"):
                old_num, new_num = None, new_line
                new_line += 1
            elif raw.startswith("-"):
                old_num, new_num = old_line, None
                old_line += 1
            else:
                old_num, new_num = old_line, new_line
                old_line += 1
                new_line += 1

            old_str = str(old_num) if old_num is not None else " "
            new_str = str(new_num) if new_num is not None else " "

            # Syntax-highlight the code part
            syntax = Syntax(
                code, self._lexer_name, line_numbers=False, word_wrap=False
            )
            text_code: Text = syntax.highlight(code)
            if text_code.plain.endswith("\n"):
                text_code = text_code[:-1]
            # apply background
            text_code.stylize(style.bg)

            # line numbers + code
            nums = Text(
                f"{old_str:>{self._num_width}}{new_str:>{self._num_width}} ",
                style=f"white {style.bg}",
            )
            diff_mark = Text(style.prefix, style=f"bright_white {style.bg}")
            line_text = nums + diff_mark + text_code

            # pad to console width
            pad_len = width - line_text.cell_len
            if pad_len > 0:
                line_text.append(" " * pad_len, style=style.bg)

            yield line_text
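
A minimal usage sketch (the import path follows the source location above; the file name and snippets are purely illustrative):

from rich.console import Console

from ursa.util.diff_renderer import DiffRenderer

original = "def greet():\n    print('hi')\n"
updated = "def greet(name):\n    print(f'hi {name}')\n"

# Rich calls __rich_console__ when the object is printed.
Console().print(DiffRenderer(original, updated, "example.py"))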

helperFunctions

run_tool_calls(ai_msg, tools)

Parameters:

ai_msg : AIMessage
    The LLM's AIMessage containing tool calls. Required.
tools : Union[ToolRegistry, Iterable[Union[Runnable, Callable[..., Any]]]]
    Either a dict {name: tool} or an iterable of tools (each must expose a
    name via .name or __name__ for mapping). Each tool can be a Runnable or
    a plain callable. Required.

Returns:

out : List[BaseMessage]
    A list of messages to feed back to the model.

Source code in src/ursa/util/helperFunctions.py
def run_tool_calls(
    ai_msg: AIMessage,
    tools: Union[ToolRegistry, Iterable[Union[Runnable, Callable[..., Any]]]],
) -> List[BaseMessage]:
    """
    Args:
        ai_msg: The LLM's AIMessage containing tool calls.
        tools: Either a dict {name: tool} or an iterable of tools (must have `.name`
               for mapping). Each tool can be a Runnable or a plain callable.

    Returns:
        out: list[BaseMessage] to feed back to the model
    """
    # Build a name->tool map
    if isinstance(tools, dict):
        registry: ToolRegistry = tools  # type: ignore
    else:
        registry = {}
        for t in tools:
            name = getattr(t, "name", None) or getattr(t, "__name__", None)
            if not name:
                raise ValueError(f"Tool {t!r} has no discoverable name.")
            registry[name] = t  # type: ignore

    calls = extract_tool_calls(ai_msg)

    if not calls:
        return []

    out: List[BaseMessage] = []
    for call in calls:
        name = call.get("name")
        args = call.get("args", {}) or {}
        call_id = call.get("id") or f"call_{uuid.uuid4().hex}"

        # 1) the AIMessage that generated the call
        out.append(ai_msg)

        # 2) the ToolMessage with the execution result (or error)
        if name not in registry:
            content = f"ERROR: unknown tool '{name}'."
        else:
            try:
                result = _invoke_tool(registry[name], args)
                content = _stringify_output(result)
            except Exception as e:
                content = f"ERROR: {type(e).__name__}: {e}"

        out.append(
            ToolMessage(content=content, tool_call_id=call_id, name=name)
        )

    return out
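
A minimal usage sketch, assuming the import path from the source location above; the add tool and the hand-built AIMessage are illustrative only:

from langchain_core.messages import AIMessage

from ursa.util.helperFunctions import run_tool_calls

def add(a: int, b: int) -> int:
    """Add two integers."""
    return a + b

# An AIMessage whose tool_calls field requests the `add` tool.
ai_msg = AIMessage(
    content="",
    tool_calls=[{"name": "add", "args": {"a": 2, "b": 3}, "id": "call_1"}],
)

# messages holds the originating AIMessage followed by a ToolMessage
# carrying the stringified result (or an error string on failure).
messages = run_tool_calls(ai_msg, [add])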

memory_logger

AgentMemory

Simple wrapper around a persistent Chroma vector-store for agent-conversation memory.

Parameters

path : str | Path | None
    Where to keep the on-disk Chroma DB. If None, a folder called
    agent_memory_db is created in the package’s base directory.
collection_name : str
    Name of the Chroma collection.
embedding_model
    The embedding model.

Notes

Requires langchain-chroma and chromadb.
Source code in src/ursa/util/memory_logger.py
class AgentMemory:
    """
    Simple wrapper around a persistent Chroma vector-store for agent-conversation memory.

    Parameters
    ----------
    path : str | Path | None
        Where to keep the on-disk Chroma DB.  If *None*, a folder called
        ``agent_memory_db`` is created in the package’s base directory.
    collection_name : str
        Name of the Chroma collection.
    embedding_model : <TODO> | None
        the embedding model

    Notes
    -----
    * Requires `langchain-chroma`, and `chromadb`.
    """

    @classmethod
    def get_db_path(cls, path: Optional[str | Path]) -> Path:
        match path:
            case None:
                return Path.home() / ".cache" / "ursa" / "rag" / "db"
            case str():
                return Path(path)
            case Path():
                return path
            case _:
                raise TypeError(
                    f"Type of path is `{type(path)}` "
                    "but `Optional[str | Path]` was expected."
                )

    def __init__(
        self,
        embedding_model,
        path: Optional[str | Path] = None,
        collection_name: str = "agent_memory",
    ) -> None:
        self.path = self.get_db_path(path)
        self.collection_name = collection_name
        self.path.mkdir(parents=True, exist_ok=True)
        self.embeddings = embedding_model

        # If a DB already exists, load it; otherwise defer creation until `build_index`.
        self.vectorstore: Optional[Chroma] = None
        if any(self.path.iterdir()):
            self.vectorstore = Chroma(
                collection_name=self.collection_name,
                embedding_function=self.embeddings,
                persist_directory=str(self.path),
            )

    # --------------------------------------------------------------------- #
    # ❶ Build & index a brand-new database                                   #
    # --------------------------------------------------------------------- #
    def build_index(
        self,
        chunks: Sequence[str],
        metadatas: Optional[Sequence[Dict[str, Any]]] = None,
    ) -> None:
        """
        Create a fresh vector store from ``chunks``.  Existing data (if any)
        are overwritten.

        Parameters
        ----------
        chunks : Sequence[str]
            Text snippets (already chunked) to embed.
        metadatas : Sequence[dict] | None
            Optional metadata dict for each chunk, same length as ``chunks``.
        """
        docs = [
            Document(
                page_content=text, metadata=metadatas[i] if metadatas else {}
            )
            for i, text in enumerate(chunks)
        ]

        # Create (or overwrite) the collection
        self.vectorstore = Chroma.from_documents(
            documents=docs,
            embedding=self.embeddings,
            collection_name=self.collection_name,
            persist_directory=str(self.path),
        )

    # --------------------------------------------------------------------- #
    # ❷ Add new chunks and re-index                                          #
    # --------------------------------------------------------------------- #
    def add_memories(
        self,
        new_chunks: Sequence[str],
        metadatas: Optional[Sequence[Dict[str, Any]]] = None,
    ) -> None:
        """
        Append new text chunks to the existing store (must call `build_index`
        first if the DB is empty).

        Raises
        ------
        RuntimeError
            If the vector store is not yet initialised.
        """
        if self.vectorstore is None:
            self.build_index(new_chunks, metadatas)
            print("----- Vector store initialised -----")

        docs = []
        for i, text in enumerate(new_chunks):
            if len(text) > 0:  # only add non-empty documents
                docs.append(
                    Document(
                        page_content=text,
                        metadata=metadatas[i] if metadatas else {},
                    )
                )
        self.vectorstore.add_documents(docs)

    # --------------------------------------------------------------------- #
    # ❸ Retrieve relevant chunks (RAG query)                                 #
    # --------------------------------------------------------------------- #
    def retrieve(
        self,
        query: str,
        k: int = 4,
        with_scores: bool = False,
        **search_kwargs,
    ):
        """
        Return the *k* most similar chunks for `query`.

        Parameters
        ----------
        query : str
            Natural-language question or statement.
        k : int
            How many results to return.
        with_scores : bool
            If True, also return similarity scores.
        **search_kwargs
            Extra kwargs forwarded to Chroma’s ``similarity_search*`` helpers.

        Returns
        -------
        list[Document] | list[tuple[Document, float]]
        """
        if self.vectorstore is None:
            return ["None"]

        if with_scores:
            return self.vectorstore.similarity_search_with_score(
                query, k=k, **search_kwargs
            )
        return self.vectorstore.similarity_search(query, k=k, **search_kwargs)
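
A minimal usage sketch, assuming the import path from the source location above and a LangChain embedding model (OpenAIEmbeddings is used here purely as an example backend):

from langchain_openai import OpenAIEmbeddings

from ursa.util.memory_logger import AgentMemory

memory = AgentMemory(embedding_model=OpenAIEmbeddings(), path="./agent_memory_db")

# First run: create the collection from an initial set of chunks.
memory.build_index(
    ["The user prefers metric units.", "The project targets Python 3.11."],
    metadatas=[{"topic": "preferences"}, {"topic": "environment"}],
)

# Later runs: append more chunks, then query.
memory.add_memories(["The user is analysing laser diagnostics data."])
docs = memory.retrieve("Which units does the user prefer?", k=2)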

add_memories(new_chunks, metadatas=None)

Append new text chunks to the existing store (must call build_index first if the DB is empty).

Raises

RuntimeError
    If the vector store is not yet initialised.

Source code in src/ursa/util/memory_logger.py
def add_memories(
    self,
    new_chunks: Sequence[str],
    metadatas: Optional[Sequence[Dict[str, Any]]] = None,
) -> None:
    """
    Append new text chunks to the existing store (must call `build_index`
    first if the DB is empty).

    Raises
    ------
    RuntimeError
        If the vector store is not yet initialised.
    """
    if self.vectorstore is None:
        self.build_index(new_chunks, metadatas)
        print("----- Vector store initialised -----")

    docs = []
    for i, text in enumerate(new_chunks):
        if len(text) > 0:  # only add non-empty documents
            docs.append(
                Document(
                    page_content=text,
                    metadata=metadatas[i] if metadatas else {},
                )
            )
    self.vectorstore.add_documents(docs)
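
A short sketch, continuing from the memory instance created in the class-level example above; empty strings are skipped before the documents are added:

memory.add_memories(
    ["The solver converged after 12 iterations.", ""],
    metadatas=[{"source": "run_log"}, {}],
)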

build_index(chunks, metadatas=None)

Create a fresh vector store from chunks. Existing data (if any) are overwritten.

Parameters

chunks : Sequence[str]
    Text snippets (already chunked) to embed.
metadatas : Sequence[dict] | None
    Optional metadata dict for each chunk, same length as chunks.

Source code in src/ursa/util/memory_logger.py
def build_index(
    self,
    chunks: Sequence[str],
    metadatas: Optional[Sequence[Dict[str, Any]]] = None,
) -> None:
    """
    Create a fresh vector store from ``chunks``.  Existing data (if any)
    are overwritten.

    Parameters
    ----------
    chunks : Sequence[str]
        Text snippets (already chunked) to embed.
    metadatas : Sequence[dict] | None
        Optional metadata dict for each chunk, same length as ``chunks``.
    """
    docs = [
        Document(
            page_content=text, metadata=metadatas[i] if metadatas else {}
        )
        for i, text in enumerate(chunks)
    ]

    # Create (or overwrite) the collection
    self.vectorstore = Chroma.from_documents(
        documents=docs,
        embedding=self.embeddings,
        collection_name=self.collection_name,
        persist_directory=str(self.path),
    )
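
A short sketch, again using the memory instance from the class-level example; calling build_index again replaces the contents of the collection at this path:

memory.build_index(
    ["Chunk one of the conversation.", "Chunk two of the conversation."],
    metadatas=[{"turn": 1}, {"turn": 2}],
)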

retrieve(query, k=4, with_scores=False, **search_kwargs)

Return the k most similar chunks for query.

Parameters

query : str
    Natural-language question or statement.
k : int
    How many results to return.
with_scores : bool
    If True, also return similarity scores.
**search_kwargs
    Extra kwargs forwarded to Chroma’s similarity_search* helpers.

Returns

list[Document] | list[tuple[Document, float]]

Source code in src/ursa/util/memory_logger.py
def retrieve(
    self,
    query: str,
    k: int = 4,
    with_scores: bool = False,
    **search_kwargs,
):
    """
    Return the *k* most similar chunks for `query`.

    Parameters
    ----------
    query : str
        Natural-language question or statement.
    k : int
        How many results to return.
    with_scores : bool
        If True, also return similarity scores.
    **search_kwargs
        Extra kwargs forwarded to Chroma’s ``similarity_search*`` helpers.

    Returns
    -------
    list[Document] | list[tuple[Document, float]]
    """
    if self.vectorstore is None:
        return ["None"]

    if with_scores:
        return self.vectorstore.similarity_search_with_score(
            query, k=k, **search_kwargs
        )
    return self.vectorstore.similarity_search(query, k=k, **search_kwargs)
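
A short sketch with the same memory instance; with_scores=True routes the call to Chroma's similarity_search_with_score, so each result is a (Document, score) pair:

results = memory.retrieve("laser diagnostics", k=3, with_scores=True)
for doc, score in results:
    print(score, doc.page_content)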

delete_database(path=None)

Delete the on-disk Chroma vector-store used for agent-conversation memory.

Parameters

path : str | Path | None
    Path of the on-disk Chroma DB to delete. If None, the default location
    resolved by AgentMemory.get_db_path is used.

Source code in src/ursa/util/memory_logger.py
def delete_database(path: Optional[str | Path] = None):
    """
    Delete the on-disk Chroma vector-store used for agent-conversation memory.

    Parameters
    ----------
    path : str | Path | None
        Path of the on-disk Chroma DB to delete.  If *None*, the default
        location resolved by ``AgentMemory.get_db_path`` is used.
    """
    db_path = AgentMemory.get_db_path(path)
    if os.path.exists(db_path):
        shutil.rmtree(db_path)
        print(f"Database: {db_path} has been deleted.")
    else:
        print("No database found to delete.")

parse

extract_json(text)

Extract a JSON object or array from text that might contain markdown or other content.

The function attempts three strategies:
  1. Extract JSON from a markdown code block labeled as JSON.
  2. Extract JSON from any markdown code block.
  3. Use bracket matching to extract a JSON substring starting with '{' or '['.

Returns:

list[dict]
    A Python object parsed from the JSON string (dict or list).

Raises:

ValueError
    If no valid JSON is found.

Source code in src/ursa/util/parse.py
def extract_json(text: str) -> list[dict]:
    """
    Extract a JSON object or array from text that might contain markdown or other content.

    The function attempts three strategies:
        1. Extract JSON from a markdown code block labeled as JSON.
        2. Extract JSON from any markdown code block.
        3. Use bracket matching to extract a JSON substring starting with '{' or '['.

    Returns:
        A Python object parsed from the JSON string (dict or list).

    Raises:
        ValueError: If no valid JSON is found.
    """
    # Approach 1: Look for a markdown code block specifically labeled as JSON.
    labeled_block = re.search(
        r"```json\s*([\[{].*?[\]}])\s*```", text, re.DOTALL
    )
    if labeled_block:
        json_str = labeled_block.group(1).strip()
        try:
            return json.loads(json_str)
        except json.JSONDecodeError:
            # Fall back to the next approach if parsing fails.
            pass

    # Approach 2: Look for any code block delimited by triple backticks.
    generic_block = re.search(r"```(.*?)```", text, re.DOTALL)
    if generic_block:
        json_str = generic_block.group(1).strip()
        if json_str.startswith("{") or json_str.startswith("["):
            try:
                return json.loads(json_str)
            except json.JSONDecodeError:
                pass

    # Approach 3: Attempt to extract JSON using bracket matching.
    # Find the first occurrence of either '{' or '['.
    first_obj = text.find("{")
    first_arr = text.find("[")
    if first_obj == -1 and first_arr == -1:
        raise ValueError("No JSON object or array found in the text.")

    # Determine which bracket comes first.
    if first_obj == -1:
        start = first_arr
        open_bracket = "["
        close_bracket = "]"
    elif first_arr == -1:
        start = first_obj
        open_bracket = "{"
        close_bracket = "}"
    else:
        if first_obj < first_arr:
            start = first_obj
            open_bracket = "{"
            close_bracket = "}"
        else:
            start = first_arr
            open_bracket = "["
            close_bracket = "]"

    # Bracket matching: find the matching closing bracket.
    depth = 0
    end = None
    for i in range(start, len(text)):
        if text[i] == open_bracket:
            depth += 1
        elif text[i] == close_bracket:
            depth -= 1
            if depth == 0:
                end = i
                break

    if end is None:
        raise ValueError(
            "Could not find matching closing bracket for JSON content."
        )

    json_str = text[start : end + 1]
    try:
        return json.loads(json_str)
    except json.JSONDecodeError as e:
        raise ValueError("Extracted content is not valid JSON.") from e
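
A short usage sketch, assuming the import path from the source location above; the model reply is illustrative:

from ursa.util.parse import extract_json

reply = """Here is the plan:

```json
[{"step": 1, "action": "search"}, {"step": 2, "action": "summarize"}]
```
"""

plan = extract_json(reply)
# plan == [{"step": 1, "action": "search"}, {"step": 2, "action": "summarize"}]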