# Agencio Predict - Data Feed Pipeline & 3rd Party Ingestion

## Overview

The Data Feed Pipeline is the nervous system of Agencio Predict. It handles
all external data ingestion — from prediction market prices to social sentiment
to custom user-defined data streams.

**Design principles:**
- Every data source is a "feed" with a consistent interface
- Feeds are pluggable — add new sources without changing core code
- Each feed run is tracked for observability and debugging
- Failed ingestion never corrupts existing data
- Rate limits and quotas are respected automatically

---

## Architecture

```
┌──────────────────────────────────────────────────────────────────────┐
│                        DATA SOURCES (External)                       │
│                                                                      │
│  Prediction Markets    News          Social         Custom            │
│  ├─ Polymarket        ├─ NewsAPI    ├─ X/Twitter   ├─ Webhooks       │
│  ├─ Kalshi            ├─ RSS Feeds  ├─ Reddit      ├─ CSV Upload     │
│  ├─ Manifold          ├─ Google     ├─ TikTok      ├─ API Push       │
│  ├─ Metaculus         │  News       ├─ YouTube     ├─ Google Sheets  │
│  └─ PredictIt         ├─ GDELT      ├─ LinkedIn    └─ Airtable       │
│                       └─ MediaStack └─ Telegram                      │
│                                                                      │
│  Financial             Alternative    Government                     │
│  ├─ Yahoo Finance     ├─ Google      ├─ Fed Reserve                  │
│  ├─ Alpha Vantage     │  Trends      ├─ BLS (labor)                  │
│  ├─ CoinGecko         ├─ Wikipedia   ├─ SEC filings                  │
│  └─ FRED              │  pageviews   ├─ Congress.gov                 │
│                       └─ App Store   └─ Open data portals            │
│                         rankings                                     │
└──────────────────────────────┬───────────────────────────────────────┘
                               │
                               ▼
┌──────────────────────────────────────────────────────────────────────┐
│                     INGESTION LAYER                                  │
│                                                                      │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐               │
│  │  Scheduler   │  │  Webhook     │  │  Manual      │               │
│  │  (Cron)      │  │  Receiver    │  │  Upload      │               │
│  │              │  │              │  │              │               │
│  │ Polls on     │  │ Receives     │  │ CSV, JSON    │               │
│  │ interval     │  │ push data    │  │ file upload  │               │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘               │
│         │                 │                 │                        │
│         └─────────────────┼─────────────────┘                        │
│                           ▼                                          │
│  ┌─────────────────────────────────────────────────────────────────┐ │
│  │                    FEED PROCESSOR                               │ │
│  │                                                                 │ │
│  │  1. Authenticate (API key, OAuth, scrape)                       │ │
│  │  2. Fetch raw data                                              │ │
│  │  3. Validate schema                                             │ │
│  │  4. Transform → normalized signal format                        │ │
│  │  5. Deduplicate (prevent duplicate signals)                     │ │
│  │  6. Enrich (add metadata, compute derived values)               │ │
│  │  7. Match to events (auto-map or manual mapping)                │ │
│  │  8. Write to prediction_signals                                 │ │
│  │  9. Log feed run result                                         │ │
│  └─────────────────────────────────────────────────────────────────┘ │
│                           │                                          │
│                           ▼                                          │
│  ┌─────────────────────────────────────────────────────────────────┐ │
│  │                    POST-INGEST PIPELINE                         │ │
│  │                                                                 │ │
│  │  1. Recompute event aggregates (probability, sentiment, etc.)   │ │
│  │  2. Create snapshot (time-series data point)                    │ │
│  │  3. Run anomaly detection                                       │ │
│  │  4. Evaluate trigger rules (→ fires actions if matched)         │ │
│  │  5. Check if explanation needed (significant movement)          │ │
│  │  6. Update trust scores                                         │ │
│  │  7. Broadcast via Supabase Realtime                             │ │
│  └─────────────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────────┘
```

---

## Feed Adapter Interface

Every data source implements a standard adapter interface. This makes adding
new sources a matter of writing one file.

```typescript
interface FeedAdapter {
  // Metadata
  id: string;                    // 'polymarket', 'newsapi', 'x_api', etc.
  name: string;                  // 'Polymarket'
  category: FeedCategory;        // 'prediction_market' | 'news' | 'social' | 'financial' | 'alternative' | 'government' | 'custom'

  // Authentication
  authType: 'api_key' | 'oauth2' | 'bearer' | 'none' | 'scrape';

  // Capabilities
  supportsPull: boolean;         // can we poll it?
  supportsPush: boolean;         // can it push to us via webhook?
  supportsBackfill: boolean;     // can we fetch historical data?

  // Rate limits
  rateLimit: {
    requests_per_minute: number;
    requests_per_day: number;
    concurrent: number;
  };

  // Methods
  authenticate(credentials: Credentials): Promise<AuthResult>;
  fetch(params: FetchParams): Promise<RawDataBatch>;
  transform(raw: RawDataBatch): Promise<NormalizedSignal[]>;
  mapToEvents(signals: NormalizedSignal[]): Promise<MappedSignal[]>;
  healthCheck(): Promise<HealthStatus>;
}

interface NormalizedSignal {
  source: string;
  source_event_id: string;       // ID on the external platform
  signal_type: SignalType;
  value: number;                 // normalized to our scale
  raw_data: object;              // original payload preserved
  narrative?: string;            // extracted headline / text
  fetched_at: Date;
  metadata: {
    confidence: number;          // how reliable is this signal
    latency_ms: number;          // how long the fetch took
    source_url?: string;
  };
}

interface MappedSignal extends NormalizedSignal {
  event_id: string;              // matched to our prediction_events
  match_confidence: number;      // how confident is the event mapping
  match_method: 'exact_id' | 'keyword' | 'ai_match' | 'manual';
}
```
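As a rough illustration of how the feed processor might chain an adapter's methods, the sketch below uses a trimmed-down `MiniAdapter` type and an in-memory `demoAdapter`. Both are hypothetical stand-ins for the full `FeedAdapter` interface above, as are `runAdapter`, the sample payload shape, and the `event-oil-100` ID.

```typescript
// Trimmed-down, hypothetical versions of the types above so the sketch compiles on its own.
interface Signal {
  source: string;
  source_event_id: string;
  value: number;
  fetched_at: Date;
}
interface Mapped extends Signal {
  event_id: string;
  match_method: 'exact_id' | 'keyword' | 'ai_match' | 'manual';
}
type RawItem = { marketId: string; yesPrice: number }; // assumed payload shape
interface MiniAdapter {
  id: string;
  authenticate(): Promise<boolean>;
  fetch(): Promise<RawItem[]>;
  transform(raw: RawItem[]): Promise<Signal[]>;
  mapToEvents(signals: Signal[]): Promise<Mapped[]>;
}

// Illustrative adapter backed by in-memory data instead of a real API.
const demoAdapter: MiniAdapter = {
  id: 'demo_market',
  async authenticate() {
    return true;
  },
  async fetch() {
    return [
      { marketId: 'm-1', yesPrice: 0.62 },
      { marketId: 'm-2', yesPrice: 0.18 },
    ];
  },
  async transform(raw) {
    const now = new Date();
    return raw.map((r) => ({
      source: 'demo_market',
      source_event_id: r.marketId,
      value: r.yesPrice,
      fetched_at: now,
    }));
  },
  async mapToEvents(signals) {
    // Hypothetical lookup table: external market ID -> internal event ID.
    const known = new Map<string, string>([['m-1', 'event-oil-100']]);
    const mapped: Mapped[] = [];
    for (const s of signals) {
      const eventId = known.get(s.source_event_id);
      if (eventId !== undefined) {
        mapped.push({ ...s, event_id: eventId, match_method: 'exact_id' });
      }
    }
    return mapped;
  },
};

// Chains the adapter methods in the order the feed processor lists them.
async function runAdapter(
  adapter: MiniAdapter,
): Promise<{ fetched: number; matched: Mapped[] }> {
  if (!(await adapter.authenticate())) {
    throw new Error(`auth failed for ${adapter.id}`);
  }
  const raw = await adapter.fetch();
  const signals = await adapter.transform(raw);
  const matched = await adapter.mapToEvents(signals);
  return { fetched: raw.length, matched };
}
```

Because the runner only touches the interface, swapping `demoAdapter` for a real source should not require changes to `runAdapter` itself.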

---

## Built-in Feed Adapters

### Prediction Markets

#### Polymarket
```yaml
id: polymarket
category: prediction_market
auth: none (public API) / API key (private)
poll_interval: 5 min
signal_type: MARKET_PRICE
data_points:
  - market probability (YES price)
  - 24h volume
  - total liquidity
  - number of traders
  - price history
api_base: https://clob.polymarket.com
notes: Primary source. Highest weight for political/geopolitical events.
```

#### Kalshi
```yaml
id: kalshi
category: prediction_market
auth: api_key
poll_interval: 5 min
signal_type: MARKET_PRICE
data_points:
  - contract price
  - volume
  - open interest
  - order book depth
api_base: https://trading-api.kalshi.com/trade-api/v2
notes: Regulated US exchange. Higher trust weight than unregulated markets.
```

#### Manifold Markets
```yaml
id: manifold
category: prediction_market
auth: api_key (optional)
poll_interval: 15 min
signal_type: MARKET_PRICE
data_points:
  - probability
  - volume
  - trader count
  - comments/discussion
api_base: https://api.manifold.markets/v0
notes: Community-driven. Good for niche/tech predictions. Lower weight.
```

#### Metaculus
```yaml
id: metaculus
category: prediction_market
auth: api_key
poll_interval: 30 min
signal_type: EXPERT
data_points:
  - community median prediction
  - number of forecasters
  - metaculus prediction (calibrated)
api_base: https://www.metaculus.com/api2
notes: Expert forecaster community. High weight for calibrated predictions.
```

### News Sources

#### NewsAPI
```yaml
id: newsapi
category: news
auth: api_key
poll_interval: 15 min
signal_type: NEWS
data_points:
  - headline
  - description
  - source
  - published_at
  - relevance_score
api_base: https://newsapi.org/v2
notes: 80K+ sources. Use keyword matching to map to events.
```

#### GDELT Project
```yaml
id: gdelt
category: news
auth: none
poll_interval: 15 min
signal_type: NEWS
data_points:
  - event tone (sentiment)
  - event count
  - geographic spread
  - media attention volume
  - goldstein scale (conflict/cooperation)
api_base: https://api.gdeltproject.org/api/v2
notes: Massive global event database. Excellent for geopolitical events.
```

#### RSS Aggregator
```yaml
id: rss
category: news
auth: none
poll_interval: 10 min
signal_type: NEWS
data_points:
  - title
  - content snippet
  - source
  - published_at
config:
  feeds:
    - https://feeds.reuters.com/reuters/topNews
    - https://feeds.bbci.co.uk/news/world/rss.xml
    - https://rss.nytimes.com/services/xml/rss/nyt/World.xml
    # ... user-configurable
notes: Customizable. Users can add their own RSS feeds.
```

### Social Sentiment

#### X / Twitter
```yaml
id: x_api
category: social
auth: oauth2 (X API v2)
poll_interval: 10 min
signal_type: SENTIMENT
data_points:
  - tweet volume (keyword-matched)
  - aggregate sentiment (NLP scored)
  - engagement metrics (likes, retweets, replies)
  - influential account mentions
  - trending status
api_base: https://api.x.com/2
notes: Primary social signal. Sentiment computed via Claude API or local NLP.
rate_limit: 300 req/15min (Basic), 900 req/15min (Pro)
```
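A per-window quota such as the `rate_limit` entry above could be enforced client-side with a fixed-window counter. This is a minimal sketch: `FixedWindowLimiter` is a hypothetical helper, and the provider's real window semantics may differ from a simple fixed window.

```typescript
// Fixed-window request counter: allows at most `limit` calls per `windowMs`.
class FixedWindowLimiter {
  private readonly limit: number;
  private readonly windowMs: number;
  private windowStart = Number.NEGATIVE_INFINITY; // forces a fresh window on first use
  private used = 0;

  constructor(limit: number, windowMs: number) {
    this.limit = limit;
    this.windowMs = windowMs;
  }

  // Returns true and consumes one slot if a request may be sent at time `nowMs`.
  tryAcquire(nowMs: number): boolean {
    if (nowMs - this.windowStart >= this.windowMs) {
      this.windowStart = nowMs; // start a fresh window
      this.used = 0;
    }
    if (this.used >= this.limit) return false;
    this.used += 1;
    return true;
  }

  // Milliseconds until the current window resets (0 if a slot is free now).
  msUntilReset(nowMs: number): number {
    if (this.used < this.limit) return 0;
    return Math.max(0, this.windowStart + this.windowMs - nowMs);
  }
}

// Example: 300 requests per 15 minutes, as listed for the Basic tier above.
const xBasicLimiter = new FixedWindowLimiter(300, 15 * 60 * 1000);
```

Passing the clock in as `nowMs` keeps the limiter deterministic in tests; a caller would normally pass `Date.now()`.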

#### Reddit
```yaml
id: reddit
category: social
auth: oauth2
poll_interval: 15 min
signal_type: SENTIMENT
data_points:
  - post volume in relevant subreddits
  - comment sentiment
  - upvote ratio
  - discussion intensity (comments per post)
api_base: https://oauth.reddit.com/api/v1
subreddits:
  - r/prediction_markets
  - r/geopolitics
  - r/marketing
  - r/technology
  # ... event-specific subreddits configurable
notes: Strong signal for niche communities. Good leading indicator.
```

### Financial Data

#### Yahoo Finance
```yaml
id: yahoo_finance
category: financial
auth: none (public) / rapidapi_key
poll_interval: 15 min
signal_type: MARKET_PRICE
data_points:
  - stock/commodity/crypto price
  - volume
  - moving averages
  - volatility (implied + historical)
notes: Useful for finance-adjacent predictions. Oil price, crypto, etc.
```

#### FRED (Federal Reserve Economic Data)
```yaml
id: fred
category: financial
auth: api_key
poll_interval: daily
signal_type: MARKET_PRICE
data_points:
  - GDP, CPI, unemployment
  - interest rates
  - money supply
  - consumer confidence
api_base: https://api.stlouisfed.org/fred
notes: Macroeconomic data. Low frequency but high signal for economic predictions.
```

### Alternative Data

#### Google Trends
```yaml
id: google_trends
category: alternative
auth: none (scrape) / serpapi_key
poll_interval: 60 min
signal_type: SENTIMENT
data_points:
  - search interest (0-100)
  - related queries
  - geographic breakdown
  - rising vs top queries
notes: Leading indicator for public attention. Good for marketing predictions.
```

#### Wikipedia Pageviews
```yaml
id: wikipedia
category: alternative
auth: none
poll_interval: daily
signal_type: SENTIMENT
data_points:
  - daily pageviews per article
  - view trend (7-day moving average)
api_base: https://wikimedia.org/api/rest_v1
notes: Surprising leading indicator. Pageview spikes often precede events.
```

---

## Custom Data Feeds (User-Defined)

Users can create custom feeds via three methods:

### 1. Webhook Receiver (Push)

External systems push data to us.

```
POST /api/predict/v1/feeds/webhook/:feedId
Authorization: Bearer <feed_api_key>
Content-Type: application/json

{
  "signals": [
    {
      "event_id": "uuid",              // or event_slug
      "signal_type": "MANUAL",
      "value": 0.65,
      "narrative": "Internal survey shows 65% expect launch delay",
      "metadata": { "source": "internal_survey", "sample_size": 200 }
    }
  ]
}
```
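Before a pushed payload reaches the feed processor, the receiver would likely validate its shape. The sketch below checks the fields shown in the example body. `validateWebhookBody` is a hypothetical helper, and rejecting the whole batch when any signal is invalid is an assumption chosen to match the "failed ingestion never corrupts existing data" principle.

```typescript
interface WebhookSignal {
  event_id: string;
  signal_type: string;
  value: number;
  narrative?: string;
  metadata?: Record<string, unknown>;
}

type ValidationResult =
  | { ok: true; signals: WebhookSignal[] }
  | { ok: false; errors: string[] };

// Validates the parsed JSON body of a webhook request.
function validateWebhookBody(body: unknown): ValidationResult {
  const signals = (body as { signals?: unknown } | null | undefined)?.signals;
  if (!Array.isArray(signals)) {
    return { ok: false, errors: ['"signals" must be an array'] };
  }
  const errors: string[] = [];
  signals.forEach((s: unknown, i: number) => {
    if (typeof s !== 'object' || s === null) {
      errors.push(`signals[${i}] must be an object`);
      return;
    }
    const sig = s as Record<string, unknown>;
    if (typeof sig.event_id !== 'string' || sig.event_id === '') {
      errors.push(`signals[${i}].event_id is required`);
    }
    if (typeof sig.signal_type !== 'string' || sig.signal_type === '') {
      errors.push(`signals[${i}].signal_type is required`);
    }
    if (typeof sig.value !== 'number' || !Number.isFinite(sig.value)) {
      errors.push(`signals[${i}].value must be a finite number`);
    }
    if (sig.narrative !== undefined && typeof sig.narrative !== 'string') {
      errors.push(`signals[${i}].narrative must be a string`);
    }
  });
  // All-or-nothing: a single bad signal rejects the batch (assumption).
  return errors.length === 0
    ? { ok: true, signals: signals as WebhookSignal[] }
    : { ok: false, errors };
}
```

The function does not restrict `value` to a range, since the examples in this document include both probabilities and negative sentiment scores.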

### 2. CSV / JSON Upload (Batch)

Upload files via the UI or API.

```
POST /api/predict/v1/feeds/upload
Content-Type: multipart/form-data

file: signals.csv
mapping: { "event_column": "question", "value_column": "probability", "date_column": "timestamp" }
```

**CSV format:**
```csv
question,probability,sentiment,source,timestamp
"Will oil hit $100?",0.72,0.41,internal_model,2026-03-24T10:00:00Z
"TikTok ban by 2026?",0.35,-0.22,internal_model,2026-03-24T10:00:00Z
```
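To show how the `mapping` object could be applied to an uploaded file, here is a minimal sketch. `splitCsvLine` and `mapCsv` are hypothetical helpers; the parser handles quoted fields but not line breaks inside quotes, so a production upload path would more likely rely on a dedicated CSV library.

```typescript
// Splits one CSV line into fields, honoring double-quoted fields and "" escapes.
function splitCsvLine(line: string): string[] {
  const fields: string[] = [];
  let current = '';
  let inQuotes = false;
  for (let i = 0; i < line.length; i++) {
    const ch = line[i];
    if (inQuotes) {
      if (ch === '"' && line[i + 1] === '"') {
        current += '"'; // escaped quote inside a quoted field
        i++;
      } else if (ch === '"') {
        inQuotes = false;
      } else {
        current += ch;
      }
    } else if (ch === '"') {
      inQuotes = true;
    } else if (ch === ',') {
      fields.push(current);
      current = '';
    } else {
      current += ch;
    }
  }
  fields.push(current);
  return fields;
}

interface ColumnMapping {
  event_column: string;
  value_column: string;
  date_column: string;
}

interface UploadedSignal {
  event: string;
  value: number;
  timestamp: Date;
}

// Reads the header row, locates the mapped columns, and converts each data row.
function mapCsv(csv: string, mapping: ColumnMapping): UploadedSignal[] {
  const lines = csv.split(/\r?\n/).filter((l) => l.trim() !== '');
  if (lines.length === 0) return [];
  const header = splitCsvLine(lines[0]);
  const col = (name: string): number => {
    const idx = header.indexOf(name);
    if (idx === -1) throw new Error(`missing column: ${name}`);
    return idx;
  };
  const e = col(mapping.event_column);
  const v = col(mapping.value_column);
  const d = col(mapping.date_column);
  return lines.slice(1).map((line) => {
    const f = splitCsvLine(line);
    return { event: f[e], value: Number(f[v]), timestamp: new Date(f[d]) };
  });
}
```

Columns not named in the mapping (such as `sentiment` and `source` in the sample file) are ignored by this sketch.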

### 3. Polling Custom API (Pull)

Configure a custom API endpoint we'll poll.

```json
{
  "feed_id": "my-custom-model",
  "name": "Internal ML Model",
  "type": "custom_api",
  "config": {
    "url": "https://ml.mycompany.com/predictions",
    "method": "GET",
    "headers": { "Authorization": "Bearer {{secret:ml_api_key}}" },
    "poll_interval_minutes": 30,
    "response_mapping": {
      "signals_path": "$.predictions",
      "event_id_field": "$.question_id",
      "value_field": "$.probability",
      "narrative_field": "$.explanation"
    }
  }
}
```
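The `response_mapping` block could be resolved against a polled response roughly as follows. This sketch assumes the field paths are evaluated relative to each item found under `signals_path`, and it supports only plain dotted property access rather than full JSONPath; `getPath` and `extractSignals` are hypothetical names.

```typescript
// Resolves a dotted path such as "$.predictions" or "$.a.b" against a JSON value.
// Only plain property access is supported; this is not a full JSONPath engine.
function getPath(root: unknown, path: string): unknown {
  const parts = path
    .replace(/^\$\.?/, '')
    .split('.')
    .filter((p) => p !== '');
  let node: unknown = root;
  for (const part of parts) {
    if (typeof node !== 'object' || node === null) return undefined;
    node = (node as Record<string, unknown>)[part];
  }
  return node;
}

interface ResponseMapping {
  signals_path: string;
  event_id_field: string;
  value_field: string;
  narrative_field?: string;
}

interface ExtractedSignal {
  source_event_id: string;
  value: number;
  narrative?: string;
}

// Pulls signals out of a response body; items missing an ID or numeric value are skipped.
function extractSignals(response: unknown, mapping: ResponseMapping): ExtractedSignal[] {
  const items = getPath(response, mapping.signals_path);
  if (!Array.isArray(items)) return [];
  const out: ExtractedSignal[] = [];
  for (const item of items) {
    const id = getPath(item, mapping.event_id_field);
    const value = getPath(item, mapping.value_field);
    if ((typeof id !== 'string' && typeof id !== 'number') || typeof value !== 'number') {
      continue;
    }
    const narrative = mapping.narrative_field
      ? getPath(item, mapping.narrative_field)
      : undefined;
    out.push({
      source_event_id: String(id),
      value,
      ...(typeof narrative === 'string' ? { narrative } : {}),
    });
  }
  return out;
}
```

Skipped items would presumably be counted against `signals_valid` in the run log rather than silently dropped.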

---

## Event Auto-Matching

When signals come in without an explicit `event_id`, the pipeline tries to match them.

```
Incoming signal (no event_id)
         │
         ▼
┌─────────────────────────┐
│ 1. Exact ID match       │ ← source_event_id matches a tracked event's source_urls
│    (Polymarket ID, etc.) │
└──────────┬──────────────┘
           │ no match
           ▼
┌─────────────────────────┐
│ 2. Keyword match        │ ← TF-IDF or keyword overlap with event titles
│    (fast, deterministic) │
└──────────┬──────────────┘
           │ low confidence
           ▼
┌─────────────────────────┐
│ 3. AI match (Claude)    │ ← "Does this signal relate to any of these events?"
│    (slower, higher acc)  │
└──────────┬──────────────┘
           │ no match
           ▼
┌─────────────────────────┐
│ 4. Unmatched queue      │ ← holds for manual review or auto-creates new event
└─────────────────────────┘
```
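The first two stages of the cascade above can be sketched without any external service. In this sketch the keyword stage uses Jaccard token overlap as a stand-in for TF-IDF, the `external_ids` field and the `0.3` confidence threshold are assumptions, and the AI stage is left out; anything that falls through would go to the AI matcher and then the unmatched queue.

```typescript
interface TrackedEvent {
  id: string;
  title: string;
  external_ids: string[]; // hypothetical: IDs of this event on external platforms
}
interface IncomingSignal {
  source_event_id?: string;
  narrative?: string;
}
type MatchResult =
  | { matched: true; event_id: string; match_method: 'exact_id' | 'keyword'; match_confidence: number }
  | { matched: false };

// Lowercases, splits on non-alphanumerics, and drops very short tokens.
function tokenize(text: string): Set<string> {
  return new Set(text.toLowerCase().split(/[^a-z0-9]+/).filter((t) => t.length > 2));
}

// Jaccard overlap between two token sets, in [0, 1].
function overlap(a: Set<string>, b: Set<string>): number {
  let shared = 0;
  a.forEach((t) => {
    if (b.has(t)) shared++;
  });
  const union = a.size + b.size - shared;
  return union === 0 ? 0 : shared / union;
}

function matchSignal(
  signal: IncomingSignal,
  events: TrackedEvent[],
  minConfidence = 0.3, // assumed threshold for "low confidence"
): MatchResult {
  // Stage 1: exact ID match.
  const externalId = signal.source_event_id;
  if (externalId !== undefined) {
    const hit = events.find((e) => e.external_ids.indexOf(externalId) !== -1);
    if (hit) {
      return { matched: true, event_id: hit.id, match_method: 'exact_id', match_confidence: 1 };
    }
  }
  // Stage 2: keyword overlap with event titles.
  if (signal.narrative) {
    const tokens = tokenize(signal.narrative);
    let best: { id: string; score: number } | null = null;
    for (const e of events) {
      const score = overlap(tokens, tokenize(e.title));
      if (best === null || score > best.score) best = { id: e.id, score };
    }
    if (best !== null && best.score >= minConfidence) {
      return { matched: true, event_id: best.id, match_method: 'keyword', match_confidence: best.score };
    }
  }
  // Stages 3 and 4 (AI match, unmatched queue) are handled by the caller.
  return { matched: false };
}
```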

### Auto-Event Creation

If enough unmatched signals cluster around a topic:

```
Unmatched signals accumulate
         │
         ▼
┌─────────────────────────┐
│ Clustering algorithm    │ ← groups similar unmatched signals
│ (every 30 min)          │
└──────────┬──────────────┘
           │ cluster > 5 signals
           ▼
┌─────────────────────────┐
│ AI event generation     │ ← Claude generates title, description, category
│                         │
│ "Should Iran escalation │
│  be tracked as a new    │
│  prediction event?"     │
└──────────┬──────────────┘
           │
           ▼
┌─────────────────────────┐
│ Auto-create as DRAFT    │ ← requires human approval to go ACTIVE
│ Notify admins           │
└─────────────────────────┘
```
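The clustering step is not specified in detail here. As one possible reading, the sketch below groups unmatched signals greedily by token overlap and returns only the clusters with more than 5 members. The greedy strategy, the `0.25` similarity threshold, and the names `clusterUnmatched` and `jaccard` are all assumptions.

```typescript
interface Unmatched {
  id: string;
  narrative: string;
}

// Lowercases, splits on non-alphanumerics, and drops very short tokens.
function narrativeTokens(text: string): Set<string> {
  return new Set(text.toLowerCase().split(/[^a-z0-9]+/).filter((t) => t.length > 2));
}

// Jaccard similarity between two token sets, in [0, 1].
function jaccard(a: Set<string>, b: Set<string>): number {
  let shared = 0;
  a.forEach((t) => {
    if (b.has(t)) shared++;
  });
  const union = a.size + b.size - shared;
  return union === 0 ? 0 : shared / union;
}

// Greedy single-pass clustering: each signal joins the first cluster whose
// seed narrative is similar enough, otherwise it seeds a new cluster.
function clusterUnmatched(
  signals: Unmatched[],
  minSimilarity = 0.25, // assumed threshold
  minSize = 6, // "cluster > 5 signals"
): Unmatched[][] {
  const clusters: { seed: Set<string>; members: Unmatched[] }[] = [];
  for (const s of signals) {
    const t = narrativeTokens(s.narrative);
    const home = clusters.find((c) => jaccard(c.seed, t) >= minSimilarity);
    if (home) {
      home.members.push(s);
    } else {
      clusters.push({ seed: t, members: [s] });
    }
  }
  return clusters.filter((c) => c.members.length >= minSize).map((c) => c.members);
}
```

Each returned cluster would then be handed to the AI event-generation step, which drafts a title and description for admin approval.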

---

## Database Schema

### prediction_data_sources

Registry of all available data source types.

```sql
CREATE TABLE prediction_data_sources (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

  -- Identity
  source_id TEXT NOT NULL UNIQUE,    -- 'polymarket', 'newsapi', 'x_api', 'custom_webhook_abc'
  name TEXT NOT NULL,                -- 'Polymarket'
  category TEXT NOT NULL CHECK (category IN (
    'prediction_market', 'news', 'social', 'financial', 'alternative', 'government', 'custom'
  )),
  description TEXT,
  icon_url TEXT,

  -- Type
  adapter_type TEXT NOT NULL CHECK (adapter_type IN (
    'built_in',    -- shipped with the platform
    'custom_api',  -- user-configured polling endpoint
    'webhook',     -- platform receives push data from the user's system
    'upload'       -- manual file upload
  )),

  -- Configuration template (for custom feeds)
  config_schema JSONB,              -- JSON Schema for the config object
  default_config JSONB,

  -- Capabilities
  supports_pull BOOLEAN DEFAULT true,
  supports_push BOOLEAN DEFAULT false,
  supports_backfill BOOLEAN DEFAULT false,

  -- Rate limits
  rate_limit_rpm INTEGER,            -- requests per minute
  rate_limit_rpd INTEGER,            -- requests per day

  -- Status
  is_system BOOLEAN DEFAULT false,   -- built-in vs user-created
  status TEXT DEFAULT 'ACTIVE' CHECK (status IN ('ACTIVE', 'DEPRECATED', 'DISABLED')),

  created_at TIMESTAMPTZ DEFAULT now()
);
```

### prediction_data_feeds

Instances of feeds — a source configured for a specific user/purpose.

```sql
CREATE TABLE prediction_data_feeds (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  source_id UUID NOT NULL REFERENCES prediction_data_sources(id),
  user_id UUID NOT NULL REFERENCES auth.users(id),

  -- Configuration
  name TEXT NOT NULL,                -- "My Polymarket feed" or "Company ML Model"
  config JSONB NOT NULL DEFAULT '{}',
  -- For built-in: { "api_key": "...", "markets": ["uuid1", "uuid2"] }
  -- For custom_api: { "url": "...", "headers": {...}, "response_mapping": {...} }
  -- For webhook: { "secret": "...", "allowed_ips": [...] }

  -- Scheduling
  poll_interval_minutes INTEGER DEFAULT 15,
  cron_expression TEXT,              -- optional: custom cron schedule
  next_run_at TIMESTAMPTZ,

  -- Event mapping
  event_mapping_mode TEXT DEFAULT 'auto'
    CHECK (event_mapping_mode IN ('auto', 'manual', 'ai')),
  manual_event_mappings JSONB,       -- { "source_event_id": "our_event_id", ... }

  -- Signal configuration
  default_signal_type TEXT DEFAULT 'MARKET_PRICE',
  default_weight DECIMAL(3,2) DEFAULT 1.0,
  transform_config JSONB,           -- custom value transformations

  -- Authentication
  credentials_encrypted JSONB,       -- encrypted API keys, tokens
  auth_status TEXT DEFAULT 'ACTIVE'
    CHECK (auth_status IN ('ACTIVE', 'EXPIRED', 'INVALID', 'PENDING')),
  auth_expires_at TIMESTAMPTZ,

  -- Status
  enabled BOOLEAN DEFAULT true,
  last_successful_run TIMESTAMPTZ,
  last_error TEXT,
  consecutive_failures INTEGER DEFAULT 0,
  auto_disabled_at TIMESTAMPTZ,      -- set when auto-disabled after 5+ consecutive failures

  -- Health
  avg_latency_ms INTEGER,
  signals_ingested_total BIGINT DEFAULT 0,
  signals_ingested_24h INTEGER DEFAULT 0,

  created_at TIMESTAMPTZ DEFAULT now(),
  updated_at TIMESTAMPTZ DEFAULT now()
);

CREATE INDEX idx_feeds_source ON prediction_data_feeds(source_id);
CREATE INDEX idx_feeds_user ON prediction_data_feeds(user_id);
CREATE INDEX idx_feeds_next_run ON prediction_data_feeds(next_run_at) WHERE enabled = true;
CREATE INDEX idx_feeds_enabled ON prediction_data_feeds(enabled);
```
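The scheduler's use of `enabled`, `next_run_at`, and `poll_interval_minutes` could look roughly like this. It is a sketch under simplifying assumptions: `dueFeeds` and `scheduleNext` are hypothetical helpers, `cron_expression` is ignored, and a feed with no `next_run_at` is treated as due immediately.

```typescript
interface ScheduledFeed {
  id: string;
  enabled: boolean;
  poll_interval_minutes: number;
  next_run_at: Date | null;
}

// Selects the feeds the scheduler should run at `now`.
// Mirrors the partial index on next_run_at WHERE enabled = true.
function dueFeeds(feeds: ScheduledFeed[], now: Date): ScheduledFeed[] {
  return feeds.filter(
    (f) => f.enabled && (f.next_run_at === null || f.next_run_at.getTime() <= now.getTime()),
  );
}

// Returns a copy of the feed with next_run_at pushed one interval past `now`.
function scheduleNext(feed: ScheduledFeed, now: Date): ScheduledFeed {
  const nextRun = new Date(now.getTime() + feed.poll_interval_minutes * 60 * 1000);
  return { ...feed, next_run_at: nextRun };
}
```

Scheduling from `now` rather than from the previous `next_run_at` avoids a burst of catch-up runs after downtime, at the cost of some drift.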

### prediction_feed_runs

Log of every feed execution — complete observability.

```sql
CREATE TABLE prediction_feed_runs (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  feed_id UUID NOT NULL REFERENCES prediction_data_feeds(id) ON DELETE CASCADE,

  -- Timing
  started_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  completed_at TIMESTAMPTZ,
  duration_ms INTEGER,

  -- Results
  status TEXT NOT NULL DEFAULT 'RUNNING'
    CHECK (status IN ('RUNNING', 'SUCCESS', 'PARTIAL', 'FAILED', 'TIMEOUT', 'RATE_LIMITED', 'AUTH_FAILED')),

  -- Metrics
  signals_fetched INTEGER DEFAULT 0,        -- raw signals from source
  signals_valid INTEGER DEFAULT 0,          -- passed validation
  signals_deduplicated INTEGER DEFAULT 0,   -- removed as duplicates
  signals_matched INTEGER DEFAULT 0,        -- matched to events
  signals_unmatched INTEGER DEFAULT 0,      -- couldn't match to events
  signals_written INTEGER DEFAULT 0,        -- actually saved to DB
  events_updated INTEGER DEFAULT 0,         -- events whose aggregates were recomputed

  -- Debug
  request_count INTEGER DEFAULT 0,          -- API calls made
  bytes_received BIGINT DEFAULT 0,
  rate_limit_remaining INTEGER,
  error_message TEXT,
  error_details JSONB,
  raw_response_sample JSONB,                -- first N items of raw response (for debugging)

  -- Trigger evaluation
  triggers_evaluated INTEGER DEFAULT 0,
  triggers_fired INTEGER DEFAULT 0,

  created_at TIMESTAMPTZ DEFAULT now()
);

CREATE INDEX idx_runs_feed ON prediction_feed_runs(feed_id);
CREATE INDEX idx_runs_status ON prediction_feed_runs(status);
CREATE INDEX idx_runs_started ON prediction_feed_runs(started_at DESC);

-- Retention policy: keep 30 days of run logs, archive older
-- (implement via pg_cron or application-level cleanup)
```

### prediction_unmatched_signals

Signals that couldn't be matched to any event.

```sql
CREATE TABLE prediction_unmatched_signals (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  feed_run_id UUID REFERENCES prediction_feed_runs(id),
  feed_id UUID NOT NULL REFERENCES prediction_data_feeds(id),

  -- Signal data
  source_event_id TEXT,
  signal_type TEXT,
  value DECIMAL(10,4),
  narrative TEXT,
  raw_data JSONB,

  -- Matching attempts
  match_attempts JSONB,          -- [{ "method": "keyword", "best_match": "event_id", "confidence": 0.3 }]
  suggested_event_id UUID,       -- AI's best guess
  suggestion_confidence DECIMAL(3,2),

  -- Resolution
  status TEXT DEFAULT 'PENDING'
    CHECK (status IN ('PENDING', 'MATCHED', 'NEW_EVENT_CREATED', 'DISCARDED')),
  resolved_event_id UUID REFERENCES prediction_events(id),
  resolved_by UUID REFERENCES auth.users(id),
  resolved_at TIMESTAMPTZ,

  created_at TIMESTAMPTZ DEFAULT now()
);

CREATE INDEX idx_unmatched_feed ON prediction_unmatched_signals(feed_id);
CREATE INDEX idx_unmatched_status ON prediction_unmatched_signals(status);
```

---

## API Endpoints

### Feed Management

```
GET    /api/predict/v1/feeds                     → list user's feeds
GET    /api/predict/v1/feeds/:id                 → feed detail + recent runs
POST   /api/predict/v1/feeds                     → create feed
PUT    /api/predict/v1/feeds/:id                 → update feed config
DELETE /api/predict/v1/feeds/:id                 → delete feed
POST   /api/predict/v1/feeds/:id/toggle          → enable/disable
POST   /api/predict/v1/feeds/:id/run             → trigger manual run
POST   /api/predict/v1/feeds/:id/backfill        → fetch historical data
POST   /api/predict/v1/feeds/:id/test            → test connection (dry run)
```

### Feed Runs

```
GET    /api/predict/v1/feeds/:id/runs            → list run history
GET    /api/predict/v1/feeds/runs/:runId         → run detail + metrics
```

### Data Sources (Registry)

```
GET    /api/predict/v1/sources                   → list available source types
GET    /api/predict/v1/sources/:sourceId         → source detail + config schema
```

### Webhook Receiver

```
POST   /api/predict/v1/feeds/webhook/:feedId     → receive push data
```

### Upload

```
POST   /api/predict/v1/feeds/upload              → upload CSV/JSON file
```

### Unmatched Signals

```
GET    /api/predict/v1/signals/unmatched          → list unmatched signals
POST   /api/predict/v1/signals/unmatched/:id/match → manually match to event
POST   /api/predict/v1/signals/unmatched/:id/discard → discard signal
POST   /api/predict/v1/signals/unmatched/auto-create → create event from cluster
```

---

## Feed Health Dashboard

### Frontend: `/predict/feeds`

```
┌──────────────────────────────────────────────────────────────────┐
│  Data Feeds                                    [+ Add Feed]      │
│                                                                  │
│  ┌── Active Feeds ─────────────────────────────────────────────┐ │
│  │                                                             │ │
│  │  ✅ Polymarket       every 5m    │ 2.4K signals/24h │ 45ms │ │
│  │     Last run: 2 min ago (SUCCESS, 34 signals)        [▶Run]│ │
│  │                                                             │ │
│  │  ✅ NewsAPI           every 15m  │ 890 signals/24h  │ 120ms│ │
│  │     Last run: 8 min ago (SUCCESS, 12 signals)        [▶Run]│ │
│  │                                                             │ │
│  │  ✅ X / Twitter       every 10m  │ 5.1K signals/24h │ 340ms│ │
│  │     Last run: 3 min ago (SUCCESS, 156 signals)       [▶Run]│ │
│  │                                                             │ │
│  │  ⚠️ Reddit            every 15m  │ 420 signals/24h  │ 890ms│ │
│  │     Last run: 5 min ago (PARTIAL, 8/15 subreddits)   [▶Run]│ │
│  │     ⚠️ Rate limited on 7 subreddits                         │ │
│  │                                                             │ │
│  │  ❌ Custom: ML Model  every 30m  │ 0 signals/24h    │ ---  │ │
│  │     Last run: 2h ago (AUTH_FAILED)                   [Fix] │ │
│  │     ❌ API key expired. Reconnect required.                  │ │
│  │                                                             │ │
│  │  ✅ Webhook: CRM Data  push     │ 45 signals/24h   │ 12ms │ │
│  │     Last received: 20 min ago                               │ │
│  └─────────────────────────────────────────────────────────────┘ │
│                                                                  │
│  ┌── Pipeline Metrics (24h) ───────────────────────────────────┐ │
│  │                                                             │ │
│  │  Total signals ingested:  8,855                             │ │
│  │  Events updated:          142                               │ │
│  │  Unmatched signals:       23  [Review]                      │ │
│  │  Triggers evaluated:      1,204                             │ │
│  │  Triggers fired:          18                                │ │
│  │  Avg pipeline latency:    180ms                             │ │
│  │                                                             │ │
│  │  ┌─ Signals by Source ──────────────────────────────────┐   │ │
│  │  │ X/Twitter   ████████████████████████████  5,100  57% │   │ │
│  │  │ Polymarket  ███████████                   2,400  27% │   │ │
│  │  │ NewsAPI     ████                            890  10% │   │ │
│  │  │ Reddit      ██                              420   5% │   │ │
│  │  │ Webhook     ▌                                45   1% │   │ │
│  │  └──────────────────────────────────────────────────────┘   │ │
│  └─────────────────────────────────────────────────────────────┘ │
│                                                                  │
│  ┌── Unmatched Signals (23) ───────────────────────────────────┐ │
│  │                                                             │ │
│  │  "New tariff announcement from EU on Chinese EVs"           │ │
│  │  Source: NewsAPI │ AI suggestion: EU-China trade (78%)       │ │
│  │                              [Match to Event] [Create New]  │ │
│  │                                                             │ │
│  │  "SpaceX Starship test flight window opens"                 │ │
│  │  Source: X/Twitter │ No match found                         │ │
│  │                              [Match to Event] [Create New]  │ │
│  └─────────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────┘
```

---

## Error Handling & Resilience

### Auto-Disable on Failure

```
Feed fails → increment consecutive_failures
  1-2 failures  → retry on next interval (log warning)
  3-4 failures  → reduce frequency (2x interval), notify user
  5+ failures   → auto-disable feed, notify user, set auto_disabled_at

User fixes issue → re-enable → reset consecutive_failures
```
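The escalation ladder above maps naturally onto a small pure function. In this sketch `recordFailure` and `reEnable` are hypothetical names, and the doubled interval is returned as a separate value rather than written back to `poll_interval_minutes`, which is an assumption about how the slowdown would be stored.

```typescript
interface FeedState {
  enabled: boolean;
  consecutive_failures: number;
  poll_interval_minutes: number;
  auto_disabled_at: Date | null;
}

interface FailureOutcome {
  state: FeedState;
  effective_interval_minutes: number; // interval to use for the next run
  notify_user: boolean;
}

// Applies the failure policy after one more failed run.
function recordFailure(feed: FeedState, now: Date): FailureOutcome {
  const failures = feed.consecutive_failures + 1;
  if (failures >= 5) {
    // 5+ failures: auto-disable and notify.
    return {
      state: { ...feed, consecutive_failures: failures, enabled: false, auto_disabled_at: now },
      effective_interval_minutes: feed.poll_interval_minutes,
      notify_user: true,
    };
  }
  // 3-4 failures: halve the frequency and notify; 1-2 failures: just retry next interval.
  const slowDown = failures >= 3;
  return {
    state: { ...feed, consecutive_failures: failures },
    effective_interval_minutes: slowDown
      ? feed.poll_interval_minutes * 2
      : feed.poll_interval_minutes,
    notify_user: slowDown,
  };
}

// Called when the user fixes the issue and re-enables the feed.
function reEnable(feed: FeedState): FeedState {
  return { ...feed, enabled: true, consecutive_failures: 0, auto_disabled_at: null };
}
```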

### Retry Strategy

```
Transient errors (timeout, 500, 502, 503):
  → Retry with exponential backoff: 5s, 15s, 45s
  → Max 3 retries per run

Auth errors (401, 403):
  → No retry
  → Mark auth_status = 'EXPIRED' or 'INVALID'
  → Notify user to reconnect

Rate limit (429):
  → No in-run retry
  → Check Retry-After header
  → Reschedule run
  → Log rate_limit_remaining
```
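The retry rules can be expressed as a single decision function. This is a sketch, not the pipeline's actual code: `decideRetry` is a hypothetical name, 429 responses take the rate-limit branch (reschedule rather than in-run backoff), and the 60-second fallback when no `Retry-After` value is available is an assumption.

```typescript
type RetryDecision =
  | { action: 'retry'; delayMs: number } // retry within the current run
  | { action: 'reschedule'; delayMs: number } // end the run, try again later
  | { action: 'fail_auth' } // mark credentials EXPIRED/INVALID, notify user
  | { action: 'give_up' }; // retries exhausted or non-retryable error

const BACKOFF_MS = [5000, 15000, 45000]; // 5s, 15s, 45s

// `status` is an HTTP status code, or 'timeout' when no response arrived.
// `attempt` counts retries already made in this run (0 for the first failure).
function decideRetry(
  status: number | 'timeout',
  attempt: number,
  retryAfterSeconds?: number,
): RetryDecision {
  if (status === 401 || status === 403) {
    return { action: 'fail_auth' };
  }
  if (status === 429) {
    // Honor Retry-After when present; the 60s default is an assumption.
    return { action: 'reschedule', delayMs: (retryAfterSeconds ?? 60) * 1000 };
  }
  const transient =
    status === 'timeout' || status === 500 || status === 502 || status === 503;
  if (transient && attempt < BACKOFF_MS.length) {
    return { action: 'retry', delayMs: BACKOFF_MS[attempt] };
  }
  return { action: 'give_up' };
}
```

Each delay is three times the previous one, which matches the 5s, 15s, 45s sequence and caps a run at three retries.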

### Deduplication

```sql
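-- Signals are deduplicated by source + source_event_id + signal_type + time bucket.
-- A signal is a duplicate if the same source reported the same signal type for the
-- same external event within the same 5-minute window (the value is not compared).

CREATE UNIQUE INDEX idx_signals_dedup ON prediction_signals (
  source, source_event_id, signal_type,
  -- 5-minute buckets: epoch seconds / 300, rounded down. AT TIME ZONE 'UTC' keeps the
  -- expression IMMUTABLE (required in an index), assuming fetched_at is TIMESTAMPTZ.
  floor(extract(epoch FROM (fetched_at AT TIME ZONE 'UTC')) / 300)
) WHERE source_event_id IS NOT NULL;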
```
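The same 5-minute bucketing can be applied in application code to drop duplicates within a batch before it is written, so the unique index acts as a backstop rather than the first line of defense. `dedupKey` and `dropDuplicates` are hypothetical helpers; signals without a `source_event_id` are passed through, mirroring the partial index.

```typescript
const BUCKET_MS = 5 * 60 * 1000; // 5-minute window

interface DedupFields {
  source: string;
  source_event_id: string | null;
  signal_type: string;
  fetched_at: Date;
}

// Builds the dedup key, or null when the signal has no external ID.
function dedupKey(s: DedupFields): string | null {
  if (s.source_event_id === null) return null;
  const bucket = Math.floor(s.fetched_at.getTime() / BUCKET_MS);
  return JSON.stringify([s.source, s.source_event_id, s.signal_type, bucket]);
}

// Keeps the first signal seen for each key; later ones in the same bucket are dropped.
function dropDuplicates<T extends DedupFields>(signals: T[]): T[] {
  const seen = new Set<string>();
  return signals.filter((s) => {
    const key = dedupKey(s);
    if (key === null) return true;
    if (seen.has(key)) return false;
    seen.add(key);
    return true;
  });
}
```

Buckets are aligned to the Unix epoch, so two fetches at 10:04:59 and 10:05:00 land in different buckets even though they are one second apart.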

---

## Build Phase

This fits into **Phase 1** (Signal Engine) and expands through later phases.

### Phase 1: Core Pipeline
- [ ] Feed adapter interface
- [ ] Polymarket adapter (first source)
- [ ] NewsAPI adapter
- [ ] Feed scheduler (cron-based polling)
- [ ] Signal deduplication
- [ ] Event auto-matching (keyword-based)
- [ ] prediction_data_sources table (seeded with built-in sources)
- [ ] prediction_data_feeds table
- [ ] prediction_feed_runs table
- [ ] Basic feed health API

### Phase 2: Expansion
- [ ] X/Twitter adapter
- [ ] Reddit adapter
- [ ] Google Trends adapter
- [ ] AI-based event matching (Claude)
- [ ] Unmatched signal queue + UI
- [ ] Feed health dashboard (`/predict/feeds`)

### Phase 3: Custom Feeds
- [ ] Webhook receiver
- [ ] CSV/JSON upload
- [ ] Custom API polling configuration
- [ ] Auto-event creation from unmatched clusters

### Phase 6+: Advanced
- [ ] GDELT adapter
- [ ] Yahoo Finance adapter
- [ ] FRED adapter
- [ ] Wikipedia pageviews adapter
- [ ] Google Sheets / Airtable adapters
- [ ] Backfill capability
- [ ] Feed marketplace (community-contributed adapters)
