Skip to main content

Overview

While Supabase is the recommended storage provider, you can create custom adapters for any database system. This guide shows how to implement the IStorageAdapter interface for your preferred database.

IStorageAdapter Interface

All storage adapters must implement this interface:
/**
 * Contract that every storage adapter must implement.
 *
 * Every member except `isConnected` returns a promise. The option and result
 * types referenced here (`ConversationQueryOptions`, `SearchOptions`, ...) are
 * declared by the SDK and are not shown in this guide.
 */
interface IStorageAdapter {
  // Connection management
  connect(): Promise<void>;
  disconnect(): Promise<void>;
  // The only synchronous member of the interface.
  isConnected(): boolean;

  // Message operations
  saveMessage(message: StorageMessage): Promise<void>;
  // May resolve to null; the MongoDB example in this guide does so when no
  // stored message has the given id.
  getMessage(messageId: string): Promise<StorageMessage | null>;
  updateMessageStatus(messageId: string, status: MessageStatus): Promise<void>;
  deleteMessage(messageId: string): Promise<void>;

  // Conversation operations
  // `options` is optional, so implementations must supply their own defaults.
  getConversation(
    phoneNumber: string,
    options?: ConversationQueryOptions
  ): Promise<ConversationResult>;

  // Search operations
  searchMessages(options: SearchOptions): Promise<SearchResult>;

  // Thread operations
  // May resolve to null; the MongoDB example does so when the message the
  // thread would start from does not exist.
  getMessageThread(messageId: string): Promise<MessageThread | null>;

  // Analytics
  getConversationAnalytics(
    phoneNumber: string,
    options?: AnalyticsOptions
  ): Promise<ConversationAnalytics>;

  // Export
  exportConversation(
    phoneNumber: string,
    options?: ExportOptions
  ): Promise<ExportResult>;

  // Cleanup
  // Resolves to a number; the MongoDB example returns how many messages it deleted.
  cleanupOldMessages(retentionDays: number): Promise<number>;
}

Example: MongoDB Adapter

Here’s a complete example of a MongoDB storage adapter:
import { MongoClient, Db, Collection } from 'mongodb';
import {
  IStorageAdapter,
  StorageMessage,
  ConversationResult,
  SearchResult,
  MessageThread,
  ConversationAnalytics,
  ExportResult,
  MessageStatus
} from 'whatsapp-client-sdk';

/**
 * MongoDB-backed implementation of `IStorageAdapter`.
 *
 * Messages are stored one document per message (unique on
 * `whatsappMessageId`); a second collection keeps one summary document per
 * contact phone number with message/unread counters.
 *
 * `connect()` must complete before any other method is used: the collection
 * handles are only assigned there.
 */
export class MongoDBAdapter implements IStorageAdapter {
  /** Maximum number of messages a single export will fetch. */
  private static readonly EXPORT_LIMIT = 10000;

  private client: MongoClient;
  // Assigned in connect(); `!` tells the compiler they are set before use.
  private db!: Db;
  private messagesCollection!: Collection;
  private conversationsCollection!: Collection;
  private connected = false;

  /**
   * @param config Connection URL and database name; the two collection names
   *   are optional and default to 'whatsapp_messages' and
   *   'whatsapp_conversations'.
   */
  constructor(private config: {
    url: string;
    database: string;
    messagesCollection?: string;
    conversationsCollection?: string;
  }) {
    this.client = new MongoClient(config.url);
  }

  /**
   * Opens the connection, resolves the collections and creates the indexes.
   * Calling it while already connected is a no-op.
   *
   * @throws Whatever the driver throws; if setup fails after the socket was
   *   opened, the client is closed again before the error is rethrown.
   */
  async connect(): Promise<void> {
    if (this.connected) return;

    await this.client.connect();

    try {
      this.db = this.client.db(this.config.database);

      this.messagesCollection = this.db.collection(
        this.config.messagesCollection || 'whatsapp_messages'
      );

      this.conversationsCollection = this.db.collection(
        this.config.conversationsCollection || 'whatsapp_conversations'
      );

      // Create indexes
      await this.createIndexes();
    } catch (error) {
      // Setup failed with the client already open: close it so the failed
      // attempt does not leak a connection. A close failure is ignored on
      // purpose so the original error is the one the caller sees.
      await this.client.close().catch(() => undefined);
      throw error;
    }

    this.connected = true;
    console.log('✅ MongoDB connected');
  }

