
Build a price-monitoring pipeline

A price-monitoring pipeline is a scheduled job that polls a watchlist of products through the ShopAPIS API, stores each price snapshot with a timestamp, and fires an alert when the price, Buy Box owner, or stock status changes. The architecture has four parts: a watchlist, a scheduler, a fetch-and-normalize worker, and a snapshot store with change detection. Because ShopAPIS returns one normalized schema, the same loop tracks Amazon, eBay, MercadoLibre and 70+ more marketplaces without per-site code.

This guide builds the data layer behind the price monitoring solution. Need auth first? See getting started.

Architecture

watchlist (DB) → scheduler (cron) → worker → ShopAPIS /v1/product
worker → snapshots (DB) → change detection → alerts

  • Watchlist — the products to track: (marketplace, country, id) rows.
  • Scheduler — a cron/queue that enqueues the watchlist on an interval (e.g. hourly).
  • Worker — calls ShopAPIS per item, with backoff and concurrency control.
  • Snapshot store — append-only rows of price, was_price, buy_box_seller, availability, fetched_at.
  • Change detection — compares the latest snapshot to the previous one and emits events.

The worker

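The worker's job is fetch, normalize, persist, diff. The code below covers fetching and normalizing (fetch_price) and diffing (detect_change). Persistence and bounded concurrency, which keeps you inside your rate limit, are handled in the scheduler loop (see handling pagination & rate limits).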

    import time
    import requests

    API = "https://api.shopapis.com/v1/product"
    HEADERS = {"Authorization": "Bearer YOUR_API_KEY"}

    def fetch_price(item):
        """Fetch one watchlist item and return a normalized snapshot row."""
        for attempt in range(5):
            r = requests.get(API, params=item, headers=HEADERS, timeout=30)
            if r.status_code == 429:        # rate limited → back off
                time.sleep(2 ** attempt)    # 1, 2, 4, 8, 16 seconds
                continue
            r.raise_for_status()            # any other HTTP error is raised to the caller
            p = r.json()
            return {
                "marketplace": p["marketplace"],
                "id": item["id"],
                "price": p["price"]["current"],
                "was_price": p["price"].get("list"),
                "currency": p["price"]["currency"],
                # tolerate a missing or null "seller" object
                "buy_box_seller": (p.get("seller") or {}).get("name"),
                "in_stock": p["availability"]["in_stock"],
                "fetched_at": p["fetched_at"],
            }
        raise RuntimeError("rate limited after retries")

    def detect_change(prev, curr):
        """Compare two snapshots and return a list of (event, old, new) tuples."""
        events = []
        if prev is None:                    # first snapshot: nothing to compare against
            return events
        if curr["price"] != prev["price"]:
            events.append(("price_change", prev["price"], curr["price"]))
        if curr["buy_box_seller"] != prev["buy_box_seller"]:
            events.append(("buybox_change", prev["buy_box_seller"], curr["buy_box_seller"]))
        if curr["in_stock"] != prev["in_stock"]:
            events.append(("stock_change", prev["in_stock"], curr["in_stock"]))
        return events
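The fixed `2 ** attempt` sleep means that workers which are rate limited at the same moment also retry at the same moment. A common refinement is to randomize the delay and cap it. The sketch below shows one way to do that ("full jitter"); `backoff_delay` and its `base` and `cap` defaults are illustrative choices, not ShopAPIS recommendations.

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0, rng=random) -> float:
    """Full-jitter exponential backoff.

    Returns a random delay in [0, min(cap, base * 2**attempt)] seconds.
    `base` and `cap` are assumed defaults; tune them to your plan's limits.
    """
    ceiling = min(cap, base * (2 ** attempt))
    return rng.uniform(0.0, ceiling)

# Upper bounds on the delay for five attempts with the defaults above.
ceilings = [min(60.0, 1.0 * 2 ** a) for a in range(5)]
```

In `fetch_price`, calling `time.sleep(backoff_delay(attempt))` in place of `time.sleep(2 ** attempt)` would apply it. The trade-off is that an individual retry may fire sooner than with the fixed schedule, in exchange for spreading retries out across workers.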

Each cycle the worker writes one normalized snapshot per watched product — the exact row your change-detection and price-history charts read:

    {
      "marketplace": "amazon",
      "id": "B0CHX1W1XY",
      "price": 189.99,
      "was_price": 249.00,
      "currency": "USD",
      "buy_box_seller": "Amazon.com",
      "in_stock": true,
      "fetched_at": "2026-06-05T08:30:00Z"
    }
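The guide refers to a snapshot store with `latest` and `append` operations but leaves its implementation to you. As a minimal sketch, assuming SQLite is acceptable for your volume, the class below persists rows shaped like the one above. `SnapshotStore`, the table name, and the column types are hypothetical, and ordering by `fetched_at` assumes every timestamp uses the same ISO 8601 UTC format so that string order matches time order.

```python
import sqlite3

class SnapshotStore:
    """Append-only snapshot history in SQLite (illustrative helper, not a ShopAPIS client)."""

    COLUMNS = ("marketplace", "id", "price", "was_price", "currency",
               "buy_box_seller", "in_stock", "fetched_at")

    def __init__(self, path=":memory:"):
        self.db = sqlite3.connect(path)
        self.db.row_factory = sqlite3.Row
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS snapshots (
                marketplace    TEXT NOT NULL,
                id             TEXT NOT NULL,
                price          REAL,
                was_price      REAL,
                currency       TEXT,
                buy_box_seller TEXT,
                in_stock       INTEGER,
                fetched_at     TEXT NOT NULL,
                PRIMARY KEY (marketplace, id, fetched_at)
            )""")

    def append(self, snap):
        """Insert a snapshot; re-inserting the same key is silently ignored."""
        columns = ", ".join(self.COLUMNS)
        placeholders = ", ".join(":" + c for c in self.COLUMNS)
        with self.db:  # commits on success, rolls back on error
            self.db.execute(
                "INSERT OR IGNORE INTO snapshots (" + columns + ") "
                "VALUES (" + placeholders + ")", snap)

    def latest(self, marketplace, product_id):
        """Return the most recent snapshot for a product as a dict, or None."""
        row = self.db.execute(
            "SELECT * FROM snapshots WHERE marketplace = ? AND id = ? "
            "ORDER BY fetched_at DESC LIMIT 1", (marketplace, product_id)).fetchone()
        if row is None:
            return None
        snap = dict(row)
        if snap["in_stock"] is not None:
            snap["in_stock"] = bool(snap["in_stock"])  # SQLite stores booleans as 0/1
        return snap
```

Keying on `(marketplace, id, fetched_at)` with `INSERT OR IGNORE` means a retried write of the same snapshot leaves the history unchanged, while a snapshot with a new `fetched_at` always adds a row.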

The scheduler loop

Run the watchlist on a fixed cadence and bound parallelism so the whole batch finishes inside one interval without tripping rate limits.

    from concurrent.futures import ThreadPoolExecutor

    watchlist = [
        {"marketplace": "amazon", "country": "US", "id": "B0CHX1W1XY"},
        {"marketplace": "ebay", "country": "US", "id": "category=123/itm/456"},
    ]

    def run_cycle(store):
        # `store` (with latest/append) and `alert` are supplied by your application.
        with ThreadPoolExecutor(max_workers=8) as pool:    # bound concurrency
            # pool.map yields results in watchlist order and re-raises the first
            # exception from fetch_price, which ends the cycle early.
            for snap in pool.map(fetch_price, watchlist):
                prev = store.latest(snap["marketplace"], snap["id"])
                for event in detect_change(prev, snap):
                    alert(snap, event)
                store.append(snap)                         # append-only history

Schedule run_cycle with cron (0 * * * * for hourly) or a managed scheduler. The append-only snapshots table becomes your price-history dataset for charts and analytics.
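To check that a batch actually finishes inside one interval, a rough estimate is that items are processed in waves of `max_workers`, so cycle time is about `ceil(items / workers) × average latency`. The sketch below turns that into a sizing helper. The function names, the 50% headroom default, and the example numbers (10,000 items, 1.5 s per request) are assumptions for illustration, not measured ShopAPIS figures.

```python
import math

def cycle_seconds(n_items: int, avg_latency_s: float, workers: int) -> float:
    """Rough wall-clock time for one cycle: items run in waves of `workers`."""
    return math.ceil(n_items / workers) * avg_latency_s

def min_workers(n_items: int, avg_latency_s: float, interval_s: float,
                headroom: float = 0.5) -> int:
    """Smallest worker count whose estimated cycle fits in headroom * interval_s."""
    budget = interval_s * headroom
    # How many sequential requests one worker can complete within the budget.
    waves = math.floor(budget / avg_latency_s)
    if waves < 1:
        raise ValueError("a single request does not fit in the time budget")
    return math.ceil(n_items / waves)

# Hypothetical hourly cycle: 10,000 items at 1.5 s each, using half the interval.
needed = min_workers(10_000, 1.5, 3600)      # 9 workers
estimate = cycle_seconds(10_000, 1.5, 9)     # 1668.0 seconds, under the 1800 s budget
```

This gives a lower bound on workers from the time budget. Your plan's rate limit still sets the upper bound, so if the two conflict, lengthen the interval or shrink the watchlist rather than adding workers.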

Why poll an API instead of scraping

Prices are the most aggressively protected field on a marketplace (geo-priced, client-rendered, and behind anti-bot challenges), so a DIY price scraper commonly sees block rates above 50% and silently drops SKUs. Repricing on a stale or incomplete feed erodes margin, and the anti-bot wall only gets higher as automated traffic grows (bots make up 42% of web traffic, most of it malicious, per Akamai). ShopAPIS returns a clean, geo-correct price with the Buy Box owner attached, so your pipeline consumes signal instead of babysitting crawlers.

Production checklist

  • Idempotent snapshots — key on (marketplace, id, fetched_at); append, never overwrite.
  • Backoff on 429 — exponential, as shown; see the rate-limits guide.
  • Bound concurrency to your plan’s limit (see pricing).
  • Alert on diffs only — store every snapshot, but alert only when price, buy_box_seller, or availability (in_stock) changes.