Complete Webhook Integration Guide
A comprehensive guide to integrating webhooks with production-ready code examples, including queue processing, idempotency, and error handling.
Webhook Configuration Methods
Rach supports two ways to configure webhooks depending on your use case:
Method 1: Per-Checkout (Crypto Gateway)
For payment gateway transactions, set the webhook URL when creating each checkout session:
javascript
POST /api/v1/checkout/create
{
"amount": 100.00,
"currency": "USDT",
"network": "BSC",
"reference": "ORDER-12345",
"callback_url": "https://yoursite.com/webhooks/payment" // ← Checkout-specific
}Use When:
- You need different webhook URLs per checkout
- Running multi-tenant platform
- Testing with different endpoints
Events Received:
payment.confirmed- Payment received and confirmedpayment.expired- Checkout session expired
Method 2: Global Business Setting (Wallet Service)
For wallet deposit monitoring, set a global webhook URL for your business account:
bash
PATCH /api/v1/business/settings
{
"webhook_url": "https://yoursite.com/webhooks/deposits",
"webhook_secret": "your_secret_key_here"
}Use When:
- Single webhook endpoint for all events
- Wallet-as-a-Service deposits
- Centralized event processing
Events Received:
wallet.deposit.detected- Deposit detected on monitored address
Configuration Comparison
| Feature | Per-Checkout | Global Business |
|---|---|---|
| Endpoint | /api/v1/checkout/create | /api/v1/business/settings |
| Scope | Single checkout session | All wallet deposits |
| Field Name | callback_url | webhook_url + webhook_secret |
| Product | Crypto Gateway | Wallet Service |
| Can Change Per Request | ✅ Yes | ❌ No (global) |
| Signature | Auto-generated | Uses webhook_secret |
Best Practice
Use per-checkout for payment gateway and global setting for wallet service. You can use both simultaneously if you need both products.
Production-Ready Integration
A complete, scalable webhook handler with all best practices implemented.
Architecture Overview
Complete Implementation
javascript
// server.js - Production webhook handler with queue
const express = require('express');
const crypto = require('crypto');
const Queue = require('bull');
const redis = require('redis');
const app = express();
app.use(express.json());
// Create webhook processing queue
const webhookQueue = new Queue('webhooks', {
redis: {
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || 6379
}
});
// ============================================
// WEBHOOK ENDPOINT - Respond quickly!
// ============================================
app.post('/webhooks/rach', async (req, res) => {
const startTime = Date.now();
try {
// 1. Verify HMAC signature FIRST
const signature = req.headers['x-webhook-signature'];
const webhookSecret = process.env.RACH_WEBHOOK_SECRET;
if (!verifySignature(req.body, signature, webhookSecret)) {
console.error('❌ Invalid webhook signature');
return res.status(401).json({ error: 'Invalid signature' });
}
// 2. Log receipt
console.log('✅ Webhook received', {
event: req.body.event,
timestamp: new Date().toISOString()
});
// 3. Add to queue for async processing
await webhookQueue.add('process-webhook', req.body, {
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000
},
removeOnComplete: true,
removeOnFail: false
});
// 4. Respond immediately (within 500ms ideally)
const duration = Date.now() - startTime;
res.status(200).json({
received: true,
duration_ms: duration
});
} catch (error) {
console.error('❌ Webhook endpoint error:', error);
res.status(500).json({ error: 'Internal server error' });
}
});
// ============================================
// SIGNATURE VERIFICATION
// ============================================
function verifySignature(payload, signature, secret) {
const hmac = crypto.createHmac('sha256', secret);
hmac.update(JSON.stringify(payload));
const computed = hmac.digest('hex');
// Use timing-safe comparison
return crypto.timingSafeEqual(
Buffer.from(signature || ''),
Buffer.from(computed)
);
}
// ============================================
// QUEUE PROCESSOR - Heavy lifting here
// ============================================
webhookQueue.process('process-webhook', async (job) => {
const event = job.data;
console.log(`🔄 Processing: ${event.event}`);
try {
// Check idempotency - don't process twice
const isDuplicate = await checkIfProcessed(event);
if (isDuplicate) {
console.log('⚠️ Duplicate webhook, skipping');
return { status: 'duplicate' };
}
// Route to appropriate handler
switch (event.event) {
case 'payment.confirmed':
await handlePaymentConfirmed(event);
break;
case 'wallet.deposit.detected':
await handleWalletDeposit(event);
break;
case 'payment.expired':
await handlePaymentExpired(event);
break;
default:
console.warn(`⚠️ Unknown event type: ${event.event}`);
}
// Mark as processed
await markAsProcessed(event);
console.log(`✅ Processed: ${event.event}`);
return { status: 'success' };
} catch (error) {
console.error(`❌ Processing failed:`, error);
throw error; // Will retry automatically
}
});
// ============================================
// IDEMPOTENCY TRACKING
// ============================================
const db = require('./database'); // Your database client
async function checkIfProcessed(event) {
const { event: eventType, session_id, data } = event;
// Create unique key based on event
let idempotencyKey;
if (session_id) {
idempotencyKey = `${eventType}-${session_id}`;
} else if (data?.customer_id && data?.detected_at) {
idempotencyKey = `${eventType}-${data.customer_id}-${data.detected_at}`;
} else {
idempotencyKey = `${eventType}-${JSON.stringify(event)}`;
}
const existing = await db.query(
'SELECT id FROM webhook_events WHERE idempotency_key = $1',
[idempotencyKey]
);
return existing.rows.length > 0;
}
async function markAsProcessed(event) {
const { event: eventType, session_id, data } = event;
let idempotencyKey;
if (session_id) {
idempotencyKey = `${eventType}-${session_id}`;
} else if (data?.customer_id && data?.detected_at) {
idempotencyKey = `${eventType}-${data.customer_id}-${data.detected_at}`;
} else {
idempotencyKey = `${eventType}-${JSON.stringify(event)}`;
}
await db.query(`
INSERT INTO webhook_events (idempotency_key, event_type, payload, processed_at)
VALUES ($1, $2, $3, NOW())
ON CONFLICT (idempotency_key) DO NOTHING
`, [idempotencyKey, eventType, JSON.stringify(event)]);
}
// ============================================
// EVENT HANDLERS
// ============================================
async function handlePaymentConfirmed(event) {
const { reference, amount, currency, session_id, tx_hash } = event;
await db.transaction(async (trx) => {
// 1. Update order status
await trx.query(`
UPDATE orders
SET status = 'paid',
paid_at = NOW(),
tx_hash = $1,
session_id = $2
WHERE reference = $3
`, [tx_hash, session_id, reference]);
// 2. Record transaction
await trx.query(`
INSERT INTO transactions (order_reference, amount, currency, type, status)
VALUES ($1, $2, $3, 'payment', 'confirmed')
`, [reference, amount, currency]);
// 3. Fulfill order (inventory, shipping, etc.)
await fulfillOrder(reference);
});
// 4. Notify customer (async, outside transaction)
await sendCustomerEmail(reference, 'payment_confirmed');
console.log(`💰 Payment confirmed: ${reference} - ${amount} ${currency}`);
}
async function handleWalletDeposit(event) {
const { customer_id, amount, currency, network, address } = event.data;
await db.transaction(async (trx) => {
// 1. Update user balance in YOUR database
await trx.query(`
UPDATE user_balances
SET ${currency.toLowerCase()}_balance = ${currency.toLowerCase()}_balance + $1,
updated_at = NOW()
WHERE customer_id = $2
`, [parseFloat(amount), customer_id]);
// 2. Record deposit
await trx.query(`
INSERT INTO deposits (customer_id, amount, currency, network, address, type)
VALUES ($1, $2, $3, $4, $5, 'wallet_deposit')
`, [customer_id, amount, currency, network, address]);
});
// 3. Notify user
await sendDepositNotification(customer_id, amount, currency);
console.log(`💵 Deposit credited: ${customer_id} - ${amount} ${currency}`);
}
async function handlePaymentExpired(event) {
const { reference, checkout_id } = event;
await db.query(`
UPDATE orders
SET status = 'expired',
expired_at = NOW()
WHERE reference = $1
`, [reference]);
console.log(`⏰ Payment expired: ${reference}`);
}
// ============================================
// HELPER FUNCTIONS
// ============================================
async function fulfillOrder(reference) {
// Your order fulfillment logic
// - Update inventory
// - Trigger shipping
// - Activate subscription
// - Grant access
console.log(`📦 Fulfilling order: ${reference}`);
}
async function sendCustomerEmail(reference, template) {
// Your email sending logic
console.log(`📧 Sending ${template} email for ${reference}`);
}
async function sendDepositNotification(customerId, amount, currency) {
// Send push notification or email
console.log(`🔔 Notifying ${customerId}: Received ${amount} ${currency}`);
}
// ============================================
//QUEUE EVENT LISTENERS
// ============================================
webhookQueue.on('completed', (job, result) => {
console.log(`✅ Job ${job.id} completed:`, result);
});
webhookQueue.on('failed', (job, err) => {
console.error(`❌ Job ${job.id} failed:`, err.message);
// Alert your monitoring system
});
webhookQueue.on('stalled', (job) => {
console.warn(`⚠️ Job ${job.id} stalled, will retry`);
});
// ============================================
// START SERVER
// ============================================
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`🚀 Webhook server running on port ${PORT}`);
});python
# app.py - Flask with Celery for async processing
from flask import Flask, request, jsonify
import hmac
import hashlib
import json
import os
from celery import Celery
from datetime import datetime
import psycopg2
from contextlib import contextmanager
app = Flask(__name__)
# Configure Celery
celery = Celery('webhooks',
broker=os.getenv('REDIS_URL', 'redis://localhost:6379/0'),
backend=os.getenv('REDIS_URL', 'redis://localhost:6379/0'))
# ============================================
# WEBHOOK ENDPOINT
# ============================================
@app.route('/webhooks/rach', methods=['POST'])
def webhook_handler():
start_time = datetime.now()
try:
# 1. Verify signature
signature = request.headers.get('X-Webhook-Signature')
webhook_secret = os.getenv('RACH_WEBHOOK_SECRET')
payload = request.get_data()
if not verify_signature(payload, signature, webhook_secret):
app.logger.error('Invalid webhook signature')
return jsonify({'error': 'Invalid signature'}), 401
# 2. Parse event
event = request.json
app.logger.info(f"Webhook received: {event.get('event')}")
# 3. Queue for processing
process_webhook.delay(event)
# 4. Respond quickly
duration = (datetime.now() - start_time).total_seconds() * 1000
return jsonify({
'received': True,
'duration_ms': duration
}), 200
except Exception as e:
app.logger.error(f'Webhook error: {str(e)}')
return jsonify({'error': 'Internal server error'}), 500
# ============================================
# SIGNATURE VERIFICATION
# ============================================
def verify_signature(payload, signature, secret):
if not signature or not secret:
return False
expected = hmac.new(
secret.encode(),
payload,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, expected)
# ============================================
# CELERY TASK - Async Processing
# ============================================
@celery.task(bind=True, max_retries=3)
def process_webhook(self, event):
try:
event_type = event.get('event')
app.logger.info(f"Processing: {event_type}")
# Check idempotency
if is_already_processed(event):
app.logger.warn('Duplicate webhook, skipping')
return {'status': 'duplicate'}
# Route to handler
handlers = {
'payment.confirmed': handle_payment_confirmed,
'wallet.deposit.detected': handle_wallet_deposit,
'payment.expired': handle_payment_expired
}
handler = handlers.get(event_type)
if handler:
handler(event)
else:
app.logger.warn(f'Unknown event: {event_type}')
# Mark processed
mark_as_processed(event)
app.logger.info(f"Processed: {event_type}")
return {'status': 'success'}
except Exception as e:
app.logger.error(f'Processing failed: {str(e)}')
raise self.retry(exc=e, countdown=60) # Retry after 60s
# ============================================
# DATABASE HELPERS
# ============================================
@contextmanager
def get_db():
conn = psycopg2.connect(os.getenv('DATABASE_URL'))
try:
yield conn
conn.commit()
except:
conn.rollback()
raise
finally:
conn.close()
def is_already_processed(event):
event_type = event.get('event')
session_id = event.get('session_id')
data = event.get('data', {})
# Create idempotency key
if session_id:
key = f"{event_type}-{session_id}"
elif data.get('customer_id') and data.get('detected_at'):
key = f"{event_type}-{data['customer_id']}-{data['detected_at']}"
else:
key = f"{event_type}-{json.dumps(event, sort_keys=True)}"
with get_db() as conn:
cur = conn.cursor()
cur.execute(
'SELECT id FROM webhook_events WHERE idempotency_key = %s',
(key,)
)
return cur.fetchone() is not None
def mark_as_processed(event):
event_type = event.get('event')
session_id = event.get('session_id')
data = event.get('data', {})
if session_id:
key = f"{event_type}-{session_id}"
elif data.get('customer_id') and data.get('detected_at'):
key = f"{event_type}-{data['customer_id']}-{data['detected_at']}"
else:
key = f"{event_type}-{json.dumps(event, sort_keys=True)}"
with get_db() as conn:
cur = conn.cursor()
cur.execute("""
INSERT INTO webhook_events (idempotency_key, event_type, payload, processed_at)
VALUES (%s, %s, %s, NOW())
ON CONFLICT (idempotency_key) DO NOTHING
""", (key, event_type, json.dumps(event)))
# ============================================
# EVENT HANDLERS
# ============================================
def handle_payment_confirmed(event):
reference = event.get('reference')
amount = event.get('amount')
currency = event.get('currency')
tx_hash = event.get('tx_hash')
with get_db() as conn:
cur = conn.cursor()
# Update order
cur.execute("""
UPDATE orders
SET status = 'paid', paid_at = NOW(), tx_hash = %s
WHERE reference = %s
""", (tx_hash, reference))
# Record transaction
cur.execute("""
INSERT INTO transactions (order_reference, amount, currency, type, status)
VALUES (%s, %s, %s, 'payment', 'confirmed')
""", (reference, amount, currency))
# Fulfill order
fulfill_order(reference)
send_customer_email(reference, 'payment_confirmed')
app.logger.info(f"Payment confirmed: {reference} - {amount} {currency}")
def handle_wallet_deposit(event):
data = event.get('data', {})
customer_id = data.get('customer_id')
amount = float(data.get('amount'))
currency = data.get('currency')
with get_db() as conn:
cur = conn.cursor()
# Update balance
cur.execute(f"""
UPDATE user_balances
SET {currency.lower()}_balance = {currency.lower()}_balance + %s,
updated_at = NOW()
WHERE customer_id = %s
""", (amount, customer_id))
# Record deposit
cur.execute("""
INSERT INTO deposits (customer_id, amount, currency, network, type)
VALUES (%s, %s, %s, %s, 'wallet_deposit')
""", (customer_id, amount, currency, data.get('network')))
send_deposit_notification(customer_id, amount, currency)
app.logger.info(f"Deposit credited: {customer_id} - {amount} {currency}")
def handle_payment_expired(event):
reference = event.get('reference')
with get_db() as conn:
cur = conn.cursor()
cur.execute("""
UPDATE orders
SET status = 'expired', expired_at = NOW()
WHERE reference = %s
""", (reference,))
app.logger.info(f"Payment expired: {reference}")
# ============================================
# HELPER FUNCTIONS
# ============================================
def fulfill_order(reference):
app.logger.info(f"Fulfilling order: {reference}")
# Your fulfillment logic here
def send_customer_email(reference, template):
app.logger.info(f"Sending {template} email for {reference}")
# Your email logic here
def send_deposit_notification(customer_id, amount, currency):
app.logger.info(f"Notifying {customer_id}: Received {amount} {currency}")
# Your notification logic here
if __name__ == '__main__':
app.run(port=3000, debug=False)Database Schema
Create this table to track processed webhooks:
sql
-- Webhook events tracking (idempotency)
CREATE TABLE webhook_events (
id SERIAL PRIMARY KEY,
idempotency_key VARCHAR(255) UNIQUE NOT NULL,
event_type VARCHAR(50) NOT NULL,
payload JSONB NOT NULL,
processed_at TIMESTAMP NOT NULL DEFAULT NOW(),
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_webhook_events_type ON webhook_events(event_type);
CREATE INDEX idx_webhook_events_processed ON webhook_events(processed_at);Production Checklist
Deployment Checklist
Before Going Live:
- [ ] Set
RACH_WEBHOOK_SECRETin environment variables - [ ] Set up Redis/RabbitMQ for queue
- [ ] Create
webhook_eventstable for idempotency - [ ] Configure worker processes (separate from web server)
- [ ] Set up monitoring/alerting for failed jobs
- [ ] Test with ngrok during development
- [ ] Verify HTTPS is enabled on webhook endpoint
- [ ] Configure rate limiting
- [ ] Set up logging/log aggregation
- [ ] Test idempotency (send duplicate webhooks)
- [ ] Load test webhook endpoint (should respond < 500ms)
Monitoring & Debugging
Key Metrics to Track
javascript
// Example: Datadog/New Relic metrics
statsd.timing('webhook.receive_duration', duration);
statsd.increment('webhook.received', {event: eventType});
statsd.increment('webhook.processed', {event: eventType, status: 'success'});
statsd.increment('webhook.signature_failure');Logging Best Practices
javascript
// Structured logging
logger.info('webhook.received', {
event: event.event,
session_id: event.session_id,
received_at: new Date().toISOString(),
signature_valid: true
});
logger.info('webhook.processed', {
event: event.event,
duration_ms: processingTime,
status: 'success'
});Testing
Load Testing
bash
# Using Apache Bench
ab -n 1000 -c 10 -p webhook.json \
-T 'application/json' \
-H 'X-Webhook-Signature: test_sig' \
http://localhost:3000/webhooks/rach
# Should maintain < 500ms response timeIdempotency Testing
javascript
// Send same webhook twice
const webhook = {
event: 'payment.confirmed',
session_id: 'test_123',
reference: 'ORDER-123'
};
// First call - should process
await sendWebhook(webhook);
// Second call - should skip (duplicate)
await sendWebhook(webhook);
// Verify order only fulfilled once