Building a Real-Time Data Pipeline from Shopify to Meta's Marketing API
2025-12-19
admin
I spent the last few months building Audience+, a tool that syncs Shopify customer data to Meta's advertising platform in real time. Below is a technical breakdown of how it works, the challenges we solved, and concrete code patterns that may help if you're building something similar.

Contents:
- The Problem We're Solving
- Architecture Overview
- Tech Stack
- Shopify Webhook Integration
- Customer Data Normalization
- Meta Marketing API Integration
- Custom Audience Sync
- Sending Purchase Events (Conversions API)
- Audience Segmentation
- Key Challenges & Solutions
- Rate Limiting
- Webhook Idempotency
- Results
- What I'd Do Differently
- Try It Out

## The Problem We're Solving

Meta's browser-based tracking is fundamentally broken. The solution: send first-party customer and purchase data directly from Shopify to Meta using server-side APIs.

## Shopify Webhook Integration

Shopify sends webhooks for customer and order lifecycle events. We verify the HMAC signature of each request before processing it, so that only requests signed with the shared webhook secret are accepted.

## Customer Data Normalization

Customer data is normalized (lowercased, trimmed) and then hashed with SHA-256 to meet Meta's requirements.

## Meta Marketing API Integration

We integrate with two Meta APIs: the Custom Audiences API and the Conversions API.

## Audience Segmentation

Customers are automatically classified into segments that stay in sync with Meta.

## Rate Limiting

Meta enforces strict rate limits. We retry failed calls with exponential backoff.

## Webhook Idempotency

Recording each processed webhook ID prevents duplicate processing when Shopify retries a delivery or delivers events out of order.

## Results

Stores using Audience+ typically see the gains summarized in the results list at the end of this post.

## Try It Out

If you want this without building it yourself, check out
👉 https://www.audience-plus.com
10-minute setup. Fully automated.

Have questions about the implementation? Drop them in the comments.

CODE_BLOCK:
┌───────────────┐   ┌───────────────┐   ┌───────────────┐
│ Shopify Store │──▶│ Audience+ API │──▶│   Meta API    │
│  (Webhooks)   │   │ (Processing)  │   │               │
└───────────────┘   └───────────────┘   └───────────────┘
                            │
                            ▼
                    ┌───────────────┐
                    │  PostgreSQL   │
                    │ (Customer DB) │
                    └───────────────┘
CODE_BLOCK:
// app/api/webhooks/shopify/route.ts
import { NextRequest, NextResponse } from 'next/server';
import crypto from 'crypto';

// handleNewOrder, handleNewCustomer and handleCustomerUpdate are not shown here.
export async function POST(req: NextRequest) {
  // Read the raw body: the HMAC must be computed over the exact text Shopify sent.
  const body = await req.text();
  const hmac = req.headers.get('x-shopify-hmac-sha256');

  if (!verifyShopifyWebhook(body, hmac)) {
    return NextResponse.json({ error: 'Invalid signature' }, { status: 401 });
  }

  const topic = req.headers.get('x-shopify-topic');
  const payload = JSON.parse(body);

  switch (topic) {
    case 'orders/create':
      await handleNewOrder(payload);
      break;
    case 'customers/create':
      await handleNewCustomer(payload);
      break;
    case 'customers/update':
      await handleCustomerUpdate(payload);
      break;
  }

  return NextResponse.json({ received: true });
}

function verifyShopifyWebhook(body: string, hmac: string | null): boolean {
  if (!hmac) return false;

  const expected = crypto
    .createHmac('sha256', process.env.SHOPIFY_WEBHOOK_SECRET!)
    .update(body, 'utf8')
    .digest();
  const received = Buffer.from(hmac, 'base64');

  // timingSafeEqual throws when the lengths differ, so compare lengths first.
  return expected.length === received.length && crypto.timingSafeEqual(expected, received);
}
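To try the signature check outside of a Next.js route, here is a minimal standalone sketch. The secret, the sample payload, and the helper names `signBody` and `isValidSignature` are assumptions made for illustration; only the HMAC-SHA256 plus base64 scheme is taken from the handler in this post.

```typescript
import { createHmac, timingSafeEqual } from 'crypto';

// Hypothetical helper: base64 HMAC-SHA256 digest of a raw request body.
function signBody(rawBody: string, secret: string): string {
  return createHmac('sha256', secret).update(rawBody, 'utf8').digest('base64');
}

// Hypothetical helper: constant-time comparison that returns false instead of throwing.
function isValidSignature(rawBody: string, header: string | null, secret: string): boolean {
  if (!header) return false;
  const expected = Buffer.from(signBody(rawBody, secret), 'base64');
  const received = Buffer.from(header, 'base64');
  // timingSafeEqual throws on a length mismatch, so check lengths first.
  return expected.length === received.length && timingSafeEqual(expected, received);
}

// Made-up secret and payload, for demonstration only.
const secret = 'example-secret';
const body = JSON.stringify({ id: 1, email: 'a@example.com' });
const goodHeader = signBody(body, secret);

console.log(isValidSignature(body, goodHeader, secret));       // true
console.log(isValidSignature(body + ' ', goodHeader, secret)); // false: body was altered
console.log(isValidSignature(body, null, secret));             // false: header missing
```

Comparing decoded bytes rather than base64 strings means a malformed header is rejected rather than raising an exception.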
CODE_BLOCK:
// lib/customer-processor.ts
import crypto from 'crypto';

interface ShopifyCustomer {
  id: number;
  email: string;
  phone?: string;
  first_name?: string;
  last_name?: string;
  orders_count: number;
  total_spent: string;
  created_at: string;
}

// Field names follow Meta's short keys: em = email, ph = phone, fn/ln = first/last name.
interface MetaUserData {
  em?: string;
  ph?: string;
  fn?: string;
  ln?: string;
  external_id?: string;
}

// normalizePhone is not shown here.
function processCustomerForMeta(customer: ShopifyCustomer): MetaUserData {
  const userData: MetaUserData = {};

  if (customer.email) {
    userData.em = hashForMeta(customer.email.toLowerCase().trim());
  }
  if (customer.phone) {
    userData.ph = hashForMeta(normalizePhone(customer.phone));
  }
  if (customer.first_name) {
    userData.fn = hashForMeta(customer.first_name.toLowerCase().trim());
  }
  if (customer.last_name) {
    userData.ln = hashForMeta(customer.last_name.toLowerCase().trim());
  }
  userData.external_id = hashForMeta(customer.id.toString());

  return userData;
}

// SHA-256, hex-encoded. Callers must normalize the value before hashing.
function hashForMeta(value: string): string {
  return crypto.createHash('sha256').update(value).digest('hex');
}
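The post calls `normalizePhone` without showing it. The sketch below is one plausible version, assuming the stored phone numbers already include a country code: it keeps digits only and drops leading zeros. It is an assumption, not the Audience+ implementation, and `sha256Hex` is a stand-in for `hashForMeta`.

```typescript
import { createHash } from 'crypto';

// Assumed implementation: keep digits only, then drop any leading zeros.
function normalizePhone(phone: string): string {
  return phone.replace(/\D/g, '').replace(/^0+/, '');
}

// Stand-in for hashForMeta: SHA-256 of the UTF-8 string, hex-encoded.
function sha256Hex(value: string): string {
  return createHash('sha256').update(value, 'utf8').digest('hex');
}

// Two spellings of the same email hash identically once normalized.
const messy = sha256Hex(' Jane.Doe@Example.com '.trim().toLowerCase());
const clean = sha256Hex('jane.doe@example.com');
console.log(messy === clean); // true

console.log(normalizePhone('+1 (555) 010-9999')); // "15550109999"
```

Normalizing before hashing matters because the hash of `Jane.Doe@Example.com` and the hash of `jane.doe@example.com` share nothing in common, so un-normalized values would never match on Meta's side.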
COMMAND_BLOCK:
// lib/meta-audience-sync.ts
// AudienceUser, chunk, sleep, hashForMeta and normalizePhone are not shown here.
const META_API_VERSION = 'v18.0';

async function addUsersToAudience(
  audienceId: string,
  users: AudienceUser[],
  accessToken: string
): Promise<void> {
  const BATCH_SIZE = 10_000;
  const batches = chunk(users, BATCH_SIZE);

  for (const batch of batches) {
    // Each data row must list its values in the same order as `schema`.
    const payload = {
      schema: ['EMAIL', 'PHONE', 'FN', 'LN'],
      data: batch.map(user => [
        user.email ? hashForMeta(user.email.toLowerCase().trim()) : '',
        user.phone ? hashForMeta(normalizePhone(user.phone)) : '',
        user.firstName ? hashForMeta(user.firstName.toLowerCase().trim()) : '',
        user.lastName ? hashForMeta(user.lastName.toLowerCase().trim()) : '',
      ]),
    };

    const response = await fetch(
      `https://graph.facebook.com/${META_API_VERSION}/${audienceId}/users`,
      {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ payload, access_token: accessToken }),
      }
    );

    if (!response.ok) {
      // Include the status and response body so failures can be diagnosed.
      throw new Error(`Meta API error: ${response.status} ${await response.text()}`);
    }

    // Pause between batches to stay under Meta's rate limits.
    await sleep(1000);
  }
}
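`addUsersToAudience` relies on `chunk` and `sleep`, which the post does not show. The versions below are assumptions: a minimal sketch of what such helpers usually look like, with a quick check of how 25,000 users would be batched.

```typescript
// Assumed helper: split an array into consecutive slices of at most `size` items.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

// Assumed helper: resolve after `ms` milliseconds.
function sleep(ms: number): Promise<void> {
  return new Promise<void>(resolve => setTimeout(resolve, ms));
}

// 25,000 placeholder users in batches of 10,000 give sizes 10000, 10000, 5000.
const sampleUsers = Array.from({ length: 25_000 }, (_, i) => `user-${i}`);
console.log(chunk(sampleUsers, 10_000).map(batch => batch.length));
```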
COMMAND_BLOCK:
// lib/meta-conversions.ts
// META_API_VERSION is the constant from lib/meta-audience-sync.ts;
// ProcessedCustomer and ShopifyOrder are not shown here.
async function sendPurchaseEvent(
  pixelId: string,
  customer: ProcessedCustomer,
  order: ShopifyOrder,
  accessToken: string
) {
  const event = {
    event_name: 'Purchase',
    // Unix timestamp in seconds.
    event_time: Math.floor(new Date(order.created_at).getTime() / 1000),
    // The same ID is sent for the same order every time.
    event_id: `order_${order.id}`,
    action_source: 'website',
    user_data: {
      em: customer.hashedEmail,
      ph: customer.hashedPhone,
      client_ip_address: order.client_details?.browser_ip,
      client_user_agent: order.client_details?.user_agent,
    },
    custom_data: {
      value: parseFloat(order.total_price),
      currency: order.currency,
      content_ids: order.line_items.map(i => i.product_id.toString()),
      content_type: 'product',
      num_items: order.line_items.reduce((s, i) => s + i.quantity, 0),
    },
  };

  const response = await fetch(
    `https://graph.facebook.com/${META_API_VERSION}/${pixelId}/events`,
    {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ data: [event], access_token: accessToken }),
    }
  );

  if (!response.ok) {
    // Surface failures instead of silently dropping the event.
    throw new Error(`Meta API error: ${response.status} ${await response.text()}`);
  }
}
CODE_BLOCK:
enum CustomerSegment {
  NEW = 'new',
  ENGAGED = 'engaged',
  EXISTING = 'existing',
}

// CustomerWithOrders is not shown here.
function classifyCustomer(customer: CustomerWithOrders): CustomerSegment {
  // No orders yet: known to the store but has not purchased.
  if (customer.orders_count === 0) return CustomerSegment.ENGAGED;
  // At least one order: an existing customer.
  if (customer.orders_count >= 1) return CustomerSegment.EXISTING;
  // Fallback: as written, only reached when orders_count is not a non-negative number.
  return CustomerSegment.NEW;
}
COMMAND_BLOCK:
// sleep is not shown here.
async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries = 3
): Promise<T> {
  let lastError: Error;

  for (let i = 0; i < maxRetries; i++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error as Error;
      // Back off 1s, 2s, 4s, ... but do not wait after the final attempt.
      if (i < maxRetries - 1) {
        await sleep(2 ** i * 1000);
      }
    }
  }

  throw lastError!;
}
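To make the backoff schedule easy to inspect, here is a variant sketch of `withRetry` with the delay calculation pulled into its own function and the sleep injected, so it can be exercised without waiting. The names `backoffDelayMs` and `retryWithBackoff`, the 30-second cap, and the injectable sleep are assumptions added for illustration, not part of Audience+.

```typescript
// Delay before the retry that follows failed attempt `attempt` (0-based):
// baseMs * 2^attempt, capped at capMs. The cap is an added assumption.
function backoffDelayMs(attempt: number, baseMs = 1000, capMs = 30_000): number {
  return Math.min(capMs, baseMs * 2 ** attempt);
}

// Retry wrapper with an injectable sleep function.
async function retryWithBackoff<T>(
  fn: () => Promise<T>,
  maxAttempts = 3,
  sleepFn: (ms: number) => Promise<void> = ms =>
    new Promise<void>(resolve => setTimeout(resolve, ms)),
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      // Only wait if another attempt will follow.
      if (attempt < maxAttempts - 1) await sleepFn(backoffDelayMs(attempt));
    }
  }
  throw lastError;
}

// Demo: the call fails twice, then succeeds. Sleep is stubbed to record delays.
const waits: number[] = [];
let calls = 0;
retryWithBackoff(
  async () => {
    calls += 1;
    if (calls < 3) throw new Error('rate limited');
    return 'ok';
  },
  3,
  async ms => { waits.push(ms); },
).then(result => console.log(result, waits)); // ok [ 1000, 2000 ]
```

A further refinement worth considering is retrying only on errors that indicate throttling, since retrying a request that failed validation will fail again.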
COMMAND_BLOCK:
async function handleWebhookIdempotently(
  webhookId: string,
  handler: () => Promise<void>
) {
  // Skip webhooks that have already been processed.
  const existing = await prisma.processedWebhook.findUnique({
    where: { id: webhookId },
  });
  if (existing) return;

  await handler();

  // Record the ID only after the handler succeeds, so failed webhooks are retried.
  // Note: two concurrent deliveries of the same webhook can both pass the check above.
  await prisma.processedWebhook.create({
    data: { id: webhookId, processedAt: new Date() },
  });
}

Why browser-based tracking falls short:
- iOS 14.5 App Tracking Transparency hides ~75% of iPhone users
- The Meta Pixel only retains data for 180 days
- Meta optimizes on 30–40% of real conversion data for most stores

Tech stack:
- Framework: Next.js 15 (App Router)
- Language: TypeScript
- API Layer: tRPC
- Database: PostgreSQL (Neon serverless)
- ORM: Prisma
- Authentication: Better-Auth + Shopify OAuth
- Hosting: Vercel

Meta APIs used:
- Custom Audiences API: for syncing customer lists
- Conversions API: for real-time server-side events

Typical results:
- 50–100% more conversions visible to Meta
- 10–20% improvement in return on ad spend (ROAS)
- Correct exclusions and retargeting for the first time

What I'd do differently:
- Start with the Conversions API first
- Add monitoring earlier
- Use queues from day one instead of synchronous processing
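On the point about queues: the sketch below shows the general shape of decoupling webhook receipt from processing, with duplicate deliveries dropped at enqueue time. It is a hypothetical in-memory illustration only. The `WebhookQueue` class and `WebhookJob` type are invented for this example, and a real deployment would need a durable queue so that jobs survive restarts.

```typescript
// Hypothetical job shape: one entry per received webhook.
type WebhookJob = { id: string; topic: string; payload: unknown };

// Hypothetical in-memory queue; not durable, for illustration only.
class WebhookQueue {
  private readonly jobs: WebhookJob[] = [];
  private readonly seen = new Set<string>();

  // Called from the HTTP handler: cheap, so the webhook can be acknowledged quickly.
  // Returns false when a job with the same id was already enqueued.
  enqueue(job: WebhookJob): boolean {
    if (this.seen.has(job.id)) return false;
    this.seen.add(job.id);
    this.jobs.push(job);
    return true;
  }

  // Called by a background worker: processes jobs in arrival order
  // and returns how many were handled.
  async drain(handler: (job: WebhookJob) => Promise<void>): Promise<number> {
    let processed = 0;
    let job: WebhookJob | undefined;
    while ((job = this.jobs.shift()) !== undefined) {
      await handler(job);
      processed += 1;
    }
    return processed;
  }

  get size(): number {
    return this.jobs.length;
  }
}

const queue = new WebhookQueue();
console.log(queue.enqueue({ id: 'wh_1', topic: 'orders/create', payload: {} })); // true
console.log(queue.enqueue({ id: 'wh_1', topic: 'orders/create', payload: {} })); // false
console.log(queue.size); // 1
```

With this split, the route handler only verifies the signature and enqueues, while slow calls to Meta happen in the worker, where they can be retried without delaying the response to Shopify.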