Documentation Index
Fetch the complete documentation index at: https://mintlify.com/OpsMill/infrahub/llms.txt
Use this file to discover all available pages before exploring further.
Pagination allows you to work with large datasets by fetching data in manageable chunks.
Understanding Pagination
Pagination is essential for:
- Performance: Avoid loading thousands of objects at once
- Memory: Reduce memory consumption
- Responsiveness: Faster initial response times
- API limits: Respect API rate limits and timeouts
Key concepts:
- Offset: Starting position in the dataset
- Limit: Number of items to retrieve
- Cursor: Pointer to the current position (cursor-based pagination)
- Total count: Total number of items available
The SDK uses GraphQL for pagination:
import asyncio
from infrahub_sdk import InfrahubClient
async def paginate_devices(
    client: "InfrahubClient | None" = None,
    limit: int = 100
):
    """Fetch every InfraDevice node using offset/limit pagination.

    Pages are requested one after another until the server returns an empty
    page or the reported ``count`` has been reached.

    Args:
        client: Object exposing an async ``execute_graphql(query, variables)``
            method. A new ``InfrahubClient()`` is created when omitted.
        limit: Maximum number of devices requested per page.

    Returns:
        A list with the ``node`` dictionary of every device fetched.
    """
    if client is None:
        client = InfrahubClient()

    # The query text is identical for every page, so build it once.
    query = """
        query GetDevices($offset: Int!, $limit: Int!) {
            InfraDevice(offset: $offset, limit: $limit) {
                count
                edges {
                    node {
                        id
                        name { value }
                        serial_number { value }
                    }
                }
            }
        }
    """

    offset = 0
    all_devices = []

    while True:
        # Query with pagination
        result = await client.execute_graphql(
            query=query,
            variables={"offset": offset, "limit": limit}
        )
        page = result["InfraDevice"]
        edges = page["edges"]

        if not edges:
            break

        # Extract nodes
        all_devices.extend(edge["node"] for edge in edges)
        print(f"Fetched {len(all_devices)} devices...")

        # Advance by what was actually returned so no rows are skipped if
        # the server hands back fewer items than requested.
        offset += len(edges)

        # The query already asks for the total `count`; once it is reached
        # there is no need for one more request that would come back empty.
        total = page.get("count")
        if total is not None and offset >= total:
            break

    print(f"Total devices: {len(all_devices)}")
    return all_devices
if __name__ == "__main__":
    # Script entry point: drive the coroutine to completion on a fresh event loop.
    asyncio.run(paginate_devices())
Get Total Count
Fetch the total count before pagination:
async def get_device_count(client: InfrahubClient) -> int:
    """Get total device count.

    Only the ``count`` field of ``InfraDevice`` is requested, so no device
    data is fetched.

    Args:
        client: Client whose async ``execute_graphql`` method runs the query.

    Returns:
        The ``count`` value reported for ``InfraDevice``.
    """
    query = """
        query GetDeviceCount {
            InfraDevice {
                count
            }
        }
    """
    result = await client.execute_graphql(query=query)
    return result["InfraDevice"]["count"]
# Usage: `await` needs an async context (an `async def` body or an asyncio
# REPL), and `client` must already be an InfrahubClient instance.
total = await get_device_count(client)
print(f"Total devices: {total}")
# Calculate number of pages: ceiling division, so a partial last page counts
page_size = 100
pages = (total + page_size - 1) // page_size
print(f"Pages needed: {pages}")
Page-Based Iteration
Iterate through pages of results:
async def iterate_pages(
    client: InfrahubClient,
    page_size: int = 100
):
    """Iterate through pages of devices, printing each page's device names.

    Args:
        client: Client whose async ``execute_graphql`` method runs the query.
        page_size: Maximum number of devices requested per page.

    Returns:
        None. Output is written with ``print``.
    """
    # The query text is identical for every page, so build it once.
    query = """
        query GetDevices($offset: Int!, $limit: Int!) {
            InfraDevice(offset: $offset, limit: $limit) {
                edges {
                    node {
                        id
                        name { value }
                    }
                }
            }
        }
    """

    offset = 0
    page_num = 1

    while True:
        result = await client.execute_graphql(
            query=query,
            variables={"offset": offset, "limit": page_size}
        )
        edges = result["InfraDevice"]["edges"]

        # An empty page means the previous one was the last.
        if not edges:
            break

        print(f"\nPage {page_num}:")
        for edge in edges:
            print(f" - {edge['node']['name']['value']}")

        # Advance by what was actually returned so no rows are skipped if
        # the server hands back fewer items than requested.
        offset += len(edges)
        page_num += 1