  /** Closes the client. Calling it while not connected is a no-op. */
  async disconnect(): Promise<void> {
    if (!this.connected) return;

    await this.client.close();
    this.connected = false;
    console.log('✅ MongoDB disconnected');
  }

  /** Reports whether `connect()` has completed and `disconnect()` has not. */
  isConnected(): boolean {
    return this.connected;
  }

  /** Creates the lookup, sort, full-text and conversation indexes. */
  private async createIndexes(): Promise<void> {
    // Message indexes
    await this.messagesCollection.createIndex({ whatsappMessageId: 1 }, { unique: true });
    await this.messagesCollection.createIndex({ fromPhone: 1 });
    await this.messagesCollection.createIndex({ toPhone: 1 });
    await this.messagesCollection.createIndex({ timestamp: -1 });
    await this.messagesCollection.createIndex({ status: 1 });
    await this.messagesCollection.createIndex({ conversationId: 1 });

    // Full-text search index
    await this.messagesCollection.createIndex({ 'content.text': 'text' });

    // Conversation indexes
    await this.conversationsCollection.createIndex({ phoneNumber: 1 }, { unique: true });
    await this.conversationsCollection.createIndex({ lastMessageAt: -1 });
  }

  /**
   * Stores a message and updates the summary document of its conversation.
   *
   * The message is inserted first: a duplicate `whatsappMessageId` is rejected
   * by the unique index before any counter is touched.
   *
   * @throws The driver's duplicate-key error when the message already exists.
   */
  async saveMessage(message: StorageMessage): Promise<void> {
    const now = new Date();

    // Insert message
    await this.messagesCollection.insertOne({
      ...message,
      createdAt: now,
      updatedAt: now
    });

    // A conversation belongs to the contact, not to the business number:
    // for outgoing messages the contact is the recipient.
    const contactPhone =
      message.direction === 'outgoing' ? message.toPhone : message.fromPhone;

    // Upsert conversation
    await this.conversationsCollection.updateOne(
      { phoneNumber: contactPhone },
      {
        $set: {
          phoneNumber: contactPhone,
          businessPhoneId: message.phoneNumberId,
          updatedAt: now
        },
        // $max keeps the newest timestamp when messages arrive out of order.
        $max: { lastMessageAt: message.timestamp },
        // $inc creates missing counters on upsert, so every incoming message
        // (not just the first) raises unreadCount.
        $inc: {
          messageCount: 1,
          unreadCount: message.direction === 'incoming' ? 1 : 0
        },
        $setOnInsert: {
          createdAt: now,
          metadata: {}
        }
      },
      { upsert: true }
    );
  }

  /** Looks a message up by its WhatsApp id; resolves to null when absent. */
  async getMessage(messageId: string): Promise<StorageMessage | null> {
    const message = await this.messagesCollection.findOne({
      whatsappMessageId: messageId
    });

    return message as StorageMessage | null;
  }

  /** Sets `status` (and `updatedAt`) on the message with the given WhatsApp id. */
  async updateMessageStatus(messageId: string, status: MessageStatus): Promise<void> {
    await this.messagesCollection.updateOne(
      { whatsappMessageId: messageId },
      {
        $set: {
          status,
          updatedAt: new Date()
        }
      }
    );
  }

  /** Deletes the message with the given WhatsApp id, if it exists. */
  async deleteMessage(messageId: string): Promise<void> {
    await this.messagesCollection.deleteOne({
      whatsappMessageId: messageId
    });
  }

