Example Instrumentation
Examples of using span metrics to debug performance issues and monitor application behavior in Python applications.
These examples assume you have already set up tracing in your application.
This guide provides practical examples of using span attributes and metrics to solve common monitoring and debugging challenges in Python applications. Each example demonstrates how to instrument different components, showing how they work together within a distributed trace to provide end-to-end visibility.
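If tracing isn't configured yet, a minimal setup might look like the following; the DSN is a placeholder and the sample rate should be tuned for your traffic.
# Minimal SDK setup with tracing enabled (DSN and sample rate are illustrative)
import sentry_sdk

sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # replace with your project's DSN
    traces_sample_rate=1.0,  # capture all transactions; lower this in production
)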
Challenge: Understanding bottlenecks and failures in multi-step file processing operations across request handling and processing services.
Solution: Track the entire file processing pipeline with detailed metrics at each stage, from initial file handling through processing and storage.
Client Application Instrumentation:
# File upload request handling
import time
from contextlib import contextmanager
from pathlib import Path

import requests
import sentry_sdk
# requests-toolbelt provides upload progress callbacks via MultipartEncoderMonitor
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor

def upload_file():
    with sentry_sdk.start_span(op="upload", name="Upload File") as span:
        try:
            # Begin upload process with the requests library
            file_path = Path("/path/to/uploads/user-profile.jpg")
            total_size = file_path.stat().st_size

            # Track progress with a context manager
            @contextmanager
            def track_progress(total_size):
                start = time.time()

                def on_progress(monitor):
                    # Called by MultipartEncoderMonitor as bytes are sent
                    progress_percent = (monitor.bytes_read / total_size) * 100
                    span.set_data("upload.percent_complete", progress_percent)
                    span.set_data("upload.bytes_transferred", monitor.bytes_read)

                yield on_progress
                # Record total time after the upload finishes
                span.set_data("upload.total_time_ms", (time.time() - start) * 1000)

            # Use the progress tracker while streaming the multipart upload
            with track_progress(total_size) as progress_callback:
                with open(file_path, "rb") as f:
                    encoder = MultipartEncoder(
                        fields={"file": (file_path.name, f, "application/octet-stream")}
                    )
                    monitor = MultipartEncoderMonitor(encoder, progress_callback)
                    response = requests.post(
                        "https://api.example.com/upload",
                        data=monitor,
                        headers={
                            "Content-Type": monitor.content_type,
                            "sentry-trace": sentry_sdk.get_traceparent(),  # Propagate trace context
                        },
                    )

            # Set final data after completion
            span.set_data("upload.success", response.ok)
            if response.ok:
                result = response.json()
                span.set_data("upload.server_file_id", result["file_id"])
            return response
        except Exception as error:
            # Record failure information
            span.set_data("upload.success", False)
            span.set_data("upload.error_type", error.__class__.__name__)
            span.set_data("upload.error_message", str(error))
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            raise
Server Application Instrumentation:
# File processing service
import time
from pathlib import Path

import boto3
import sentry_sdk

with sentry_sdk.start_span(
    op="file.process.service",
    name="File Processing Service"
) as span:
    # File processing implementation
    file_path = Path("/tmp/uploads/user-profile.jpg")

    # Process the file
    try:
        # Track individual processing steps
        with sentry_sdk.start_span(op="scan", name="Virus Scan") as scan_span:
            # Virus scan implementation
            scan_span.set_data("scan.engine", "clamav")
            scan_span.set_data("scan.result", "clean")

        # Upload to S3
        s3_client = boto3.client('s3', region_name='us-west-2')
        upload_start = time.time()
        s3_client.upload_file(
            str(file_path),
            'my-bucket',
            'uploads/user-profile.jpg'
        )
        span.set_data("storage.actual_upload_time_ms",
                      (time.time() - upload_start) * 1000)
    except Exception as e:
        span.set_status(sentry_sdk.SpanStatus.ERROR)
        span.set_data("error.message", str(e))
        raise
How the Trace Works Together: The client application span initiates the trace and handles the file upload. It propagates the trace context to the server through the request headers. The server span continues the trace, processing the file and storing it. This creates a complete picture of the file's journey, allowing you to:
- Identify bottlenecks at any stage (request preparation, network transfer, processing, storage)
- Track end-to-end processing times and success rates
- Monitor resource usage across the stack
- Correlate request handling issues with processing service errors
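Neither snippet shows how the processing service picks up the propagated context. If no framework integration does this for you, one way to continue the trace from the incoming headers is sketched below; handle_uploaded_file and its headers argument are hypothetical stand-ins for your request-handling code.
# Continuing the trace on the server from propagated headers (illustrative sketch)
import sentry_sdk

def handle_uploaded_file(headers, file_path):
    # Build a transaction that continues the trace started by the client
    transaction = sentry_sdk.continue_trace(
        headers,  # dict containing the incoming "sentry-trace" and "baggage" headers
        op="file.process.service",
        name="File Processing Service",
    )
    with sentry_sdk.start_transaction(transaction):
        # Child spans (virus scan, S3 upload, ...) attach to the continued trace
        with sentry_sdk.start_span(op="scan", name="Virus Scan") as scan_span:
            scan_span.set_data("scan.engine", "clamav")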
Challenge: Managing cost (token usage) and performance of LLM integrations in Python applications.
Solution: Track the entire LLM interaction flow, from the initial request through response processing.
Client Application Instrumentation:
# LLM request handling in a Flask application
import time

import openai  # uses the openai<1.0 ChatCompletion API
import sentry_sdk
from flask import Flask, jsonify, request

app = Flask(__name__)

@app.route("/ask", methods=["POST"])
def handle_llm_request():
    with sentry_sdk.start_span(op="llm", name="Generate Text") as span:
        start_time = time.time() * 1000  # Convert to milliseconds

        # Begin streaming response from the LLM API
        user_input = request.json["question"]
        response_chunks = []
        first_token_received = False
        tokens_received = 0

        # Using OpenAI's streaming API
        try:
            for chunk in openai.ChatCompletion.create(
                model="gpt-4",
                messages=[{"role": "user", "content": user_input}],
                stream=True
            ):
                tokens_received += 1  # each streamed chunk is roughly one token
                content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "")
                response_chunks.append(content or "")

                # Record time to first token
                if not first_token_received and content:
                    first_token_received = True
                    time_to_first_token = (time.time() * 1000) - start_time
                    span.set_data("response.time_to_first_token_ms", time_to_first_token)

            # Record final metrics after the stream completes
            total_request_time = (time.time() * 1000) - start_time
            span.set_data("response.total_time_ms", total_request_time)
            span.set_data("response.format", "text")
            span.set_data("response.tokens_received", tokens_received)

            return jsonify({
                "response": "".join(response_chunks),
                "tokens": tokens_received
            })
        except Exception as error:
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            span.set_data("error.type", error.__class__.__name__)
            span.set_data("error.message", str(error))
            return jsonify({"error": str(error)}), 500
Server Application Instrumentation:
# LLM processing service (e.g., in a separate microservice)
import time

import openai
import sentry_sdk

def process_llm_request(request_data):
    with sentry_sdk.start_span(
        op="llm",
        name="Generate Text"
    ) as span:
        start_time = int(time.time() * 1000)  # Current time in milliseconds
        try:
            # Check rate limits before processing
            rate_limits = check_rate_limits()
            span.set_data("llm.rate_limit_remaining", rate_limits["remaining"])

            # Prepare the prompt with additional context
            prepared_prompt = enhance_prompt(request_data["question"])

            # Make the actual API call to the LLM provider
            response = openai.ChatCompletion.create(
                model="gpt-4",
                messages=[{"role": "system", "content": "You are a helpful assistant."},
                          {"role": "user", "content": prepared_prompt}],
                temperature=0.7,
                max_tokens=4096
            )

            # Track token usage and performance metrics
            span.set_data("llm.prompt_tokens", response.usage.prompt_tokens)
            span.set_data("llm.completion_tokens", response.usage.completion_tokens)
            span.set_data("llm.total_tokens", response.usage.total_tokens)
            span.set_data("llm.api_latency_ms", int(time.time() * 1000) - start_time)

            # Calculate and record cost based on token usage
            cost = calculate_cost(
                response.usage.prompt_tokens,
                response.usage.completion_tokens,
                "gpt-4"
            )
            span.set_data("llm.cost_usd", cost)

            return {
                "response": response.choices[0].message.content,
                "usage": response.usage
            }
        except Exception as error:
            # Track error information
            span.set_data("error", True)
            span.set_data("error.type", error.__class__.__name__)
            span.set_data("error.message", str(error))

            # Check if it's a rate limit error
            is_rate_limit = "rate_limit" in str(error).lower()
            span.set_data("error.is_rate_limit", is_rate_limit)

            span.set_status(sentry_sdk.SpanStatus.ERROR)
            raise
How the Trace Works Together: The client application span captures the initial request handling, while the server span tracks the actual LLM API interaction. The distributed trace shows the complete flow from input to response, enabling you to:
- Analyze end-to-end response times
- Track costs and token usage patterns
- Optimize API integration performance
- Monitor rate limits and service quotas
- Correlate user inputs with model performance
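The check_rate_limits, enhance_prompt, and calculate_cost functions above are application-level helpers rather than SDK or OpenAI APIs. A minimal calculate_cost sketch might look like this; the per-token rates are placeholders, not current pricing:
# Illustrative cost calculation per model (rates are placeholders, not real pricing)
PRICING_PER_1K_TOKENS = {
    # model: (prompt rate, completion rate) in USD per 1,000 tokens
    "gpt-4": (0.03, 0.06),
}

def calculate_cost(prompt_tokens, completion_tokens, model):
    prompt_rate, completion_rate = PRICING_PER_1K_TOKENS[model]
    return (prompt_tokens / 1000) * prompt_rate + (completion_tokens / 1000) * completion_rate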
Challenge: Understanding the complete purchase flow and identifying revenue-impacting issues across the application stack.
Solution: Track the full transaction process from API request to order fulfillment.
Client Application Instrumentation:
# Django view handling checkout request
import time

import sentry_sdk
from django.http import JsonResponse
from django.views import View

class CheckoutView(View):
    def post(self, request):
        with sentry_sdk.start_span(op="order", name="Process Order") as span:
            # Validate the checkout request
            validation_start = time.time()
            validation_result = self.validate_checkout_data(request.POST)
            span.set_data("request.validation_time_ms",
                          (time.time() - validation_start) * 1000)

            if not validation_result["valid"]:
                span.set_data("request.validation_success", False)
                span.set_data("request.validation_errors", validation_result["errors"])
                return JsonResponse({"errors": validation_result["errors"]}, status=400)

            # Process the order
            try:
                order_result = self.process_order(request)

                # Update span with order results
                span.set_data("order.id", order_result["order_id"])
                span.set_data("order.success", True)

                # Clear the cart and return success
                request.session["cart"] = []
                request.session["cart_total"] = 0
                return JsonResponse({"order_id": order_result["order_id"]})
            except Exception as e:
                span.set_status(sentry_sdk.SpanStatus.ERROR)
                span.set_data("order.success", False)
                span.set_data("error.message", str(e))
                return JsonResponse({"error": str(e)}, status=500)
Server Application Instrumentation:
# Order processing service
import time
from decimal import Decimal

import sentry_sdk
import stripe

# check_inventory and create_fulfillment are application-specific helpers
def process_order(order_data):
    with sentry_sdk.start_span(
        op="order.process",
        name="Process Order Fulfillment"
    ) as span:
        try:
            # Check inventory availability
            inventory_start = time.time()
            inventory_result = check_inventory(order_data["items"])
            span.set_data("inventory.check_time_ms",
                          (time.time() - inventory_start) * 1000)
            span.set_data("inventory.all_available", inventory_result["all_available"])

            if not inventory_result["all_available"]:
                span.set_data("inventory.unavailable_items",
                              inventory_result["unavailable_items"])
                raise ValueError("Some items are out of stock")

            # Process payment via Stripe
            payment_start = time.time()
            stripe.api_key = "sk_test_..."
            payment_intent = stripe.PaymentIntent.create(
                amount=int(Decimal(order_data["total"]) * 100),  # Convert to cents
                currency="usd",
                payment_method=order_data["payment_method_id"],
                confirm=True
            )
            span.set_data("payment.processing_time_ms",
                          (time.time() - payment_start) * 1000)
            span.set_data("payment.transaction_id", payment_intent.id)
            span.set_data("payment.success", payment_intent.status == "succeeded")

            # Create fulfillment record
            fulfillment = create_fulfillment(order_data["order_id"])
            span.set_data("fulfillment.id", fulfillment["id"])
            span.set_data("fulfillment.warehouse", fulfillment["warehouse"])
            span.set_data("fulfillment.shipping_method", fulfillment["shipping_method"])
            span.set_data("fulfillment.estimated_delivery",
                          fulfillment["estimated_delivery"].isoformat())

            return {
                "success": True,
                "order_id": order_data["order_id"],
                "payment_id": payment_intent.id,
                "fulfillment_id": fulfillment["id"]
            }
        except Exception as e:
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            span.set_data("error.message", str(e))
            span.set_data("order.success", False)
            raise
How the Trace Works Together: The client application span tracks the initial order request, while the server span handles order processing and fulfillment. The distributed trace provides visibility into the entire purchase flow, allowing you to:
- Analyze transaction performance and success rates
- Track payment processing timing and errors
- Monitor inventory availability impact on conversions
- Measure end-to-end order completion times
- Identify friction points in the transaction process
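The view and the order processing service are tied together by trace propagation. If the order service is reached over HTTP, the view can forward the trace headers explicitly, as in the hypothetical helper below (the service URL is a placeholder); with Sentry's Django and outgoing-HTTP integrations enabled, these headers are generally attached to outbound requests automatically.
# Forwarding trace context from the checkout view to the order service (illustrative)
import requests
import sentry_sdk

ORDER_SERVICE_URL = "https://orders.internal.example.com/process"  # placeholder endpoint

def submit_order_to_service(order_payload):
    headers = {
        "sentry-trace": sentry_sdk.get_traceparent() or "",
        "baggage": sentry_sdk.get_baggage() or "",
    }
    response = requests.post(ORDER_SERVICE_URL, json=order_payload, headers=headers)
    response.raise_for_status()
    return response.json()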
Challenge: Understanding performance and reliability of distributed data processing pipelines, from job submission through completion.
Solution: Comprehensive tracking of job lifecycle across queue management, processing stages, and worker performance.
Task Submission and Worker Instrumentation:
# Celery task submission and processing
import os

import psutil
import sentry_sdk
from celery import shared_task

# setup_task_monitoring, parse_data_file, transform_data, validate_data, and
# export_processed_data are application-specific helpers
def submit_processing_job(data_file_path, priority="medium"):
    with sentry_sdk.start_span(op="job", name="Process Job") as span:
        # Submit job to Celery
        try:
            # Configure task and submit
            task = process_data_file.apply_async(
                args=[str(data_file_path)],
                kwargs={"priority": priority},
                queue="data_processing"
            )
            span.set_data("job.id", task.id)
            span.set_data("job.submission_success", True)

            # Start monitoring task progress
            monitoring_result = setup_task_monitoring(task.id)
            span.set_data("monitor.callback_url", monitoring_result["callback_url"])

            return {
                "job_id": task.id,
                "status": "submitted",
                "monitoring_url": monitoring_result["status_url"]
            }
        except Exception as e:
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            span.set_data("job.submission_success", False)
            span.set_data("error.message", str(e))
            raise

@shared_task(name="tasks.process_data_file")
def process_data_file(file_path, priority="medium"):
    with sentry_sdk.start_span(
        op="process",
        name="Process Data"
    ) as span:
        current_stage = "parse"
        try:
            # Processing implementation with stage tracking
            span.set_data("processing.current_stage", current_stage)
            data = parse_data_file(file_path)

            current_stage = "transform"
            span.set_data("processing.current_stage", current_stage)
            transformed_data = transform_data(data)

            current_stage = "validate"
            span.set_data("processing.current_stage", current_stage)
            validation_result = validate_data(transformed_data)
            span.set_data("validation.errors_count", len(validation_result["errors"]))

            current_stage = "export"
            span.set_data("processing.current_stage", current_stage)
            export_result = export_processed_data(transformed_data)

            # Record resource utilization
            span.set_data("resource.cpu_percent", psutil.cpu_percent())
            span.set_data("resource.memory_used_mb",
                          psutil.Process().memory_info().rss / (1024 * 1024))

            # Update final job outcome
            span.set_data("outcome.status", "completed")
            span.set_data("outcome.records_processed", len(data))
            span.set_data("outcome.output_size_bytes",
                          os.path.getsize(export_result["output_path"]))

            return {
                "success": True,
                "records_processed": len(data),
                "output_path": export_result["output_path"]
            }
        except Exception as e:
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            span.set_data("outcome.status", "failed")
            span.set_data("error.message", str(e))
            span.set_data("error.stage", current_stage)
            raise
How the Trace Works Together: This example instruments both the job submission and the Celery task in one module, and the two spans are joined into a single trace; with Sentry's Celery integration enabled, the trace context is propagated to the worker through the task headers. The spans track the entire job lifecycle, enabling you to:
- Monitor task queue health and processing times
- Track worker resource utilization
- Identify bottlenecks in specific processing stages
- Analyze job scheduling efficiency and queue wait times
- Monitor data throughput and error rates
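Queue wait time isn't captured by the snippet above; one simple way to record it is to pass the submission timestamp to the task and compute the delta when the worker starts. A sketch under that assumption (the submitted_at kwarg and queue.wait_time_ms attribute are illustrative names):
# Recording queue wait time by passing the enqueue timestamp to the worker (illustrative)
import time

import sentry_sdk
from celery import shared_task

def submit_with_timestamp(data_file_path):
    return process_data_file.apply_async(
        args=[str(data_file_path)],
        kwargs={"submitted_at": time.time()},  # carries the enqueue time to the worker
        queue="data_processing",
    )

# Variant of the task above, extended with a submitted_at kwarg
@shared_task(name="tasks.process_data_file")
def process_data_file(file_path, submitted_at=None):
    with sentry_sdk.start_span(op="process", name="Process Data") as span:
        if submitted_at is not None:
            span.set_data("queue.wait_time_ms", (time.time() - submitted_at) * 1000)
        # ... processing stages as shown above ...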
For more information about implementing these examples effectively, see our Span Metrics guide, which includes detailed best practices and implementation guidelines.