
agents

base

BaseAgent

Bases: ABC

Source code in src/ursa/agents/base.py
class BaseAgent(ABC):
    # llm: BaseChatModel
    # llm_with_tools: Runnable[LanguageModelInput, BaseMessage]

    def __init__(
        self,
        llm: str | BaseChatModel,
        checkpointer: Optional[BaseCheckpointSaver] = None,
        enable_metrics: bool = False,  # metrics collection is disabled by default
        metrics_dir: str = ".ursa_metrics",  # directory where metrics are saved
        autosave_metrics: bool = True,
        thread_id: Optional[str] = None,
        **kwargs,
    ):
        match llm:
            case BaseChatModel():
                self.llm = llm

            case str():
                self.llm_provider, self.llm_model = llm.split("/")
                self.llm = ChatLiteLLM(
                    model=llm,
                    max_tokens=kwargs.pop("max_tokens", 10000),
                    max_retries=kwargs.pop("max_retries", 2),
                    **kwargs,
                )

            case _:
                raise TypeError(
                    "llm argument must be a string with the provider and model, or a BaseChatModel instance."
                )

        self.thread_id = thread_id or uuid4().hex
        self.checkpointer = checkpointer
        self.telemetry = Telemetry(
            enable=enable_metrics,
            output_dir=metrics_dir,
            save_json_default=autosave_metrics,
        )

    @property
    def name(self) -> str:
        """Agent name."""
        return self.__class__.__name__

    def add_node(
        self,
        graph: StateGraph,
        f: Callable[..., Mapping[str, Any]],
        node_name: Optional[str] = None,
        agent_name: Optional[str] = None,
    ) -> StateGraph:
        """Add node to graph.

        This is used to track token usage and is simply the following.

        ```python
        _node_name = node_name or f.__name__
        _agent_name = agent_name or _to_snake(self.name)
        return graph.add_node(
            _node_name, self._wrap_node(f, _node_name, _agent_name)
        )
        ```
        """
        _node_name = node_name or f.__name__
        _agent_name = agent_name or _to_snake(self.name)
        wrapped_node = self._wrap_node(f, _node_name, _agent_name)
        return graph.add_node(_node_name, wrapped_node)

    def write_state(self, filename, state):
        json_state = dumps(state, ensure_ascii=False)
        with open(filename, "w") as f:
            f.write(json_state)

    # BaseAgent
    def build_config(self, **overrides) -> dict:
        """
        Build a config dict that includes telemetry callbacks and the thread_id.
        You can pass overrides like recursion_limit=..., configurable={...}, etc.
        """
        base = {
            "configurable": {"thread_id": self.thread_id},
            "metadata": {
                "thread_id": self.thread_id,
                "telemetry_run_id": self.telemetry.context.get("run_id"),
            },
            # "configurable": {
            #     "thread_id": getattr(self, "thread_id", "default")
            # },
            # "metadata": {
            #     "thread_id": getattr(self, "thread_id", "default"),
            #     "telemetry_run_id": self.telemetry.context.get("run_id"),
            # },
            "tags": [self.name],
            "callbacks": self.telemetry.callbacks,
        }
        # include model name when we can
        model_name = getattr(self, "llm_model", None) or getattr(
            getattr(self, "llm", None), "model", None
        )
        if model_name:
            base["metadata"]["model"] = model_name

        if "configurable" in overrides and isinstance(
            overrides["configurable"], dict
        ):
            base["configurable"].update(overrides.pop("configurable"))
        if "metadata" in overrides and isinstance(overrides["metadata"], dict):
            base["metadata"].update(overrides.pop("metadata"))
        # merge tags if caller provides them
        if "tags" in overrides and isinstance(overrides["tags"], list):
            base["tags"] = base["tags"] + [
                t for t in overrides.pop("tags") if t not in base["tags"]
            ]
        base.update(overrides)
        return base

    # agents will invoke like this:
    # planning_output = planner.invoke(
    #     {"messages": [HumanMessage(content=problem)]},
    #     config={
    #         "recursion_limit": 999_999,
    #         "configurable": {"thread_id": planner.thread_id},
    #     },
    # )
    # they can also, separately, override these defaults about metrics
    # keys that are NOT inputs; they should not be folded into the inputs mapping
    _TELEMETRY_KW = {
        "raw_debug",
        "save_json",
        "metrics_path",
        "save_raw_snapshot",
        "save_raw_records",
    }
    _CONTROL_KW = {"config", "recursion_limit", "tags", "metadata", "callbacks"}

    @final
    def invoke(
        self,
        inputs: Optional[InputLike] = None,  # sentinel
        /,
        *,
        raw_debug: bool = False,
        save_json: Optional[bool] = None,
        metrics_path: Optional[str] = None,
        save_raw_snapshot: Optional[bool] = None,
        save_raw_records: Optional[bool] = None,
        config: Optional[dict] = None,
        **kwargs: Any,  # may contain inputs (keyword-inputs) and/or control kw
    ) -> Any:
        depth = _INVOKE_DEPTH.get()
        _INVOKE_DEPTH.set(depth + 1)
        try:
            if depth == 0:
                self.telemetry.begin_run(
                    agent=self.name, thread_id=self.thread_id
                )

            # If no positional inputs were provided, split kwargs into inputs vs control
            if inputs is None:
                kw_inputs: dict[str, Any] = {}
                control_kwargs: dict[str, Any] = {}
                for k, v in kwargs.items():
                    if k in self._TELEMETRY_KW or k in self._CONTROL_KW:
                        control_kwargs[k] = v
                    else:
                        kw_inputs[k] = v
                inputs = kw_inputs
                kwargs = control_kwargs  # only control kwargs remain

            # If both positional inputs and extra unknown kwargs-as-inputs are given, forbid merging
            else:
                # keep only control kwargs; anything else would be ambiguous
                for k in kwargs.keys():
                    if not (k in self._TELEMETRY_KW or k in self._CONTROL_KW):
                        raise TypeError(
                            f"Unexpected keyword argument '{k}'. "
                            "Pass inputs as a single mapping or omit the positional "
                            "inputs and pass them as keyword arguments."
                        )

            # subclasses may translate keys
            normalized = self._normalize_inputs(inputs)

            # forward config + any control kwargs (e.g., recursion_limit) to the agent
            return self._invoke(normalized, config=config, **kwargs)

        finally:
            new_depth = _INVOKE_DEPTH.get() - 1
            _INVOKE_DEPTH.set(new_depth)
            if new_depth == 0:
                self.telemetry.render(
                    raw=raw_debug,
                    save_json=save_json,
                    filepath=metrics_path,
                    save_raw_snapshot=save_raw_snapshot,
                    save_raw_records=save_raw_records,
                )

    def _normalize_inputs(self, inputs: InputLike) -> Mapping[str, Any]:
        if isinstance(inputs, str):
            # Adjust to your message type
            from langchain_core.messages import HumanMessage

            return {"messages": [HumanMessage(content=inputs)]}
        if isinstance(inputs, Mapping):
            return inputs
        raise TypeError(f"Unsupported input type: {type(inputs)}")

    @abstractmethod
    def _invoke(self, inputs: Mapping[str, Any], **config: Any) -> Any:
        """Subclasses implement the actual work against normalized inputs."""
        ...

    def __call__(self, inputs: InputLike, /, **kwargs: Any) -> Any:
        return self.invoke(inputs, **kwargs)

    # Runtime enforcement: forbid subclasses from overriding invoke
    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        if "invoke" in cls.__dict__:
            raise TypeError(
                f"{cls.__name__} must not override BaseAgent.invoke(); implement _invoke() only."
            )

    def stream(
        self,
        inputs: InputLike,
        config: Any | None = None,  # allow positional/keyword like LangGraph
        /,
        *,
        raw_debug: bool = False,
        save_json: bool | None = None,
        metrics_path: str | None = None,
        save_raw_snapshot: bool | None = None,
        save_raw_records: bool | None = None,
        **kwargs: Any,
    ) -> Iterator[Any]:
        """Public streaming entry point. Telemetry-wrapped."""
        depth = _INVOKE_DEPTH.get()
        _INVOKE_DEPTH.set(depth + 1)
        try:
            if depth == 0:
                self.telemetry.begin_run(
                    agent=self.name, thread_id=self.thread_id
                )
            normalized = self._normalize_inputs(inputs)
            yield from self._stream(normalized, config=config, **kwargs)
        finally:
            new_depth = _INVOKE_DEPTH.get() - 1
            _INVOKE_DEPTH.set(new_depth)
            if new_depth == 0:
                self.telemetry.render(
                    raw=raw_debug,
                    save_json=save_json,
                    filepath=metrics_path,
                    save_raw_snapshot=save_raw_snapshot,
                    save_raw_records=save_raw_records,
                )

    def _stream(
        self,
        inputs: Mapping[str, Any],
        *,
        config: Any | None = None,
        **kwargs: Any,
    ) -> Iterator[Any]:
        raise NotImplementedError(
            f"{self.name} does not support streaming. "
            "Override _stream(...) in your agent to enable it."
        )

    # def run(
    #     self,
    #     *args,
    #     raw_debug: bool = False,
    #     save_json: bool | None = None,
    #     metrics_path: str | None = None,
    #     save_raw_snapshot: bool | None = None,
    #     save_raw_records: bool | None = None,
    #     **kwargs
    # ):
    #     try:
    #         self.telemetry.begin_run(agent=self.name, thread_id=self.thread_id)
    #         result = self._run_impl(*args, **kwargs)
    #         return result
    #     finally:
    #         print(self.telemetry.render(
    #             raw=raw_debug,
    #             save_json=save_json,
    #             filepath=metrics_path,
    #             save_raw_snapshot=save_raw_snapshot,
    #             save_raw_records=save_raw_records,
    #         ))

    # @abstractmethod
    # def _run_impl(self, *args, **kwargs):
    #     raise NotImplementedError("Agents must implement _run_impl")

    def _default_node_tags(
        self, name: str, extra: Sequence[str] | None = None
    ) -> list[str]:
        tags = [self.name, "graph", name]
        if extra:
            tags.extend(extra)
        return tags

    def _as_runnable(self, fn: Any):
        # If it's already runnable (has .with_config/.invoke), return it; else wrap
        return (
            fn
            if hasattr(fn, "with_config") and hasattr(fn, "invoke")
            else RunnableLambda(fn)
        )

    def _node_cfg(self, name: str, *extra_tags: str) -> dict:
        """Build a consistent config for a node/runnable so we can reapply it after .map(), subgraph compile, etc."""
        ns = extra_tags[0] if extra_tags else _to_snake(self.name)
        tags = [self.name, "graph", name, *extra_tags]
        return dict(
            run_name="node",  # keep "node:" prefixing in the timer; don't fight Rich labels here
            tags=tags,
            metadata={
                "langgraph_node": name,
                "ursa_ns": ns,
                "ursa_agent": self.name,
            },
        )

    def ns(self, runnable_or_fn, name: str, *extra_tags: str):
        """Return a runnable with our node config applied. Safe to call on callables or runnables.
        IMPORTANT: call this AGAIN after .map() / subgraph .compile() (they often drop config)."""
        r = self._as_runnable(runnable_or_fn)
        return r.with_config(**self._node_cfg(name, *extra_tags))

    def _wrap_node(self, fn_or_runnable, name: str, *extra_tags: str):
        return self.ns(fn_or_runnable, name, *extra_tags)

    def _wrap_cond(self, fn: Any, name: str, *extra_tags: str):
        ns = extra_tags[0] if extra_tags else _to_snake(self.name)
        return RunnableLambda(fn).with_config(
            run_name="node",
            tags=[
                self.name,
                "graph",
                f"route:{name}",
                *extra_tags,
            ],
            metadata={
                "langgraph_node": f"route:{name}",
                "ursa_ns": ns,
                "ursa_agent": self.name,
            },
        )

    def _named(self, runnable: Any, name: str, *extra_tags: str):
        ns = extra_tags[0] if extra_tags else _to_snake(self.name)
        return runnable.with_config(
            run_name=name,
            tags=[self.name, "graph", name, *extra_tags],
            metadata={
                "langgraph_node": name,
                "ursa_ns": ns,
                "ursa_agent": self.name,
            },
        )
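
To make the pattern concrete, here is a minimal, hedged sketch of a concrete agent (the EchoAgent name, state shape, and model string are illustrative, not part of ursa): subclasses implement _invoke() only, while BaseAgent.invoke() handles input normalization and telemetry and cannot be overridden.

from typing import Any, Mapping

from ursa.agents.base import BaseAgent


class EchoAgent(BaseAgent):
    def _invoke(self, inputs: Mapping[str, Any], **config: Any) -> Any:
        # Echo the normalized inputs back; a real agent would run its graph here.
        return {"messages": inputs["messages"]}


agent = EchoAgent(llm="openai/gpt-4o-mini")
result = agent.invoke("hello")  # a bare string is normalized to a HumanMessage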

name property

Agent name.

add_node(graph, f, node_name=None, agent_name=None)

Add node to graph.

This is used to track token usage and is simply the following.

_node_name = node_name or f.__name__
_agent_name = agent_name or _to_snake(self.name)
return graph.add_node(
    _node_name, self._wrap_node(f, _node_name, _agent_name)
)
Source code in src/ursa/agents/base.py
def add_node(
    self,
    graph: StateGraph,
    f: Callable[..., Mapping[str, Any]],
    node_name: Optional[str] = None,
    agent_name: Optional[str] = None,
) -> StateGraph:
    """Add node to graph.

    This is used to track token usage and is simply the following.

    ```python
    _node_name = node_name or f.__name__
    _agent_name = agent_name or _to_snake(self.name)
    return graph.add_node(
        _node_name, self._wrap_node(f, _node_name, _agent_name)
    )
    ```
    """
    _node_name = node_name or f.__name__
    _agent_name = agent_name or _to_snake(self.name)
    wrapped_node = self._wrap_node(f, _node_name, _agent_name)
    return graph.add_node(_node_name, wrapped_node)
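
A short usage sketch (MyState, the plan function, and the agent instance are assumptions for illustration; agent is any BaseAgent subclass instance):

from typing import TypedDict

from langgraph.graph import StateGraph


class MyState(TypedDict):
    messages: list


def plan(state: MyState) -> dict:
    return {"messages": state["messages"]}


graph = StateGraph(MyState)
agent.add_node(graph, plan)             # registered under the function name "plan"
agent.add_node(graph, plan, "replan")   # or under an explicit node name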

build_config(**overrides)

Build a config dict that includes telemetry callbacks and the thread_id. You can pass overrides like recursion_limit=..., configurable={...}, etc.

Source code in src/ursa/agents/base.py
def build_config(self, **overrides) -> dict:
    """
    Build a config dict that includes telemetry callbacks and the thread_id.
    You can pass overrides like recursion_limit=..., configurable={...}, etc.
    """
    base = {
        "configurable": {"thread_id": self.thread_id},
        "metadata": {
            "thread_id": self.thread_id,
            "telemetry_run_id": self.telemetry.context.get("run_id"),
        },
        # "configurable": {
        #     "thread_id": getattr(self, "thread_id", "default")
        # },
        # "metadata": {
        #     "thread_id": getattr(self, "thread_id", "default"),
        #     "telemetry_run_id": self.telemetry.context.get("run_id"),
        # },
        "tags": [self.name],
        "callbacks": self.telemetry.callbacks,
    }
    # include model name when we can
    model_name = getattr(self, "llm_model", None) or getattr(
        getattr(self, "llm", None), "model", None
    )
    if model_name:
        base["metadata"]["model"] = model_name

    if "configurable" in overrides and isinstance(
        overrides["configurable"], dict
    ):
        base["configurable"].update(overrides.pop("configurable"))
    if "metadata" in overrides and isinstance(overrides["metadata"], dict):
        base["metadata"].update(overrides.pop("metadata"))
    # merge tags if caller provides them
    if "tags" in overrides and isinstance(overrides["tags"], list):
        base["tags"] = base["tags"] + [
            t for t in overrides.pop("tags") if t not in base["tags"]
        ]
    base.update(overrides)
    return base
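
For example (values are illustrative; agent is any BaseAgent subclass instance), caller-supplied configurable, metadata, and tags entries are merged into the defaults rather than replacing them, while unknown keys such as recursion_limit pass straight through:

config = agent.build_config(
    recursion_limit=500,
    configurable={"user_id": "abc123"},
    tags=["experiment"],
)
# config["configurable"]    == {"thread_id": agent.thread_id, "user_id": "abc123"}
# config["tags"]            == [agent.name, "experiment"]
# config["recursion_limit"] == 500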

ns(runnable_or_fn, name, *extra_tags)

Return a runnable with our node config applied. Safe to call on callables or runnables. IMPORTANT: call this AGAIN after .map() / subgraph .compile() (they often drop config).

Source code in src/ursa/agents/base.py
def ns(self, runnable_or_fn, name: str, *extra_tags: str):
    """Return a runnable with our node config applied. Safe to call on callables or runnables.
    IMPORTANT: call this AGAIN after .map() / subgraph .compile() (they often drop config)."""
    r = self._as_runnable(runnable_or_fn)
    return r.with_config(**self._node_cfg(name, *extra_tags))
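
A sketch of tagging a plain function for telemetry attribution and re-applying the config after .map() (the score function and agent instance are illustrative):

def score(item: dict) -> dict:
    return {"score": len(item.get("text", ""))}


scorer = agent.ns(score, "score")                # wrapped as a configured RunnableLambda
batch_scorer = agent.ns(scorer.map(), "score")   # re-apply: .map() can drop the config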

stream(inputs, config=None, /, *, raw_debug=False, save_json=None, metrics_path=None, save_raw_snapshot=None, save_raw_records=None, **kwargs)

Public streaming entry point. Telemetry-wrapped.

Source code in src/ursa/agents/base.py
def stream(
    self,
    inputs: InputLike,
    config: Any | None = None,  # allow positional/keyword like LangGraph
    /,
    *,
    raw_debug: bool = False,
    save_json: bool | None = None,
    metrics_path: str | None = None,
    save_raw_snapshot: bool | None = None,
    save_raw_records: bool | None = None,
    **kwargs: Any,
) -> Iterator[Any]:
    """Public streaming entry point. Telemetry-wrapped."""
    depth = _INVOKE_DEPTH.get()
    _INVOKE_DEPTH.set(depth + 1)
    try:
        if depth == 0:
            self.telemetry.begin_run(
                agent=self.name, thread_id=self.thread_id
            )
        normalized = self._normalize_inputs(inputs)
        yield from self._stream(normalized, config=config, **kwargs)
    finally:
        new_depth = _INVOKE_DEPTH.get() - 1
        _INVOKE_DEPTH.set(new_depth)
        if new_depth == 0:
            self.telemetry.render(
                raw=raw_debug,
                save_json=save_json,
                filepath=metrics_path,
                save_raw_snapshot=save_raw_snapshot,
                save_raw_records=save_raw_records,
            )
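
A consumption sketch (assumes an agent that overrides _stream(); BaseAgent itself raises NotImplementedError):

for chunk in agent.stream("Summarize the latest results"):
    print(chunk)  # chunks are whatever the agent's _stream() yields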

code_review_agent

read_file(filename, state)

Reads in a file with a given filename into a string

Parameters:

    filename (str, required): string filename to read in
Source code in src/ursa/agents/code_review_agent.py
@tool
def read_file(filename: str, state: Annotated[dict, InjectedState]):
    """
    Reads in a file with a given filename into a string

    Args:
        filename: string filename to read in
    """
    workspace_dir = state["workspace"]
    full_filename = os.path.join(workspace_dir, filename)

    print("[READING]: ", full_filename)
    with open(full_filename, "r") as file:
        file_contents = file.read()
    return file_contents

run_cmd(query, state)

Run a command from the command line

Source code in src/ursa/agents/code_review_agent.py
@tool
def run_cmd(query: str, state: Annotated[dict, InjectedState]) -> str:
    """Run command from commandline"""
    workspace_dir = state["workspace"]

    print("RUNNING: ", query)
    process = subprocess.Popen(
        query.split(" "),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        cwd=workspace_dir,
    )

    stdout, stderr = process.communicate(timeout=600)

    print("STDOUT: ", stdout)
    print("STDERR: ", stderr)

    return f"STDOUT: {stdout} and STDERR: {stderr}"

write_file(code, filename, state)

Writes text to a file in the given workspace as requested.

Parameters:

    code (str, required): Text to write to a file
    filename (str, required): the filename to write to

Returns:

    str: Execution results

Source code in src/ursa/agents/code_review_agent.py
@tool
def write_file(
    code: str, filename: str, state: Annotated[dict, InjectedState]
) -> str:
    """
    Writes text to a file in the given workspace as requested.

    Args:
        code: Text to write to a file
        filename: the filename to write to

    Returns:
        Execution results
    """
    workspace_dir = state["workspace"]

    print("[WRITING]: ", filename)
    try:
        # Extract code if wrapped in markdown code blocks
        if "```" in code:
            code_parts = code.split("```")
            if len(code_parts) >= 3:
                # Extract the actual code
                if "\n" in code_parts[1]:
                    code = "\n".join(code_parts[1].strip().split("\n")[1:])
                else:
                    code = code_parts[2].strip()

        # Write code to a file
        code_file = os.path.join(workspace_dir, filename)

        with open(code_file, "w") as f:
            f.write(code)
        print(f"Written code to file: {code_file}")

        return f"File {filename} written successfully."

    except Exception as e:
        print(f"Error generating code: {str(e)}")
        # Return minimal code that prints the error
        return f"Failed to write {filename} successfully."

execution_agent

ExecutionAgent

Bases: BaseAgent

Source code in src/ursa/agents/execution_agent.py
class ExecutionAgent(BaseAgent):
    def __init__(
        self,
        llm: str | BaseChatModel = "openai/gpt-4o-mini",
        agent_memory: Optional[Any | AgentMemory] = None,
        log_state: bool = False,
        **kwargs,
    ):
        super().__init__(llm, **kwargs)
        self.agent_memory = agent_memory
        self.safety_prompt = safety_prompt
        self.executor_prompt = executor_prompt
        self.summarize_prompt = summarize_prompt
        self.tools = [run_cmd, write_code, edit_code, search_tool]
        self.tool_node = ToolNode(self.tools)
        self.llm = self.llm.bind_tools(self.tools)
        self.log_state = log_state

        self._action = self._build_graph()

    # Define the function that calls the model
    def query_executor(self, state: ExecutionState) -> ExecutionState:
        new_state = state.copy()
        if "workspace" not in new_state.keys():
            new_state["workspace"] = randomname.get_name()
            print(
                f"{RED}Creating the folder {BLUE}{BOLD}{new_state['workspace']}{RESET}{RED} for this project.{RESET}"
            )
        os.makedirs(new_state["workspace"], exist_ok=True)

        # code related to symlink
        sd = new_state.get("symlinkdir")
        if isinstance(sd, dict) and "is_linked" not in sd:
            # symlinkdir = {"source": "foo", "dest": "bar"}
            symlinkdir = new_state["symlinkdir"]
            # user provided a symlinkdir key - let's do the linking!

            src = Path(symlinkdir["source"]).expanduser().resolve()
            workspace_root = Path(new_state["workspace"]).expanduser().resolve()
            dst = workspace_root / symlinkdir["dest"]  # prepend workspace

            # if you want to replace an existing link/file, unlink it first
            if dst.exists() or dst.is_symlink():
                dst.unlink()

            # create parent dirs for the link location if they don’t exist
            dst.parent.mkdir(parents=True, exist_ok=True)

            # actually make the link (tell pathlib it’s a directory target)
            dst.symlink_to(src, target_is_directory=src.is_dir())
            print(f"{RED}Symlinked {src} (source) --> {dst} (dest)")
            # note that we've done the symlink now, so don't need to do it later
            new_state["symlinkdir"]["is_linked"] = True

        if isinstance(new_state["messages"][0], SystemMessage):
            new_state["messages"][0] = SystemMessage(
                content=self.executor_prompt
            )
        else:
            new_state["messages"] = [
                SystemMessage(content=self.executor_prompt)
            ] + state["messages"]
        try:
            response = self.llm.invoke(
                new_state["messages"], self.build_config(tags=["agent"])
            )
        except ContentPolicyViolationError as e:
            print("Error: ", e, " ", new_state["messages"][-1].content)
            raise
        if self.log_state:
            self.write_state("execution_agent.json", new_state)
        return {"messages": [response], "workspace": new_state["workspace"]}

    # Define the function that calls the model
    def summarize(self, state: ExecutionState) -> ExecutionState:
        messages = [SystemMessage(content=summarize_prompt)] + state["messages"]
        try:
            response = self.llm.invoke(
                messages, self.build_config(tags=["summarize"])
            )
        except ContentPolicyViolationError as e:
            print("Error: ", e, " ", messages[-1].content)
            raise
        if self.agent_memory:
            memories = []
            # Handle looping through the messages
            for x in state["messages"]:
                if not isinstance(x, AIMessage):
                    memories.append(x.content)
                elif not x.tool_calls:
                    memories.append(x.content)
                else:
                    tool_strings = []
                    for tool in x.tool_calls:
                        tool_name = "Tool Name: " + tool["name"]
                        tool_strings.append(tool_name)
                        for y in tool["args"]:
                            tool_strings.append(
                                f"Arg: {str(y)}\nValue: {str(tool['args'][y])}"
                            )
                    memories.append("\n".join(tool_strings))
            memories.append(response.content)
            self.agent_memory.add_memories(memories)
        if self.log_state:
            save_state = state.copy()
            save_state["messages"].append(response)
            self.write_state("execution_agent.json", save_state)
        return {"messages": [response.content]}

    # Define the function that calls the model
    def safety_check(self, state: ExecutionState) -> ExecutionState:
        """
        Validate the safety of a pending shell command.

        Args:
            state: Current execution state.

        Returns:
            Either the unchanged state (safe) or a state with tool message(s) (unsafe).
        """
        new_state = state.copy()
        last_msg = new_state["messages"][-1]

        tool_responses = []
        tool_failed = False
        for tool_call in last_msg.tool_calls:
            call_name = tool_call["name"]

            if call_name == "run_cmd":
                query = tool_call["args"]["query"]
                safety_check = self.llm.invoke(
                    self.safety_prompt + query,
                    self.build_config(tags=["safety_check"]),
                )

                if "[NO]" in safety_check.content:
                    tool_failed = True

                    tool_response = f"""
                    [UNSAFE] That command `{query}` was deemed unsafe and cannot be run.
                    For reason: {safety_check.content}
                    """
                    console.print(
                        "[bold red][WARNING][/bold red] Command deemed unsafe:",
                        query,
                    )
                    # and tell the user the reason
                    console.print(
                        "[bold red][WARNING][/bold red] REASON:", tool_response
                    )

                else:
                    tool_response = f"Command `{query}` passed safety check."
                    console.print(
                        f"[green]Command passed safety check:[/green] {query}"
                    )

                tool_responses.append(
                    ToolMessage(
                        content=tool_response,
                        tool_call_id=tool_call["id"],
                    )
                )

        if tool_failed:
            new_state["messages"].extend(tool_responses)

        return new_state

    def _build_graph(self):
        graph = StateGraph(ExecutionState)

        self.add_node(graph, self.query_executor, "agent")
        self.add_node(graph, self.tool_node, "action")
        self.add_node(graph, self.summarize, "summarize")
        self.add_node(graph, self.safety_check, "safety_check")

        # Set the entrypoint as `agent`
        # This means that this node is the first one called
        graph.set_entry_point("agent")

        graph.add_conditional_edges(
            "agent",
            self._wrap_cond(should_continue, "should_continue", "execution"),
            {"continue": "safety_check", "summarize": "summarize"},
        )

        graph.add_conditional_edges(
            "safety_check",
            self._wrap_cond(command_safe, "command_safe", "execution"),
            {"safe": "action", "unsafe": "agent"},
        )

        graph.add_edge("action", "agent")
        graph.set_finish_point("summarize")

        return graph.compile(checkpointer=self.checkpointer)
        # self.action.get_graph().draw_mermaid_png(output_file_path="execution_agent_graph.png", draw_method=MermaidDrawMethod.PYPPETEER)

    def _invoke(
        self, inputs: Mapping[str, Any], recursion_limit: int = 999_999, **_
    ):
        config = self.build_config(
            recursion_limit=recursion_limit, tags=["graph"]
        )
        return self._action.invoke(inputs, config)

    # this is trying to stop people bypassing invoke
    @property
    def action(self):
        raise AttributeError(
            "Use .stream(...) or .invoke(...); direct .action access is unsupported."
        )
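
A hedged end-to-end sketch (the model string, task text, and result handling are illustrative; the import path follows the source layout shown above, and the exact output keys depend on ExecutionState):

from ursa.agents.execution_agent import ExecutionAgent

agent = ExecutionAgent(llm="openai/gpt-4o-mini", enable_metrics=True)

# The compiled graph creates (or reuses) a workspace folder, loops through the
# safety-checked tool calls, and finishes at the summarize node.
result = agent.invoke("Write and run a Python script that prints the first 10 primes.")
print(result["messages"][-1])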

safety_check(state)

Validate the safety of a pending shell command.

Parameters:

    state (ExecutionState, required): Current execution state.

Returns:

    ExecutionState: Either the unchanged state (safe) or a state with tool message(s) (unsafe).

Source code in src/ursa/agents/execution_agent.py
def safety_check(self, state: ExecutionState) -> ExecutionState:
    """
    Validate the safety of a pending shell command.

    Args:
        state: Current execution state.

    Returns:
        Either the unchanged state (safe) or a state with tool message(s) (unsafe).
    """
    new_state = state.copy()
    last_msg = new_state["messages"][-1]

    tool_responses = []
    tool_failed = False
    for tool_call in last_msg.tool_calls:
        call_name = tool_call["name"]

        if call_name == "run_cmd":
            query = tool_call["args"]["query"]
            safety_check = self.llm.invoke(
                self.safety_prompt + query,
                self.build_config(tags=["safety_check"]),
            )

            if "[NO]" in safety_check.content:
                tool_failed = True

                tool_response = f"""
                [UNSAFE] That command `{query}` was deemed unsafe and cannot be run.
                For reason: {safety_check.content}
                """
                console.print(
                    "[bold red][WARNING][/bold red] Command deemed unsafe:",
                    query,
                )
                # and tell the user the reason
                console.print(
                    "[bold red][WARNING][/bold red] REASON:", tool_response
                )

            else:
                tool_response = f"Command `{query}` passed safety check."
                console.print(
                    f"[green]Command passed safety check:[/green] {query}"
                )

            tool_responses.append(
                ToolMessage(
                    content=tool_response,
                    tool_call_id=tool_call["id"],
                )
            )

    if tool_failed:
        new_state["messages"].extend(tool_responses)

    return new_state

command_safe(state)

Return graph edge "safe" if the last command was safe, otherwise return edge "unsafe"

Source code in src/ursa/agents/execution_agent.py
def command_safe(state: ExecutionState) -> Literal["safe", "unsafe"]:
    """
    Return graph edge "safe" if the last command was safe, otherwise return edge "unsafe"
    """

    index = -1
    message = state["messages"][index]
    # Loop through all the consecutive tool messages in reverse order
    while isinstance(message, ToolMessage):
        if "[UNSAFE]" in message.content:
            return "unsafe"

        index -= 1
        message = state["messages"][index]

    return "safe"

edit_code(old_code, new_code, filename, state)

Replace the first occurrence of old_code with new_code in filename.

Parameters:

    old_code (str, required): Code fragment to search for.
    new_code (str, required): Replacement fragment.
    filename (str, required): Target file inside the workspace.

Returns:

    str: Success / failure message.

Source code in src/ursa/agents/execution_agent.py
@tool
def edit_code(
    old_code: str,
    new_code: str,
    filename: str,
    state: Annotated[dict, InjectedState],
) -> str:
    """Replace the **first** occurrence of *old_code* with *new_code* in *filename*.

    Args:
        old_code: Code fragment to search for.
        new_code: Replacement fragment.
        filename: Target file inside the workspace.

    Returns:
        Success / failure message.
    """
    workspace_dir = state["workspace"]
    console.print("[cyan]Editing file:[/cyan]", filename)

    code_file = os.path.join(workspace_dir, filename)
    try:
        with open(code_file, "r", encoding="utf-8") as f:
            content = f.read()
    except FileNotFoundError:
        console.print(
            "[bold bright_white on red] :heavy_multiplication_x: [/] "
            "[red]File not found:[/]",
            filename,
        )
        return f"Failed: {filename} not found."

    # Clean up markdown fences
    old_code_clean = _strip_fences(old_code)
    new_code_clean = _strip_fences(new_code)

    if old_code_clean not in content:
        console.print(
            "[yellow] ⚠️ 'old_code' not found in file'; no changes made.[/]"
        )
        return f"No changes made to {filename}: 'old_code' not found in file."

    updated = content.replace(old_code_clean, new_code_clean, 1)

    console.print(
        Panel(
            DiffRenderer(content, updated, filename),
            title="Diff Preview",
            border_style="cyan",
        )
    )

    try:
        with open(code_file, "w", encoding="utf-8") as f:
            f.write(updated)
    except Exception as exc:
        console.print(
            "[bold bright_white on red] :heavy_multiplication_x: [/] "
            "[red]Failed to write file:[/]",
            exc,
        )
        return f"Failed to edit {filename}."

    console.print(
        f"[bold bright_white on green] :heavy_check_mark: [/] "
        f"[green]File updated:[/] {code_file}"
    )
    return f"File {filename} updated successfully."

run_cmd(query, state)

Run a command-line command using the subprocess package in Python

Parameters:

    query (str, required): command-line command to run, passed as a string to subprocess.run.
Source code in src/ursa/agents/execution_agent.py
@tool
def run_cmd(query: str, state: Annotated[dict, InjectedState]) -> str:
    """
    Run a command-line command using the subprocess package in Python.

    Args:
        query: command-line command to run, passed as a string to subprocess.run.
    """
    workspace_dir = state["workspace"]
    print("RUNNING: ", query)
    try:
        result = subprocess.run(
            query,
            text=True,
            shell=True,
            timeout=60000,
            capture_output=True,
            cwd=workspace_dir,
        )
        stdout, stderr = result.stdout, result.stderr
    except KeyboardInterrupt:
        print("Keyboard Interrupt of command: ", query)
        stdout, stderr = "", "KeyboardInterrupt:"

    # Fit BOTH streams under a single overall cap
    stdout_fit, stderr_fit = _fit_streams_to_budget(
        stdout or "", stderr or "", MAX_TOOL_MSG_CHARS
    )

    print("STDOUT: ", stdout_fit)
    print("STDERR: ", stderr_fit)

    return f"STDOUT:\n{stdout_fit}\nSTDERR:\n{stderr_fit}"

write_code(code, filename, tool_call_id, state)

Write code to filename.

Parameters:

    code (str, required): Source code as a string.
    filename (str, required): Target filename (including extension).

Returns:

    Command: Success / failure message.

Source code in src/ursa/agents/execution_agent.py
@tool
def write_code(
    code: str,
    filename: str,
    tool_call_id: Annotated[str, InjectedToolCallId],
    state: Annotated[dict, InjectedState],
) -> Command:
    """Write *code* to *filename*.

    Args:
        code: Source code as a string.
        filename: Target filename (including extension).

    Returns:
        Success / failure message.
    """
    workspace_dir = state["workspace"]
    console.print("[cyan]Writing file:[/]", filename)

    # Clean up markdown fences
    code = _strip_fences(code)

    # Syntax-highlighted preview
    try:
        lexer_name = Syntax.guess_lexer(filename, code)
    except Exception:
        lexer_name = "text"

    console.print(
        Panel(
            Syntax(code, lexer_name, line_numbers=True),
            title="File Preview",
            border_style="cyan",
        )
    )

    code_file = os.path.join(workspace_dir, filename)
    try:
        with open(code_file, "w", encoding="utf-8") as f:
            f.write(code)
    except Exception as exc:
        console.print(
            "[bold bright_white on red] :heavy_multiplication_x: [/] "
            "[red]Failed to write file:[/]",
            exc,
        )
        return f"Failed to write {filename}."

    console.print(
        f"[bold bright_white on green] :heavy_check_mark: [/] "
        f"[green]File written:[/] {code_file}"
    )

    # Append the file to the list in state
    file_list = state.get("code_files", [])
    file_list.append(filename)

    # Create a tool message to send back
    msg = ToolMessage(
        content=f"File {filename} written successfully.",
        tool_call_id=tool_call_id,
    )

    # Return updated code files list & the message
    return Command(
        update={
            "code_files": file_list,
            "messages": [msg],
        }
    )
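
For reference, a minimal sketch of the Command pattern used above (illustrative values; assumes Command comes from langgraph.types as in current LangGraph releases):

from langchain_core.messages import ToolMessage
from langgraph.types import Command

cmd = Command(
    update={
        "code_files": ["solver.py"],
        "messages": [
            ToolMessage(
                content="File solver.py written successfully.",
                tool_call_id="call_1",
            )
        ],
    }
)
# ToolNode applies cmd.update to the graph state, so "code_files" accumulates
# alongside the usual tool message.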

hypothesizer_agent

HypothesizerAgent

Bases: BaseAgent

Source code in src/ursa/agents/hypothesizer_agent.py
class HypothesizerAgent(BaseAgent):
    def __init__(self, llm: str | BaseChatModel = "openai/o3-mini", **kwargs):
        super().__init__(llm, **kwargs)
        self.hypothesizer_prompt = hypothesizer_prompt
        self.critic_prompt = critic_prompt
        self.competitor_prompt = competitor_prompt
        self.search_tool = DuckDuckGoSearchResults(
            output_format="json", num_results=10
        )
        # self.search_tool = TavilySearchResults(
        #     max_results=10, search_depth="advanced", include_answer=False
        # )

        self._action = self._build_graph()

    def agent1_generate_solution(
        self, state: HypothesizerState
    ) -> HypothesizerState:
        """Agent 1: Hypothesizer."""
        print(
            f"[iteration {state['current_iteration']} - DEBUG] Entering agent1_generate_solution. Iteration: {state['current_iteration']}"
        )

        current_iter = state["current_iteration"]
        user_content = f"Question: {state['question']}\n"

        if current_iter > 0:
            user_content += (
                f"\nPrevious solution: {state['agent1_solution'][-1]}"
            )
            user_content += f"\nCritique: {state['agent2_critiques'][-1]}"
            user_content += (
                f"\nCompetitor perspective: {state['agent3_perspectives'][-1]}"
            )
            user_content += (
                "\n\n**You must explicitly list how this new solution differs from the previous solution,** "
                "point by point, explaining what changes were made in response to the critique and competitor perspective."
                "\nAfterward, provide your updated solution."
            )
        else:
            user_content += "Research this problem and generate a solution."

        search_query = self.llm.invoke(
            f"Here is a problem description: {state['question']}. Turn it into a short query to be fed into a search engine."
        ).content
        if '"' in search_query:
            search_query = search_query.split('"')[1]
        raw_search_results = self.search_tool.invoke(search_query)

        # Parse the results if possible, so we can collect URLs
        new_state = state.copy()
        new_state["question_search_query"] = search_query
        if "visited_sites" not in new_state:
            new_state["visited_sites"] = []

        try:
            if isinstance(raw_search_results, str):
                results_list = ast.literal_eval(raw_search_results)
            else:
                results_list = raw_search_results
            # Each item typically might have "link", "title", "snippet"
            for item in results_list:
                link = item.get("link")
                if link:
                    print(f"[DEBUG] Appending visited link: {link}")
                    new_state["visited_sites"].append(link)
        except (ValueError, SyntaxError, TypeError):
            # If it's not valid Python syntax or something else goes wrong
            print("[DEBUG] Could not parse search results as Python list.")
            print("[DEBUG] raw_search_results:", raw_search_results)

        user_content += f"\nSearch results: {raw_search_results}"

        # Provide a system message to define this agent's role
        messages = [
            SystemMessage(content=self.hypothesizer_prompt),
            HumanMessage(content=user_content),
        ]
        solution = self.llm.invoke(messages)

        new_state["agent1_solution"].append(solution.content)

        # Print the entire solution in green
        print(
            f"{GREEN}[Agent1 - Hypothesizer solution]\n{solution.content}{RESET}"
        )
        print(
            f"[iteration {state['current_iteration']} - DEBUG] Exiting agent1_generate_solution."
        )
        return new_state

    def agent2_critique(self, state: HypothesizerState) -> HypothesizerState:
        """Agent 2: Critic."""
        print(
            f"[iteration {state['current_iteration']} - DEBUG] Entering agent2_critique."
        )

        solution = state["agent1_solution"][-1]
        user_content = (
            f"Question: {state['question']}\n"
            f"Proposed solution: {solution}\n"
            "Provide a detailed critique of this solution. Identify potential flaws, assumptions, and areas for improvement."
        )

        fact_check_query = f"fact check {state['question_search_query']} solution effectiveness"

        raw_search_results = self.search_tool.invoke(fact_check_query)

        # Parse the results if possible, so we can collect URLs
        new_state = state.copy()
        if "visited_sites" not in new_state:
            new_state["visited_sites"] = []

        try:
            if isinstance(raw_search_results, str):
                results_list = ast.literal_eval(raw_search_results)
            else:
                results_list = raw_search_results
            # Each item typically might have "link", "title", "snippet"
            for item in results_list:
                link = item.get("link")
                if link:
                    print(f"[DEBUG] Appending visited link: {link}")
                    new_state["visited_sites"].append(link)
        except (ValueError, SyntaxError, TypeError):
            # If it's not valid Python syntax or something else goes wrong
            print("[DEBUG] Could not parse search results as Python list.")
            print("[DEBUG] raw_search_results:", raw_search_results)

        fact_check_results = raw_search_results
        user_content += f"\nFact check results: {fact_check_results}"

        messages = [
            SystemMessage(content=self.critic_prompt),
            HumanMessage(content=user_content),
        ]
        critique = self.llm.invoke(messages)

        new_state["agent2_critiques"].append(critique.content)

        # Print the entire critique in blue
        print(f"{BLUE}[Agent2 - Critic]\n{critique.content}{RESET}")
        print(
            f"[iteration {state['current_iteration']} - DEBUG] Exiting agent2_critique."
        )
        return new_state

    def agent3_competitor_perspective(
        self, state: HypothesizerState
    ) -> HypothesizerState:
        """Agent 3: Competitor/Stakeholder Simulator."""
        print(
            f"[iteration {state['current_iteration']} - DEBUG] Entering agent3_competitor_perspective."
        )

        solution = state["agent1_solution"][-1]
        critique = state["agent2_critiques"][-1]

        user_content = (
            f"Question: {state['question']}\n"
            f"Proposed solution: {solution}\n"
            f"Critique: {critique}\n"
            "Simulate how a competitor, government agency, or other stakeholder might respond to this solution."
        )

        competitor_search_query = (
            f"competitor responses to {state['question_search_query']}"
        )

        raw_search_results = self.search_tool.invoke(competitor_search_query)

        # Parse the results if possible, so we can collect URLs
        new_state = state.copy()
        if "visited_sites" not in new_state:
            new_state["visited_sites"] = []

        try:
            if isinstance(raw_search_results, str):
                results_list = ast.literal_eval(raw_search_results)
            else:
                results_list = raw_search_results
            # Each item typically might have "link", "title", "snippet"
            for item in results_list:
                link = item.get("link")
                if link:
                    print(f"[DEBUG] Appending visited link: {link}")
                    new_state["visited_sites"].append(link)
        except (ValueError, SyntaxError, TypeError):
            # If it's not valid Python syntax or something else goes wrong
            print("[DEBUG] Could not parse search results as Python list.")
            print("[DEBUG] raw_search_results:", raw_search_results)

        competitor_info = raw_search_results
        user_content += f"\nCompetitor information: {competitor_info}"

        messages = [
            SystemMessage(content=self.competitor_prompt),
            HumanMessage(content=user_content),
        ]
        perspective = self.llm.invoke(messages)

        new_state["agent3_perspectives"].append(perspective.content)

        # Print the entire perspective in red
        print(
            f"{RED}[Agent3 - Competitor/Stakeholder Perspective]\n{perspective.content}{RESET}"
        )
        print(
            f"[iteration {state['current_iteration']} - DEBUG] Exiting agent3_competitor_perspective."
        )
        return new_state

    def increment_iteration(
        self, state: HypothesizerState
    ) -> HypothesizerState:
        new_state = state.copy()
        new_state["current_iteration"] += 1
        print(
            f"[iteration {state['current_iteration']} - DEBUG] Iteration incremented to {new_state['current_iteration']}"
        )
        return new_state

    def generate_solution(self, state: HypothesizerState) -> HypothesizerState:
        """Generate the overall, refined solution based on all iterations."""
        print(
            f"[iteration {state['current_iteration']} - DEBUG] Entering generate_solution."
        )
        prompt = f"Original question: {state['question']}\n\n"
        prompt += "Evolution of solutions:\n"

        for i in range(state["max_iterations"]):
            prompt += f"\nIteration {i + 1}:\n"
            prompt += f"Solution: {state['agent1_solution'][i]}\n"
            prompt += f"Critique: {state['agent2_critiques'][i]}\n"
            prompt += (
                f"Competitor perspective: {state['agent3_perspectives'][i]}\n"
            )

        prompt += "\nBased on this iterative process, provide the overall, refined solution."

        print(
            f"[iteration {state['current_iteration']} - DEBUG] Generating overall solution with LLM..."
        )
        solution = self.llm.invoke(prompt)
        print(
            f"[iteration {state['current_iteration']} - DEBUG] Overall solution obtained. Preview:",
            solution.content[:200],
            "...",
        )

        new_state = state.copy()
        new_state["solution"] = solution.content

        print(
            f"[iteration {state['current_iteration']} - DEBUG] Exiting generate_solution."
        )
        return new_state

    def print_visited_sites(
        self, state: HypothesizerState
    ) -> HypothesizerState:
        new_state = state.copy()
        all_sites = new_state.get("visited_sites", [])
        print("[DEBUG] Visited Sites:")
        for s in all_sites:
            print("  ", s)
        return new_state

    def summarize_process_as_latex(
        self, state: HypothesizerState
    ) -> HypothesizerState:
        """
        Summarize how the solution changed over time, referencing
        each iteration's critique and competitor perspective,
        then produce a final LaTeX document.
        """
        print("[DEBUG] Entering summarize_process_as_latex.")
        llm_model = state.get("llm_model", "unknown_model")

        # Build a single string describing the entire iterative process
        iteration_details = ""
        for i, (sol, crit, comp) in enumerate(
            zip(
                state["agent1_solution"],
                state["agent2_critiques"],
                state["agent3_perspectives"],
            ),
            start=1,
        ):
            iteration_details += (
                f"\\subsection*{{Iteration {i}}}\n\n"
                f"\\textbf{{Solution:}}\\\\\n{sol}\n\n"
                f"\\textbf{{Critique:}}\\\\\n{crit}\n\n"
                f"\\textbf{{Competitor Perspective:}}\\\\\n{comp}\n\n"
            )

        # -----------------------------
        # Write iteration_details to disk as .txt
        # -----------------------------
        timestamp_str = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        txt_filename = (
            f"iteration_details_{llm_model}_{timestamp_str}_chat_history.txt"
        )
        with open(txt_filename, "w", encoding="utf-8") as f:
            f.write(iteration_details)

        print(f"[DEBUG] Wrote iteration details to {txt_filename}.")

        # Prompt the LLM to produce a LaTeX doc
        # We'll just pass it as a single string to the LLM;
        # you could also do system+human messages if you prefer.
        prompt = f"""\
            You are a system that produces a FULL LaTeX document.
            Here is information about a multi-iteration process:

            Original question: {state["question"]}

            Below are the solutions, critiques, and competitor perspectives from each iteration:

            {iteration_details}

            The solution we arrived at was:

            {state["solution"]}

            Now produce a valid LaTeX document.  Be sure to use a table of contents.
            It must start with an Executive Summary (that may be multiple pages) which summarizes
            the entire iterative process.  Following that, we should include the solution in full,
            not summarized, but reformatted for appropriate LaTeX.  And then, finally (and this will be
            quite long), we must take all the steps - solutions, critiques, and competitor perspectives
            and *NOT SUMMARIZE THEM* but merely reformat them for the reader.  This will be in an Appendix
            of the full content of the steps.  Finally, include a listing of all of the websites we
            used in our research.

            You must ONLY RETURN LaTeX, nothing else.  It must be valid LaTeX syntax!

            Your output should start with:
            \\documentclass{{article}}
            \\usepackage[margin=1in]{{geometry}}
            etc.

            It must compile without errors under pdflatex. 
        """

        # Now produce a valid LaTeX document that nicely summarizes this entire iterative process.
        # It must include the overall solution in full, not summarized, but reformatted for appropriate
        # LaTeX. The summarization is for the other steps.

        all_visited_sites = state.get("visited_sites", [])
        # (Optional) remove duplicates by converting to a set, then back to a list
        visited_sites_unique = list(set(all_visited_sites))
        if visited_sites_unique:
            websites_latex = "\\section*{Websites Visited}\\begin{itemize}\n"
            for url in visited_sites_unique:
                print(f"We visited: {url}")
                # Use \url{} to handle special characters in URLs
                websites_latex += f"\\item \\url{{{url}}}\n"
            websites_latex += "\\end{itemize}\n\n"
        else:
            # If no sites visited, or the list is empty
            websites_latex = (
                "\\section*{Websites Visited}\nNo sites were visited.\n\n"
            )
        print(websites_latex)

        # Ask the LLM to produce *only* LaTeX content
        latex_response = self.llm.invoke(prompt)

        latex_doc = latex_response.content

        def inject_into_latex(original_tex: str, injection: str) -> str:
            """
            Find the last occurrence of '\\end{document}' in 'original_tex'
            and insert 'injection' right before it.
            If '\\end{document}' is not found, just append the injection at the end.
            """
            injection_index = original_tex.rfind(r"\end{document}")
            if injection_index == -1:
                # If the LLM didn't include \end{document}, just append
                return original_tex + "\n" + injection
            else:
                # Insert right before \end{document}
                return (
                    original_tex[:injection_index]
                    + "\n"
                    + injection
                    + "\n"
                    + original_tex[injection_index:]
                )

        final_latex = inject_into_latex(latex_doc, websites_latex)

        new_state = state.copy()
        new_state["summary_report"] = final_latex

        print(
            f"[iteration {state['current_iteration']} - DEBUG] Received LaTeX from LLM. Preview:"
        )
        print(latex_response.content[:300], "...")
        print(
            f"[iteration {state['current_iteration']} - DEBUG] Exiting summarize_process_as_latex."
        )
        return new_state

    def _build_graph(self):
        # Initialize the graph
        graph = StateGraph(HypothesizerState)

        # Add nodes
        self.add_node(graph, self.agent1_generate_solution, "agent1")
        self.add_node(graph, self.agent2_critique, "agent2")
        self.add_node(graph, self.agent3_competitor_perspective, "agent3")
        self.add_node(graph, self.increment_iteration, "increment_iteration")
        self.add_node(graph, self.generate_solution, "finalize")
        self.add_node(graph, self.print_visited_sites, "print_sites")
        self.add_node(
            graph, self.summarize_process_as_latex, "summarize_as_latex"
        )
        # self.graph.add_node("compile_pdf",                compile_summary_to_pdf)

        # Add simple edges for the known flow
        graph.add_edge("agent1", "agent2")
        graph.add_edge("agent2", "agent3")
        graph.add_edge("agent3", "increment_iteration")

        # Then from increment_iteration, we have a conditional:
        # If we 'continue', we go back to agent1
        # If we 'finish', we jump to the finalize node
        graph.add_conditional_edges(
            "increment_iteration",
            should_continue,
            {"continue": "agent1", "finish": "finalize"},
        )

        graph.add_edge("finalize", "summarize_as_latex")
        graph.add_edge("summarize_as_latex", "print_sites")
        # self.graph.add_edge("summarize_as_latex", "compile_pdf")
        # self.graph.add_edge("compile_pdf", "print_sites")

        # Set the entry point
        graph.set_entry_point("agent1")
        graph.set_finish_point("print_sites")

        return graph.compile(checkpointer=self.checkpointer)
        # self.action.get_graph().draw_mermaid_png(output_file_path="hypothesizer_agent_graph.png", draw_method=MermaidDrawMethod.PYPPETEER)

    def _invoke(
        self, inputs: Mapping[str, Any], recursion_limit: int = 100000, **_
    ):
        config = self.build_config(
            recursion_limit=recursion_limit, tags=["graph"]
        )
        if "prompt" not in inputs:
            raise KeyError("'prompt' is a required argument")

        inputs["max_iterations"] = inputs.get("max_iterations", 3)
        inputs["current_iteration"] = 0
        inputs["agent1_solution"] = []
        inputs["agent2_critiques"] = []
        inputs["agent3_perspectives"] = []
        inputs["solution"] = ""

        return self._action.invoke(inputs, config)
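
A rough usage sketch for the class above, illustrative only: it assumes the class is named HypothesizerAgent, that the public invoke wrapper inherited from BaseAgent delegates to _invoke, and that the caller supplies both the "prompt" key that _invoke checks for and the "question" key that the agent nodes read.

# Hypothetical usage sketch; entry point and key names are assumptions, not documented API.
from ursa.agents.hypothesizer_agent import HypothesizerAgent

agent = HypothesizerAgent(llm="openai/gpt-4o-mini")  # any provider/model string accepted by ChatLiteLLM

question = "How could a small utility add grid-scale storage cost-effectively?"
result = agent.invoke({
    "prompt": question,    # required by _invoke
    "question": question,  # read by the agent nodes
    "max_iterations": 2,   # defaults to 3 if omitted
})

print(result["solution"][:300])        # refined overall solution from the finalize node
print(result["summary_report"][:300])  # LaTeX report from summarize_process_as_latex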

agent1_generate_solution(state)

Agent 1: Hypothesizer.

Source code in src/ursa/agents/hypothesizer_agent.py, lines 58-132
def agent1_generate_solution(
    self, state: HypothesizerState
) -> HypothesizerState:
    """Agent 1: Hypothesizer."""
    print(
        f"[iteration {state['current_iteration']} - DEBUG] Entering agent1_generate_solution. Iteration: {state['current_iteration']}"
    )

    current_iter = state["current_iteration"]
    user_content = f"Question: {state['question']}\n"

    if current_iter > 0:
        user_content += (
            f"\nPrevious solution: {state['agent1_solution'][-1]}"
        )
        user_content += f"\nCritique: {state['agent2_critiques'][-1]}"
        user_content += (
            f"\nCompetitor perspective: {state['agent3_perspectives'][-1]}"
        )
        user_content += (
            "\n\n**You must explicitly list how this new solution differs from the previous solution,** "
            "point by point, explaining what changes were made in response to the critique and competitor perspective."
            "\nAfterward, provide your updated solution."
        )
    else:
        user_content += "Research this problem and generate a solution."

    search_query = self.llm.invoke(
        f"Here is a problem description: {state['question']}. Turn it into a short query to be fed into a search engine."
    ).content
    if '"' in search_query:
        search_query = search_query.split('"')[1]
    raw_search_results = self.search_tool.invoke(search_query)

    # Parse the results if possible, so we can collect URLs
    new_state = state.copy()
    new_state["question_search_query"] = search_query
    if "visited_sites" not in new_state:
        new_state["visited_sites"] = []

    try:
        if isinstance(raw_search_results, str):
            results_list = ast.literal_eval(raw_search_results)
        else:
            results_list = raw_search_results
        # Each item typically might have "link", "title", "snippet"
        for item in results_list:
            link = item.get("link")
            if link:
                print(f"[DEBUG] Appending visited link: {link}")
                new_state["visited_sites"].append(link)
    except (ValueError, SyntaxError, TypeError):
        # If it's not valid Python syntax or something else goes wrong
        print("[DEBUG] Could not parse search results as Python list.")
        print("[DEBUG] raw_search_results:", raw_search_results)

    user_content += f"\nSearch results: {raw_search_results}"

    # Provide a system message to define this agent's role
    messages = [
        SystemMessage(content=self.hypothesizer_prompt),
        HumanMessage(content=user_content),
    ]
    solution = self.llm.invoke(messages)

    new_state["agent1_solution"].append(solution.content)

    # Print the entire solution in green
    print(
        f"{GREEN}[Agent1 - Hypothesizer solution]\n{solution.content}{RESET}"
    )
    print(
        f"[iteration {state['current_iteration']} - DEBUG] Exiting agent1_generate_solution."
    )
    return new_state

agent2_critique(state)

Agent 2: Critic.

Source code in src/ursa/agents/hypothesizer_agent.py, lines 134-188
def agent2_critique(self, state: HypothesizerState) -> HypothesizerState:
    """Agent 2: Critic."""
    print(
        f"[iteration {state['current_iteration']} - DEBUG] Entering agent2_critique."
    )

    solution = state["agent1_solution"][-1]
    user_content = (
        f"Question: {state['question']}\n"
        f"Proposed solution: {solution}\n"
        "Provide a detailed critique of this solution. Identify potential flaws, assumptions, and areas for improvement."
    )

    fact_check_query = f"fact check {state['question_search_query']} solution effectiveness"

    raw_search_results = self.search_tool.invoke(fact_check_query)

    # Parse the results if possible, so we can collect URLs
    new_state = state.copy()
    if "visited_sites" not in new_state:
        new_state["visited_sites"] = []

    try:
        if isinstance(raw_search_results, str):
            results_list = ast.literal_eval(raw_search_results)
        else:
            results_list = raw_search_results
        # Each item typically might have "link", "title", "snippet"
        for item in results_list:
            link = item.get("link")
            if link:
                print(f"[DEBUG] Appending visited link: {link}")
                new_state["visited_sites"].append(link)
    except (ValueError, SyntaxError, TypeError):
        # If it's not valid Python syntax or something else goes wrong
        print("[DEBUG] Could not parse search results as Python list.")
        print("[DEBUG] raw_search_results:", raw_search_results)

    fact_check_results = raw_search_results
    user_content += f"\nFact check results: {fact_check_results}"

    messages = [
        SystemMessage(content=self.critic_prompt),
        HumanMessage(content=user_content),
    ]
    critique = self.llm.invoke(messages)

    new_state["agent2_critiques"].append(critique.content)

    # Print the entire critique in blue
    print(f"{BLUE}[Agent2 - Critic]\n{critique.content}{RESET}")
    print(
        f"[iteration {state['current_iteration']} - DEBUG] Exiting agent2_critique."
    )
    return new_state

agent3_competitor_perspective(state)

Agent 3: Competitor/Stakeholder Simulator.

Source code in src/ursa/agents/hypothesizer_agent.py, lines 190-253
def agent3_competitor_perspective(
    self, state: HypothesizerState
) -> HypothesizerState:
    """Agent 3: Competitor/Stakeholder Simulator."""
    print(
        f"[iteration {state['current_iteration']} - DEBUG] Entering agent3_competitor_perspective."
    )

    solution = state["agent1_solution"][-1]
    critique = state["agent2_critiques"][-1]

    user_content = (
        f"Question: {state['question']}\n"
        f"Proposed solution: {solution}\n"
        f"Critique: {critique}\n"
        "Simulate how a competitor, government agency, or other stakeholder might respond to this solution."
    )

    competitor_search_query = (
        f"competitor responses to {state['question_search_query']}"
    )

    raw_search_results = self.search_tool.invoke(competitor_search_query)

    # Parse the results if possible, so we can collect URLs
    new_state = state.copy()
    if "visited_sites" not in new_state:
        new_state["visited_sites"] = []

    try:
        if isinstance(raw_search_results, str):
            results_list = ast.literal_eval(raw_search_results)
        else:
            results_list = raw_search_results
        # Each item typically might have "link", "title", "snippet"
        for item in results_list:
            link = item.get("link")
            if link:
                print(f"[DEBUG] Appending visited link: {link}")
                new_state["visited_sites"].append(link)
    except (ValueError, SyntaxError, TypeError):
        # If it's not valid Python syntax or something else goes wrong
        print("[DEBUG] Could not parse search results as Python list.")
        print("[DEBUG] raw_search_results:", raw_search_results)

    competitor_info = raw_search_results
    user_content += f"\nCompetitor information: {competitor_info}"

    messages = [
        SystemMessage(content=self.competitor_prompt),
        HumanMessage(content=user_content),
    ]
    perspective = self.llm.invoke(messages)

    new_state["agent3_perspectives"].append(perspective.content)

    # Print the entire perspective in red
    print(
        f"{RED}[Agent3 - Competitor/Stakeholder Perspective]\n{perspective.content}{RESET}"
    )
    print(
        f"[iteration {state['current_iteration']} - DEBUG] Exiting agent3_competitor_perspective."
    )
    return new_state

generate_solution(state)

Generate the overall, refined solution based on all iterations.

Source code in src/ursa/agents/hypothesizer_agent.py, lines 265-299
def generate_solution(self, state: HypothesizerState) -> HypothesizerState:
    """Generate the overall, refined solution based on all iterations."""
    print(
        f"[iteration {state['current_iteration']} - DEBUG] Entering generate_solution."
    )
    prompt = f"Original question: {state['question']}\n\n"
    prompt += "Evolution of solutions:\n"

    for i in range(state["max_iterations"]):
        prompt += f"\nIteration {i + 1}:\n"
        prompt += f"Solution: {state['agent1_solution'][i]}\n"
        prompt += f"Critique: {state['agent2_critiques'][i]}\n"
        prompt += (
            f"Competitor perspective: {state['agent3_perspectives'][i]}\n"
        )

    prompt += "\nBased on this iterative process, provide the overall, refined solution."

    print(
        f"[iteration {state['current_iteration']} - DEBUG] Generating overall solution with LLM..."
    )
    solution = self.llm.invoke(prompt)
    print(
        f"[iteration {state['current_iteration']} - DEBUG] Overall solution obtained. Preview:",
        solution.content[:200],
        "...",
    )

    new_state = state.copy()
    new_state["solution"] = solution.content

    print(
        f"[iteration {state['current_iteration']} - DEBUG] Exiting generate_solution."
    )
    return new_state

summarize_process_as_latex(state)

Summarize how the solution changed over time, referencing each iteration's critique and competitor perspective, then produce a final LaTeX document.

Source code in src/ursa/agents/hypothesizer_agent.py, lines 311-445
def summarize_process_as_latex(
    self, state: HypothesizerState
) -> HypothesizerState:
    """
    Summarize how the solution changed over time, referencing
    each iteration's critique and competitor perspective,
    then produce a final LaTeX document.
    """
    print("[DEBUG] Entering summarize_process_as_latex.")
    llm_model = state.get("llm_model", "unknown_model")

    # Build a single string describing the entire iterative process
    iteration_details = ""
    for i, (sol, crit, comp) in enumerate(
        zip(
            state["agent1_solution"],
            state["agent2_critiques"],
            state["agent3_perspectives"],
        ),
        start=1,
    ):
        iteration_details += (
            f"\\subsection*{{Iteration {i}}}\n\n"
            f"\\textbf{{Solution:}}\\\\\n{sol}\n\n"
            f"\\textbf{{Critique:}}\\\\\n{crit}\n\n"
            f"\\textbf{{Competitor Perspective:}}\\\\\n{comp}\n\n"
        )

    # -----------------------------
    # Write iteration_details to disk as .txt
    # -----------------------------
    timestamp_str = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    txt_filename = (
        f"iteration_details_{llm_model}_{timestamp_str}_chat_history.txt"
    )
    with open(txt_filename, "w", encoding="utf-8") as f:
        f.write(iteration_details)

    print(f"[DEBUG] Wrote iteration details to {txt_filename}.")

    # Prompt the LLM to produce a LaTeX doc
    # We'll just pass it as a single string to the LLM;
    # you could also do system+human messages if you prefer.
    prompt = f"""\
        You are a system that produces a FULL LaTeX document.
        Here is information about a multi-iteration process:

        Original question: {state["question"]}

        Below are the solutions, critiques, and competitor perspectives from each iteration:

        {iteration_details}

        The solution we arrived at was:

        {state["solution"]}

        Now produce a valid LaTeX document.  Be sure to use a table of contents.
        It must start with an Executive Summary (that may be multiple pages) which summarizes
        the entire iterative process.  Following that, we should include the solution in full,
        not summarized, but reformatted for appropriate LaTeX.  And then, finally (and this will be
        quite long), we must take all the steps - solutions, critiques, and competitor perspectives
        and *NOT SUMMARIZE THEM* but merely reformat them for the reader.  This will be in an Appendix
        of the full content of the steps.  Finally, include a listing of all of the websites we
        used in our research.

        You must ONLY RETURN LaTeX, nothing else.  It must be valid LaTeX syntax!

        Your output should start with:
        \\documentclass{{article}}
        \\usepackage[margin=1in]{{geometry}}
        etc.

        It must compile without errors under pdflatex. 
    """

    # Now produce a valid LaTeX document that nicely summarizes this entire iterative process.
    # It must include the overall solution in full, not summarized, but reformatted for appropriate
    # LaTeX. The summarization is for the other steps.

    all_visited_sites = state.get("visited_sites", [])
    # (Optional) remove duplicates by converting to a set, then back to a list
    visited_sites_unique = list(set(all_visited_sites))
    if visited_sites_unique:
        websites_latex = "\\section*{Websites Visited}\\begin{itemize}\n"
        for url in visited_sites_unique:
            print(f"We visited: {url}")
            # Use \url{} to handle special characters in URLs
            websites_latex += f"\\item \\url{{{url}}}\n"
        websites_latex += "\\end{itemize}\n\n"
    else:
        # If no sites visited, or the list is empty
        websites_latex = (
            "\\section*{Websites Visited}\nNo sites were visited.\n\n"
        )
    print(websites_latex)

    # Ask the LLM to produce *only* LaTeX content
    latex_response = self.llm.invoke(prompt)

    latex_doc = latex_response.content

    def inject_into_latex(original_tex: str, injection: str) -> str:
        """
        Find the last occurrence of '\\end{document}' in 'original_tex'
        and insert 'injection' right before it.
        If '\\end{document}' is not found, just append the injection at the end.
        """
        injection_index = original_tex.rfind(r"\end{document}")
        if injection_index == -1:
            # If the LLM didn't include \end{document}, just append
            return original_tex + "\n" + injection
        else:
            # Insert right before \end{document}
            return (
                original_tex[:injection_index]
                + "\n"
                + injection
                + "\n"
                + original_tex[injection_index:]
            )

    final_latex = inject_into_latex(latex_doc, websites_latex)

    new_state = state.copy()
    new_state["summary_report"] = final_latex

    print(
        f"[iteration {state['current_iteration']} - DEBUG] Received LaTeX from LLM. Preview:"
    )
    print(latex_response.content[:300], "...")
    print(
        f"[iteration {state['current_iteration']} - DEBUG] Exiting summarize_process_as_latex."
    )
    return new_state
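
The LaTeX assembly above hinges on one string operation: splicing the websites section in just before the final \end{document}. Below is a minimal standalone sketch of that step, reproduced outside the class purely for illustration.

# Standalone illustration of the injection step used by summarize_process_as_latex.
def inject_before_end_document(original_tex: str, injection: str) -> str:
    idx = original_tex.rfind(r"\end{document}")
    if idx == -1:
        # The LLM omitted \end{document}; just append.
        return original_tex + "\n" + injection
    return original_tex[:idx] + "\n" + injection + "\n" + original_tex[idx:]

doc = "\\documentclass{article}\n\\begin{document}\nBody text.\n\\end{document}\n"
sites = "\\section*{Websites Visited}\nNo sites were visited.\n"
print(inject_before_end_document(doc, sites))
# The websites section now sits immediately before \end{document}.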

mp_agent

MaterialsProjectAgent

Bases: BaseAgent

Source code in src/ursa/agents/mp_agent.py, lines 34-195
class MaterialsProjectAgent(BaseAgent):
    def __init__(
        self,
        llm="openai/o3-mini",
        summarize: bool = True,
        max_results: int = 3,
        database_path: str = "mp_database",
        summaries_path: str = "mp_summaries",
        **kwargs,
    ):
        super().__init__(llm, **kwargs)
        self.summarize = summarize
        self.max_results = max_results
        self.database_path = database_path
        self.summaries_path = summaries_path

        os.makedirs(self.database_path, exist_ok=True)
        os.makedirs(self.summaries_path, exist_ok=True)

        self._action = self._build_graph()

    def _fetch_node(self, state: Dict) -> Dict:
        f = state["query"]
        els = f["elements"]  # e.g. ["Ga","In"]
        bg = (f["band_gap_min"], f["band_gap_max"])
        e_above_hull = (0, 0)  # only on-hull (stable)
        mats = []
        with MPRester() as mpr:
            # get ALL matching materials…
            all_results = mpr.materials.summary.search(
                elements=els,
                band_gap=bg,
                energy_above_hull=e_above_hull,
                is_stable=True,  # equivalent filter
            )
            # …then take only the first `max_results`
            for doc in all_results[: self.max_results]:
                mid = doc.material_id
                data = doc.dict()
                # cache to disk
                path = os.path.join(self.database_path, f"{mid}.json")
                if not os.path.exists(path):
                    with open(path, "w") as f:
                        json.dump(data, f, indent=2)
                mats.append({"material_id": mid, "metadata": data})

        return {**state, "materials": mats}

    def _summarize_node(self, state: Dict) -> Dict:
        """Summarize each material via LLM over its metadata."""
        # prompt template
        prompt = ChatPromptTemplate.from_template("""
You are a materials-science assistant. Given the following metadata about a material, produce a concise summary focusing on its key properties:

{metadata}
        """)
        chain = prompt | self.llm | StrOutputParser()

        summaries = [None] * len(state["materials"])

        def process(i, mat):
            mid = mat["material_id"]
            meta = mat["metadata"]
            # flatten metadata to text
            text = "\n".join(f"{k}: {v}" for k, v in meta.items())
            # build or load summary
            summary_file = os.path.join(
                self.summaries_path, f"{mid}_summary.txt"
            )
            if os.path.exists(summary_file):
                with open(summary_file) as f:
                    return i, f.read()
            # optional: vectorize & retrieve, but here we just summarize full text
            result = chain.invoke({"metadata": text})
            with open(summary_file, "w") as f:
                f.write(result)
            return i, result

        with ThreadPoolExecutor(
            max_workers=min(8, len(state["materials"]))
        ) as exe:
            futures = [
                exe.submit(process, i, m)
                for i, m in enumerate(state["materials"])
            ]
            for future in tqdm(futures, desc="Summarizing materials"):
                i, summ = future.result()
                summaries[i] = summ

        return {**state, "summaries": summaries}

    def _aggregate_node(self, state: Dict) -> Dict:
        """Combine all summaries into a single, coherent answer."""
        combined = "\n\n----\n\n".join(
            f"[{i + 1}] {m['material_id']}\n\n{summary}"
            for i, (m, summary) in enumerate(
                zip(state["materials"], state["summaries"])
            )
        )

        prompt = ChatPromptTemplate.from_template("""
        You are a materials informatics assistant. Below are brief summaries of several materials:

        {summaries}

        Answer the user’s question in context:

        {context}
                """)
        chain = prompt | self.llm | StrOutputParser()
        final = chain.invoke({
            "summaries": combined,
            "context": state["context"],
        })
        return {**state, "final_summary": final}

    def _build_graph(self):
        graph = StateGraph(dict)  # using plain dict for state
        self.add_node(graph, self._fetch_node)
        if self.summarize:
            self.add_node(graph, self._summarize_node)
            self.add_node(graph, self._aggregate_node)

            graph.set_entry_point("_fetch_node")
            graph.add_edge("_fetch_node", "_summarize_node")
            graph.add_edge("_summarize_node", "_aggregate_node")
            graph.set_finish_point("_aggregate_node")
        else:
            graph.set_entry_point("_fetch_node")
            graph.set_finish_point("_fetch_node")
        return graph.compile(checkpointer=self.checkpointer)

    def _invoke(
        self,
        inputs: Mapping[str, Any],
        *,
        summarize: bool | None = None,
        recursion_limit: int = 1000,
        **_,
    ) -> str:
        config = self.build_config(
            recursion_limit=recursion_limit, tags=["graph"]
        )

        if "query" not in inputs:
            if "mp_query" in inputs:
                # make a shallow copy and rename the key
                inputs = dict(inputs)
                inputs["query"] = inputs.pop("mp_query")
            else:
                raise KeyError(
                    "Missing 'query' in inputs (alias 'mp_query' also accepted)."
                )

        result = self._action.invoke(inputs, config)

        use_summary = self.summarize if summarize is None else summarize
        return (
            result.get("final_summary", "No summary generated.")
            if use_summary
            else "\n\nFinished Fetching Materials Database Information!"
        )
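
A rough usage sketch for MaterialsProjectAgent, illustrative only: it assumes the public invoke wrapper inherited from BaseAgent delegates to _invoke, and that a Materials Project API key is available in the environment for MPRester. The query keys mirror what _fetch_node reads.

# Hypothetical usage sketch; the public entry point is an assumption.
from ursa.agents.mp_agent import MaterialsProjectAgent

agent = MaterialsProjectAgent(max_results=2)  # llm defaults to "openai/o3-mini"

inputs = {
    "query": {                    # the alias "mp_query" is also accepted by _invoke
        "elements": ["Ga", "In"],
        "band_gap_min": 1.0,      # eV
        "band_gap_max": 3.0,      # eV
    },
    "context": "Which of these candidates looks most promising for LEDs?",
}

print(agent.invoke(inputs))  # final_summary when summarize=True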

optimization_agent

run_cmd(query, state)

Run a command-line command using the subprocess package in Python.

Parameters:

    query (str, required): command-line command to run, given as a string to the subprocess call.

Source code in src/ursa/agents/optimization_agent.py, lines 252-279
@tool
def run_cmd(query: str, state: Annotated[dict, InjectedState]) -> str:
    """
    Run a command-line command using the subprocess package in Python.

    Args:
        query: command-line command to run, given as a string to the subprocess call.
    """
    workspace_dir = state["workspace"]
    print("RUNNING: ", query)
    try:
        process = subprocess.Popen(
            query.split(" "),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            cwd=workspace_dir,
        )

        stdout, stderr = process.communicate(timeout=60000)
    except KeyboardInterrupt:
        print("Keyboard Interrupt of command: ", query)
        stdout, stderr = "", "KeyboardInterrupt:"

    print("STDOUT: ", stdout)
    print("STDERR: ", stderr)

    return f"STDOUT: {stdout} and STDERR: {stderr}"

write_code(code, filename, state)

Writes python or Julia code to a file in the given workspace as requested.

Parameters:

    code (str, required): the code to write.
    filename (str, required): the filename with an appropriate extension for the programming language (.py for Python, .jl for Julia, etc.).

Returns:

    str: a message indicating whether the file was written successfully.

Source code in src/ursa/agents/optimization_agent.py, lines 282-321
@tool
def write_code(
    code: str, filename: str, state: Annotated[dict, InjectedState]
) -> str:
    """
    Writes python or Julia code to a file in the given workspace as requested.

    Args:
        code: The code to write
        filename: the filename with an appropriate extension for the programming language (.py for Python, .jl for Julia, etc.)

    Returns:
        A message indicating whether the file was written successfully.
    """
    workspace_dir = state["workspace"]
    print("Writing filename ", filename)
    try:
        # Extract code if wrapped in markdown code blocks
        if "```" in code:
            code_parts = code.split("```")
            if len(code_parts) >= 3:
                # Extract the actual code
                if "\n" in code_parts[1]:
                    code = "\n".join(code_parts[1].strip().split("\n")[1:])
                else:
                    code = code_parts[2].strip()

        # Write code to a file
        code_file = os.path.join(workspace_dir, filename)

        with open(code_file, "w") as f:
            f.write(code)
        print(f"Written code to file: {code_file}")

        return f"File {filename} written successfully."

    except Exception as e:
        print(f"Error generating code: {str(e)}")
        # Return an error message describing the failure
        return f"Failed to write {filename} successfully."

websearch_agent

WebSearchAgent

Bases: BaseAgent

Source code in src/ursa/agents/websearch_agent.py, lines 49-157
class WebSearchAgent(BaseAgent):
    def __init__(
        self, llm: str | BaseChatModel = "openai/gpt-4o-mini", **kwargs
    ):
        super().__init__(llm, **kwargs)
        self.websearch_prompt = websearch_prompt
        self.reflection_prompt = reflection_prompt
        self.tools = [search_tool, process_content]  # + cb_tools
        self.has_internet = self._check_for_internet(
            kwargs.get("url", "http://www.lanl.gov")
        )
        self._build_graph()

    def _review_node(self, state: WebSearchState) -> WebSearchState:
        if not self.has_internet:
            return {
                "messages": [
                    HumanMessage(
                        content="No internet for WebSearch Agent so no research to review."
                    )
                ],
                "urls_visited": [],
            }

        translated = [SystemMessage(content=reflection_prompt)] + state[
            "messages"
        ]
        res = self.llm.invoke(
            translated, {"configurable": {"thread_id": self.thread_id}}
        )
        return {"messages": [HumanMessage(content=res.content)]}

    def _response_node(self, state: WebSearchState) -> WebSearchState:
        if not self.has_internet:
            return {
                "messages": [
                    HumanMessage(
                        content="No internet for WebSearch Agent. No research carried out."
                    )
                ],
                "urls_visited": [],
            }

        messages = state["messages"] + [SystemMessage(content=summarize_prompt)]
        response = self.llm.invoke(
            messages, {"configurable": {"thread_id": self.thread_id}}
        )

        urls_visited = []
        for message in messages:
            if message.model_dump().get("tool_calls", []):
                if "url" in message.tool_calls[0]["args"]:
                    urls_visited.append(message.tool_calls[0]["args"]["url"])
        return {"messages": [response.content], "urls_visited": urls_visited}

    def _check_for_internet(self, url, timeout=2):
        """
        Checks for internet connectivity by attempting an HTTP GET request.
        """
        try:
            requests.get(url, timeout=timeout)
            return True
        except (requests.ConnectionError, requests.Timeout):
            return False

    def _state_store_node(self, state: WebSearchState) -> WebSearchState:
        state["thread_id"] = self.thread_id
        return state
        # return dict(**state, thread_id=self.thread_id)

    def _create_react(self, state: WebSearchState) -> WebSearchState:
        react_agent = create_react_agent(
            self.llm,
            self.tools,
            state_schema=WebSearchState,
            prompt=self.websearch_prompt,
        )
        return react_agent.invoke(state)

    def _build_graph(self):
        graph = StateGraph(WebSearchState)
        self.add_node(graph, self._state_store_node)
        self.add_node(graph, self._create_react)
        self.add_node(graph, self._review_node)
        self.add_node(graph, self._response_node)

        graph.set_entry_point("_state_store_node")
        graph.add_edge("_state_store_node", "_create_react")
        graph.add_edge("_create_react", "_review_node")
        graph.set_finish_point("_response_node")

        graph.add_conditional_edges(
            "_review_node",
            should_continue,
            {
                "_create_react": "_create_react",
                "_response_node": "_response_node",
            },
        )
        self._action = graph.compile(checkpointer=self.checkpointer)
        # self._action.get_graph().draw_mermaid_png(output_file_path="./websearch_agent_graph.png", draw_method=MermaidDrawMethod.PYPPETEER)

    def _invoke(
        self, inputs: Mapping[str, Any], recursion_limit: int = 1000, **_
    ):
        config = self.build_config(
            recursion_limit=recursion_limit, tags=["graph"]
        )
        return self._action.invoke(inputs, config)
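
A rough usage sketch for WebSearchAgent, illustrative only: the exact WebSearchState schema is defined elsewhere in the module, and the public invoke wrapper inherited from BaseAgent is assumed to delegate to _invoke.

# Hypothetical usage sketch; input keys are assumptions drawn from the listing above.
from langchain_core.messages import HumanMessage
from ursa.agents.websearch_agent import WebSearchAgent

agent = WebSearchAgent()  # defaults to "openai/gpt-4o-mini"

result = agent.invoke({
    "messages": [
        HumanMessage(content="Summarize recent progress in solid-state batteries.")
    ]
})

print(result["messages"][-1])      # final summary produced by _response_node
print(result.get("urls_visited"))  # URLs collected from the tool calls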

process_content(url, context, state)

Processes content from a given webpage.

Parameters:

    url (str, required): string with the url to obtain text content from.
    context (str, required): string summary of the information the agent wants from the url for summarizing salient information.

Source code in src/ursa/agents/websearch_agent.py, lines 160-188
def process_content(
    url: str, context: str, state: Annotated[dict, InjectedState]
) -> str:
    """
    Processes content from a given webpage.

    Args:
        url: string with the url to obtain text content from.
        context: string summary of the information the agent wants from the url for summarizing salient information.
    """
    print("Parsing information from ", url)
    response = requests.get(url)
    soup = BeautifulSoup(response.content, "html.parser")

    content_prompt = f"""
    Here is the full content:
    {soup.get_text()}

    Carefully summarize the content in full detail, given the following context:
    {context}
    """
    summarized_information = (
        state["model"]
        .invoke(
            content_prompt, {"configurable": {"thread_id": state["thread_id"]}}
        )
        .content
    )
    return summarized_information
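
Like the other tools, process_content relies on the agent state for the model and thread id. The sketch below supplies that state by hand and treats process_content as the plain function shown above; this is purely illustrative, since in normal use the react agent provides the state and invokes the tool itself.

# Hypothetical direct call; the "model" and "thread_id" keys mirror the listing above.
from langchain_openai import ChatOpenAI

state = {
    "model": ChatOpenAI(model="gpt-4o-mini"),  # any chat model with .invoke would do
    "thread_id": "demo-thread",
}
summary = process_content(
    url="https://www.lanl.gov",
    context="What research areas does this site highlight?",
    state=state,
)
print(summary)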