  /**
   * Returns one page of the messages sent to or from `phoneNumber`, newest
   * first, together with the total number of matching messages.
   *
   * @param options `limit` defaults to 50 and `offset` to 0; `dateFrom` /
   *   `dateTo` bound the message timestamp inclusively.
   */
  async getConversation(
    phoneNumber: string,
    options?: {
      limit?: number;
      offset?: number;
      dateFrom?: Date;
      dateTo?: Date;
    }
  ): Promise<ConversationResult> {
    const query: any = { $or: MongoDBAdapter.participantFilter(phoneNumber) };

    const range = MongoDBAdapter.timestampRange(options?.dateFrom, options?.dateTo);
    if (range) query.timestamp = range;

    const [messages, total] = await Promise.all([
      this.messagesCollection
        .find(query)
        .sort({ timestamp: -1 })
        .skip(options?.offset || 0)
        .limit(options?.limit || 50)
        .toArray(),
      this.messagesCollection.countDocuments(query)
    ]);

    return {
      phoneNumber,
      totalMessages: total,
      messages: messages as any[]
    };
  }

  /**
   * Searches messages, combining the full-text, phone number, message type and
   * date filters that are present in `options`. Results are newest first.
   *
   * @param options `limit` defaults to 20 and `offset` to 0.
   */
  async searchMessages(options: {
    text: string;
    phoneNumber?: string;
    messageType?: string;
    dateFrom?: Date;
    dateTo?: Date;
    limit?: number;
    offset?: number;
  }): Promise<SearchResult> {
    const query: any = {};

    // Text search
    if (options.text) {
      query.$text = { $search: options.text };
    }

    // Phone number filter
    if (options.phoneNumber) {
      query.$or = MongoDBAdapter.participantFilter(options.phoneNumber);
    }

    // Message type filter
    if (options.messageType) {
      query.messageType = options.messageType;
    }

    // Date range filter
    const range = MongoDBAdapter.timestampRange(options.dateFrom, options.dateTo);
    if (range) query.timestamp = range;

    const [messages, total] = await Promise.all([
      this.messagesCollection
        .find(query)
        .sort({ timestamp: -1 })
        .skip(options.offset || 0)
        .limit(options.limit || 20)
        .toArray(),
      this.messagesCollection.countDocuments(query)
    ]);

    return {
      total,
      messages: messages as any[]
    };
  }

  /**
   * Returns a message together with the messages whose `replyToMessageId`
   * points at it (oldest first), or null when the message does not exist.
   */
  async getMessageThread(messageId: string): Promise<MessageThread | null> {
    const originalMessage = await this.getMessage(messageId);

    if (!originalMessage) {
      return null;
    }

    const replies = await this.messagesCollection
      .find({ replyToMessageId: messageId })
      .sort({ timestamp: 1 })
      .toArray();

    return {
      originalMessage: originalMessage as any,
      replies: replies as any[]
    };
  }

