Skip to content

Getting Started

Install

pip install py-stepchain

Usage & Examples

Real, runnable examples from the repo (the source path is given in the comment at the top of each listing):

Basic sync ETL

# examples/basic_sync.py
import logging
from stepchain import Chain, ValidationFailedError, StepFailedError

# Configure the root logger at INFO level; each record shows timestamp, level and message.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

# In-memory stand-in for the upstream data source.
SOURCE = [
    {"id": 1, "name": "alice", "active": True},
    {"id": 2, "name": "bob", "active": False},
    {"id": 3, "name": "carrie", "active": True},
]


def extract():
    """Return a fresh list of the source rows.

    A shallow copy is returned so that a later step which appends, removes or
    reorders rows cannot alter the module-level ``SOURCE`` fixture. The row
    dicts themselves are still shared with ``SOURCE``.
    """
    return list(SOURCE)


def transform(rows):
    """Keep the active rows and upper-case each kept row's name.

    Returns a new list of ``{"id", "name"}`` dicts; rows whose ``"active"``
    value is falsy are dropped.
    """
    active_rows = (row for row in rows if row["active"])
    return [{"id": row["id"], "name": row["name"].upper()} for row in active_rows]


# In-memory stand-in for the destination the pipeline writes to.
TARGET = []
# Counts how many times load() has been invoked.
calls = {"load": 0}


def load(rows):
    """Append *rows* to ``TARGET``, failing on the very first invocation.

    The first call raises ``RuntimeError`` before anything is written; every
    later call extends ``TARGET`` and returns ``{"loaded": <row count>}``.
    """
    attempt = calls["load"] = calls["load"] + 1
    if attempt == 1:
        # force a retry once
        raise RuntimeError("transient sink issue")
    TARGET.extend(rows)
    return dict(loaded=len(rows))


def validate_non_empty(load_result):
    """Raise ``ValueError`` unless *load_result* reports at least one loaded row.

    A falsy result (``None``, empty mapping) is rejected, as is a result whose
    ``"loaded"`` entry is missing or falsy.
    """
    loaded = load_result.get("loaded") if load_result else None
    if loaded:
        return
    raise ValueError("no rows loaded")


def redact(msg: str) -> str:
    """Return *msg* with every literal ``SECRET`` replaced by ``****``."""
    # example: mask secrets if any appear in logs
    needle, mask = "SECRET", "****"
    return msg.replace(needle, mask)


if __name__ == "__main__":
    # Script entry point: build a four-step chain (extract -> transform ->
    # load -> count), run it, and print the outcome or the failure.
    # NOTE(review): validate_non_empty is defined above but is not attached to
    # any step in this chain; ValidationFailedError is still handled below.
    try:
        ctx = (
            # `redact` is handed to the chain; presumably it is applied to log
            # messages before they are emitted -- confirm against the library.
            Chain(redact=redact, strict=True)
            # `out` appears to name the context key a step's result is stored
            # under, and `args` the context keys passed to the step.
            .next(extract, out="raw", log_fmt="raw_n={raw.__len__}")
            .next(transform, out="clean", args=["raw"], log_fmt="clean_n={clean.__len__}")
            # load() raises RuntimeError on its first call, so one retry on
            # RuntimeError is what lets this step succeed; the zero backoff
            # values are presumably there to avoid any delay between attempts.
            .next(
                load,
                out="loadres",
                args=["clean"],
                retries=1,
                retry_on=(RuntimeError,),
                backoff=0.0,
                max_backoff=0.0,
                log_fmt="loaded={loadres.loaded}",
            )
            # Pull the row count out of load()'s result dict into its own key.
            .next(
                lambda r: r["loaded"],
                out="count",
                args=["loadres"],
                log_fmt="count={count}",
            )
            .run()
        )
        # The object returned by run() is indexed by the `out` names used above.
        print("pipeline ok:", ctx["count"], "rows loaded; target:", TARGET)
    except ValidationFailedError as e:
        print("validation error:", e)
    except StepFailedError as e:
        print("step failed:", e)

Basic async ETL

# examples/basic_async.py
import asyncio
import logging
from stepchain.chain.async_chain import AsyncChain
from stepchain.exceptions import ValidationFailedError, StepFailedError

# Configure the root logger at INFO level; each record shows timestamp, level and message.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

# In-memory stand-in for the upstream data source.
SOURCE = [
    {"id": 10, "name": "dave", "active": True},
    {"id": 11, "name": "ellen", "active": True},
    {"id": 12, "name": "frank", "active": False},
]


async def extract():
    """Return a shallow copy of ``SOURCE`` so callers never hold the fixture list itself."""
    return SOURCE.copy()


async def transform(rows):
    """Keep the active rows and title-case each kept row's name.

    Returns a new list of ``{"id", "name"}`` dicts; rows whose ``"active"``
    value is falsy are dropped.
    """

    def _project(row):
        return {"id": row["id"], "name": row["name"].title()}

    return [_project(row) for row in rows if row["active"]]


# Counts how many times load() has been invoked.
calls = {"n": 0}


async def load(rows):
    """Echo *rows* back with a count, failing on the very first invocation.

    The first call raises ``RuntimeError``; every later call returns
    ``{"loaded": <row count>, "rows": rows}``.
    """
    attempt = calls["n"] = calls["n"] + 1
    if attempt == 1:
        raise RuntimeError("flaky sink")
    return dict(loaded=len(rows), rows=rows)


def validate(res):
    """Raise ``ValueError`` unless *res* reports at least one loaded row.

    A falsy result (``None`` or an empty mapping) is rejected with the same
    ``ValueError`` as a missing or zero ``"loaded"`` entry, instead of failing
    with ``AttributeError`` on ``None.get``.
    """
    if not res or not res.get("loaded"):
        raise ValueError("no rows loaded")


async def main():
    """Build and await a three-step async chain (extract -> transform -> load), then print the outcome.

    ``ValidationFailedError`` and ``StepFailedError`` are caught and reported
    with ``print`` instead of propagating.
    """
    # NOTE(review): validate() is defined above but is not attached to any
    # step in this chain; ValidationFailedError is still handled below.
    try:
        ctx = await (
            AsyncChain(jitter=False)  # deterministic; set backoff=0 to avoid sleeping
            # `out` appears to name the context key a step's result is stored
            # under, and `args` the context keys passed to the step.
            .next(extract, out="raw", log_fmt="raw_n={raw.__len__}")
            .next(transform, out="clean", args=["raw"], log_fmt="clean_n={clean.__len__}")
            # load() raises RuntimeError on its first call, so one retry on
            # RuntimeError is what lets this step succeed.
            .next(
                load,
                out="loadres",
                args=["clean"],
                retries=1,
                retry_on=(RuntimeError,),
                backoff=0.0,
                max_backoff=0.0,
                log_fmt="loaded={loadres.loaded}",
            )
            .run()
        )
        # The awaited result is indexed by the `out` names used above.
        print("pipeline ok:", ctx["loadres"]["loaded"], "rows:", ctx["loadres"]["rows"])
    except ValidationFailedError as e:
        print("validation error:", e)
    except StepFailedError as e:
        print("step failed:", e)


if __name__ == "__main__":
    # Run the coroutine to completion on a fresh event loop.
    asyncio.run(main())

FastAPI sync ETL

# examples/fastapi_sync.py
from fastapi import FastAPI
from stepchain import Chain

# Application object; the @app.get route below is registered on it.
app = FastAPI(title="Stepchain Sync API")


def extract_users():
    """Return a fresh list of three hard-coded user rows (``id``, ``name``, ``active``)."""
    records = (
        (1, "alice", True),
        (2, "bob", False),
        (3, "carrie", True),
    )
    return [
        {"id": user_id, "name": name, "active": active}
        for user_id, name, active in records
    ]


def transform_active(rows):
    """Drop inactive rows and return ``{"id", "name"}`` dicts with upper-cased names."""

    def _project(row):
        return {"id": row["id"], "name": row["name"].upper()}

    return [_project(row) for row in rows if row["active"]]


def save(rows):
    """Return ``{"inserted": <row count>, "rows": rows}`` without persisting anything."""
    # pretend DB insert; just echo back
    inserted = len(rows)
    return dict(inserted=inserted, rows=rows)


# GET /sync/etl: run extract_users -> transform_active -> save through a
# synchronous chain and respond with the value stored under "result".
@app.get("/sync/etl")
def sync_etl():
    # A new Chain is built per request; `out` appears to name the context key
    # a step's result is stored under, and `args` the keys passed to the step.
    ctx = (
        Chain(strict=True)
        .next(extract_users, out="raw", log_fmt="raw={raw.__len__}")
        .next(transform_active, out="clean", args=["raw"], log_fmt="clean={clean.__len__}")
        .next(save, out="result", args=["clean"], log_fmt="inserted={result.inserted}")
        .run()
    )
    # save() returns {"inserted": ..., "rows": ...}; that dict is the response body.
    return ctx["result"]

FastAPI async ETL

# examples/fastapi_async.py
from fastapi import FastAPI
from stepchain.chain.async_chain import AsyncChain

# Application object; the @app.get route below is registered on it.
app = FastAPI(title="Stepchain Async API")


async def fetch_items():
    """Return a fresh list of three hard-coded catalogue items (``sku``, ``price``, ``active``)."""
    catalogue = (
        ("A1", 10.0, True),
        ("B2", 0.0, False),
        ("C3", 25.5, True),
    )
    return [
        {"sku": sku, "price": price, "active": active}
        for sku, price, active in catalogue
    ]


async def to_invoice_lines(items):
    """Turn active, positively priced items into ``{"sku", "amount"}`` invoice lines."""

    def _billable(item):
        return item["active"] and item["price"] > 0

    return [
        {"sku": item["sku"], "amount": item["price"]}
        for item in items
        if _billable(item)
    ]


async def post_invoice(lines):
    """Return ``{"lines": <line count>, "total": <sum of the "amount" values>}``."""
    # pretend call to billing service; return summary
    line_count = len(lines)
    total = sum(line["amount"] for line in lines)
    return {"lines": line_count, "total": total}


# GET /async/invoice: run fetch_items -> to_invoice_lines -> post_invoice
# through an async chain and respond with the value stored under "summary".
@app.get("/async/invoice")
async def async_invoice():
    # A new AsyncChain is built per request; `out` appears to name the context
    # key a step's result is stored under, and `args` the keys passed to it.
    ctx = await (
        AsyncChain(jitter=False)
        .next(fetch_items, out="items", log_fmt="n_items={items.__len__}")
        .next(
            to_invoice_lines,
            out="lines",
            args=["items"],
            log_fmt="n_lines={lines.__len__}",
        )
        .next(post_invoice, out="summary", args=["lines"], log_fmt="total={summary.total}")
        .run()
    )
    # post_invoice() returns {"lines": ..., "total": ...}; that dict is the response body.
    return ctx["summary"]

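These examples demonstrate context passing, retries, and logging templates (`log_fmt`). The two basic examples also define validator functions (`validate_non_empty`, `validate`) and catch `ValidationFailedError`, but the chains as shown do not attach those validators to any step.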