Python
Complete code examples for integrating with the VelaFlows API using Python.
All examples use the requests library.
Setup: API Client Helper
Create a reusable API client class:
# velaflows.py
import os
import random
import time
from typing import Optional

import requests
class VelaFlowsError(Exception):
    """Raised when the VelaFlows API returns an error.

    Attributes:
        status: Status code supplied by the caller (the client passes the
            HTTP status code of the failed response).
        code: Machine-readable error code string.
        message: Human-readable description; also what ``str(exc)`` yields.
        details: Extra error details; always a dict, empty when none given.
    """

    def __init__(
        self,
        status: int,
        code: str,
        message: str,
        details: Optional[dict] = None,
    ):
        super().__init__(message)
        self.status = status
        self.code = code
        # Kept as an attribute so handlers can read it like the other fields.
        self.message = message
        # Normalise a missing/None payload so callers can index it safely.
        self.details = details or {}
class VelaFlowsClient:
    """Small wrapper around ``requests.Session`` for the VelaFlows REST API.

    Every call sends the bearer token and workspace id as headers, unwraps
    the ``{"success": ..., "data": ...}`` response envelope, and raises
    ``VelaFlowsError`` when the call did not succeed.
    """

    BASE_URL = "https://api.velaflows.com/api/v1"
    # requests waits forever unless a timeout is passed, so always send one.
    DEFAULT_TIMEOUT = 30.0

    def __init__(
        self,
        token: Optional[str] = None,
        workspace_id: Optional[str] = None,
        timeout: Optional[float] = DEFAULT_TIMEOUT,
    ):
        """Create a client.

        Args:
            token: API token; falls back to ``VELAFLOWS_API_TOKEN``.
            workspace_id: Workspace id; falls back to
                ``VELAFLOWS_WORKSPACE_ID``.
            timeout: Seconds to wait on each request. Pass ``None`` to wait
                indefinitely.

        Raises:
            KeyError: If a value is omitted and its environment variable
                is not set.
        """
        self.token = token or os.environ["VELAFLOWS_API_TOKEN"]
        self.workspace_id = workspace_id or os.environ["VELAFLOWS_WORKSPACE_ID"]
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update(
            {
                "Authorization": f"Bearer {self.token}",
                "x-workspace-id": self.workspace_id,
                "Content-Type": "application/json",
            }
        )

    def _request(self, method: str, path: str, **kwargs) -> dict:
        """Send one request under BASE_URL and return the unwrapped payload."""
        response = self.session.request(
            method,
            f"{self.BASE_URL}{path}",
            timeout=self.timeout,
            **kwargs,
        )
        return self._handle_response(response)

    def _handle_response(self, response: "requests.Response") -> dict:
        """Unwrap the response envelope or raise ``VelaFlowsError``.

        Returns:
            The envelope's ``data`` value, or ``{}`` for a successful
            response that has an empty body or no ``data`` key.

        Raises:
            VelaFlowsError: On a non-2xx status, an envelope without a truthy
                ``success`` flag, or a body that is not a JSON object.
        """
        try:
            payload = response.json()
        except ValueError:
            # Empty or non-JSON body; requests raises a ValueError subclass.
            payload = None

        if not isinstance(payload, dict):
            if response.ok and not response.content:
                return {}
            raise VelaFlowsError(
                status=response.status_code,
                code="INVALID_RESPONSE",
                message=(
                    "Expected a JSON object in the response body "
                    f"(HTTP {response.status_code})"
                ),
            )

        if not response.ok or not payload.get("success"):
            # "error" may be absent, null, or a bare string.
            error = payload.get("error") or {}
            if not isinstance(error, dict):
                error = {"message": str(error)}
            raise VelaFlowsError(
                status=response.status_code,
                code=error.get("code", "UNKNOWN"),
                message=error.get("message", "Unknown error"),
                details=error.get("details"),
            )
        return payload.get("data", {})

    def get(self, path: str, params: dict = None) -> dict:
        """Send a GET request."""
        return self._request("GET", path, params=params)

    def post(self, path: str, json: dict = None) -> dict:
        """Send a POST request."""
        return self._request("POST", path, json=json)

    def patch(self, path: str, json: dict = None) -> dict:
        """Send a PATCH request."""
        return self._request("PATCH", path, json=json)

    def delete(self, path: str) -> dict:
        """Send a DELETE request."""
        return self._request("DELETE", path)
# Initialize the client
client = VelaFlowsClient()
Common Operations
List Conversations
result = client.get("/inbox/conversations", params={
"page": 1,
"limit": 20,
"status": "open",
})
print(f"Total conversations: {result['pagination']['total']}")
for conv in result["items"]:
    print(f"[{conv['channelType']}] {conv['status']} - {conv['lastMessageAt']}")
