Skip to Content

Python

Complete code examples for integrating with the VelaFlows API using Python. All examples use the requests library.

Setup: API Client Helper

Create a reusable API client class:

# velaflows.py
import os
import time
import random

import requests


class VelaFlowsError(Exception):
    """Raised when the VelaFlows API returns an error.

    Attributes:
        status: HTTP status code of the failed response.
        code: Error code taken from the response body ("UNKNOWN" if absent).
        message: Human-readable message (identical to ``str(exc)``).
        details: Extra error details from the response body; ``{}`` if none.
    """

    def __init__(self, status: int, code: str, message: str, details: dict = None):
        self.status = status
        self.code = code
        self.message = message
        self.details = details or {}
        super().__init__(message)


class VelaFlowsClient:
    """Small wrapper around ``requests.Session`` for the VelaFlows REST API.

    Args:
        token: API token. Falls back to the ``VELAFLOWS_API_TOKEN``
            environment variable (``KeyError`` if neither is provided).
        workspace_id: Workspace ID. Falls back to ``VELAFLOWS_WORKSPACE_ID``.
        timeout: Seconds to wait for each request. ``requests`` has no
            default timeout, so without one a stalled connection would
            block the caller forever.
    """

    BASE_URL = "https://api.velaflows.com/api/v1"
    DEFAULT_TIMEOUT = 30  # seconds

    def __init__(
        self,
        token: str = None,
        workspace_id: str = None,
        timeout: float = DEFAULT_TIMEOUT,
    ):
        self.token = token or os.environ["VELAFLOWS_API_TOKEN"]
        self.workspace_id = workspace_id or os.environ["VELAFLOWS_WORKSPACE_ID"]
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update(
            {
                "Authorization": f"Bearer {self.token}",
                "x-workspace-id": self.workspace_id,
                "Content-Type": "application/json",
            }
        )

    def _handle_response(self, response: requests.Response) -> dict:
        """Unwrap the response envelope or raise ``VelaFlowsError``.

        Returns:
            The ``data`` member of a successful response, or ``{}`` for a
            successful response with an empty body.

        Raises:
            VelaFlowsError: For non-2xx responses, for envelopes whose
                ``success`` flag is falsy, and for bodies that are not a
                JSON object.
        """
        try:
            data = response.json()
        except ValueError:
            # Every JSON decode error raised by requests is a ValueError.
            # Gateways and proxies can answer with HTML or an empty body.
            data = None

        if not isinstance(data, dict):
            if response.ok and not response.content:
                return {}
            raise VelaFlowsError(
                status=response.status_code,
                code="INVALID_RESPONSE",
                message=(
                    f"Unexpected response body (HTTP {response.status_code}): "
                    "expected a JSON object"
                ),
            )

        if not response.ok or not data.get("success"):
            # "error" may be missing, null, or a bare string.
            error = data.get("error") or {}
            if not isinstance(error, dict):
                error = {"message": str(error)}
            raise VelaFlowsError(
                status=response.status_code,
                code=error.get("code", "UNKNOWN"),
                message=error.get("message", "Unknown error"),
                details=error.get("details"),
            )
        return data["data"]

    def get(self, path: str, params: dict = None) -> dict:
        """Send a GET request."""
        response = self.session.get(
            f"{self.BASE_URL}{path}", params=params, timeout=self.timeout
        )
        return self._handle_response(response)

    def post(self, path: str, json: dict = None) -> dict:
        """Send a POST request."""
        response = self.session.post(
            f"{self.BASE_URL}{path}", json=json, timeout=self.timeout
        )
        return self._handle_response(response)

    def patch(self, path: str, json: dict = None) -> dict:
        """Send a PATCH request."""
        response = self.session.patch(
            f"{self.BASE_URL}{path}", json=json, timeout=self.timeout
        )
        return self._handle_response(response)

    def delete(self, path: str) -> dict:
        """Send a DELETE request."""
        response = self.session.delete(
            f"{self.BASE_URL}{path}", timeout=self.timeout
        )
        return self._handle_response(response)


# Initialize the client
client = VelaFlowsClient()

Common Operations

List Conversations

# First page of open conversations, 20 per page.
query = {
    "page": 1,
    "limit": 20,
    "status": "open",
}
result = client.get("/inbox/conversations", params=query)

print(f"Total conversations: {result['pagination']['total']}")

# One summary line per conversation: channel, status, last activity.
for conv in result["items"]:
    print(f"[{conv['channelType']}] {conv['status']} - {conv['lastMessageAt']}")

Create a CRM Lead

# Lead fields; pipelineId and stageId place the lead in a pipeline stage.
lead_payload = {
    "firstName": "Jane",
    "lastName": "Smith",
    "email": "jane.smith@example.com",
    "phone": "+1234567890",
    "pipelineId": "65a1b2c3d4e5f6a7b8c9d0e1",
    "stageId": "65a1b2c3d4e5f6a7b8c9d0e2",
}

result = client.post("/crm/leads", json=lead_payload)
print(f"Created lead: {result['lead']['_id']}")

Send a Message

conversation_id = "65a1b2c3d4e5f6a7b8c9d0e1"

# Plain-text message body posted to the conversation above.
message_payload = {
    "content": "Hello! How can I help you today?",
    "type": "text",
}

result = client.post(
    f"/inbox/conversations/{conversation_id}/messages",
    json=message_payload,
)
print(f"Sent message: {result['message']['_id']}")

Update a Customer