  /**
   * Computes counters and response metrics for the messages exchanged with
   * `phoneNumber`, optionally restricted to an inclusive time window.
   *
   * An incoming message counts as answered when some outgoing message has a
   * later timestamp; its response time is the gap to the first such message.
   * `averageResponseTime` is in milliseconds, `responseRate` is a percentage.
   */
  async getConversationAnalytics(
    phoneNumber: string,
    options?: {
      from?: Date;
      to?: Date;
    }
  ): Promise<ConversationAnalytics> {
    const query: any = { $or: MongoDBAdapter.participantFilter(phoneNumber) };

    const range = MongoDBAdapter.timestampRange(options?.from, options?.to);
    if (range) query.timestamp = range;

    const messages = await this.messagesCollection
      .find(query)
      .sort({ timestamp: 1 })
      .toArray();

    const incoming = messages.filter(m => m.direction === 'incoming');
    const outgoing = messages.filter(m => m.direction === 'outgoing');

    // Calculate average response time.
    // Both lists are in ascending timestamp order, so one forward-only cursor
    // over `outgoing` finds the first later reply for every incoming message.
    let totalResponseTime = 0;
    let responseCount = 0;
    let cursor = 0;

    for (const incomingMsg of incoming) {
      const receivedAt = MongoDBAdapter.toMillis(incomingMsg.timestamp);

      while (
        cursor < outgoing.length &&
        MongoDBAdapter.toMillis(outgoing[cursor].timestamp) <= receivedAt
      ) {
        cursor++;
      }

      // No later outgoing message exists for this or any following incoming one.
      if (cursor === outgoing.length) break;

      totalResponseTime += MongoDBAdapter.toMillis(outgoing[cursor].timestamp) - receivedAt;
      responseCount++;
    }

    const avgResponseTime = responseCount > 0
      ? totalResponseTime / responseCount
      : 0;

    // Message types distribution
    const messagesByType: Record<string, number> = {};
    for (const msg of messages) {
      messagesByType[msg.messageType] = (messagesByType[msg.messageType] || 0) + 1;
    }

    return {
      phoneNumber,
      totalMessages: messages.length,
      incomingMessages: incoming.length,
      outgoingMessages: outgoing.length,
      firstMessageAt: messages[0]?.timestamp || new Date(),
      lastMessageAt: messages[messages.length - 1]?.timestamp || new Date(),
      averageResponseTime: avgResponseTime,
      responseRate: incoming.length > 0 ? (responseCount / incoming.length) * 100 : 0,
      messagesByType,
      messagesByDay: [] // Implement grouping by day if needed
    };
  }

  /**
   * Exports the conversation with `phoneNumber` as an array of messages
   * (default) or as CSV text.
   *
   * At most `EXPORT_LIMIT` (10000) messages are fetched, newest first; longer
   * conversations are truncated.
   */
  async exportConversation(
    phoneNumber: string,
    options?: {
      format?: 'json' | 'csv';
      dateFrom?: Date;
      dateTo?: Date;
    }
  ): Promise<ExportResult> {
    const conversation = await this.getConversation(phoneNumber, {
      dateFrom: options?.dateFrom,
      dateTo: options?.dateTo,
      limit: MongoDBAdapter.EXPORT_LIMIT // Large limit for export
    });

    if (options?.format === 'csv') {
      const csv = this.convertToCSV(conversation.messages);
      return {
        data: csv,
        messageCount: conversation.messages.length,
        format: 'csv'
      };
    }

    return {
      data: conversation.messages,
      messageCount: conversation.messages.length,
      format: 'json'
    };
  }

  /**
   * Renders messages as CSV: an unquoted header line followed by one line per
   * message with every cell quoted. Embedded double quotes are doubled so
   * JSON content and quoted text stay inside their cell; missing values
   * become empty cells.
   */
  private convertToCSV(messages: any[]): string {
    const headers = ['Timestamp', 'From', 'To', 'Type', 'Direction', 'Status', 'Content'];

    const quote = (cell: unknown): string =>
      `"${String(cell ?? '').replace(/"/g, '""')}"`;

    const rows = messages.map(msg => [
      MongoDBAdapter.formatTimestamp(msg.timestamp),
      msg.fromPhone,
      msg.toPhone,
      msg.messageType,
      msg.direction,
      msg.status,
      // Non-text messages have no `text`; fall back to the raw content as JSON.
      msg.content?.text || JSON.stringify(msg.content)
    ]);

    return [
      headers.join(','),
      ...rows.map(row => row.map(quote).join(','))
    ].join('\n');
  }

  /**
   * Deletes every message older than `retentionDays` days.
   *
   * @returns The number of deleted messages.
   * @throws RangeError when `retentionDays` is negative or not a finite
   *   number; a negative value would put the cutoff in the future and delete
   *   every stored message.
   */
  async cleanupOldMessages(retentionDays: number): Promise<number> {
    if (!Number.isFinite(retentionDays) || retentionDays < 0) {
      throw new RangeError(
        `retentionDays must be a non-negative number, got ${retentionDays}`
      );
    }

    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - retentionDays);

    const result = await this.messagesCollection.deleteMany({
      timestamp: { $lt: cutoffDate }
    });

