Overview
While Supabase is the recommended storage provider, you can create custom adapters for any database system. This guide shows how to implement theIStorageAdapter interface for your preferred database.
IStorageAdapter Interface
All storage adapters must implement this interface:Copy
interface IStorageAdapter {
// Connection management
connect(): Promise<void>;
disconnect(): Promise<void>;
isConnected(): boolean;
// Message operations
saveMessage(message: StorageMessage): Promise<void>;
getMessage(messageId: string): Promise<StorageMessage | null>;
updateMessageStatus(messageId: string, status: MessageStatus): Promise<void>;
deleteMessage(messageId: string): Promise<void>;
// Conversation operations
getConversation(
phoneNumber: string,
options?: ConversationQueryOptions
): Promise<ConversationResult>;
// Search operations
searchMessages(options: SearchOptions): Promise<SearchResult>;
// Thread operations
getMessageThread(messageId: string): Promise<MessageThread | null>;
// Analytics
getConversationAnalytics(
phoneNumber: string,
options?: AnalyticsOptions
): Promise<ConversationAnalytics>;
// Export
exportConversation(
phoneNumber: string,
options?: ExportOptions
): Promise<ExportResult>;
// Cleanup
cleanupOldMessages(retentionDays: number): Promise<number>;
}
Example: MongoDB Adapter
Here’s a complete example of a MongoDB storage adapter:Copy
import { MongoClient, Db, Collection } from 'mongodb';
import {
IStorageAdapter,
StorageMessage,
ConversationResult,
SearchResult,
MessageThread,
ConversationAnalytics,
ExportResult,
MessageStatus
} from 'whatsapp-client-sdk';
export class MongoDBAdapter implements IStorageAdapter {
private client: MongoClient;
private db: Db;
private messagesCollection: Collection;
private conversationsCollection: Collection;
private connected = false;
constructor(private config: {
url: string;
database: string;
messagesCollection?: string;
conversationsCollection?: string;
}) {
this.client = new MongoClient(config.url);
}
async connect(): Promise<void> {
if (this.connected) return;
await this.client.connect();
this.db = this.client.db(this.config.database);
this.messagesCollection = this.db.collection(
this.config.messagesCollection || 'whatsapp_messages'
);
this.conversationsCollection = this.db.collection(
this.config.conversationsCollection || 'whatsapp_conversations'
);
// Create indexes
await this.createIndexes();
this.connected = true;
console.log('✅ MongoDB connected');
}
async disconnect(): Promise<void> {
if (!this.connected) return;
await this.client.close();
this.connected = false;
console.log('✅ MongoDB disconnected');
}
isConnected(): boolean {
return this.connected;
}
private async createIndexes(): Promise<void> {
// Message indexes
await this.messagesCollection.createIndex({ whatsappMessageId: 1 }, { unique: true });
await this.messagesCollection.createIndex({ fromPhone: 1 });
await this.messagesCollection.createIndex({ toPhone: 1 });
await this.messagesCollection.createIndex({ timestamp: -1 });
await this.messagesCollection.createIndex({ status: 1 });
await this.messagesCollection.createIndex({ conversationId: 1 });
// Full-text search index
await this.messagesCollection.createIndex({ 'content.text': 'text' });
// Conversation indexes
await this.conversationsCollection.createIndex({ phoneNumber: 1 }, { unique: true });
await this.conversationsCollection.createIndex({ lastMessageAt: -1 });
}
async saveMessage(message: StorageMessage): Promise<void> {
// Upsert conversation
await this.conversationsCollection.updateOne(
{ phoneNumber: message.fromPhone },
{
$set: {
phoneNumber: message.fromPhone,
businessPhoneId: message.phoneNumberId,
lastMessageAt: message.timestamp,
updatedAt: new Date()
},
$inc: { messageCount: 1 },
$setOnInsert: {
createdAt: new Date(),
unreadCount: message.direction === 'incoming' ? 1 : 0,
metadata: {}
}
},
{ upsert: true }
);
// Insert message
await this.messagesCollection.insertOne({
...message,
createdAt: new Date(),
updatedAt: new Date()
});
}
async getMessage(messageId: string): Promise<StorageMessage | null> {
const message = await this.messagesCollection.findOne({
whatsappMessageId: messageId
});
return message as StorageMessage | null;
}
async updateMessageStatus(messageId: string, status: MessageStatus): Promise<void> {
await this.messagesCollection.updateOne(
{ whatsappMessageId: messageId },
{
$set: {
status,
updatedAt: new Date()
}
}
);
}
async deleteMessage(messageId: string): Promise<void> {
await this.messagesCollection.deleteOne({
whatsappMessageId: messageId
});
}
async getConversation(
phoneNumber: string,
options?: {
limit?: number;
offset?: number;
dateFrom?: Date;
dateTo?: Date;
}
): Promise<ConversationResult> {
const query: any = {
$or: [
{ fromPhone: phoneNumber },
{ toPhone: phoneNumber }
]
};
if (options?.dateFrom || options?.dateTo) {
query.timestamp = {};
if (options.dateFrom) query.timestamp.$gte = options.dateFrom;
if (options.dateTo) query.timestamp.$lte = options.dateTo;
}
const [messages, total] = await Promise.all([
this.messagesCollection
.find(query)
.sort({ timestamp: -1 })
.skip(options?.offset || 0)
.limit(options?.limit || 50)
.toArray(),
this.messagesCollection.countDocuments(query)
]);
return {
phoneNumber,
totalMessages: total,
messages: messages as any[]
};
}
async searchMessages(options: {
text: string;
phoneNumber?: string;
messageType?: string;
dateFrom?: Date;
dateTo?: Date;
limit?: number;
offset?: number;
}): Promise<SearchResult> {
const query: any = {};
// Text search
if (options.text) {
query.$text = { $search: options.text };
}
// Phone number filter
if (options.phoneNumber) {
query.$or = [
{ fromPhone: options.phoneNumber },
{ toPhone: options.phoneNumber }
];
}
// Message type filter
if (options.messageType) {
query.messageType = options.messageType;
}
// Date range filter
if (options.dateFrom || options.dateTo) {
query.timestamp = {};
if (options.dateFrom) query.timestamp.$gte = options.dateFrom;
if (options.dateTo) query.timestamp.$lte = options.dateTo;
}
const [messages, total] = await Promise.all([
this.messagesCollection
.find(query)
.sort({ timestamp: -1 })
.skip(options?.offset || 0)
.limit(options?.limit || 20)
.toArray(),
this.messagesCollection.countDocuments(query)
]);
return {
total,
messages: messages as any[]
};
}
async getMessageThread(messageId: string): Promise<MessageThread | null> {
const originalMessage = await this.getMessage(messageId);
if (!originalMessage) {
return null;
}
const replies = await this.messagesCollection
.find({ replyToMessageId: messageId })
.sort({ timestamp: 1 })
.toArray();
return {
originalMessage: originalMessage as any,
replies: replies as any[]
};
}
async getConversationAnalytics(
phoneNumber: string,
options?: {
from?: Date;
to?: Date;
}
): Promise<ConversationAnalytics> {
const query: any = {
$or: [
{ fromPhone: phoneNumber },
{ toPhone: phoneNumber }
]
};
if (options?.from || options?.to) {
query.timestamp = {};
if (options.from) query.timestamp.$gte = options.from;
if (options.to) query.timestamp.$lte = options.to;
}
const messages = await this.messagesCollection
.find(query)
.sort({ timestamp: 1 })
.toArray();
const incoming = messages.filter(m => m.direction === 'incoming');
const outgoing = messages.filter(m => m.direction === 'outgoing');
// Calculate average response time
let totalResponseTime = 0;
let responseCount = 0;
for (let i = 0; i < incoming.length; i++) {
const incomingMsg = incoming[i];
const nextOutgoing = outgoing.find(
m => m.timestamp > incomingMsg.timestamp
);
if (nextOutgoing) {
totalResponseTime += nextOutgoing.timestamp - incomingMsg.timestamp;
responseCount++;
}
}
const avgResponseTime = responseCount > 0
? totalResponseTime / responseCount
: 0;
// Message types distribution
const messagesByType = messages.reduce((acc, msg) => {
acc[msg.messageType] = (acc[msg.messageType] || 0) + 1;
return acc;
}, {} as Record<string, number>);
return {
phoneNumber,
totalMessages: messages.length,
incomingMessages: incoming.length,
outgoingMessages: outgoing.length,
firstMessageAt: messages[0]?.timestamp || new Date(),
lastMessageAt: messages[messages.length - 1]?.timestamp || new Date(),
averageResponseTime: avgResponseTime,
responseRate: (responseCount / incoming.length) * 100 || 0,
messagesByType,
messagesByDay: [] // Implement grouping by day if needed
};
}
async exportConversation(
phoneNumber: string,
options?: {
format?: 'json' | 'csv';
dateFrom?: Date;
dateTo?: Date;
}
): Promise<ExportResult> {
const conversation = await this.getConversation(phoneNumber, {
dateFrom: options?.dateFrom,
dateTo: options?.dateTo,
limit: 10000 // Large limit for export
});
if (options?.format === 'csv') {
const csv = this.convertToCSV(conversation.messages);
return {
data: csv,
messageCount: conversation.messages.length,
format: 'csv'
};
}
return {
data: conversation.messages,
messageCount: conversation.messages.length,
format: 'json'
};
}
private convertToCSV(messages: any[]): string {
const headers = ['Timestamp', 'From', 'To', 'Type', 'Direction', 'Status', 'Content'];
const rows = messages.map(msg => [
msg.timestamp.toISOString(),
msg.fromPhone,
msg.toPhone,
msg.messageType,
msg.direction,
msg.status,
msg.content.text || JSON.stringify(msg.content)
]);
return [
headers.join(','),
...rows.map(row => row.map(cell => `"${cell}"`).join(','))
].join('\n');
}
async cleanupOldMessages(retentionDays: number): Promise<number> {
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - retentionDays);
const result = await this.messagesCollection.deleteMany({
timestamp: { $lt: cutoffDate }
});
return result.deletedCount || 0;
}
}
Usage
Copy
import { WhatsAppClient } from 'whatsapp-client-sdk';
import { MongoDBAdapter } from './mongodb-adapter';
const mongoAdapter = new MongoDBAdapter({
url: process.env.MONGODB_URL!,
database: 'whatsapp',
messagesCollection: 'messages',
conversationsCollection: 'conversations'
});
const client = new WhatsAppClient({
accessToken: process.env.WHATSAPP_ACCESS_TOKEN!,
phoneNumberId: process.env.WHATSAPP_PHONE_NUMBER_ID!,
storage: {
enabled: true,
provider: 'custom',
options: {
customAdapter: mongoAdapter
},
features: {
persistIncoming: true,
persistOutgoing: true,
persistStatus: true
}
}
});
await client.initializeStorage();
Example: PostgreSQL Adapter
Usingpg library for PostgreSQL:
Copy
import { Pool } from 'pg';
import { IStorageAdapter, StorageMessage } from 'whatsapp-client-sdk';
export class PostgreSQLAdapter implements IStorageAdapter {
private pool: Pool;
private connected = false;
constructor(config: {
host: string;
port: number;
database: string;
user: string;
password: string;
}) {
this.pool = new Pool(config);
}
async connect(): Promise<void> {
if (this.connected) return;
// Test connection
await this.pool.query('SELECT NOW()');
// Create tables
await this.createTables();
this.connected = true;
console.log('✅ PostgreSQL connected');
}
async disconnect(): Promise<void> {
await this.pool.end();
this.connected = false;
}
isConnected(): boolean {
return this.connected;
}
private async createTables(): Promise<void> {
await this.pool.query(`
CREATE TABLE IF NOT EXISTS whatsapp_conversations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
phone_number VARCHAR(50) NOT NULL UNIQUE,
business_phone_id VARCHAR(50),
last_message_at TIMESTAMPTZ DEFAULT NOW(),
message_count INTEGER DEFAULT 0,
unread_count INTEGER DEFAULT 0,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
)
`);
await this.pool.query(`
CREATE TABLE IF NOT EXISTS whatsapp_messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
whatsapp_message_id VARCHAR(255) UNIQUE NOT NULL,
conversation_id UUID REFERENCES whatsapp_conversations(id),
phone_number_id VARCHAR(50),
from_phone VARCHAR(50),
to_phone VARCHAR(50),
message_type VARCHAR(50),
content JSONB,
reply_to_message_id UUID,
timestamp TIMESTAMPTZ,
status VARCHAR(50),
direction VARCHAR(20),
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
)
`);
// Create indexes
await this.pool.query(`
CREATE INDEX IF NOT EXISTS idx_messages_whatsapp_id
ON whatsapp_messages(whatsapp_message_id)
`);
}
async saveMessage(message: StorageMessage): Promise<void> {
await this.pool.query(
`INSERT INTO whatsapp_messages (
whatsapp_message_id, phone_number_id, from_phone, to_phone,
message_type, content, timestamp, status, direction
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`,
[
message.whatsappMessageId,
message.phoneNumberId,
message.fromPhone,
message.toPhone,
message.messageType,
JSON.stringify(message.content),
message.timestamp,
message.status,
message.direction
]
);
}
// Implement other methods...
}
Message Transformers
Transform messages before storing them in the database:Copy
import { MessageTransformer, StorageMessage } from 'whatsapp-client-sdk';
// Example: Anonymize phone numbers
const anonymizeTransformer: MessageTransformer = {
name: 'anonymize',
transform: async (message: StorageMessage): Promise<StorageMessage> => {
return {
...message,
fromPhone: message.fromPhone.substring(0, 5) + 'XXXXX',
toPhone: message.toPhone.substring(0, 5) + 'XXXXX'
};
}
};
// Example: Encrypt message content
const encryptTransformer: MessageTransformer = {
name: 'encrypt',
transform: async (message: StorageMessage): Promise<StorageMessage> => {
const crypto = await import('crypto');
const encrypted = crypto.createCipher('aes-256-cbc', process.env.ENCRYPTION_KEY!)
.update(JSON.stringify(message.content), 'utf8', 'hex');
return {
...message,
content: { encrypted }
};
}
};
// Example: Add custom metadata
const metadataTransformer: MessageTransformer = {
name: 'metadata',
transform: async (message: StorageMessage): Promise<StorageMessage> => {
return {
...message,
metadata: {
...message.metadata,
processedAt: new Date().toISOString(),
server: process.env.SERVER_ID,
version: '1.0.0'
}
};
}
};
// Use transformers
const client = new WhatsAppClient({
accessToken: process.env.WHATSAPP_ACCESS_TOKEN!,
phoneNumberId: process.env.WHATSAPP_PHONE_NUMBER_ID!,
storage: {
enabled: true,
provider: 'supabase',
options: { /* ... */ },
features: { /* ... */ },
customTransformers: [
anonymizeTransformer,
encryptTransformer,
metadataTransformer
]
}
});
Testing Your Adapter
Create comprehensive tests for your custom adapter:Copy
import { MongoDBAdapter } from './mongodb-adapter';
describe('MongoDBAdapter', () => {
let adapter: MongoDBAdapter;
beforeAll(async () => {
adapter = new MongoDBAdapter({
url: 'mongodb://localhost:27017',
database: 'test_whatsapp'
});
await adapter.connect();
});
afterAll(async () => {
await adapter.disconnect();
});
test('should save and retrieve message', async () => {
const testMessage = {
whatsappMessageId: 'test_123',
phoneNumberId: '123456',
fromPhone: '+1234567890',
toPhone: '+0987654321',
messageType: 'text',
content: { text: 'Hello World' },
timestamp: new Date(),
status: 'sent',
direction: 'outgoing',
metadata: {}
};
await adapter.saveMessage(testMessage);
const retrieved = await adapter.getMessage('test_123');
expect(retrieved).toBeDefined();
expect(retrieved?.content.text).toBe('Hello World');
});
test('should search messages', async () => {
const results = await adapter.searchMessages({
text: 'Hello',
limit: 10
});
expect(results.total).toBeGreaterThan(0);
});
// Add more tests...
});
Best Practices
1. Connection Pooling
Use connection pools for better performance:Copy
class CustomAdapter implements IStorageAdapter {
private pool: ConnectionPool;
constructor(config: any) {
this.pool = new ConnectionPool({
min: 5,
max: 20,
...config
});
}
}
2. Error Handling
Implement robust error handling:Copy
async saveMessage(message: StorageMessage): Promise<void> {
try {
await this.db.insert(message);
} catch (error) {
if (error.code === 'DUPLICATE_KEY') {
console.warn('Message already exists:', message.whatsappMessageId);
return;
}
throw new StorageError('Failed to save message', error);
}
}
3. Indexing
Create appropriate indexes for performance:Copy
private async createIndexes(): Promise<void> {
// Primary lookups
await this.createIndex('whatsapp_message_id', { unique: true });
await this.createIndex('phone_numbers', ['from_phone', 'to_phone']);
// Sorting and filtering
await this.createIndex('timestamp', { order: 'desc' });
await this.createIndex('status');
// Full-text search
await this.createFullTextIndex('content');
}
4. Batch Operations
Support batch inserts for better performance:Copy
async saveMessages(messages: StorageMessage[]): Promise<void> {
const batchSize = 100;
for (let i = 0; i < messages.length; i += batchSize) {
const batch = messages.slice(i, i + batchSize);
await this.db.insertMany(batch);
}
}