Infrahub uses Prefect as its workflow orchestration engine, enabling sophisticated CI/CD pipelines, scheduled tasks, and event-driven automation.
Workflow Architecture
Infrahub workflows are Prefect flows defined in the backend:
# backend/infrahub/git/tasks.py
from prefect import flow, task
from prefect.cache_policies import NONE

@flow(name="git-repository-add-read-write")
async def add_git_repository(model: GitRepositoryAdd) -> None:
    # Workflow logic
    ...

@task(name="git-branch-create", cache_policy=NONE)
async def git_branch_create(...) -> None:
    # Task logic
    ...
Workflow Types
Workflows are categorized by type:
# backend/infrahub/workflows/constants.py
class WorkflowType(StrEnum):
    CORE = "core"  # Core system workflows
    USER = "user"  # User-triggered workflows
    INTERNAL = "internal"  # Background/scheduled workflows
Workflow Catalogue
All workflows are registered in the catalogue:
# backend/infrahub/workflows/catalogue.py
GIT_REPOSITORY_ADD = WorkflowDefinition(
    name="git-repository-add-read-write",
    type=WorkflowType.CORE,
    module="infrahub.git.tasks",
    function="add_git_repository",
    tags=[WorkflowTag.DATABASE_CHANGE],
)

GIT_REPOSITORIES_SYNC = WorkflowDefinition(
    name="git_repositories_sync",
    type=WorkflowType.INTERNAL,
    cron="* * * * *",  # Every minute
    module="infrahub.git.tasks",
    function="sync_remote_repositories",
    concurrency_limit=1,
)
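The `module` and `function` fields suggest that a catalogue entry is turned into a callable by import path. The sketch below illustrates that kind of lookup with the standard library only. `Definition` and `resolve` are hypothetical names, not Infrahub's classes, and a stdlib function stands in for a real flow so the example runs without Infrahub installed.

```python
import importlib
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Definition:
    """Hypothetical stand-in for a catalogue entry (not Infrahub's WorkflowDefinition)."""

    name: str
    module: str
    function: str


def resolve(definition: Definition) -> Callable[..., Any]:
    """Import the module by dotted path and return the named callable."""
    module = importlib.import_module(definition.module)
    target = getattr(module, definition.function)
    if not callable(target):
        raise TypeError(f"{definition.module}.{definition.function} is not callable")
    return target


# A stdlib function stands in for a flow such as infrahub.git.tasks.add_git_repository.
example = Definition(name="json-dumps", module="json", function="dumps")
print(resolve(example)({"ok": True}))
```

A typo in either field would surface as an `ImportError` or `AttributeError` at lookup time, which is one reason to keep `module` and `function` next to the flow name in a single definition.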
Key Workflow Definitions
Git Operations
- GIT_REPOSITORY_ADD: Add and clone repository
- GIT_REPOSITORIES_SYNC: Sync all repositories (scheduled)
- GIT_REPOSITORIES_CREATE_BRANCH: Create branch across repos
- GIT_REPOSITORIES_MERGE: Merge Git branches
- GIT_REPOSITORY_USER_CHECKS_TRIGGER: Run user-defined checks
Artifact Generation
- REQUEST_ARTIFACT_DEFINITION_GENERATE: Trigger artifact generation
- REQUEST_ARTIFACT_GENERATE: Generate single artifact
- TRIGGER_ARTIFACT_DEFINITION_GENERATE: Batch artifact generation
Branch Operations
- BRANCH_CREATE: Create new branch
- BRANCH_MERGE: Merge branch
- BRANCH_REBASE: Rebase branch
- BRANCH_DELETE: Delete branch
- BRANCH_VALIDATE: Validate branch state
Proposed Changes
- PROPOSED_CHANGE_MERGE: Merge proposed change
- REQUEST_PROPOSED_CHANGE_PIPELINE: Run full validation pipeline
- REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY: Data integrity checks
- REQUEST_PROPOSED_CHANGE_SCHEMA_INTEGRITY: Schema integrity checks
Creating Workflows
Define a Flow
from prefect import flow
from infrahub.workflows.utils import add_tags

@flow(
    name="custom-validation-workflow",
    flow_run_name="Validate {object_kind} objects",
)
async def validate_objects(
    object_kind: str,
    branch_name: str,
) -> None:
    # Add branch tag for filtering
    await add_tags(branches=[branch_name])

    # Your workflow logic
    client = get_client()
    objects = await client.filters(kind=object_kind, branch=branch_name)
    for obj in objects:
        # Validate each object
        await validate_single_object(obj)
Define Tasks
from prefect import task
from prefect.cache_policies import NONE

@task(
    name="validate-single-object",
    task_run_name="Validate {obj.display_label}",
    cache_policy=NONE,
    retries=3,
)
async def validate_single_object(obj) -> bool:
    # Task logic: an object without a name fails validation
    if not obj.name.value:
        return False
    return True
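The `flow_run_name` and `task_run_name` values above are string templates that are filled from the call's parameters. As a rough illustration, assuming the templates behave like Python's `str.format` (including attribute access such as `{obj.display_label}`), the sketch below shows what the two names would render to. `FakeNode` and `render` are hypothetical helpers used only for this example.

```python
from dataclasses import dataclass


@dataclass
class FakeNode:
    """Hypothetical object exposing the attribute used by the task template."""

    display_label: str


def render(template: str, **parameters: object) -> str:
    """Fill a run-name template from call parameters, as str.format does."""
    return template.format(**parameters)


flow_name = render(
    "Validate {object_kind} objects",
    object_kind="NetworkDevice",
    branch_name="main",  # unused parameters are simply ignored by the template
)
task_name = render("Validate {obj.display_label}", obj=FakeNode(display_label="atl1-edge1"))
print(flow_name)
print(task_name)
```

A placeholder that does not match any parameter raises `KeyError` in this sketch, so template names should be kept in step with the function signature.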
Register Workflow
# backend/infrahub/workflows/catalogue.py
CUSTOM_VALIDATION = WorkflowDefinition(
    name="custom-validation-workflow",
    type=WorkflowType.USER,
    module="myapp.workflows.validation",
    function="validate_objects",
    tags=[WorkflowTag.DATABASE_CHANGE],
)

# Add to WORKFLOWS list
WORKFLOWS = [
    # ... existing workflows
    CUSTOM_VALIDATION,
]
Submitting Workflows
Via Workflow Service
from infrahub.workers.dependencies import get_workflow

workflow_service = get_workflow()

# Submit workflow (fire and forget)
await workflow_service.submit_workflow(
    workflow=CUSTOM_VALIDATION,
    context=context,
    parameters={
        "object_kind": "NetworkDevice",
        "branch_name": "main",
    },
)
Execute and Wait
# Execute and wait for result
result = await workflow_service.execute_workflow(
    workflow=CUSTOM_VALIDATION,
    parameters={
        "object_kind": "NetworkDevice",
        "branch_name": "main",
    },
    expected_return=bool,
)
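The difference between submitting and executing is whether the caller waits for the outcome. The following sketch mimics the two modes with plain `asyncio`; it is an analogy only, since the real service hands work to Prefect workers rather than to the local event loop. The `validate` coroutine is a hypothetical workflow body.

```python
import asyncio


async def validate(object_kind: str) -> bool:
    """Hypothetical workflow body; the sleep stands in for real work."""
    await asyncio.sleep(0.01)
    return bool(object_kind)


async def main() -> tuple[bool, bool]:
    # "Submit": schedule the work and continue without waiting for it.
    submitted = asyncio.create_task(validate("NetworkDevice"))
    done_right_after_submit = submitted.done()  # still pending at this point

    # "Execute": wait for completion and use the return value.
    result = await validate("NetworkDevice")

    await submitted  # tidy up the background task before the loop closes
    return done_right_after_submit, result


print(asyncio.run(main()))
```

Submitting suits work whose result is consumed elsewhere (for example through the run's state), while executing suits cases where the next step depends on the return value.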
Batch Execution
client = get_client()
batch = await client.create_batch()

for device in devices:
    batch.add(
        task=validate_device,
        device_id=device.id,
        branch=branch_name,
    )

async for result, error in batch.execute():
    if error:
        log.error(f"Validation failed: {error}")
    else:
        log.info(f"Validation passed: {result}")
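To make the batch pattern concrete, here is a minimal stdlib sketch of a runner that executes calls concurrently and yields `(result, error)` pairs as they finish, matching the loop shape used above. It is not the SDK's batch implementation; `execute_batch`, `validate_device`, and the `max_concurrent` parameter are assumptions made for illustration.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Optional


async def execute_batch(
    calls: list[Callable[[], Awaitable[Any]]],
    max_concurrent: int = 5,
) -> AsyncIterator[tuple[Any, Optional[Exception]]]:
    """Run calls concurrently and yield (result, error) pairs as they finish."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(call: Callable[[], Awaitable[Any]]) -> tuple[Any, Optional[Exception]]:
        async with semaphore:
            try:
                return await call(), None
            except Exception as exc:  # report the failure instead of aborting the batch
                return None, exc

    tasks = [asyncio.create_task(guarded(call)) for call in calls]
    for finished in asyncio.as_completed(tasks):
        yield await finished


async def validate_device(device_id: str) -> str:
    """Hypothetical validation task: IDs starting with 'bad' fail."""
    if device_id.startswith("bad"):
        raise ValueError(f"{device_id} is invalid")
    return device_id


async def main() -> tuple[list[str], list[str]]:
    passed: list[str] = []
    failed: list[str] = []
    calls = [lambda d=d: validate_device(d) for d in ("r1", "bad-r2", "r3")]
    async for result, error in execute_batch(calls):
        if error is not None:
            failed.append(str(error))
        else:
            passed.append(result)
    return sorted(passed), failed


print(asyncio.run(main()))
```

Capturing each exception alongside its result lets one failing item be logged without cancelling the rest of the batch.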
Workflow Composition
Parent-Child Flows
@flow(name="parent-workflow")
async def parent_workflow(branch: str) -> None:
    # Parent flow can spawn child flows
    await child_workflow_1(branch=branch)
    await child_workflow_2(branch=branch)

@flow(name="child-workflow-1")
async def child_workflow_1(branch: str) -> None:
    # Child flow logic
    ...
Task Dependencies
@flow(name="sequential-workflow")
async def sequential_workflow() -> None:
    # Tasks run sequentially: each one waits for the previous result
    result1 = await task1()
    result2 = await task2(result1)
    await task3(result2)

@flow(name="parallel-workflow")
async def parallel_workflow() -> None:
    # Independent tasks can run in parallel through a batch
    client = get_client()
    batch = await client.create_batch()
    batch.add(task=task1)
    batch.add(task=task2)
    batch.add(task=task3)
    async for result, error in batch.execute():
        # Process results
        ...
Scheduled Workflows
Cron Schedule
import random

GIT_REPOSITORIES_SYNC = WorkflowDefinition(
    name="git_repositories_sync",
    type=WorkflowType.INTERNAL,
    cron="* * * * *",  # Every minute
    module="infrahub.git.tasks",
    function="sync_remote_repositories",
)

ANONYMOUS_TELEMETRY_SEND = WorkflowDefinition(
    name="anonymous_telemetry_send",
    type=WorkflowType.INTERNAL,
    cron=f"{random.randint(0, 59)} 2 * * *",  # Daily at 2:XX AM, minute chosen at import time
    module="infrahub.telemetry.tasks",
    function="send_telemetry_push",
)
Scheduled Examples
- Repository Sync: Every minute (* * * * *)
- Telemetry: Daily at 2:XX AM, with a random minute
- Webhook Config: Daily at 3:XX AM
- Lock Cleanup: Every minute
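The `cron` strings above use the standard five-field layout (minute, hour, day of month, month, day of week). The short sketch below splits an expression into named fields and rebuilds the "daily at a random minute" pattern; `describe` and `daily_at_random_minute` are hypothetical helpers, not part of Infrahub or Prefect.

```python
import random

CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe(cron: str) -> dict[str, str]:
    """Map each of the five cron fields to its value."""
    values = cron.split()
    if len(values) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(values)}: {cron!r}")
    return dict(zip(CRON_FIELDS, values))


def daily_at_random_minute(hour: int, rng: random.Random) -> str:
    """Build a daily schedule at a fixed hour and a random minute."""
    return f"{rng.randint(0, 59)} {hour} * * *"


print(describe("* * * * *"))
print(describe(daily_at_random_minute(2, random.Random())))
```

Because the minute is drawn when the expression is built, separate installations end up on different minutes of the same hour, which spreads load on whatever service receives the scheduled calls.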
Concurrency Control
Limit Concurrent Runs
from prefect.client.schemas.objects import ConcurrencyLimitStrategy

GIT_REPOSITORIES_SYNC = WorkflowDefinition(
    name="git_repositories_sync",
    type=WorkflowType.INTERNAL,
    cron="* * * * *",
    module="infrahub.git.tasks",
    function="sync_remote_repositories",
    concurrency_limit=1,  # Only one run at a time
    concurrency_limit_strategy=ConcurrencyLimitStrategy.CANCEL_NEW,  # Cancel new runs instead of queueing them
)
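With a limit of 1 and a cancel-new strategy, a run that starts while another is still active is dropped rather than queued. The sketch below imitates that behaviour in a single process with `asyncio`. `CancelNewLimiter` is a hypothetical class written for this example and does not reflect how Prefect enforces the limit internally.

```python
import asyncio


class CancelNewLimiter:
    """Hypothetical limiter: runs beyond the limit are cancelled, not queued."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.active = 0

    async def run(self, name: str, duration: float) -> str:
        if self.active >= self.limit:
            return f"{name}: cancelled"
        self.active += 1
        try:
            await asyncio.sleep(duration)  # stands in for the workflow body
            return f"{name}: completed"
        finally:
            self.active -= 1


async def main() -> list[str]:
    limiter = CancelNewLimiter(limit=1)
    # The second run starts while the first is still active.
    first = asyncio.create_task(limiter.run("sync-1", 0.05))
    await asyncio.sleep(0.01)
    second = await limiter.run("sync-2", 0.05)
    return [await first, second]


print(asyncio.run(main()))
```

For a sync that is scheduled every minute, dropping the overlapping run is usually harmless because the next scheduled run picks up the same work.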
Lock Resources
from infrahub import lock

@flow(name="modify-repository")
async def modify_repository(repo_name: str) -> None:
    # Acquire lock for repository
    async with lock.registry.get(name=repo_name, namespace="repository"):
        # Only one workflow can modify this repo at a time
        await perform_repository_operation(repo_name)
Error Handling
Task Retries
@task(
    name="unreliable-task",
    retries=3,  # Retry up to 3 times
    retry_delay_seconds=10,  # Wait 10s between retries
)
async def unreliable_task() -> None:
    # Task that might fail
    ...
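The retry semantics can be illustrated without Prefect. The sketch assumes that `retries` counts re-attempts after the first failure, so `retries=3` allows up to four attempts in total. `run_with_retries` and `make_flaky` are hypothetical helpers for this example only.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def run_with_retries(
    func: Callable[[], Awaitable[T]],
    retries: int = 3,
    retry_delay_seconds: float = 0.0,
) -> T:
    """Call func once, then retry up to `retries` more times on failure."""
    for attempt in range(retries + 1):
        try:
            return await func()
        except Exception:
            if attempt == retries:
                raise  # retries exhausted: surface the last error
            await asyncio.sleep(retry_delay_seconds)
    raise AssertionError("unreachable")


def make_flaky(failures: int):
    """Build a hypothetical task that fails `failures` times, then succeeds."""
    calls = {"count": 0}

    async def flaky() -> str:
        calls["count"] += 1
        if calls["count"] <= failures:
            raise ConnectionError("transient failure")
        return "ok"

    return flaky, calls


task, calls = make_flaky(failures=2)
print(asyncio.run(run_with_retries(task, retries=3)), calls["count"])
```

Retries help only when the task is safe to run more than once, so they pair naturally with idempotent task logic.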
Exception Handling
@flow(name="resilient-workflow")
async def resilient_workflow() -> None:
    try:
        await risky_operation()
    except RepositoryError as exc:
        log.error(f"Repository error: {exc.message}")
        # Handle or re-raise
        raise
    except Exception as exc:
        log.exception("Unexpected error")
        # Cleanup
        await cleanup()
        raise
Workflow Tags
Tags help categorize and filter workflows:
# backend/infrahub/workflows/constants.py
class WorkflowTag(StrEnum):
    DATABASE_CHANGE = "database-change"
    WORKFLOWTYPE = "workflow-type"

WORKFLOW_WITH_TAGS = WorkflowDefinition(
    name="important-workflow",
    type=WorkflowType.CORE,
    tags=[WorkflowTag.DATABASE_CHANGE],
    module="...",
    function="...",
)
Tag workflow runs with branches:
from infrahub.workflows.utils import add_tags, add_branch_tag

@flow(name="branch-specific-workflow")
async def branch_workflow(branch: str) -> None:
    # Tag with branch
    await add_branch_tag(branch_name=branch)

    # Or tag with multiple resources
    await add_tags(
        branches=[branch],
        nodes=["node-id-1", "node-id-2"],
    )
Workflow Models
WorkflowDefinition
# backend/infrahub/workflows/models.py
class WorkflowDefinition(BaseModel):
    name: str
    type: WorkflowType = WorkflowType.INTERNAL
    module: str  # Python module path
    function: str  # Function name
    cron: str | None = None  # Optional schedule
    tags: list[WorkflowTag] = Field(default_factory=list)
    concurrency_limit: int | None = None
    concurrency_limit_strategy: ConcurrencyLimitStrategy | None = None
WorkflowInfo
class WorkflowInfo(BaseModel):
    id: UUID  # Prefect flow run ID
    info: FlowRun | None = None  # Prefect flow run info
Real-World Examples
Proposed Change Pipeline
# backend/infrahub/proposed_change/tasks.py
@flow(name="proposed-changed-pipeline")
async def run_proposed_change_pipeline(
    proposed_change_id: str,
    source_branch: str,
    destination_branch: str,
) -> None:
    """Complete validation pipeline for proposed changes."""
    # Run data integrity checks
    await workflow.submit_workflow(
        workflow=REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY,
        parameters={"proposed_change_id": proposed_change_id},
    )

    # Run schema integrity checks
    await workflow.submit_workflow(
        workflow=REQUEST_PROPOSED_CHANGE_SCHEMA_INTEGRITY,
        parameters={"proposed_change_id": proposed_change_id},
    )

    # Run repository checks
    await workflow.submit_workflow(
        workflow=REQUEST_PROPOSED_CHANGE_REPOSITORY_CHECKS,
        parameters={
            "proposed_change_id": proposed_change_id,
            "source_branch": source_branch,
            "destination_branch": destination_branch,
        },
    )

    # Refresh artifacts
    await workflow.submit_workflow(
        workflow=REQUEST_PROPOSED_CHANGE_REFRESH_ARTIFACTS,
        parameters={"proposed_change_id": proposed_change_id},
    )
Artifact Generation
# backend/infrahub/git/tasks.py
@flow(name="request_artifact_definitions_generate")
async def generate_request_artifact_definition(
    model: RequestArtifactDefinitionGenerate,
    context: InfrahubContext,
) -> None:
    """Generate all artifacts for a definition."""
    client = get_client()
    artifact_definition = await client.get(
        kind=CoreArtifactDefinition,
        id=model.artifact_definition_id,
        branch=model.branch,
    )

    # Get targets from group
    group = await fetch_artifact_definition_targets(...)

    # Submit generation task for each target
    batch = await client.create_batch()
    for member in group.members.peers:
        batch.add(
            task=workflow.submit_workflow,
            workflow=REQUEST_ARTIFACT_GENERATE,
            parameters={"model": create_artifact_model(member)},
        )

    async for _, _ in batch.execute():
        pass
Best Practices
Design Principles
- Single Responsibility: Each workflow should have one clear purpose
- Idempotent: Workflows should be safe to retry
- Composable: Build complex workflows from simple tasks
- Observable: Add logging and tags for monitoring
Performance
- Use batch operations for parallel execution
- Set appropriate concurrency limits
- Cache task results when appropriate
- Use task retries for transient failures
Monitoring
- Use descriptive flow and task run names
- Add tags for filtering in Prefect UI
- Log important state changes
- Track execution times for bottlenecks
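One lightweight way to track execution times is a decorator that records how long each call takes. This is a stdlib sketch under the assumption that in-process timings are enough for a first look; the `timed` decorator and the `durations` store are hypothetical, and the Prefect UI already records run durations for flows and tasks.

```python
import asyncio
import functools
import time

# Hypothetical in-memory store: function name -> list of durations in seconds.
durations: dict[str, list[float]] = {}


def timed(func):
    """Record the wall-clock duration of each call under the function's name."""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return await func(*args, **kwargs)
        finally:
            # Runs on success and on failure, so failed calls are timed too.
            durations.setdefault(func.__name__, []).append(time.perf_counter() - start)

    return wrapper


@timed
async def slow_step() -> str:
    """Hypothetical step; the sleep stands in for real work."""
    await asyncio.sleep(0.02)
    return "done"


asyncio.run(slow_step())
slowest = max(durations, key=lambda name: max(durations[name]))
print(slowest, f"{max(durations[slowest]):.3f}s")
```

Recording the duration in a `finally` block keeps slow failures visible, which are often the most useful data points when looking for bottlenecks.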
Troubleshooting
Workflow Not Running
- Check workflow is registered in catalogue
- Verify worker pool is running
- Check Prefect server connectivity
- Review workflow deployment status
Task Failures
- Review task logs in Prefect UI
- Check error messages and stack traces
- Verify input parameters
- Test task function independently
Performance Issues
- Profile workflow execution times
- Identify slow tasks
- Parallelize independent operations
- Optimize database queries