    return result.deletedCount || 0;
  }

  /** Builds the `$or` clauses matching messages sent to or from a phone number. */
  private static participantFilter(phoneNumber: string): Array<Record<string, string>> {
    return [{ fromPhone: phoneNumber }, { toPhone: phoneNumber }];
  }

  /** Builds an inclusive timestamp range, or undefined when no bound is given. */
  private static timestampRange(
    from?: Date,
    to?: Date
  ): { $gte?: Date; $lte?: Date } | undefined {
    if (!from && !to) return undefined;

    const range: { $gte?: Date; $lte?: Date } = {};
    if (from) range.$gte = from;
    if (to) range.$lte = to;
    return range;
  }

  /** Converts a Date, epoch number or date string to epoch milliseconds. */
  private static toMillis(value: unknown): number {
    return value instanceof Date
      ? value.getTime()
      : new Date(value as string | number).getTime();
  }

  /** Formats a timestamp as ISO 8601; unparseable values are returned as text. */
  private static formatTimestamp(value: unknown): string {
    if (value == null) return '';

    const date = value instanceof Date ? value : new Date(value as string | number);
    return Number.isNaN(date.getTime()) ? String(value) : date.toISOString();
  }
}

Usage

import { WhatsAppClient } from 'whatsapp-client-sdk';
import { MongoDBAdapter } from './mongodb-adapter';

// Build the adapter. The two collection names are optional; when omitted the
// adapter falls back to 'whatsapp_messages' and 'whatsapp_conversations'.
const mongoAdapter = new MongoDBAdapter({
  // `!` only silences the compiler: if the variable is unset, `undefined`
  // is still what gets passed to the adapter at runtime.
  url: process.env.MONGODB_URL!,
  database: 'whatsapp',
  messagesCollection: 'messages',
  conversationsCollection: 'conversations'
});

// Hand the adapter to the client through the 'custom' storage provider.
const client = new WhatsAppClient({
  accessToken: process.env.WHATSAPP_ACCESS_TOKEN!,
  phoneNumberId: process.env.WHATSAPP_PHONE_NUMBER_ID!,
  storage: {
    enabled: true,
    provider: 'custom',
    options: {
      customAdapter: mongoAdapter
    },
    // NOTE(review): presumably these toggle which events are persisted
    // (incoming messages, outgoing messages, status updates) — confirm
    // against the SDK reference.
    features: {
      persistIncoming: true,
      persistOutgoing: true,
      persistStatus: true
    }
  }
});

// Top-level await: this line must run in an ES module or inside an async function.
// NOTE(review): presumably this is what calls `connect()` on the adapter — confirm.
await client.initializeStorage();

Example: PostgreSQL Adapter

Using the `pg` library for PostgreSQL:
import { Pool } from 'pg';
import { IStorageAdapter, StorageMessage } from 'whatsapp-client-sdk';

/**
 * PostgreSQL-backed storage adapter built on a `pg` connection pool.
 *
 * This example is intentionally partial: only connection management, schema
 * creation and `saveMessage` are shown; the remaining `IStorageAdapter`
 * methods still have to be implemented.
 */
export class PostgreSQLAdapter implements IStorageAdapter {
  private pool: Pool;
  private connected = false;
  // A pg pool can be ended only once; this makes disconnect() safe to repeat.
  private ended = false;

  /** @param config Connection settings, passed to the `pg` Pool unchanged. */
  constructor(config: {
    host: string;
    port: number;
    database: string;
    user: string;
    password: string;
  }) {
    this.pool = new Pool(config);
  }

  /**
   * Verifies that the database is reachable and creates the tables and
   * indexes if they do not exist. Calling it while connected is a no-op.
   */
  async connect(): Promise<void> {
    if (this.connected) return;

    // Test connection
    await this.pool.query('SELECT NOW()');

    // Create tables
    await this.createTables();

    this.connected = true;
    console.log('✅ PostgreSQL connected');
  }

