This technical blog outlines a solution to sync user profiles and events from a MongoDB database to Mixpanel for analytics. Due to connectivity constraints, we implemented a Node.js script that periodically checks for updated or new data and forwards it to Mixpanel. Below, I detail the investigation, schema requirements, and implementation with sanitized code snippets using dummy database and collection names.
We first explored real-time data syncing (e.g., pushing changes as they happen), but the connectivity constraints mentioned above ruled that out, so we settled on a periodic polling approach.
The MongoDB collections must include `created_at` and `updated_at` timestamp fields so the script can detect records that are new or were modified since the last run.
We built a Node.js script that connects to MongoDB on a schedule, finds users and events created or updated within the last 15 minutes, and forwards them to Mixpanel.
The script uses mongodb, node-cron, and the global fetch API (built into Node.js 18+), with secrets managed via environment variables.
This script (e.g., index.js) handles data syncing and scheduling.
// Load secrets (MONGODB_URI, MIXPANEL_PROJECT_TOKEN) from a local .env file.
require("dotenv").config();
const { MongoClient } = require("mongodb");
const cron = require("node-cron");
// Source database connection string; kept out of source control via .env.
const uri = process.env.MONGODB_URI;
/**
 * Sync recently created/updated users and events from MongoDB to Mixpanel.
 *
 * @param {number} [windowMinutes=15] - Look-back window; records with
 *   `created_at` or `updated_at` within the last `windowMinutes` are synced.
 * @returns {Promise<void>} Errors are logged, never thrown to the caller.
 */
async function processData(windowMinutes = 15) {
  const client = new MongoClient(uri);

  // Parse a JSON properties string defensively so one malformed record
  // cannot abort the whole batch.
  // NOTE(review): assumes `properties`/`event_properties` are JSON strings —
  // if they can already be objects, JSON.parse would throw; verify the schema.
  const safeParse = (raw, context) => {
    try {
      return JSON.parse(raw || "{}");
    } catch (err) {
      console.error(`Skipping malformed JSON for ${context}: ${err.message}`);
      return {};
    }
  };

  try {
    await client.connect();
    const db = client.db("app_db"); // Dummy DB name
    console.log("Connected to MongoDB");

    const since = new Date(Date.now() - windowMinutes * 60 * 1000);
    console.log(`Processing data since ${since.toISOString()}`);
    // Shared "recently touched" filter for both collections.
    const recentFilter = {
      $or: [
        { updated_at: { $gte: since } },
        { created_at: { $gte: since } },
      ],
    };

    // Process users: push each recently touched profile to Mixpanel.
    const users = await db
      .collection("mixpanel_users")
      .find(recentFilter)
      .toArray();
    for (const user of users) {
      const distinctId = user.user_id || user.uuid;
      try {
        const rawProps = safeParse(user.properties, `user ${distinctId}`);
        const userProps = {
          $name: rawProps.name || "",
          $email: rawProps.email || "",
          $created: rawProps.created_at || Math.floor(Date.now() / 1000),
          $user_id: distinctId || "",
        };
        await sendToMixpanelProfile(distinctId, userProps);
      } catch (err) {
        // One failing profile must not block the rest of the batch.
        console.error(`Failed to sync user ${distinctId}:`, err);
      }
    }

    // Process events the same way.
    const events = await db
      .collection("mixpanel_events")
      .find(recentFilter)
      .toArray();
    for (const event of events) {
      try {
        const props = safeParse(
          event.event_properties,
          `event ${event.event_name}`
        );
        await sendToMixpanel(
          event.event_name || "Unknown Event",
          event.user_id || event.uuid,
          props,
          event.created_at || event.updated_at || Math.floor(Date.now() / 1000)
        );
      } catch (err) {
        // One failing event must not block the rest of the batch.
        console.error(`Failed to sync event ${event.event_name}:`, err);
      }
    }

    console.log(`Processed ${users.length} users and ${events.length} events`);
  } catch (error) {
    console.error("Error:", error);
  } finally {
    // Always release the connection, even after an error.
    await client.close();
  }
}
// Schedule every 15 minutes. processData catches its own errors, but attach
// a .catch anyway so an unexpected rejection (e.g. before its try block)
// never becomes an unhandled promise rejection.
cron.schedule("*/15 * * * *", () => {
  console.log("Running cron at", new Date().toISOString());
  processData().catch((err) => console.error("Scheduled run failed:", err));
});
// Run once on startup so we don't wait up to 15 minutes for the first sync.
processData().catch((err) => console.error("Startup run failed:", err));
These functions send data to Mixpanel's EU endpoints.
/**
 * Send a single event to Mixpanel's EU /track ingestion endpoint.
 *
 * @param {string} eventName - Mixpanel event name.
 * @param {string} distinctId - User identifier the event is attributed to.
 * @param {object} props - Extra event properties merged into the payload.
 * @param {Date|string|number} time - Event time: a Date, an ISO string, or a
 *   numeric Unix timestamp (seconds or milliseconds).
 * @returns {Promise<void>} Logs the outcome; rejects only on network failure.
 */