Create a CRM Lead
result = client.post("/crm/leads", json={
"firstName": "Jane",
"lastName": "Smith",
"email": "jane.smith@example.com",
"phone": "+1234567890",
"pipelineId": "65a1b2c3d4e5f6a7b8c9d0e1",
"stageId": "65a1b2c3d4e5f6a7b8c9d0e2",
})
print(f"Created lead: {result['lead']['_id']}")
Send a Message
conversation_id = "65a1b2c3d4e5f6a7b8c9d0e1"
result = client.post(f"/inbox/conversations/{conversation_id}/messages", json={
"content": "Hello! How can I help you today?",
"type": "text",
})
print(f"Sent message: {result['message']['_id']}")
Update a Customer
customer_id = "65a1b2c3d4e5f6a7b8c9d0e1"
result = client.patch(f"/customers/{customer_id}", json={
"firstName": "Jane",
"lastName": "Doe",
"email": "jane.doe@newcompany.com",
})
print(f"Updated customer: {result['lead']['_id']}")Subscribe to Webhooks
result = client.post("/webhooks/subscriptions", json={
"url": "https://your-app.com/webhooks/velaflows",
"events": [
"conversation.created",
"conversation.message.received",
"lead.stage_changed",
],
"secret": "your-webhook-secret-for-verification",
})
print(f"Webhook subscription: {result['subscription']['_id']}")
Error Handling
try:
result = client.post("/crm/leads", json={
"email": "invalid-email",
})
except VelaFlowsError as e:
if e.code == "VALIDATION_ERROR":
print(f"Invalid data: {e.details}")
elif e.code == "UNAUTHORIZED":
print("Check your API token")
elif e.code == "FORBIDDEN":
print("Token lacks required scope")
elif e.code == "TOO_MANY_REQUESTS":
print("Rate limited - retry later")
else:
print(f"API error [{e.code}]: {e}")
except requests.RequestException as e:
print(f"Network error: {e}")Pagination Helper
Fetch all pages of a paginated endpoint:
def fetch_all_pages(
    path: str,
    params: dict = None,
    limit: int = 100,
    *,
    api_client=None,
) -> list:
    """Fetch all items from a paginated endpoint.

    Requests page 1, 2, ... until the page number reaches the
    ``pagination.totalPages`` value reported by the response.

    Args:
        path: Endpoint path, passed through to the client's ``get``.
        params: Extra query parameters. The dict is copied, not mutated;
            any ``limit`` or ``page`` keys in it are overridden.
        limit: Page size sent as the ``limit`` query parameter.
        api_client: Object with a ``get(path, params=...)`` method. Defaults
            to the module-level ``client``.

    Returns:
        The ``items`` of every page, concatenated in page order.
    """
    api = client if api_client is None else api_client
    query = {**(params or {}), "limit": limit}
    collected = []
    page = 1
    while True:
        query["page"] = page
        result = api.get(path, params=query)
        collected.extend(result["items"])
        if page >= result["pagination"]["totalPages"]:
            return collected
        page += 1
# Fetch all open conversations
all_conversations = fetch_all_pages(
"/inbox/conversations",
params={"status": "open"},
)
print(f"Fetched {len(all_conversations)} conversations")
Retry with Exponential Backoff
def with_retry(fn, max_retries: int = 3):
    """Execute a function with automatic retry on rate limits and server errors.

    ``fn`` is called with no arguments. A ``VelaFlowsError`` whose status is
    429 or >= 500 triggers a retry after ``2 ** attempt`` seconds plus up to
    one second of random jitter. Any other exception propagates immediately.

    Args:
        fn: Zero-argument callable to run.
        max_retries: Maximum number of retries after the first call, so
            ``fn`` runs at most ``max_retries + 1`` times.

    Returns:
        Whatever ``fn`` returns on its first successful call.

    Raises:
        ValueError: If ``max_retries`` is negative.
        VelaFlowsError: The last error, once it is non-retryable or the
            retry budget is used up.
    """
    if max_retries < 0:
        # Otherwise fn would never be called at all.
        raise ValueError(f"max_retries must be >= 0, got {max_retries}")

    attempt = 0
    while True:
        try:
            return fn()
        except VelaFlowsError as e:
            # Don't retry client errors (except rate limits)
            if e.status < 500 and e.status != 429:
                raise
            if attempt >= max_retries:
                raise
        delay = (2 ** attempt) + random.random()
        print(f"Retrying in {delay:.1f}s (attempt {attempt + 1})")
        time.sleep(delay)
        attempt += 1
# Usage
conversations = with_retry(
lambda: client.get("/inbox/conversations", params={"status": "open"})
)
Full Script Example
A complete script that exports all CRM leads to a CSV file:
#!/usr/bin/env python3
"""Export all CRM leads to a CSV file."""
import csv
import os
from velaflows import VelaFlowsClient
client = VelaFlowsClient()
def export_leads(output_file: str = "leads_export.csv", *, api_client=None):
    """Export all CRM leads to a CSV file.

    Pages through ``/crm/leads`` (100 per page, newest first) and writes a
    fixed set of columns. Other keys on a lead are ignored and missing ones
    are left blank. Nothing is written when there are no leads.

    Args:
        output_file: Path of the CSV file to create or overwrite.
        api_client: Object with a ``get(path, params=...)`` method. Defaults
            to the module-level ``client``.
    """
    api = client if api_client is None else api_client

    # Fetch all leads
    all_leads = []
    page = 1
    while True:
        result = api.get("/crm/leads", params={
            "page": page,
            "limit": 100,
            "sortBy": "createdAt",
            "sortOrder": "desc",
        })
        all_leads.extend(result["items"])
        if page >= result["pagination"]["totalPages"]:
            break
        page += 1

    if not all_leads:
        print("No leads found")
        return

    # Write CSV
    fieldnames = ["_id", "firstName", "lastName", "email", "phone", "status", "createdAt"]
    # Explicit UTF-8: the platform default encoding may not cover every name.
    with open(output_file, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(all_leads)
    print(f"Exported {len(all_leads)} leads to {output_file}")


if __name__ == "__main__":
    export_leads()