Service Discovery and the Schema Registry in Soorma
In a multi-agent system, every agent needs answers to two questions: who can do this work, and what does that agent's interface look like? In Soorma, both answers live in the Registry service.
Two Roles, One Service
The Registry serves two distinct but complementary purposes:
- Service Discovery — find agents by capability at runtime
- Schema Registry — store and validate the event contracts those agents advertise
These are not separate systems. An agent registers itself with its capabilities and its event schemas in a single operation. Discovery and contract validation are two sides of the same record.
Registering an Agent
When a Worker starts, it registers itself with the platform:
```python
from soorma import Worker
from soorma.context import PlatformContext

worker = Worker(
    name="parts-inventory-worker",
    capabilities=["check_parts", "reserve_parts"],
    events_consumed=["parts.check.requested", "parts.reserve.requested"],
    events_produced=["parts.check.completed", "parts.reserve.completed"],
)
```
The capabilities, events_consumed, and events_produced fields are stored in the Registry on startup. This is the schema contract. Other agents can query it before emitting events, or the platform can validate incoming events against it at the gateway.
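The sketch below shows the "single record" idea in plain Python. It is a minimal in-memory model of what a registration record might hold, and of how a caller could check the contract before emitting an event. It does not use the Soorma SDK. AgentRecord and accepts_event are hypothetical names chosen for illustration, and the real Registry may store more fields or shape them differently.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AgentRecord:
    """Hypothetical shape of one Registry entry: discovery data and contract together."""
    name: str
    capabilities: frozenset[str]
    events_consumed: frozenset[str]
    events_produced: frozenset[str]


def accepts_event(record: AgentRecord, event_type: str) -> bool:
    """Check the advertised contract before emitting an event to this agent."""
    return event_type in record.events_consumed


record = AgentRecord(
    name="parts-inventory-worker",
    capabilities=frozenset({"check_parts", "reserve_parts"}),
    events_consumed=frozenset({"parts.check.requested", "parts.reserve.requested"}),
    events_produced=frozenset({"parts.check.completed", "parts.reserve.completed"}),
)

print(accepts_event(record, "parts.check.requested"))
print(accepts_event(record, "parts.cancel.requested"))
```

Both discovery (capabilities) and the contract (events_consumed, events_produced) are read from the same object, which mirrors the single-operation registration described above.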
Discovering Agents by Capability
A Planner uses context.registry to find a Worker before delegating a task:
```python
from soorma.agents.planner import Planner, GoalContext
from soorma.context import PlatformContext

# `planner` is assumed to be a Planner instance created earlier in this module.


@planner.on_goal("maintenance.goal")
async def plan_maintenance(goal: GoalContext, context: PlatformContext) -> None:
    # Find agents with the "check_parts" capability
    capable_workers = await context.registry.discover(requirements=["check_parts"])

    if not capable_workers:
        # No capable agent registered: escalate or fail gracefully
        await context.bus.publish(
            topic="business-facts",
            event_type="maintenance.failed",
            data={"reason": "no_parts_worker_available"},
        )
        return

    # Send the request as the event type the discovered worker consumes
    await context.bus.request(
        event_type="parts.check.requested",
        data={"vehicle_id": goal.data.get("vehicle_id")},
        response_event="parts.check.completed",
        correlation_id=goal.correlation_id,
    )
```
context.registry.discover(requirements=["check_parts"]) returns all registered agents whose capabilities match the requirement. For single-agent delegation, use capable_workers[0]. For fan-out patterns, where you want every capable agent to process a task, iterate over the full list.
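Plain Python is enough to sketch the difference between single-agent delegation and fan-out. The sketch assumes that an agent matches when it advertises every requested capability (a subset test). The text above does not say how multiple requirements combine, so treat that rule, and the discover function itself, as an illustrative stand-in rather than the Registry's actual matching logic.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Agent:
    name: str
    capabilities: frozenset[str]


def discover(agents: list[Agent], requirements: list[str]) -> list[Agent]:
    """Return agents advertising every required capability (assumed subset match)."""
    needed = set(requirements)
    return [a for a in agents if needed <= a.capabilities]


registry = [
    Agent("parts-inventory-worker", frozenset({"check_parts", "reserve_parts"})),
    Agent("parts-backup-worker", frozenset({"check_parts"})),
    Agent("billing-worker", frozenset({"create_invoice"})),
]

capable_workers = discover(registry, ["check_parts"])

# Single-agent delegation: hand the task to one match.
if capable_workers:
    target = capable_workers[0]
    print("delegate to", target.name)

# Fan-out: every capable agent receives the task.
for worker in capable_workers:
    print("fan out to", worker.name)
```

An empty result is the case the planner handler above treats as "no capable agent registered". Under the subset assumption, adding more requirements can only shrink the result.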
Schema Validation at the Gateway
The Registry's stored event schemas enable the platform to validate event payloads at ingress. When an event arrives at the gateway, the platform can:
- Look up the events_consumed schema for the target agent
- Validate the payload structure before delivery
- Reject malformed events with a structured error response rather than delivering them to agents
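This shifts validation left. Once gateway validation is enforced, agent code does not need defensive checks on every field, because the contract is checked before the event reaches your handler. In v0.9.1, the Registry stores these schemas, but gateway-level payload validation is still planned. Until it ships, handlers cannot yet rely on it.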
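Gateway-level validation is still planned in v0.9.1, so the following is only a sketch of the lookup, validate, and reject flow. It is not the platform's implementation. It assumes a deliberately simple schema format: a mapping from required field name to Python type. It also uses a hypothetical validate_at_gateway helper that returns None for a valid payload or a structured error dict otherwise. The real Registry may store JSON Schema or another format.

```python
from typing import Any, Optional

# Hypothetical schema store: consumed event type -> required fields and their types.
SCHEMAS: dict[str, dict[str, type]] = {
    "parts.check.requested": {"vehicle_id": str},
}


def validate_at_gateway(event_type: str, payload: dict[str, Any]) -> Optional[dict[str, Any]]:
    """Return None if the payload satisfies the schema, else a structured error."""
    schema = SCHEMAS.get(event_type)
    if schema is None:
        # No contract registered for this event type: reject rather than deliver.
        return {"error": "unknown_event_type", "event_type": event_type}
    problems = []
    for field_name, expected in schema.items():
        if field_name not in payload:
            problems.append({"field": field_name, "issue": "missing"})
        elif not isinstance(payload[field_name], expected):
            problems.append({"field": field_name, "issue": f"expected {expected.__name__}"})
    if problems:
        return {"error": "invalid_payload", "event_type": event_type, "problems": problems}
    return None  # valid: safe to deliver to the handler


print(validate_at_gateway("parts.check.requested", {"vehicle_id": "V-123"}))
print(validate_at_gateway("parts.check.requested", {}))
```

Returning a structured error, rather than raising, keeps the rejection path data-only. The gateway could then send that error back to the emitter without delivering anything to the agent.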
The context.registry API
| Method | Description |
|---|---|
| context.registry.discover(requirements=[...]) | Returns agents matching capability task names |
| context.registry.discover_agents(consumed_event=...) | Returns agents by consumed event name |
| context.registry.register_agent(agent) | Announces capabilities (called automatically on startup) |
| context.registry.query_agents(...) | Searches agents by name, capability, or event |
All calls are tenant-scoped. Multi-tenant isolation is enforced at the service layer — an agent in tenant A cannot discover or query agents in tenant B. See ARCHITECTURE_PATTERNS.md Section 4 for the full multi-tenancy specification.
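An in-memory store can illustrate the isolation rule. It keys every record by tenant and only ever searches within the caller's tenant. This is a conceptual model only: the real service enforces isolation with PostgreSQL RLS policies, and TenantScopedRegistry and its methods are hypothetical.

```python
from collections import defaultdict


class TenantScopedRegistry:
    """Hypothetical model: each tenant sees only its own agent records."""

    def __init__(self) -> None:
        # tenant_id -> {agent_name: set of capabilities}
        self._records: dict[str, dict[str, set[str]]] = defaultdict(dict)

    def register(self, tenant_id: str, name: str, capabilities: set[str]) -> None:
        self._records[tenant_id][name] = set(capabilities)

    def discover(self, tenant_id: str, requirement: str) -> list[str]:
        # Only the caller's tenant partition is ever searched.
        tenant_records = self._records.get(tenant_id, {})
        return [name for name, caps in tenant_records.items() if requirement in caps]


reg = TenantScopedRegistry()
reg.register("tenant-a", "parts-inventory-worker", {"check_parts"})
reg.register("tenant-b", "parts-inventory-worker", {"check_parts"})
reg.register("tenant-b", "tenant-b-only-worker", {"check_parts"})

print(reg.discover("tenant-a", "check_parts"))
```

The same agent name can exist in two tenants without conflict, and tenant A never sees tenant-b-only-worker. Enforcing this in the database layer (as RLS does) rather than in each query means a forgotten filter cannot leak records across tenants.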
Agent Lifecycle and Re-registration
Agents re-register on every startup. The Registry uses an upsert pattern — if a record for the agent name already exists in the tenant, it is updated in place. This means:
- Capability changes take effect immediately on restart — no manual deregistration step
- Schema changes are reflected in the Registry before the agent starts consuming events
- Stale registrations from crashed instances are overwritten on recovery
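The upsert behavior amounts to "insert or overwrite, keyed by tenant and agent name". The minimal sketch below uses a plain dict keyed by that pair as a stand-in for the Registry table. The key choice follows the description above; upsert_agent is a hypothetical name.

```python
# Hypothetical stand-in for the Registry table: (tenant_id, agent_name) -> record.
records: dict[tuple[str, str], dict] = {}


def upsert_agent(tenant_id: str, name: str, capabilities: list[str]) -> None:
    """Insert a new record, or overwrite the existing one for this tenant and name."""
    records[(tenant_id, name)] = {"name": name, "capabilities": list(capabilities)}


# First startup.
upsert_agent("tenant-a", "parts-inventory-worker", ["check_parts"])
# Restart with a new capability: same key, so the record is updated in place.
upsert_agent("tenant-a", "parts-inventory-worker", ["check_parts", "reserve_parts"])

print(len(records))
print(records[("tenant-a", "parts-inventory-worker")]["capabilities"])
```

The key is stable across restarts, so the second call replaces the first and no deregistration step is needed. A database-backed implementation could express the same idea as a single statement; in PostgreSQL, that would be INSERT ... ON CONFLICT ... DO UPDATE.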
The Registry actively tracks agent liveness via TTL. On registration, each agent record is stamped with a last_heartbeat timestamp. Agents must periodically send a heartbeat refresh — an HTTP PUT to /v1/agents/{agent_id}/heartbeat via client.refresh_heartbeat(agent_id) in the SDK — to extend their TTL. If an agent stops sending heartbeats (because it crashed or was removed), its record expires and is cascade-deleted by the Registry's background cleanup job. This means discover() only returns agents with a live heartbeat, not stale registrations from long-dead instances. See test_agent_ttl.py, test_expired_agent_cascade_delete.py, and test_background_cleanup.py in the Registry service tests for the full TTL contract.
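The heartbeat contract can be modeled with an injectable clock, which makes expiry easy to observe. This is a sketch under assumptions: the 30-second TTL, the HeartbeatRegistry class, and its method names are illustrative, not values or APIs from the Registry service. In the SDK, the refresh is client.refresh_heartbeat(agent_id), as described above.

```python
from typing import Callable


class HeartbeatRegistry:
    """Hypothetical TTL model: records expire unless their heartbeat is refreshed."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float]) -> None:
        self.ttl = ttl_seconds
        self.clock = clock
        self.last_heartbeat: dict[str, float] = {}

    def register(self, agent_id: str) -> None:
        # Registration stamps the record with a heartbeat timestamp.
        self.last_heartbeat[agent_id] = self.clock()

    def refresh_heartbeat(self, agent_id: str) -> None:
        # Plays the role of PUT /v1/agents/{agent_id}/heartbeat in this model.
        if agent_id in self.last_heartbeat:
            self.last_heartbeat[agent_id] = self.clock()

    def cleanup(self) -> list[str]:
        """Background-job step: delete records whose TTL has lapsed."""
        now = self.clock()
        expired = [a for a, t in self.last_heartbeat.items() if now - t > self.ttl]
        for agent_id in expired:
            del self.last_heartbeat[agent_id]
        return expired

    def discover(self) -> list[str]:
        # Only agents with a live heartbeat are returned.
        now = self.clock()
        return [a for a, t in self.last_heartbeat.items() if now - t <= self.ttl]


now = [0.0]  # mutable fake clock, advanced by hand below
reg = HeartbeatRegistry(ttl_seconds=30.0, clock=lambda: now[0])
reg.register("healthy-worker")
reg.register("crashed-worker")

now[0] = 20.0
reg.refresh_heartbeat("healthy-worker")  # crashed-worker sends nothing
now[0] = 45.0
print(reg.discover())
print(reg.cleanup())
```

This sketch filters by TTL in discover as well as in cleanup, so results stay correct between cleanup runs. The text above does not say whether the real service does both or relies on the cleanup job alone.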
What v0.9.1 Ships
The v0.9.1 Registry service includes:
- Agent registration with capability and event schema storage
- context.registry.discover() and context.registry.discover_agents() discovery methods
- client.refresh_heartbeat() for TTL-based agent liveness
- Tenant-scoped isolation via PostgreSQL RLS policies
- Schema storage for registered event types (gateway-level payload validation is planned)
The examples directory includes an example that demonstrates the Choreography Pattern using dynamic discovery. For a technical deep dive on using the Registry service with LLM-based dynamic discovery, see Autonomous Event Discovery Pattern.