Information

Syncing MongoDB Data to Mixpanel: A Scheduled Integration Approach

Sync MongoDB data to Mixpanel for analytics using a Node.js script and scheduled tasks to handle connectivity constraints and ensure reliable data syncing.


This technical blog outlines a solution to sync user profiles and events from a MongoDB database to Mixpanel for analytics. Due to connectivity constraints, we implemented a Node.js script that periodically checks for updated or new data and forwards it to Mixpanel. Below, I detail the investigation, schema requirements, and implementation with sanitized code snippets using dummy database and collection names.

Investigation: Real-Time Feasibility

We first explored real-time data syncing:

  • MongoDB Change Streams: We checked if the database supported change streams by running rs.status() in the MongoDB shell. The response indicated no replica set, ruling out real-time streaming.
  • Fallback Approach: Without replica sets, we opted for a scheduled task to query recent data, relying on timestamps in the schema.

Schema Requirements

The MongoDB collections must include:

  • created_at: ISODate timestamp for new records.
  • updated_at: ISODate timestamp for updates. These fields enable filtering records modified within a specific time window (e.g., last 15 minutes).
  • Collections:
    • mixpanel_users: Stores user profiles (e.g., name, email, user_id, properties).
    • mixpanel_events: Stores events (e.g., event_name, user_id, event_properties).

Solution Overview

We built a Node.js script that:

  1. Connects to MongoDB (dummy name: "app_db").
  2. Queries mixpanel_users and mixpanel_events for records with created_at or updated_at within the last 15 minutes.
  3. Sends user profiles to Mixpanel's People API.
  4. Sends events to Mixpanel's Events API.
  5. Runs every 15 minutes via a cron job and on startup for initial sync.

The script uses mongodb, node-cron, and fetch, with secrets managed via environment variables.

 

 

Screenshot 2025-08-30 at 12.25.32

Implementation

Prerequisites

  • Node.js 14+.
  • Dependencies: npm install dotenv mongodb node-cron.
  • Environment variables (e.g., in a secure vault, not .env for production):
    • MONGODB_URI: MongoDB connection string (e.g., mongodb://user:pass@host:port/app_db).
    • MIXPANEL_PROJECT_TOKEN: Mixpanel token.
    • MIXPANEL_PROJECT_ID: Mixpanel project ID.
    • MIXPANEL_SERVICE_USER and MIXPANEL_SERVICE_SECRET: For People API authentication.

Core Script

This script (e.g., index.js) handles data syncing and scheduling.

require("dotenv").config();

const { MongoClient } = require("mongodb");

const cron = require("node-cron");

 

const uri = process.env.MONGODB_URI;

 

async function processData() {

  const client = new MongoClient(uri);

  try {

    await client.connect();

    const db = client.db("app_db"); // Dummy DB name

    console.log("Connected to MongoDB");

 

    const fifteenMinutesAgo = new Date(Date.now() - 15 * 60 * 1000);

    console.log(`Processing data since ${fifteenMinutesAgo.toISOString()}`);

 

    // Process users

    const users = await db

      .collection("mixpanel_users")

      .find({

        $or: [

          { updated_at: { $gte: fifteenMinutesAgo } },

          { created_at: { $gte: fifteenMinutesAgo } },

        ],

      })

      .toArray();

 

    for (const user of users) {

      const rawProps = JSON.parse(user.properties || "{}");

      const userProps = {

        $name: rawProps.name || "",

        $email: rawProps.email || "",

        $created: rawProps.created_at || Math.floor(Date.now() / 1000),

        $user_id: user.user_id || user.uuid || "",

      };

      await sendToMixpanelProfile(user.user_id || user.uuid, userProps);

    }

 

    // Process events

    const events = await db

      .collection("mixpanel_events")

      .find({

        $or: [

          { updated_at: { $gte: fifteenMinutesAgo } },

          { created_at: { $gte: fifteenMinutesAgo } },

        ],

      })

      .toArray();

 

    for (const event of events) {

      const props = JSON.parse(event.event_properties || "{}");

      await sendToMixpanel(

        event.event_name || "Unknown Event",

        event.user_id || event.uuid,

        props,

        event.created_at || event.updated_at || Math.floor(Date.now() / 1000)

      );

    }

 

    console.log(`Processed ${users.length} users and ${events.length} events`);

  } catch (error) {

    console.error("Error:", error);

  } finally {

    await client.close();

  }

}

 

// Schedule every 15 minutes

cron.schedule("*/15 * * * *", () => {

  console.log("Running cron at", new Date().toISOString());

  processData();

});

 

// Run on startup

processData();



Mixpanel API Helpers

These functions send data to Mixpanel's EU endpoints.

 

async function sendToMixpanel(eventName, distinctId, props, time) {

  const payload = {

    event: eventName,

    properties: {

      token: process.env.MIXPANEL_PROJECT_TOKEN,

      distinct_id: distinctId,

      time: Math.floor(new Date(time).getTime() / 1000),

      $insert_id: Buffer.from(`${distinctId}${eventName}${time}`).toString("base64"),

      ...props,

    },

  };

 

  const response = await fetch("https://api-eu.mixpanel.com/track", {

    method: "POST",

    headers: { "Content-Type": "application/json" },

    body: JSON.stringify([payload]),

  });

 

  const text = await response.text();

  console.log(`Sent event: ${eventName} | Response: ${text}`);

}

 

async function sendToMixpanelProfile(distinctId, props) {

  const payload = {

    $token: process.env.MIXPANEL_PROJECT_TOKEN,

    $distinct_id: distinctId,

    $set: props || {},

    $ip: "0",

  };

 

  const response = await fetch("https://api-eu.mixpanel.com/engage", {

    method: "POST",

    headers: { "Content-Type": "application/json" },

    body: JSON.stringify([payload]),

  });

 

  const text = await response.text();

  console.log(`Sent profile: ${distinctId} | Response: ${text}`);

}

Use Cases

  • User Profiles: Syncs name, email, and creation time to Mixpanel People for user analytics.
  • Events: Tracks actions (e.g., "Signed Up", "Clicked Button") with custom properties.
  • Compliance: Uses Mixpanel's EU endpoint for data residency requirements.
  • Reliability: Handles connectivity issues via scheduled runs and idempotent requests ($insert_id).

Enhancements

  • Add retry logic for failed API calls.
  • Monitor with alerts for errors.
  • Scale by adjusting cron frequency or batch sizes.

This solution ensures reliable data syncing for analytics while adapting to database constraints.

Similar posts

Get notified on new marketing insights

Be the first to know about new B2B SaaS Marketing insights to build or refine your marketing function with the tools and knowledge of today’s industry.