# Usage: call from inside an async context; `client` is an existing InfrahubClient.
await iterate_pages(client, page_size=50)
Generator Pattern
Yield items as they’re fetched:
from typing import AsyncGenerator
async def device_generator(
    client: InfrahubClient,
    batch_size: int = 100
) -> AsyncGenerator:
    """Generate devices one at a time, fetching them in batches.

    The next batch is requested only after every node of the current batch
    has been yielded.

    Args:
        client: Client whose async ``execute_graphql`` method runs the query.
        batch_size: Maximum number of devices requested per batch.

    Yields:
        The ``node`` dictionary of each device, in the order returned.
    """
    # The query text is identical for every batch, so build it once.
    query = """
        query GetDevices($offset: Int!, $limit: Int!) {
            InfraDevice(offset: $offset, limit: $limit) {
                edges {
                    node {
                        id
                        name { value }
                        serial_number { value }
                    }
                }
            }
        }
    """

    offset = 0

    while True:
        result = await client.execute_graphql(
            query=query,
            variables={"offset": offset, "limit": batch_size}
        )
        edges = result["InfraDevice"]["edges"]

        # An empty batch means everything has been yielded.
        if not edges:
            break

        for edge in edges:
            yield edge["node"]

        # Advance by what was actually returned so no rows are skipped if
        # the server hands back fewer items than requested.
        offset += len(edges)
# Usage
async def process_devices():
    """Print the name of every device from ``device_generator``, then a tally."""
    client = InfrahubClient()
    count = 0

    # Devices are handled as they arrive; nothing is accumulated in a list here.
    async for device in device_generator(client):
        print(f"Processing: {device['name']['value']}")
        count += 1

    print(f"Processed {count} devices total")
# Run the example; `await` must be used inside an async context (e.g. another coroutine).
await process_devices()
Batch Processing
Process items in batches:
async def process_in_batches(
    client: InfrahubClient,
    batch_size: int = 100,
    process_func = None
):
    """Process devices in batches.

    Args:
        client: Client whose async ``execute_graphql`` method runs the query.
        batch_size: Maximum number of devices requested per batch.
        process_func: Optional async callable awaited once per batch with the
            list of ``node`` dictionaries. Skipped when falsy.

    Returns:
        The total number of devices fetched across all batches.
    """
    # The query text is identical for every batch, so build it once.
    query = """
        query GetDevices($offset: Int!, $limit: Int!) {
            InfraDevice(offset: $offset, limit: $limit) {
                edges {
                    node {
                        id
                        name { value }
                    }
                }
            }
        }
    """

    offset = 0
    total_processed = 0

    while True:
        result = await client.execute_graphql(
            query=query,
            variables={"offset": offset, "limit": batch_size}
        )
        edges = result["InfraDevice"]["edges"]

        if not edges:
            break

        # Process batch
        batch_devices = [edge["node"] for edge in edges]
        if process_func:
            await process_func(batch_devices)

        total_processed += len(batch_devices)
        print(f"Processed {total_processed} devices...")

        # Advance by what was actually returned so no rows are skipped if
        # the server hands back fewer items than requested.
        offset += len(batch_devices)

    return total_processed
# Usage
async def process_batch(devices):
    """Example batch handler: visit each device in ``devices`` (placeholder body)."""
    for device in devices:
        # Do something with each device
        pass
