Chain
Composable, synchronous step orchestrator.
A Chain lets you declare a sequence of steps that pass values via a mutable
context dictionary. Each step can declare inputs by referencing context keys
(including dotted paths), and write its output to a named key via out.
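To make the idea of dotted-path references concrete, here is a minimal sketch of how such a lookup could work against a nested context dict. It is not the library's implementation: `resolve_ref` and `_MISSING` are hypothetical names, and returning `None` for an unresolved reference in non-strict mode is an assumption made only for illustration.

```python
from typing import Any, Mapping

_MISSING = object()  # sentinel, so a stored None still counts as resolved


def resolve_ref(ctx: Mapping[str, Any], ref: str, strict: bool = False) -> Any:
    """Walk a dotted path such as "user.address.city" through nested mappings."""
    current: Any = ctx
    for part in ref.split("."):
        if isinstance(current, Mapping) and part in current:
            current = current[part]
        else:
            current = _MISSING
            break
    if current is _MISSING:
        if strict:
            raise KeyError(f"unresolved reference: {ref!r}")
        return None  # assumed non-strict fallback; the real library may differ
    return current


ctx = {"user": {"name": "Ada", "address": {"city": "London"}}, "x": 2}
print(resolve_ref(ctx, "x"))                  # 2
print(resolve_ref(ctx, "user.address.city"))  # London
print(resolve_ref(ctx, "user.email"))         # None
```

With `strict=True`, the last call would raise instead of returning a fallback, which mirrors the fail-loud behavior that a strict mode is meant to provide.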
Features
- Precompiled argument/keyword resolvers (fast, no reflection at runtime)
- Retries with backoff + optional jitter, deadline awareness
- Per-step validation hooks (fail-fast, not retried)
- Structured logging via templated `log_fmt` (with `{dotted.refs}`)
- Before/after hooks and message redaction
- `strict` mode to error on unresolved references
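The templated logging and redaction features can be pictured with a small standalone sketch. The helper names (`lookup`, `render_log`, `mask_tokens`) and the placeholder regex are hypothetical, and leaving unresolved placeholders untouched is an assumption; the library's actual rendering rules are not documented here.

```python
import re
from typing import Any, Callable, Mapping, Optional

# Matches placeholders such as {user.name}; this pattern is an assumption.
_PLACEHOLDER = re.compile(r"\{([A-Za-z_][\w.]*)\}")


def lookup(ctx: Mapping[str, Any], path: str) -> Any:
    """Follow a dotted path through nested mappings."""
    current: Any = ctx
    for part in path.split("."):
        current = current[part]
    return current


def render_log(
    template: str,
    ctx: Mapping[str, Any],
    redact: Optional[Callable[[str], str]] = None,
) -> str:
    """Fill {dotted.refs} from ctx, then pass the finished line through redact."""

    def substitute(match: "re.Match[str]") -> str:
        try:
            return str(lookup(ctx, match.group(1)))
        except (KeyError, TypeError):
            return match.group(0)  # leave unresolved placeholders visible

    line = _PLACEHOLDER.sub(substitute, template)
    return redact(line) if redact else line


def mask_tokens(line: str) -> str:
    """Example redactor: hide anything that looks like token=<value>."""
    return re.sub(r"token=\S+", "token=***", line)


ctx = {"user": {"name": "Ada"}, "auth": {"header": "token=abc123"}}
print(render_log("fetched profile for {user.name} using {auth.header}", ctx, redact=mask_tokens))
# fetched profile for Ada using token=***
```

Running the redactor on the fully rendered line, rather than on the template, means secrets pulled in from the context are scrubbed too.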
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `strict` | `bool` | If True, unresolved dotted refs raise an error. | `False` |
| `before_step` | `HooksFn \| None` | Optional hook called before each step. | `None` |
| `after_step` | `HooksFn \| None` | Optional hook called after each step. | `None` |
| `on_retry` | `HooksFn \| None` | Optional hook called on retry. | `None` |
| `redact` | `Callable[[str], str] \| None` | Optional function to scrub log lines before emission. | `None` |
| `deadline_fn` | `Callable[[], float] \| None` | Optional function returning remaining seconds for deadline-aware retries. | `None` |
| `safety_margin` | `float` | Minimum seconds to keep as buffer before deadline when sleeping. | `0.25` |
| `jitter` | `bool` | If True, randomize delay within the backoff window. | `True` |
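The interaction between `deadline_fn`, `safety_margin`, and `jitter` is easier to see in code. The following is a sketch under stated assumptions, not the library's logic: `plan_sleep` is a hypothetical helper, jitter is modeled as a uniform draw in `[0, base_delay)`, and "no budget left" is signaled by returning `None`.

```python
import random
from typing import Callable, Optional


def plan_sleep(
    base_delay: float,
    deadline_fn: Optional[Callable[[], float]] = None,
    safety_margin: float = 0.25,
    jitter: bool = True,
    rng: Callable[[], float] = random.random,
) -> Optional[float]:
    """Return how long to sleep before the next retry, or None if time has run out."""
    # Assumed jitter model: pick a random point inside the backoff window.
    delay = base_delay * rng() if jitter else base_delay
    if deadline_fn is None:
        return delay
    # Keep safety_margin seconds in reserve so the retry itself has time to run.
    budget = deadline_fn() - safety_margin
    if budget <= 0:
        return None
    return min(delay, budget)


print(plan_sleep(4.0, jitter=False))                            # 4.0
print(plan_sleep(4.0, deadline_fn=lambda: 1.0, jitter=False))   # 0.75
print(plan_sleep(4.0, deadline_fn=lambda: 0.2, jitter=False))   # None
```

In a serverless setting, `deadline_fn` could wrap whatever "remaining time" value the runtime exposes, converted to seconds.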
Example
    >>> from stepchain import Chain
    >>> def add(a, b):
    ...     return a + b
    >>> ctx = (Chain().put("x", 2).put("y", 3)
    ...        .next(add, out="sum", args=["x", "y"])
    ...        .run())
    >>> ctx["sum"]
    5
context property
context
Snapshot of the current context.
Returns:
| Type | Description |
|---|---|
| `Dict[str, Any]` | A shallow copy of the context dict (safe to inspect). |
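Because the snapshot is a shallow copy, rebinding a top-level key in it leaves the original untouched, but nested mutable values are still shared. The sketch below uses plain dicts as stand-ins for the chain's context to show the difference; it does not call the library.

```python
import copy

context = {"count": 1, "items": ["a"]}  # stand-in for the chain's internal context
snapshot = dict(context)                # shallow copy, like the property returns

snapshot["count"] = 99         # rebinding a key affects only the snapshot
snapshot["items"].append("b")  # mutating a nested list affects both

print(context["count"])  # 1
print(context["items"])  # ['a', 'b']

# A deep copy is fully isolated, at the cost of copying every nested object.
isolated = copy.deepcopy(context)
isolated["items"].append("c")
print(context["items"])  # ['a', 'b']
```

Reading from the snapshot is always safe; take a deep copy before mutating nested values in it.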
put
put(key, value)
Preload a key/value into the chain context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `key` | `str` | Context key. | required |
| `value` | `Any` | Value to store. | required |
Returns:
| Type | Description |
|---|---|
| `Chain` | Self (for fluent chaining). |
next
next(func, *, out, args=(), kwargs=None, name=None, retries=0, retry_on=(Exception,), backoff=1.5, max_backoff=10.0, log=None, log_fmt=None, validate=None)
Append a step to the chain.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable[..., Any]` | Callable to execute. Its parameters are resolved from context using `args` and `kwargs`. | required |
| `out` | `str` | Context key to store the step's return value. | required |
| `args` | `Iterable[Any]` | Positional argument specs. Each item can be a literal or a context ref, including dotted paths. | `()` |
| `kwargs` | `Mapping[str, Any] \| None` | Keyword argument specs using the same rules as `args`. | `None` |
| `name` | `str \| None` | Optional display name for logs. If omitted, a default name is used. | `None` |
| `retries` | `int` | Max retries on exceptions matching `retry_on`. | `0` |
| `retry_on` | `Tuple[Type[BaseException], ...]` | Tuple of exception types that are retryable. | `(Exception,)` |
| `backoff` | `float` | Initial delay multiplier between retries. | `1.5` |
| `max_backoff` | `float` | Max delay cap. | `10.0` |
| `log` | `Optional[Callable[[Mapping[str, Any], Any], None]]` | Optional logging callback. | `None` |
| `log_fmt` | `Optional[str]` | Log template with `{dotted.refs}` placeholders. | `None` |
| `validate` | `Optional[Callable[[Any], None]]` | Optional per-step validation hook (fail-fast, not retried). | `None` |
Returns:
| Type | Description |
|---|---|
| `Chain` | Self (for fluent chaining). |
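To get a feel for how `backoff` and `max_backoff` might combine, here is a sketch that prints a retry delay schedule. The exact formula is not documented above, so this assumes a common capped exponential scheme, `min(backoff ** attempt, max_backoff)`; `backoff_delays` is a hypothetical helper and the library's real schedule may differ.

```python
from typing import List


def backoff_delays(retries: int, backoff: float = 1.5, max_backoff: float = 10.0) -> List[float]:
    """Delay in seconds before each retry, assuming capped exponential growth."""
    return [min(backoff ** attempt, max_backoff) for attempt in range(1, retries + 1)]


print(backoff_delays(6))
# [1.5, 2.25, 3.375, 5.0625, 7.59375, 10.0]
```

Under this assumption the defaults reach the 10-second cap on the sixth retry, so larger `retries` values add a constant 10 seconds each before any jitter is applied.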
run
run()
Execute the chain synchronously.
The context is mutated in place as steps run. If a step fails with a retryable error,
it is retried with backoff, optional jitter, and deadline checks. If retries are exhausted
or the deadline is exceeded, a StepFailedError is raised. Validation failures raise
ValidationFailedError immediately and are not retried.
Returns:
| Type | Description |
|---|---|
| `Dict[str, Any]` | The final context dictionary. |
Raises:
| Type | Description |
|---|---|
| `ValidationFailedError` | A step's validation hook failed. |
| `StepFailedError` | Retries exhausted or deadline exceeded. |
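The difference between the two failure modes can be sketched with a tiny standalone retry loop. This is an illustration only: the two exception classes are local stand-ins that borrow the documented names, `run_step` is a hypothetical helper, sleeping between attempts is omitted, and wrapping the validator's exception is an assumption about behavior.

```python
from typing import Any, Callable, Optional, Tuple, Type


class ValidationFailedError(Exception):
    """Local stand-in for the library's exception of the same name."""


class StepFailedError(Exception):
    """Local stand-in for the library's exception of the same name."""


def run_step(
    func: Callable[[], Any],
    retries: int = 0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    validate: Optional[Callable[[Any], None]] = None,
) -> Any:
    attempt = 0
    while True:
        try:
            result = func()
        except retry_on as exc:
            if attempt >= retries:
                raise StepFailedError(f"gave up after {attempt + 1} attempt(s)") from exc
            attempt += 1  # a real runner would sleep with backoff here
            continue
        # Validation runs outside the retry block, so its failures are never retried.
        if validate is not None:
            try:
                validate(result)
            except Exception as exc:
                raise ValidationFailedError(str(exc)) from exc
        return result


calls = {"n": 0}


def flaky() -> int:
    """Fail twice with a retryable error, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary")
    return 42


def must_be_positive(value: int) -> None:
    if value <= 0:
        raise ValueError("expected a positive value")


print(run_step(flaky, retries=2, retry_on=(ConnectionError,), validate=must_be_positive))  # 42
```

A caller would typically catch the two errors separately: a validation failure points at bad data, while a step failure points at an operation that kept failing or ran out of time.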