Phase 1 Implementation Plan¶
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Build a working multi-chain NFT indexing pipeline (Polygon + TON) with dedup, idempotency, reorg handling, projections, and a read-only API — all Phase 1 exit criteria passing.
Architecture: Modular monolith under src/ft/* namespace. Event-driven pipeline: Chain Adapters → Normalizer → State Updater → Projection Pipeline → FastAPI. Redis Streams as event bus, PostgreSQL as source of truth, applied_events for idempotency, outbox pattern for reliable publishing.
Tech Stack: Python 3.12+, uv, FastAPI, SQLAlchemy 2.0 (Mapped[]), asyncpg, Redis Streams, PostgreSQL 16 + pgvector, Pydantic v2, pytest + testcontainers, structlog, Prometheus client, Docker Compose.
Design doc: docs/plans/2026-02-28-phase1-design.md
Reference: docs/NFT_Platform_Implementation_Plan.md (architecture spec)
Task 1: Project Scaffold¶
Files:
- Create: pyproject.toml
- Create: src/ft/__init__.py
- Create: src/ft/core/__init__.py
- Create: tests/__init__.py
- Create: .gitignore
- Create: .python-version
Step 1: Initialize git repo
cd /c/Users/rusla/Projects/ft.supply
git init
Step 2: Create .python-version
3.12
Step 3: Create .gitignore
__pycache__/
*.py[cod]
*.egg-info/
dist/
build/
.eggs/
*.egg
.venv/
.env
.env.*
!.env.example
*.db
*.sqlite3
.mypy_cache/
.ruff_cache/
.pytest_cache/
.coverage
htmlcov/
node_modules/
Step 4: Create pyproject.toml
[project]
name = "ft-supply"
version = "0.1.0"
description = "Multi-chain NFT indexing and analytics platform"
requires-python = ">=3.12"
dependencies = [
# Web
"fastapi>=0.115.0",
"uvicorn[standard]>=0.32.0",
# Database
"sqlalchemy[asyncio]>=2.0.36",
"asyncpg>=0.30.0",
"alembic>=1.14.0",
# Redis
"redis[hiredis]>=5.2.0",
# Validation & Config
"pydantic>=2.10.0",
"pydantic-settings>=2.7.0",
# HTTP
"httpx>=0.28.0",
# Blockchain (EVM)
"web3>=7.6.0",
# Observability
"structlog>=24.4.0",
"prometheus-client>=0.21.0",
# Utils
"orjson>=3.10.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.3.0",
"pytest-asyncio>=0.24.0",
"testcontainers[postgres]>=4.9.0",
"ruff>=0.8.0",
"httpx>=0.28.0", # for FastAPI TestClient
]
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/ft"]
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
pythonpath = ["src"]
[tool.ruff]
src = ["src", "tests"]
line-length = 120
target-version = "py312"
[tool.ruff.lint]
select = ["E", "F", "W", "I", "UP", "B", "SIM", "RUF"]
ignore = [
"E712", # SQLAlchemy `== True` comparisons
]
[tool.ruff.lint.isort]
known-first-party = ["ft"]
Step 5: Create package init files
mkdir -p src/ft/core src/ft/ingest src/ft/pipeline src/ft/api src/ft/worker src/ft/observability
mkdir -p tests/contract tests/idempotency tests/reorg tests/e2e tests/reconciliation tests/api
mkdir -p tests/fixtures/polygon tests/fixtures/ton tests/fixtures/shared
mkdir -p migrations docker workers
Create empty __init__.py in every src/ft/ subpackage and tests/ directory.
Step 6: Install with uv
uv sync
Expected: lockfile generated, venv created, all deps installed.
Step 7: Verify setup
uv run python -c "import ft; print('ok')"
uv run ruff check src/ tests/
uv run pytest --co # collect only, no tests yet
Step 8: Commit
git add .
git commit -m "feat: project scaffold with uv + pyproject.toml"
Task 2: Core Config + DB Engine¶
Files:
- Create: src/ft/core/config.py
- Create: src/ft/core/db.py
- Create: .env.example
- Test: tests/test_config.py
Step 1: Write test for config loading
# tests/test_config.py
import os
import pytest
from ft.core.config import Settings
def test_settings_from_env(monkeypatch):
monkeypatch.setenv("POSTGRES_HOST", "localhost")
monkeypatch.setenv("POSTGRES_PORT", "5432")
monkeypatch.setenv("POSTGRES_DB", "ftsupply")
monkeypatch.setenv("POSTGRES_USER", "ft")
monkeypatch.setenv("POSTGRES_PASSWORD", "ft")
monkeypatch.setenv("REDIS_URL", "redis://localhost:6379/0")
settings = Settings()
assert "localhost" in settings.database_url
assert "5432" in settings.database_url
assert settings.redis_url == "redis://localhost:6379/0"
def test_settings_defaults(monkeypatch):
monkeypatch.setenv("POSTGRES_PASSWORD", "ft")
settings = Settings()
assert settings.postgres_host == "localhost"
assert settings.postgres_port == 5432
Step 2: Run test to verify it fails
uv run pytest tests/test_config.py -v
Expected: FAIL — ModuleNotFoundError: No module named 'ft.core.config'
Step 3: Implement config
# src/ft/core/config.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
model_config = {"env_prefix": "", "case_sensitive": False}
# Postgres
postgres_host: str = "localhost"
postgres_port: int = 5432
postgres_db: str = "ftsupply"
postgres_user: str = "ft"
postgres_password: str = "ft"
# Redis
redis_url: str = "redis://localhost:6379/0"
# MinIO
minio_endpoint: str = "localhost:9000"
minio_access_key: str = "minioadmin"
minio_secret_key: str = "minioadmin"
# API
api_host: str = "0.0.0.0"
api_port: int = 8000
@property
def database_url(self) -> str:
return (
f"postgresql+asyncpg://{self.postgres_user}:{self.postgres_password}"
f"@{self.postgres_host}:{self.postgres_port}/{self.postgres_db}"
)
@property
def database_url_sync(self) -> str:
return (
f"postgresql://{self.postgres_user}:{self.postgres_password}"
f"@{self.postgres_host}:{self.postgres_port}/{self.postgres_db}"
)
def get_settings() -> Settings:
return Settings()
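If a later task wants process-wide settings (for example as a FastAPI dependency), a memoized accessor is one option. A minimal sketch only — the cached variant and its name are assumptions, not something this task requires:

# Optional (not required by this task): memoized accessor for process-wide reuse.
from functools import lru_cache

@lru_cache
def get_cached_settings() -> Settings:
    # Reads the environment once per process and reuses the same Settings object.
    return Settings()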
Step 4: Implement DB engine
# src/ft/core/db.py
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from ft.core.config import Settings
def create_engine(settings: Settings):
return create_async_engine(
settings.database_url,
echo=False,
pool_size=10,
max_overflow=20,
)
def create_session_factory(engine) -> async_sessionmaker[AsyncSession]:
return async_sessionmaker(engine, expire_on_commit=False)
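A usage sketch for orientation (illustrative, not one of this task's files): one engine and one session factory per process, with short-lived sessions per unit of work. The run_query name is a placeholder:

from ft.core.config import get_settings
from ft.core.db import create_engine, create_session_factory

settings = get_settings()
engine = create_engine(settings)
session_factory = create_session_factory(engine)

async def run_query() -> None:
    # One session per unit of work; commit explicitly.
    async with session_factory() as session:
        ...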
Step 5: Create .env.example
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_DB=ftsupply
POSTGRES_USER=ft
POSTGRES_PASSWORD=ft
REDIS_URL=redis://localhost:6379/0
MINIO_ENDPOINT=localhost:9000
MINIO_ACCESS_KEY=minioadmin
MINIO_SECRET_KEY=minioadmin
Step 6: Run tests
uv run pytest tests/test_config.py -v
Expected: PASS
Step 7: Commit
git add src/ft/core/config.py src/ft/core/db.py tests/test_config.py .env.example
git commit -m "feat: core config (Pydantic Settings) + async DB engine"
Task 3: Model Base + uuidv5 ID Strategy¶
Files:
- Create: src/ft/core/models/__init__.py
- Create: src/ft/core/models/base.py
- Test: tests/test_models_base.py
Step 1: Write test for uuidv5 ID generation
# tests/test_models_base.py
import uuid
from ft.core.models.base import make_id, FT_NAMESPACE
def test_make_id_deterministic():
"""Same inputs always produce same ID."""
id1 = make_id("ton", "EQCollection", "42")
id2 = make_id("ton", "EQCollection", "42")
assert id1 == id2
assert isinstance(id1, uuid.UUID)
def test_make_id_different_inputs():
"""Different inputs produce different IDs."""
id1 = make_id("ton", "EQCollection", "42")
id2 = make_id("ton", "EQCollection", "43")
assert id1 != id2
def test_make_id_is_uuid5():
"""IDs are uuidv5."""
id1 = make_id("polygon", "0xabc", "0")
assert id1.version == 5
def test_namespace_is_stable():
"""Namespace doesn't change between imports."""
expected = uuid.uuid5(uuid.NAMESPACE_URL, "ft.supply")
assert FT_NAMESPACE == expected
Step 2: Run test to verify it fails
uv run pytest tests/test_models_base.py -v
Step 3: Implement base models
# src/ft/core/models/base.py
import uuid
from datetime import datetime, timezone
from sqlalchemy import MetaData, text
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
FT_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "ft.supply")
def make_id(*parts: str) -> uuid.UUID:
"""Deterministic uuidv5 from natural key parts. No FK race conditions."""
return uuid.uuid5(FT_NAMESPACE, ":".join(parts))
# The seven Postgres schemas used by the platform. Models declare their schema via __table_args__;
# these MetaData objects are kept only as a per-schema registry.
SCHEMA_METADATA = {
"ref": MetaData(schema="ref"),
"ingest": MetaData(schema="ingest"),
"ledger": MetaData(schema="ledger"),
"catalog": MetaData(schema="catalog"),
"market": MetaData(schema="market"),
"projection": MetaData(schema="projection"),
"system": MetaData(schema="system"),
}
class Base(DeclarativeBase):
"""Base for all ORM models. No default schema — each model declares its own."""
pass
class TimestampMixin:
"""created_at / updated_at for tables that need it."""
created_at: Mapped[datetime] = mapped_column(server_default=text("now()"))
    # datetime.utcnow() is deprecated on Python 3.12+; use an aware UTC timestamp instead.
    updated_at: Mapped[datetime] = mapped_column(server_default=text("now()"), onupdate=lambda: datetime.now(timezone.utc))
# src/ft/core/models/__init__.py
from ft.core.models.base import Base, make_id, FT_NAMESPACE, TimestampMixin
__all__ = ["Base", "make_id", "FT_NAMESPACE", "TimestampMixin"]
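For orientation, downstream tasks derive IDs from natural keys like this (the contract address and token_id shown are illustrative; the seed data and tests in this plan use the same composition pattern):

from ft.core.models import make_id

network_id = make_id("polygon")                             # matches the seeded ref.networks row
asset_id = make_id("polygon", "0xcontract", "42")           # hypothetical contract + token_id
assert asset_id == make_id("polygon", "0xcontract", "42")   # deterministic across processes and workers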
Step 4: Run tests
uv run pytest tests/test_models_base.py -v
Expected: PASS
Step 5: Commit
git add src/ft/core/models/ tests/test_models_base.py
git commit -m "feat: DeclarativeBase + uuidv5 ID strategy"
Task 4: Domain Models — ref, ingest, system¶
Files:
- Create: src/ft/core/models/ref.py
- Create: src/ft/core/models/ingest.py
- Create: src/ft/core/models/system.py
- Create: src/ft/core/models/enums.py
Step 1: Create shared enums
# src/ft/core/models/enums.py
from enum import StrEnum
class FinalityStatus(StrEnum):
PENDING = "pending"
CONFIRMED = "confirmed"
FINALIZED = "finalized"
class EventKind(StrEnum):
TRANSFER = "transfer"
MINT = "mint"
BURN = "burn"
LIST = "list"
SALE = "sale"
CANCEL = "cancel"
Step 2: Implement ref models
# src/ft/core/models/ref.py
import uuid
from sqlalchemy import String, ForeignKey, Integer
from sqlalchemy.orm import Mapped, mapped_column
from ft.core.models.base import Base, TimestampMixin
class Chain(Base, TimestampMixin):
__tablename__ = "chains"
__table_args__ = {"schema": "ref"}
chain_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
slug: Mapped[str] = mapped_column(String(32), unique=True) # "evm", "ton", "solana"
display_name: Mapped[str] = mapped_column(String(64))
class Network(Base, TimestampMixin):
__tablename__ = "networks"
__table_args__ = {"schema": "ref"}
network_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
chain_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("ref.chains.chain_id"))
slug: Mapped[str] = mapped_column(String(64), unique=True) # "polygon", "ton-mainnet"
display_name: Mapped[str] = mapped_column(String(64))
rpc_url: Mapped[str] = mapped_column(String(512))
finality_depth: Mapped[int] = mapped_column(Integer, default=128)
is_active: Mapped[bool] = mapped_column(default=True)
class TokenStandard(Base):
__tablename__ = "token_standards"
__table_args__ = {"schema": "ref"}
standard_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
chain_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("ref.chains.chain_id"))
slug: Mapped[str] = mapped_column(String(32), unique=True) # "ERC-721", "ERC-1155", "TEP-62"
display_name: Mapped[str] = mapped_column(String(64))
class Marketplace(Base, TimestampMixin):
__tablename__ = "marketplaces"
__table_args__ = {"schema": "ref"}
marketplace_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
slug: Mapped[str] = mapped_column(String(64), unique=True) # "opensea", "getgems"
display_name: Mapped[str] = mapped_column(String(64))
network_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("ref.networks.network_id"))
Step 3: Implement ingest models
# src/ft/core/models/ingest.py
import uuid
from datetime import datetime
from sqlalchemy import String, Integer, BigInteger, ForeignKey, Index
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from ft.core.models.base import Base, TimestampMixin
from ft.core.models.enums import FinalityStatus
class RawEvent(Base):
__tablename__ = "raw_events"
__table_args__ = (
Index("ix_raw_events_network_block", "network_id", "block_number"),
{"schema": "ingest"},
)
raw_event_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
network_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("ref.networks.network_id"))
block_number: Mapped[int] = mapped_column(BigInteger)
tx_hash: Mapped[str] = mapped_column(String(128))
log_index: Mapped[int] = mapped_column(Integer)
source_event_id: Mapped[str] = mapped_column(String(256))
raw_payload: Mapped[dict] = mapped_column(JSONB)
finality_status: Mapped[str] = mapped_column(String(16), default=FinalityStatus.PENDING)
received_at: Mapped[datetime] = mapped_column(server_default="now()")
Step 4: Implement system models
# src/ft/core/models/system.py
import uuid
from datetime import datetime
from sqlalchemy import String, Integer, BigInteger, ForeignKey, Index, Text, text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from ft.core.models.base import Base
class SyncCursor(Base):
__tablename__ = "sync_cursors"
__table_args__ = {"schema": "system"}
network_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("ref.networks.network_id"), primary_key=True)
last_block: Mapped[int] = mapped_column(BigInteger, default=0)
last_block_hash: Mapped[str | None] = mapped_column(String(128))
last_updated_at: Mapped[datetime] = mapped_column(server_default="now()")
class OutboxEvent(Base):
__tablename__ = "outbox_events"
__table_args__ = (
Index("ix_outbox_unpublished", "published_at", postgresql_where="published_at IS NULL"),
{"schema": "system"},
)
event_id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
topic: Mapped[str] = mapped_column(String(128))
payload: Mapped[dict] = mapped_column(JSONB)
created_at: Mapped[datetime] = mapped_column(server_default="now()")
published_at: Mapped[datetime | None] = mapped_column(default=None)
class DeadLetterItem(Base):
__tablename__ = "dlq"
__table_args__ = {"schema": "system"}
dlq_id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
event_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("system.outbox_events.event_id"))
error: Mapped[str] = mapped_column(Text)
attempts: Mapped[int] = mapped_column(Integer, default=0)
last_attempt_at: Mapped[datetime] = mapped_column(server_default="now()")
class ApiKey(Base):
__tablename__ = "api_keys"
__table_args__ = {"schema": "system"}
key_id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
key_hash: Mapped[str] = mapped_column(String(128), unique=True)
label: Mapped[str] = mapped_column(String(128))
is_active: Mapped[bool] = mapped_column(default=True)
created_at: Mapped[datetime] = mapped_column(server_default="now()")
Step 5: Verify models compile
uv run python -c "from ft.core.models import ref, ingest, system; print('ok')"
Step 6: Commit
git add src/ft/core/models/
git commit -m "feat: domain models — ref, ingest, system schemas"
Task 5: Domain Models — ledger, catalog, market, projection¶
Files:
- Create: src/ft/core/models/ledger.py
- Create: src/ft/core/models/catalog.py
- Create: src/ft/core/models/market.py
- Create: src/ft/core/models/projection.py
Step 1: Implement ledger models (dedup + idempotency core)
# src/ft/core/models/ledger.py
import uuid
from datetime import datetime
from sqlalchemy import String, Integer, BigInteger, ForeignKey, Index, Boolean, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column
from ft.core.models.base import Base
class NormalizedEventKey(Base):
"""Dedup table. UPSERT on (source_event_id, sub_index).
Handles reorg re-inclusion: is_reverted flipped back to false."""
__tablename__ = "normalized_event_keys"
__table_args__ = (
UniqueConstraint("source_event_id", "sub_index", name="uq_event_key"),
{"schema": "ledger"},
)
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
source_event_id: Mapped[str] = mapped_column(String(256))
sub_index: Mapped[int] = mapped_column(Integer, default=0)
normalized_event_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("ledger.normalized_events.normalized_event_id"))
is_reverted: Mapped[bool] = mapped_column(Boolean, default=False)
class NormalizedEvent(Base):
__tablename__ = "normalized_events"
__table_args__ = (
Index("ix_norm_events_asset", "asset_id"),
Index("ix_norm_events_block", "network_id", "block_number"),
{"schema": "ledger"},
)
normalized_event_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
network_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("ref.networks.network_id"))
asset_id: Mapped[uuid.UUID] = mapped_column()
collection_id: Mapped[uuid.UUID | None] = mapped_column()
event_kind: Mapped[str] = mapped_column(String(16)) # EventKind
contract_address: Mapped[str] = mapped_column(String(256))
block_number: Mapped[int] = mapped_column(BigInteger)
tx_hash: Mapped[str] = mapped_column(String(128))
timestamp: Mapped[int] = mapped_column(BigInteger) # unix epoch
finality_status: Mapped[str] = mapped_column(String(16))
created_at: Mapped[datetime] = mapped_column(server_default="now()")
class NormalizedEventDelta(Base):
__tablename__ = "normalized_event_deltas"
__table_args__ = (
Index("ix_norm_deltas_event", "normalized_event_id"),
{"schema": "ledger"},
)
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
normalized_event_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("ledger.normalized_events.normalized_event_id"))
account_address: Mapped[str] = mapped_column(String(256))
qty_delta: Mapped[int] = mapped_column(Integer) # +1 for receive, -1 for send
class AppliedEvent(Base):
"""Idempotency gate. INSERT ON CONFLICT DO NOTHING before every state mutation."""
__tablename__ = "applied_events"
__table_args__ = {"schema": "ledger"}
consumer_id: Mapped[str] = mapped_column(String(128), primary_key=True)
normalized_event_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey("ledger.normalized_events.normalized_event_id"),
primary_key=True,
)
applied_at: Mapped[datetime] = mapped_column(server_default="now()")
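The dedup UPSERT described in NormalizedEventKey's docstring would look roughly like this (a sketch; the normalizer that owns this query is built in a later task):

from sqlalchemy import text

DEDUP_UPSERT = text("""
    INSERT INTO ledger.normalized_event_keys
        (source_event_id, sub_index, normalized_event_id, is_reverted)
    VALUES (:source_event_id, :sub_index, :normalized_event_id, false)
    ON CONFLICT (source_event_id, sub_index)
    DO UPDATE SET is_reverted = false
    RETURNING id
""")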
Step 2: Implement catalog models
# src/ft/core/models/catalog.py
import uuid
from datetime import datetime
from sqlalchemy import String, Boolean, ForeignKey, Index
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from ft.core.models.base import Base, TimestampMixin
class Collection(Base, TimestampMixin):
__tablename__ = "collections"
__table_args__ = (
Index("ix_collections_network", "network_id"),
{"schema": "catalog"},
)
collection_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
network_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("ref.networks.network_id"))
contract_address: Mapped[str] = mapped_column(String(256))
name: Mapped[str | None] = mapped_column(String(256))
is_stub: Mapped[bool] = mapped_column(Boolean, default=True)
class Asset(Base, TimestampMixin):
__tablename__ = "assets"
__table_args__ = (
Index("ix_assets_collection", "collection_id"),
{"schema": "catalog"},
)
asset_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
collection_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("catalog.collections.collection_id"))
token_id: Mapped[str] = mapped_column(String(256))
name: Mapped[str | None] = mapped_column(String(256))
image_url: Mapped[str | None] = mapped_column(String(1024))
metadata_json: Mapped[dict | None] = mapped_column(JSONB)
is_stub: Mapped[bool] = mapped_column(Boolean, default=True)
Step 3: Implement market models
# src/ft/core/models/market.py
import uuid
from datetime import datetime
from sqlalchemy import String, Integer, BigInteger, Numeric, ForeignKey, Index, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column
from ft.core.models.base import Base
class OwnershipCurrent(Base):
"""UPSERT qty += delta. DELETE WHERE qty <= 0."""
__tablename__ = "ownership_current"
__table_args__ = (
UniqueConstraint("asset_id", "account_address", name="uq_ownership"),
Index("ix_ownership_account", "account_address"),
{"schema": "market"},
)
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
asset_id: Mapped[uuid.UUID] = mapped_column()
account_address: Mapped[str] = mapped_column(String(256))
qty: Mapped[int] = mapped_column(Integer, default=0)
finality_status: Mapped[str] = mapped_column(String(16))
updated_at: Mapped[datetime] = mapped_column(server_default="now()")
class ListingCurrent(Base):
__tablename__ = "listings_current"
__table_args__ = (
Index("ix_listings_asset", "asset_id"),
{"schema": "market"},
)
listing_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
asset_id: Mapped[uuid.UUID] = mapped_column()
marketplace_slug: Mapped[str] = mapped_column(String(64))
seller: Mapped[str] = mapped_column(String(256))
price: Mapped[float] = mapped_column(Numeric(38, 18))
currency: Mapped[str] = mapped_column(String(32))
listed_at: Mapped[datetime] = mapped_column()
class SaleHistory(Base):
__tablename__ = "sales_history"
__table_args__ = (
Index("ix_sales_asset", "asset_id"),
Index("ix_sales_time", "sold_at"),
{"schema": "market"},
)
sale_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
asset_id: Mapped[uuid.UUID] = mapped_column()
seller: Mapped[str] = mapped_column(String(256))
buyer: Mapped[str] = mapped_column(String(256))
price: Mapped[float] = mapped_column(Numeric(38, 18))
currency: Mapped[str] = mapped_column(String(32))
sold_at: Mapped[datetime] = mapped_column()
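The qty += delta upsert described in OwnershipCurrent's docstring, sketched with illustrative parameter names (the state updater that owns this query is a later task):

from sqlalchemy import text

OWNERSHIP_UPSERT = text("""
    INSERT INTO market.ownership_current (asset_id, account_address, qty, finality_status)
    VALUES (:asset_id, :account_address, :qty_delta, :finality_status)
    ON CONFLICT (asset_id, account_address)
    DO UPDATE SET
        qty = market.ownership_current.qty + EXCLUDED.qty,
        finality_status = EXCLUDED.finality_status,
        updated_at = now()
""")

OWNERSHIP_CLEANUP = text("DELETE FROM market.ownership_current WHERE asset_id = :asset_id AND qty <= 0")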
Step 4: Implement projection models
# src/ft/core/models/projection.py
import uuid
from datetime import datetime
from sqlalchemy import String, Integer, BigInteger, Numeric, Boolean, Index
from sqlalchemy.orm import Mapped, mapped_column
from ft.core.models.base import Base
class OwnershipView(Base):
__tablename__ = "ownership_view"
__table_args__ = (
Index("ix_ownview_asset", "asset_id"),
Index("ix_ownview_owner", "owner_address"),
{"schema": "projection"},
)
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
asset_id: Mapped[uuid.UUID] = mapped_column()
owner_address: Mapped[str] = mapped_column(String(256))
qty: Mapped[int] = mapped_column(Integer)
finality_status: Mapped[str] = mapped_column(String(16))
updated_at: Mapped[datetime] = mapped_column(server_default="now()")
class AssetCard(Base):
__tablename__ = "asset_cards"
__table_args__ = (
Index("ix_assetcards_collection", "collection_id"),
{"schema": "projection"},
)
asset_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
collection_id: Mapped[uuid.UUID] = mapped_column()
name: Mapped[str | None] = mapped_column(String(256))
image_url: Mapped[str | None] = mapped_column(String(1024))
is_stub: Mapped[bool] = mapped_column(Boolean, default=True)
floor_price: Mapped[float | None] = mapped_column(Numeric(38, 18))
last_sale_price: Mapped[float | None] = mapped_column(Numeric(38, 18))
finality_status: Mapped[str] = mapped_column(String(16))
updated_at: Mapped[datetime] = mapped_column(server_default="now()")
class CollectionStats(Base):
__tablename__ = "collection_stats"
__table_args__ = {"schema": "projection"}
collection_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
floor_price: Mapped[float | None] = mapped_column(Numeric(38, 18))
total_volume: Mapped[float | None] = mapped_column(Numeric(38, 18))
owner_count: Mapped[int] = mapped_column(Integer, default=0)
asset_count: Mapped[int] = mapped_column(Integer, default=0)
listed_count: Mapped[int] = mapped_column(Integer, default=0)
updated_at: Mapped[datetime] = mapped_column(server_default="now()")
class PortfolioAsset(Base):
__tablename__ = "portfolio_assets"
__table_args__ = (
Index("ix_portfolio_owner", "owner_address"),
{"schema": "projection"},
)
id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
owner_address: Mapped[str] = mapped_column(String(256))
asset_id: Mapped[uuid.UUID] = mapped_column()
qty: Mapped[int] = mapped_column(Integer)
estimated_value_usd: Mapped[float | None] = mapped_column(Numeric(38, 18))
updated_at: Mapped[datetime] = mapped_column(server_default="now()")
class ListingCard(Base):
__tablename__ = "listing_cards"
__table_args__ = (
Index("ix_listcards_asset", "asset_id"),
{"schema": "projection"},
)
listing_id: Mapped[uuid.UUID] = mapped_column(primary_key=True)
asset_id: Mapped[uuid.UUID] = mapped_column()
collection_id: Mapped[uuid.UUID] = mapped_column()
seller: Mapped[str] = mapped_column(String(256))
price: Mapped[float] = mapped_column(Numeric(38, 18))
currency: Mapped[str] = mapped_column(String(32))
asset_name: Mapped[str | None] = mapped_column(String(256))
asset_image_url: Mapped[str | None] = mapped_column(String(1024))
listed_at: Mapped[datetime] = mapped_column()
Step 5: Verify all models compile and no circular imports
uv run python -c "
from ft.core.models import base, ref, ingest, ledger, catalog, market, projection, system
print('All models imported successfully')
print(f'Tables: {len(base.Base.metadata.tables)}')
"
Step 6: Commit
git add src/ft/core/models/
git commit -m "feat: domain models — ledger, catalog, market, projection schemas"
Task 6: Baseline SQL Migration¶
Files:
- Create: migrations/001_baseline.sql
Step 1: Write the baseline migration
Derive the schema from the ORM models, but hand-write the SQL (project convention: raw SQL migrations, additive only).
-- migrations/001_baseline.sql
-- Phase 1 baseline: 7 schemas, all tables, indexes, constraints
-- Additive only. All statements use IF NOT EXISTS.
BEGIN;
-- === Extensions ===
CREATE EXTENSION IF NOT EXISTS pgcrypto; -- gen_random_uuid() for non-deterministic IDs (api_keys, outbox, dlq)
CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; -- uuid_generate_v5() for deterministic IDs in seed data
-- === Schemas ===
CREATE SCHEMA IF NOT EXISTS ref;
CREATE SCHEMA IF NOT EXISTS ingest;
CREATE SCHEMA IF NOT EXISTS ledger;
CREATE SCHEMA IF NOT EXISTS catalog;
CREATE SCHEMA IF NOT EXISTS market;
CREATE SCHEMA IF NOT EXISTS projection;
CREATE SCHEMA IF NOT EXISTS system;
-- === ref ===
CREATE TABLE IF NOT EXISTS ref.chains (
chain_id UUID PRIMARY KEY,
slug VARCHAR(32) UNIQUE NOT NULL,
display_name VARCHAR(64) NOT NULL,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE TABLE IF NOT EXISTS ref.networks (
network_id UUID PRIMARY KEY,
chain_id UUID REFERENCES ref.chains(chain_id),
slug VARCHAR(64) UNIQUE NOT NULL,
display_name VARCHAR(64) NOT NULL,
rpc_url VARCHAR(512) NOT NULL,
finality_depth INTEGER DEFAULT 128,
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE TABLE IF NOT EXISTS ref.token_standards (
standard_id UUID PRIMARY KEY,
chain_id UUID REFERENCES ref.chains(chain_id),
slug VARCHAR(32) UNIQUE NOT NULL,
display_name VARCHAR(64) NOT NULL
);
CREATE TABLE IF NOT EXISTS ref.marketplaces (
marketplace_id UUID PRIMARY KEY,
slug VARCHAR(64) UNIQUE NOT NULL,
display_name VARCHAR(64) NOT NULL,
network_id UUID REFERENCES ref.networks(network_id),
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
-- === ingest ===
CREATE TABLE IF NOT EXISTS ingest.raw_events (
raw_event_id UUID PRIMARY KEY,
network_id UUID REFERENCES ref.networks(network_id),
block_number BIGINT NOT NULL,
tx_hash VARCHAR(128) NOT NULL,
log_index INTEGER NOT NULL,
source_event_id VARCHAR(256) NOT NULL,
raw_payload JSONB,
finality_status VARCHAR(16) DEFAULT 'pending',
received_at TIMESTAMPTZ DEFAULT now()
);
CREATE UNIQUE INDEX IF NOT EXISTS uq_raw_events_network_source ON ingest.raw_events(network_id, source_event_id);
CREATE INDEX IF NOT EXISTS ix_raw_events_network_block ON ingest.raw_events(network_id, block_number);
-- === ledger ===
CREATE TABLE IF NOT EXISTS ledger.normalized_events (
normalized_event_id UUID PRIMARY KEY,
network_id UUID REFERENCES ref.networks(network_id),
asset_id UUID NOT NULL,
collection_id UUID,
event_kind VARCHAR(16) NOT NULL,
contract_address VARCHAR(256) NOT NULL,
block_number BIGINT NOT NULL,
tx_hash VARCHAR(128) NOT NULL,
timestamp BIGINT NOT NULL,
finality_status VARCHAR(16) NOT NULL,
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS ix_norm_events_asset ON ledger.normalized_events(asset_id);
CREATE INDEX IF NOT EXISTS ix_norm_events_block ON ledger.normalized_events(network_id, block_number);
CREATE TABLE IF NOT EXISTS ledger.normalized_event_keys (
id BIGSERIAL PRIMARY KEY,
source_event_id VARCHAR(256) NOT NULL,
sub_index INTEGER DEFAULT 0,
normalized_event_id UUID REFERENCES ledger.normalized_events(normalized_event_id),
is_reverted BOOLEAN DEFAULT false,
CONSTRAINT uq_event_key UNIQUE (source_event_id, sub_index)
);
CREATE TABLE IF NOT EXISTS ledger.normalized_event_deltas (
id BIGSERIAL PRIMARY KEY,
normalized_event_id UUID REFERENCES ledger.normalized_events(normalized_event_id),
account_address VARCHAR(256) NOT NULL,
qty_delta INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS ix_norm_deltas_event ON ledger.normalized_event_deltas(normalized_event_id);
CREATE TABLE IF NOT EXISTS ledger.applied_events (
consumer_id VARCHAR(128) NOT NULL,
normalized_event_id UUID REFERENCES ledger.normalized_events(normalized_event_id),
applied_at TIMESTAMPTZ DEFAULT now(),
PRIMARY KEY (consumer_id, normalized_event_id)
);
-- === catalog ===
CREATE TABLE IF NOT EXISTS catalog.collections (
collection_id UUID PRIMARY KEY,
network_id UUID REFERENCES ref.networks(network_id),
contract_address VARCHAR(256) NOT NULL,
name VARCHAR(256),
is_stub BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS ix_collections_network ON catalog.collections(network_id);
CREATE TABLE IF NOT EXISTS catalog.assets (
asset_id UUID PRIMARY KEY,
collection_id UUID REFERENCES catalog.collections(collection_id),
token_id VARCHAR(256) NOT NULL,
name VARCHAR(256),
image_url VARCHAR(1024),
metadata_json JSONB,
is_stub BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS ix_assets_collection ON catalog.assets(collection_id);
-- === market ===
CREATE TABLE IF NOT EXISTS market.ownership_current (
id BIGSERIAL PRIMARY KEY,
asset_id UUID NOT NULL,
account_address VARCHAR(256) NOT NULL,
qty INTEGER DEFAULT 0,
finality_status VARCHAR(16) NOT NULL,
updated_at TIMESTAMPTZ DEFAULT now(),
CONSTRAINT uq_ownership UNIQUE (asset_id, account_address)
);
CREATE INDEX IF NOT EXISTS ix_ownership_account ON market.ownership_current(account_address);
CREATE TABLE IF NOT EXISTS market.listings_current (
listing_id UUID PRIMARY KEY,
asset_id UUID NOT NULL,
marketplace_slug VARCHAR(64) NOT NULL,
seller VARCHAR(256) NOT NULL,
price NUMERIC(38,18) NOT NULL,
currency VARCHAR(32) NOT NULL,
listed_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX IF NOT EXISTS ix_listings_asset ON market.listings_current(asset_id);
CREATE TABLE IF NOT EXISTS market.sales_history (
sale_id UUID PRIMARY KEY,
asset_id UUID NOT NULL,
seller VARCHAR(256) NOT NULL,
buyer VARCHAR(256) NOT NULL,
price NUMERIC(38,18) NOT NULL,
currency VARCHAR(32) NOT NULL,
sold_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX IF NOT EXISTS ix_sales_asset ON market.sales_history(asset_id);
CREATE INDEX IF NOT EXISTS ix_sales_time ON market.sales_history(sold_at);
-- === projection ===
CREATE TABLE IF NOT EXISTS projection.ownership_view (
id BIGSERIAL PRIMARY KEY,
asset_id UUID NOT NULL,
owner_address VARCHAR(256) NOT NULL,
qty INTEGER NOT NULL,
finality_status VARCHAR(16) NOT NULL,
updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS ix_ownview_asset ON projection.ownership_view(asset_id);
CREATE INDEX IF NOT EXISTS ix_ownview_owner ON projection.ownership_view(owner_address);
CREATE TABLE IF NOT EXISTS projection.asset_cards (
asset_id UUID PRIMARY KEY,
collection_id UUID NOT NULL,
name VARCHAR(256),
image_url VARCHAR(1024),
is_stub BOOLEAN DEFAULT true,
floor_price NUMERIC(38,18),
last_sale_price NUMERIC(38,18),
finality_status VARCHAR(16) NOT NULL,
updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS ix_assetcards_collection ON projection.asset_cards(collection_id);
CREATE TABLE IF NOT EXISTS projection.collection_stats (
collection_id UUID PRIMARY KEY,
floor_price NUMERIC(38,18),
total_volume NUMERIC(38,18),
owner_count INTEGER DEFAULT 0,
asset_count INTEGER DEFAULT 0,
listed_count INTEGER DEFAULT 0,
updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE TABLE IF NOT EXISTS projection.portfolio_assets (
id BIGSERIAL PRIMARY KEY,
owner_address VARCHAR(256) NOT NULL,
asset_id UUID NOT NULL,
qty INTEGER NOT NULL,
estimated_value_usd NUMERIC(38,18),
updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX IF NOT EXISTS ix_portfolio_owner ON projection.portfolio_assets(owner_address);
CREATE TABLE IF NOT EXISTS projection.listing_cards (
listing_id UUID PRIMARY KEY,
asset_id UUID NOT NULL,
collection_id UUID NOT NULL,
seller VARCHAR(256) NOT NULL,
price NUMERIC(38,18) NOT NULL,
currency VARCHAR(32) NOT NULL,
asset_name VARCHAR(256),
asset_image_url VARCHAR(1024),
listed_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX IF NOT EXISTS ix_listcards_asset ON projection.listing_cards(asset_id);
-- === system ===
CREATE TABLE IF NOT EXISTS system.sync_cursors (
network_id UUID REFERENCES ref.networks(network_id) PRIMARY KEY,
last_block BIGINT DEFAULT 0,
last_block_hash VARCHAR(128),
last_updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE TABLE IF NOT EXISTS system.outbox_events (
event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
topic VARCHAR(128) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ DEFAULT now(),
published_at TIMESTAMPTZ
);
CREATE INDEX IF NOT EXISTS ix_outbox_unpublished ON system.outbox_events(published_at) WHERE published_at IS NULL;
CREATE TABLE IF NOT EXISTS system.dlq (
dlq_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_id UUID REFERENCES system.outbox_events(event_id),
error TEXT NOT NULL,
attempts INTEGER DEFAULT 0,
last_attempt_at TIMESTAMPTZ DEFAULT now()
);
CREATE TABLE IF NOT EXISTS system.api_keys (
key_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
key_hash VARCHAR(128) UNIQUE NOT NULL,
label VARCHAR(128) NOT NULL,
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMPTZ DEFAULT now()
);
COMMIT;
Step 2: Create seed data migration
-- migrations/002_seed_ref_data.sql
-- Seed reference data for Phase 1 (Polygon + TON)
--
-- IDs are uuidv5(NAMESPACE_URL:ft.supply, natural_key) — deterministic.
-- Generate with: python -c "import uuid; NS=uuid.uuid5(uuid.NAMESPACE_URL,'ft.supply'); print(uuid.uuid5(NS,'evm'))"
-- This matches make_id() from src/ft/core/models/base.py.
BEGIN;
-- Chains (make_id("evm"), make_id("ton"))
INSERT INTO ref.chains (chain_id, slug, display_name) VALUES
(uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'evm'), 'evm', 'EVM'),
(uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'ton'), 'ton', 'TON')
ON CONFLICT (slug) DO NOTHING;
-- Networks (make_id("polygon"), make_id("ton-mainnet"))
INSERT INTO ref.networks (network_id, chain_id, slug, display_name, rpc_url, finality_depth) VALUES
(uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'polygon'),
uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'evm'),
'polygon', 'Polygon', 'https://polygon-rpc.com', 128),
(uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'ton-mainnet'),
uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'ton'),
'ton-mainnet', 'TON Mainnet', 'https://tonapi.io/v2', 1)
ON CONFLICT (slug) DO NOTHING;
-- Token Standards (make_id("ERC-721"), etc.)
INSERT INTO ref.token_standards (standard_id, chain_id, slug, display_name) VALUES
(uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'ERC-721'),
uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'evm'), 'ERC-721', 'ERC-721'),
(uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'ERC-1155'),
uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'evm'), 'ERC-1155', 'ERC-1155'),
(uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'TEP-62'),
uuid_generate_v5(uuid_generate_v5(uuid_ns_url(), 'ft.supply'), 'ton'), 'TEP-62', 'TEP-62 NFT')
ON CONFLICT (slug) DO NOTHING;
COMMIT;
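To sanity-check that the SQL-side uuid_generate_v5 values match make_id() on the Python side, a throwaway check (illustrative):

import uuid

NS = uuid.uuid5(uuid.NAMESPACE_URL, "ft.supply")  # same value as FT_NAMESPACE in base.py
for slug in ("evm", "ton", "polygon", "ton-mainnet", "ERC-721", "ERC-1155", "TEP-62"):
    print(slug, uuid.uuid5(NS, slug))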
Step 3: Commit
git add migrations/
git commit -m "feat: baseline SQL migration + ref seed data"
Task 7: Docker Compose + Migration Runner¶
Files:
- Create: docker/docker-compose.yml
- Create: docker/Dockerfile
- Create: scripts/migrate.sh
Step 1: Create dev docker-compose
# docker/docker-compose.yml
services:
postgres:
image: pgvector/pgvector:pg16
ports: ["5432:5432"]
environment:
POSTGRES_DB: ftsupply
POSTGRES_USER: ft
POSTGRES_PASSWORD: ft
volumes:
- pg_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ft -d ftsupply"]
interval: 5s
retries: 5
redis:
image: redis:7-alpine
ports: ["6379:6379"]
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
retries: 5
minio:
image: minio/minio
ports:
- "9000:9000"
- "9001:9001"
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
command: server /data --console-address ":9001"
volumes:
- minio_data:/data
volumes:
pg_data:
minio_data:
Step 2: Create migration runner script
#!/usr/bin/env bash
# scripts/migrate.sh — Apply all SQL migrations in order
set -euo pipefail
DB_HOST="${POSTGRES_HOST:-localhost}"
DB_PORT="${POSTGRES_PORT:-5432}"
DB_NAME="${POSTGRES_DB:-ftsupply}"
DB_USER="${POSTGRES_USER:-ft}"
MIGRATIONS_DIR="$(cd "$(dirname "$0")/../migrations" && pwd)"
echo "Applying migrations from $MIGRATIONS_DIR to $DB_NAME@$DB_HOST:$DB_PORT"
for f in "$MIGRATIONS_DIR"/*.sql; do
echo " → $(basename "$f")"
PGPASSWORD="${POSTGRES_PASSWORD:-ft}" psql -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" -f "$f"
done
echo "Done."
Step 3: Create Dockerfile
# docker/Dockerfile
FROM python:3.12-slim AS base
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc libpq-dev && rm -rf /var/lib/apt/lists/*
WORKDIR /app
# Install uv
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv
# Install dependencies
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-dev
# Copy source
COPY src/ src/
COPY migrations/ migrations/
COPY workers/ workers/
# Default: run API
CMD ["uv", "run", "uvicorn", "ft.api.app:app", "--host", "0.0.0.0", "--port", "8000"]
Step 4: Verify docker-compose starts
cd /c/Users/rusla/Projects/ft.supply
docker compose -f docker/docker-compose.yml up -d
# Wait for healthy
docker compose -f docker/docker-compose.yml ps
Step 5: Apply migrations
chmod +x scripts/migrate.sh
bash scripts/migrate.sh
Expected: all migrations applied without errors.
Step 6: Commit
git add docker/ scripts/
git commit -m "feat: Docker Compose (postgres, redis, minio) + migration runner"
Task 8: Event Envelope + Address Canonicalization¶
Files:
- Create: src/ft/core/events.py
- Create: src/ft/core/address.py
- Test: tests/test_events.py
- Test: tests/test_address.py
Step 1: Write event envelope tests
# tests/test_events.py
from ft.core.events import RawEvent, RawEventDelta, EventKind, FinalityStatus
def test_raw_event_valid():
event = RawEvent(
source_event_id="0xabc:42",
network_slug="polygon",
block_number=65000000,
tx_hash="0xabc123",
log_index=42,
timestamp=1700000000,
event_kind=EventKind.TRANSFER,
contract_address="0x1234567890abcdef",
deltas=[RawEventDelta(from_address="0xAAA", to_address="0xBBB", token_id="1", qty=1)],
finality_status=FinalityStatus.PENDING,
)
assert event.schema_version == 1
assert len(event.deltas) == 1
def test_raw_event_mint_no_from():
delta = RawEventDelta(from_address=None, to_address="0xBBB", token_id="1", qty=1)
assert delta.from_address is None
def test_raw_event_erc1155_batch():
deltas = [
RawEventDelta(from_address="0xA", to_address="0xB", token_id=str(i), qty=1)
for i in range(100)
]
event = RawEvent(
source_event_id="0xabc:0",
network_slug="polygon",
block_number=1,
tx_hash="0x",
log_index=0,
timestamp=0,
event_kind=EventKind.TRANSFER,
contract_address="0x",
deltas=deltas,
finality_status=FinalityStatus.PENDING,
)
assert len(event.deltas) == 100
def test_raw_event_serialization_roundtrip():
event = RawEvent(
source_event_id="0xabc:42",
network_slug="polygon",
block_number=1,
tx_hash="0x",
log_index=0,
timestamp=0,
event_kind=EventKind.TRANSFER,
contract_address="0x",
deltas=[],
finality_status=FinalityStatus.PENDING,
)
data = event.model_dump()
restored = RawEvent.model_validate(data)
assert restored == event
Step 2: Write address canonicalization tests
# tests/test_address.py
import pytest
from ft.core.address import canonicalize_address
def test_evm_lowercase():
assert canonicalize_address("evm", "0xAbCdEf") == "0xabcdef"
def test_ton_raw_form():
# bounceable → raw form conversion
result = canonicalize_address("ton", "0:abc123")
assert result == "0:abc123" # already raw
def test_solana_case_preserved():
addr = "7xKXtg2CW87d97TXJSDpbD5jBkheTqA83TZRuJosgAsU"
assert canonicalize_address("solana", addr) == addr
def test_unknown_chain_raises():
with pytest.raises(ValueError, match="Unknown chain"):
canonicalize_address("bitcoin", "1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa")
Step 3: Run tests — verify they fail
uv run pytest tests/test_events.py tests/test_address.py -v
Step 4: Implement events.py
# src/ft/core/events.py
from pydantic import BaseModel
from ft.core.models.enums import EventKind, FinalityStatus
__all__ = ["RawEvent", "RawEventDelta", "EventKind", "FinalityStatus"]
class RawEventDelta(BaseModel):
"""One ownership change within an event. ERC-1155 batch → multiple deltas."""
from_address: str | None = None
to_address: str | None = None
token_id: str
qty: int = 1
class RawEvent(BaseModel):
"""Frozen envelope between adapters and normalizer.
schema_version bumps = breaking change. Fields never removed."""
model_config = {"frozen": True}
schema_version: int = 1
source_event_id: str
network_slug: str
block_number: int
tx_hash: str
log_index: int
sub_index: int = 0 # 0 for single events, 0..N-1 for batch expansions
timestamp: int
event_kind: EventKind
contract_address: str
collection_id: str | None = None
deltas: list[RawEventDelta]
finality_status: FinalityStatus
raw_payload: dict | None = None
Step 5: Implement address.py
# src/ft/core/address.py
def canonicalize_address(chain: str, address: str) -> str:
"""Per-chain address canonicalization at ingestion time.
CRITICAL: Never apply universal lower(). Solana addresses are case-sensitive."""
match chain:
case "evm":
return address.lower()
case "ton":
return _ton_to_raw(address)
case "solana":
return address # case-sensitive, do NOT touch
case _:
raise ValueError(f"Unknown chain: {chain}")
def _ton_to_raw(address: str) -> str:
"""Convert TON address to raw form (0:hex).
If already raw form, return as-is."""
if address.startswith("0:") or address.startswith("-1:"):
return address.lower()
# TODO: implement bounceable → raw conversion using tvm_hash
# For now, return lowered (placeholder until TON adapter is built)
return address.lower()
Step 6: Run tests
uv run pytest tests/test_events.py tests/test_address.py -v
Expected: PASS
Step 7: Commit
git add src/ft/core/events.py src/ft/core/address.py tests/test_events.py tests/test_address.py
git commit -m "feat: frozen event envelope (RawEvent) + address canonicalization"
Task 9: Idempotency Library (applied_events)¶
Files:
- Create: src/ft/core/idempotency.py
- Test: tests/idempotency/test_applied_events.py
- Test: tests/conftest.py
Step 1: Create testcontainers conftest
# tests/conftest.py
import asyncio
import pytest
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy import text
@pytest.fixture(scope="session")
def event_loop():
loop = asyncio.new_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="session")
def postgres_url():
"""Start a real Postgres via testcontainers."""
from testcontainers.postgres import PostgresContainer
with PostgresContainer(
image="pgvector/pgvector:pg16",
username="ft",
password="ft",
dbname="ftsupply_test",
) as pg:
# Convert to asyncpg URL
url = pg.get_connection_url()
async_url = url.replace("psycopg2", "asyncpg", 1)
if "postgresql://" in async_url and "asyncpg" not in async_url:
async_url = async_url.replace("postgresql://", "postgresql+asyncpg://", 1)
yield async_url
@pytest.fixture(scope="session")
async def engine(postgres_url):
eng = create_async_engine(postgres_url)
yield eng
await eng.dispose()
@pytest.fixture(scope="session")
async def _apply_migrations(engine):
"""Apply all migrations once per session.
IMPORTANT: Do NOT use sql.split(";") — it breaks PL/pgSQL function bodies,
dollar-quoted strings, and comments containing semicolons.
Instead, execute each .sql file as a single statement via raw connection.
asyncpg supports multi-statement execution through connection.execute().
"""
import pathlib
migrations_dir = pathlib.Path(__file__).parent.parent / "migrations"
    async with engine.connect() as conn:
        # Each .sql file manages its own BEGIN/COMMIT, so don't wrap this in engine.begin().
        raw = await conn.get_raw_connection()
        for sql_file in sorted(migrations_dir.glob("*.sql")):
            sql = sql_file.read_text()
            # Execute via the raw asyncpg connection to preserve multi-statement semantics.
            await raw.driver_connection.execute(sql)
@pytest.fixture
async def db(engine, _apply_migrations) -> AsyncSession:
"""Fresh transaction per test, rolled back after."""
async with engine.connect() as conn:
trans = await conn.begin()
session = AsyncSession(bind=conn, expire_on_commit=False)
yield session
await session.close()
await trans.rollback()
Step 2: Write idempotency tests
# tests/idempotency/test_applied_events.py
import uuid
from ft.core.idempotency import try_apply_event
from ft.core.models.base import make_id
# We need some normalized events in the DB for FK constraints.
# Helper to insert a minimal normalized_event row.
from sqlalchemy import text
async def _insert_normalized_event(db, event_id: uuid.UUID):
await db.execute(text("""
INSERT INTO ledger.normalized_events
(normalized_event_id, network_id, asset_id, event_kind, contract_address,
block_number, tx_hash, timestamp, finality_status)
VALUES (:eid, :nid, :aid, 'transfer', '0x', 1, '0x', 0, 'pending')
ON CONFLICT DO NOTHING
"""), {
"eid": event_id,
"nid": make_id("polygon"), # dummy
"aid": make_id("polygon", "0x", "1"),
})
await db.flush()
class TestIdempotency:
async def test_single_delivery(self, db):
"""First apply returns True."""
eid = make_id("test", "event", "1")
await _insert_normalized_event(db, eid)
result = await try_apply_event(db, "test_consumer", eid)
assert result is True
async def test_duplicate_delivery(self, db):
"""Second apply of same event returns False."""
eid = make_id("test", "event", "2")
await _insert_normalized_event(db, eid)
r1 = await try_apply_event(db, "test_consumer", eid)
r2 = await try_apply_event(db, "test_consumer", eid)
assert r1 is True
assert r2 is False
async def test_different_consumers_independent(self, db):
"""Different consumers can apply the same event."""
eid = make_id("test", "event", "3")
await _insert_normalized_event(db, eid)
r1 = await try_apply_event(db, "consumer_a", eid)
r2 = await try_apply_event(db, "consumer_b", eid)
assert r1 is True
assert r2 is True
Step 3: Run tests — verify they fail
uv run pytest tests/idempotency/ -v
Step 4: Implement idempotency library
# src/ft/core/idempotency.py
import uuid
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
async def try_apply_event(db: AsyncSession, consumer_id: str, event_id: uuid.UUID) -> bool:
"""Idempotency gate. Returns True if event was newly applied, False if already seen.
Uses INSERT ON CONFLICT DO NOTHING on ledger.applied_events.
Must be called BEFORE any state mutation. If returns False, skip the mutation.
"""
result = await db.execute(
text("""
INSERT INTO ledger.applied_events (consumer_id, normalized_event_id)
VALUES (:consumer_id, :event_id)
ON CONFLICT (consumer_id, normalized_event_id) DO NOTHING
RETURNING consumer_id
"""),
{"consumer_id": consumer_id, "event_id": event_id},
)
    # RETURNING emits a row only when the insert actually happened.
    return result.scalar_one_or_none() is not None
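Typical consumer usage, sketched (the consumer name, event attribute, and mutation are placeholders):

async def handle_event(db, event) -> None:
    # Gate first; mutate state only if this consumer has not applied the event yet.
    if not await try_apply_event(db, "state_updater", event.normalized_event_id):
        return  # duplicate delivery — safe to skip
    ...  # apply ownership deltas, write outbox rows, etc.
    await db.commit()  # the applied_events row and the mutation commit atomically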
Step 5: Run tests
uv run pytest tests/idempotency/ -v
Expected: PASS (requires Docker running for testcontainers)
Step 6: Commit
git add src/ft/core/idempotency.py tests/conftest.py tests/idempotency/
git commit -m "feat: applied_events idempotency library with testcontainers tests"
Task 10: Chain Adapter Base + Contract Test Suite¶
Files:
- Create: src/ft/ingest/base.py
- Create: tests/contract/test_adapter_contract.py
- Create: tests/contract/conftest.py
Step 1: Implement adapter ABC
# src/ft/ingest/base.py
from abc import ABC, abstractmethod
from ft.core.events import RawEvent
class ChainAdapter(ABC):
"""Abstract chain adapter. Every concrete adapter MUST pass all 5 contract tests."""
@abstractmethod
async def fetch_blocks(self, from_block: int, to_block: int) -> list[RawEvent]:
"""Fetch and parse events from inclusive block range.
Returns list of RawEvents with canonicalized addresses."""
@abstractmethod
async def get_latest_block(self) -> int:
"""Return current chain head block number."""
@abstractmethod
async def detect_reorg(self, block_number: int, expected_hash: str) -> bool:
"""Return True if block at block_number has a different hash than expected.
True = reorg detected."""
@abstractmethod
def canonicalize_address(self, address: str) -> str:
"""Chain-specific address canonicalization."""
@property
@abstractmethod
def network_slug(self) -> str:
"""Network identifier, e.g. 'polygon', 'ton-mainnet'."""
@property
@abstractmethod
def chain_slug(self) -> str:
"""Chain family, e.g. 'evm', 'ton'."""
Step 2: Write contract test suite (adapter-agnostic)
# tests/contract/test_adapter_contract.py
"""
Contract tests that EVERY ChainAdapter must pass.
5 tests from architecture spec §16.5.
Parametrized over all registered adapters.
"""
import pytest
from ft.core.events import RawEvent, EventKind
class AdapterContractSuite:
"""Mixin — inherit in adapter-specific test files and set `adapter` fixture."""
async def test_schema_validation(self, adapter, fixture_blocks):
"""All returned RawEvents pass Pydantic validation."""
for from_b, to_b in fixture_blocks:
events = await adapter.fetch_blocks(from_b, to_b)
for event in events:
assert isinstance(event, RawEvent)
# Re-validate to catch any mutation
RawEvent.model_validate(event.model_dump())
async def test_sub_index_completeness(self, adapter, batch_fixture_block):
"""For batch events: sub_index values are contiguous 0..N-1."""
from_b, to_b = batch_fixture_block
events = await adapter.fetch_blocks(from_b, to_b)
# Group events by source_event_id
by_source: dict[str, list[RawEvent]] = {}
for e in events:
by_source.setdefault(e.source_event_id, []).append(e)
for source_id, group in by_source.items():
if len(group) > 1:
# Batch — verify explicit sub_index values are contiguous 0..N-1
indices = sorted(e.sub_index for e in group)
expected = list(range(len(group)))
assert indices == expected, (
f"sub_index not contiguous for {source_id}: got {indices}, expected {expected}"
)
async def test_delta_conservation(self, adapter, fixture_blocks):
"""Delta conservation using signed qty_delta at normalized level.
Transfer: sum(qty_delta)==0. Mint: sum>0. Burn: sum<0.
Deltas use signed values: from_address gets -qty, to_address gets +qty."""
for from_b, to_b in fixture_blocks:
events = await adapter.fetch_blocks(from_b, to_b)
for event in events:
if not event.deltas:
continue # list/cancel events have no deltas
# Compute signed sum: from=-qty, to=+qty
signed_total = 0
for d in event.deltas:
if d.from_address:
signed_total -= d.qty
if d.to_address:
signed_total += d.qty
if event.event_kind == EventKind.TRANSFER:
assert signed_total == 0, (
f"Transfer must conserve: sum={signed_total}, event={event.source_event_id}"
)
elif event.event_kind == EventKind.MINT:
assert signed_total > 0, (
f"Mint must have positive net: sum={signed_total}, event={event.source_event_id}"
)
elif event.event_kind == EventKind.BURN:
assert signed_total < 0, (
f"Burn must have negative net: sum={signed_total}, event={event.source_event_id}"
)
async def test_source_event_id_stability(self, adapter, fixture_blocks):
"""Same block fetched twice produces identical source_event_ids."""
for from_b, to_b in fixture_blocks:
events_1 = await adapter.fetch_blocks(from_b, to_b)
events_2 = await adapter.fetch_blocks(from_b, to_b)
ids_1 = sorted(e.source_event_id for e in events_1)
ids_2 = sorted(e.source_event_id for e in events_2)
assert ids_1 == ids_2
async def test_empty_deltas_for_non_transfers(self, adapter, fixture_blocks):
"""List/cancel events have no ownership deltas."""
for from_b, to_b in fixture_blocks:
events = await adapter.fetch_blocks(from_b, to_b)
for event in events:
if event.event_kind in (EventKind.LIST, EventKind.CANCEL):
assert len(event.deltas) == 0, (
f"{event.event_kind} should have no deltas: {event}"
)
Step 3: Commit
git add src/ft/ingest/base.py tests/contract/
git commit -m "feat: ChainAdapter ABC + 5 contract test suite"
Task 11: Polygon Adapter¶
Files:
- Create: src/ft/ingest/polygon/__init__.py
- Create: src/ft/ingest/polygon/adapter.py
- Create: src/ft/ingest/polygon/parser.py
- Create: src/ft/ingest/polygon/abi/erc721.json
- Create: src/ft/ingest/polygon/abi/erc1155.json
- Test: tests/contract/test_polygon_adapter.py
- Create: tests/fixtures/polygon/ (fixture block data)
This task is implementation-heavy. The adapter must:
1. Connect to Polygon RPC via httpx (or web3.py)
2. Fetch logs for ERC-721 Transfer + ERC-1155 TransferSingle/TransferBatch events
3. Parse into RawEvent with canonicalized addresses
4. Handle ERC-1155 batch → multiple RawEvents with sub_index
5. Pass all 5 contract tests
Step 1: Create fixture data by fetching real Polygon blocks
Write a script scripts/fetch_polygon_fixtures.py that fetches specific blocks with known NFT transfers and saves them as JSON in tests/fixtures/polygon/.
Step 2: Implement parser (ERC-721/1155 log decoding)
# src/ft/ingest/polygon/parser.py
from ft.core.events import RawEvent, RawEventDelta, EventKind, FinalityStatus
# ERC-721 Transfer(address,address,uint256)
ERC721_TRANSFER_TOPIC = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
# ERC-1155 TransferSingle(address,address,address,uint256,uint256)
ERC1155_SINGLE_TOPIC = "0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62"
# ERC-1155 TransferBatch(address,address,address,uint256[],uint256[])
ERC1155_BATCH_TOPIC = "0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb"
ZERO_ADDRESS = "0x" + "0" * 40
def parse_evm_logs(logs: list[dict], network_slug: str, block_number: int, block_timestamp: int) -> list[RawEvent]:
"""Parse EVM event logs into RawEvents."""
events = []
for log in logs:
topics = log.get("topics", [])
if not topics:
continue
topic0 = topics[0]
if topic0 == ERC721_TRANSFER_TOPIC and len(topics) >= 4:
events.extend(_parse_erc721_transfer(log, network_slug, block_number, block_timestamp))
elif topic0 == ERC1155_SINGLE_TOPIC:
events.extend(_parse_erc1155_single(log, network_slug, block_number, block_timestamp))
elif topic0 == ERC1155_BATCH_TOPIC:
events.extend(_parse_erc1155_batch(log, network_slug, block_number, block_timestamp))
return events
def _parse_erc721_transfer(log: dict, network_slug: str, block_number: int, timestamp: int) -> list[RawEvent]:
topics = log["topics"]
from_addr = "0x" + topics[1][-40:]
to_addr = "0x" + topics[2][-40:]
token_id = str(int(topics[3], 16))
contract = log["address"].lower()
is_mint = from_addr == ZERO_ADDRESS
is_burn = to_addr == ZERO_ADDRESS
if is_mint:
event_kind = EventKind.MINT
elif is_burn:
event_kind = EventKind.BURN
else:
event_kind = EventKind.TRANSFER
deltas = []
if not is_mint:
deltas.append(RawEventDelta(from_address=from_addr.lower(), to_address=None, token_id=token_id, qty=1))
if not is_burn:
deltas.append(RawEventDelta(from_address=None, to_address=to_addr.lower(), token_id=token_id, qty=1))
return [RawEvent(
source_event_id=f"{log['transactionHash']}:{int(log['logIndex'], 16)}",
network_slug=network_slug,
block_number=block_number,
tx_hash=log["transactionHash"],
log_index=int(log["logIndex"], 16),
sub_index=0,
timestamp=timestamp,
event_kind=event_kind,
contract_address=contract,
deltas=deltas,
finality_status=FinalityStatus.PENDING,
raw_payload=log,
)]
# _parse_erc1155_single and _parse_erc1155_batch follow similar patterns
# with ABI decoding of data field for token IDs and quantities.
# For TransferBatch, emit one RawEvent per token class with explicit sub_index=0..N-1.
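For reference, the TransferSingle case can be sketched as follows, assuming the same hex-string log shape used by _parse_erc721_transfer above; the batch variant additionally decodes the two dynamic uint256[] arrays in data and emits one RawEvent per entry with sub_index=0..N-1:

# sketch — would live alongside the other parsers in src/ft/ingest/polygon/parser.py
def _parse_erc1155_single(log: dict, network_slug: str, block_number: int, timestamp: int) -> list[RawEvent]:
    # TransferSingle(operator, from, to, id, value): from/to are topics[2]/topics[3];
    # id and value are the two 32-byte words in the data field.
    topics = log["topics"]
    from_addr = "0x" + topics[2][-40:]
    to_addr = "0x" + topics[3][-40:]
    data = log["data"][2:] if log["data"].startswith("0x") else log["data"]
    token_id = str(int(data[0:64], 16))
    qty = int(data[64:128], 16)
    is_mint = from_addr == ZERO_ADDRESS
    is_burn = to_addr == ZERO_ADDRESS
    deltas = []
    if not is_mint:
        deltas.append(RawEventDelta(from_address=from_addr, to_address=None, token_id=token_id, qty=qty))
    if not is_burn:
        deltas.append(RawEventDelta(from_address=None, to_address=to_addr, token_id=token_id, qty=qty))
    kind = EventKind.MINT if is_mint else EventKind.BURN if is_burn else EventKind.TRANSFER
    return [RawEvent(
        source_event_id=f"{log['transactionHash']}:{int(log['logIndex'], 16)}",
        network_slug=network_slug, block_number=block_number, tx_hash=log["transactionHash"],
        log_index=int(log["logIndex"], 16), sub_index=0, timestamp=timestamp,
        event_kind=kind, contract_address=log["address"].lower(),
        deltas=deltas, finality_status=FinalityStatus.PENDING, raw_payload=log,
    )]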
Step 3: Implement adapter
# src/ft/ingest/polygon/adapter.py
import httpx
from ft.ingest.base import ChainAdapter
from ft.ingest.polygon.parser import parse_evm_logs
from ft.core.events import RawEvent
class PolygonAdapter(ChainAdapter):
def __init__(self, rpc_url: str = "https://polygon-rpc.com"):
self.rpc_url = rpc_url
self._client = httpx.AsyncClient(timeout=30)
@property
def network_slug(self) -> str:
return "polygon"
@property
def chain_slug(self) -> str:
return "evm"
def canonicalize_address(self, address: str) -> str:
return address.lower()
async def get_latest_block(self) -> int:
resp = await self._rpc("eth_blockNumber", [])
return int(resp, 16)
async def detect_reorg(self, block_number: int, expected_hash: str) -> bool:
resp = await self._rpc("eth_getBlockByNumber", [hex(block_number), False])
return resp["hash"] != expected_hash
async def fetch_blocks(self, from_block: int, to_block: int) -> list[RawEvent]:
# Get block timestamps
timestamps = {}
for bn in range(from_block, to_block + 1):
block = await self._rpc("eth_getBlockByNumber", [hex(bn), False])
timestamps[bn] = int(block["timestamp"], 16)
# Get logs for NFT transfer topics
logs = await self._rpc("eth_getLogs", [{
"fromBlock": hex(from_block),
"toBlock": hex(to_block),
"topics": [[
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", # ERC-721
"0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62", # ERC-1155 Single
"0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb", # ERC-1155 Batch
]],
}])
events = []
for log in logs:
bn = int(log["blockNumber"], 16)
ts = timestamps.get(bn, 0)
events.extend(parse_evm_logs([log], self.network_slug, bn, ts))
return events
async def _rpc(self, method: str, params: list) -> dict | list | str:  # result shape depends on the RPC method
resp = await self._client.post(self.rpc_url, json={
"jsonrpc": "2.0", "method": method, "params": params, "id": 1,
})
resp.raise_for_status()
data = resp.json()
if "error" in data:
raise RuntimeError(f"RPC error: {data['error']}")
return data["result"]
Step 4: Write adapter-specific contract tests
# tests/contract/test_polygon_adapter.py
import pytest
from tests.contract.test_adapter_contract import AdapterContractSuite
# For now, use fixture-based adapter (no real RPC calls in tests)
# TODO: create FixturePolygonAdapter that reads from tests/fixtures/polygon/
class TestPolygonContract(AdapterContractSuite):
@pytest.fixture
def adapter(self):
"""Must return a concrete adapter initialized with fixture data.
Example: return FixturePolygonAdapter("tests/fixtures/polygon/")
"""
raise NotImplementedError(
"Subclass must provide adapter fixture with real fixture data"
)
@pytest.fixture
def fixture_blocks(self):
return [(65000000, 65000000)]
@pytest.fixture
def batch_fixture_block(self):
return (65000000, 65000000)
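One way to satisfy that TODO — a hedged sketch of a fixture-backed adapter that replays the JSON written by scripts/fetch_polygon_fixtures.py (one {"block": ..., "logs": [...]} object per file) through the real parser, so the contract suite never touches an RPC:
# tests/contract/fixture_polygon_adapter.py (sketch)
import json
import pathlib
from ft.core.events import RawEvent
from ft.ingest.polygon.adapter import PolygonAdapter
from ft.ingest.polygon.parser import parse_evm_logs
class FixturePolygonAdapter(PolygonAdapter):
    """PolygonAdapter that serves pre-fetched fixture blocks instead of calling an RPC."""
    def __init__(self, fixture_dir: str):
        super().__init__(rpc_url="http://fixtures.invalid")  # the HTTP client is never used
        self._blocks: dict[int, dict] = {}
        for path in pathlib.Path(fixture_dir).glob("block_*.json"):
            data = json.loads(path.read_text())
            self._blocks[int(data["block"]["number"], 16)] = data
    async def get_latest_block(self) -> int:
        return max(self._blocks)
    async def detect_reorg(self, block_number: int, expected_hash: str) -> bool:
        block = self._blocks.get(block_number)
        return block is not None and block["block"]["hash"] != expected_hash
    async def fetch_blocks(self, from_block: int, to_block: int) -> list[RawEvent]:
        events: list[RawEvent] = []
        for bn in range(from_block, to_block + 1):
            data = self._blocks.get(bn)
            if data is None:
                continue
            ts = int(data["block"]["timestamp"], 16)
            events.extend(parse_evm_logs(data["logs"], self.network_slug, bn, ts))
        return events
The adapter fixture in TestPolygonContract can then return FixturePolygonAdapter("tests/fixtures/polygon/").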
Step 5: Commit
git add src/ft/ingest/polygon/ tests/contract/test_polygon_adapter.py
git commit -m "feat: Polygon adapter — ERC-721/1155 parser + RPC client"
Task 12: TON Adapter¶
Files:
- Create: src/ft/ingest/ton/__init__.py
- Create: src/ft/ingest/ton/adapter.py
- Create: src/ft/ingest/ton/parser.py
- Test: tests/contract/test_ton_adapter.py
Follows the same pattern as Task 11, but targets TonAPI v2:
- Masterchain seqno as cursor
- TEP-62 NFT events (transfer_notification, ownership_assigned)
- Raw form addresses (0:hex)
- GetGems marketplace events
Step 1-5: Mirror Task 11 structure with TON-specific parsing.
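A skeleton sketch of the adapter surface, assuming the ChainAdapter base class from ft.ingest.base; the TonAPI path below (/v2/blockchain/masterchain-head) and the always-False reorg check are assumptions to verify against the TonAPI v2 docs, and parsing is left to Step 1-5.
# src/ft/ingest/ton/adapter.py (skeleton sketch)
import httpx
from ft.core.events import RawEvent
from ft.ingest.base import ChainAdapter
class TonAdapter(ChainAdapter):
    def __init__(self, api_url: str = "https://tonapi.io", api_key: str | None = None):
        headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
        self._client = httpx.AsyncClient(base_url=api_url, headers=headers, timeout=30)
    @property
    def network_slug(self) -> str:
        return "ton"
    @property
    def chain_slug(self) -> str:
        return "ton"
    def canonicalize_address(self, address: str) -> str:
        # TonAPI returns raw-form addresses ("0:<hex>"); lowercase for a stable key.
        # User-friendly base64 addresses must be converted to raw form before this point.
        return address.lower()
    async def get_latest_block(self) -> int:
        # For TON the sync cursor is the masterchain seqno, not an EVM block number.
        resp = await self._client.get("/v2/blockchain/masterchain-head")
        resp.raise_for_status()
        return resp.json()["seqno"]
    async def detect_reorg(self, block_number: int, expected_hash: str) -> bool:
        # Masterchain blocks are effectively final; treating this as always False is a
        # Phase 1 simplification to revisit.
        return False
    async def fetch_blocks(self, from_block: int, to_block: int) -> list[RawEvent]:
        # Walk masterchain seqnos, collect TEP-62 / GetGems actions, and hand them to the
        # TON parser implemented in Step 1-5.
        raise NotImplementedError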
Step 6: Commit
git add src/ft/ingest/ton/ tests/contract/test_ton_adapter.py
git commit -m "feat: TON adapter — TEP-62 NFT + GetGems parser"
Task 13: Block Fetcher + Sync Cursor¶
Files:
- Create: src/ft/ingest/fetcher.py
- Test: tests/test_fetcher.py
Step 1: Write fetcher tests
# tests/test_fetcher.py
import pytest
from unittest.mock import AsyncMock
from ft.ingest.fetcher import BlockFetcher
async def test_fetcher_advances_cursor(db):
adapter = AsyncMock()
adapter.get_latest_block.return_value = 10
adapter.fetch_blocks.return_value = []
adapter.detect_reorg.return_value = False
adapter.network_slug = "polygon"
fetcher = BlockFetcher(adapter, network_slug="polygon", batch_size=5)
# Simulate one iteration
await fetcher.step(db)
# Cursor should advance
assert fetcher.last_block > 0
Step 2: Implement fetcher
# src/ft/ingest/fetcher.py
import asyncio
import structlog
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from ft.ingest.base import ChainAdapter
from ft.core.events import RawEvent
logger = structlog.get_logger()
class BlockFetcher:
def __init__(
self,
adapter: ChainAdapter,
network_slug: str,
batch_size: int = 100,
poll_interval: float = 2.0,
max_retries: int = 10,
):
self.adapter = adapter
self.network_slug = network_slug
self.batch_size = batch_size
self.poll_interval = poll_interval
self.max_retries = max_retries
self.last_block: int = 0
self.last_hash: str | None = None
self._consecutive_errors = 0
async def load_cursor(self, db: AsyncSession) -> None:
row = await db.execute(text(
"SELECT last_block, last_block_hash FROM system.sync_cursors WHERE network_id = ("
" SELECT network_id FROM ref.networks WHERE slug = :slug"
")"
), {"slug": self.network_slug})
row = row.first()
if row:
self.last_block = row.last_block
self.last_hash = row.last_block_hash
async def save_cursor(self, db: AsyncSession, block: int, block_hash: str | None = None) -> None:
await db.execute(text("""
INSERT INTO system.sync_cursors (network_id, last_block, last_block_hash, last_updated_at)
SELECT network_id, :block, :hash, now() FROM ref.networks WHERE slug = :slug
ON CONFLICT (network_id)
DO UPDATE SET last_block = :block, last_block_hash = :hash, last_updated_at = now()
"""), {"slug": self.network_slug, "block": block, "hash": block_hash})
await db.commit()
self.last_block = block
self.last_hash = block_hash
async def step(self, db: AsyncSession) -> list[RawEvent]:
"""One fetch iteration. Returns events or empty list."""
head = await self.adapter.get_latest_block()
if self.last_block >= head:
return []
to_block = min(self.last_block + self.batch_size, head)
# Reorg check
if self.last_block > 0 and self.last_hash:
is_reorg = await self.adapter.detect_reorg(self.last_block, self.last_hash)
if is_reorg:
logger.warning("reorg_detected", block=self.last_block, network=self.network_slug)
return [] # reorg handler will take over
events = await self.adapter.fetch_blocks(self.last_block + 1, to_block)
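# TODO: also fetch and persist the hash of to_block below; otherwise last_hash stays None and the reorg check above never fires again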
await self.save_cursor(db, to_block)
self._consecutive_errors = 0
return events
Step 3: Run tests, commit
uv run pytest tests/test_fetcher.py -v
git add src/ft/ingest/fetcher.py tests/test_fetcher.py
git commit -m "feat: BlockFetcher with cursor persistence + reorg check"
Task 14: Normalizer Worker¶
Files:
- Create: src/ft/pipeline/normalizer.py
- Test: tests/test_normalizer.py
The normalizer:
1. Consumes RawEvent from Redis Stream
2. UPSERT normalized_event_keys on (source_event_id, sub_index) — dedup
3. Creates normalized_events + normalized_event_deltas
4. Creates catalog stubs (collection + asset) via uuidv5
5. Publishes to ledger.normalized stream via outbox
Step 1: Write normalizer tests against real Postgres
Test that: same event processed twice → exactly 1 normalized_event row, exactly 1 set of deltas (dedup works). On replay/reorg re-inclusion: metadata updated (is_reverted=false), but no duplicate normalized_event_deltas created.
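A minimal sketch of the dedup case, assuming the normalize_event(db, raw) signature from Step 2 and a hypothetical make_raw_mint fixture that builds a mint RawEvent:
# tests/test_normalizer.py (sketch)
from sqlalchemy import text
from ft.pipeline.normalizer import normalize_event
async def test_duplicate_raw_event_normalized_once(db, make_raw_mint):
    raw = make_raw_mint(token_id="1")
    await normalize_event(db, raw)
    await normalize_event(db, raw)  # same RawEvent delivered twice
    count = await db.execute(text(
        "SELECT count(*) FROM ledger.normalized_events WHERE tx_hash = :tx"
    ), {"tx": raw.tx_hash})
    assert count.scalar_one() == 1
    deltas = await db.execute(text("""
        SELECT count(*) FROM ledger.normalized_event_deltas d
        JOIN ledger.normalized_event_keys k ON k.normalized_event_id = d.normalized_event_id
        WHERE k.source_event_id = :seid
    """), {"seid": raw.source_event_id})
    assert deltas.scalar_one() == 1  # a mint yields exactly one +1 delta, never duplicated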
Step 2: Implement normalizer
Key SQL pattern for dedup UPSERT:
# src/ft/pipeline/normalizer.py
import uuid
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from ft.core.events import RawEvent
from ft.core.ids import make_id  # uuidv5 helper; adjust this import to wherever make_id is defined
async def normalize_event(db: AsyncSession, raw: RawEvent) -> uuid.UUID | None:
"""Process one RawEvent. Returns normalized_event_id or None if duplicate."""
normalized_id = make_id(raw.network_slug, raw.source_event_id)
asset_id = make_id(raw.network_slug, raw.contract_address, raw.deltas[0].token_id if raw.deltas else "")
collection_id = make_id(raw.network_slug, raw.contract_address)
# 1. Ensure catalog stubs exist
await _ensure_collection_stub(db, collection_id, raw)
await _ensure_asset_stub(db, asset_id, collection_id, raw)
# 2. Insert normalized event
await db.execute(text("""
INSERT INTO ledger.normalized_events
(normalized_event_id, network_id, asset_id, collection_id, event_kind,
contract_address, block_number, tx_hash, timestamp, finality_status)
SELECT :eid, n.network_id, :aid, :cid, :kind, :contract, :block, :tx, :ts, :fin
FROM ref.networks n WHERE n.slug = :net
ON CONFLICT (normalized_event_id) DO UPDATE
SET finality_status = EXCLUDED.finality_status
"""), {...})
# 3. UPSERT event key (dedup gate) — returns xmax=0 for new rows
# On replay/reorg re-inclusion: updates is_reverted=false, but does NOT re-insert deltas.
is_first_seen = False
for i, delta in enumerate(raw.deltas):
result = await db.execute(text("""
INSERT INTO ledger.normalized_event_keys (source_event_id, sub_index, normalized_event_id, is_reverted)
VALUES (:seid, :sub, :eid, false)
ON CONFLICT ON CONSTRAINT uq_event_key
DO UPDATE SET is_reverted = false, normalized_event_id = EXCLUDED.normalized_event_id
RETURNING (xmax = 0) AS inserted
"""), {"seid": raw.source_event_id, "sub": i, "eid": normalized_id})
row = result.fetchone()
if row and row.inserted:
is_first_seen = True
# 4. Insert deltas ONLY on first-seen events — never duplicate on replay
# Replay/reorg re-inclusion only flips is_reverted back to false (step 3).
if is_first_seen:
for delta in raw.deltas:
if delta.from_address:
await db.execute(text("""
INSERT INTO ledger.normalized_event_deltas (normalized_event_id, account_address, qty_delta)
VALUES (:eid, :addr, :delta)
"""), {"eid": normalized_id, "addr": delta.from_address, "delta": -delta.qty})
if delta.to_address:
await db.execute(text("""
INSERT INTO ledger.normalized_event_deltas (normalized_event_id, account_address, qty_delta)
VALUES (:eid, :addr, :delta)
"""), {"eid": normalized_id, "addr": delta.to_address, "delta": delta.qty})
# 5. Publish via outbox
await _publish_outbox(db, "ledger.normalized", normalized_id)
await db.commit()
return normalized_id
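The stub helpers referenced above might look like this, in the same module — the catalog.* table and column names are assumptions to align with the schema defined earlier in the plan:
async def _ensure_collection_stub(db: AsyncSession, collection_id: uuid.UUID, raw: RawEvent) -> None:
    await db.execute(text("""
        INSERT INTO catalog.collections (collection_id, network_id, contract_address)
        SELECT :cid, n.network_id, :contract FROM ref.networks n WHERE n.slug = :net
        ON CONFLICT (collection_id) DO NOTHING
    """), {"cid": collection_id, "contract": raw.contract_address, "net": raw.network_slug})
async def _ensure_asset_stub(db: AsyncSession, asset_id: uuid.UUID, collection_id: uuid.UUID, raw: RawEvent) -> None:
    await db.execute(text("""
        INSERT INTO catalog.assets (asset_id, collection_id, token_id)
        VALUES (:aid, :cid, :tid)
        ON CONFLICT (asset_id) DO NOTHING
    """), {"aid": asset_id, "cid": collection_id, "tid": raw.deltas[0].token_id if raw.deltas else ""})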
Step 3: Run tests, commit
git add src/ft/pipeline/normalizer.py tests/test_normalizer.py
git commit -m "feat: normalizer — dedup UPSERT + catalog stubs + outbox publish"
Task 15: State Updater — Ownership¶
Files:
- Create: src/ft/pipeline/state_updater.py
- Test: tests/test_state_updater.py
Core pattern: consume from ledger.normalized → check applied_events → UPSERT ownership_current qty += delta → DELETE WHERE qty <= 0.
Step 1: Write tests — transfer A→B and verify balances; deliver the same event twice and verify no double-counting.
Step 2: Implement — try_apply_event() guard + UPSERT ownership + outbox publish.
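A sketch of both pieces, assuming an applied_events table keyed on (consumer, event_id) and an ownership_current table with (asset_id, account_address, qty) — align the names with the actual migration schema:
# src/ft/pipeline/state_updater.py (sketch)
import uuid
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
async def try_apply_event(db: AsyncSession, consumer: str, event_id: uuid.UUID) -> bool:
    """Idempotency guard: returns True only the first time this consumer sees the event."""
    result = await db.execute(text("""
        INSERT INTO system.applied_events (consumer, event_id, applied_at)
        VALUES (:consumer, :eid, now())
        ON CONFLICT (consumer, event_id) DO NOTHING
        RETURNING event_id
    """), {"consumer": consumer, "eid": event_id})
    return result.first() is not None
async def apply_ownership_delta(db: AsyncSession, asset_id: uuid.UUID, account_address: str, qty_delta: int) -> None:
    await db.execute(text("""
        INSERT INTO ledger.ownership_current (asset_id, account_address, qty)
        VALUES (:aid, :addr, :d)
        ON CONFLICT (asset_id, account_address)
        DO UPDATE SET qty = ownership_current.qty + EXCLUDED.qty
    """), {"aid": asset_id, "addr": account_address, "d": qty_delta})
    await db.execute(text(
        "DELETE FROM ledger.ownership_current WHERE asset_id = :aid AND account_address = :addr AND qty <= 0"
    ), {"aid": asset_id, "addr": account_address})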
Step 3: Commit
Task 16: State Updater — Listings & Sales¶
Files:
- Modify: src/ft/pipeline/state_updater.py (add listing/sale handlers)
- Test: tests/test_state_updater.py (add listing/sale test cases)
Handle LIST → INSERT listings_current, SALE → INSERT sales_history + DELETE listing, CANCEL → DELETE listing.
Step 1-3: Tests, implement, commit.
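For the SALE case specifically, the handler might look roughly like this — the sales_history/listings_current columns and the shape of the sale payload are assumptions to align with the schema and normalizer output:
async def handle_sale(db: AsyncSession, sale: dict) -> None:
    # Record the sale exactly once (keyed by the normalized event), then drop the listing it filled.
    await db.execute(text("""
        INSERT INTO ledger.sales_history (normalized_event_id, asset_id, seller, buyer, price, sold_at)
        VALUES (:eid, :aid, :seller, :buyer, :price, to_timestamp(:ts))
        ON CONFLICT (normalized_event_id) DO NOTHING
    """), {"eid": sale["normalized_event_id"], "aid": sale["asset_id"], "seller": sale["seller"],
           "buyer": sale["buyer"], "price": sale["price"], "ts": sale["timestamp"]})
    await db.execute(text(
        "DELETE FROM ledger.listings_current WHERE asset_id = :aid AND seller = :seller"
    ), {"aid": sale["asset_id"], "seller": sale["seller"]})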
Task 17: Projection Pipeline¶
Files:
- Create: src/ft/pipeline/projections.py
- Test: tests/test_projections.py
- Test: tests/reconciliation/test_ownership_reconciliation.py
Consume domain events → UPSERT projection tables (all using applied_events idempotency).
Step 1: Write reconciliation test
# tests/reconciliation/test_ownership_reconciliation.py
async def test_ownership_view_matches_ground_truth(db):
"""projection.ownership_view == recomputed from ledger deltas."""
# Process N events through full pipeline
# Compare projection.ownership_view vs:
# SELECT e.asset_id, d.account_address, SUM(d.qty_delta)
# FROM ledger.normalized_event_deltas d
# JOIN ledger.normalized_event_keys k ON d.normalized_event_id = k.normalized_event_id
# JOIN ledger.normalized_events e ON e.normalized_event_id = d.normalized_event_id
# WHERE NOT k.is_reverted
# GROUP BY e.asset_id, d.account_address
# HAVING SUM(d.qty_delta) > 0
# Assert: 0 mismatches
Step 2: Implement projections, commit
Task 18: Reorg Handler¶
Files:
- Create: src/ft/pipeline/reorg.py
- Test: tests/reorg/test_reorg_scenarios.py
Cover all 5 reorg scenarios from the architecture spec; tests run against real Postgres.
Step 1: Write all 5 reorg scenario tests (see design doc section 6 for test bodies)
Step 2: Implement reorg handler
# src/ft/pipeline/reorg.py
async def handle_reorg(db: AsyncSession, network_slug: str, from_block: int, to_block: int):
"""Mark events in reverted blocks as is_reverted=true, recompute ownership."""
# 1. Find affected normalized_event_ids
# 2. SET is_reverted = true on normalized_event_keys
# 3. Find affected asset_ids
# 4. For each asset: recompute ownership_current from non-reverted deltas
# 5. Update projections for affected assets
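A hedged sketch of how steps 1-4 above might be fleshed out, reusing the ledger.* names from the normalizer and state updater; step 5 is delegated to the projection pipeline and the exact SQL should be treated as illustrative:
# src/ft/pipeline/reorg.py (fleshed-out sketch)
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
async def handle_reorg(db: AsyncSession, network_slug: str, from_block: int, to_block: int) -> None:
    # 1-2. Mark every event key in the reverted block range as reverted and collect affected assets.
    affected = await db.execute(text("""
        UPDATE ledger.normalized_event_keys k
        SET is_reverted = true
        FROM ledger.normalized_events e
        JOIN ref.networks n ON n.network_id = e.network_id
        WHERE k.normalized_event_id = e.normalized_event_id
          AND n.slug = :net AND e.block_number BETWEEN :fb AND :tb
        RETURNING e.asset_id
    """), {"net": network_slug, "fb": from_block, "tb": to_block})
    asset_ids = {row.asset_id for row in affected}
    # 3-4. Recompute ownership for each affected asset from the surviving (non-reverted) deltas.
    for asset_id in asset_ids:
        await db.execute(text("DELETE FROM ledger.ownership_current WHERE asset_id = :aid"), {"aid": asset_id})
        await db.execute(text("""
            INSERT INTO ledger.ownership_current (asset_id, account_address, qty)
            SELECT e.asset_id, d.account_address, SUM(d.qty_delta)
            FROM ledger.normalized_event_deltas d
            JOIN ledger.normalized_event_keys k ON k.normalized_event_id = d.normalized_event_id
            JOIN ledger.normalized_events e ON e.normalized_event_id = d.normalized_event_id
            WHERE e.asset_id = :aid AND NOT k.is_reverted
            GROUP BY e.asset_id, d.account_address
            HAVING SUM(d.qty_delta) > 0
        """), {"aid": asset_id})
    await db.commit()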
Step 3: Run reorg tests, commit
Task 19: workerlib v2 Framework¶
Files:
- Create: src/ft/worker/framework.py
- Create: src/ft/worker/redis_streams.py
- Create: src/ft/worker/__main__.py
- Test: tests/test_worker_framework.py
Step 1: Implement Redis Streams consumer/producer
Step 2: Implement @task decorator, Trigger enum, DI
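Since workerlib v2 is defined by this project, the decorator surface below is only one possible shape (names, parameters, and the stream name in the example are assumptions), shown to make Step 2 concrete:
# src/ft/worker/framework.py (sketch)
import enum
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
class Trigger(enum.Enum):
    STREAM = "stream"      # consume messages from a Redis Stream
    INTERVAL = "interval"  # run every N seconds
@dataclass
class TaskDef:
    name: str
    trigger: Trigger
    source: str | float  # stream name for STREAM, seconds for INTERVAL
    fn: Callable[..., Awaitable[None]]
_REGISTRY: dict[str, TaskDef] = {}
def task(name: str, trigger: Trigger, source: str | float):
    """Register a coroutine as a worker task; the runner injects dependencies (db session, redis) by parameter name."""
    def decorator(fn: Callable[..., Awaitable[None]]) -> Callable[..., Awaitable[None]]:
        _REGISTRY[name] = TaskDef(name=name, trigger=trigger, source=source, fn=fn)
        return fn
    return decorator
# Usage (e.g. in src/ft/pipeline/normalizer.py):
# @task("normalizer", trigger=Trigger.STREAM, source="ingest.raw")
# async def run_normalizer(message: dict, db: AsyncSession) -> None: ...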
Step 3: Implement __main__.py — entry point python -m ft.worker <name>
Step 4: Test, commit
Task 20: Outbox Publisher Worker¶
Files:
- Create: src/ft/worker/outbox.py
- Test: tests/test_outbox.py
Polls system.outbox_events WHERE published_at IS NULL, publishes each row to Redis Streams, and marks it published. After 5 consecutive failures, the event is routed to the DLQ.
Step 1-3: Test, implement, commit.
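A sketch of the poll-publish-mark loop; the outbox column names (outbox_id, stream, payload, attempts, created_at, published_at) are assumptions to align with the migration schema, and the DLQ sweep for rows with attempts >= 5 is left out:
# src/ft/worker/outbox.py (sketch)
from redis.asyncio import Redis
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
async def publish_pending(db: AsyncSession, redis: Redis, batch_size: int = 100) -> int:
    rows = await db.execute(text("""
        SELECT outbox_id, stream, payload FROM system.outbox_events
        WHERE published_at IS NULL AND attempts < 5
        ORDER BY created_at
        LIMIT :n
        FOR UPDATE SKIP LOCKED
    """), {"n": batch_size})
    published = 0
    for row in rows:
        try:
            await redis.xadd(row.stream, {"payload": row.payload})  # payload assumed to be JSON text
            await db.execute(text(
                "UPDATE system.outbox_events SET published_at = now() WHERE outbox_id = :id"
            ), {"id": row.outbox_id})
            published += 1
        except Exception:
            await db.execute(text(
                "UPDATE system.outbox_events SET attempts = attempts + 1 WHERE outbox_id = :id"
            ), {"id": row.outbox_id})
    await db.commit()
    return published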
Task 21: API Skeleton + Auth + Health¶
Files:
- Create: src/ft/api/app.py
- Create: src/ft/api/deps.py
- Create: src/ft/api/auth.py
- Create: src/ft/api/routers/__init__.py
- Create: src/ft/api/routers/health.py
- Test: tests/api/test_health.py
Step 1: Write health endpoint test
# tests/api/test_health.py
from httpx import AsyncClient, ASGITransport
from ft.api.app import app
async def test_health():
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.get("/health")
assert resp.status_code == 200
assert resp.json()["status"] == "ok"
Step 2: Implement app.py, deps.py, health router
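A minimal health router consistent with the test above; a fuller version would also ping Postgres and Redis before reporting ok.
# src/ft/api/routers/health.py
from fastapi import APIRouter
router = APIRouter()
@router.get("/health")
async def health() -> dict[str, str]:
    return {"status": "ok"}
# src/ft/api/app.py (excerpt)
# from fastapi import FastAPI
# from ft.api.routers import health
# app = FastAPI(title="ft.supply API")
# app.include_router(health.router)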
Step 3: Test, commit
Task 22: API Endpoints — Assets + Collections¶
Files:
- Create: src/ft/api/routers/assets.py
- Create: src/ft/api/routers/collections.py
- Create: src/ft/api/schemas/assets.py
- Create: src/ft/api/schemas/collections.py
- Test: tests/api/test_assets.py
- Test: tests/api/test_collections.py
- Test: tests/api/test_domain_isolation.py
Step 1: Write domain isolation test
# tests/api/test_domain_isolation.py
async def test_api_reads_only_projection_schema(db):
"""All API SQL queries touch ONLY projection.* tables."""
# Intercept SQL statements during API request
# Assert: every FROM/JOIN references projection.* only
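One way to implement that interception — a sketch that hooks before_cursor_execute on the sync engine behind the async engine and then checks captured SELECTs for non-projection schemas; the engine import path and the /assets and /collections routes are assumptions:
# tests/api/test_domain_isolation.py (sketch)
import re
from httpx import ASGITransport, AsyncClient
from sqlalchemy import event
from ft.api.app import app
from ft.core.db import engine  # async engine; adjust if it lives elsewhere
FORBIDDEN_SCHEMAS = re.compile(r"\b(ledger|catalog|system|ref)\.", re.IGNORECASE)
async def test_api_reads_only_projection_schema(db):
    statements: list[str] = []
    def capture(conn, cursor, statement, parameters, context, executemany):
        statements.append(statement)
    event.listen(engine.sync_engine, "before_cursor_execute", capture)
    try:
        transport = ASGITransport(app=app)
        async with AsyncClient(transport=transport, base_url="http://test") as client:
            await client.get("/assets")       # hypothetical list endpoints from this task
            await client.get("/collections")
    finally:
        event.remove(engine.sync_engine, "before_cursor_execute", capture)
    selects = [s for s in statements if s.lstrip().upper().startswith("SELECT")]
    assert selects, "expected the API to issue at least one SELECT"
    offenders = [s for s in selects if FORBIDDEN_SCHEMAS.search(s)]
    assert not offenders, f"API queried non-projection tables: {offenders}"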
Step 2: Implement endpoints, schemas
Step 3: Test, commit
Task 23: Observability — Metrics + Logging¶
Files:
- Create: src/ft/observability/metrics.py
- Create: src/ft/observability/logging.py
Step 1: Implement Prometheus metrics (ingest_lag, normalizer_lag, dlq_depth, etc.)
Step 2: Implement structlog configuration (JSON to stdout)
Step 3: Add metrics endpoint to API (/metrics)
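A sketch of the metrics module and the /metrics exposure; the metric names below are placeholders matching the list in Step 1.
# src/ft/observability/metrics.py (sketch)
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Gauge, generate_latest
INGEST_LAG = Gauge("ft_ingest_lag_blocks", "Blocks between chain head and sync cursor", ["network"])
NORMALIZER_LAG = Gauge("ft_normalizer_lag_messages", "Pending entries on the raw-event stream")
DLQ_DEPTH = Gauge("ft_dlq_depth", "Entries currently parked in the DLQ", ["stream"])
EVENTS_PROCESSED = Counter("ft_events_processed_total", "Normalized events processed", ["network"])
def render_metrics() -> tuple[bytes, str]:
    return generate_latest(), CONTENT_TYPE_LATEST
# src/ft/api/app.py (excerpt)
# from fastapi import Response
# from ft.observability.metrics import render_metrics
# @app.get("/metrics")
# async def metrics() -> Response:
#     body, content_type = render_metrics()
#     return Response(content=body, media_type=content_type)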
Step 4: Commit
Task 24: E2E Test + CI Pipeline¶
Files:
- Create: tests/e2e/test_pipeline.py
- Create: .github/workflows/ci.yml
Step 1: Write E2E test
# tests/e2e/test_pipeline.py
async def test_chain_event_to_api_response(db):
"""Full pipeline: RawEvent → normalize → state update → projection → API query."""
# 1. Create a RawEvent (mint)
# 2. Run normalizer
# 3. Run state updater
# 4. Run projection pipeline
# 5. Query projection.asset_cards
# 6. Assert correct data + finality_status
Step 2: Create CI workflow
# .github/workflows/ci.yml
name: CI
on: [push, pull_request]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: astral-sh/setup-uv@v4
- run: uv sync --all-extras  # dev tooling (pytest, ruff, testcontainers) lives in [project.optional-dependencies]
- run: uv run ruff check src/ tests/
- run: uv run ruff format --check src/ tests/
- run: uv run pytest tests/ -v --tb=short
Step 3: Commit
git add tests/e2e/ .github/
git commit -m "feat: E2E pipeline test + GitHub Actions CI"
Task 25: Docker Production Setup¶
Files:
- Finalize: docker/Dockerfile
- Create: docker/docker-compose.prod.yml
- Update: docker/docker-compose.yml (add worker services)
Add all worker services to docker-compose with proper depends_on and health checks, using a single image with a different CMD per service.
Step 1: Finalize, commit
Task 26: Final Verification — All Exit Criteria¶
Step 1: Run full test suite
uv run pytest tests/ -v --tb=short
Expected results:
- tests/contract/ — 10 tests PASS (5 × 2 adapters)
- tests/idempotency/ — 3 tests PASS
- tests/reorg/ — 5 tests PASS
- tests/e2e/ — 1 test PASS
- tests/reconciliation/ — 1 test PASS
- tests/api/test_domain_isolation.py — 1 test PASS
Step 2: Run linter
uv run ruff check src/ tests/
uv run ruff format --check src/ tests/
Step 3: Verify docker-compose prod starts cleanly
docker compose -f docker/docker-compose.yml -f docker/docker-compose.prod.yml up -d
# Apply migrations
bash scripts/migrate.sh
# Check health
curl http://localhost:8000/health
Step 4: Update CLAUDE.md with build/test commands
Step 5: Final commit
git add .
git commit -m "feat: Phase 1 complete — all exit criteria passing"