customer_id = "65a1b2c3d4e5f6a7b8c9d0e1"

# PATCH sends only the fields that should change.
result = client.patch(f"/customers/{customer_id}", json={
    "firstName": "Jane",
    "lastName": "Doe",
    "email": "jane.doe@newcompany.com",
})

# The original read result['lead'], apparently copied from the lead example.
# NOTE(review): assumes the updated record is nested under "customer", in line
# with "lead" / "message" / "subscription" in the sibling examples - confirm
# against the API reference.
print(f"Updated customer: {result['customer']['_id']}")

Subscribe to Webhooks

# Events that should trigger a delivery to the webhook URL.
subscribed_events = [
    "conversation.created",
    "conversation.message.received",
    "lead.stage_changed",
]

subscription_payload = {
    "url": "https://your-app.com/webhooks/velaflows",
    "events": subscribed_events,
    "secret": "your-webhook-secret-for-verification",
}

result = client.post("/webhooks/subscriptions", json=subscription_payload)
print(f"Webhook subscription: {result['subscription']['_id']}")

Error Handling

# Fixed hints for error codes that need no extra detail.
fixed_hints = (
    ("UNAUTHORIZED", "Check your API token"),
    ("FORBIDDEN", "Token lacks required scope"),
    ("TOO_MANY_REQUESTS", "Rate limited - retry later"),
)

try:
    result = client.post("/crm/leads", json={
        "email": "invalid-email",
    })
except VelaFlowsError as e:
    # The API answered with an error envelope; branch on its code.
    hint = next((text for code, text in fixed_hints if e.code == code), None)
    if e.code == "VALIDATION_ERROR":
        print(f"Invalid data: {e.details}")
    elif hint is not None:
        print(hint)
    else:
        print(f"API error [{e.code}]: {e}")
except requests.RequestException as e:
    # Raised by requests itself, not by the client's response handling.
    print(f"Network error: {e}")

Pagination Helper

Fetch all pages of a paginated endpoint:

def fetch_all_pages(path: str, params: dict = None, limit: int = 100) -> list:
    """Collect the items of every page of a paginated endpoint.

    Args:
        path: Endpoint path passed to ``client.get``.
        params: Extra query parameters; copied, so the caller's dict is
            left untouched. Its "limit" and "page" entries are overwritten.
        limit: Page size requested on every call.

    Returns:
        A single list holding the ``items`` of all pages, in page order.
    """
    query = dict(params or {})
    query["limit"] = limit

    collected = []
    current_page = 1
    while True:
        query["page"] = current_page
        result = client.get(path, params=query)
        collected += result["items"]

        # Stop once the last page reported by the API has been read.
        last_page = result["pagination"]["totalPages"]
        if current_page >= last_page:
            return collected
        current_page += 1


# Fetch all open conversations
all_conversations = fetch_all_pages(
    "/inbox/conversations",
    params={"status": "open"},
)
print(f"Fetched {len(all_conversations)} conversations")

Retry with Exponential Backoff

def with_retry(fn, max_retries: int = 3):
    """Call ``fn`` and retry it on rate limits (429) and server errors (5xx).

    Args:
        fn: Zero-argument callable to execute.
        max_retries: Number of retries after the first attempt.

    Returns:
        Whatever ``fn`` returns on its first successful call.

    Raises:
        VelaFlowsError: Immediately for other errors below 500, or once
            the retries are used up.
    """
    attempt = 0
    while attempt <= max_retries:
        try:
            return fn()
        except VelaFlowsError as e:
            retryable = e.status >= 500 or e.status == 429
            if not retryable or attempt == max_retries:
                raise

            # Exponential backoff (1s, 2s, 4s, ...) plus up to 1s of jitter.
            delay = (2 ** attempt) + random.random()
            print(f"Retrying in {delay:.1f}s (attempt {attempt + 1})")
            time.sleep(delay)
        attempt += 1

    raise RuntimeError("Unreachable")


# Usage
conversations = with_retry(
    lambda: client.get("/inbox/conversations", params={"status": "open"})
)

Full Script Example

A complete script that exports all CRM leads to a CSV file:

#!/usr/bin/env python3
"""Export all CRM leads to a CSV file."""
import csv
import os

from velaflows import VelaFlowsClient

client = VelaFlowsClient()


def export_leads(output_file: str = "leads_export.csv"):
    """Download every CRM lead and write the selected columns to a CSV file.

    Args:
        output_file: Path of the CSV file to create or overwrite.

    Prints "No leads found" and writes nothing when the API returns no leads.
    """
    # Fetch all leads, newest first, 100 per page.
    all_leads = []
    page = 1
    while True:
        result = client.get("/crm/leads", params={
            "page": page,
            "limit": 100,
            "sortBy": "createdAt",
            "sortOrder": "desc",
        })
        all_leads.extend(result["items"])
        if page >= result["pagination"]["totalPages"]:
            break
        page += 1

    if not all_leads:
        print("No leads found")
        return

    # Write CSV
    fieldnames = ["_id", "firstName", "lastName", "email", "phone", "status", "createdAt"]
    # Explicit UTF-8: the platform default encoding can fail on names with
    # non-ASCII characters. newline="" lets the csv module control line endings.
    with open(output_file, "w", newline="", encoding="utf-8") as f:
        # extrasaction="ignore" drops lead fields that are not in fieldnames.
        writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(all_leads)

    print(f"Exported {len(all_leads)} leads to {output_file}")


if __name__ == "__main__":
    export_leads()