# Call from inside an async context; `client` is an existing InfrahubClient.
# `total` receives the number of devices processed.
total = await process_in_batches(
    client,
    batch_size=50,
    process_func=process_batch
)
Paginate Filtered Results
Combine filtering with pagination:
async def paginate_filtered_devices(
    client: InfrahubClient,
    device_type: str,
    batch_size: int = 100
):
    """Paginate devices filtered by type.

    Args:
        client: Client whose async ``execute_graphql`` method runs the query.
        device_type: Value passed to the ``device_type__value`` filter.
        batch_size: Maximum number of devices requested per page.

    Returns:
        A list with the ``node`` dictionary of every matching device.
    """
    # The query text is identical for every page, so build it once.
    query = """
        query GetFilteredDevices(
            $device_type: String!,
            $offset: Int!,
            $limit: Int!
        ) {
            InfraDevice(
                device_type__value: $device_type,
                offset: $offset,
                limit: $limit
            ) {
                edges {
                    node {
                        id
                        name { value }
                        device_type { value }
                    }
                }
            }
        }
    """

    offset = 0
    filtered_devices = []

    while True:
        result = await client.execute_graphql(
            query=query,
            variables={
                "device_type": device_type,
                "offset": offset,
                "limit": batch_size
            }
        )
        edges = result["InfraDevice"]["edges"]

        if not edges:
            break

        filtered_devices.extend(edge["node"] for edge in edges)

        # Advance by what was actually returned so no rows are skipped if
        # the server hands back fewer items than requested.
        offset += len(edges)

    return filtered_devices
# Usage: call from inside an async context; `client` is an existing InfrahubClient.
routers = await paginate_filtered_devices(
    client,
    device_type="router",
    batch_size=50
)
Parallel Page Fetching
Fetch multiple pages concurrently:
import asyncio
async def fetch_page(
    client: InfrahubClient,
    offset: int,
    limit: int
):
    """Fetch a single page.

    Args:
        client: Client whose async ``execute_graphql`` method runs the query.
        offset: Position of the first device to return.
        limit: Maximum number of devices to return.

    Returns:
        The ``edges`` list of the ``InfraDevice`` response for this page.
    """
    query = """
        query GetDevices($offset: Int!, $limit: Int!) {
            InfraDevice(offset: $offset, limit: $limit) {
                edges {
                    node {
                        id
                        name { value }
                    }
                }
            }
        }
    """
    result = await client.execute_graphql(
        query=query,
        variables={"offset": offset, "limit": limit}
    )
    return result["InfraDevice"]["edges"]
async def parallel_pagination(
    client: InfrahubClient,
    total_items: int,
    page_size: int = 100,
    max_concurrent: int = 5
):
    """Fetch pages in parallel.

    One ``fetch_page`` call is scheduled per page offset; a semaphore keeps
    at most ``max_concurrent`` of them in flight at once.

    Args:
        client: Client handed to ``fetch_page`` for every request.
        total_items: Number of items to cover; determines the page offsets.
        page_size: Number of items requested per page.
        max_concurrent: Upper bound on simultaneous requests.

    Returns:
        A list with the ``node`` dictionary of every device, in page order.

    Raises:
        ValueError: If ``page_size`` or ``max_concurrent`` is smaller than 1.
    """
    if page_size < 1:
        # A negative step yields no offsets at all, silently returning nothing.
        raise ValueError("page_size must be at least 1")
    if max_concurrent < 1:
        # Semaphore(0) would never admit a request, so the gather would hang.
        raise ValueError("max_concurrent must be at least 1")

    # Calculate page offsets
    offsets = range(0, total_items, page_size)

    # Fetch pages concurrently with limit
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_with_semaphore(offset):
        async with semaphore:
            return await fetch_page(client, offset, page_size)

    # Fetch all pages; gather returns results in the order of `offsets`.
    page_results = await asyncio.gather(*(
        fetch_with_semaphore(offset)
        for offset in offsets
    ))

    # Flatten results
    all_devices = []
    for page in page_results:
        all_devices.extend(edge["node"] for edge in page)

    return all_devices
# Usage: call from inside an async context; `client` is an existing InfrahubClient.
# The total count is fetched first because it determines the page offsets.
total = await get_device_count(client)
devices = await parallel_pagination(
    client,
    total_items=total,
    page_size=100,
    max_concurrent=5
)
Selective Field Loading
Only fetch required fields:
async def paginate_minimal_fields(
    client: InfrahubClient,
    batch_size: int = 100
):
    """Paginate with only essential fields.

    The query selects just ``id`` and ``name`` for each device.

    Args:
        client: Client whose async ``execute_graphql`` method runs the query.
        batch_size: Maximum number of devices requested per page.

    Returns:
        A list with the ``node`` dictionary of every device fetched.
    """
    # The query text is identical for every page, so build it once.
    query = """
        query GetDevices($offset: Int!, $limit: Int!) {
            InfraDevice(offset: $offset, limit: $limit) {
                edges {
                    node {
                        id
                        name { value }
                        # Only fetch what you need
                    }
                }
            }
        }
    """

    offset = 0
    devices = []

    while True:
        result = await client.execute_graphql(
            query=query,
            variables={"offset": offset, "limit": batch_size}
        )
        edges = result["InfraDevice"]["edges"]

        if not edges:
            break

        devices.extend(edge["node"] for edge in edges)

        # Advance by what was actually returned so no rows are skipped if
        # the server hands back fewer items than requested.
        offset += len(edges)

    return devices
