IPCRecoveryClient
An IPC client with automatic connection recovery. Wraps
IPCClient to provide automatic reconnection when the
server becomes unavailable, with session reattachment, exponential
backoff, and status callbacks for UI updates.
from jaato_sdk.client.config import RecoveryConfig, get_recovery_config
import asyncio
from jaato_sdk.client import IPCRecoveryClient, ConnectionState
from jaato_sdk.client.recovery import ConnectionStatus
def on_status(status: ConnectionStatus):
if status.state == ConnectionState.RECONNECTING:
print(f"Reconnecting... attempt "
f"{status.attempt}/{status.max_attempts}")
async def main():
client = IPCRecoveryClient(
socket_path="/tmp/jaato.sock",
on_status_change=on_status,
)
await client.connect()
# Create session with an agent profile
session_id = await client.create_session(
"work", profile="researcher-claude"
)
client.set_session_id(session_id)
await client.send_message("Hello!")
async for event in client.events():
handle_event(event)
await client.close()
asyncio.run(main())
Constructor
__init__
socket_path: str = DEFAULT_SOCKET_PATH,
config: Optional[RecoveryConfig] = None,
auto_start: bool = True,
env_file: str = ".env",
workspace_path: Optional[Path] = None,
on_status_change: Optional[StatusCallback] = None
)
Creates a new recovery client instance.
-
socket_path
str
optional
Path to Unix domain socket or Windows named pipe. Defaults to
DEFAULT_SOCKET_PATH. -
config
RecoveryConfig
optional
Recovery configuration controlling reconnection behavior. If
None, loads automatically from config files and environment variables viaget_recovery_config(). -
auto_start
bool
optional
Whether to auto-start the server daemon if it is not already running. Default:
True. -
env_file
str
optional
Path to
.envfile passed to the auto-started server. Default:".env". -
workspace_path
Path
optional
Workspace path for loading project-level config (
.jaato/client.json). -
on_status_change
StatusCallback
optional
Callback invoked on every connection state transition. Receives a ConnectionStatus object. Signature:
(status: ConnectionStatus) -> None.
from jaato_sdk.client import IPCRecoveryClient
# Config loaded from ~/.jaato/client.json,
# .jaato/client.json, and JAATO_IPC_* env vars
client = IPCRecoveryClient(
socket_path="/tmp/jaato.sock",
)
from jaato_sdk.client import IPCRecoveryClient
from jaato_sdk.client.config import RecoveryConfig
config = RecoveryConfig(
max_attempts=5,
base_delay=2.0,
max_delay=30.0,
)
client = IPCRecoveryClient(
socket_path="/tmp/jaato.sock",
config=config,
auto_start=False,
on_status_change=on_status,
)
Connection
connect
Connect to the server. If auto_start is enabled
and the server is not running, starts it automatically.
-
timeout
float
optional
Connection timeout in seconds. Default:
5.0.
ConnectionError if connection fails.ConnectionClosedError if the client is already closed.
disconnect
Disconnect gracefully. Cancels any reconnection in progress and
transitions to DISCONNECTED. The client can be
reconnected later by calling connect() again.
close
Permanently close the connection. Transitions to the terminal
CLOSED state — no further reconnection attempts
will be made.
close(), the client cannot be reconnected.
Use disconnect() for temporary disconnection.
is_connected property
Returns True when the state is CONNECTED.
is_reconnecting property
Returns True when the state is RECONNECTING.
is_closed property
Returns True when the state is CLOSED.
state property
Returns the current ConnectionState.
client_id property
Returns the client ID assigned by the server, or None if not connected.
client = IPCRecoveryClient("/tmp/jaato.sock")
# Connect (auto-starts server if needed)
await client.connect(timeout=10.0)
# Check state
print(client.is_connected) # True
print(client.state) # ConnectionState.CONNECTED
print(client.client_id) # "client-abc123"
# ... use client ...
# Temporary disconnect (can reconnect later)
await client.disconnect()
print(client.is_connected) # False
# Reconnect
await client.connect()
# Permanent close (no more reconnection)
await client.close()
print(client.is_closed) # True
Sessions
create_session
name: Optional[str] = None,
profile: Optional[str] = None
) -> Optional[str]
Create a new session on the server. Automatically tracks the session ID for recovery. Optionally initialize the session with a predefined agent profile that configures the model, provider, tools, system instructions, and GC settings.
-
name
str
optional
Human-readable name for the session.
-
profile
str
optional
Name of an agent profile from
.jaato/profiles/to apply. The profile configures the session's model, provider, tools, system instructions, and GC settings. Profile files are JSON files in the workspace's.jaato/profiles/directory.
ReconnectingError if currently reconnecting.ConnectionClosedError if connection is closed.
attach_session
Attach to an existing session. Tracks the session ID for recovery.
-
session_id
str
required
The session to attach to.
set_session_id
Manually set the session ID for reattachment after reconnection.
Call this when the session was created or attached through other
means (e.g., from a SessionInfoEvent).
-
session_id
str
required
The session ID to track for recovery.
session_id property
Returns the current session ID, or None if no session is tracked.
list_profiles
Request the server to list available agent profiles. The server responds
with a SessionProfilesEvent containing profile summaries
(name, description, model, provider, icon). Profiles are discovered from
.jaato/profiles/*.json files in the workspace.
ReconnectingError if currently reconnecting.ConnectionClosedError if connection is closed.
client = IPCRecoveryClient("/tmp/jaato.sock")
await client.connect()
# List available profiles first
await client.list_profiles()
async for event in client.events():
if isinstance(event, SessionProfilesEvent):
for p in event.profiles:
print(f"{p['name']}: {p['description']}")
break
# Create a session with a profile
session_id = await client.create_session(
name="research-task",
profile="researcher-claude",
)
client.set_session_id(session_id)
client = IPCRecoveryClient("/tmp/jaato.sock")
await client.connect()
# Create a new session (auto-tracked)
session_id = await client.create_session("my-session")
print(client.session_id) # "session-abc123"
# Or attach to existing session (auto-tracked)
await client.attach_session("session-xyz789")
print(client.session_id) # "session-xyz789"
# If you get the session ID from an event
# instead of create_session/attach_session:
from jaato_sdk.events import SessionInfoEvent
async for event in client.events():
if isinstance(event, SessionInfoEvent):
client.set_session_id(event.session_id)
# Now recovery will reattach to this session
Messaging
send_message
text: str,
attachments: Optional[list] = None
) -> None
Send a message to the model via the server.
-
text
str
required
The message text.
-
attachments
list
optional
File attachments to include with the message.
ReconnectingError if currently reconnecting.ConnectionClosedError if connection is closed.
events
Async iterator that yields events from the server. Automatically handles reconnection — if the connection is lost and recovery is enabled, the iterator waits for reconnection and then resumes yielding events from the new connection.
stop
Stop the current operation. Only effective when connected. Does not raise during reconnection.
execute_command
command: str,
args: Optional[list] = None
) -> None
Execute a server-side command.
respond_to_permission
request_id: str,
response: str,
edited_arguments: Optional[Dict[str, Any]] = None
) -> None
Respond to a permission request from the server.
respond_to_clarification
request_id: str,
response: str
) -> None
Respond to a clarification question from the model.
from jaato_sdk.events import (
AgentOutputEvent,
PermissionRequestedEvent,
)
await client.send_message("List files in /tmp")
async for event in client.events():
if isinstance(event, AgentOutputEvent):
print(event.text, end="")
elif isinstance(event, PermissionRequestedEvent):
await client.respond_to_permission(
event.request_id,
"y", # approve
)
# events() handles reconnection automatically.
# No special try/except needed — the iterator
# waits during RECONNECTING and resumes when
# the connection is restored.
async for event in client.events():
handle_event(event)
# If server restarts, events() reconnects
# and resumes yielding from the new connection.
from jaato_sdk.client.recovery import (
ReconnectingError,
ConnectionClosedError,
)
async def send_with_retry(client, message):
while True:
try:
await client.send_message(message)
return
except ReconnectingError:
# Wait for reconnection, then retry
while client.is_reconnecting:
await asyncio.sleep(0.5)
if client.is_closed:
raise ConnectionClosedError()
Status
get_status
Get current connection status. Returns a snapshot of the connection state, reconnection progress, and session info.
status = client.get_status()
print(f"State: {status.state.value}")
print(f"Session: {status.session_id}")
print(f"Client: {status.client_id}")
if status.state == ConnectionState.RECONNECTING:
print(f"Attempt: {status.attempt}/{status.max_attempts}")
if status.next_retry_in is not None:
print(f"Next retry in: {status.next_retry_in:.1f}s")
Configuration
RecoveryConfig
RecoveryConfig
Controls reconnection behavior. Uses exponential backoff with jitter for retry delays.
| Field | Type | Default | Description |
|---|---|---|---|
enabled |
bool |
True |
Whether automatic reconnection is enabled |
max_attempts |
int |
10 |
Maximum reconnection attempts before giving up (min: 1) |
base_delay |
float |
1.0 |
Initial backoff delay in seconds (min: 0.1) |
max_delay |
float |
60.0 |
Maximum backoff delay cap in seconds (must be ≥ base_delay) |
jitter_factor |
float |
0.3 |
Random jitter range, 0.0–1.0 (0.3 = ±30%) |
connection_timeout |
float |
5.0 |
Timeout per connection attempt in seconds (min: 1.0) |
reattach_session |
bool |
True |
Automatically reattach to previous session after reconnecting |
get_recovery_config
Load recovery config with layered precedence: built-in defaults
→ user config (~/.jaato/client.json)
→ project config (.jaato/client.json)
→ environment variables (highest).
load_client_config
Load the full client configuration. Returns a ClientConfig
object containing the recovery section and any future
configuration sections.
Environment Variables
| Variable | Config Field | Default |
|---|---|---|
JAATO_IPC_AUTO_RECONNECT |
enabled |
true |
JAATO_IPC_RETRY_MAX_ATTEMPTS |
max_attempts |
10 |
JAATO_IPC_RETRY_BASE_DELAY |
base_delay |
1.0 |
JAATO_IPC_RETRY_MAX_DELAY |
max_delay |
60.0 |
JAATO_IPC_RETRY_JITTER |
jitter_factor |
0.3 |
JAATO_IPC_CONNECTION_TIMEOUT |
connection_timeout |
5.0 |
JAATO_IPC_REATTACH_SESSION |
reattach_session |
true |
from jaato_sdk.client.config import RecoveryConfig
# Custom config
config = RecoveryConfig(
enabled=True,
max_attempts=5,
base_delay=2.0,
max_delay=30.0,
jitter_factor=0.2,
connection_timeout=10.0,
reattach_session=True,
)
client = IPCRecoveryClient(
socket_path="/tmp/jaato.sock",
config=config,
)
from pathlib import Path
from jaato_sdk.client.config import (
get_recovery_config,
load_client_config,
)
# Load just recovery config
config = get_recovery_config(
workspace_path=Path.cwd()
)
# Or load full client config
full_config = load_client_config(
workspace_path=Path.cwd()
)
config = full_config.recovery
{
"recovery": {
"enabled": true,
"max_attempts": 10,
"base_delay": 1.0,
"max_delay": 60.0,
"jitter_factor": 0.3,
"connection_timeout": 5.0,
"reattach_session": true
}
}
# Override via environment (highest precedence)
export JAATO_IPC_AUTO_RECONNECT=true
export JAATO_IPC_RETRY_MAX_ATTEMPTS=20
export JAATO_IPC_RETRY_BASE_DELAY=0.5
export JAATO_IPC_CONNECTION_TIMEOUT=10.0
Types
ConnectionState
Connection state enum. String values match the state names in lowercase.
| Member | Value | Description |
|---|---|---|
DISCONNECTED |
"disconnected" |
Not connected, no reconnection in progress |
CONNECTING |
"connecting" |
Initial connection or retry attempt in progress |
CONNECTED |
"connected" |
Successfully connected to server |
RECONNECTING |
"reconnecting" |
Connection lost, waiting for next retry |
DISCONNECTING |
"disconnecting" |
Graceful disconnect initiated |
CLOSED |
"closed" |
Terminal state, no more reconnection attempts |
ConnectionStatus
ConnectionStatus
Snapshot of connection state for UI display. Passed to the on_status_change callback.
| Field | Type | Description |
|---|---|---|
state |
ConnectionState |
Current connection state |
attempt |
int |
Current reconnection attempt (0 if not reconnecting) |
max_attempts |
int |
Maximum reconnection attempts configured |
next_retry_in |
float | None |
Seconds until next retry (None if not waiting) |
last_error |
str | None |
Description of the last error encountered |
session_id |
str | None |
ID of the attached session |
client_id |
str | None |
ID assigned by server |
StatusCallback
Type alias for the status change callback function.
ReconnectingError
Raised when an operation (e.g., send_message) is attempted
while the client is reconnecting. Catch this to queue messages for retry.
ConnectionClosedError
Raised when an operation is attempted after the connection has been
permanently closed via close() or after max reconnection
attempts are exhausted.
# Main client and state enum
from jaato_sdk.client import (
IPCRecoveryClient,
ConnectionState,
)
# Recovery-specific types
from jaato_sdk.client.recovery import (
ConnectionStatus,
StatusCallback,
ReconnectingError,
ConnectionClosedError,
)
# Configuration
from jaato_sdk.client.config import (
RecoveryConfig,
ClientConfig,
get_recovery_config,
load_client_config,
)
from jaato_sdk.client.recovery import (
ReconnectingError,
ConnectionClosedError,
)
try:
await client.send_message("Hello!")
except ReconnectingError:
# Queue the message, retry after reconnect
message_queue.append("Hello!")
except ConnectionClosedError:
# Connection is permanently closed
print("Connection lost. Please restart.")