From DS notebooks to realtime pipelines: diversity scores at scale
One of the recurring patterns in my work at Aampe is taking something a data scientist built in a notebook and figuring out how to make it work in a realtime distributed system. The algorithm is usually clever. The challenge is everything around it: latency, storage, idempotency, failure modes.
This is one of those stories. Our DS team built a diversity scoring system in BigQuery. My job was to make it run in a streaming pipeline that processes millions of contacts.
What problem are we solving?
Aampe sends personalized messages to users on behalf of our customers (think push notifications, emails, in-app messages). The system already picks what to say (the action group, the value proposition) but within that choice there are multiple ways to phrase it.
Say we picked "value prop = convenience" for a footwear retailer. The alternates might be:
- "Order shoes from the comfort of your home"
- "Get shoes delivered to your doorstep"
- "Our huge selection has something for every taste. Why wait?"
If a user keeps seeing "from the comfort of your home" every other day, they'll tune out. Diversity scoring picks the alternate that is most different from what the user has recently seen.
The DS approach: trigram fingerprinting
The core idea from the data science team is elegant. Instead of comparing full strings (expensive, fragile), break every message into character-level trigrams and use those as fingerprints.
The message "Hello there" becomes:
hel, ell, llo, "lo ", "o t", " th", the, her, ere
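A minimal extractor matching this behavior might look like the sketch below (the production get_ngrams may normalize text differently; this is just an illustration):

```python
def get_ngrams(text: str, n: int = 3) -> list[str]:
    """Slide an n-character window across the lowercased text.

    Spaces are kept, so word boundaries become part of the fingerprint.
    """
    s = text.lower()
    return [s[i:i + n] for i in range(len(s) - n + 1)]
```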
For each contact, we maintain a profile of trigrams they've been exposed to, with recency-weighted scores. When we need to pick the next message, we score each candidate against the profile. Lower score = more diverse = better.
The scoring formula uses exponential decay with a 7-day half-life:
score = count * (0.5) ** (days_elapsed / 7)
A trigram seen yesterday carries a lot of penalty. One seen 3 weeks ago barely registers. After 30 days it's basically zero.
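Plugging a few elapsed times into the formula shows the shape of the decay:

```python
def decay(count: float, days_elapsed: float, half_life_days: float = 7.0) -> float:
    """Exponential decay with a 7-day half-life."""
    return count * 0.5 ** (days_elapsed / half_life_days)

# For a trigram seen once:
#  1 day ago  -> ~0.91 (almost full penalty)
#  7 days ago -> 0.5   (one half-life)
# 21 days ago -> 0.125 (three half-lives)
# 30 days ago -> ~0.05 (basically zero)
```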
The notebook implemented this across six BigQuery queries, building temp tables for message structure, ngram profiles, weights, and final selection. It worked beautifully as a batch process.
Now make it realtime.
The key insight: rolling updates instead of lookback
The notebook approach (let's call it Approach 1) requires looking up the full history every time you want to score:
# Approach 1: Historical lookup
# For trigram "hel" on July 3rd, look back at all occurrences:
# - Seen 2x on July 1st: 2 * (0.5)^(2/7) = 1.64
# - Seen 1x on July 2nd: 1 * (0.5)^(1/7) = 0.91
# Total penalty: 1.64 + 0.91 = 2.55
This means maintaining per-contact, per-trigram, per-date history. For a contact who's been receiving messages for 30 days, that's a lot of rows to scan.
Approach 2 flips it. Instead of storing history and recomputing, we keep a single rolling score per trigram and update it on every message:
# Approach 2: Rolling update
# Start: score = 0
# July 1, first "hel": score = 0 * (0.5)^(0/7) + 1 = 1
# July 1, second "hel": score = 1 * (0.5)^(0/7) + 1 = 2
# July 2, see "hel": score = 2 * (0.5)^(1/7) + 1 = 2.81
#
# Scoring on July 3rd (no new occurrence, just decay):
# score = 2.81 * (0.5)^(1/7) = 2.55
The difference between approaches? None: both give 2.55 on July 3rd. That's no accident. Exponential decay composes multiplicatively ((0.5)^(a/7) * (0.5)^(b/7) = (0.5)^((a+b)/7)), so the rolling update reproduces the historical sum exactly; in practice the only drift comes from truncating elapsed time to whole days.
Mathematically equivalent, but the engineering implications are massive:
- No lookback needed. No BigQuery exports, no scanning 30 days of message history.
- One row per trigram per contact. Instead of (contact, trigram, date, count) we store (trigram, score, last_seen).
- Natural cleanup. If last_seen is over 30 days ago, delete the row. Simple TTL.
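A quick numeric check that the rolling update reproduces the historical lookup, using the toy values from the July example (day-granularity timestamps assumed):

```python
HALF_LIFE_DAYS = 7.0

def decay(score: float, days: float) -> float:
    return score * 0.5 ** (days / HALF_LIFE_DAYS)

# Approach 1: look back at all occurrences from July 3rd
lookup = 2 * decay(1.0, 2) + 1 * decay(1.0, 1)

# Approach 2: one rolling score, updated as events arrive
rolling = 0.0
rolling = decay(rolling, 0) + 1  # July 1, first "hel"
rolling = decay(rolling, 0) + 1  # July 1, second "hel"
rolling = decay(rolling, 1) + 1  # July 2, "hel" again
scored = decay(rolling, 1)       # July 3: decay only, no new occurrence

# The two agree because 0.5**(a/7) * 0.5**(b/7) == 0.5**((a+b)/7)
```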
Scoring a candidate message
With the rolling profile in place, scoring is straightforward:
- Take the candidate text, split into trigrams
- Hash each trigram (we use FARM_FINGERPRINT-style hashing for compact storage)
- Look up each trigram hash in the contact's profile
- Sum the (decayed) scores of matching trigrams
- Lowest total score = most diverse candidate
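FARM_FINGERPRINT itself is a BigQuery function; outside of BigQuery, any stable 64-bit hash works as long as profile writes and candidate lookups agree on it. A stand-in sketch using the standard library:

```python
import hashlib

def hash_trigram(tri: str) -> int:
    """Stable 64-bit fingerprint for a trigram.

    A stand-in for FARM_FINGERPRINT-style hashing: deterministic across
    processes (unlike Python's built-in hash()), compact to store.
    """
    digest = hashlib.blake2b(tri.encode("utf-8"), digest_size=8).digest()
    return int.from_bytes(digest, "big")
```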
from datetime import datetime

def score_candidate(text: str, contact_profile: dict, now: datetime) -> float:
    """Score a candidate message against a contact's trigram profile.

    Lower score = more diverse = better.
    """
    trigrams = get_ngrams(text, n=3)
    total = 0.0
    for tri in trigrams:
        h = hash_trigram(tri)
        if h in contact_profile:
            score, last_seen = contact_profile[h]
            days = (now - last_seen).days
            decayed = score * (0.5) ** (days / 7)
            total += decayed
    return total
A score of 0 means total novelty. Nothing in this candidate overlaps with what the contact has seen. We add a small random jitter as a tiebreaker for candidates that score identically (common when a contact is new and has no profile yet).
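The tiebreak can be sketched like this (the jitter magnitude is an assumption, and score_fn stands in for score_candidate bound to a particular contact and timestamp):

```python
import random

def pick_most_diverse(candidates: list[str], score_fn, jitter: float = 1e-6) -> str:
    """Return the lowest-scoring (most diverse) candidate.

    A tiny random jitter breaks ties among identically-scored candidates,
    e.g. a brand-new contact whose empty profile scores everything at 0.
    """
    return min(candidates, key=lambda c: score_fn(c) + random.uniform(0.0, jitter))
```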
The storage question
Here's where it gets interesting from a systems perspective. Per contact we're storing roughly 1024 trigrams (the number varies, 95th percentile from the notebook showed ~2100 per 30-day window). Each trigram record is about 60 bytes:
| Field | Type | Size |
|---|---|---|
| trigram hash | int64 | 8-12 bytes |
| timestamp | datetime | 8 bytes |
| score | float | 16 bytes |
| overhead | | ~20 bytes |
So per contact: ~60KB. At 5 million contacts: 300GB. At 10 million: 600GB.
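The back-of-envelope arithmetic behind those figures:

```python
BYTES_PER_TRIGRAM = 60       # hash + timestamp + score + overhead
TRIGRAMS_PER_CONTACT = 1024  # typical; the p95 from the notebook was ~2100

per_contact_kb = BYTES_PER_TRIGRAM * TRIGRAMS_PER_CONTACT / 1024
total_5m_gb = 5_000_000 * BYTES_PER_TRIGRAM * TRIGRAMS_PER_CONTACT / 1e9

print(per_contact_kb)  # 60.0 KB per contact
print(total_5m_gb)     # ~307 GB at 5 million contacts
```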
We considered Bigtable first. It has native TTL (no app logic for expiry), efficient range scans on individual trigrams, and fast reads. But the only real advantage over Spanner for our case was TTL.
We went with Spanner instead, storing the entire trigram profile as a JSON column:
CREATE TABLE `{customer_id}.contact_ngram_profile` (
contact_id STRING(64) NOT NULL,
ngram_profile JSON,
updated_at TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true)
) PRIMARY KEY (contact_id)
The JSON blob holds all trigrams for a contact in one row. The tradeoff: we can't selectively fetch individual trigrams. But that's actually fine. When scoring, we need to check a candidate's trigrams against the full profile anyway. Loading the whole thing once and iterating in-memory is more efficient than making dozens of point lookups.
The cleanup tradeoff is that we can't rely on per-cell TTL like Bigtable offers. Instead, when we update a contact's profile (on every message sent), we also prune trigrams where last_seen exceeds 30 days. And we set a row-level TTL of 30 days so contacts who haven't received any messages get cleaned up automatically.
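Storing the profile as a JSON column means string keys and serializable timestamps. A sketch of the round-trip (the field names here are illustrative, not the production schema):

```python
import json
from datetime import datetime

def profile_to_json(profile: dict) -> str:
    """Serialize {trigram_hash: (score, last_seen)} to a JSON blob.

    JSON keys must be strings; datetimes become ISO-8601 strings.
    """
    return json.dumps({
        str(h): {"score": score, "last_seen": last_seen.isoformat()}
        for h, (score, last_seen) in profile.items()
    })

def profile_from_json(blob: str) -> dict:
    """Inverse of profile_to_json: restore int keys and datetimes."""
    return {
        int(h): (rec["score"], datetime.fromisoformat(rec["last_seen"]))
        for h, rec in json.loads(blob).items()
    }
```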
The update pipeline
The profile update piggybacks on existing infrastructure. We already publish a StagedMessage event to PubSub whenever a message is queued. A subscription routes to an /upkeep endpoint that handles message history tracking. We added ngram profile updates to the same flow:
/stage (message assignment)
   |
   v
PubSub --subscription--> /upkeep
                           |
                           +-- update message history (existing)
                           +-- update ngram profile (new)
                                 |
                                 +-- fetch existing profile
                                 +-- extract trigrams from message
                                 +-- decay existing scores
                                 +-- add new trigram scores
                                 +-- prune old trigrams
                                 +-- write back
The update for each trigram follows the rolling formula:
from datetime import datetime, timedelta

def update_profile(profile: dict, message_text: str, now: datetime) -> dict:
    trigrams = get_ngrams(message_text, n=3)
    for tri in trigrams:
        h = hash_trigram(tri)
        if h in profile:
            old_score, last_seen = profile[h]
            days = (now - last_seen).days
            decayed = old_score * (0.5) ** (days / 7)
            profile[h] = (decayed + 1, now)
        else:
            profile[h] = (1.0, now)
    # Prune anything older than 30 days
    cutoff = now - timedelta(days=30)
    return {k: v for k, v in profile.items() if v[1] > cutoff}
The idempotency problem
This is the part that kept me up. PubSub guarantees at-least-once delivery. The upkeep endpoint might get called twice for the same message. If we naively run the update twice, we'd inflate the trigram scores for that message (counting it double).
We explored three options:
Option 1: Timestamp guard. Track last_message_timestamp on the profile. Reject any update where the message timestamp is older than what we've already processed. Simple, but makes selective backfill tricky since older messages get silently dropped.
Option 2: Separate route. Decouple ngram updates from message history entirely. Fewer failure modes that could trigger retries. But still doesn't handle the case where the DB write itself fails and we retry.
Option 3: Chain to history success. Only update the ngram profile if the message history write succeeds. This eliminates retries caused by history failures. Combined with the timestamp guard, it covers most cases.
We went with Option 3 for the short term with a timestamp-based idempotency check, and a plan to move to Option 2 (separate route) when we need backfill support.
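The timestamp guard can be sketched as follows (field names are assumptions for illustration, not our actual schema):

```python
from dataclasses import dataclass
from datetime import datetime

@dataclass(frozen=True)
class ProfileRow:
    last_message_ts: datetime  # timestamp of the newest message already applied

def should_apply(row: ProfileRow, message_ts: datetime) -> bool:
    """Timestamp guard: reject anything at or before the last processed
    message, so an at-least-once redelivery can't double-count trigrams.

    The cost (as noted above): legitimately older messages are silently
    dropped too, which makes selective backfill tricky.
    """
    return message_ts > row.last_message_ts
```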
The full selection flow
Zooming out, here's where diversity fits in the overall message assignment pipeline:
- Eligibility: Determine which formulas a contact qualifies for
- Action Set selection: Pick what type of content (value prop, greeting, CTA)
- Action Group selection: Pick the specific labels (convenience, trust, direct)
- Formula filtering: Find formulas where the selected action group applies
- Diversity scoring: For each formula's components, score all alternates against the contact's trigram profile. Pick the least-seen alternate for each component slot. Sum component scores to get a formula-level diversity score. Pick the formula with the lowest total.
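The diversity step above can be sketched like this (a simplified illustration: score_fn stands in for scoring an alternate against the contact's profile, and the data shapes are flattened relative to the real FormulaDetails types):

```python
def select_formula(formulas: dict, score_fn):
    """Pick the most diverse formula + alternates.

    formulas: {formula_id: {component_id: [alternate texts]}}.
    For each component slot, choose the lowest-scoring (least-seen)
    alternate; sum those scores per formula; return the formula with
    the lowest total as (formula_id, chosen_alternates, total_score).
    """
    best = None
    for fid, components in formulas.items():
        chosen, total = {}, 0.0
        for cid, alternates in components.items():
            score, alt = min((score_fn(a), a) for a in alternates)
            chosen[cid] = alt
            total += score
        if best is None or total < best[2]:
            best = (fid, chosen, total)
    return best
```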
The interface ended up clean:
def get_most_diverse_message_signature(
    customer_id: str,
    contact_id: str,
    formula_details: list[FormulaDetails],
    channel: str,
) -> DiverseMessageSignature:
    ...

class FormulaDetails:
    formula_id: int
    components: list[Component]  # each has component_id + alternates
    static_text: list[str]

class DiverseMessageSignature:
    formula_id: int
    components_selected: list[ComponentText]  # component_id + chosen alternate
    score: float
Feed it the eligible formulas with their alternates, get back the most diverse combination.
What I learned
The pattern of "notebook to pipeline" keeps repeating. Each time the technical translation is different, but the meta-lessons are consistent:
Batch computations hide complexity. In a notebook you can scan 30 days of history, cross-join everything, and let BigQuery handle it. In realtime you need incremental updates, and that forces you to find mathematical equivalences (like the rolling score) that preserve the semantics while changing the computation model.
Storage shapes your system. The choice between "many small rows" and "one big document" cascades into everything: how you update, how you query, how you clean up, how you handle failures. We picked the "big document" model not because it's generally better, but because it matched our access pattern.
The algorithm is the easy part. Trigram extraction, decay scoring, candidate ranking... maybe 50 lines of Python. The surrounding system (pub/sub flow, idempotency, backfill strategy, storage schema, TTL management) is where the real engineering lives.
Every time a DS teammate hands me a notebook, I know the math will be elegant. The fun part is figuring out what shape the system needs to be so that math can run a few million times a day without falling over.