Phase 1 Design — ft.supply
Approved: 2026-02-28
Decisions
| Aspect | Decision |
|---|---|
| Networks | Polygon (EVM) + TON in parallel |
| Package manager | uv + pyproject.toml |
| Python | 3.12+ |
| ORM | SQLAlchemy 2.0 Mapped[] (no dual model representation) |
| Migrations | Raw SQL, additive only (like gifts.supply) |
| Worker framework | workerlib v2 (custom, @task + DI + typed config + graceful shutdown) |
| Event bus | Redis Streams with outbox pattern |
| Testing | pytest + testcontainers (real Postgres, no mocks for DB) |
| API | FastAPI, reads ONLY from projection.* schema |
| Frontend | NOT in Phase 1 scope (internal admin panel moved to Phase 1.5) |
| Project structure | Flat monorepo, src/ft/* namespace package |
| Docker | Single multi-stage Dockerfile, different CMD per worker |
| CI | GitHub Actions: ruff lint/format + pytest |
| Observability | Prometheus metrics + structlog to stdout |
| DB schemas (Phase 1) | 7 of 10: ref, ingest, ledger, catalog, market, projection, system |
| IDs | uuidv5 from natural keys (deterministic, no FK race conditions) |
| Dedup | UPSERT on normalized_event_keys (source_event_id, sub_index) |
| Idempotency | applied_events table (INSERT ON CONFLICT DO NOTHING) |
| Reorg handling | is_reverted flag + recompute affected asset ownership |
Project Structure
ft.supply/
├── pyproject.toml
├── uv.lock
├── src/ft/
│   ├── core/  # models, events, config, idempotency, address
│   │   ├── config.py  # Pydantic Settings
│   │   ├── db.py  # async engine + session factory
│   │   ├── models/  # SQLAlchemy 2.0 Mapped[] per domain
│   │   │   ├── base.py  # DeclarativeBase, uuidv5 mixin
│   │   │   ├── ref.py  # chains, networks, token_standards
│   │   │   ├── ingest.py  # raw_events, sync_cursors
│   │   │   ├── ledger.py  # normalized_events, event_keys, applied_events
│   │   │   ├── catalog.py  # assets, metadata_versions
│   │   │   ├── market.py  # listings_current, sales_history, ownership_current
│   │   │   ├── projection.py  # ownership_view, asset_cards, collection_stats, etc.
│   │   │   └── system.py  # outbox_events, dlq, api_keys
│   │   ├── events.py  # RawEvent / NormalizedEvent (frozen Pydantic envelope)
│   │   ├── idempotency.py  # applied_events pattern (reusable)
│   │   └── address.py  # per-chain canonicalization
│   ├── ingest/  # Chain adapters
│   │   ├── base.py  # ChainAdapter ABC
│   │   ├── polygon/  # PolygonAdapter + ERC-721/1155 parser + ABIs
│   │   ├── ton/  # TonAdapter + TEP-62 parser
│   │   └── fetcher.py  # BlockFetcher (retry, circuit-breaker, cursor)
│   ├── pipeline/  # Processing pipeline
│   │   ├── normalizer.py  # Dedup via UPSERT, catalog stubs
│   │   ├── state_updater.py  # Ownership + listings/sales
│   │   ├── projections.py  # ownership_view, asset_cards, portfolio, listings
│   │   └── reorg.py  # Reorg detection + handler
│   ├── api/  # FastAPI
│   │   ├── app.py  # Lifespan, middleware, mount routers
│   │   ├── deps.py  # Depends: read-only session, auth
│   │   ├── auth.py  # API key auth (Phase 1 minimal)
│   │   ├── routers/  # assets, collections, health
│   │   └── schemas/  # Pydantic response models
│   ├── worker/  # workerlib v2
│   │   ├── framework.py  # @task, Trigger, DI, Beat scheduler
│   │   ├── redis_streams.py  # Redis Streams consumer/producer
│   │   └── outbox.py  # Outbox publisher
│   └── observability/  # Metrics + logging
│       ├── metrics.py  # Prometheus counters/gauges
│       └── logging.py  # structlog JSON to stdout
├── workers/  # Thin entry points (or python -m ft.worker <name>)
├── migrations/  # Raw SQL, numbered (001_baseline.sql, ...)
├── tests/
│   ├── conftest.py  # testcontainers Postgres fixture
│   ├── fixtures/  # Real mainnet blocks as JSON
│   ├── contract/  # 5 contract tests per adapter
│   ├── idempotency/  # 3 idempotency scenarios
│   ├── reorg/  # 5 reorg scenarios
│   ├── e2e/  # Full pipeline test
│   ├── reconciliation/  # ownership_view vs ground truth
│   └── api/  # API + domain isolation tests
├── docker/
│   ├── Dockerfile  # Multi-stage, single image
│   ├── docker-compose.yml  # Dev: postgres, redis, minio
│   └── docker-compose.prod.yml
└── docs/
Database Schemas (Phase 1)
7 schemas: ref, ingest, ledger, catalog, market, projection, system.
Key Tables
ref: chains, networks (with finality_depth, rpc_url), token_standards, marketplaces
ingest: raw_events (source_event_id unique per network, raw_payload JSONB)
ledger:
- normalized_event_keys — PK(source_event_id, sub_index), is_reverted flag. UPSERT handles reorg re-inclusion.
- normalized_events — event_kind, asset_id, block_number, finality_status
- normalized_event_deltas — account_address, qty_delta (+1/-1, >1 for ERC-1155)
- applied_events — PK(consumer_id, normalized_event_id). INSERT ON CONFLICT DO NOTHING = idempotency.
catalog: assets (stubs with is_stub=true until metadata fetched in Phase 2)
market: ownership_current (UPSERT qty+=delta, DELETE WHERE qty<=0), listings_current, sales_history
projection: ownership_view, asset_cards, collection_stats, portfolio_assets, listing_cards — API reads ONLY these.
system: sync_cursors, outbox_events (published_at NULL = pending), dlq, api_keys
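The applied_events and ownership_current rules above can be sketched together. This is a minimal illustration, not the project's code: it uses stdlib sqlite3 as a stand-in for Postgres, the column set is reduced to what the rules need, and apply_event is a hypothetical helper name.

```python
import sqlite3

# sqlite3 stands in for Postgres here; table shapes are simplified assumptions.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE applied_events (
    consumer_id TEXT NOT NULL,
    normalized_event_id TEXT NOT NULL,
    PRIMARY KEY (consumer_id, normalized_event_id)
);
CREATE TABLE ownership_current (
    asset_id TEXT NOT NULL,
    account_address TEXT NOT NULL,
    qty INTEGER NOT NULL,
    PRIMARY KEY (asset_id, account_address)
);
""")


def apply_event(
    conn: sqlite3.Connection,
    consumer_id: str,
    event_id: str,
    asset_id: str,
    deltas: list[tuple[str, int]],
) -> bool:
    """Apply deltas once per (consumer_id, event_id); return False on a replay."""
    with conn:  # marker row and state change commit in the same transaction
        cur = conn.execute(
            "INSERT INTO applied_events (consumer_id, normalized_event_id) "
            "VALUES (?, ?) ON CONFLICT DO NOTHING",
            (consumer_id, event_id),
        )
        if cur.rowcount == 0:
            return False  # already applied by this consumer, skip the side effects
        for account_address, qty_delta in deltas:
            # UPSERT qty += delta
            conn.execute(
                "INSERT INTO ownership_current (asset_id, account_address, qty) "
                "VALUES (?, ?, ?) "
                "ON CONFLICT (asset_id, account_address) "
                "DO UPDATE SET qty = qty + excluded.qty",
                (asset_id, account_address, qty_delta),
            )
        # DELETE WHERE qty <= 0
        conn.execute(
            "DELETE FROM ownership_current WHERE asset_id = ? AND qty <= 0",
            (asset_id,),
        )
    return True
```

Delivering the same event twice to the same consumer leaves ownership_current unchanged on the second call, because the conflict on the applied_events primary key short-circuits the update.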
ID Strategy
All IDs: uuidv5(NAMESPACE, ":".join(natural_key_parts)). Deterministic. No auto-increment.
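A minimal sketch of the ID rule using the stdlib uuid module. The namespace value, the make_id helper, and the asset key layout (network, contract address, token id) are assumptions for illustration; the design only fixes the uuidv5-over-joined-parts formula.

```python
import uuid

# Hypothetical project namespace; the design only refers to it as NAMESPACE.
NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "ft.supply")


def make_id(*natural_key_parts: str) -> uuid.UUID:
    """Derive a deterministic ID from natural key parts joined with ':'."""
    return uuid.uuid5(NAMESPACE, ":".join(natural_key_parts))


# Assumed natural key for an asset: network, contract address, token id.
asset_id = make_id("polygon", "0x" + "ab" * 20, "42")
```

Any worker can recompute the same ID from the same natural key without a database round trip. Because TON raw addresses themselves contain ":", each entity type needs a fixed part count and order for the joined key to stay unambiguous.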
Address Canonicalization
Addresses are canonicalized per chain at ingestion time. EVM: lower(). TON: raw form 0:abc.... Solana: untouched (case-sensitive). Never apply a universal lower().
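A sketch of a per-chain canonicalizer under these rules. The function name, the chain family labels, and the choice to lowercase the hex part of a TON raw address are assumptions; converting a user-friendly (bounceable) TON address to raw form is left out and such input is simply rejected.

```python
import re

_EVM = re.compile(r"^0x[0-9a-fA-F]{40}$")
# Raw TON form: "<workchain>:<64 hex chars>"
_TON_RAW = re.compile(r"^(-?\d+):([0-9a-fA-F]{64})$")


def canonicalize(chain_family: str, address: str) -> str:
    """Return the canonical stored form of an address for one chain family."""
    if chain_family == "evm":
        if not _EVM.match(address):
            raise ValueError(f"not an EVM address: {address!r}")
        return address.lower()
    if chain_family == "ton":
        match = _TON_RAW.match(address)
        if match is None:
            raise ValueError(f"not a raw TON address: {address!r}")
        workchain, account_hex = match.groups()
        # Assumption: hex case is not significant in raw form, so normalize it.
        return f"{int(workchain)}:{account_hex.lower()}"
    if chain_family == "solana":
        return address  # base58 is case-sensitive, never lowercase
    raise ValueError(f"unknown chain family: {chain_family!r}")
```

Dispatching on the chain family keeps the "never universal lower()" rule enforceable in one place: an unknown family raises instead of falling back to a default.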
Pipeline Architecture
Chain RPC/API → Adapter (per network) → Redis Stream (chain.{network}.raw_events)
→ Normalizer (dedup UPSERT) → Redis Stream (ledger.normalized)
→ State Updater (ownership, listings, sales) → Redis Streams (domain events)
→ Projection Pipeline (applied_events idempotency) → projection.* tables
→ API (read-only from projection.*)
Event Envelope
RawEvent — frozen Pydantic model with schema_version. Fields never removed/renamed after deploy. Optional fields can be added.
Outbox Pattern
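Workers write their state changes to PostgreSQL and the matching row to system.outbox_events in one transaction. The Outbox Publisher polls unpublished events (published_at IS NULL) and pushes them to Redis Streams. After 5 failed publish attempts, an event is moved to the DLQ.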
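The outbox flow can be sketched as follows. This is an illustration under assumptions: sqlite3 stands in for Postgres, a plain callable stands in for the Redis Streams producer, and the attempts column, the dlq columns, and the function names are hypothetical.

```python
import sqlite3
from typing import Callable

MAX_ATTEMPTS = 5  # from the design: after 5 failures the event goes to the DLQ

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE outbox_events (
    id TEXT PRIMARY KEY,
    stream TEXT NOT NULL,
    payload TEXT NOT NULL,
    attempts INTEGER NOT NULL DEFAULT 0,  -- assumed bookkeeping column
    published_at TEXT                     -- NULL = pending
);
CREATE TABLE dlq (id TEXT PRIMARY KEY, stream TEXT, payload TEXT, error TEXT);
""")


def enqueue(conn: sqlite3.Connection, event_id: str, stream: str, payload: str) -> None:
    """Call inside the worker's own transaction, next to its state writes."""
    conn.execute(
        "INSERT INTO outbox_events (id, stream, payload) VALUES (?, ?, ?)",
        (event_id, stream, payload),
    )


def publish_pending(conn: sqlite3.Connection, publish: Callable[[str, str], None]) -> int:
    """One poll cycle: try each pending event once; return the number published."""
    pending = conn.execute(
        "SELECT id, stream, payload, attempts FROM outbox_events "
        "WHERE published_at IS NULL ORDER BY rowid"
    ).fetchall()
    published = 0
    for event_id, stream, payload, attempts in pending:
        try:
            publish(stream, payload)  # stand-in for the Redis Streams producer
        except Exception as exc:
            with conn:
                if attempts + 1 >= MAX_ATTEMPTS:
                    conn.execute(
                        "INSERT INTO dlq (id, stream, payload, error) VALUES (?, ?, ?, ?)",
                        (event_id, stream, payload, repr(exc)),
                    )
                    conn.execute("DELETE FROM outbox_events WHERE id = ?", (event_id,))
                else:
                    conn.execute(
                        "UPDATE outbox_events SET attempts = attempts + 1 WHERE id = ?",
                        (event_id,),
                    )
            continue
        with conn:
            conn.execute(
                "UPDATE outbox_events SET published_at = datetime('now') WHERE id = ?",
                (event_id,),
            )
        published += 1
    return published
```

A crash between publish and the published_at update would republish the event on the next poll, so delivery is at-least-once; downstream consumers rely on the applied_events check to absorb the duplicate.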
Reorg Handling
The adapter detects a reorg through a parent_hash mismatch and marks is_reverted=true on the affected normalized_event_keys. ownership_current is then recomputed for the affected assets only. If an event is re-included on the new chain, the normalizer's UPSERT flips is_reverted back to false.
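A sketch of the revert, recompute, and re-inclusion steps, with detection left out. Assumptions: sqlite3 stands in for Postgres, deltas are keyed directly by (source_event_id, sub_index) for brevity, and record_event, recompute_ownership, and handle_reorg are hypothetical names.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE normalized_event_keys (
    source_event_id TEXT NOT NULL,
    sub_index INTEGER NOT NULL,
    block_number INTEGER NOT NULL,
    is_reverted INTEGER NOT NULL DEFAULT 0,
    PRIMARY KEY (source_event_id, sub_index)
);
CREATE TABLE normalized_event_deltas (
    source_event_id TEXT, sub_index INTEGER,
    asset_id TEXT, account_address TEXT, qty_delta INTEGER
);
CREATE TABLE ownership_current (
    asset_id TEXT, account_address TEXT, qty INTEGER,
    PRIMARY KEY (asset_id, account_address)
);
""")

_JOIN = ("FROM normalized_event_deltas d JOIN normalized_event_keys k "
         "ON k.source_event_id = d.source_event_id AND k.sub_index = d.sub_index ")


def record_event(conn, source_event_id, sub_index, block_number, asset_id, deltas):
    """UPSERT the key (re-inclusion clears is_reverted) and store its deltas."""
    conn.execute(
        "INSERT INTO normalized_event_keys (source_event_id, sub_index, block_number) "
        "VALUES (?, ?, ?) ON CONFLICT (source_event_id, sub_index) "
        "DO UPDATE SET is_reverted = 0, block_number = excluded.block_number",
        (source_event_id, sub_index, block_number),
    )
    conn.execute(
        "DELETE FROM normalized_event_deltas WHERE source_event_id = ? AND sub_index = ?",
        (source_event_id, sub_index),
    )
    conn.executemany(
        "INSERT INTO normalized_event_deltas VALUES (?, ?, ?, ?, ?)",
        [(source_event_id, sub_index, asset_id, acct, qty) for acct, qty in deltas],
    )


def recompute_ownership(conn, asset_id):
    """Rebuild one asset's ownership from its non-reverted deltas."""
    conn.execute("DELETE FROM ownership_current WHERE asset_id = ?", (asset_id,))
    conn.execute(
        "INSERT INTO ownership_current (asset_id, account_address, qty) "
        "SELECT d.asset_id, d.account_address, SUM(d.qty_delta) " + _JOIN +
        "WHERE d.asset_id = ? AND k.is_reverted = 0 "
        "GROUP BY d.asset_id, d.account_address HAVING SUM(d.qty_delta) > 0",
        (asset_id,),
    )


def handle_reorg(conn, first_reverted_block):
    """Mark events at or above the fork point reverted; recompute affected assets."""
    with conn:
        affected = [row[0] for row in conn.execute(
            "SELECT DISTINCT d.asset_id " + _JOIN +
            "WHERE k.block_number >= ? AND k.is_reverted = 0 ORDER BY d.asset_id",
            (first_reverted_block,),
        )]
        conn.execute(
            "UPDATE normalized_event_keys SET is_reverted = 1 WHERE block_number >= ?",
            (first_reverted_block,),
        )
        for asset_id in affected:
            recompute_ownership(conn, asset_id)
    return affected
```

Recomputing from the delta history, rather than applying inverse deltas, keeps the handler correct even if it runs twice for the same fork point.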
Chain Adapters
Polygon (EVM)
- RPC: public endpoint or Alchemy. eth_getLogs with topic0 filter.
- Finality depth: ~128 blocks.
- Parses: ERC-721 Transfer, ERC-1155 TransferSingle/TransferBatch, Seaport events.
- ERC-1155 batch: one log → N deltas with contiguous sub_index 0..N-1.
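The batch rule can be sketched as a pure function over already-decoded log fields. The SubEvent shape and function name are assumptions: each batch entry is modeled as one sub-event carrying its own deltas, and the zero address marks mint and burn as in the ERC-1155 standard.

```python
from dataclasses import dataclass

ZERO_ADDRESS = "0x" + "0" * 40  # ERC-1155: from == 0x0 is a mint, to == 0x0 is a burn


@dataclass(frozen=True)
class SubEvent:
    sub_index: int
    token_id: int
    deltas: tuple[tuple[str, int], ...]  # (account_address, qty_delta)


def expand_transfer_batch(
    from_addr: str, to_addr: str, ids: list[int], values: list[int]
) -> list[SubEvent]:
    """Expand one TransferBatch log into N sub-events with sub_index 0..N-1.

    Addresses are expected to be canonicalized (lowercase) already.
    """
    if len(ids) != len(values):
        raise ValueError("ids and values must have the same length")
    sub_events = []
    for sub_index, (token_id, value) in enumerate(zip(ids, values)):
        deltas = []
        if from_addr != ZERO_ADDRESS:
            deltas.append((from_addr, -value))  # sender loses `value`
        if to_addr != ZERO_ADDRESS:
            deltas.append((to_addr, value))     # receiver gains `value`
        sub_events.append(SubEvent(sub_index, token_id, tuple(deltas)))
    return sub_events
```

Deriving sub_index from the position in the ids array makes it stable across re-ingestion of the same log, which the dedup key (source_event_id, sub_index) depends on.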
TON
- API: TonAPI v2.
- Finality: masterchain block level.
- Addresses: raw form 0:abc..., never bounceable.
- Parses: TEP-62 transfer_notification, GetGems marketplace events.
- Cursor: masterchain seqno.
Contract Tests (5 per adapter)
- Schema validation — all RawEvents pass Pydantic
- Sub-index completeness — contiguous 0..N-1 for batches
- Delta conservation — transfer: sum=0, mint: >0, burn: <0
- Source event ID stability — same block twice = same IDs
- Empty deltas for non-transfers — list/cancel have no deltas
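Two of these invariants reduce to small predicates that a contract test can assert on every parsed event. The function names and the event_kind string values are assumptions for illustration.

```python
def sub_indexes_are_contiguous(sub_indexes: list[int]) -> bool:
    """True when the indexes are exactly 0..N-1 with no gaps or duplicates."""
    return sorted(sub_indexes) == list(range(len(sub_indexes)))


def deltas_are_conserved(event_kind: str, qty_deltas: list[int]) -> bool:
    """Check the delta-sum rule for one event kind."""
    total = sum(qty_deltas)
    if event_kind == "transfer":
        return bool(qty_deltas) and total == 0  # what one side loses the other gains
    if event_kind == "mint":
        return total > 0
    if event_kind == "burn":
        return total < 0
    # Non-transfer kinds such as list/cancel must carry no deltas at all.
    return not qty_deltas
```

Running both checks over each mainnet fixture block gives a failure that points at the offending event rather than at a downstream ownership mismatch.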
workerlib v2
Decorator-based task registration with typed DI:
@task(trigger=Trigger.stream("ledger.normalized"), consumer_group="ownership_updater")
async def update_ownership(event: NormalizedEvent, db: AsyncSession = Depends()):
    ...
Single Docker image, different CMD: python -m ft.worker <task_name>.
Graceful shutdown on SIGTERM. Configurable max_in_flight for backpressure.
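One way max_in_flight backpressure could work is a semaphore around handler dispatch. This is a sketch with stdlib asyncio only, not workerlib's implementation; consume and demo are hypothetical names, and signal handling for SIGTERM is omitted.

```python
import asyncio
from collections.abc import Awaitable, Callable, Iterable
from typing import Any


async def consume(
    events: Iterable[Any],
    handler: Callable[[Any], Awaitable[None]],
    max_in_flight: int,
) -> None:
    """Dispatch events concurrently, never running more than max_in_flight handlers."""
    semaphore = asyncio.Semaphore(max_in_flight)
    tasks: list[asyncio.Task[None]] = []

    async def run(event: Any) -> None:
        try:
            await handler(event)
        finally:
            semaphore.release()  # free a slot even if the handler raised

    for event in events:
        # Stops pulling further events while all slots are busy.
        await semaphore.acquire()
        tasks.append(asyncio.create_task(run(event)))
    await asyncio.gather(*tasks)


async def demo() -> int:
    """Return the peak number of handlers that ran at the same time."""
    in_flight = 0
    peak = 0

    async def handler(event: int) -> None:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # simulated I/O
        in_flight -= 1

    await consume(range(10), handler, max_in_flight=3)
    return peak
```

Acquiring the slot before reading the next event means a slow database stalls the stream read instead of piling up unprocessed messages in memory.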
API
FastAPI with read-only sessions bound to projection.* schema.
Phase 1 endpoints:
- GET /api/v1/assets/{id} — asset card + finality_status
- GET /api/v1/assets/{id}/owners — ownership view
- GET /api/v1/collections/{id} — collection stats
- GET /api/v1/collections/{id}/assets — asset cards filtered
- GET /api/v1/collections/{id}/listings — listing cards
- GET /health / GET /health/ready
All responses include finality_status. Stubs return is_stub: true.
Observability
Prometheus metrics: ingest_lag, normalizer_lag, projection_lag, outbox_unpublished, dlq_depth, events_processed, reorgs_detected.
Alerts: dlq_depth > 0 (immediate), outbox stuck > 5m, ingest_lag > 300s.
Structured logging via structlog → JSON stdout → Docker log driver.
Testing & Exit Criteria
All tests run against real Postgres via testcontainers. Fixture data from mainnet blocks.
| Exit Criterion | Test Location |
|---|---|
| Contract tests (5 × 2 adapters) | tests/contract/ |
| Idempotency (3 scenarios: single delivery, concurrent delivery, partial failure recovery) | tests/idempotency/ |
| Reorg (5 scenarios from arch §16.2, against real Postgres — no mocks) | tests/reorg/ |
| E2E pipeline (chain event → API response with correct finality_status) | tests/e2e/ |
| Ownership reconciliation (0 mismatches for 100 assets) | tests/reconciliation/ |
| Domain isolation (API reads ONLY projection.* — no cross-domain SQL joins) | tests/api/ |
| Observability (pipeline lag dashboard visible, dlq_depth alert firing) | manual / monitoring |
| Lint + format | ruff check + ruff format |
CI: GitHub Actions blocks merge if any test fails.
Context: Lessons from gifts.supply
Carried forward: CatalogClient-like unified data access, @task decorator + DI pattern, PostgreSQL SKIP LOCKED for simple queuing, structured logging pipeline, schema-first approach.
Improved: single model representation (no ORM+dataclass dual), models split by domain (not 1500-line schema.py), proper package manager (uv), single Dockerfile, typed config (Pydantic Settings), formal test coverage matching exit criteria.