ToolExecutor

Registry mapping tool names to callable functions with permission checking, auto-backgrounding for long-running tasks, and output callbacks for real-time feedback.

Moduleshared.ai_tool_runner
Importfrom shared import ToolExecutor
Used byJaatoSession, JaatoClient

Features

  • Tool registration - Map tool names to executor functions
  • Permission checking - Integrate with PermissionPlugin for access control
  • Auto-backgrounding - Long-running tasks auto-convert to background tasks
  • Output callbacks - Real-time streaming output during execution
  • Parallel execution - Thread-safe callbacks for concurrent tool calls
Internal Component
Most users interact with tools through JaatoClient or JaatoSession. Direct use of ToolExecutor is typically only needed for advanced customization.
Basic usage
from shared import ToolExecutor

executor = ToolExecutor()

# Register a tool
def my_tool(args):
    return {"result": args.get("input", "") * 2}

executor.register("double_string", my_tool)

# Execute the tool
success, result = executor.execute(
    "double_string",
    {"input": "hello"}
)
# result: {"result": "hellohello"}

Constructor

Creates a new ToolExecutor instance with optional configuration for token ledger and auto-background behavior.

Parameters

ParameterTypeDefaultDescription
ledger TokenLedger None Token ledger for recording execution events
auto_background_enabled bool True Enable auto-backgrounding for long-running tasks
auto_background_pool_size int 4 Thread pool size for background execution
Constructor
from shared import ToolExecutor
from shared.token_accounting import TokenLedger

# Default configuration
executor = ToolExecutor()

# With token ledger
ledger = TokenLedger()
executor = ToolExecutor(ledger=ledger)

# Custom background pool
executor = ToolExecutor(
    auto_background_enabled=True,
    auto_background_pool_size=8
)

Registration Methods

register(name, fn)

Register a tool executor function under a given name.

ParameterTypeDescription
name str Tool name to register
fn Callable[[Dict], Any] Function that accepts args dict and returns result

clear_executors()

Clear all registered executors. Useful when refreshing tools after enabling/disabling plugins.

Tool registration
# Register tools
def greet(args):
    name = args.get("name", "World")
    return {"message": f"Hello, {name}!"}

def calculate(args):
    a = args.get("a", 0)
    b = args.get("b", 0)
    op = args.get("op", "add")
    if op == "add":
        return {"result": a + b}
    elif op == "multiply":
        return {"result": a * b}

executor.register("greet", greet)
executor.register("calculate", calculate)

# Clear all tools
executor.clear_executors()

execute()

Execute a tool by name with the given arguments. Handles permission checking, auto-backgrounding, and output callbacks automatically.

Parameters

ParameterTypeDescription
name str Tool name to execute
args Dict[str, Any] Arguments to pass to the tool
tool_output_callback Callable[[str], None] Optional callback for streaming output
call_id str Optional unique identifier for parallel execution

Returns

Tuple[bool, Any] - Success flag and result (or error dict).

Result Structure

On success, returns the tool's result. On failure, returns a dict:

  • error - Error message
  • traceback - Full traceback (if exception)
  • _permission - Permission metadata (if applicable)

Executor Metadata

Executors may return a (result_dict, metadata_dict) tuple. The metadata is merged into the result dict before it is returned.

KeyTypeDefaultDescription
continuation_id str Groups consecutive tool calls into a single popup session
show_output bool true Controls main output panel visibility for this call
show_popup bool true Controls popup panel tracking/visibility for this call
Executing tools
# Basic execution
success, result = executor.execute(
    "greet",
    {"name": "Alice"}
)
if success:
    print(result["message"])
    # "Hello, Alice!"

# With output callback for streaming
def on_output(chunk):
    print(f"[stream] {chunk}")

success, result = executor.execute(
    "long_running_task",
    {"input": "data"},
    tool_output_callback=on_output
)

# With call_id for parallel execution
success, result = executor.execute(
    "analyze",
    {"file": "data.csv"},
    call_id="call_12345"
)
Executor metadata tuple
# Executor returning metadata:
def _exec_input(self, args):
    result = {"output": "..."}
    return (result, {
        "continuation_id": "sess_1",
        "show_output": False,
    })

# ToolExecutor merges metadata into result:
success, result = executor.execute(
    "shell_input", {"session_id": "sess_1", "text": "ls"}
)
# result == {"output": "...", "continuation_id": "sess_1",
#            "show_output": False}
Error handling
success, result = executor.execute(
    "unknown_tool",
    {}
)

if not success:
    print(result["error"])
    # "No executor registered for unknown_tool"

# Permission denied
success, result = executor.execute(
    "dangerous_tool",
    {"cmd": "rm -rf /"}
)
if not success and "_permission" in result:
    print(f"Denied: {result['error']}")
    print(f"Method: {result['_permission']['method']}")

Permission Integration

The ToolExecutor integrates with PermissionPlugin to check permissions before executing tools.