  /**
   * Ends the pool. Safe to call more than once, and also after a failed
   * `connect()`, so clients opened by the failed attempt are released too.
   */
  async disconnect(): Promise<void> {
    if (this.ended) return;

    // Flag first: a second call made while the pool is still draining must
    // not reach pool.end() again.
    this.ended = true;
    await this.pool.end();
    this.connected = false;
  }

  /** Reports whether `connect()` has completed and the pool has not been ended. */
  isConnected(): boolean {
    return this.connected;
  }

  /** Creates the conversations and messages tables plus the message-id index. */
  private async createTables(): Promise<void> {
    await this.pool.query(`
      CREATE TABLE IF NOT EXISTS whatsapp_conversations (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        phone_number VARCHAR(50) NOT NULL UNIQUE,
        business_phone_id VARCHAR(50),
        last_message_at TIMESTAMPTZ DEFAULT NOW(),
        message_count INTEGER DEFAULT 0,
        unread_count INTEGER DEFAULT 0,
        metadata JSONB DEFAULT '{}',
        created_at TIMESTAMPTZ DEFAULT NOW(),
        updated_at TIMESTAMPTZ DEFAULT NOW()
      )
    `);

    // NOTE(review): reply_to_message_id is a UUID column, while the MongoDB
    // example in this guide matches replies by WhatsApp message id (a string).
    // Decide which id replies should reference before populating this column.
    await this.pool.query(`
      CREATE TABLE IF NOT EXISTS whatsapp_messages (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        whatsapp_message_id VARCHAR(255) UNIQUE NOT NULL,
        conversation_id UUID REFERENCES whatsapp_conversations(id),
        phone_number_id VARCHAR(50),
        from_phone VARCHAR(50),
        to_phone VARCHAR(50),
        message_type VARCHAR(50),
        content JSONB,
        reply_to_message_id UUID,
        timestamp TIMESTAMPTZ,
        status VARCHAR(50),
        direction VARCHAR(20),
        metadata JSONB DEFAULT '{}',
        created_at TIMESTAMPTZ DEFAULT NOW(),
        updated_at TIMESTAMPTZ DEFAULT NOW()
      )
    `);

    // Create indexes
    await this.pool.query(`
      CREATE INDEX IF NOT EXISTS idx_messages_whatsapp_id
      ON whatsapp_messages(whatsapp_message_id)
    `);
  }

  /**
   * Inserts one message row using a parameterized statement. `content` and
   * `metadata` are serialized to JSON for their JSONB columns.
   *
   * This example does not create or link a conversation row.
   *
   * @throws The driver's unique-violation error when a row with the same
   *   `whatsapp_message_id` already exists.
   */
  async saveMessage(message: StorageMessage): Promise<void> {
    await this.pool.query(
      `INSERT INTO whatsapp_messages (
        whatsapp_message_id, phone_number_id, from_phone, to_phone,
        message_type, content, timestamp, status, direction, metadata
      ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)`,
      [
        message.whatsappMessageId,
        message.phoneNumberId,
        message.fromPhone,
        message.toPhone,
        message.messageType,
        JSON.stringify(message.content),
        message.timestamp,
        message.status,
        message.direction,
        // Persist metadata (e.g. added by a transformer) instead of dropping it.
        JSON.stringify(message.metadata ?? {})
      ]
    );
  }

  // Implement other methods...
}

Message Transformers

Transform messages before storing them in the database:
import { MessageTransformer, StorageMessage } from 'whatsapp-client-sdk';

// Example: Anonymize phone numbers
/**
 * Replaces both phone numbers with their first five characters followed by
 * 'XXXXX'. All other message fields are copied through untouched.
 */
const anonymizeTransformer: MessageTransformer = {
  name: 'anonymize',
  transform: async (message: StorageMessage): Promise<StorageMessage> => {
    // Keep a five-character prefix, mask the rest.
    const mask = (phone: string): string => `${phone.slice(0, 5)}XXXXX`;

    const maskedFrom = mask(message.fromPhone);
    const maskedTo = mask(message.toPhone);

    return { ...message, fromPhone: maskedFrom, toPhone: maskedTo };
  }
};

