Phase 1 Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: Build a working multi-chain NFT indexing pipeline (Polygon + TON) with dedup, idempotency, reorg handling, projections, and a read-only API — all Phase 1 exit criteria passing.

Architecture: Modular monolith under src/ft/* namespace. Event-driven pipeline: Chain Adapters → Normalizer → State Updater → Projection Pipeline → FastAPI. Redis Streams as event bus, PostgreSQL as source of truth, applied_events for idempotency, outbox pattern for reliable publishing.

Tech Stack: Python 3.12+, uv, FastAPI, SQLAlchemy 2.0 (Mapped[]), asyncpg, Redis Streams, PostgreSQL 16 + pgvector, Pydantic v2, pytest + testcontainers, structlog, Prometheus client, Docker Compose.

Design doc: docs/plans/2026-02-28-phase1-design.md

Reference: docs/NFT_Platform_Implementation_Plan.md (architecture spec)


Task 1: Project Scaffold

Files:
- Create: pyproject.toml
- Create: src/ft/__init__.py
- Create: src/ft/core/__init__.py
- Create: tests/__init__.py
- Create: .gitignore
- Create: .python-version

Step 1: Initialize git repo

cd /c/Users/rusla/Projects/ft.supply
git init

Step 2: Create .python-version

3.12

Step 3: Create .gitignore

__pycache__/
*.py[cod]
*.egg-info/
dist/
build/
.eggs/
*.egg
.venv/
.env
.env.*
!.env.example
*.db
*.sqlite3
.mypy_cache/
.ruff_cache/
.pytest_cache/
.coverage
htmlcov/
node_modules/

Step 4: Create pyproject.toml

[project]
name = "ft-supply"
version = "0.1.0"
description = "Multi-chain NFT indexing and analytics platform"
requires-python = ">=3.12"
dependencies = [
    # Web
    "fastapi>=0.115.0",
    "uvicorn[standard]>=0.32.0",
    # Database
    "sqlalchemy[asyncio]>=2.0.36",
    "asyncpg>=0.30.0",
    "alembic>=1.14.0",
    # Redis
    "redis[hiredis]>=5.2.0",
    # Validation & Config
    "pydantic>=2.10.0",
    "pydantic-settings>=2.7.0",
    # HTTP
    "httpx>=0.28.0",
    # Blockchain (EVM)
    "web3>=7.6.0",
    # Observability
    "structlog>=24.4.0",
    "prometheus-client>=0.21.0",
    # Utils
    "orjson>=3.10.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.3.0",
    "pytest-asyncio>=0.24.0",
    "testcontainers[postgres]>=4.9.0",
    "ruff>=0.8.0",
    "httpx>=0.28.0",  # for FastAPI TestClient
]

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
packages = ["src/ft"]

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
pythonpath = ["src"]

[tool.ruff]
src = ["src", "tests"]
line-length = 120
target-version = "py312"

[tool.ruff.lint]
select = ["E", "F", "W", "I", "UP", "B", "SIM", "RUF"]
ignore = [
    "E712",  # SQLAlchemy `== True` comparisons
]

[tool.ruff.lint.isort]
known-first-party = ["ft"]

Step 5: Create package init files

mkdir -p src/ft/core src/ft/ingest src/ft/pipeline src/ft/api src/ft/worker src/ft/observability
mkdir -p tests/contract tests/idempotency tests/reorg tests/e2e tests/reconciliation tests/api
mkdir -p tests/fixtures/polygon tests/fixtures/ton tests/fixtures/shared
mkdir -p migrations docker workers scripts

Create empty __init__.py in every src/ft/ subpackage and tests/ directory.
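
One way to do this in a single pass (the snippet is illustrative, not a file the plan tracks):

# One-off helper (illustrative; you can equally just touch the files by hand).
from pathlib import Path

PACKAGE_DIRS = [
    "src/ft", "src/ft/core", "src/ft/ingest", "src/ft/pipeline", "src/ft/api",
    "src/ft/worker", "src/ft/observability",
    "tests", "tests/contract", "tests/idempotency", "tests/reorg",
    "tests/e2e", "tests/reconciliation", "tests/api",
]

for d in PACKAGE_DIRS:
    init = Path(d) / "__init__.py"
    init.parent.mkdir(parents=True, exist_ok=True)
    init.touch(exist_ok=True)  # an empty file is enough to mark the package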

Step 6: Install with uv

uv sync

Expected: lockfile generated, venv created, all deps installed.

Step 7: Verify setup

uv run python -c "import ft; print('ok')"
uv run ruff check src/ tests/
uv run pytest --co  # collect only; expect "no tests collected" (exit code 5) at this stage

Step 8: Commit

git add .
git commit -m "feat: project scaffold with uv + pyproject.toml"

Task 2: Core Config + DB Engine

Files:
- Create: src/ft/core/config.py
- Create: src/ft/core/db.py
- Create: .env.example
- Test: tests/test_config.py

Step 1: Write test for config loading

# tests/test_config.py
from ft.core.config import Settings

def test_settings_from_env(monkeypatch):
    monkeypatch.setenv("POSTGRES_HOST", "localhost")
    monkeypatch.setenv("POSTGRES_PORT", "5432")
    monkeypatch.setenv("POSTGRES_DB", "ftsupply")
    monkeypatch.setenv("POSTGRES_USER", "ft")
    monkeypatch.setenv("POSTGRES_PASSWORD", "ft")
    monkeypatch.setenv("REDIS_URL", "redis://localhost:6379/0")

    settings = Settings()
    assert "localhost" in settings.database_url
    assert "5432" in settings.database_url
    assert settings.redis_url == "redis://localhost:6379/0"

def test_settings_defaults(monkeypatch):
    monkeypatch.setenv("POSTGRES_PASSWORD", "ft")
    settings = Settings()
    assert settings.postgres_host == "localhost"
    assert settings.postgres_port == 5432

Step 2: Run test to verify it fails

uv run pytest tests/test_config.py -v

Expected: FAIL — ModuleNotFoundError: No module named 'ft.core.config'

Step 3: Implement config

# src/ft/core/config.py
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    model_config = {"env_prefix": "", "case_sensitive": False}

    # Postgres
    postgres_host: str = "localhost"
    postgres_port: int = 5432
    postgres_db: str = "ftsupply"
    postgres_user: str = "ft"
    postgres_password: str = "ft"

    # Redis
    redis_url: str = "redis://localhost:6379/0"

    # MinIO
    minio_endpoint: str = "localhost:9000"
    minio_access_key: str = "minioadmin"
    minio_secret_key: str = "minioadmin"

    # API
    api_host: str = "0.0.0.0"
    api_port: int = 8000

    @property
    def database_url(self) -> str:
        return (
            f"postgresql+asyncpg://{self.postgres_user}:{self.postgres_password}"
            f"@{self.postgres_host}:{self.postgres_port}/{self.postgres_db}"
        )

    @property
    def database_url_sync(self) -> str:
        return (
            f"postgresql://{self.postgres_user}:{self.postgres_password}"
            f"@{self.postgres_host}:{self.postgres_port}/{self.postgres_db}"
        )


def get_settings() -> Settings:
    return Settings()

Step 4: Implement DB engine

# src/ft/core/db.py
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

from ft.core.config import Settings

def create_engine(settings: Settings):
    return create_async_engine(
        settings.database_url,
        echo=False,
        pool_size=10,
        max_overflow=20,
    )

def create_session_factory(engine) -> async_sessionmaker[AsyncSession]:
    return async_sessionmaker(engine, expire_on_commit=False)
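
For illustration, this is how the two pieces are expected to compose once the API and worker are wired up (the example coroutine is not part of this task):

# Illustrative wiring; real call sites arrive with the API and worker tasks.
from ft.core.config import get_settings
from ft.core.db import create_engine, create_session_factory

settings = get_settings()
engine = create_engine(settings)
session_factory = create_session_factory(engine)


async def example() -> None:
    async with session_factory() as session, session.begin():
        ...  # queries here commit on exit, roll back on error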

Step 5: Create .env.example

POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_DB=ftsupply
POSTGRES_USER=ft
POSTGRES_PASSWORD=ft
REDIS_URL=redis://localhost:6379/0
MINIO_ENDPOINT=localhost:9000
MINIO_ACCESS_KEY=minioadmin
MINIO_SECRET_KEY=minioadmin

Step 6: Run tests

uv run pytest tests/test_config.py -v

Expected: PASS

Step 7: Commit

git add src/ft/core/config.py src/ft/core/db.py tests/test_config.py .env.example
git commit -m "feat: core config (Pydantic Settings) + async DB engine"

Task 3: Model Base + uuidv5 ID Strategy

Files:
- Create: src/ft/core/models/__init__.py
- Create: src/ft/core/models/base.py
- Test: tests/test_models_base.py

Step 1: Write test for uuidv5 ID generation

# tests/test_models_base.py
import uuid
from ft.core.models.base import make_id, FT_NAMESPACE

def test_make_id_deterministic():
    """Same inputs always produce same ID."""
    id1 = make_id("ton", "EQCollection", "42")
    id2 = make_id("ton", "EQCollection", "42")
    assert id1 == id2
    assert isinstance(id1, uuid.UUID)

def test_make_id_different_inputs():
    """Different inputs produce different IDs."""
    id1 = make_id("ton", "EQCollection", "42")
    id2 = make_id("ton", "EQCollection", "43")
    assert id1 != id2

def test_make_id_is_uuid5():
    """IDs are uuidv5."""
    id1 = make_id("polygon", "0xabc", "0")
    assert id1.version == 5

def test_namespace_is_stable():
    """Namespace doesn't change between imports."""
    expected = uuid.uuid5(uuid.NAMESPACE_URL, "ft.supply")
    assert FT_NAMESPACE == expected

Step 2: Run test to verify it fails

uv run pytest tests/test_models_base.py -v

Step 3: Implement base models

# src/ft/core/models/base.py
import uuid
from datetime import datetime

from sqlalchemy import MetaData, func, text
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

FT_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "ft.supply")


def make_id(*parts: str) -> uuid.UUID:
    """Deterministic uuidv5 from natural key parts. No FK race conditions."""
    return uuid.uuid5(FT_NAMESPACE, ":".join(parts))


# Convention: each schema gets its own MetaData for clean separation
SCHEMA_METADATA = {
    "ref": MetaData(schema="ref"),
    "ingest": MetaData(schema="ingest"),
    "ledger": MetaData(schema="ledger"),
    "catalog": MetaData(schema="catalog"),
    "market": MetaData(schema="market"),
    "projection": MetaData(schema="projection"),
    "system": MetaData(schema="system"),
}


class Base(DeclarativeBase):
    """Base for all ORM models. No default schema — each model declares its own."""
    pass


class TimestampMixin:
    """created_at / updated_at for tables that need it."""
    created_at: Mapped[datetime] = mapped_column(server_default=text("now()"))
    updated_at: Mapped[datetime] = mapped_column(server_default=text("now()"), onupdate=func.now())
# src/ft/core/models/__init__.py
from ft.core.models.base import Base, make_id, FT_NAMESPACE, TimestampMixin

__all__ = ["Base", "make_id", "FT_NAMESPACE", "TimestampMixin"]

Step 4: Run tests

uv run pytest tests/test_models_base.py -v

Expected: PASS

Step 5: Commit

git add src/ft/core/models/ tests/test_models_base.py
git commit -m "feat: DeclarativeBase + uuidv5 ID strategy"

Task 4: Domain Models — ref, ingest, system

Files:
- Create: src/ft/core/models/ref.py
- Create: src/ft/core/models/ingest.py
- Create: src/ft/core/models/system.py
- Create: src/ft/core/models/enums.py

Step 1: Create shared enums

# src/ft/core/models/enums.py
from enum import StrEnum

class FinalityStatus(StrEnum):
    PENDING = "pending"
    CONFIRMED = "confirmed"
    FINALIZED = "finalized"

class EventKind(StrEnum):
    TRANSFER = "transfer"
    MINT = "mint"
    BURN = "burn"
    LIST = "list"
    SALE = "sale"
    CANCEL = "cancel"

Step 2: Implement ref models

# src/ft/core/models/ref.py
import uuid
from sqlalchemy import String, ForeignKey, Integer
from sqlalchemy.orm import Mapped, mapped_column
from ft.core.models.base import Base, TimestampMixin

class Chain(Base, TimestampMixin):
    __tablename__ = "chains"
    __table_args__ = {"schema": "ref"}

    chain_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
    slug: Mapped[str] = mapped_column(String(32), unique=True)         # "evm", "ton", "solana"
    display_name: Mapped[str] = mapped_column(String(64))

class Network(Base, TimestampMixin):
    __tablename__ = "networks"
    __table_args__ = {"schema": "ref"}

    network_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
    chain_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("ref.chains.chain_id"))
    slug: Mapped[str] = mapped_column(String(64), unique=True)         # "polygon", "ton-mainnet"
    display_name: Mapped[str] = mapped_column(String(64))
    rpc_url: Mapped[str] = mapped_column(String(512))
    finality_depth: Mapped[int] = mapped_column(Integer, default=128)
    is_active: Mapped[bool] = mapped_column(default=True)

class TokenStandard(Base):
    __tablename__ = "token_standards"
    __table_args__ = {"schema": "ref"}

    standard_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
    chain_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("ref.chains.chain_id"))
    slug: Mapped[str] = mapped_column(String(32), unique=True)         # "ERC-721", "ERC-1155", "TEP-62"
    display_name: Mapped[str] = mapped_column(String(64))

class Marketplace(Base, TimestampMixin):
    __tablename__ = "marketplaces"
    __table_args__ = {"schema": "ref"}

    marketplace_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
    slug: Mapped[str] = mapped_column(String(64), unique=True)         # "opensea", "getgems"
    display_name: Mapped[str] = mapped_column(String(64))
    network_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("ref.networks.network_id"))

Step 3: Implement ingest models

# src/ft/core/models/ingest.py
import uuid
from datetime import datetime
from sqlalchemy import String, Integer, BigInteger, ForeignKey, Index, UniqueConstraint, text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from ft.core.models.base import Base
from ft.core.models.enums import FinalityStatus

class RawEvent(Base):
    __tablename__ = "raw_events"
    __table_args__ = (
        UniqueConstraint("network_id", "source_event_id", name="uq_raw_events_network_source"),
        Index("ix_raw_events_network_block", "network_id", "block_number"),
        {"schema": "ingest"},
    )

    raw_event_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
    network_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("ref.networks.network_id"))
    block_number: Mapped[int] = mapped_column(BigInteger)
    tx_hash: Mapped[str] = mapped_column(String(128))
    log_index: Mapped[int] = mapped_column(Integer)
    source_event_id: Mapped[str] = mapped_column(String(256))
    raw_payload: Mapped[dict] = mapped_column(JSONB)
    finality_status: Mapped[str] = mapped_column(String(16), default=FinalityStatus.PENDING)
    received_at: Mapped[datetime] = mapped_column(server_default=text("now()"))

Step 4: Implement system models

# src/ft/core/models/system.py
import uuid
from datetime import datetime
from sqlalchemy import String, Integer, BigInteger, ForeignKey, Index, Text, text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from ft.core.models.base import Base

class SyncCursor(Base):
    __tablename__ = "sync_cursors"
    __table_args__ = {"schema": "system"}

    network_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("ref.networks.network_id"), primary_key=True)
    last_block: Mapped[int] = mapped_column(BigInteger, default=0)
    last_block_hash: Mapped[str | None] = mapped_column(String(128))
    last_updated_at: Mapped[datetime] = mapped_column(server_default=text("now()"))

class OutboxEvent(Base):
    __tablename__ = "outbox_events"
    __table_args__ = (
        Index("ix_outbox_unpublished", "published_at", postgresql_where="published_at IS NULL"),
        {"schema": "system"},
    )

    event_id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    topic: Mapped[str] = mapped_column(String(128))
    payload: Mapped[dict] = mapped_column(JSONB)
    created_at: Mapped[datetime] = mapped_column(server_default=text("now()"))
    published_at: Mapped[datetime | None] = mapped_column(default=None)

class DeadLetterItem(Base):
    __tablename__ = "dlq"
    __table_args__ = {"schema": "system"}

    dlq_id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    event_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("system.outbox_events.event_id"))
    error: Mapped[str] = mapped_column(Text)
    attempts: Mapped[int] = mapped_column(Integer, default=0)
    last_attempt_at: Mapped[datetime] = mapped_column(server_default=text("now()"))

class ApiKey(Base):
    __tablename__ = "api_keys"
    __table_args__ = {"schema": "system"}

    key_id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    key_hash: Mapped[str] = mapped_column(String(128), unique=True)
    label: Mapped[str] = mapped_column(String(128))
    is_active: Mapped[bool] = mapped_column(default=True)
    created_at: Mapped[datetime] = mapped_column(server_default=text("now()"))

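system.outbox_events plus its partial index exist to support the outbox pattern from the architecture overview: writers append rows in the same transaction as their state changes, and a relay later drains unpublished rows into Redis Streams. The relay is not part of this task; a minimal sketch of the intended drain loop, assuming stream names equal the topic column (function name illustrative):

# Sketch only; the real relay worker is built later in the plan.
import orjson
from redis.asyncio import Redis
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def publish_outbox_batch(db: AsyncSession, redis: Redis, batch_size: int = 100) -> int:
    """Drain up to batch_size unpublished outbox rows into Redis Streams."""
    rows = (await db.execute(text("""
        SELECT event_id, topic, payload FROM system.outbox_events
        WHERE published_at IS NULL
        ORDER BY created_at
        LIMIT :n
        FOR UPDATE SKIP LOCKED
    """), {"n": batch_size})).mappings().all()

    for row in rows:
        # XADD to a stream named after the topic column, e.g. "events.normalized"
        await redis.xadd(row["topic"], {"payload": orjson.dumps(row["payload"])})
        await db.execute(
            text("UPDATE system.outbox_events SET published_at = now() WHERE event_id = :eid"),
            {"eid": row["event_id"]},
        )
    await db.commit()
    return len(rows)
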
Step 5: Verify models compile

uv run python -c "from ft.core.models import ref, ingest, system; print('ok')"

Step 6: Commit

git add src/ft/core/models/
git commit -m "feat: domain models — ref, ingest, system schemas"

Task 5: Domain Models — ledger, catalog, market, projection

Files:
- Create: src/ft/core/models/ledger.py
- Create: src/ft/core/models/catalog.py
- Create: src/ft/core/models/market.py
- Create: src/ft/core/models/projection.py

Step 1: Implement ledger models (dedup + idempotency core)

# src/ft/core/models/ledger.py
import uuid
from datetime import datetime
from sqlalchemy import String, Integer, BigInteger, ForeignKey, Index, Boolean, UniqueConstraint, text
from sqlalchemy.orm import Mapped, mapped_column
from ft.core.models.base import Base

class NormalizedEventKey(Base):
    """Dedup table. UPSERT on (source_event_id, sub_index).
    Handles reorg re-inclusion: is_reverted flipped back to false."""
    __tablename__ = "normalized_event_keys"
    __table_args__ = (
        UniqueConstraint("source_event_id", "sub_index", name="uq_event_key"),
        {"schema": "ledger"},
    )

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
    source_event_id: Mapped[str] = mapped_column(String(256))
    sub_index: Mapped[int] = mapped_column(Integer, default=0)
    normalized_event_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("ledger.normalized_events.normalized_event_id"))
    is_reverted: Mapped[bool] = mapped_column(Boolean, default=False)

class NormalizedEvent(Base):
    __tablename__ = "normalized_events"
    __table_args__ = (
        Index("ix_norm_events_asset", "asset_id"),
        Index("ix_norm_events_block", "network_id", "block_number"),
        {"schema": "ledger"},
    )

    normalized_event_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
    network_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("ref.networks.network_id"))
    asset_id: Mapped[uuid.UUID] = mapped_column()
    collection_id: Mapped[uuid.UUID | None] = mapped_column()
    event_kind: Mapped[str] = mapped_column(String(16))     # EventKind
    contract_address: Mapped[str] = mapped_column(String(256))
    block_number: Mapped[int] = mapped_column(BigInteger)
    tx_hash: Mapped[str] = mapped_column(String(128))
    timestamp: Mapped[int] = mapped_column(BigInteger)       # unix epoch
    finality_status: Mapped[str] = mapped_column(String(16))
    created_at: Mapped[datetime] = mapped_column(server_default=text("now()"))

class NormalizedEventDelta(Base):
    __tablename__ = "normalized_event_deltas"
    __table_args__ = (
        Index("ix_norm_deltas_event", "normalized_event_id"),
        {"schema": "ledger"},
    )

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
    normalized_event_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("ledger.normalized_events.normalized_event_id"))
    account_address: Mapped[str] = mapped_column(String(256))
    qty_delta: Mapped[int] = mapped_column(Integer)   # +1 for receive, -1 for send

class AppliedEvent(Base):
    """Idempotency gate. INSERT ON CONFLICT DO NOTHING before every state mutation."""
    __tablename__ = "applied_events"
    __table_args__ = {"schema": "ledger"}

    consumer_id: Mapped[str] = mapped_column(String(128), primary_key=True)
    normalized_event_id: Mapped[uuid.UUID] = mapped_column(
        ForeignKey("ledger.normalized_events.normalized_event_id"),
        primary_key=True,
    )
    applied_at: Mapped[datetime] = mapped_column(server_default=text("now()"))

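normalized_event_keys is the dedup target the normalizer (built in a later task) will UPSERT into; the statement it implies looks roughly like this (function name illustrative):

# Sketch of the dedup UPSERT the normalizer is expected to run (illustrative only).
import uuid
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def upsert_event_key(
    db: AsyncSession, source_event_id: str, sub_index: int, normalized_event_id: uuid.UUID
) -> None:
    """Insert the (source_event_id, sub_index) key, or un-revert it after a reorg re-inclusion."""
    await db.execute(text("""
        INSERT INTO ledger.normalized_event_keys
            (source_event_id, sub_index, normalized_event_id, is_reverted)
        VALUES (:sid, :sub, :nid, false)
        ON CONFLICT (source_event_id, sub_index)
        DO UPDATE SET is_reverted = false, normalized_event_id = EXCLUDED.normalized_event_id
    """), {"sid": source_event_id, "sub": sub_index, "nid": normalized_event_id})
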
Step 2: Implement catalog models

# src/ft/core/models/catalog.py
import uuid
from sqlalchemy import String, Boolean, ForeignKey, Index
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from ft.core.models.base import Base, TimestampMixin

class Collection(Base, TimestampMixin):
    __tablename__ = "collections"
    __table_args__ = (
        Index("ix_collections_network", "network_id"),
        {"schema": "catalog"},
    )

    collection_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
    network_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("ref.networks.network_id"))
    contract_address: Mapped[str] = mapped_column(String(256))
    name: Mapped[str | None] = mapped_column(String(256))
    is_stub: Mapped[bool] = mapped_column(Boolean, default=True)

class Asset(Base, TimestampMixin):
    __tablename__ = "assets"
    __table_args__ = (
        Index("ix_assets_collection", "collection_id"),
        {"schema": "catalog"},
    )

    asset_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
    collection_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("catalog.collections.collection_id"))
    token_id: Mapped[str] = mapped_column(String(256))
    name: Mapped[str | None] = mapped_column(String(256))
    image_url: Mapped[str | None] = mapped_column(String(1024))
    metadata_json: Mapped[dict | None] = mapped_column(JSONB)
    is_stub: Mapped[bool] = mapped_column(Boolean, default=True)

Step 3: Implement market models

# src/ft/core/models/market.py
import uuid
from datetime import datetime
from sqlalchemy import String, Integer, BigInteger, Numeric, Index, UniqueConstraint, text
from sqlalchemy.orm import Mapped, mapped_column
from ft.core.models.base import Base

class OwnershipCurrent(Base):
    """UPSERT qty += delta. DELETE WHERE qty <= 0."""
    __tablename__ = "ownership_current"
    __table_args__ = (
        UniqueConstraint("asset_id", "account_address", name="uq_ownership"),
        Index("ix_ownership_account", "account_address"),
        {"schema": "market"},
    )

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
    asset_id: Mapped[uuid.UUID] = mapped_column()
    account_address: Mapped[str] = mapped_column(String(256))
    qty: Mapped[int] = mapped_column(Integer, default=0)
    finality_status: Mapped[str] = mapped_column(String(16))
    updated_at: Mapped[datetime] = mapped_column(server_default=text("now()"))

class ListingCurrent(Base):
    __tablename__ = "listings_current"
    __table_args__ = (
        Index("ix_listings_asset", "asset_id"),
        {"schema": "market"},
    )

    listing_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
    asset_id: Mapped[uuid.UUID] = mapped_column()
    marketplace_slug: Mapped[str] = mapped_column(String(64))
    seller: Mapped[str] = mapped_column(String(256))
    price: Mapped[float] = mapped_column(Numeric(38, 18))
    currency: Mapped[str] = mapped_column(String(32))
    listed_at: Mapped[datetime] = mapped_column()

class SaleHistory(Base):
    __tablename__ = "sales_history"
    __table_args__ = (
        Index("ix_sales_asset", "asset_id"),
        Index("ix_sales_time", "sold_at"),
        {"schema": "market"},
    )

    sale_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
    asset_id: Mapped[uuid.UUID] = mapped_column()
    seller: Mapped[str] = mapped_column(String(256))
    buyer: Mapped[str] = mapped_column(String(256))
    price: Mapped[float] = mapped_column(Numeric(38, 18))
    currency: Mapped[str] = mapped_column(String(32))
    sold_at: Mapped[datetime] = mapped_column()

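The OwnershipCurrent docstring ("UPSERT qty += delta. DELETE WHERE qty <= 0.") translates roughly into the two statements below; this is only a sketch of what the state updater will eventually run (function name illustrative):

# Sketch of the ownership mutation implied by OwnershipCurrent (illustrative only).
import uuid
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def apply_ownership_delta(
    db: AsyncSession, asset_id: uuid.UUID, account: str, delta: int, finality: str
) -> None:
    await db.execute(text("""
        INSERT INTO market.ownership_current (asset_id, account_address, qty, finality_status)
        VALUES (:aid, :acct, :delta, :fin)
        ON CONFLICT (asset_id, account_address)
        DO UPDATE SET qty = market.ownership_current.qty + EXCLUDED.qty,
                      finality_status = EXCLUDED.finality_status,
                      updated_at = now()
    """), {"aid": asset_id, "acct": account, "delta": delta, "fin": finality})
    # Rows that reach zero (or go negative after a revert) are removed.
    await db.execute(text("""
        DELETE FROM market.ownership_current
        WHERE asset_id = :aid AND account_address = :acct AND qty <= 0
    """), {"aid": asset_id, "acct": account})
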
Step 4: Implement projection models

# src/ft/core/models/projection.py
import uuid
from datetime import datetime
from sqlalchemy import String, Integer, BigInteger, Numeric, Boolean, Index, text
from sqlalchemy.orm import Mapped, mapped_column
from ft.core.models.base import Base

class OwnershipView(Base):
    __tablename__ = "ownership_view"
    __table_args__ = (
        Index("ix_ownview_asset", "asset_id"),
        Index("ix_ownview_owner", "owner_address"),
        {"schema": "projection"},
    )

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
    asset_id: Mapped[uuid.UUID] = mapped_column()
    owner_address: Mapped[str] = mapped_column(String(256))
    qty: Mapped[int] = mapped_column(Integer)
    finality_status: Mapped[str] = mapped_column(String(16))
    updated_at: Mapped[datetime] = mapped_column(server_default=text("now()"))

class AssetCard(Base):
    __tablename__ = "asset_cards"
    __table_args__ = (
        Index("ix_assetcards_collection", "collection_id"),
        {"schema": "projection"},
    )

    asset_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
    collection_id: Mapped[uuid.UUID] = mapped_column()
    name: Mapped[str | None] = mapped_column(String(256))
    image_url: Mapped[str | None] = mapped_column(String(1024))
    is_stub: Mapped[bool] = mapped_column(Boolean, default=True)
    floor_price: Mapped[float | None] = mapped_column(Numeric(38, 18))
    last_sale_price: Mapped[float | None] = mapped_column(Numeric(38, 18))
    finality_status: Mapped[str] = mapped_column(String(16))
    updated_at: Mapped[datetime] = mapped_column(server_default=text("now()"))

class CollectionStats(Base):
    __tablename__ = "collection_stats"
    __table_args__ = {"schema": "projection"}

    collection_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
    floor_price: Mapped[float | None] = mapped_column(Numeric(38, 18))
    total_volume: Mapped[float | None] = mapped_column(Numeric(38, 18))
    owner_count: Mapped[int] = mapped_column(Integer, default=0)
    asset_count: Mapped[int] = mapped_column(Integer, default=0)
    listed_count: Mapped[int] = mapped_column(Integer, default=0)
    updated_at: Mapped[datetime] = mapped_column(server_default=text("now()"))

class PortfolioAsset(Base):
    __tablename__ = "portfolio_assets"
    __table_args__ = (
        Index("ix_portfolio_owner", "owner_address"),
        {"schema": "projection"},
    )

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
    owner_address: Mapped[str] = mapped_column(String(256))
    asset_id: Mapped[uuid.UUID] = mapped_column()
    qty: Mapped[int] = mapped_column(Integer)
    estimated_value_usd: Mapped[float | None] = mapped_column(Numeric(38, 18))
    updated_at: Mapped[datetime] = mapped_column(server_default=text("now()"))

class ListingCard(Base):
    __tablename__ = "listing_cards"
    __table_args__ = (
        Index("ix_listcards_asset", "asset_id"),
        {"schema": "projection"},
    )

    listing_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
    asset_id: Mapped[uuid.UUID] = mapped_column()
    collection_id: Mapped[uuid.UUID] = mapped_column()
    seller: Mapped[str] = mapped_column(String(256))
    price: Mapped[float] = mapped_column(Numeric(38, 18))
    currency: Mapped[str] = mapped_column(String(32))
    asset_name: Mapped[str | None] = mapped_column(String(256))
    asset_image_url: Mapped[str | None] = mapped_column(String(1024))
    listed_at: Mapped[datetime] = mapped_column()

Step 5: Verify all models compile and no circular imports

uv run python -c "
from ft.core.models import base, ref, ingest, ledger, catalog, market, projection, system
print('All models imported successfully')
print(f'Tables: {len(base.Base.metadata.tables)}')
"

Step 6: Commit

git add src/ft/core/models/
git commit -m "feat: domain models — ledger, catalog, market, projection schemas"

Task 6: Baseline SQL Migration

Files: - Create: migrations/001_baseline.sql

Step 1: Write the baseline migration

Derive the schema from the models, but hand-write the SQL (project convention: raw SQL, additive only).
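
To cross-check the hand-written SQL against the ORM models, you can print the DDL SQLAlchemy would generate and diff it by eye; a quick throwaway sketch (not part of the plan's file list):

# Throwaway check: print model-derived DDL to compare with the hand-written migration.
from sqlalchemy.dialects import postgresql
from sqlalchemy.schema import CreateTable

from ft.core.models import base, catalog, ingest, ledger, market, projection, ref, system  # noqa: F401

for table in base.Base.metadata.sorted_tables:
    print(str(CreateTable(table).compile(dialect=postgresql.dialect())).strip() + ";")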

-- migrations/001_baseline.sql
-- Phase 1 baseline: 7 schemas, all tables, indexes, constraints
-- Additive only. All statements use IF NOT EXISTS.

BEGIN;

-- === Extensions ===
CREATE EXTENSION IF NOT EXISTS pgcrypto;    -- gen_random_uuid() for non-deterministic IDs (api_keys, outbox, dlq)
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";  -- uuid_generate_v5() for deterministic IDs in seed data

-- === Schemas ===
CREATE SCHEMA IF NOT EXISTS ref;
CREATE SCHEMA IF NOT EXISTS ingest;
CREATE SCHEMA IF NOT EXISTS ledger;
CREATE SCHEMA IF NOT EXISTS catalog;
CREATE SCHEMA IF NOT EXISTS market;
CREATE SCHEMA IF NOT EXISTS projection;
CREATE SCHEMA IF NOT EXISTS system;

-- === ref ===
CREATE TABLE IF NOT EXISTS ref.chains (
    chain_id UUID PRIMARY KEY,
    slug VARCHAR(32) UNIQUE NOT NULL,
    display_name VARCHAR(64) NOT NULL,
    created_at TIMESTAMPTZ DEFAULT now(),
    updated_at TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE IF NOT EXISTS ref.networks (
    network_id UUID PRIMARY KEY,
    chain_id UUID REFERENCES ref.chains(chain_id),
    slug VARCHAR(64) UNIQUE NOT NULL,
    display_name VARCHAR(64) NOT NULL,
    rpc_url VARCHAR(512) NOT NULL,
    finality_depth INTEGER DEFAULT 128,
    is_active BOOLEAN DEFAULT true,
    created_at TIMESTAMPTZ DEFAULT now(),
    updated_at TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE IF NOT EXISTS ref.token_standards (
    standard_id UUID PRIMARY KEY,
    chain_id UUID REFERENCES ref.chains(chain_id),
    slug VARCHAR(32) UNIQUE NOT NULL,
    display_name VARCHAR(64) NOT NULL
);

CREATE TABLE IF NOT EXISTS ref.marketplaces (
    marketplace_id UUID PRIMARY KEY,
    slug VARCHAR(64) UNIQUE NOT NULL,
    display_name VARCHAR(64) NOT NULL,
    network_id UUID REFERENCES ref.networks(network_id),
    created_at TIMESTAMPTZ DEFAULT now(),
    updated_at TIMESTAMPTZ DEFAULT now()
);

-- === ingest ===
CREATE TABLE IF NOT EXISTS ingest.raw_events (
    raw_event_id UUID PRIMARY KEY,
    network_id UUID REFERENCES ref.networks(network_id),
    block_number BIGINT NOT NULL,
    tx_hash VARCHAR(128) NOT NULL,
    log_index INTEGER NOT NULL,
    source_event_id VARCHAR(256) NOT NULL,
    raw_payload JSONB,
    finality_status VARCHAR(16) DEFAULT 'pending',
    received_at TIMESTAMPTZ DEFAULT now()
);
CREATE UNIQUE INDEX IF NOT EXISTS uq_raw_events_network_source ON ingest.raw_events(network_id, source_event_id);
CREATE INDEX IF NOT EXISTS ix_raw_events_network_block ON ingest.raw_events(network_id, block_number);

-- === ledger ===
CREATE TABLE IF NOT EXISTS ledger.normalized_events (
    normalized_event_id UUID PRIMARY KEY,
    network_id UUID REFERENCES ref.networks(network_id),
    asset_id UUID NOT NULL,
    collection_id UUID,
    event_kind VARCHAR(16) NOT NULL,
    contract_address VARCHAR(256) NOT NULL,
    block_number BIGINT NOT NULL,
    tx_hash VARCHAR(128) NOT NULL,
    timestamp BIGINT NOT NULL,
    finality_status VARCHAR(16) NOT NULL,
    created_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS ix_norm_events_asset ON ledger.normalized_events(asset_id);
CREATE INDEX IF NOT EXISTS ix_norm_events_block ON ledger.normalized_events(network_id, block_number);

CREATE TABLE IF NOT EXISTS ledger.normalized_event_keys (
    id BIGSERIAL PRIMARY KEY,
    source_event_id VARCHAR(256) NOT NULL,
    sub_index INTEGER DEFAULT 0,
    normalized_event_id UUID REFERENCES ledger.normalized_events(normalized_event_id),
    is_reverted BOOLEAN DEFAULT false,
    CONSTRAINT uq_event_key UNIQUE (source_event_id, sub_index)
);

CREATE TABLE IF NOT EXISTS ledger.normalized_event_deltas (
    id BIGSERIAL PRIMARY KEY,
    normalized_event_id UUID REFERENCES ledger.normalized_events(normalized_event_id),
    account_address VARCHAR(256) NOT NULL,
    qty_delta INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS ix_norm_deltas_event ON ledger.normalized_event_deltas(normalized_event_id);

CREATE TABLE IF NOT EXISTS ledger.applied_events (
    consumer_id VARCHAR(128) NOT NULL,
    normalized_event_id UUID REFERENCES ledger.normalized_events(normalized_event_id),
    applied_at TIMESTAMPTZ DEFAULT now(),
    PRIMARY KEY (consumer_id, normalized_event_id)
);

-- === catalog ===
CREATE TABLE IF NOT EXISTS catalog.collections (
    collection_id UUID PRIMARY KEY,
    network_id UUID REFERENCES ref.networks(network_id),
    contract_address VARCHAR(256) NOT NULL,
    name VARCHAR(256),
    is_stub BOOLEAN DEFAULT true,
    created_at TIMESTAMPTZ DEFAULT now(),
    updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS ix_collections_network ON catalog.collections(network_id);

CREATE TABLE IF NOT EXISTS catalog.assets (
    asset_id UUID PRIMARY KEY,
    collection_id UUID REFERENCES catalog.collections(collection_id),
    token_id VARCHAR(256) NOT NULL,
    name VARCHAR(256),
    image_url VARCHAR(1024),
    metadata_json JSONB,
    is_stub BOOLEAN DEFAULT true,
    created_at TIMESTAMPTZ DEFAULT now(),
    updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS ix_assets_collection ON catalog.assets(collection_id);

-- === market ===
CREATE TABLE IF NOT EXISTS market.ownership_current (
    id BIGSERIAL PRIMARY KEY,
    asset_id UUID NOT NULL,
    account_address VARCHAR(256) NOT NULL,
    qty INTEGER DEFAULT 0,
    finality_status VARCHAR(16) NOT NULL,
    updated_at TIMESTAMPTZ DEFAULT now(),
    CONSTRAINT uq_ownership UNIQUE (asset_id, account_address)
);
CREATE INDEX IF NOT EXISTS ix_ownership_account ON market.ownership_current(account_address);

CREATE TABLE IF NOT EXISTS market.listings_current (
    listing_id UUID PRIMARY KEY,
    asset_id UUID NOT NULL,
    marketplace_slug VARCHAR(64) NOT NULL,
    seller VARCHAR(256) NOT NULL,
    price NUMERIC(38,18) NOT NULL,
    currency VARCHAR(32) NOT NULL,
    listed_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX IF NOT EXISTS ix_listings_asset ON market.listings_current(asset_id);

CREATE TABLE IF NOT EXISTS market.sales_history (
    sale_id UUID PRIMARY KEY,
    asset_id UUID NOT NULL,
    seller VARCHAR(256) NOT NULL,
    buyer VARCHAR(256) NOT NULL,
    price NUMERIC(38,18) NOT NULL,
    currency VARCHAR(32) NOT NULL,
    sold_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX IF NOT EXISTS ix_sales_asset ON market.sales_history(asset_id);
CREATE INDEX IF NOT EXISTS ix_sales_time ON market.sales_history(sold_at);

-- === projection ===
CREATE TABLE IF NOT EXISTS projection.ownership_view (
    id BIGSERIAL PRIMARY KEY,
    asset_id UUID NOT NULL,
    owner_address VARCHAR(256) NOT NULL,
    qty INTEGER NOT NULL,
    finality_status VARCHAR(16) NOT NULL,
    updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS ix_ownview_asset ON projection.ownership_view(asset_id);
CREATE INDEX IF NOT EXISTS ix_ownview_owner ON projection.ownership_view(owner_address);

CREATE TABLE IF NOT EXISTS projection.asset_cards (
    asset_id UUID PRIMARY KEY,
    collection_id UUID NOT NULL,
    name VARCHAR(256),
    image_url VARCHAR(1024),
    is_stub BOOLEAN DEFAULT true,
    floor_price NUMERIC(38,18),
    last_sale_price NUMERIC(38,18),
    finality_status VARCHAR(16) NOT NULL,
    updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS ix_assetcards_collection ON projection.asset_cards(collection_id);

CREATE TABLE IF NOT EXISTS projection.collection_stats (
    collection_id UUID PRIMARY KEY,
    floor_price NUMERIC(38,18),
    total_volume NUMERIC(38,18),
    owner_count INTEGER DEFAULT 0,
    asset_count INTEGER DEFAULT 0,
    listed_count INTEGER DEFAULT 0,
    updated_at TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE IF NOT EXISTS projection.portfolio_assets (
    id BIGSERIAL PRIMARY KEY,
    owner_address VARCHAR(256) NOT NULL,
    asset_id UUID NOT NULL,
    qty INTEGER NOT NULL,
    estimated_value_usd NUMERIC(38,18),
    updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS ix_portfolio_owner ON projection.portfolio_assets(owner_address);

CREATE TABLE IF NOT EXISTS projection.listing_cards (
    listing_id UUID PRIMARY KEY,
    asset_id UUID NOT NULL,
    collection_id UUID NOT NULL,
    seller VARCHAR(256) NOT NULL,
    price NUMERIC(38,18) NOT NULL,
    currency VARCHAR(32) NOT NULL,
    asset_name VARCHAR(256),
    asset_image_url VARCHAR(1024),
    listed_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX IF NOT EXISTS ix_listcards_asset ON projection.listing_cards(asset_id);

-- === system ===
CREATE TABLE IF NOT EXISTS system.sync_cursors (
    network_id UUID REFERENCES ref.networks(network_id) PRIMARY KEY,
    last_block BIGINT DEFAULT 0,
    last_block_hash VARCHAR(128),
    last_updated_at TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE IF NOT EXISTS system.outbox_events (
    event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    topic VARCHAR(128) NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT now(),
    published_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS ix_outbox_unpublished ON system.outbox_events(published_at) WHERE published_at IS NULL;

CREATE TABLE IF NOT EXISTS system.dlq (
    dlq_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    event_id UUID REFERENCES system.outbox_events(event_id),
    error TEXT NOT NULL,
    attempts INTEGER DEFAULT 0,
    last_attempt_at TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE IF NOT EXISTS system.api_keys (
    key_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    key_hash VARCHAR(128) UNIQUE NOT NULL,
    label VARCHAR(128) NOT NULL,
    is_active BOOLEAN DEFAULT true,
    created_at TIMESTAMPTZ DEFAULT now()
);

COMMIT;

Step 2: Create seed data migration

-- migrations/002_seed_ref_data.sql
-- Seed reference data for Phase 1 (Polygon + TON)
--
-- IDs are uuidv5(NAMESPACE_URL:ft.supply, natural_key) — deterministic.
-- Generate with: python -c "import uuid; NS=uuid.uuid5(uuid.NAMESPACE_URL,'ft.supply'); print(uuid.uuid5(NS,'evm'))"
-- This matches make_id() from src/ft/core/models/base.py.

BEGIN;

-- Chains (make_id("evm"), make_id("ton"))
INSERT INTO ref.chains (chain_id, slug, display_name) VALUES
    (uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'evm'), 'evm', 'EVM'),
    (uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'ton'), 'ton', 'TON')
ON CONFLICT (slug) DO NOTHING;

-- Networks (make_id("polygon"), make_id("ton-mainnet"))
INSERT INTO ref.networks (network_id, chain_id, slug, display_name, rpc_url, finality_depth) VALUES
    (uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'polygon'),
     uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'evm'),
     'polygon', 'Polygon', 'https://polygon-rpc.com', 128),
    (uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'ton-mainnet'),
     uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'ton'),
     'ton-mainnet', 'TON Mainnet', 'https://tonapi.io/v2', 1)
ON CONFLICT (slug) DO NOTHING;

-- Token Standards (make_id("ERC-721"), etc.)
INSERT INTO ref.token_standards (standard_id, chain_id, slug, display_name) VALUES
    (uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'ERC-721'),
     uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'evm'), 'ERC-721', 'ERC-721'),
    (uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'ERC-1155'),
     uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'evm'), 'ERC-1155', 'ERC-1155'),
    (uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'TEP-62'),
     uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'ton'), 'TEP-62', 'TEP-62 NFT')
ON CONFLICT (slug) DO NOTHING;

COMMIT;

Step 3: Commit

git add migrations/
git commit -m "feat: baseline SQL migration + ref seed data"

Task 7: Docker Compose + Migration Runner

Files:
- Create: docker/docker-compose.yml
- Create: docker/Dockerfile
- Create: scripts/migrate.sh

Step 1: Create dev docker-compose

# docker/docker-compose.yml
services:
  postgres:
    image: pgvector/pgvector:pg16
    ports: ["5432:5432"]
    environment:
      POSTGRES_DB: ftsupply
      POSTGRES_USER: ft
      POSTGRES_PASSWORD: ft
    volumes:
      - pg_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ft -d ftsupply"]
      interval: 5s
      retries: 5

  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      retries: 5

  minio:
    image: minio/minio
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    command: server /data --console-address ":9001"
    volumes:
      - minio_data:/data

volumes:
  pg_data:
  minio_data:

Step 2: Create migration runner script

#!/usr/bin/env bash
# scripts/migrate.sh — Apply all SQL migrations in order
set -euo pipefail

DB_HOST="${POSTGRES_HOST:-localhost}"
DB_PORT="${POSTGRES_PORT:-5432}"
DB_NAME="${POSTGRES_DB:-ftsupply}"
DB_USER="${POSTGRES_USER:-ft}"

MIGRATIONS_DIR="$(cd "$(dirname "$0")/../migrations" && pwd)"

echo "Applying migrations from $MIGRATIONS_DIR to $DB_NAME@$DB_HOST:$DB_PORT"

for f in "$MIGRATIONS_DIR"/*.sql; do
    echo "  → $(basename "$f")"
    PGPASSWORD="${POSTGRES_PASSWORD:-ft}" psql -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" -f "$f"
done

echo "Done."

Step 3: Create Dockerfile

# docker/Dockerfile
FROM python:3.12-slim AS base

RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc libpq-dev && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Install uv
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv

# Install third-party dependencies first (better layer caching); the project
# itself can't be installed yet because src/ isn't copied at this point.
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-dev --no-install-project

# Copy source and install the project
COPY src/ src/
COPY migrations/ migrations/
COPY workers/ workers/
RUN uv sync --frozen --no-dev

# Default: run API
CMD ["uv", "run", "uvicorn", "ft.api.app:app", "--host", "0.0.0.0", "--port", "8000"]

Step 4: Verify docker-compose starts

cd /c/Users/rusla/Projects/ft.supply
docker compose -f docker/docker-compose.yml up -d --wait
docker compose -f docker/docker-compose.yml ps

Step 5: Apply migrations

chmod +x scripts/migrate.sh
bash scripts/migrate.sh

Expected: all migrations applied without errors.

Step 6: Commit

git add docker/ scripts/
git commit -m "feat: Docker Compose (postgres, redis, minio) + migration runner"

Task 8: Event Envelope + Address Canonicalization

Files:
- Create: src/ft/core/events.py
- Create: src/ft/core/address.py
- Test: tests/test_events.py
- Test: tests/test_address.py

Step 1: Write event envelope tests

# tests/test_events.py
from ft.core.events import RawEvent, RawEventDelta, EventKind, FinalityStatus

def test_raw_event_valid():
    event = RawEvent(
        source_event_id="0xabc:42",
        network_slug="polygon",
        block_number=65000000,
        tx_hash="0xabc123",
        log_index=42,
        timestamp=1700000000,
        event_kind=EventKind.TRANSFER,
        contract_address="0x1234567890abcdef",
        deltas=[RawEventDelta(from_address="0xAAA", to_address="0xBBB", token_id="1", qty=1)],
        finality_status=FinalityStatus.PENDING,
    )
    assert event.schema_version == 1
    assert len(event.deltas) == 1

def test_raw_event_mint_no_from():
    delta = RawEventDelta(from_address=None, to_address="0xBBB", token_id="1", qty=1)
    assert delta.from_address is None

def test_raw_event_erc1155_batch():
    deltas = [
        RawEventDelta(from_address="0xA", to_address="0xB", token_id=str(i), qty=1)
        for i in range(100)
    ]
    event = RawEvent(
        source_event_id="0xabc:0",
        network_slug="polygon",
        block_number=1,
        tx_hash="0x",
        log_index=0,
        timestamp=0,
        event_kind=EventKind.TRANSFER,
        contract_address="0x",
        deltas=deltas,
        finality_status=FinalityStatus.PENDING,
    )
    assert len(event.deltas) == 100

def test_raw_event_serialization_roundtrip():
    event = RawEvent(
        source_event_id="0xabc:42",
        network_slug="polygon",
        block_number=1,
        tx_hash="0x",
        log_index=0,
        timestamp=0,
        event_kind=EventKind.TRANSFER,
        contract_address="0x",
        deltas=[],
        finality_status=FinalityStatus.PENDING,
    )
    data = event.model_dump()
    restored = RawEvent.model_validate(data)
    assert restored == event

Step 2: Write address canonicalization tests

# tests/test_address.py
import pytest
from ft.core.address import canonicalize_address

def test_evm_lowercase():
    assert canonicalize_address("evm", "0xAbCdEf") == "0xabcdef"

def test_ton_raw_form():
    # bounceable → raw form conversion
    result = canonicalize_address("ton", "0:abc123")
    assert result == "0:abc123"  # already raw

def test_solana_case_preserved():
    addr = "7xKXtg2CW87d97TXJSDpbD5jBkheTqA83TZRuJosgAsU"
    assert canonicalize_address("solana", addr) == addr

def test_unknown_chain_raises():
    with pytest.raises(ValueError, match="Unknown chain"):
        canonicalize_address("bitcoin", "1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa")

Step 3: Run tests — verify they fail

uv run pytest tests/test_events.py tests/test_address.py -v

Step 4: Implement events.py

# src/ft/core/events.py
from pydantic import BaseModel
from ft.core.models.enums import EventKind, FinalityStatus

__all__ = ["RawEvent", "RawEventDelta", "EventKind", "FinalityStatus"]

class RawEventDelta(BaseModel):
    """One ownership change within an event. ERC-1155 batch → multiple deltas."""
    from_address: str | None = None
    to_address: str | None = None
    token_id: str
    qty: int = 1

class RawEvent(BaseModel):
    """Frozen envelope between adapters and normalizer.
    schema_version bumps = breaking change. Fields never removed."""
    model_config = {"frozen": True}

    schema_version: int = 1
    source_event_id: str
    network_slug: str
    block_number: int
    tx_hash: str
    log_index: int
    sub_index: int = 0  # 0 for single events, 0..N-1 for batch expansions
    timestamp: int
    event_kind: EventKind
    contract_address: str
    collection_id: str | None = None
    deltas: list[RawEventDelta]
    finality_status: FinalityStatus
    raw_payload: dict | None = None

Step 5: Implement address.py

# src/ft/core/address.py

def canonicalize_address(chain: str, address: str) -> str:
    """Per-chain address canonicalization at ingestion time.
    CRITICAL: Never apply universal lower(). Solana addresses are case-sensitive."""
    match chain:
        case "evm":
            return address.lower()
        case "ton":
            return _ton_to_raw(address)
        case "solana":
            return address  # case-sensitive, do NOT touch
        case _:
            raise ValueError(f"Unknown chain: {chain}")


def _ton_to_raw(address: str) -> str:
    """Convert TON address to raw form (0:hex).
    If already raw form, return as-is."""
    if address.startswith("0:") or address.startswith("-1:"):
        return address.lower()
    # TODO: implement bounceable → raw conversion using tvm_hash
    # For now, return lowered (placeholder until TON adapter is built)
    return address.lower()

Step 6: Run tests

uv run pytest tests/test_events.py tests/test_address.py -v

Expected: PASS

Step 7: Commit

git add src/ft/core/events.py src/ft/core/address.py tests/test_events.py tests/test_address.py
git commit -m "feat: frozen event envelope (RawEvent) + address canonicalization"

Task 9: Idempotency Library (applied_events)

Files:
- Create: src/ft/core/idempotency.py
- Test: tests/idempotency/test_applied_events.py
- Test: tests/conftest.py

Step 1: Create testcontainers conftest

# tests/conftest.py
import asyncio
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

@pytest.fixture(scope="session")
def event_loop():
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()

@pytest.fixture(scope="session")
def postgres_url():
    """Start a real Postgres via testcontainers."""
    from testcontainers.postgres import PostgresContainer

    with PostgresContainer(
        image="pgvector/pgvector:pg16",
        username="ft",
        password="ft",
        dbname="ftsupply_test",
    ) as pg:
        # Convert to asyncpg URL
        url = pg.get_connection_url()
        async_url = url.replace("psycopg2", "asyncpg", 1)
        if "postgresql://" in async_url and "asyncpg" not in async_url:
            async_url = async_url.replace("postgresql://", "postgresql+asyncpg://", 1)
        yield async_url

@pytest.fixture(scope="session")
async def engine(postgres_url):
    eng = create_async_engine(postgres_url)
    yield eng
    await eng.dispose()

@pytest.fixture(scope="session")
async def _apply_migrations(engine):
    """Apply all migrations once per session.

    IMPORTANT: Do NOT use sql.split(";") — it breaks PL/pgSQL function bodies,
    dollar-quoted strings, and comments containing semicolons.
    Instead, execute each .sql file as a single statement via raw connection.
    asyncpg supports multi-statement execution through connection.execute().
    """
    import pathlib
    migrations_dir = pathlib.Path(__file__).parent.parent / "migrations"
    async with engine.begin() as conn:
        for sql_file in sorted(migrations_dir.glob("*.sql")):
            sql = sql_file.read_text()
            # Execute via raw asyncpg connection to preserve multi-statement semantics.
            raw = await conn.get_raw_connection()
            await raw.driver_connection.execute(sql)

@pytest.fixture
async def db(engine, _apply_migrations) -> AsyncSession:
    """Fresh transaction per test, rolled back after."""
    async with engine.connect() as conn:
        trans = await conn.begin()
        session = AsyncSession(bind=conn, expire_on_commit=False)
        yield session
        await session.close()
        await trans.rollback()

Step 2: Write idempotency tests

# tests/idempotency/test_applied_events.py
import uuid
from ft.core.idempotency import try_apply_event
from ft.core.models.base import make_id

# We need some normalized events in the DB for FK constraints.
# Helper to insert a minimal normalized_event row.
from sqlalchemy import text

async def _insert_normalized_event(db, event_id: uuid.UUID):
    await db.execute(text("""
        INSERT INTO ledger.normalized_events
        (normalized_event_id, network_id, asset_id, event_kind, contract_address,
         block_number, tx_hash, timestamp, finality_status)
        VALUES (:eid, :nid, :aid, 'transfer', '0x', 1, '0x', 0, 'pending')
        ON CONFLICT DO NOTHING
    """), {
        "eid": event_id,
        "nid": make_id("polygon"),  # dummy
        "aid": make_id("polygon", "0x", "1"),
    })
    await db.flush()


class TestIdempotency:
    async def test_single_delivery(self, db):
        """First apply returns True."""
        eid = make_id("test", "event", "1")
        await _insert_normalized_event(db, eid)
        result = await try_apply_event(db, "test_consumer", eid)
        assert result is True

    async def test_duplicate_delivery(self, db):
        """Second apply of same event returns False."""
        eid = make_id("test", "event", "2")
        await _insert_normalized_event(db, eid)
        r1 = await try_apply_event(db, "test_consumer", eid)
        r2 = await try_apply_event(db, "test_consumer", eid)
        assert r1 is True
        assert r2 is False

    async def test_different_consumers_independent(self, db):
        """Different consumers can apply the same event."""
        eid = make_id("test", "event", "3")
        await _insert_normalized_event(db, eid)
        r1 = await try_apply_event(db, "consumer_a", eid)
        r2 = await try_apply_event(db, "consumer_b", eid)
        assert r1 is True
        assert r2 is True

Step 3: Run tests — verify they fail

uv run pytest tests/idempotency/ -v

Step 4: Implement idempotency library

# src/ft/core/idempotency.py
import uuid
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def try_apply_event(db: AsyncSession, consumer_id: str, event_id: uuid.UUID) -> bool:
    """Idempotency gate. Returns True if event was newly applied, False if already seen.

    Uses INSERT ON CONFLICT DO NOTHING on ledger.applied_events.
    Must be called BEFORE any state mutation. If returns False, skip the mutation.
    """
    result = await db.execute(
        text("""
            INSERT INTO ledger.applied_events (consumer_id, normalized_event_id)
            VALUES (:consumer_id, :event_id)
            ON CONFLICT (consumer_id, normalized_event_id) DO NOTHING
            RETURNING consumer_id
        """),
        {"consumer_id": consumer_id, "event_id": event_id},
    )
    return result.first() is not None

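Consumers are expected to call the gate first and keep it in the same transaction as the state mutation, so a crash can never apply a mutation without recording the event. A sketch of the intended call pattern (handler and consumer names illustrative):

# Intended usage pattern (illustrative); gate and mutation share one transaction.
async def handle_normalized_event(db, event) -> None:
    if not await try_apply_event(db, consumer_id="state_updater", event_id=event.normalized_event_id):
        return  # duplicate delivery: already applied, skip silently
    # ... apply ownership deltas, listings, sales, etc. ...
    await db.commit()  # commits the applied_events row and the mutations atomically
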
Step 5: Run tests

uv run pytest tests/idempotency/ -v

Expected: PASS (requires Docker running for testcontainers)

Step 6: Commit

git add src/ft/core/idempotency.py tests/conftest.py tests/idempotency/
git commit -m "feat: applied_events idempotency library with testcontainers tests"

Task 10: Chain Adapter Base + Contract Test Suite

Files:
- Create: src/ft/ingest/base.py
- Create: tests/contract/test_adapter_contract.py
- Create: tests/contract/conftest.py

Step 1: Implement adapter ABC

# src/ft/ingest/base.py
from abc import ABC, abstractmethod
from ft.core.events import RawEvent


class ChainAdapter(ABC):
    """Abstract chain adapter. Every concrete adapter MUST pass all 5 contract tests."""

    @abstractmethod
    async def fetch_blocks(self, from_block: int, to_block: int) -> list[RawEvent]:
        """Fetch and parse events from inclusive block range.
        Returns list of RawEvents with canonicalized addresses."""

    @abstractmethod
    async def get_latest_block(self) -> int:
        """Return current chain head block number."""

    @abstractmethod
    async def detect_reorg(self, block_number: int, expected_hash: str) -> bool:
        """Return True if block at block_number has a different hash than expected.
        True = reorg detected."""

    @abstractmethod
    def canonicalize_address(self, address: str) -> str:
        """Chain-specific address canonicalization."""

    @property
    @abstractmethod
    def network_slug(self) -> str:
        """Network identifier, e.g. 'polygon', 'ton-mainnet'."""

    @property
    @abstractmethod
    def chain_slug(self) -> str:
        """Chain family, e.g. 'evm', 'ton'."""

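detect_reorg is meant to be driven from system.sync_cursors: before advancing, the poller re-checks the hash of the last processed block. The poller itself comes later in the plan; a sketch of how the ABC methods compose (names illustrative):

# Illustrative poller loop showing how the ABC methods are meant to combine (not part of this task).
from ft.ingest.base import ChainAdapter


async def poll_once(adapter: ChainAdapter, cursor) -> None:
    """`cursor` carries last_block / last_block_hash from system.sync_cursors."""
    if cursor.last_block_hash and await adapter.detect_reorg(cursor.last_block, cursor.last_block_hash):
        # Reorg detected: rewind the cursor past the fork point and re-ingest;
        # re-included events dedup through ledger.normalized_event_keys.
        ...
        return

    head = await adapter.get_latest_block()
    if head <= cursor.last_block:
        return  # nothing new yet

    events = await adapter.fetch_blocks(cursor.last_block + 1, head)
    # hand `events` to the normalizer, then advance the cursor to `head`
    ...
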
Step 2: Write contract test suite (adapter-agnostic)

# tests/contract/test_adapter_contract.py
"""
Contract tests that EVERY ChainAdapter must pass.
5 tests from architecture spec §16.5.
Reused by each adapter's test module via the AdapterContractSuite mixin.
"""
from ft.core.events import RawEvent, EventKind


class AdapterContractSuite:
    """Mixin — inherit in adapter-specific test files and set `adapter` fixture."""

    async def test_schema_validation(self, adapter, fixture_blocks):
        """All returned RawEvents pass Pydantic validation."""
        for from_b, to_b in fixture_blocks:
            events = await adapter.fetch_blocks(from_b, to_b)
            for event in events:
                assert isinstance(event, RawEvent)
                # Re-validate to catch any mutation
                RawEvent.model_validate(event.model_dump())

    async def test_sub_index_completeness(self, adapter, batch_fixture_block):
        """For batch events: sub_index values are contiguous 0..N-1."""
        from_b, to_b = batch_fixture_block
        events = await adapter.fetch_blocks(from_b, to_b)
        # Group events by source_event_id
        by_source: dict[str, list[RawEvent]] = {}
        for e in events:
            by_source.setdefault(e.source_event_id, []).append(e)
        for source_id, group in by_source.items():
            if len(group) > 1:
                # Batch — verify explicit sub_index values are contiguous 0..N-1
                indices = sorted(e.sub_index for e in group)
                expected = list(range(len(group)))
                assert indices == expected, (
                    f"sub_index not contiguous for {source_id}: got {indices}, expected {expected}"
                )

    async def test_delta_conservation(self, adapter, fixture_blocks):
        """Delta conservation using signed qty_delta at normalized level.
        Transfer: sum(qty_delta)==0. Mint: sum>0. Burn: sum<0.
        Deltas use signed values: from_address gets -qty, to_address gets +qty."""
        for from_b, to_b in fixture_blocks:
            events = await adapter.fetch_blocks(from_b, to_b)
            for event in events:
                if not event.deltas:
                    continue  # list/cancel events have no deltas
                # Compute signed sum: from=-qty, to=+qty
                signed_total = 0
                for d in event.deltas:
                    if d.from_address:
                        signed_total -= d.qty
                    if d.to_address:
                        signed_total += d.qty
                if event.event_kind == EventKind.TRANSFER:
                    assert signed_total == 0, (
                        f"Transfer must conserve: sum={signed_total}, event={event.source_event_id}"
                    )
                elif event.event_kind == EventKind.MINT:
                    assert signed_total > 0, (
                        f"Mint must have positive net: sum={signed_total}, event={event.source_event_id}"
                    )
                elif event.event_kind == EventKind.BURN:
                    assert signed_total < 0, (
                        f"Burn must have negative net: sum={signed_total}, event={event.source_event_id}"
                    )

    async def test_source_event_id_stability(self, adapter, fixture_blocks):
        """Same block fetched twice produces identical source_event_ids."""
        for from_b, to_b in fixture_blocks:
            events_1 = await adapter.fetch_blocks(from_b, to_b)
            events_2 = await adapter.fetch_blocks(from_b, to_b)
            ids_1 = sorted(e.source_event_id for e in events_1)
            ids_2 = sorted(e.source_event_id for e in events_2)
            assert ids_1 == ids_2

    async def test_empty_deltas_for_non_transfers(self, adapter, fixture_blocks):
        """List/cancel events have no ownership deltas."""
        for from_b, to_b in fixture_blocks:
            events = await adapter.fetch_blocks(from_b, to_b)
            for event in events:
                if event.event_kind in (EventKind.LIST, EventKind.CANCEL):
                    assert len(event.deltas) == 0, (
                        f"{event.event_kind} should have no deltas: {event}"
                    )

Step 3: Commit

git add src/ft/ingest/base.py tests/contract/
git commit -m "feat: ChainAdapter ABC + 5 contract test suite"

Task 11: Polygon Adapter

Files: - Create: src/ft/ingest/polygon/__init__.py - Create: src/ft/ingest/polygon/adapter.py - Create: src/ft/ingest/polygon/parser.py - Create: src/ft/ingest/polygon/abi/erc721.json - Create: src/ft/ingest/polygon/abi/erc1155.json - Test: tests/contract/test_polygon_adapter.py - Create: tests/fixtures/polygon/ (fixture block data)

This task is implementation-heavy. The adapter must:

1. Connect to Polygon RPC via httpx (or web3.py)
2. Fetch logs for ERC-721 Transfer + ERC-1155 TransferSingle/TransferBatch events
3. Parse into RawEvent with canonicalized addresses
4. Handle ERC-1155 batch → multiple RawEvents with sub_index
5. Pass all 5 contract tests

Step 1: Create fixture data by fetching real Polygon blocks

Write a script scripts/fetch_polygon_fixtures.py that fetches specific blocks with known NFT transfers and saves them as JSON in tests/fixtures/polygon/.

Step 2: Implement parser (ERC-721/1155 log decoding)

# src/ft/ingest/polygon/parser.py
from ft.core.events import RawEvent, RawEventDelta, EventKind, FinalityStatus

# ERC-721 Transfer(address,address,uint256)
ERC721_TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
# ERC-1155 TransferSingle(address,address,address,uint256,uint256)
ERC1155_SINGLE_TOPIC = "0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62"
# ERC-1155 TransferBatch(address,address,address,uint256[],uint256[])
ERC1155_BATCH_TOPIC = "0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb"

ZERO_ADDRESS = "0x" + "0" * 40


def parse_evm_logs(logs: list[dict], network_slug: str, block_number: int, block_timestamp: int) -> list[RawEvent]:
    """Parse EVM event logs into RawEvents."""
    events = []
    for log in logs:
        topics = log.get("topics", [])
        if not topics:
            continue
        topic0 = topics[0]
        if topic0 == ERC721_TRANSFER_TOPIC and len(topics) >= 4:
            events.extend(_parse_erc721_transfer(log, network_slug, block_number, block_timestamp))
        elif topic0 == ERC1155_SINGLE_TOPIC:
            events.extend(_parse_erc1155_single(log, network_slug, block_number, block_timestamp))
        elif topic0 == ERC1155_BATCH_TOPIC:
            events.extend(_parse_erc1155_batch(log, network_slug, block_number, block_timestamp))
    return events


def _parse_erc721_transfer(log: dict, network_slug: str, block_number: int, timestamp: int) -> list[RawEvent]:
    topics = log["topics"]
    from_addr = "0x" + topics[1][-40:]
    to_addr = "0x" + topics[2][-40:]
    token_id = str(int(topics[3], 16))
    contract = log["address"].lower()

    is_mint = from_addr == ZERO_ADDRESS
    is_burn = to_addr == ZERO_ADDRESS

    if is_mint:
        event_kind = EventKind.MINT
    elif is_burn:
        event_kind = EventKind.BURN
    else:
        event_kind = EventKind.TRANSFER

    deltas = []
    if not is_mint:
        deltas.append(RawEventDelta(from_address=from_addr.lower(), to_address=None, token_id=token_id, qty=1))
    if not is_burn:
        deltas.append(RawEventDelta(from_address=None, to_address=to_addr.lower(), token_id=token_id, qty=1))

    return [RawEvent(
        source_event_id=f"{log['transactionHash']}:{int(log['logIndex'], 16)}",
        network_slug=network_slug,
        block_number=block_number,
        tx_hash=log["transactionHash"],
        log_index=int(log["logIndex"], 16),
        sub_index=0,
        timestamp=timestamp,
        event_kind=event_kind,
        contract_address=contract,
        deltas=deltas,
        finality_status=FinalityStatus.PENDING,
        raw_payload=log,
    )]

# _parse_erc1155_single and _parse_erc1155_batch follow similar patterns
# with ABI decoding of data field for token IDs and quantities.
# For TransferBatch, emit one RawEvent per token class with explicit sub_index=0..N-1.
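
As a sketch of the single-transfer case — assuming the same RawEvent/RawEventDelta fields used above; TransferSingle carries from/to in topics[2]/topics[3] and ABI-encodes id and value as two 32-byte words in the data field:

def _parse_erc1155_single(log: dict, network_slug: str, block_number: int, timestamp: int) -> list[RawEvent]:
    """Sketch: TransferSingle(operator, from, to, id, value) — id/value live in `data`."""
    topics = log["topics"]
    from_addr = ("0x" + topics[2][-40:]).lower()
    to_addr = ("0x" + topics[3][-40:]).lower()
    data = log["data"][2:]  # strip "0x"; two 32-byte words: id, then value
    token_id = str(int(data[:64], 16))
    qty = int(data[64:128], 16)

    is_mint = from_addr == ZERO_ADDRESS
    is_burn = to_addr == ZERO_ADDRESS
    event_kind = EventKind.MINT if is_mint else EventKind.BURN if is_burn else EventKind.TRANSFER

    deltas = []
    if not is_mint:
        deltas.append(RawEventDelta(from_address=from_addr, to_address=None, token_id=token_id, qty=qty))
    if not is_burn:
        deltas.append(RawEventDelta(from_address=None, to_address=to_addr, token_id=token_id, qty=qty))

    return [RawEvent(
        source_event_id=f"{log['transactionHash']}:{int(log['logIndex'], 16)}",
        network_slug=network_slug,
        block_number=block_number,
        tx_hash=log["transactionHash"],
        log_index=int(log["logIndex"], 16),
        sub_index=0,
        timestamp=timestamp,
        event_kind=event_kind,
        contract_address=log["address"].lower(),
        deltas=deltas,
        finality_status=FinalityStatus.PENDING,
        raw_payload=log,
    )]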

Step 3: Implement adapter

# src/ft/ingest/polygon/adapter.py
import httpx

from ft.core.events import RawEvent
from ft.ingest.base import ChainAdapter
from ft.ingest.polygon.parser import (
    ERC721_TRANSFER_TOPIC,
    ERC1155_BATCH_TOPIC,
    ERC1155_SINGLE_TOPIC,
    parse_evm_logs,
)


class PolygonAdapter(ChainAdapter):
    def __init__(self, rpc_url: str = "https://polygon-rpc.com"):
        self.rpc_url = rpc_url
        self._client = httpx.AsyncClient(timeout=30)

    @property
    def network_slug(self) -> str:
        return "polygon"

    @property
    def chain_slug(self) -> str:
        return "evm"

    def canonicalize_address(self, address: str) -> str:
        return address.lower()

    async def get_latest_block(self) -> int:
        resp = await self._rpc("eth_blockNumber", [])
        return int(resp, 16)

    async def detect_reorg(self, block_number: int, expected_hash: str) -> bool:
        resp = await self._rpc("eth_getBlockByNumber", [hex(block_number), False])
        return resp["hash"] != expected_hash

    async def fetch_blocks(self, from_block: int, to_block: int) -> list[RawEvent]:
        # Get block timestamps
        timestamps = {}
        for bn in range(from_block, to_block + 1):
            block = await self._rpc("eth_getBlockByNumber", [hex(bn), False])
            timestamps[bn] = int(block["timestamp"], 16)

        # Get logs for NFT transfer topics
        # Get logs for the three NFT transfer topics (constants shared with the parser)
        logs = await self._rpc("eth_getLogs", [{
            "fromBlock": hex(from_block),
            "toBlock": hex(to_block),
            "topics": [[ERC721_TRANSFER_TOPIC, ERC1155_SINGLE_TOPIC, ERC1155_BATCH_TOPIC]],
        }])

        events = []
        for log in logs:
            bn = int(log["blockNumber"], 16)
            ts = timestamps.get(bn, 0)
            events.extend(parse_evm_logs([log], self.network_slug, bn, ts))
        return events

    async def _rpc(self, method: str, params: list) -> str | list | dict:
        resp = await self._client.post(self.rpc_url, json={
            "jsonrpc": "2.0", "method": method, "params": params, "id": 1,
        })
        resp.raise_for_status()
        data = resp.json()
        if "error" in data:
            raise RuntimeError(f"RPC error: {data['error']}")
        return data["result"]

Step 4: Write adapter-specific contract tests

# tests/contract/test_polygon_adapter.py
import pytest
from tests.contract.test_adapter_contract import AdapterContractSuite

# For now, use fixture-based adapter (no real RPC calls in tests)
# TODO: create FixturePolygonAdapter that reads from tests/fixtures/polygon/

class TestPolygonContract(AdapterContractSuite):
    @pytest.fixture
    def adapter(self):
        """Must return a concrete adapter initialized with fixture data.
        Example: return FixturePolygonAdapter("tests/fixtures/polygon/")
        """
        raise NotImplementedError(
            "Return FixturePolygonAdapter('tests/fixtures/polygon/') here once fixture data exists"
        )

    @pytest.fixture
    def fixture_blocks(self):
        return [(65000000, 65000000)]

    @pytest.fixture
    def batch_fixture_block(self):
        return (65000000, 65000000)

Step 5: Commit

git add src/ft/ingest/polygon/ tests/contract/test_polygon_adapter.py
git commit -m "feat: Polygon adapter — ERC-721/1155 parser + RPC client"

Task 12: TON Adapter

Files: - Create: src/ft/ingest/ton/__init__.py - Create: src/ft/ingest/ton/adapter.py - Create: src/ft/ingest/ton/parser.py - Test: tests/contract/test_ton_adapter.py

Follows same pattern as Task 11 but for TonAPI v2:

- Masterchain seqno as cursor
- TEP-62 NFT events (transfer_notification, ownership_assigned)
- Raw form addresses (0:hex)
- GetGems marketplace events

Step 1-5: Mirror Task 11 structure with TON-specific parsing.
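
A rough adapter skeleton under the same ChainAdapter ABC; the TonAPI v2 paths and the parse_ton_events helper below are placeholders to be confirmed against the TonAPI docs during implementation:

# src/ft/ingest/ton/adapter.py (sketch)
import httpx
from ft.core.events import RawEvent
from ft.ingest.base import ChainAdapter
from ft.ingest.ton.parser import parse_ton_events  # hypothetical TEP-62/GetGems parser


class TonAdapter(ChainAdapter):
    def __init__(self, api_url: str = "https://tonapi.io", api_key: str | None = None):
        headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
        self._client = httpx.AsyncClient(base_url=api_url, headers=headers, timeout=30)

    @property
    def network_slug(self) -> str:
        return "ton"

    @property
    def chain_slug(self) -> str:
        return "ton"

    def canonicalize_address(self, address: str) -> str:
        # Raw form "0:<hex>" is the canonical representation; conversion from the
        # user-friendly base64 form is left to a parser/utility helper.
        return address.lower()

    async def get_latest_block(self) -> int:
        # "Block" for TON == masterchain seqno (the sync cursor).
        resp = await self._client.get("/v2/blockchain/masterchain-head")  # placeholder path
        resp.raise_for_status()
        return resp.json()["seqno"]

    async def detect_reorg(self, block_number: int, expected_hash: str) -> bool:
        # Simplification for Phase 1: treat sealed masterchain blocks as final.
        return False

    async def fetch_blocks(self, from_block: int, to_block: int) -> list[RawEvent]:
        events: list[RawEvent] = []
        for seqno in range(from_block, to_block + 1):
            # Placeholder path: NFT-related events for one masterchain seqno.
            resp = await self._client.get(f"/v2/blockchain/masterchain/{seqno}/events")
            resp.raise_for_status()
            events.extend(parse_ton_events(resp.json(), self.network_slug, seqno))
        return events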

Step 6: Commit

git add src/ft/ingest/ton/ tests/contract/test_ton_adapter.py
git commit -m "feat: TON adapter — TEP-62 NFT + GetGems parser"

Task 13: Block Fetcher + Sync Cursor

Files: - Create: src/ft/ingest/fetcher.py - Test: tests/test_fetcher.py

Step 1: Write fetcher tests

# tests/test_fetcher.py
from unittest.mock import AsyncMock

from ft.ingest.fetcher import BlockFetcher

async def test_fetcher_advances_cursor(db):
    adapter = AsyncMock()
    adapter.get_latest_block.return_value = 10
    adapter.fetch_blocks.return_value = []
    adapter.detect_reorg.return_value = False
    adapter.network_slug = "polygon"

    fetcher = BlockFetcher(adapter, network_slug="polygon", batch_size=5)
    # Simulate one iteration
    await fetcher.step(db)
    # Cursor should advance
    assert fetcher.last_block > 0

Step 2: Implement fetcher

# src/ft/ingest/fetcher.py
import asyncio
import structlog
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from ft.ingest.base import ChainAdapter
from ft.core.events import RawEvent

logger = structlog.get_logger()

class BlockFetcher:
    def __init__(
        self,
        adapter: ChainAdapter,
        network_slug: str,
        batch_size: int = 100,
        poll_interval: float = 2.0,
        max_retries: int = 10,
    ):
        self.adapter = adapter
        self.network_slug = network_slug
        self.batch_size = batch_size
        self.poll_interval = poll_interval
        self.max_retries = max_retries
        self.last_block: int = 0
        self.last_hash: str | None = None
        self._consecutive_errors = 0

    async def load_cursor(self, db: AsyncSession) -> None:
        row = await db.execute(text(
            "SELECT last_block, last_block_hash FROM system.sync_cursors WHERE network_id = ("
            "  SELECT network_id FROM ref.networks WHERE slug = :slug"
            ")"
        ), {"slug": self.network_slug})
        row = row.first()
        if row:
            self.last_block = row.last_block
            self.last_hash = row.last_block_hash

    async def save_cursor(self, db: AsyncSession, block: int, block_hash: str | None = None) -> None:
        await db.execute(text("""
            INSERT INTO system.sync_cursors (network_id, last_block, last_block_hash, last_updated_at)
            SELECT network_id, :block, :hash, now() FROM ref.networks WHERE slug = :slug
            ON CONFLICT (network_id)
            DO UPDATE SET last_block = :block, last_block_hash = :hash, last_updated_at = now()
        """), {"slug": self.network_slug, "block": block, "hash": block_hash})
        await db.commit()
        self.last_block = block
        self.last_hash = block_hash

    async def step(self, db: AsyncSession) -> list[RawEvent]:
        """One fetch iteration. Returns events or empty list."""
        head = await self.adapter.get_latest_block()
        if self.last_block >= head:
            return []

        to_block = min(self.last_block + self.batch_size, head)

        # Reorg check
        if self.last_block > 0 and self.last_hash:
            is_reorg = await self.adapter.detect_reorg(self.last_block, self.last_hash)
            if is_reorg:
                logger.warning("reorg_detected", block=self.last_block, network=self.network_slug)
                return []  # reorg handler will take over

        events = await self.adapter.fetch_blocks(self.last_block + 1, to_block)
        # TODO: also persist the canonical hash of to_block once the adapter exposes it;
        # otherwise last_hash stays None and detect_reorg() is never exercised.
        await self.save_cursor(db, to_block)
        self._consecutive_errors = 0
        return events

Step 3: Run tests, commit

uv run pytest tests/test_fetcher.py -v
git add src/ft/ingest/fetcher.py tests/test_fetcher.py
git commit -m "feat: BlockFetcher with cursor persistence + reorg check"

Task 14: Normalizer Worker

Files: - Create: src/ft/pipeline/normalizer.py - Test: tests/test_normalizer.py

The normalizer:

1. Consumes RawEvent from Redis Stream
2. UPSERT normalized_event_keys on (source_event_id, sub_index) — dedup
3. Creates normalized_events + normalized_event_deltas
4. Creates catalog stubs (collection + asset) via uuidv5
5. Publishes to ledger.normalized stream via outbox

Step 1: Write normalizer tests against real Postgres

Test that: same event processed twice → exactly 1 normalized_event row, exactly 1 set of deltas (dedup works). On replay/reorg re-inclusion: metadata updated (is_reverted=false), but no duplicate normalized_event_deltas created.

Step 2: Implement normalizer

Key SQL pattern for dedup UPSERT:

# src/ft/pipeline/normalizer.py
import uuid

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from ft.core.events import RawEvent

# make_id (uuidv5-based), _ensure_collection_stub, _ensure_asset_stub and _publish_outbox
# are helpers defined alongside this function (omitted from this excerpt).


async def normalize_event(db: AsyncSession, raw: RawEvent) -> uuid.UUID | None:
    """Process one RawEvent. Returns normalized_event_id, or None if it was a duplicate."""
    # Include sub_index so each sub-event of an ERC-1155 batch maps to its own normalized event.
    normalized_id = make_id(raw.network_slug, raw.source_event_id, str(raw.sub_index))
    asset_id = make_id(raw.network_slug, raw.contract_address, raw.deltas[0].token_id if raw.deltas else "")
    collection_id = make_id(raw.network_slug, raw.contract_address)

    # 1. Ensure catalog stubs exist
    await _ensure_collection_stub(db, collection_id, raw)
    await _ensure_asset_stub(db, asset_id, collection_id, raw)

    # 2. Insert normalized event
    await db.execute(text("""
        INSERT INTO ledger.normalized_events
        (normalized_event_id, network_id, asset_id, collection_id, event_kind,
         contract_address, block_number, tx_hash, timestamp, finality_status)
        SELECT :eid, n.network_id, :aid, :cid, :kind, :contract, :block, :tx, :ts, :fin
        FROM ref.networks n WHERE n.slug = :net
        ON CONFLICT (normalized_event_id) DO UPDATE
        SET finality_status = EXCLUDED.finality_status
    """), {...})

    # 3. UPSERT event key (dedup gate) — one key per RawEvent, keyed on (source_event_id, sub_index).
    #    xmax = 0 only for freshly inserted rows; on replay/reorg re-inclusion the key is updated
    #    (is_reverted=false) but deltas are NOT re-inserted.
    result = await db.execute(text("""
        INSERT INTO ledger.normalized_event_keys (source_event_id, sub_index, normalized_event_id, is_reverted)
        VALUES (:seid, :sub, :eid, false)
        ON CONFLICT ON CONSTRAINT uq_event_key
        DO UPDATE SET is_reverted = false, normalized_event_id = EXCLUDED.normalized_event_id
        RETURNING (xmax = 0) AS inserted
    """), {"seid": raw.source_event_id, "sub": raw.sub_index, "eid": normalized_id})
    row = result.fetchone()
    is_first_seen = bool(row and row.inserted)

    # 4. Insert deltas ONLY on first-seen events — never duplicate on replay
    #    Replay/reorg re-inclusion only flips is_reverted back to false (step 3).
    if is_first_seen:
        for delta in raw.deltas:
            if delta.from_address:
                await db.execute(text("""
                    INSERT INTO ledger.normalized_event_deltas (normalized_event_id, account_address, qty_delta)
                    VALUES (:eid, :addr, :delta)
                """), {"eid": normalized_id, "addr": delta.from_address, "delta": -delta.qty})
            if delta.to_address:
                await db.execute(text("""
                    INSERT INTO ledger.normalized_event_deltas (normalized_event_id, account_address, qty_delta)
                    VALUES (:eid, :addr, :delta)
                """), {"eid": normalized_id, "addr": delta.to_address, "delta": delta.qty})

    # 5. Publish via outbox (published on replay too, so downstream can observe finality/revert flips)
    await _publish_outbox(db, "ledger.normalized", normalized_id)
    await db.commit()
    return normalized_id if is_first_seen else None

Step 3: Run tests, commit

git add src/ft/pipeline/normalizer.py tests/test_normalizer.py
git commit -m "feat: normalizer — dedup UPSERT + catalog stubs + outbox publish"

Task 15: State Updater — Ownership

Files: - Create: src/ft/pipeline/state_updater.py - Test: tests/test_state_updater.py

Core pattern: consume from ledger.normalized → check applied_events → UPSERT ownership_current qty += delta → DELETE WHERE qty <= 0.

Step 1: Write tests — transfer A→B and verify balances; deliver the same event twice and verify it is not double-counted.

Step 2: Implement try_apply_event() guard + UPSERT ownership + outbox publish.
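
A minimal sketch of the guard and ownership UPSERT, assuming applied_events sits in the system schema alongside outbox_events/sync_cursors and ownership_current in the ledger schema (adjust to the actual migrations):

# src/ft/pipeline/state_updater.py (sketch)
import uuid

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def try_apply_event(db: AsyncSession, consumer: str, event_id: uuid.UUID) -> bool:
    """Record (consumer, event_id) in applied_events; False means it was already applied."""
    result = await db.execute(text("""
        INSERT INTO system.applied_events (consumer, normalized_event_id, applied_at)
        VALUES (:consumer, :eid, now())
        ON CONFLICT (consumer, normalized_event_id) DO NOTHING
        RETURNING normalized_event_id
    """), {"consumer": consumer, "eid": event_id})
    return result.fetchone() is not None


async def apply_ownership_delta(db: AsyncSession, asset_id: uuid.UUID, address: str, qty_delta: int) -> None:
    """UPSERT the running balance, then drop rows that reach zero or below."""
    await db.execute(text("""
        INSERT INTO ledger.ownership_current (asset_id, account_address, qty)
        VALUES (:aid, :addr, :delta)
        ON CONFLICT (asset_id, account_address)
        DO UPDATE SET qty = ledger.ownership_current.qty + EXCLUDED.qty
    """), {"aid": asset_id, "addr": address, "delta": qty_delta})
    await db.execute(text(
        "DELETE FROM ledger.ownership_current WHERE asset_id = :aid AND account_address = :addr AND qty <= 0"
    ), {"aid": asset_id, "addr": address})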

Step 3: Commit


Task 16: State Updater — Listings & Sales

Files: - Modify: src/ft/pipeline/state_updater.py (add listing/sale handlers) - Test: tests/test_state_updater.py (add listing/sale test cases)

Handle LIST → INSERT listings_current, SALE → INSERT sales_history + DELETE listing, CANCEL → DELETE listing.
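
A rough sketch of the three handlers; table names, column names and the event fields are assumptions to be aligned with the migrations, and each handler runs behind the same try_apply_event() guard:

# listing/sale handlers in src/ft/pipeline/state_updater.py (sketch)
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def handle_list(db: AsyncSession, event) -> None:
    await db.execute(text("""
        INSERT INTO ledger.listings_current (listing_id, asset_id, seller_address, price_native, listed_at)
        VALUES (:lid, :aid, :seller, :price, :ts)
        ON CONFLICT (listing_id) DO NOTHING
    """), {"lid": event.listing_id, "aid": event.asset_id, "seller": event.seller,
           "price": event.price, "ts": event.timestamp})


async def handle_sale(db: AsyncSession, event) -> None:
    await db.execute(text("""
        INSERT INTO ledger.sales_history (sale_id, asset_id, seller_address, buyer_address, price_native, sold_at)
        VALUES (:sid, :aid, :seller, :buyer, :price, :ts)
        ON CONFLICT (sale_id) DO NOTHING
    """), {"sid": event.sale_id, "aid": event.asset_id, "seller": event.seller,
           "buyer": event.buyer, "price": event.price, "ts": event.timestamp})
    await db.execute(text("DELETE FROM ledger.listings_current WHERE listing_id = :lid"),
                     {"lid": event.listing_id})


async def handle_cancel(db: AsyncSession, event) -> None:
    await db.execute(text("DELETE FROM ledger.listings_current WHERE listing_id = :lid"),
                     {"lid": event.listing_id})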

Step 1-3: Tests, implement, commit.


Task 17: Projection Pipeline

Files: - Create: src/ft/pipeline/projections.py - Test: tests/test_projections.py - Test: tests/reconciliation/test_ownership_reconciliation.py

Consume domain events → UPSERT projection tables (all using applied_events idempotency).
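
One projection handler as a hedged sketch, reusing the try_apply_event() guard from Task 15 and assuming projection.ownership_view has (asset_id, account_address, qty) and that the domain event carries the new balance:

# src/ft/pipeline/projections.py (sketch of one handler)
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from ft.pipeline.state_updater import try_apply_event  # same idempotency guard


async def project_ownership(db: AsyncSession, event) -> None:
    if not await try_apply_event(db, consumer="projection.ownership", event_id=event.normalized_event_id):
        return  # already projected — double delivery is a no-op
    await db.execute(text("""
        INSERT INTO projection.ownership_view (asset_id, account_address, qty)
        VALUES (:aid, :addr, :qty)
        ON CONFLICT (asset_id, account_address) DO UPDATE SET qty = EXCLUDED.qty
    """), {"aid": event.asset_id, "addr": event.account_address, "qty": event.qty})
    await db.commit()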

Step 1: Write reconciliation test

# tests/reconciliation/test_ownership_reconciliation.py
async def test_ownership_view_matches_ground_truth(db):
    """projection.ownership_view == recomputed from ledger deltas."""
    # Process N events through full pipeline
    # Compare projection.ownership_view vs:
    #   SELECT asset_id, account_address, SUM(qty_delta)
    #   FROM ledger.normalized_event_deltas d
    #   JOIN ledger.normalized_event_keys k ON d.normalized_event_id = k.normalized_event_id
    #   WHERE NOT k.is_reverted
    #   GROUP BY asset_id, account_address
    #   HAVING SUM(qty_delta) > 0
    # Assert: 0 mismatches
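
A sketch of the comparison step, assuming projection.ownership_view exposes (asset_id, account_address, qty); the ground-truth query joins normalized_events to recover asset_id, since deltas only carry the event id and address:

# comparison helper for tests/reconciliation/test_ownership_reconciliation.py (sketch)
from sqlalchemy import text

GROUND_TRUTH_SQL = """
    SELECT e.asset_id, d.account_address, SUM(d.qty_delta) AS qty
    FROM ledger.normalized_event_deltas d
    JOIN ledger.normalized_events e ON e.normalized_event_id = d.normalized_event_id
    JOIN ledger.normalized_event_keys k ON k.normalized_event_id = d.normalized_event_id
    WHERE NOT k.is_reverted
    GROUP BY e.asset_id, d.account_address
    HAVING SUM(d.qty_delta) > 0
"""

PROJECTION_SQL = "SELECT asset_id, account_address, qty FROM projection.ownership_view"


async def _fetch_as_set(db, sql: str) -> set[tuple]:
    rows = await db.execute(text(sql))
    return {(r.asset_id, r.account_address, int(r.qty)) for r in rows}


async def assert_ownership_reconciles(db) -> None:
    ground_truth = await _fetch_as_set(db, GROUND_TRUTH_SQL)
    projected = await _fetch_as_set(db, PROJECTION_SQL)
    assert projected == ground_truth, f"mismatches: {projected ^ ground_truth}"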

Step 2: Implement projections, commit


Task 18: Reorg Handler

Files: - Create: src/ft/pipeline/reorg.py - Test: tests/reorg/test_reorg_scenarios.py

All 5 scenarios from architecture spec. Tests against real Postgres.

Step 1: Write all 5 reorg scenario tests (see design doc section 6 for test bodies)

Step 2: Implement reorg handler

# src/ft/pipeline/reorg.py
async def handle_reorg(db: AsyncSession, network_slug: str, from_block: int, to_block: int):
    """Mark events in reverted blocks as is_reverted=true, recompute ownership."""
    # 1. Find affected normalized_event_ids
    # 2. SET is_reverted = true on normalized_event_keys
    # 3. Find affected asset_ids
    # 4. For each asset: recompute ownership_current from non-reverted deltas
    # 5. Update projections for affected assets
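
A minimal sketch of those five steps against the ledger tables shown earlier; the schema placement of ownership_current is an assumption, and step 5 is left to the caller (e.g. republish affected assets via the outbox):

# src/ft/pipeline/reorg.py (sketch)
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def handle_reorg(db: AsyncSession, network_slug: str, from_block: int, to_block: int) -> list:
    # 1+2. Mark keys of events in the reverted block range as is_reverted=true
    await db.execute(text("""
        UPDATE ledger.normalized_event_keys k
        SET is_reverted = true
        FROM ledger.normalized_events e
        JOIN ref.networks n ON n.network_id = e.network_id
        WHERE k.normalized_event_id = e.normalized_event_id
          AND n.slug = :net AND e.block_number BETWEEN :fb AND :tb
    """), {"net": network_slug, "fb": from_block, "tb": to_block})

    # 3. Collect affected assets
    rows = await db.execute(text("""
        SELECT DISTINCT e.asset_id
        FROM ledger.normalized_events e
        JOIN ref.networks n ON n.network_id = e.network_id
        WHERE n.slug = :net AND e.block_number BETWEEN :fb AND :tb
    """), {"net": network_slug, "fb": from_block, "tb": to_block})
    asset_ids = [r.asset_id for r in rows]

    # 4. Recompute ownership_current for each affected asset from non-reverted deltas
    for asset_id in asset_ids:
        await db.execute(text("DELETE FROM ledger.ownership_current WHERE asset_id = :aid"), {"aid": asset_id})
        await db.execute(text("""
            INSERT INTO ledger.ownership_current (asset_id, account_address, qty)
            SELECT e.asset_id, d.account_address, SUM(d.qty_delta)
            FROM ledger.normalized_event_deltas d
            JOIN ledger.normalized_events e ON e.normalized_event_id = d.normalized_event_id
            JOIN ledger.normalized_event_keys k ON k.normalized_event_id = d.normalized_event_id
            WHERE e.asset_id = :aid AND NOT k.is_reverted
            GROUP BY e.asset_id, d.account_address
            HAVING SUM(d.qty_delta) > 0
        """), {"aid": asset_id})

    await db.commit()
    # 5. Caller updates/republishes projections for asset_ids
    return asset_ids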

Step 3: Run reorg tests, commit


Task 19: workerlib v2 Framework

Files: - Create: src/ft/worker/framework.py - Create: src/ft/worker/redis_streams.py - Create: src/ft/worker/__main__.py - Test: tests/test_worker_framework.py

Step 1: Implement Redis Streams consumer/producer
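
A minimal consumer/producer sketch over redis.asyncio Streams with consumer groups; stream and group names are illustrative:

# src/ft/worker/redis_streams.py (sketch)
import orjson
import redis.asyncio as redis
from redis.exceptions import ResponseError


class StreamBus:
    def __init__(self, url: str = "redis://localhost:6379/0"):
        self._redis = redis.from_url(url, decode_responses=True)

    async def publish(self, stream: str, payload: dict) -> str:
        return await self._redis.xadd(stream, {"payload": orjson.dumps(payload).decode()})

    async def ensure_group(self, stream: str, group: str) -> None:
        try:
            await self._redis.xgroup_create(stream, group, id="0", mkstream=True)
        except ResponseError as e:
            if "BUSYGROUP" not in str(e):  # group already exists is fine
                raise

    async def consume(self, stream: str, group: str, consumer: str, count: int = 10, block_ms: int = 5000):
        """Yield (message_id, payload) pairs; caller must ack() after successful processing."""
        entries = await self._redis.xreadgroup(group, consumer, {stream: ">"}, count=count, block=block_ms)
        for _stream, messages in entries or []:
            for message_id, fields in messages:
                yield message_id, orjson.loads(fields["payload"])

    async def ack(self, stream: str, group: str, message_id: str) -> None:
        await self._redis.xack(stream, group, message_id)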

Step 2: Implement @task decorator, Trigger enum, DI
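
A small sketch of the registration decorator and trigger enum — the names are placeholders for whatever workerlib v2 settles on; __main__.py (Step 3) would look tasks up in REGISTRY:

# src/ft/worker/framework.py (sketch)
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from enum import Enum


class Trigger(str, Enum):
    STREAM = "stream"      # consume a Redis Stream via a consumer group
    INTERVAL = "interval"  # run periodically (e.g. the outbox poller)


@dataclass
class TaskSpec:
    name: str
    trigger: Trigger
    fn: Callable[..., Awaitable[None]]
    stream: str | None = None
    interval_s: float | None = None


REGISTRY: dict[str, TaskSpec] = {}


def task(name: str, trigger: Trigger, *, stream: str | None = None, interval_s: float | None = None):
    """Register a coroutine as a named worker task; `python -m ft.worker <name>` resolves it here."""
    def decorator(fn: Callable[..., Awaitable[None]]) -> Callable[..., Awaitable[None]]:
        REGISTRY[name] = TaskSpec(name=name, trigger=trigger, fn=fn, stream=stream, interval_s=interval_s)
        return fn
    return decorator

A worker module would then register itself with, say, @task("normalizer", Trigger.STREAM, stream="ingest.raw") — the stream name here is illustrative.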

Step 3: Implement __main__.py — entry point python -m ft.worker <name>

Step 4: Test, commit


Task 20: Outbox Publisher Worker

Files: - Create: src/ft/worker/outbox.py - Test: tests/test_outbox.py

Polls system.outbox_events WHERE published_at IS NULL, publishes to Redis Streams, marks published. After 5 failures → DLQ.
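
A sketch of one polling iteration; the attempts/dlq_at columns and the batch size are assumptions, and `bus` is any async publisher (e.g. the Redis Streams wrapper from Task 19):

# src/ft/worker/outbox.py (sketch of one poll iteration)
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

MAX_ATTEMPTS = 5


async def publish_pending(db: AsyncSession, bus) -> int:
    rows = await db.execute(text("""
        SELECT outbox_id, stream, payload FROM system.outbox_events
        WHERE published_at IS NULL AND dlq_at IS NULL
        ORDER BY created_at
        LIMIT 100
        FOR UPDATE SKIP LOCKED
    """))
    published = 0
    for row in rows:
        try:
            await bus.publish(row.stream, row.payload)
            await db.execute(text(
                "UPDATE system.outbox_events SET published_at = now() WHERE outbox_id = :oid"
            ), {"oid": row.outbox_id})
            published += 1
        except Exception:
            # Increment attempts; after MAX_ATTEMPTS park the row in the DLQ instead of retrying forever
            await db.execute(text("""
                UPDATE system.outbox_events
                SET attempts = attempts + 1,
                    dlq_at = CASE WHEN attempts + 1 >= :max THEN now() ELSE NULL END
                WHERE outbox_id = :oid
            """), {"oid": row.outbox_id, "max": MAX_ATTEMPTS})
    await db.commit()
    return published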

Step 1-3: Test, implement, commit.


Task 21: API Skeleton + Auth + Health

Files: - Create: src/ft/api/app.py - Create: src/ft/api/deps.py - Create: src/ft/api/auth.py - Create: src/ft/api/routers/__init__.py - Create: src/ft/api/routers/health.py - Test: tests/api/test_health.py

Step 1: Write health endpoint test

# tests/api/test_health.py
from httpx import AsyncClient, ASGITransport
from ft.api.app import app

async def test_health():
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        resp = await client.get("/health")
        assert resp.status_code == 200
        assert resp.json()["status"] == "ok"

Step 2: Implement app.py, deps.py, health router
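
A minimal app + health router sketch that satisfies the test above:

# src/ft/api/app.py (sketch)
from fastapi import FastAPI

from ft.api.routers import health

app = FastAPI(title="ft.supply API")
app.include_router(health.router)


# src/ft/api/routers/health.py (sketch)
from fastapi import APIRouter

router = APIRouter()


@router.get("/health")
async def health_check() -> dict:
    # Phase 1: process liveness only; DB/Redis checks can sit behind a separate readiness probe later.
    return {"status": "ok"}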

Step 3: Test, commit


Task 22: API Endpoints — Assets + Collections

Files: - Create: src/ft/api/routers/assets.py - Create: src/ft/api/routers/collections.py - Create: src/ft/api/schemas/assets.py - Create: src/ft/api/schemas/collections.py - Test: tests/api/test_assets.py - Test: tests/api/test_collections.py - Test: tests/api/test_domain_isolation.py

Step 1: Write domain isolation test

# tests/api/test_domain_isolation.py
async def test_api_reads_only_projection_schema(db):
    """All API SQL queries touch ONLY projection.* tables."""
    # Intercept SQL statements during API request
    # Assert: every FROM/JOIN references projection.* only
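
One way to implement the interception, using SQLAlchemy's before_cursor_execute hook; the api_engine fixture and the /v1/assets path are assumptions:

# tests/api/test_domain_isolation.py (sketch)
import re

from httpx import ASGITransport, AsyncClient
from sqlalchemy import event

from ft.api.app import app

TABLE_REF = re.compile(r"\b(?:FROM|JOIN)\s+([a-zA-Z_][\w.]*)", re.IGNORECASE)


async def test_api_reads_only_projection_schema(api_engine):
    captured: list[str] = []

    def record(conn, cursor, statement, parameters, context, executemany):
        captured.append(statement)

    event.listen(api_engine.sync_engine, "before_cursor_execute", record)
    try:
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            await client.get("/v1/assets")  # hypothetical endpoint path
    finally:
        event.remove(api_engine.sync_engine, "before_cursor_execute", record)

    offenders = [
        (stmt, table)
        for stmt in captured
        for table in TABLE_REF.findall(stmt)
        if not table.startswith("projection.")
    ]
    assert not offenders, f"API touched non-projection tables: {offenders}"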

Step 2: Implement endpoints, schemas

Step 3: Test, commit


Task 23: Observability — Metrics + Logging

Files: - Create: src/ft/observability/metrics.py - Create: src/ft/observability/logging.py

Step 1: Implement Prometheus metrics (ingest_lag, normalizer_lag, dlq_depth, etc.)
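
A small sketch of the metric registry; metric names and labels are suggestions:

# src/ft/observability/metrics.py (sketch)
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Gauge, generate_latest

INGEST_LAG_BLOCKS = Gauge("ft_ingest_lag_blocks", "Chain head minus last synced block", ["network"])
NORMALIZER_LAG = Gauge("ft_normalizer_lag_messages", "Pending messages on the ingest stream")
DLQ_DEPTH = Gauge("ft_dlq_depth", "Messages parked in the dead-letter queue", ["stream"])
EVENTS_PROCESSED = Counter("ft_events_processed_total", "Normalized events processed", ["consumer"])


def render_metrics() -> tuple[bytes, str]:
    """Body + content type for the /metrics endpoint added in Step 3."""
    return generate_latest(), CONTENT_TYPE_LATEST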

Step 2: Implement structlog configuration (JSON to stdout)
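
A minimal structlog configuration sketch (processor chain is a suggestion):

# src/ft/observability/logging.py (sketch)
import logging
import sys

import structlog


def configure_logging(level: int = logging.INFO) -> None:
    """JSON logs to stdout so Docker / the log collector can pick them up."""
    logging.basicConfig(stream=sys.stdout, level=level, format="%(message)s")
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(level),
        cache_logger_on_first_use=True,
    )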

Step 3: Add metrics endpoint to API (/metrics)

Step 4: Commit


Task 24: E2E Test + CI Pipeline

Files: - Create: tests/e2e/test_pipeline.py - Create: .github/workflows/ci.yml

Step 1: Write E2E test

# tests/e2e/test_pipeline.py
async def test_chain_event_to_api_response(db):
    """Full pipeline: RawEvent → normalize → state update → projection → API query."""
    # 1. Create a RawEvent (mint)
    # 2. Run normalizer
    # 3. Run state updater
    # 4. Run projection pipeline
    # 5. Query projection.asset_cards
    # 6. Assert correct data + finality_status

Step 2: Create CI workflow

# .github/workflows/ci.yml
name: CI
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: astral-sh/setup-uv@v4
      - run: uv sync
      - run: uv run ruff check src/ tests/
      - run: uv run ruff format --check src/ tests/
      - run: uv run pytest tests/ -v --tb=short

Step 3: Commit

git add tests/e2e/ .github/
git commit -m "feat: E2E pipeline test + GitHub Actions CI"

Task 25: Docker Production Setup

Files: - Finalize: docker/Dockerfile - Create: docker/docker-compose.prod.yml - Update: docker/docker-compose.yml (add worker services)

Add all worker services to docker-compose with proper depends_on and health checks, using a single image with a different CMD per service.

Step 1: Finalize, commit


Task 26: Final Verification — All Exit Criteria

Step 1: Run full test suite

uv run pytest tests/ -v --tb=short

Expected results:

- tests/contract/ — 10 tests PASS (5 × 2 adapters)
- tests/idempotency/ — 3 tests PASS
- tests/reorg/ — 5 tests PASS
- tests/e2e/ — 1 test PASS
- tests/reconciliation/ — 1 test PASS
- tests/api/test_domain_isolation.py — 1 test PASS

Step 2: Run linter

uv run ruff check src/ tests/
uv run ruff format --check src/ tests/

Step 3: Verify docker-compose prod starts cleanly

docker compose -f docker/docker-compose.yml -f docker/docker-compose.prod.yml up -d
# Apply migrations
bash scripts/migrate.sh
# Check health
curl http://localhost:8000/health

Step 4: Update CLAUDE.md with build/test commands

Step 5: Final commit

git add .
git commit -m "feat: Phase 1 complete — all exit criteria passing"