Progress Tracking
from rich.progress import Progress
async def paginate_with_progress(
    client: InfrahubClient,
    batch_size: int = 100
):
    """Paginate with progress bar.

    Args:
        client: Client whose async ``execute_graphql`` method runs the query.
        batch_size: Maximum number of devices requested per page.

    Returns:
        A list with the ``node`` dictionary of every device fetched.
    """
    # Get total count; it sizes the progress bar and bounds the loop.
    total = await get_device_count(client)

    # The query text is identical for every page, so build it once.
    query = """
        query GetDevices($offset: Int!, $limit: Int!) {
            InfraDevice(offset: $offset, limit: $limit) {
                edges {
                    node {
                        id
                        name { value }
                    }
                }
            }
        }
    """

    offset = 0
    all_devices = []

    with Progress() as progress:
        task = progress.add_task(
            "[cyan]Fetching devices...",
            total=total
        )

        while offset < total:
            result = await client.execute_graphql(
                query=query,
                variables={"offset": offset, "limit": batch_size}
            )
            edges = result["InfraDevice"]["edges"]

            # Fewer devices came back than the count announced: stop
            # rather than keep requesting empty pages.
            if not edges:
                break

            all_devices.extend(edge["node"] for edge in edges)
            # One progress update per page, advancing by the devices received.
            progress.update(task, advance=len(edges))

            # Advance by what was actually returned so no rows are skipped
            # if the server hands back fewer items than requested.
            offset += len(edges)

    return all_devices
Error Handling
Retry Failed Pages
import asyncio
from infrahub_sdk.exceptions import GraphQLError
async def fetch_page_with_retry(
    client: InfrahubClient,
    offset: int,
    limit: int,
    max_retries: int = 3
):
    """Fetch page with retry logic.

    A request that raises ``GraphQLError`` is retried after an exponentially
    growing pause (1s, 2s, 4s, ...) until ``max_retries`` attempts were made.

    Args:
        client: Client whose async ``execute_graphql`` method runs the query.
        offset: Position of the first device to return.
        limit: Maximum number of devices to return.
        max_retries: Total number of attempts, including the first one.

    Returns:
        The ``edges`` list of the ``InfraDevice`` response for this page.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1.
        GraphQLError: If the final attempt fails as well.
    """
    if max_retries < 1:
        # With no attempts the loop would be skipped and None returned silently.
        raise ValueError("max_retries must be at least 1")

    # The query text is identical for every attempt, so build it once.
    query = """
        query GetDevices($offset: Int!, $limit: Int!) {
            InfraDevice(offset: $offset, limit: $limit) {
                edges { node { id name { value } } }
            }
        }
    """

    for attempt in range(max_retries):
        try:
            result = await client.execute_graphql(
                query=query,
                variables={"offset": offset, "limit": limit}
            )
        except GraphQLError:
            # Out of attempts: let the last error reach the caller.
            if attempt == max_retries - 1:
                raise
            # Exponential backoff before the next attempt.
            await asyncio.sleep(2 ** attempt)
        else:
            return result["InfraDevice"]["edges"]
Best Practices
Choose appropriate page size
Balance fewer requests (large pages) against faster responses (small pages). A page size of 50–200 items is often a good starting point.
Use generators for large datasets
Process items as they’re fetched instead of loading everything into memory.
Implement progress tracking
Show progress for long-running pagination operations.
Handle empty results gracefully
Always check if a page is empty before processing.
Consider parallel fetching
For read-only operations, fetch multiple pages concurrently.
Only fetch required fields
Reduce payload size by querying only needed fields.
Next Steps
Async Operations
Combine pagination with async patterns
Batch Operations
Process paginated data in batches
Error Handling
Handle pagination errors
Querying Data
Learn more about querying