// Example: Encrypt message content
/**
 * Replaces `content` with `{ encrypted }`, where `encrypted` is
 * `<iv hex>:<ciphertext hex>` — the JSON-serialized original content
 * encrypted with AES-256-CBC.
 *
 * The 32-byte key is the SHA-256 digest of `ENCRYPTION_KEY`, so that variable
 * should hold a long random secret. A fresh random IV is generated for every
 * message and stored next to the ciphertext because decryption needs it.
 *
 * @throws Error when `ENCRYPTION_KEY` is not set.
 */
const encryptTransformer: MessageTransformer = {
  name: 'encrypt',
  transform: async (message: StorageMessage): Promise<StorageMessage> => {
    const secret = process.env.ENCRYPTION_KEY;
    if (!secret) {
      throw new Error('ENCRYPTION_KEY environment variable is not set');
    }

    const crypto = await import('crypto');

    // createCipheriv replaces the deprecated createCipher, which derived a
    // weak key and used no IV.
    const key = crypto.createHash('sha256').update(secret).digest();
    const iv = crypto.randomBytes(16);
    const cipher = crypto.createCipheriv('aes-256-cbc', key, iv);

    // final() emits the last padded block; without it the ciphertext is
    // truncated and cannot be decrypted.
    const ciphertext =
      cipher.update(JSON.stringify(message.content), 'utf8', 'hex') +
      cipher.final('hex');

    return {
      ...message,
      content: { encrypted: `${iv.toString('hex')}:${ciphertext}` }
    };
  }
};

// Example: Add custom metadata
/**
 * Adds `processedAt`, `server` and `version` to the message metadata.
 * Existing metadata keys are kept unless they share one of those three names.
 */
const metadataTransformer: MessageTransformer = {
  name: 'metadata',
  transform: async (message: StorageMessage): Promise<StorageMessage> => {
    // Values stamped onto every message passing through this transformer.
    const processingInfo = {
      processedAt: new Date().toISOString(),
      server: process.env.SERVER_ID,
      version: '1.0.0'
    };

    const metadata = { ...message.metadata, ...processingInfo };

    return { ...message, metadata };
  }
};

// Use transformers
// NOTE: `WhatsAppClient` is not among the names imported at the top of this
// snippet; import it from 'whatsapp-client-sdk' as well.
const client = new WhatsAppClient({
  accessToken: process.env.WHATSAPP_ACCESS_TOKEN!,
  phoneNumberId: process.env.WHATSAPP_PHONE_NUMBER_ID!,
  storage: {
    enabled: true,
    provider: 'supabase',
    options: { /* ... */ },
    features: { /* ... */ },
    // NOTE(review): presumably applied in array order (anonymize, then
    // encrypt, then metadata) — confirm against the SDK, since the order
    // determines which fields later transformers still see in clear text.
    customTransformers: [
      anonymizeTransformer,
      encryptTransformer,
      metadataTransformer
    ]
  }
});

Testing Your Adapter

Create comprehensive tests for your custom adapter:
import { MongoDBAdapter } from './mongodb-adapter';