async function sendToMixpanel(eventName, distinctId, props, time) {
  const { createHash } = require("node:crypto");

  // Normalize `time` to Unix seconds. Callers pass Mongo timestamps
  // (Date/ISO string) or an already-converted Unix-seconds number;
  // `new Date(seconds)` would misread the latter as milliseconds and
  // stamp the event near 1970. Numbers above 1e12 are treated as ms.
  const toUnixSeconds = (t) => {
    if (typeof t === "number") {
      return t > 1e12 ? Math.floor(t / 1000) : Math.floor(t);
    }
    return Math.floor(new Date(t).getTime() / 1000);
  };

  const payload = {
    event: eventName,
    properties: {
      token: process.env.MIXPANEL_PROJECT_TOKEN,
      distinct_id: distinctId,
      time: toUnixSeconds(time),
      // Mixpanel truncates $insert_id to 36 characters, so truncated base64
      // of long, similar inputs would collide and wrongly dedupe events.
      // A fixed-length hash keeps the ID deterministic and collision-safe.
      $insert_id: createHash("sha256")
        .update(`${distinctId}${eventName}${time}`)
        .digest("hex")
        .slice(0, 36),
      ...props,
    },
  };

  const response = await fetch("https://api-eu.mixpanel.com/track", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify([payload]),
  });
  const text = await response.text();
  if (!response.ok) {
    // Surface rejected batches instead of logging them as successes.
    console.error(
      `Mixpanel /track rejected ${eventName} (${response.status}): ${text}`
    );
    return;
  }
  console.log(`Sent event: ${eventName} | Response: ${text}`);
}
/**
 * Create or update a user profile via Mixpanel's EU /engage endpoint.
 *
 * @param {string} distinctId - User identifier the profile belongs to.
 * @param {object} props - Profile properties applied with $set.
 * @returns {Promise<void>} Logs the outcome; rejects only on network failure.
 */
async function sendToMixpanelProfile(distinctId, props) {
  const payload = {
    $token: process.env.MIXPANEL_PROJECT_TOKEN,
    $distinct_id: distinctId,
    $set: props || {},
    // "0" tells Mixpanel not to geolocate the profile from this server's IP.
    $ip: "0",
  };
  const response = await fetch("https://api-eu.mixpanel.com/engage", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify([payload]),
  });
  const text = await response.text();
  if (!response.ok) {
    // Surface rejected updates instead of logging them as successes.
    console.error(
      `Mixpanel /engage rejected ${distinctId} (${response.status}): ${text}`
    );
    return;
  }
  console.log(`Sent profile: ${distinctId} | Response: ${text}`);
}
This solution ensures reliable data syncing for analytics while adapting to database constraints.