Command Dispatch

The dispatch system routes commands from processes to handlers. Processes yield commands with correlation tags, handlers execute async work, and results flow back via event queues.

Flow

sequenceDiagram
    participant P as Process
    participant W as Worker
    participant R as Registry
    participant H as Handler

    P->>W: yield(command, tag)
    W->>R: getHandler(cmdID)
    R-->>W: handler
    W->>H: Handle(cmd, tag, receiver)
    H-->>H: async work
    H->>W: CompleteYield(tag, result, err)
    W->>P: queue event, wake
    P->>P: resume with result

Command Registry

The registry stores handlers in a hybrid structure:

type Registry struct {
    handlers [256]Handler         // System commands: O(1) index
    extended map[CommandID]Handler // Extended commands: map lookup
    frozen   atomic.Bool          // Lock-free after boot
}

System commands (IDs 0-255) are resolved by direct array index; extended commands (256 and above) go through the map. After Freeze(), no further registration takes place, so all lookups are lock-free.
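The hybrid lookup can be sketched as below. This is a minimal illustration, not the actual implementation: the method names Register and Lookup, the NewRegistry constructor, the boot-time mutex, and the simplified Handler type are all assumptions made for the example. Only the struct layout and Freeze() come from the text above.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// CommandID and Handler are simplified stand-ins for the real types.
type CommandID uint32

type Handler func(cmd any) error

// Registry mirrors the hybrid layout: a fixed array for IDs 0-255
// and a map for everything else.
type Registry struct {
	mu       sync.Mutex // assumption: guards writes during boot only
	handlers [256]Handler
	extended map[CommandID]Handler
	frozen   atomic.Bool
}

func NewRegistry() *Registry {
	return &Registry{extended: make(map[CommandID]Handler)}
}

// Register stores a handler and fails once the registry is frozen.
func (r *Registry) Register(id CommandID, h Handler) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.frozen.Load() {
		return fmt.Errorf("registry frozen: cannot register command %d", id)
	}
	if id < 256 {
		r.handlers[id] = h
	} else {
		r.extended[id] = h
	}
	return nil
}

// Freeze makes the registry read-only.
func (r *Registry) Freeze() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.frozen.Store(true)
}

// Lookup takes no lock. It is only safe for concurrent use after
// Freeze, because nothing mutates the registry from then on.
func (r *Registry) Lookup(id CommandID) (Handler, bool) {
	if id < 256 {
		h := r.handlers[id]
		return h, h != nil
	}
	h, ok := r.extended[id]
	return h, ok
}

func main() {
	r := NewRegistry()
	_ = r.Register(10, func(any) error { return nil })  // system range
	_ = r.Register(300, func(any) error { return nil }) // extended range
	r.Freeze()

	_, okSys := r.Lookup(10)
	_, okExt := r.Lookup(300)
	_, okMissing := r.Lookup(11)
	fmt.Println(okSys, okExt, okMissing)    // true true false
	fmt.Println(r.Register(12, nil) != nil) // true: writes are rejected after Freeze
}
```

The point of the split is that the common system commands avoid hashing entirely, while the map keeps the ID space open-ended for extensions.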

Command ID Ranges

Range     Module        Examples
--------  ------------  ----------------------------------------
1-9       process       Send, Spawn, Terminate, Monitor, Link
10-29     clock         Sleep, Ticker, Timer
50-59     stream        Read, Write, Close, Seek
60-79     http          Request, RequestBatch
80-89     websocket     Connect, Send, Receive
90-99     event         Subscribe, Send
100-119   sql           Query, Execute, Transaction ops
120-129   store         Get, Set, Delete, Has
130-139   security      ValidateToken, CreateToken
140-149   function      Call, AsyncStart, AsyncCancel
150-159   exec          ProcessWait
160-169   cloudstorage  Upload, Download, List, Presigned URLs
170-179   eval          Compile, Run, CreateProcess
180-189   workflow      SideEffect, Call, Version, UpsertAttrs
190-199   contract      Open, Call, AsyncCall, AsyncCancel
256+      custom        User-defined services

Registration happens during boot via MustRegisterCommands(). Collisions panic at startup.
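To illustrate the collision check, here is a hedged sketch of what a MustRegisterCommands-style function might do. The owners map and the panic message format are assumptions for the example; the real function's bookkeeping is not described in this document.

```go
package main

import "fmt"

type CommandID uint32

// owners records which module claimed each ID (hypothetical bookkeeping).
// No locking is used because Go runs package init functions sequentially.
var owners = map[CommandID]string{}

// MustRegisterCommands claims IDs for a module and panics on a collision,
// so a duplicate ID is caught at startup rather than at dispatch time.
func MustRegisterCommands(module string, ids ...CommandID) {
	for _, id := range ids {
		if prev, taken := owners[id]; taken {
			panic(fmt.Sprintf("command ID %d: %q collides with %q", id, module, prev))
		}
		owners[id] = module
	}
}

func main() {
	MustRegisterCommands("clock", 10, 11, 12)
	MustRegisterCommands("stream", 50, 51)

	// A second claim on ID 10 panics; recover here only to show the message.
	defer func() {
		fmt.Println("recovered:", recover())
	}()
	MustRegisterCommands("myservice", 10)
}
```

Failing at boot means a misassigned ID can never silently route a command to the wrong handler.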

Defining Commands

Commands are data structures with a unique CommandID:

const MyCommand dispatcher.CommandID = 200

type MyCmd struct {
    Input  string
    Option int
}

var myCmdPool = sync.Pool{New: func() any { return &MyCmd{} }}

func (c *MyCmd) CmdID() dispatcher.CommandID { return MyCommand }

func (c *MyCmd) Release() {
    c.Input = ""
    c.Option = 0
    myCmdPool.Put(c)
}

Pool reuse eliminates allocation in hot paths. Register at package init:

func init() {
    dispatcher.MustRegisterCommands("myservice", MyCommand)
}
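The snippets above show the pool and Release but not how a command is obtained. One plausible shape for the acquire side is sketched here; NewMyCmd is a hypothetical constructor, not part of the documented API.

```go
package main

import (
	"fmt"
	"sync"
)

type MyCmd struct {
	Input  string
	Option int
}

var myCmdPool = sync.Pool{New: func() any { return &MyCmd{} }}

// NewMyCmd is a hypothetical constructor: it takes a command from the
// pool and fills in its fields.
func NewMyCmd(input string, option int) *MyCmd {
	c := myCmdPool.Get().(*MyCmd)
	c.Input = input
	c.Option = option
	return c
}

// Release clears the fields and returns the command to the pool.
func (c *MyCmd) Release() {
	c.Input = ""
	c.Option = 0
	myCmdPool.Put(c)
}

func main() {
	c := NewMyCmd("hello", 3)
	fmt.Println(c.Input, c.Option) // hello 3
	c.Release()

	// After Release the caller must not touch c again. A later Get may or
	// may not return the same pointer, but the fields are zeroed either way.
	d := myCmdPool.Get().(*MyCmd)
	fmt.Println(d.Input == "", d.Option == 0) // true true
}
```

Resetting fields in Release rather than in the constructor keeps stale data, such as a large Input string, from being retained while the object sits in the pool.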

Dispatchers

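A dispatcher groups related handlers. It implements RegisterAll to register its handlers, plus lifecycle methods for setup and teardown. Each handler receives the command, its correlation tag, and a ResultReceiver through which it delivers the result: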

type Handler interface {
    Handle(ctx context.Context, cmd Command, tag uint64, receiver ResultReceiver) error
}

type ResultReceiver interface {
    CompleteYield(tag uint64, data any, err error)
}

type Dispatcher struct {
    // service state
}

func (d *Dispatcher) RegisterAll(register func(id dispatcher.CommandID, h dispatcher.Handler)) {
    register(myapi.MyCommand, dispatcher.HandlerFunc(d.handleMyCommand))
}

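func (d *Dispatcher) handleMyCommand(ctx context.Context, cmd dispatcher.Command, tag uint64, receiver dispatcher.ResultReceiver) error {
    // Check the type assertion so a misrouted command returns an error instead of panicking.
    c, ok := cmd.(*myapi.MyCmd)
    if !ok {
        return fmt.Errorf("unexpected command type %T", cmd)
    }
    // Run the work on its own goroutine; the handler itself returns immediately.
    go func() {
        result := doWork(c)
        // Skip delivery if the context was cancelled while the work was running.
        if ctx.Err() == nil {
            receiver.CompleteYield(tag, result, nil)
        }
    }()
    return nil
}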

Register as a boot component:

func MyDispatcher() boot.Component {
    return boot.New(boot.P{
        Name:      "dispatcher.myservice",
        DependsOn: []boot.Name{DispatcherName},
        Load: func(ctx context.Context) (context.Context, error) {
            reg := dispatcher.GetRegistrar(ctx)
            svc := myservice.NewDispatcher()
            svc.RegisterAll(reg.Register)
            return ctx, nil
        },
    })
}

Yields and Correlation

When a process needs async work, it yields a command with a correlation tag:

type Yield struct {
    Cmd Command
    Tag uint64    // Process-local counter for correlation
}

The worker extracts yields from StepOutput after each step and dispatches them to handlers. Each tag uniquely identifies a request within its process, so a result can be matched back to the yield that produced it.
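Tag-based correlation can be sketched as follows. The Process type, its pending map, and the YieldCmd and Resume methods are hypothetical names for illustration; only the Yield struct and the process-local counter come from the text above.

```go
package main

import "fmt"

// Yield pairs a command with a process-local tag, as in the struct above.
type Yield struct {
	Cmd any
	Tag uint64
}

// Process is a hypothetical single-goroutine process: it hands out tags
// and remembers what each outstanding tag is waiting for.
type Process struct {
	nextTag uint64
	pending map[uint64]string // tag -> label of the outstanding request
}

func NewProcess() *Process {
	return &Process{pending: make(map[uint64]string)}
}

// YieldCmd allocates the next tag and records the outstanding request.
func (p *Process) YieldCmd(label string, cmd any) Yield {
	p.nextTag++
	p.pending[p.nextTag] = label
	return Yield{Cmd: cmd, Tag: p.nextTag}
}

// Resume matches a result to its request by tag. Unknown or already
// completed tags are reported instead of being silently accepted.
func (p *Process) Resume(tag uint64, data any) (string, error) {
	label, ok := p.pending[tag]
	if !ok {
		return "", fmt.Errorf("no pending yield for tag %d", tag)
	}
	delete(p.pending, tag)
	return fmt.Sprintf("%s -> %v", label, data), nil
}

func main() {
	p := NewProcess()
	a := p.YieldCmd("sleep", "100ms")
	b := p.YieldCmd("http", "GET /health")

	// Results may arrive in any order; the tag identifies the request.
	out, _ := p.Resume(b.Tag, 200)
	fmt.Println(out) // http -> 200
	out, _ = p.Resume(a.Tag, "woke")
	fmt.Println(out) // sleep -> woke

	_, err := p.Resume(a.Tag, "again")
	fmt.Println(err) // no pending yield for tag 1
}
```

Because the counter is local to one process, tags need no global coordination; they only have to be unique among that process's outstanding yields.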

See Also