Syncing MongoDB Data to Mixpanel: A Scheduled Integration Approach

Written by Aswin Kumar | Aug 30, 2025 7:09:42 AM

This technical blog outlines a solution for syncing user profiles and events from a MongoDB database to Mixpanel for analytics. Because the database could not support real-time change streams, we implemented a Node.js script that periodically checks for new or updated data and forwards it to Mixpanel. Below, I detail the investigation, schema requirements, and implementation, with sanitized code snippets using dummy database and collection names.

Investigation: Real-Time Feasibility

We first explored real-time data syncing:

  • MongoDB Change Streams: Change streams require a replica set or sharded cluster, so we checked by running rs.status() in the MongoDB shell. The response indicated the server was not part of a replica set, ruling out real-time streaming.
  • Fallback Approach: Without replica sets, we opted for a scheduled task to query recent data, relying on timestamps in the schema.

Schema Requirements

The MongoDB collections must include:

  • created_at: ISODate timestamp set when a record is created.
  • updated_at: ISODate timestamp set when a record is modified.

  These fields enable filtering records modified within a specific time window (e.g., the last 15 minutes).
  • Collections:
    • mixpanel_users: Stores user profiles (e.g., name, email, user_id, properties).
    • mixpanel_events: Stores events (e.g., event_name, user_id, event_properties).
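To make the field list concrete, here is one hypothetical document from each collection (all values are dummies). Note that properties and event_properties are stored as JSON strings, which is why the script later calls JSON.parse on them:

```javascript
// Hypothetical documents showing the expected shape (dummy values).
// properties / event_properties hold JSON strings, not nested objects.
const exampleUser = {
  user_id: "u_123",
  properties: JSON.stringify({ name: "Jane Doe", email: "jane@example.com" }),
  created_at: new Date("2025-08-01T10:00:00Z"),
  updated_at: new Date("2025-08-15T12:30:00Z"),
};

const exampleEvent = {
  event_name: "Signed Up",
  user_id: "u_123",
  event_properties: JSON.stringify({ plan: "free" }),
  created_at: new Date("2025-08-15T12:30:05Z"),
};
```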

Solution Overview

We built a Node.js script that:

  1. Connects to MongoDB (dummy name: "app_db").
  2. Queries mixpanel_users and mixpanel_events for records with created_at or updated_at within the last 15 minutes.
  3. Sends user profiles to Mixpanel's People API.
  4. Sends events to Mixpanel's Events API.
  5. Runs every 15 minutes on a node-cron schedule, plus once at startup for an initial sync.
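The time-window filter from step 2 can be sketched as a small helper (the helper name is ours; the script below inlines this logic):

```javascript
// Builds the MongoDB filter for step 2: match records created or updated
// within the last `windowMinutes` minutes (matching the sync interval).
function recentFilter(windowMinutes = 15, now = Date.now()) {
  const cutoff = new Date(now - windowMinutes * 60 * 1000);
  return {
    $or: [
      { updated_at: { $gte: cutoff } },
      { created_at: { $gte: cutoff } },
    ],
  };
}
```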

The script uses mongodb, node-cron, and fetch, with secrets managed via environment variables.

Implementation

Prerequisites

  • Node.js 18+ (the script relies on the built-in fetch API).
  • Dependencies: npm install dotenv mongodb node-cron.
  • Environment variables (e.g., in a secure vault, not .env for production):
    • MONGODB_URI: MongoDB connection string (e.g., mongodb://user:pass@host:port/app_db).
    • MIXPANEL_PROJECT_TOKEN: Mixpanel token.
    • MIXPANEL_PROJECT_ID: Mixpanel project ID.
    • MIXPANEL_SERVICE_USER and MIXPANEL_SERVICE_SECRET: For People API authentication.
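For local testing, the variables can be exported in the shell. The values below are placeholders; as noted above, production secrets should come from a vault:

```shell
# Placeholder values for local testing only; use a secrets manager in production.
export MONGODB_URI="mongodb://user:pass@localhost:27017/app_db"
export MIXPANEL_PROJECT_TOKEN="your-project-token"
export MIXPANEL_PROJECT_ID="1234567"
export MIXPANEL_SERVICE_USER="svc-account-user"
export MIXPANEL_SERVICE_SECRET="svc-account-secret"
```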

Core Script

This script (e.g., index.js) handles data syncing and scheduling.

require("dotenv").config();
const { MongoClient } = require("mongodb");
const cron = require("node-cron");

const uri = process.env.MONGODB_URI;

async function processData() {
  const client = new MongoClient(uri);
  try {
    await client.connect();
    const db = client.db("app_db"); // Dummy DB name
    console.log("Connected to MongoDB");

    const fifteenMinutesAgo = new Date(Date.now() - 15 * 60 * 1000);
    console.log(`Processing data since ${fifteenMinutesAgo.toISOString()}`);

    // Process users
    const users = await db
      .collection("mixpanel_users")
      .find({
        $or: [
          { updated_at: { $gte: fifteenMinutesAgo } },
          { created_at: { $gte: fifteenMinutesAgo } },
        ],
      })
      .toArray();

    for (const user of users) {
      const rawProps = JSON.parse(user.properties || "{}");
      const userProps = {
        $name: rawProps.name || "",
        $email: rawProps.email || "",
        $created: rawProps.created_at || Math.floor(Date.now() / 1000),
        $user_id: user.user_id || user.uuid || "",
      };
      await sendToMixpanelProfile(user.user_id || user.uuid, userProps);
    }

    // Process events
    const events = await db
      .collection("mixpanel_events")
      .find({
        $or: [
          { updated_at: { $gte: fifteenMinutesAgo } },
          { created_at: { $gte: fifteenMinutesAgo } },
        ],
      })
      .toArray();

    for (const event of events) {
      const props = JSON.parse(event.event_properties || "{}");
      await sendToMixpanel(
        event.event_name || "Unknown Event",
        event.user_id || event.uuid,
        props,
        event.created_at || event.updated_at || Math.floor(Date.now() / 1000)
      );
    }

    console.log(`Processed ${users.length} users and ${events.length} events`);
  } catch (error) {
    console.error("Error:", error);
  } finally {
    await client.close();
  }
}

// Schedule every 15 minutes
cron.schedule("*/15 * * * *", () => {
  console.log("Running cron at", new Date().toISOString());
  processData();
});

// Run on startup
processData();



Mixpanel API Helpers

These functions send data to Mixpanel's EU endpoints.

async function sendToMixpanel(eventName, distinctId, props, time) {
  const payload = {
    event: eventName,
    properties: {
      token: process.env.MIXPANEL_PROJECT_TOKEN,
      distinct_id: distinctId,
      time: Math.floor(new Date(time).getTime() / 1000),
      // Deterministic id so retried sends are deduplicated by Mixpanel
      $insert_id: Buffer.from(`${distinctId}${eventName}${time}`).toString("base64"),
      ...props,
    },
  };

  const response = await fetch("https://api-eu.mixpanel.com/track", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify([payload]),
  });

  const text = await response.text();
  console.log(`Sent event: ${eventName} | Response: ${text}`);
}

async function sendToMixpanelProfile(distinctId, props) {
  const payload = {
    $token: process.env.MIXPANEL_PROJECT_TOKEN,
    $distinct_id: distinctId,
    $set: props || {},
    $ip: "0", // "0" stops Mixpanel from geolocating the server's IP
  };

  const response = await fetch("https://api-eu.mixpanel.com/engage", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify([payload]),
  });

  const text = await response.text();
  console.log(`Sent profile: ${distinctId} | Response: ${text}`);
}

Use Cases

  • User Profiles: Syncs name, email, and creation time to Mixpanel People for user analytics.
  • Events: Tracks actions (e.g., "Signed Up", "Clicked Button") with custom properties.
  • Compliance: Uses Mixpanel's EU endpoint for data residency requirements.
  • Reliability: Handles connectivity issues via scheduled runs and idempotent requests ($insert_id).
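The idempotency mentioned above comes from the deterministic $insert_id: resending the same event always yields the same id, which Mixpanel uses to deduplicate. A minimal sketch of the computation from sendToMixpanel (note that Mixpanel documents a length limit on $insert_id, so very long inputs may warrant hashing instead of base64):

```javascript
// Same inputs always produce the same $insert_id, so a retried request
// is recognized as a duplicate rather than double-counted.
function insertId(distinctId, eventName, time) {
  return Buffer.from(`${distinctId}${eventName}${time}`).toString("base64");
}
```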

Enhancements

  • Add retry logic for failed API calls.
  • Monitor with alerts for errors.
  • Scale by adjusting cron frequency or batch sizes.
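As a starting point for the retry enhancement, a generic backoff wrapper could wrap either Mixpanel helper. This is a sketch, not part of the current script; the name and delays are illustrative:

```javascript
// Retries an async `send` function with exponential backoff.
// Rethrows the last error once all attempts are exhausted.
async function withRetry(send, attempts = 3, baseDelayMs = 500) {
  for (let i = 0; i < attempts; i++) {
    try {
      return await send();
    } catch (err) {
      if (i === attempts - 1) throw err;
      // Wait 500ms, 1000ms, 2000ms, ... before the next attempt
      await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** i));
    }
  }
}
```

Combined with the deterministic $insert_id, retried event sends stay safe from double-counting.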

This solution ensures reliable data syncing for analytics while adapting to database constraints.