Getting Started
Install
pip install py-stepchain
Usage & Examples
Real, runnable examples from the repo (each snippet's first comment gives its file path):
Basic sync ETL
# examples/basic_sync.py
import logging
from stepchain import Chain, ValidationFailedError, StepFailedError
# Configure root logging: INFO level, timestamp + level name + message.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
# In-memory stand-in for a source system; only "active" rows survive transform().
SOURCE = [
    {"id": 1, "name": "alice", "active": True},
    {"id": 2, "name": "bob", "active": False},
    {"id": 3, "name": "carrie", "active": True},
]


def extract() -> list:
    """Return the source rows.

    A new list is returned (shallow copy) so that later steps cannot grow or
    shrink the module-level SOURCE list by accident.
    """
    return list(SOURCE)
def transform(rows: list) -> list:
    """Keep only active rows and upper-case their names.

    Args:
        rows: dicts with "id", "name" and "active" keys.

    Returns:
        A new list of {"id", "name"} dicts, one per active row, with the
        name upper-cased. The "active" key is dropped.
    """
    return [
        {"id": r["id"], "name": r["name"].upper()}
        for r in rows
        if r["active"]
    ]
# In-memory stand-in for the sink that load() writes to.
TARGET = []
# Call counter used by load() to fail exactly once.
calls = {"load": 0}


def load(rows: list) -> dict:
    """Append rows to TARGET, failing on the very first call.

    Args:
        rows: rows to append to the in-memory sink.

    Returns:
        {"loaded": <number of rows appended>}.

    Raises:
        RuntimeError: on the first call only, before anything is written.
    """
    calls["load"] += 1
    if calls["load"] == 1:
        # force a retry once
        raise RuntimeError("transient sink issue")
    TARGET.extend(rows)
    return {"loaded": len(rows)}
def validate_non_empty(load_result) -> None:
    """Reject a load result that is missing or reports zero rows.

    Args:
        load_result: mapping with a "loaded" count, or None.

    Raises:
        ValueError: if load_result is falsy or its "loaded" value is falsy.
    """
    if not load_result or not load_result.get("loaded"):
        raise ValueError("no rows loaded")
def redact(msg: str) -> str:
    """Return msg with every occurrence of "SECRET" replaced by "****"."""
    # example: mask secrets if any appear in logs
    return msg.replace("SECRET", "****")
if __name__ == "__main__":
    # Build the extract -> transform -> load -> count chain and run it once.
    # NOTE(review): validate_non_empty is defined in this example but is not
    # attached to any step below; confirm how it is meant to be wired in.
    try:
        ctx = (
            Chain(redact=redact, strict=True)
            .next(extract, out="raw", log_fmt="raw_n={raw.__len__}")
            .next(transform, out="clean", args=["raw"], log_fmt="clean_n={clean.__len__}")
            .next(
                load,
                out="loadres",
                args=["clean"],
                # load() raises RuntimeError on its first call only.
                retries=1,
                retry_on=(RuntimeError,),
                backoff=0.0,
                max_backoff=0.0,
                log_fmt="loaded={loadres.loaded}",
            )
            .next(
                lambda r: r["loaded"],
                out="count",
                args=["loadres"],
                log_fmt="count={count}",
            )
            .run()
        )
        print("pipeline ok:", ctx["count"], "rows loaded; target:", TARGET)
    except ValidationFailedError as e:
        print("validation error:", e)
    except StepFailedError as e:
        print("step failed:", e)
Basic async ETL
# examples/basic_async.py
import asyncio
import logging
from stepchain.chain.async_chain import AsyncChain
from stepchain.exceptions import ValidationFailedError, StepFailedError
# Configure root logging: INFO level, timestamp + level name + message.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
# In-memory stand-in for a source system; only "active" rows survive transform().
SOURCE = [
    {"id": 10, "name": "dave", "active": True},
    {"id": 11, "name": "ellen", "active": True},
    {"id": 12, "name": "frank", "active": False},
]


async def extract() -> list:
    """Return a new list (shallow copy) of the SOURCE rows."""
    return list(SOURCE)
async def transform(rows: list) -> list:
    """Keep only active rows and title-case their names.

    Args:
        rows: dicts with "id", "name" and "active" keys.

    Returns:
        A new list of {"id", "name"} dicts, one per active row, with the
        name title-cased. The "active" key is dropped.
    """
    return [
        {"id": r["id"], "name": r["name"].title()}
        for r in rows
        if r["active"]
    ]
# Call counter used by load() to fail exactly once.
calls = {"n": 0}


async def load(rows: list) -> dict:
    """Echo the rows back with a count, failing on the very first call.

    Args:
        rows: rows to "load"; nothing is stored, they are returned as-is.

    Returns:
        {"loaded": <number of rows>, "rows": rows}.

    Raises:
        RuntimeError: on the first call only.
    """
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("flaky sink")
    return {"loaded": len(rows), "rows": rows}
def validate(res) -> None:
    """Reject a load result that is missing or reports zero rows.

    Args:
        res: mapping with a "loaded" count, or None.

    Raises:
        ValueError: if res is falsy or its "loaded" value is falsy.
    """
    # Guard against None/empty first so a missing result is reported as a
    # ValueError rather than an AttributeError on .get().
    if not res or not res.get("loaded"):
        raise ValueError("no rows loaded")
async def main():
    """Build and await the extract -> transform -> load chain, then print the outcome.

    ValidationFailedError and StepFailedError raised while running the chain
    are caught and reported on stdout instead of propagating.
    """
    # NOTE(review): validate() is defined in this example but is not attached
    # to any step below; confirm how it is meant to be wired in.
    try:
        ctx = await (
            AsyncChain(jitter=False)  # deterministic; set backoff=0 to avoid sleeping
            .next(extract, out="raw", log_fmt="raw_n={raw.__len__}")
            .next(transform, out="clean", args=["raw"], log_fmt="clean_n={clean.__len__}")
            .next(
                load,
                out="loadres",
                args=["clean"],
                # load() raises RuntimeError on its first call only.
                retries=1,
                retry_on=(RuntimeError,),
                backoff=0.0,
                max_backoff=0.0,
                log_fmt="loaded={loadres.loaded}",
            )
            .run()
        )
        print("pipeline ok:", ctx["loadres"]["loaded"], "rows:", ctx["loadres"]["rows"])
    except ValidationFailedError as e:
        print("validation error:", e)
    except StepFailedError as e:
        print("step failed:", e)
# Script entry point: run the async pipeline to completion on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())
FastAPI sync ETL
# examples/fastapi_sync.py
from fastapi import FastAPI
from stepchain import Chain
# Application object; the /sync/etl route in this example is registered on it.
app = FastAPI(
    title="Stepchain Sync API",
)
def extract_users() -> list:
    """Return a fresh, hard-coded list of three sample user rows."""
    return [
        {"id": 1, "name": "alice", "active": True},
        {"id": 2, "name": "bob", "active": False},
        {"id": 3, "name": "carrie", "active": True},
    ]
def transform_active(rows: list) -> list:
    """Keep only active rows and upper-case their names.

    Args:
        rows: dicts with "id", "name" and "active" keys.

    Returns:
        A new list of {"id", "name"} dicts, one per active row.
    """
    return [
        {"id": r["id"], "name": r["name"].upper()}
        for r in rows
        if r["active"]
    ]
def save(rows: list) -> dict:
    """Pretend to insert rows and report what was "inserted".

    Args:
        rows: rows to echo back.

    Returns:
        {"inserted": <number of rows>, "rows": rows}.
    """
    # pretend DB insert; just echo back
    return {"inserted": len(rows), "rows": rows}
@app.get("/sync/etl")
def sync_etl():
    """Run the extract -> transform -> save chain and return save()'s result."""
    ctx = (
        Chain(strict=True)
        .next(extract_users, out="raw", log_fmt="raw={raw.__len__}")
        .next(transform_active, out="clean", args=["raw"], log_fmt="clean={clean.__len__}")
        .next(save, out="result", args=["clean"], log_fmt="inserted={result.inserted}")
        .run()
    )
    # "result" is the context key the save step was told to write to (out=).
    return ctx["result"]
FastAPI async ETL
# examples/fastapi_async.py
from fastapi import FastAPI
from stepchain.chain.async_chain import AsyncChain
# Application object; the /async/invoice route in this example is registered on it.
app = FastAPI(
    title="Stepchain Async API",
)
async def fetch_items() -> list:
    """Return a fresh, hard-coded list of three sample catalogue items."""
    return [
        {"sku": "A1", "price": 10.0, "active": True},
        {"sku": "B2", "price": 0.0, "active": False},
        {"sku": "C3", "price": 25.5, "active": True},
    ]
async def to_invoice_lines(items: list) -> list:
    """Turn billable items into invoice lines.

    Args:
        items: dicts with "sku", "price" and "active" keys.

    Returns:
        One {"sku", "amount"} dict per item that is active and has a
        price greater than zero; "amount" is the item's price.
    """
    return [
        {"sku": x["sku"], "amount": x["price"]}
        for x in items
        if x["active"] and x["price"] > 0
    ]
async def post_invoice(lines: list) -> dict:
    """Pretend to post an invoice and return its summary.

    Args:
        lines: invoice lines, each a dict with an "amount" key.

    Returns:
        {"lines": <number of lines>, "total": <sum of the amounts>}.
    """
    # pretend call to billing service; return summary
    return {"lines": len(lines), "total": sum(line["amount"] for line in lines)}
@app.get("/async/invoice")
async def async_invoice():
    """Run the fetch -> invoice-lines -> post chain and return post_invoice()'s summary."""
    ctx = await (
        AsyncChain(jitter=False)
        .next(fetch_items, out="items", log_fmt="n_items={items.__len__}")
        .next(
            to_invoice_lines,
            out="lines",
            args=["items"],
            log_fmt="n_lines={lines.__len__}",
        )
        .next(post_invoice, out="summary", args=["lines"], log_fmt="total={summary.total}")
        .run()
    )
    # "summary" is the context key the post_invoice step was told to write to (out=).
    return ctx["summary"]
These examples demonstrate context passing, retries, logging templates (`log_fmt`), and validation hooks.