set_permission_plugin(plugin, context)

ParameterTypeDescription
plugin PermissionPlugin Permission plugin instance, or None to disable
context Dict[str, Any] Optional context for permission checks (e.g., session_id)

Permission Metadata

When permissions are checked, metadata is injected into the result:

  • decision - "allowed" or "denied"
  • reason - Explanation of the decision
  • method - How the decision was made (whitelist, interactive, etc.)
Permission configuration
from shared.plugins.permission import PermissionPlugin

# Create permission plugin
permission = PermissionPlugin(
    config_path=".jaato/permissions.json",
    channel_type="console"
)

# Set on executor
executor.set_permission_plugin(
    permission,
    context={"session_id": "abc123"}
)

# Now all executions check permissions
success, result = executor.execute(
    "deleteFile",
    {"path": "/etc/passwd"}
)
# User prompted for permission (if not whitelisted/blacklisted)
Permission metadata in result
success, result = executor.execute(
    "writeFile",
    {"path": "output.txt", "content": "data"}
)

if "_permission" in result:
    perm = result["_permission"]
    print(f"Decision: {perm['decision']}")
    print(f"Reason: {perm['reason']}")
    print(f"Method: {perm['method']}")

Auto-Background

Tools from BackgroundCapable plugins can be automatically converted to background tasks when they exceed a time threshold.

How It Works

  1. Tool starts executing
  2. If execution exceeds threshold (e.g., 30s), it continues in background
  3. Returns immediately with task handle
  4. Use background plugin to check status and get results

Auto-Background Result

When a task is auto-backgrounded, the result contains:

  • auto_backgrounded - true
  • task_id - ID to check status
  • plugin_name - Plugin that owns the task
  • tool_name - Original tool name
  • threshold_seconds - Threshold that was exceeded
  • message - Human-readable explanation
Auto-background result
# Tool exceeds threshold
success, result = executor.execute(
    "runCommand",
    {"command": "npm install"}  # Takes > 30s
)

if result.get("auto_backgrounded"):
    task_id = result["task_id"]
    print(f"Task backgrounded: {task_id}")
    print(result["message"])
    # "Task exceeded 30s threshold, continuing
    #  in background. Use task_id 'cli_abc123'
    #  to check status and output."
Check background task
# Use background plugin to check status
checkBackgroundTask(task_id="cli_abc123")
# {"task_id": "cli_abc123", "status": "running"}

# Get output when complete
getBackgroundOutput(task_id="cli_abc123")
# {"status": "completed", "result": {...}}

Output Callbacks

ToolExecutor supports two types of callbacks for real-time output during tool execution.

set_output_callback(callback)

Set the general output callback, forwarded to all plugins that support it. Used for agent-level output streaming.

set_tool_output_callback(callback)

Set the tool-specific output callback. Set per-tool-call to route output to the correct tool tree entry in the UI.

Parallel Execution

For parallel tool execution, callbacks are stored in thread-local storage. Pass tool_output_callback to execute() to set it for that specific call.

get_current_tool_output_callback()

Module-level function for plugins to get the current thread's callback during parallel execution.

Output callbacks
from shared.ai_tool_runner import (
    ToolExecutor,
    get_current_tool_output_callback
)

executor = ToolExecutor()

# General output callback
def on_output(source, text, mode):
    print(f"[{source}] {text}")

executor.set_output_callback(on_output)

# Tool-specific callback
def on_tool_output(chunk):
    print(f"[tool] {chunk}")

executor.set_tool_output_callback(on_tool_output)
Parallel execution
from concurrent.futures import ThreadPoolExecutor

def run_tool(tool_name, args, call_id):
    def callback(chunk):
        print(f"[{call_id}] {chunk}")

    return executor.execute(
        tool_name,
        args,
        tool_output_callback=callback,
        call_id=call_id
    )

# Execute tools in parallel
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [
        pool.submit(run_tool, "analyze", {"f": "a.py"}, "call_1"),
        pool.submit(run_tool, "analyze", {"f": "b.py"}, "call_2"),
    ]
    results = [f.result() for f in futures]

Configuration Methods

set_ledger(ledger)

Set the TokenLedger for recording execution events (permission checks, auto-background, etc.).

set_registry(registry)

Set the PluginRegistry for plugin lookups. Required for auto-background support and dynamic tool discovery.

get_output_callback()

Returns the current general output callback.

get_tool_output_callback()

Returns the current tool output callback. Checks thread-local storage first (for parallel execution), then instance-level.

Configuration
from jaato import PluginRegistry
from shared import ToolExecutor
from shared.token_accounting import TokenLedger

executor = ToolExecutor()

# Set ledger for event recording
ledger = TokenLedger()
executor.set_ledger(ledger)

# Set registry for plugin lookups
registry = PluginRegistry()
registry.expose_all()
executor.set_registry(registry)

# Get current callbacks
output_cb = executor.get_output_callback()
tool_cb = executor.get_tool_output_callback()