This post outlines a solution for syncing user profiles and events from a MongoDB database to Mixpanel for analytics. Due to connectivity constraints, we implemented a Node.js script that periodically checks for new or updated records and forwards them to Mixpanel. Below, I detail the investigation, the schema requirements, and the implementation, with sanitized code snippets that use dummy database and collection names.
Investigation: Real-Time Feasibility
We first explored real-time data syncing:
- MongoDB Change Streams: Change streams require a replica set or a sharded cluster, so we checked the deployment by running rs.status() in the MongoDB shell. The response indicated that the server was not running as a replica set, which ruled out real-time streaming.
- Fallback Approach: Without replica sets, we opted for a scheduled task to query recent data, relying on timestamps in the schema.
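The same check can be done from Node.js instead of the shell. The following is a minimal sketch, assuming a server version that supports the hello command; supportsChangeStreams and checkChangeStreamSupport are hypothetical names, not part of our script.

```javascript
// Pure helper: decide from a `hello` reply whether change streams are possible.
// Replica set members report `setName`; a mongos router reports msg "isdbgrid".
function supportsChangeStreams(helloReply) {
  return Boolean(helloReply.setName) || helloReply.msg === "isdbgrid";
}

// Hypothetical wrapper: `client` is an already-connected MongoClient.
async function checkChangeStreamSupport(client) {
  const reply = await client.db("admin").command({ hello: 1 });
  return supportsChangeStreams(reply);
}

// A standalone server reply carries neither marker.
console.log(supportsChangeStreams({ isWritablePrimary: true })); // false
```

Keeping the decision in a pure function makes it easy to test without a live database.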
Schema Requirements
The MongoDB collections must include two timestamp fields, which enable filtering records modified within a specific time window (e.g., the last 15 minutes):
- created_at: ISODate timestamp for new records.
- updated_at: ISODate timestamp for updates.
The collections are:
- mixpanel_users: Stores user profiles (e.g., name, email, user_id, properties).
- mixpanel_events: Stores events (e.g., event_name, user_id, event_properties).
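Because every sync run filters on these timestamps, indexing them keeps the queries cheap as the collections grow. This is a sketch rather than part of our script: ensureTimestampIndexes is a hypothetical helper, and db is assumed to be a Db handle from the mongodb driver.

```javascript
// Hypothetical helper: creates one single-field index per timestamp so that
// each branch of an $or filter on created_at / updated_at can use an index.
async function ensureTimestampIndexes(db, collectionNames) {
  for (const name of collectionNames) {
    const collection = db.collection(name);
    await collection.createIndex({ created_at: 1 });
    await collection.createIndex({ updated_at: 1 });
  }
}

// Example call (assumes a connected client):
// await ensureTimestampIndexes(client.db("app_db"), ["mixpanel_users", "mixpanel_events"]);
```

createIndex is a no-op when an identical index already exists, so calling this on every startup is harmless.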
Solution Overview
We built a Node.js script that:
- Connects to MongoDB (dummy name: "app_db").
- Queries mixpanel_users and mixpanel_events for records with created_at or updated_at within the last 15 minutes.
- Sends user profiles to Mixpanel's People API.
- Sends events to Mixpanel's Events API.
- Runs every 15 minutes via a cron job and on startup for initial sync.
The script uses the mongodb driver, node-cron, dotenv, and Node's built-in fetch, with secrets supplied through environment variables.
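Since the sync runs both on startup and on a schedule, two runs can overlap if one takes longer than expected. One way to guard against that is sketched below; createNonOverlappingRunner is a hypothetical helper that our script does not include.

```javascript
// Wraps an async task so that a new invocation is skipped while one is in flight.
// Resolves to true when the task ran and false when it was skipped.
function createNonOverlappingRunner(task) {
  let running = false;
  return async function run() {
    if (running) {
      console.log("Previous run still in progress, skipping");
      return false;
    }
    running = true;
    try {
      await task();
      return true;
    } finally {
      running = false; // always release the guard, even if the task throws
    }
  };
}

// Possible wiring (assumes processData and cron from the main script):
// const runSync = createNonOverlappingRunner(processData);
// cron.schedule("*/15 * * * *", runSync);
// runSync();
```

The guard is per process only; it does not coordinate multiple instances of the script.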

Implementation
Prerequisites
- Node.js 18+ (the script calls the global fetch, which earlier versions do not provide by default).
- Dependencies: npm install dotenv mongodb node-cron.
- Environment variables (e.g., in a secure vault, not .env for production):
  - MONGODB_URI: MongoDB connection string (e.g., mongodb://user:pass@host:port/app_db).
  - MIXPANEL_PROJECT_TOKEN: Mixpanel project token, read by both API helpers.
  - MIXPANEL_PROJECT_ID: Mixpanel project ID.
  - MIXPANEL_SERVICE_USER and MIXPANEL_SERVICE_SECRET: Service account credentials. The snippets in this post authenticate with the project token only and do not read these or the project ID.
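A missing variable otherwise surfaces only as a confusing connection or API error, so it can help to fail fast at startup. The sketch below assumes that MONGODB_URI and MIXPANEL_PROJECT_TOKEN are the required ones; requireEnv is a hypothetical helper, not something from a library.

```javascript
// Hypothetical helper: returns the requested variables or throws, listing
// every missing name at once. `env` defaults to process.env.
function requireEnv(names, env = process.env) {
  const missing = names.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`);
  }
  return Object.fromEntries(names.map((name) => [name, env[name]]));
}

// Demo with an explicit object instead of the real environment.
const sampleEnv = { MONGODB_URI: "mongodb://localhost:27017/app_db" };
try {
  requireEnv(["MONGODB_URI", "MIXPANEL_PROJECT_TOKEN"], sampleEnv);
} catch (error) {
  console.error(error.message); // Missing environment variables: MIXPANEL_PROJECT_TOKEN
}
```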
Core Script
This script (e.g., index.js) handles data syncing and scheduling.
require("dotenv").config();
const { MongoClient } = require("mongodb");
const cron = require("node-cron");

const uri = process.env.MONGODB_URI;

async function processData() {
  const client = new MongoClient(uri);
  try {
    await client.connect();
    const db = client.db("app_db"); // Dummy DB name
    console.log("Connected to MongoDB");

    // Everything created or updated at or after this instant is synced
    const fifteenMinutesAgo = new Date(Date.now() - 15 * 60 * 1000);
    console.log(`Processing data since ${fifteenMinutesAgo.toISOString()}`);

    // Process users
    const users = await db
      .collection("mixpanel_users")
      .find({
        $or: [
          { updated_at: { $gte: fifteenMinutesAgo } },
          { created_at: { $gte: fifteenMinutesAgo } },
        ],
      })
      .toArray();

    for (const user of users) {
      // JSON.parse implies that properties is stored as a JSON string
      const rawProps = JSON.parse(user.properties || "{}");
      const userProps = {
        $name: rawProps.name || "",
        $email: rawProps.email || "",
        $created: rawProps.created_at || Math.floor(Date.now() / 1000),
        $user_id: user.user_id || user.uuid || "",
      };
      await sendToMixpanelProfile(user.user_id || user.uuid, userProps);
    }

    // Process events
    const events = await db
      .collection("mixpanel_events")
      .find({
        $or: [
          { updated_at: { $gte: fifteenMinutesAgo } },
          { created_at: { $gte: fifteenMinutesAgo } },
        ],
      })
      .toArray();

    for (const event of events) {
      const props = JSON.parse(event.event_properties || "{}");
      await sendToMixpanel(
        event.event_name || "Unknown Event",
        event.user_id || event.uuid,
        props,
        // Fall back to a Date, not Unix seconds: sendToMixpanel passes this
        // value to new Date(), which would read a seconds count as milliseconds.
        event.created_at || event.updated_at || new Date()
      );
    }

    console.log(`Processed ${users.length} users and ${events.length} events`);
  } catch (error) {
    console.error("Error:", error);
  } finally {
    await client.close();
  }
}

// Schedule every 15 minutes
cron.schedule("*/15 * * * *", () => {
  console.log("Running cron at", new Date().toISOString());
  processData();
});

// Run on startup
processData();
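One fragile spot in the script is JSON.parse: a single record with malformed JSON, or with properties already stored as a sub-document, throws and aborts the whole run. A more tolerant parser could look like the sketch below; parseProps is a hypothetical replacement, and returning an empty object on bad input is an assumption about the desired behavior.

```javascript
// True for non-null, non-array objects.
function isPlainObject(value) {
  return value !== null && typeof value === "object" && !Array.isArray(value);
}

// Hypothetical tolerant parser: accepts a JSON string or an object and
// falls back to {} for anything that is missing or malformed.
function parseProps(value) {
  if (isPlainObject(value)) return value; // already a sub-document
  if (typeof value !== "string" || value === "") return {};
  try {
    const parsed = JSON.parse(value);
    return isPlainObject(parsed) ? parsed : {};
  } catch {
    return {}; // malformed JSON: skip the properties, keep the run alive
  }
}

console.log(parseProps('{"plan":"pro"}')); // { plan: 'pro' }
console.log(parseProps("{not json"));      // {}
```

With this in place, the two JSON.parse calls in processData could be swapped for parseProps(user.properties) and parseProps(event.event_properties).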
Mixpanel API Helpers
These functions send data to Mixpanel's EU endpoints.
const crypto = require("crypto");

async function sendToMixpanel(eventName, distinctId, props, time) {
  // Deterministic ID so Mixpanel can deduplicate re-sent events. A truncated
  // hex digest stays alphanumeric and within Mixpanel's 36-character limit
  // for $insert_id, which a raw base64 string does not guarantee.
  const insertId = crypto
    .createHash("sha256")
    .update(`${distinctId}${eventName}${time}`)
    .digest("hex")
    .slice(0, 32);
  const payload = {
    event: eventName,
    properties: {
      token: process.env.MIXPANEL_PROJECT_TOKEN,
      distinct_id: distinctId,
      time: Math.floor(new Date(time).getTime() / 1000), // Unix seconds
      $insert_id: insertId,
      ...props,
    },
  };
  const response = await fetch("https://api-eu.mixpanel.com/track", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify([payload]),
  });
  const text = await response.text();
  console.log(`Sent event: ${eventName} | Response: ${text}`);
}

async function sendToMixpanelProfile(distinctId, props) {
  const payload = {
    $token: process.env.MIXPANEL_PROJECT_TOKEN,
    $distinct_id: distinctId,
    $set: props || {},
    $ip: "0", // do not update geolocation from the server's IP
  };
  const response = await fetch("https://api-eu.mixpanel.com/engage", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify([payload]),
  });
  const text = await response.text();
  console.log(`Sent profile: ${distinctId} | Response: ${text}`);
}
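Both helpers already wrap the payload in an array, so several records could share one request instead of one request each. The sketch below shows one way to batch events. chunk and sendEventBatches are hypothetical names, and the default batch size of 50 is a placeholder assumption that should be checked against Mixpanel's current limits.

```javascript
// Split an array into consecutive slices of at most `size` items.
function chunk(items, size) {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError("size must be a positive integer");
  }
  const batches = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Hypothetical batch sender. `payloads` are event objects shaped like the one
// built in sendToMixpanel; `post` defaults to the global fetch (Node.js 18+).
async function sendEventBatches(payloads, batchSize = 50, post = fetch) {
  let requests = 0;
  for (const batch of chunk(payloads, batchSize)) {
    const response = await post("https://api-eu.mixpanel.com/track", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(batch),
    });
    if (!response.ok) {
      throw new Error(`Mixpanel responded with HTTP ${response.status}`);
    }
    requests += 1;
  }
  return requests; // number of HTTP requests made
}
```

Passing post as a parameter keeps the function testable without network access.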
Use Cases
- User Profiles: Syncs name, email, and creation time to Mixpanel People for user analytics.
- Events: Tracks actions (e.g., "Signed Up", "Clicked Button") with custom properties.
- Compliance: Uses Mixpanel's EU endpoint for data residency requirements.
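- Reliability: Runs on a fixed schedule instead of holding a long-lived connection. Events carry a deterministic $insert_id, so an event sent more than once (for example, when a restart overlaps a scheduled run) is deduplicated by Mixpanel, and profile updates use $set, which is safe to repeat.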
Enhancements
- Add retry logic for failed API calls.
- Monitor with alerts for errors.
- Scale by adjusting cron frequency or batch sizes.
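As an illustration of the first enhancement, retries with exponential backoff can be sketched as follows. withRetry is a hypothetical helper and the defaults (3 retries, 500 ms base delay) are arbitrary assumptions. Note that fetch only rejects on network failures, so the API helpers would also need to throw on a non-OK response for HTTP errors to be retried.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: calls `fn` until it succeeds or the retries are used up,
// doubling the wait after each failure (baseDelayMs, 2x, 4x, ...).
async function withRetry(fn, { retries = 3, baseDelayMs = 500 } = {}) {
  let lastError;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await fn(attempt);
    } catch (error) {
      lastError = error;
      if (attempt < retries) {
        await sleep(baseDelayMs * 2 ** attempt);
      }
    }
  }
  throw lastError; // every attempt failed
}

// Possible use inside processData (assumes the helpers from this post):
// await withRetry(() => sendToMixpanelProfile(id, userProps));
```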
This solution ensures reliable data syncing for analytics while adapting to database constraints.