Skip to content

Phase 1 Design — ft.supply

Approved: 2026-02-28

Decisions

Aspect Decision
Networks Polygon (EVM) + TON in parallel
Package manager uv + pyproject.toml
Python 3.12+
ORM SQLAlchemy 2.0 Mapped[] (no dual model representation)
Migrations Raw SQL, additive only (like gifts.supply)
Worker framework workerlib v2 (custom, @task + DI + typed config + graceful shutdown)
Event bus Redis Streams with outbox pattern
Testing pytest + testcontainers (real Postgres, no mocks for DB)
API FastAPI, reads ONLY from projection.* schema
Frontend NOT in Phase 1 scope (internal admin panel moved to Phase 1.5)
Project structure Flat monorepo, src/ft/* namespace package
Docker Single multi-stage Dockerfile, different CMD per worker
CI GitHub Actions: ruff lint/format + pytest
Observability Prometheus metrics + structlog to stdout
DB schemas (Phase 1) 7 of 10: ref, ingest, ledger, catalog, market, projection, system
IDs uuidv5 from natural keys (deterministic, no FK race conditions)
Dedup UPSERT on normalized_event_keys (source_event_id, sub_index)
Idempotency applied_events table (INSERT ON CONFLICT DO NOTHING)
Reorg handling is_reverted flag + recompute affected asset ownership

Project Structure

ft.supply/
├── pyproject.toml
├── uv.lock
├── src/ft/
│   ├── core/                   # models, events, config, idempotency, address
│   │   ├── config.py           # Pydantic Settings
│   │   ├── db.py               # async engine + session factory
│   │   ├── models/             # SQLAlchemy 2.0 Mapped[] per domain
│   │   │   ├── base.py         # DeclarativeBase, uuidv5 mixin
│   │   │   ├── ref.py          # chains, networks, token_standards
│   │   │   ├── ingest.py       # raw_events, sync_cursors
│   │   │   ├── ledger.py       # normalized_events, event_keys, applied_events
│   │   │   ├── catalog.py      # assets, metadata_versions
│   │   │   ├── market.py       # listings_current, sales_history, ownership_current
│   │   │   ├── projection.py   # ownership_view, asset_cards, collection_stats, etc.
│   │   │   └── system.py       # outbox_events, dlq, api_keys
│   │   ├── events.py           # RawEvent / NormalizedEvent (frozen Pydantic envelope)
│   │   ├── idempotency.py      # applied_events pattern (reusable)
│   │   └── address.py          # per-chain canonicalization
│   ├── ingest/                 # Chain adapters
│   │   ├── base.py             # ChainAdapter ABC
│   │   ├── polygon/            # PolygonAdapter + ERC-721/1155 parser + ABIs
│   │   ├── ton/                # TonAdapter + TEP-62 parser
│   │   └── fetcher.py          # BlockFetcher (retry, circuit-breaker, cursor)
│   ├── pipeline/               # Processing pipeline
│   │   ├── normalizer.py       # Dedup via UPSERT, catalog stubs
│   │   ├── state_updater.py    # Ownership + listings/sales
│   │   ├── projections.py      # ownership_view, asset_cards, portfolio, listings
│   │   └── reorg.py            # Reorg detection + handler
│   ├── api/                    # FastAPI
│   │   ├── app.py              # Lifespan, middleware, mount routers
│   │   ├── deps.py             # Depends: read-only session, auth
│   │   ├── auth.py             # API key auth (Phase 1 minimal)
│   │   ├── routers/            # assets, collections, health
│   │   └── schemas/            # Pydantic response models
│   ├── worker/                 # workerlib v2
│   │   ├── framework.py        # @task, Trigger, DI, Beat scheduler
│   │   ├── redis_streams.py    # Redis Streams consumer/producer
│   │   └── outbox.py           # Outbox publisher
│   └── observability/          # Metrics + logging
│       ├── metrics.py          # Prometheus counters/gauges
│       └── logging.py          # structlog JSON to stdout
├── workers/                    # Thin entry points (or python -m ft.worker <name>)
├── migrations/                 # Raw SQL, numbered (001_baseline.sql, ...)
├── tests/
│   ├── conftest.py             # testcontainers Postgres fixture
│   ├── fixtures/               # Real mainnet blocks as JSON
│   ├── contract/               # 5 contract tests per adapter
│   ├── idempotency/            # 3 idempotency scenarios
│   ├── reorg/                  # 5 reorg scenarios
│   ├── e2e/                    # Full pipeline test
│   ├── reconciliation/         # ownership_view vs ground truth
│   └── api/                    # API + domain isolation tests
├── docker/
│   ├── Dockerfile              # Multi-stage, single image
│   ├── docker-compose.yml      # Dev: postgres, redis, minio
│   └── docker-compose.prod.yml
└── docs/

Database Schemas (Phase 1)

7 schemas: ref, ingest, ledger, catalog, market, projection, system.

Key Tables

ref: chains, networks (with finality_depth, rpc_url), token_standards, marketplaces

ingest: raw_events (source_event_id unique per network, raw_payload JSONB)

ledger: - normalized_event_keys — PK(source_event_id, sub_index), is_reverted flag. UPSERT handles reorg re-inclusion. - normalized_events — event_kind, asset_id, block_number, finality_status - normalized_event_deltas — account_address, qty_delta (+1/-1, >1 for ERC-1155) - applied_events — PK(consumer_id, normalized_event_id). INSERT ON CONFLICT DO NOTHING = idempotency.

catalog: assets (stubs with is_stub=true until metadata fetched in Phase 2)

market: ownership_current (UPSERT qty+=delta, DELETE WHERE qty<=0), listings_current, sales_history

projection: ownership_view, asset_cards, collection_stats, portfolio_assets, listing_cards — API reads ONLY these.

system: sync_cursors, outbox_events (published_at NULL = pending), dlq, api_keys

ID Strategy

All IDs: uuidv5(NAMESPACE, ":".join(natural_key_parts)). Deterministic. No auto-increment.

Address Canonicalization

Per-chain at ingestion time. EVM: lower(). TON: raw form 0:abc.... Solana: untouched (case-sensitive). Never universal lower().

Pipeline Architecture

Chain RPC/API → Adapter (per network) → Redis Stream (chain.{network}.raw_events)
→ Normalizer (dedup UPSERT) → Redis Stream (ledger.normalized)
→ State Updater (ownership, listings, sales) → Redis Streams (domain events)
→ Projection Pipeline (applied_events idempotency) → projection.* tables
→ API (read-only from projection.*)

Event Envelope

RawEvent — frozen Pydantic model with schema_version. Fields never removed/renamed after deploy. Optional fields can be added.

Outbox Pattern

Workers write to PostgreSQL + system.outbox_events in one transaction. Outbox Publisher polls unpublished events and pushes to Redis Streams. After 5 failures → DLQ.

Reorg Handling

Adapter detects (parent_hash mismatch). Marks is_reverted=true on affected normalized_event_keys. Recomputes ownership_current for affected assets only. Re-included events: UPSERT flips is_reverted back to false.

Chain Adapters

Polygon (EVM)

  • RPC: public endpoint or Alchemy. eth_getLogs with topic0 filter.
  • Finality depth: ~128 blocks.
  • Parses: ERC-721 Transfer, ERC-1155 TransferSingle/TransferBatch, Seaport events.
  • ERC-1155 batch: one log → N deltas with contiguous sub_index 0..N-1.

TON

  • API: TonAPI v2.
  • Finality: masterchain block level.
  • Addresses: raw form 0:abc..., never bounceable.
  • Parses: TEP-62 transfer_notification, GetGems marketplace events.
  • Cursor: masterchain seqno.

Contract Tests (5 per adapter)

  1. Schema validation — all RawEvents pass Pydantic
  2. Sub-index completeness — contiguous 0..N-1 for batches
  3. Delta conservation — transfer: sum=0, mint: >0, burn: <0
  4. Source event ID stability — same block twice = same IDs
  5. Empty deltas for non-transfers — list/cancel have no deltas

workerlib v2

Decorator-based task registration with typed DI:

@task(trigger=Trigger.stream("ledger.normalized"), consumer_group="ownership_updater")
async def update_ownership(event: NormalizedEvent, db: AsyncSession = Depends()):
    ...

Single Docker image, different CMD: python -m ft.worker <task_name>. Graceful shutdown on SIGTERM. Configurable max_in_flight for backpressure.

API

FastAPI with read-only sessions bound to projection.* schema.

Phase 1 endpoints: - GET /api/v1/assets/{id} — asset card + finality_status - GET /api/v1/assets/{id}/owners — ownership view - GET /api/v1/collections/{id} — collection stats - GET /api/v1/collections/{id}/assets — asset cards filtered - GET /api/v1/collections/{id}/listings — listing cards - GET /health / GET /health/ready

All responses include finality_status. Stubs return is_stub: true.

Observability

Prometheus metrics: ingest_lag, normalizer_lag, projection_lag, outbox_unpublished, dlq_depth, events_processed, reorgs_detected.

Alerts: dlq_depth > 0 (immediate), outbox stuck > 5m, ingest_lag > 300s.

Structured logging via structlog → JSON stdout → Docker log driver.

Testing & Exit Criteria

All tests run against real Postgres via testcontainers. Fixture data from mainnet blocks.

Exit Criterion Test Location
Contract tests (5 × 2 adapters) tests/contract/
Idempotency (3 scenarios: single delivery, concurrent delivery, partial failure recovery) tests/idempotency/
Reorg (5 scenarios from arch §16.2, against real Postgres — no mocks) tests/reorg/
E2E pipeline (chain event → API response with correct finality_status) tests/e2e/
Ownership reconciliation (0 mismatches for 100 assets) tests/reconciliation/
Domain isolation (API reads ONLY projection.* — no cross-domain SQL joins) tests/api/
Observability (pipeline lag dashboard visible, dlq_depth alert firing) manual / monitoring
Lint + format ruff check + ruff format

CI: GitHub Actions blocks merge if any test fails.

Context: Lessons from gifts.supply

Carried forward: CatalogClient-like unified data access, @task decorator + DI pattern, PostgreSQL SKIP LOCKED for simple queuing, structured logging pipeline, schema-first approach.

Improved: single model representation (no ORM+dataclass dual), models split by domain (not 1500-line schema.py), proper package manager (uv), single Dockerfile, typed config (Pydantic Settings), formal test coverage matching exit criteria.