Build a price-monitoring pipeline
A price-monitoring pipeline is a scheduled job that polls a watchlist of products on the ShopAPIS API, stores each price snapshot with a timestamp, and fires an alert when the price, Buy Box owner, or stock status changes. The architecture is four parts — a watchlist, a scheduler, a fetch-and-normalize worker, and a snapshot store with change detection — and because ShopAPIS returns one normalized schema, the same loop tracks Amazon, eBay, MercadoLibre and 70+ more without per-site code.
This guide builds the data layer behind the price monitoring solution. Need auth first? See getting started.
Architecture
watchlist (DB) → scheduler (cron) → worker → ShopAPIS /v1/product
│
▼
snapshots (DB) → change detection → alerts- Watchlist — the products to track:
(marketplace, country, id)rows. - Scheduler — a cron/queue that enqueues the watchlist on an interval (e.g. hourly).
- Worker — calls ShopAPIS per item, with backoff and concurrency control.
- Snapshot store — append-only rows of
price,was_price,buy_box_seller,availability,fetched_at. - Change detection — compares the latest snapshot to the previous one and emits events.
The worker
The worker is a single function: fetch, normalize, persist, diff. Concurrency is bounded so you stay inside your rate limit (see handling pagination & rate limits).
import time, requests
API = "https://api.shopapis.com/v1/product"
HEADERS = {"Authorization": "Bearer YOUR_API_KEY"}
def fetch_price(item):
for attempt in range(5):
r = requests.get(API, params=item, headers=HEADERS, timeout=30)
if r.status_code == 429: # rate limited → back off
time.sleep(2 ** attempt)
continue
r.raise_for_status()
p = r.json()
return {
"marketplace": p["marketplace"],
"id": item["id"],
"price": p["price"]["current"],
"was_price": p["price"].get("list"),
"currency": p["price"]["currency"],
"buy_box_seller": p.get("seller", {}).get("name"),
"in_stock": p["availability"]["in_stock"],
"fetched_at": p["fetched_at"],
}
raise RuntimeError("rate limited after retries")
def detect_change(prev, curr):
events = []
if prev is None:
return events
if curr["price"] != prev["price"]:
events.append(("price_change", prev["price"], curr["price"]))
if curr["buy_box_seller"] != prev["buy_box_seller"]:
events.append(("buybox_change", prev["buy_box_seller"], curr["buy_box_seller"]))
if curr["in_stock"] != prev["in_stock"]:
events.append(("stock_change", prev["in_stock"], curr["in_stock"]))
return eventsEach cycle the worker writes one normalized snapshot per watched product — the exact row your change-detection and price-history charts read:
{
"marketplace": "amazon",
"id": "B0CHX1W1XY",
"price": 189.99,
"was_price": 249.00,
"currency": "USD",
"buy_box_seller": "Amazon.com",
"in_stock": true,
"fetched_at": "2026-06-05T08:30:00Z"
}The scheduler loop
Run the watchlist on a fixed cadence and bound parallelism so the whole batch finishes inside one interval without tripping rate limits.
from concurrent.futures import ThreadPoolExecutor
watchlist = [
{"marketplace": "amazon", "country": "US", "id": "B0CHX1W1XY"},
{"marketplace": "ebay", "country": "US", "id": "category=123/itm/456"},
]
def run_cycle(store):
with ThreadPoolExecutor(max_workers=8) as pool: # bound concurrency
for snap in pool.map(fetch_price, watchlist):
prev = store.latest(snap["marketplace"], snap["id"])
for event in detect_change(prev, snap):
alert(snap, event)
store.append(snap) # append-only historySchedule run_cycle with cron (0 * * * * for hourly) or a managed scheduler. The append-only snapshots table becomes your price-history dataset for charts and analytics.
Why poll an API instead of scraping
Prices are the most aggressively protected field on a marketplace — geo-priced, client-rendered, and behind anti-bot challenges — so a DIY price scraper commonly sees block rates above 50% and silently drops SKUs. Repricing on a stale or incomplete feed erodes margin, and the anti-bot wall only gets higher as automated traffic grows (bots make up 42% of web traffic, most of it malicious, per Akamai ). ShopAPIS returns a clean, geo-correct price with the Buy Box owner attached, so your pipeline consumes signal instead of babysitting crawlers.
Production checklist
- Idempotent snapshots — key on
(marketplace, id, fetched_at); append, never overwrite. - Backoff on 429 — exponential, as shown; see the rate-limits guide.
- Bound concurrency to your plan’s limit (see pricing).
- Alert on diffs only — store every snapshot, but alert on
price,buy_box_seller,availabilitychanges.