ToolExecutor
Registry mapping tool names to callable functions with permission checking, auto-backgrounding for long-running tasks, and output callbacks for real-time feedback.
| Module | shared.ai_tool_runner |
| Import | from shared import ToolExecutor |
| Used by | JaatoSession, JaatoClient |
Features
- Tool registration - Map tool names to executor functions
- Permission checking - Integrate with PermissionPlugin for access control
- Auto-backgrounding - Long-running tasks auto-convert to background tasks
- Output callbacks - Real-time streaming output during execution
- Parallel execution - Thread-safe callbacks for concurrent tool calls
JaatoClient or JaatoSession.
Direct use of ToolExecutor is typically only needed for advanced customization.
from shared import ToolExecutor
executor = ToolExecutor()
# Register a tool
def my_tool(args):
return {"result": args.get("input", "") * 2}
executor.register("double_string", my_tool)
# Execute the tool
success, result = executor.execute(
"double_string",
{"input": "hello"}
)
# result: {"result": "hellohello"}
Constructor
Creates a new ToolExecutor instance with optional configuration for token ledger and auto-background behavior.
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
ledger |
TokenLedger |
None |
Token ledger for recording execution events |
auto_background_enabled |
bool |
True |
Enable auto-backgrounding for long-running tasks |
auto_background_pool_size |
int |
4 |
Thread pool size for background execution |
from shared import ToolExecutor
from shared.token_accounting import TokenLedger
# Default configuration
executor = ToolExecutor()
# With token ledger
ledger = TokenLedger()
executor = ToolExecutor(ledger=ledger)
# Custom background pool
executor = ToolExecutor(
auto_background_enabled=True,
auto_background_pool_size=8
)
Registration Methods
register(name, fn)
Register a tool executor function under a given name.
| Parameter | Type | Description |
|---|---|---|
name |
str |
Tool name to register |
fn |
Callable[[Dict], Any] |
Function that accepts args dict and returns result |
clear_executors()
Clear all registered executors. Useful when refreshing tools after enabling/disabling plugins.
# Register tools
def greet(args):
name = args.get("name", "World")
return {"message": f"Hello, {name}!"}
def calculate(args):
a = args.get("a", 0)
b = args.get("b", 0)
op = args.get("op", "add")
if op == "add":
return {"result": a + b}
elif op == "multiply":
return {"result": a * b}
executor.register("greet", greet)
executor.register("calculate", calculate)
# Clear all tools
executor.clear_executors()
execute()
Execute a tool by name with the given arguments. Handles permission checking, auto-backgrounding, and output callbacks automatically.
Parameters
| Parameter | Type | Description |
|---|---|---|
name |
str |
Tool name to execute |
args |
Dict[str, Any] |
Arguments to pass to the tool |
tool_output_callback |
Callable[[str], None] |
Optional callback for streaming output |
call_id |
str |
Optional unique identifier for parallel execution |
Returns
Tuple[bool, Any] - Success flag and result (or error dict).
Result Structure
On success, returns the tool's result. On failure, returns a dict:
error- Error messagetraceback- Full traceback (if exception)_permission- Permission metadata (if applicable)
Executor Metadata
Executors may return a (result_dict, metadata_dict) tuple.
The metadata is merged into the result dict before it is returned.
| Key | Type | Default | Description |
|---|---|---|---|
continuation_id |
str |
— | Groups consecutive tool calls into a single popup session |
show_output |
bool |
true |
Controls main output panel visibility for this call |
show_popup |
bool |
true |
Controls popup panel tracking/visibility for this call |
# Basic execution
success, result = executor.execute(
"greet",
{"name": "Alice"}
)
if success:
print(result["message"])
# "Hello, Alice!"
# With output callback for streaming
def on_output(chunk):
print(f"[stream] {chunk}")
success, result = executor.execute(
"long_running_task",
{"input": "data"},
tool_output_callback=on_output
)
# With call_id for parallel execution
success, result = executor.execute(
"analyze",
{"file": "data.csv"},
call_id="call_12345"
)
# Executor returning metadata:
def _exec_input(self, args):
result = {"output": "..."}
return (result, {
"continuation_id": "sess_1",
"show_output": False,
})
# ToolExecutor merges metadata into result:
success, result = executor.execute(
"shell_input", {"session_id": "sess_1", "text": "ls"}
)
# result == {"output": "...", "continuation_id": "sess_1",
# "show_output": False}
success, result = executor.execute(
"unknown_tool",
{}
)
if not success:
print(result["error"])
# "No executor registered for unknown_tool"
# Permission denied
success, result = executor.execute(
"dangerous_tool",
{"cmd": "rm -rf /"}
)
if not success and "_permission" in result:
print(f"Denied: {result['error']}")
print(f"Method: {result['_permission']['method']}")
Permission Integration
The ToolExecutor integrates with PermissionPlugin to check permissions before executing tools.
set_permission_plugin(plugin, context)
| Parameter | Type | Description |
|---|---|---|
plugin |
PermissionPlugin |
Permission plugin instance, or None to disable |
context |
Dict[str, Any] |
Optional context for permission checks (e.g., session_id) |
Permission Metadata
When permissions are checked, metadata is injected into the result:
decision-"allowed"or"denied"reason- Explanation of the decisionmethod- How the decision was made (whitelist, interactive, etc.)
from shared.plugins.permission import PermissionPlugin
# Create permission plugin
permission = PermissionPlugin(
config_path=".jaato/permissions.json",
channel_type="console"
)
# Set on executor
executor.set_permission_plugin(
permission,
context={"session_id": "abc123"}
)
# Now all executions check permissions
success, result = executor.execute(
"deleteFile",
{"path": "/etc/passwd"}
)
# User prompted for permission (if not whitelisted/blacklisted)
success, result = executor.execute(
"writeFile",
{"path": "output.txt", "content": "data"}
)
if "_permission" in result:
perm = result["_permission"]
print(f"Decision: {perm['decision']}")
print(f"Reason: {perm['reason']}")
print(f"Method: {perm['method']}")
Auto-Background
Tools from BackgroundCapable plugins can be automatically
converted to background tasks when they exceed a time threshold.
How It Works
- Tool starts executing
- If execution exceeds threshold (e.g., 30s), it continues in background
- Returns immediately with task handle
- Use background plugin to check status and get results
Auto-Background Result
When a task is auto-backgrounded, the result contains:
auto_backgrounded-truetask_id- ID to check statusplugin_name- Plugin that owns the tasktool_name- Original tool namethreshold_seconds- Threshold that was exceededmessage- Human-readable explanation
# Tool exceeds threshold
success, result = executor.execute(
"runCommand",
{"command": "npm install"} # Takes > 30s
)
if result.get("auto_backgrounded"):
task_id = result["task_id"]
print(f"Task backgrounded: {task_id}")
print(result["message"])
# "Task exceeded 30s threshold, continuing
# in background. Use task_id 'cli_abc123'
# to check status and output."
# Use background plugin to check status
checkBackgroundTask(task_id="cli_abc123")
# {"task_id": "cli_abc123", "status": "running"}
# Get output when complete
getBackgroundOutput(task_id="cli_abc123")
# {"status": "completed", "result": {...}}
Output Callbacks
ToolExecutor supports two types of callbacks for real-time output during tool execution.
set_output_callback(callback)
Set the general output callback, forwarded to all plugins that support it. Used for agent-level output streaming.
set_tool_output_callback(callback)
Set the tool-specific output callback. Set per-tool-call to route output to the correct tool tree entry in the UI.
Parallel Execution
For parallel tool execution, callbacks are stored in thread-local
storage. Pass tool_output_callback to execute()
to set it for that specific call.
get_current_tool_output_callback()
Module-level function for plugins to get the current thread's callback during parallel execution.
from shared.ai_tool_runner import (
ToolExecutor,
get_current_tool_output_callback
)
executor = ToolExecutor()
# General output callback
def on_output(source, text, mode):
print(f"[{source}] {text}")
executor.set_output_callback(on_output)
# Tool-specific callback
def on_tool_output(chunk):
print(f"[tool] {chunk}")
executor.set_tool_output_callback(on_tool_output)
from concurrent.futures import ThreadPoolExecutor
def run_tool(tool_name, args, call_id):
def callback(chunk):
print(f"[{call_id}] {chunk}")
return executor.execute(
tool_name,
args,
tool_output_callback=callback,
call_id=call_id
)
# Execute tools in parallel
with ThreadPoolExecutor(max_workers=4) as pool:
futures = [
pool.submit(run_tool, "analyze", {"f": "a.py"}, "call_1"),
pool.submit(run_tool, "analyze", {"f": "b.py"}, "call_2"),
]
results = [f.result() for f in futures]
Configuration Methods
set_ledger(ledger)
Set the TokenLedger for recording execution events
(permission checks, auto-background, etc.).
set_registry(registry)
Set the PluginRegistry for plugin lookups. Required
for auto-background support and dynamic tool discovery.
get_output_callback()
Returns the current general output callback.
get_tool_output_callback()
Returns the current tool output callback. Checks thread-local storage first (for parallel execution), then instance-level.
from jaato import PluginRegistry
from shared import ToolExecutor
from shared.token_accounting import TokenLedger
executor = ToolExecutor()
# Set ledger for event recording
ledger = TokenLedger()
executor.set_ledger(ledger)
# Set registry for plugin lookups
registry = PluginRegistry()
registry.expose_all()
executor.set_registry(registry)
# Get current callbacks
output_cb = executor.get_output_callback()
tool_cb = executor.get_tool_output_callback()