Documentation Index
Fetch the complete documentation index at: https://mintlify.com/OpsMill/infrahub/llms.txt
Use this file to discover all available pages before exploring further.
The Infrahub Python SDK is built with async/await for high-performance concurrent operations.
Understanding Async/Await
Why Async?
Async programming allows concurrent I/O operations:
- Non-blocking: Don’t wait for API responses
- Concurrent: Process multiple requests simultaneously
- Efficient: Better resource utilization
- Scalable: Handle more operations in less time
Basic Async Pattern
import asyncio
from infrahub_sdk import InfrahubClient
async def main():
client = InfrahubClient()
# All SDK operations are async
device = await client.get(kind="InfraDevice", id="device-id")
print(device.name.value)
if __name__ == "__main__":
# Run async function
asyncio.run(main())
Concurrent Operations
Query Multiple Objects Concurrently
Fetch multiple objects at once:
import asyncio
from infrahub_sdk import InfrahubClient
async def get_multiple_devices():
client = InfrahubClient()
device_ids = ["id-1", "id-2", "id-3", "id-4", "id-5"]
# Create tasks for all queries
tasks = [
client.get(kind="InfraDevice", id=device_id)
for device_id in device_ids
]
# Execute all queries concurrently
devices = await asyncio.gather(*tasks)
print(f"Retrieved {len(devices)} devices concurrently")
for device in devices:
print(f" - {device.name.value}")
if __name__ == "__main__":
asyncio.run(get_multiple_devices())
Create Multiple Objects Concurrently
import asyncio
from infrahub_sdk import InfrahubClient
async def create_device(client: InfrahubClient, index: int):
"""Create a single device."""
device = await client.create(
kind="InfraDevice",
name=f"device-{index:03d}",
serial_number=f"SN{index:06d}"
)
await device.save()
return device
async def create_devices_concurrently():
client = InfrahubClient()
# Create 100 devices concurrently
tasks = [
create_device(client, i)
for i in range(100)
]
devices = await asyncio.gather(*tasks)
print(f"Created {len(devices)} devices")
if __name__ == "__main__":
asyncio.run(create_devices_concurrently())
Update Multiple Objects Concurrently
import asyncio
async def update_device(client: InfrahubClient, device_id: str):
"""Update a single device."""
device = await client.get(kind="InfraDevice", id=device_id)
device.is_managed.value = True
await device.save()
return device
async def update_all_devices():
client = InfrahubClient()
# Get all devices
all_devices = await client.all(kind="InfraDevice")
# Update concurrently
tasks = [
update_device(client, device.id)
for device in all_devices
]
updated = await asyncio.gather(*tasks)
print(f"Updated {len(updated)} devices concurrently")
if __name__ == "__main__":
asyncio.run(update_all_devices())
Controlling Concurrency
Using Semaphore for Rate Limiting
Limit the number of concurrent operations:
import asyncio
from infrahub_sdk import InfrahubClient
async def fetch_device_with_limit(
client: InfrahubClient,
device_id: str,
semaphore: asyncio.Semaphore
):
"""Fetch device with concurrency limit."""
async with semaphore:
return await client.get(kind="InfraDevice", id=device_id)
async def limited_concurrent_fetch():
client = InfrahubClient()
# Limit to 10 concurrent requests
semaphore = asyncio.Semaphore(10)
device_ids = [f"id-{i}" for i in range(100)]
tasks = [
fetch_device_with_limit(client, device_id, semaphore)
for device_id in device_ids
]
devices = await asyncio.gather(*tasks)
print(f"Fetched {len(devices)} devices with max 10 concurrent")
if __name__ == "__main__":
asyncio.run(limited_concurrent_fetch())
Processing in Batches
Process items in sequential batches:
import asyncio
async def process_in_batches(
client: InfrahubClient,
items: list,
batch_size: int = 10
):
"""Process items in batches."""
results = []
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
# Process batch concurrently
batch_tasks = [
client.get(kind="InfraDevice", id=item_id)
for item_id in batch
]
batch_results = await asyncio.gather(*batch_tasks)
results.extend(batch_results)
print(f"Processed batch {i//batch_size + 1}: {len(batch)} items")
# Optional delay between batches
await asyncio.sleep(0.5)
return results
# Usage
device_ids = [f"id-{i}" for i in range(100)]
devices = await process_in_batches(client, device_ids, batch_size=20)
Error Handling in Async
Handle Errors with gather
Continue processing even if some tasks fail:
import asyncio
from infrahub_sdk.exceptions import GraphQLError
async def safe_get_device(
client: InfrahubClient,
device_id: str
):
"""Get device with error handling."""
try:
return await client.get(kind="InfraDevice", id=device_id)
except GraphQLError as e:
print(f"Error fetching {device_id}: {e.message}")
return None
async def fetch_all_safely():
client = InfrahubClient()
device_ids = ["id-1", "invalid-id", "id-3"]
tasks = [
safe_get_device(client, device_id)
for device_id in device_ids
]
devices = await asyncio.gather(*tasks)
# Filter out None results
valid_devices = [d for d in devices if d is not None]
print(f"Successfully fetched {len(valid_devices)} devices")
Using return_exceptions
import asyncio
async def fetch_with_exceptions():
client = InfrahubClient()
device_ids = ["id-1", "invalid-id", "id-3"]
tasks = [
client.get(kind="InfraDevice", id=device_id)
for device_id in device_ids
]
# Return exceptions instead of raising
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
devices = []
errors = []
for result in results:
if isinstance(result, Exception):
errors.append(result)
else:
devices.append(result)
print(f"Devices: {len(devices)}, Errors: {len(errors)}")
Async Context Managers
Using Client as Context Manager
import asyncio
from infrahub_sdk import InfrahubClient
async def use_context_manager():
async with InfrahubClient() as client:
devices = await client.all(kind="InfraDevice")
print(f"Found {len(devices)} devices")
# Client automatically closes on exit
if __name__ == "__main__":
asyncio.run(use_context_manager())
Custom Async Context Manager
from contextlib import asynccontextmanager
@asynccontextmanager
async def device_transaction(client: InfrahubClient, branch_name: str):
"""Context manager for transactional device operations."""
# Setup: create branch
branch = await client.branch.create(branch_name=branch_name)
try:
yield client, branch.name
# Commit: merge or validate
print(f"Transaction succeeded on {branch.name}")
except Exception as e:
# Rollback: delete branch
await client.branch.delete(branch_name=branch.name)
print(f"Transaction rolled back: {e}")
raise
# Usage
async with device_transaction(client, "tx-branch") as (client, branch):
device = await client.create(
kind="InfraDevice",
name="new-device",
serial_number="SN123456",
branch=branch
)
await device.save()
Async Generators
Stream Results
Process items as they’re retrieved:
import asyncio
from typing import AsyncGenerator
async def stream_devices(
client: InfrahubClient,
batch_size: int = 10
) -> AsyncGenerator:
"""Stream devices in batches."""
offset = 0
while True:
# Query batch using GraphQL
query = """
query GetDevices($offset: Int!, $limit: Int!) {
InfraDevice(offset: $offset, limit: $limit) {
edges {
node {
id
name { value }
}
}
}
}
"""
result = await client.execute_graphql(
query=query,
variables={"offset": offset, "limit": batch_size}
)
devices = result["InfraDevice"]["edges"]
if not devices:
break
for device in devices:
yield device["node"]
offset += batch_size
# Usage
async def process_stream():
client = InfrahubClient()
count = 0
async for device in stream_devices(client):
print(f"Processing: {device['name']['value']}")
count += 1
print(f"Processed {count} devices")
Advanced Patterns
Task Coordination
Coordinate multiple async tasks:
import asyncio
async def coordinator():
client = InfrahubClient()
# Start multiple concurrent workflows
task1 = asyncio.create_task(create_devices(client, 10))
task2 = asyncio.create_task(update_devices(client))
task3 = asyncio.create_task(query_devices(client))
# Wait for all to complete
results = await asyncio.gather(task1, task2, task3)
print("All workflows completed")
return results
async def create_devices(client, count):
# Implementation
pass
async def update_devices(client):
# Implementation
pass
async def query_devices(client):
# Implementation
pass
Queue-Based Processing
Use async queues for producer-consumer pattern:
import asyncio
async def producer(
queue: asyncio.Queue,
client: InfrahubClient
):
"""Produce device IDs."""
devices = await client.all(kind="InfraDevice")
for device in devices:
await queue.put(device.id)
# Signal completion
await queue.put(None)
async def consumer(
queue: asyncio.Queue,
client: InfrahubClient,
worker_id: int
):
"""Consume and process device IDs."""
while True:
device_id = await queue.get()
if device_id is None:
# Re-add sentinel for other workers
await queue.put(None)
break
# Process device
device = await client.get(kind="InfraDevice", id=device_id)
print(f"Worker {worker_id}: {device.name.value}")
queue.task_done()
async def queue_processing():
client = InfrahubClient()
queue = asyncio.Queue()
# Start producer
producer_task = asyncio.create_task(
producer(queue, client)
)
# Start multiple consumers
consumer_tasks = [
asyncio.create_task(consumer(queue, client, i))
for i in range(5)
]
# Wait for completion
await producer_task
await asyncio.gather(*consumer_tasks)
Async Retry Logic
import asyncio
from functools import wraps
def async_retry(max_attempts: int = 3, delay: float = 1.0):
"""Decorator for async retry logic."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
try:
return await func(*args, **kwargs)
except Exception as e:
if attempt < max_attempts - 1:
await asyncio.sleep(delay * (2 ** attempt))
continue
else:
raise
return wrapper
return decorator
# Usage
@async_retry(max_attempts=3, delay=1.0)
async def fetch_device(client: InfrahubClient, device_id: str):
return await client.get(kind="InfraDevice", id=device_id)
Timeout Management
import asyncio
async def fetch_with_timeout(
client: InfrahubClient,
device_id: str,
timeout: float = 5.0
):
"""Fetch device with timeout."""
try:
device = await asyncio.wait_for(
client.get(kind="InfraDevice", id=device_id),
timeout=timeout
)
return device
except asyncio.TimeoutError:
print(f"Timeout fetching device {device_id}")
return None
Synchronous Client
When to Use Sync Client
For synchronous contexts or legacy code:
from infrahub_sdk import InfrahubClientSync
# Synchronous client (no await needed)
client = InfrahubClientSync()
# Synchronous operations
device = client.get(kind="InfrahubDevice", id="device-id")
devices = client.all(kind="InfraDevice")
device.name.value = "updated-name"
device.save()
Mixed Async/Sync
Use sync client in async code when needed:
import asyncio
from infrahub_sdk import InfrahubClient, InfrahubClientSync
async def mixed_example():
# Async client for most operations
async_client = InfrahubClient()
devices = await async_client.all(kind="InfraDevice")
# Sync client for specific operations
sync_client = InfrahubClientSync()
# Note: This blocks the event loop - use sparingly
tag = sync_client.get(kind="BuiltinTag", id="tag-id")
Use concurrent operations
Leverage asyncio.gather() to run multiple operations concurrently.
Limit concurrency with Semaphore
Prevent overwhelming the API with too many concurrent requests.
Break large operations into manageable batches for better control.
Use return_exceptions=True or try/except to prevent one failure from stopping all operations.
Use async context managers
Ensure proper cleanup of resources with async context managers.
Next Steps
Batch Operations
Combine async with batch operations
Error Handling
Handle async errors properly
Pagination
Use async patterns for pagination
Branches
Async branch operations