// Integration tests: they need a MongoDB server listening on localhost:27017.
describe('MongoDBAdapter', () => {
  const TEST_MESSAGE_ID = 'test_123';
  let adapter: MongoDBAdapter;

  beforeAll(async () => {
    adapter = new MongoDBAdapter({
      url: 'mongodb://localhost:27017',
      database: 'test_whatsapp'
    });
    await adapter.connect();

    // Remove a fixture left behind by an aborted run; the unique index on
    // whatsappMessageId would otherwise reject the insert below.
    await adapter.deleteMessage(TEST_MESSAGE_ID);
  });

  afterAll(async () => {
    try {
      // Clean up so the suite can be run repeatedly.
      await adapter.deleteMessage(TEST_MESSAGE_ID);
    } finally {
      await adapter.disconnect();
    }
  });

  test('should save and retrieve message', async () => {
    const testMessage = {
      whatsappMessageId: TEST_MESSAGE_ID,
      phoneNumberId: '123456',
      fromPhone: '+1234567890',
      toPhone: '+0987654321',
      messageType: 'text',
      content: { text: 'Hello World' },
      timestamp: new Date(),
      status: 'sent',
      direction: 'outgoing',
      metadata: {}
    };

    await adapter.saveMessage(testMessage);

    const retrieved = await adapter.getMessage(TEST_MESSAGE_ID);
    // getMessage resolves to null (not undefined) when nothing is found, so
    // toBeDefined() would pass even for a missing message.
    expect(retrieved).not.toBeNull();
    expect(retrieved?.content.text).toBe('Hello World');
  });

  // Relies on the message saved by the previous test; jest runs the tests of
  // one file in declaration order.
  test('should search messages', async () => {
    const results = await adapter.searchMessages({
      text: 'Hello',
      limit: 10
    });

    expect(results.total).toBeGreaterThan(0);
  });

  // Add more tests...
});

Best Practices

1. Connection Pooling

Use connection pools for better performance:
/**
 * Skeleton adapter showing pool setup: the pool defaults to between 5 and 20
 * connections, and any setting present in `config` (including `min` / `max`)
 * takes precedence over those defaults.
 */
class CustomAdapter implements IStorageAdapter {
  private pool: ConnectionPool;

  constructor(config: any) {
    // Defaults first, caller settings last so the caller wins on conflicts.
    const poolSettings = { min: 5, max: 20, ...config };
    this.pool = new ConnectionPool(poolSettings);
  }
}

2. Error Handling

Implement robust error handling:
/**
 * Inserts a message, treating a duplicate as already saved.
 *
 * A failure whose `code` is 'DUPLICATE_KEY' is logged as a warning and
 * swallowed; every other failure is wrapped in a `StorageError` that carries
 * the original error.
 *
 * @throws StorageError for any failure other than a duplicate key.
 */
async saveMessage(message: StorageMessage): Promise<void> {
  try {
    await this.db.insert(message);
  } catch (error: unknown) {
    // A caught value can be anything (even null), so read `code` only after
    // checking that it is an object that has one.
    const code =
      typeof error === 'object' && error !== null && 'code' in error
        ? (error as { code?: unknown }).code
        : undefined;

    if (code === 'DUPLICATE_KEY') {
      console.warn('Message already exists:', message.whatsappMessageId);
      return;
    }
    throw new StorageError('Failed to save message', error);
  }
}

3. Indexing

Create appropriate indexes for performance:
/**
 * Creates the indexes one after another, each call awaited before the next.
 *
 * NOTE(review): `createIndex` and `createFullTextIndex` are not defined in
 * this guide; they are presumably stand-ins for your database's own index
 * API — replace them with the real calls.
 */
private async createIndexes(): Promise<void> {
  // Primary lookups
  await this.createIndex('whatsapp_message_id', { unique: true });
  await this.createIndex('phone_numbers', ['from_phone', 'to_phone']);

  // Sorting and filtering
  await this.createIndex('timestamp', { order: 'desc' });
  await this.createIndex('status');

  // Full-text search
  await this.createFullTextIndex('content');
}

4. Batch Operations

Support batch inserts for better performance:
/**
 * Inserts messages in consecutive batches, awaiting each batch before
 * starting the next. An empty array performs no insert.
 *
 * @param messages Messages to store, in the order they should be inserted.
 * @param batchSize Maximum number of messages per insert; defaults to 100.
 * @throws RangeError when `batchSize` is not a positive integer (a value of
 *   zero would otherwise loop forever).
 */
async saveMessages(messages: StorageMessage[], batchSize = 100): Promise<void> {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError(`batchSize must be a positive integer, got ${batchSize}`);
  }

  for (let start = 0; start < messages.length; start += batchSize) {
    const batch = messages.slice(start, start + batchSize);
    await this.db.insertMany(batch);
  }
}

Next Steps

I