Command Dispatch
The dispatch system routes commands from processes to handlers. Processes yield commands with correlation tags, handlers execute async work, and results flow back via event queues.
Flow
sequenceDiagram
participant P as Process
participant W as Worker
participant R as Registry
participant H as Handler
P->>W: yield(command, tag)
W->>R: getHandler(cmdID)
R-->>W: handler
W->>H: Handle(cmd, tag, receiver)
H-->>H: async work
H->>W: CompleteYield(tag, result)
W->>P: queue event, wake
P->>P: resume with result
Command Registry
The registry stores handlers in a hybrid structure:
type Registry struct {
handlers [256]Handler // System commands: O(1) index
extended map[CommandID]Handler // Extended commands: map lookup
frozen atomic.Bool // Lock-free after boot
}
System commands (0-255) use array indexing. Extended commands use map lookup. After Freeze(), all lookups are lock-free.
Command ID Ranges
| Range | Module | Examples |
|---|---|---|
| 1-9 | process | Send, Spawn, Terminate, Monitor, Link |
| 10-29 | clock | Sleep, Ticker, Timer |
| 50-59 | stream | Read, Write, Close, Seek |
| 60-79 | http | Request, RequestBatch |
| 80-89 | websocket | Connect, Send, Receive |
| 90-99 | event | Subscribe, Send |
| 100-119 | sql | Query, Execute, Transaction ops |
| 120-129 | store | Get, Set, Delete, Has |
| 130-139 | security | ValidateToken, CreateToken |
| 140-149 | function | Call, AsyncStart, AsyncCancel |
| 150-159 | exec | ProcessWait |
| 160-169 | cloudstorage | Upload, Download, List, Presigned URLs |
| 170-179 | eval | Compile, Run, CreateProcess |
| 180-189 | workflow | SideEffect, Call, Version, UpsertAttrs |
| 190-199 | contract | Open, Call, AsyncCall, AsyncCancel |
| 256+ | custom | User-defined services |
Registration happens during boot via MustRegisterCommands(). Collisions panic at startup.
Defining Commands
Commands are data structures with a unique CommandID:
const MyCommand dispatcher.CommandID = 200
type MyCmd struct {
Input string
Option int
}
var myCmdPool = sync.Pool{New: func() any { return &MyCmd{} }}
func (c *MyCmd) CmdID() dispatcher.CommandID { return MyCommand }
func (c *MyCmd) Release() {
c.Input = ""
c.Option = 0
myCmdPool.Put(c)
}
Pool reuse eliminates allocation in hot paths. Register at package init:
func init() {
dispatcher.MustRegisterCommands("myservice", MyCommand)
}
Dispatchers
A dispatcher groups related handlers. It implements RegisterAll to register handlers and lifecycle methods for setup/teardown:
type Handler interface {
Handle(ctx context.Context, cmd Command, tag uint64, receiver ResultReceiver) error
}
type ResultReceiver interface {
CompleteYield(tag uint64, data any, err error)
}
type Dispatcher struct {
// service state
}
func (d *Dispatcher) RegisterAll(register func(id dispatcher.CommandID, h dispatcher.Handler)) {
register(myapi.MyCommand, dispatcher.HandlerFunc(d.handleMyCommand))
}
func (d *Dispatcher) handleMyCommand(ctx context.Context, cmd Command, tag uint64, receiver ResultReceiver) error {
c := cmd.(*myapi.MyCmd)
go func() {
result := doWork(c)
if ctx.Err() == nil {
receiver.CompleteYield(tag, result, nil)
}
}()
return nil
}
Register as a boot component:
func MyDispatcher() boot.Component {
return boot.New(boot.P{
Name: "dispatcher.myservice",
DependsOn: []boot.Name{DispatcherName},
Load: func(ctx context.Context) (context.Context, error) {
reg := dispatcher.GetRegistrar(ctx)
svc := myservice.NewDispatcher()
svc.RegisterAll(reg.Register)
return ctx, nil
},
})
}
Yields and Correlation
When a process needs async work, it yields a command with a correlation tag:
type Yield struct {
Cmd Command
Tag uint64 // Process-local counter for correlation
}
The worker extracts yields from StepOutput after each step and dispatches them to handlers. Each tag uniquely identifies the request so results can be matched back.
See Also
- Scheduler - Process execution
- Modules - Lua module integration
- Process Model - High-level concepts