Example Instrumentation

Examples of using span metrics to debug performance issues and monitor application behavior across Python applications.

This guide provides practical examples of using span attributes and metrics to solve common monitoring and debugging challenges in Python applications. Each example demonstrates how to instrument different components, showing how they work together within a distributed trace to provide end-to-end visibility.
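
All of the examples below assume the Sentry SDK is already initialized with tracing enabled. A minimal setup sketch (the DSN shown is a placeholder):

import sentry_sdk

sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
    traces_sample_rate=1.0,  # capture a trace for every transaction; lower this in production
)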

File Upload and Processing Pipeline

Challenge: Understanding bottlenecks and failures in multi-step file processing operations across request handling and processing services.

Solution: Track the entire file processing pipeline with detailed metrics at each stage, from initial file handling through processing and storage.

Client Application Instrumentation:

# File upload request handling
import sentry_sdk
import time
from pathlib import Path
import requests
# requests alone doesn't report upload progress, so this example uses
# requests_toolbelt's multipart encoder, which invokes a callback as
# the request body streams out
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor

def upload_file(file_path):
    with sentry_sdk.start_span(op="upload", name="Upload File") as span:
        try:
            total_size = file_path.stat().st_size
            start = time.time()

            # Called repeatedly while the multipart body is read
            def track_progress(monitor):
                bytes_sent = monitor.bytes_read
                span.set_data("upload.percent_complete", (bytes_sent / total_size) * 100)
                span.set_data("upload.bytes_transferred", bytes_sent)

            with open(file_path, "rb") as f:
                encoder = MultipartEncoder(
                    fields={"file": (file_path.name, f, "image/jpeg")}
                )
                monitor = MultipartEncoderMonitor(encoder, track_progress)
                response = requests.post(
                    "https://api.example.com/upload",
                    data=monitor,
                    headers={
                        "Content-Type": monitor.content_type,
                        # Propagate trace context to the server
                        "sentry-trace": sentry_sdk.get_traceparent(),
                        "baggage": sentry_sdk.get_baggage(),
                    },
                )

            # Record total time and final results after the upload completes
            span.set_data("upload.total_time_ms", (time.time() - start) * 1000)
            span.set_data("upload.success", response.ok)
            if response.ok:
                result = response.json()
                span.set_data("upload.server_file_id", result["file_id"])

            return response

        except Exception as error:
            # Record failure information
            span.set_data("upload.success", False)
            span.set_data("upload.error_type", error.__class__.__name__)
            span.set_data("upload.error_message", str(error))
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            raise

response = upload_file(Path("/path/to/uploads/user-profile.jpg"))

Server Application Instrumentation:

Copied
# File processing service
import sentry_sdk
import time
from pathlib import Path
import boto3

with sentry_sdk.start_span(
    op="file.process.service",
    name="File Processing Service"
) as span:
    # File processing implementation
    file_path = Path("/tmp/uploads/user-profile.jpg")
    
    # Process the file
    try:
        # Track individual processing steps
        with sentry_sdk.start_span(op="scan", name="Virus Scan") as scan_span:
            # Virus scan implementation
            scan_span.set_data("scan.engine", "clamav")
            scan_span.set_data("scan.result", "clean")
        
        # Upload to S3
        s3_client = boto3.client('s3', region_name='us-west-2')
        upload_start = time.time()
        s3_client.upload_file(
            str(file_path),
            'my-bucket',
            'uploads/user-profile.jpg'
        )
        
        span.set_data("storage.actual_upload_time_ms", 
                         (time.time() - upload_start) * 1000)
    
    except Exception as e:
        span.set_status(sentry_sdk.SpanStatus.ERROR)
        span.set_data("error.message", str(e))
        raise

How the Trace Works Together: The client application span initiates the trace and handles the file upload. It propagates the trace context to the server through the request headers. The server span continues the trace, processing the file and storing it. This creates a complete picture of the file's journey, allowing you to:

  • Identify bottlenecks at any stage (request preparation, network transfer, processing, storage)
  • Track end-to-end processing times and success rates
  • Monitor resource usage across the stack
  • Correlate request handling issues with processing service errors
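
On the server side, the incoming sentry-trace and baggage headers are what link the two spans into one trace. Framework integrations usually pick these up automatically; if yours doesn't, a minimal sketch using the sentry-sdk 2.x API (handle_upload and process_uploaded_file are hypothetical names):

import sentry_sdk

def handle_upload(request):
    # Resume the trace the client started; continue_trace reads the
    # sentry-trace and baggage headers from the incoming request
    transaction = sentry_sdk.continue_trace(
        dict(request.headers), op="file.process", name="Handle Upload"
    )
    with sentry_sdk.start_transaction(transaction):
        process_uploaded_file(request)  # hypothetical processing entry point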

LLM Integration Monitoring

Challenge: Managing cost (token usage) and performance of LLM integrations in Python applications.

Solution: Track the entire LLM interaction flow, from initial request through response processing.

Client Application Instrumentation:

# LLM request handling in a Flask application
import sentry_sdk
import time
import openai  # the examples below use the legacy pre-1.0 openai client API
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/ask", methods=["POST"])
def handle_llm_request():
    with sentry_sdk.start_span(op="llm", name="Generate Text") as span:
        start_time = time.time() * 1000  # Convert to milliseconds
        
        # Begin streaming response from LLM API
        user_input = request.json["question"]
        
        response_chunks = []
        first_token_received = False
        tokens_received = 0
        
        # Using OpenAI's streaming API
        try:
            for chunk in openai.ChatCompletion.create(
                model="gpt-4",
                messages=[{"role": "user", "content": user_input}],
                stream=True
            ):
                tokens_received += 1  # each streamed chunk carries roughly one token
                content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "")
                response_chunks.append(content)
                
                # Record time to first token
                if not first_token_received and content:
                    first_token_received = True
                    time_to_first_token = (time.time() * 1000) - start_time
                    span.set_data("response.time_to_first_token_ms", time_to_first_token)
            
            # Record final metrics after stream completes
            total_request_time = (time.time() * 1000) - start_time
            
            span.set_data("response.total_time_ms", total_request_time)
            span.set_data("response.format", "text")
            span.set_data("response.tokens_received", tokens_received)
            
            return jsonify({
                "response": "".join(response_chunks),
                "tokens": tokens_received
            })
            
        except Exception as error:
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            span.set_data("error.type", error.__class__.__name__)
            span.set_data("error.message", str(error))
            return jsonify({"error": str(error)}), 500

Server Application Instrumentation:

# LLM processing service (e.g., in a separate microservice)
import sentry_sdk
import time
import openai  # the legacy pre-1.0 openai client API is used below

def process_llm_request(request_data):
    with sentry_sdk.start_span(
        op="llm",
        name="Generate Text"
    ) as span:
        start_time = int(time.time() * 1000)  # Current time in milliseconds
        
        try:
            # Check rate limits before processing
            rate_limits = check_rate_limits()
            span.set_data("llm.rate_limit_remaining", rate_limits["remaining"])
            
            # Prepare the prompt with additional context
            prepared_prompt = enhance_prompt(request_data["question"])
            
            # Make the actual API call to the LLM provider
            response = openai.ChatCompletion.create(
                model="gpt-4",
                messages=[{"role": "system", "content": "You are a helpful assistant."},
                         {"role": "user", "content": prepared_prompt}],
                temperature=0.7,
                max_tokens=4096
            )
            
            # Track token usage and performance metrics
            span.set_data("llm.prompt_tokens", response.usage.prompt_tokens)
            span.set_data("llm.completion_tokens", response.usage.completion_tokens)
            span.set_data("llm.total_tokens", response.usage.total_tokens)
            span.set_data("llm.api_latency_ms", int(time.time() * 1000) - start_time)
            
            # Calculate and record cost based on token usage
            cost = calculate_cost(
                response.usage.prompt_tokens,
                response.usage.completion_tokens,
                "gpt-4"
            )
            span.set_data("llm.cost_usd", cost)
            
            return {
                "response": response.choices[0].message.content,
                "usage": response.usage
            }
            
        except Exception as error:
            # Track error information
            span.set_data("error", True)
            span.set_data("error.type", error.__class__.__name__)
            span.set_data("error.message", str(error))
            
            # Check if it's a rate limit error
            is_rate_limit = "rate_limit" in str(error).lower()
            span.set_data("error.is_rate_limit", is_rate_limit)
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            
            raise

How the Trace Works Together: The client application span captures the initial request handling, while the server span tracks the actual LLM API interaction. The distributed trace shows the complete flow from input to response, enabling you to:

  • Analyze end-to-end response times
  • Track costs and token usage patterns
  • Optimize API integration performance
  • Monitor rate limits and service quotas
  • Correlate user inputs with model performance
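
The server example records llm.cost_usd via a calculate_cost helper that isn't defined above. A minimal sketch, with made-up per-1K-token rates (check your provider's current pricing; these numbers are illustrative only):

# Hypothetical USD rates per 1K tokens; replace with real pricing
MODEL_RATES = {
    "gpt-4": {"prompt": 0.03, "completion": 0.06},
}

def calculate_cost(prompt_tokens, completion_tokens, model):
    rates = MODEL_RATES[model]
    prompt_cost = (prompt_tokens / 1000) * rates["prompt"]
    completion_cost = (completion_tokens / 1000) * rates["completion"]
    return round(prompt_cost + completion_cost, 6)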

E-Commerce Transaction Flow

Challenge: Understanding the complete purchase flow and identifying revenue-impacting issues across the application stack.

Solution: Track the full transaction process from API request to order fulfillment.

Client Application Instrumentation:

# Django view handling checkout request
import sentry_sdk
import time
from django.views import View
from django.http import JsonResponse

class CheckoutView(View):
    def post(self, request):
        with sentry_sdk.start_span(op="order", name="Process Order") as span:
            # Validate the checkout request
            validation_start = time.time()
            validation_result = self.validate_checkout_data(request.POST)
            span.set_data("request.validation_time_ms", 
                             (time.time() - validation_start) * 1000)
            
            if not validation_result["valid"]:
                span.set_data("request.validation_success", False)
                span.set_data("request.validation_errors", validation_result["errors"])
                return JsonResponse({"errors": validation_result["errors"]}, status=400)
            
            # Process the order
            try:
                order_result = self.process_order(request)
                
                # Update span with order results
                span.set_data("order.id", order_result["order_id"])
                span.set_data("order.success", True)
                
                # Clear the cart and return success
                request.session["cart"] = []
                request.session["cart_total"] = 0
                
                return JsonResponse({"order_id": order_result["order_id"]})
                
            except Exception as e:
                span.set_status(sentry_sdk.SpanStatus.ERROR)
                span.set_data("order.success", False)
                span.set_data("error.message", str(e))
                return JsonResponse({"error": str(e)}, status=500)

Server Application Instrumentation:

# Order processing service
import sentry_sdk
import stripe
import time
from decimal import Decimal

def process_order(order_data):
    # One span covers the whole fulfillment path: inventory, payment, shipping
    with sentry_sdk.start_span(
        op="order.fulfillment",
        name="Fulfill Order"
    ) as span:
        try:
            # Check inventory availability
            inventory_start = time.time()
            inventory_result = check_inventory(order_data["items"])
            span.set_data("inventory.check_time_ms", 
                             (time.time() - inventory_start) * 1000)
            span.set_data("inventory.all_available", inventory_result["all_available"])
            
            if not inventory_result["all_available"]:
                span.set_data("inventory.unavailable_items", 
                                 inventory_result["unavailable_items"])
                raise ValueError("Some items are out of stock")
            
            # Process payment via Stripe
            payment_start = time.time()
            stripe.api_key = "sk_test_..."
            payment_intent = stripe.PaymentIntent.create(
                amount=int(Decimal(order_data["total"]) * 100),  # Convert to cents
                currency="usd",
                payment_method=order_data["payment_method_id"],
                confirm=True
            )
            
            span.set_data("payment.processing_time_ms", 
                             (time.time() - payment_start) * 1000)
            span.set_data("payment.transaction_id", payment_intent.id)
            span.set_data("payment.success", payment_intent.status == "succeeded")
            
            # Create fulfillment record
            fulfillment = create_fulfillment(order_data["order_id"])
            span.set_data("fulfillment.id", fulfillment["id"])
            span.set_data("fulfillment.warehouse", fulfillment["warehouse"])
            span.set_data("fulfillment.shipping_method", fulfillment["shipping_method"])
            span.set_data("fulfillment.estimated_delivery", 
                             fulfillment["estimated_delivery"].isoformat())
            
            return {
                "success": True,
                "order_id": order_data["order_id"],
                "payment_id": payment_intent.id,
                "fulfillment_id": fulfillment["id"]
            }
            
        except Exception as e:
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            span.set_data("error.message", str(e))
            span.set_data("order.success", False)
            raise

How the Trace Works Together: The client application span tracks the initial order request, while the server span handles order processing and fulfillment. The distributed trace provides visibility into the entire purchase flow, allowing you to:

  • Analyze transaction performance and success rates
  • Track payment processing timing and errors
  • Monitor inventory availability impact on conversions
  • Measure end-to-end order completion times
  • Identify friction points in the transaction process
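
For the checkout view and the order service to appear in one trace, the view has to forward the trace headers when it calls the service over HTTP. Sentry's outgoing-HTTP instrumentation typically adds them automatically; if you're attaching them by hand, a sketch (the service URL is made up):

import requests
import sentry_sdk

def call_order_service(order_payload):
    # Attach the current trace context so the order service's spans
    # join this trace; these are the header names Sentry SDKs read
    headers = {
        "sentry-trace": sentry_sdk.get_traceparent(),
        "baggage": sentry_sdk.get_baggage(),
    }
    return requests.post(
        "https://orders.internal.example.com/process",  # hypothetical endpoint
        json=order_payload,
        headers=headers,
    )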

Job Scheduling and Processing

Challenge: Understanding performance and reliability of distributed data processing pipelines, from job submission through completion.

Solution: Comprehensive tracking of job lifecycle across queue management, processing stages, and worker performance.

Client Application Instrumentation:

# Celery task submission and processing
import sentry_sdk
import os
import psutil
from celery import shared_task

def submit_processing_job(data_file_path, priority="medium"):
    with sentry_sdk.start_span(op="job", name="Process Job") as span:
        # Submit job to Celery
        try:
            # Configure task and submit
            task = process_data_file.apply_async(
                args=[str(data_file_path)],
                kwargs={"priority": priority},
                queue="data_processing"
            )
            
            span.set_data("job.id", task.id)
            span.set_data("job.submission_success", True)
            
            # Start monitoring task progress
            monitoring_result = setup_task_monitoring(task.id)
            span.set_data("monitor.callback_url", monitoring_result["callback_url"])
            
            return {
                "job_id": task.id,
                "status": "submitted",
                "monitoring_url": monitoring_result["status_url"]
            }
            
        except Exception as e:
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            span.set_data("job.submission_success", False)
            span.set_data("error.message", str(e))
            raise

@shared_task(name="tasks.process_data_file")
def process_data_file(file_path, priority="medium"):
    with sentry_sdk.start_span(
        op="process",
        name="Process Data"
    ) as span:
        try:
            # Processing implementation with stage tracking; keep the
            # current stage in a local so the except block can report it
            current_stage = "parse"
            span.set_data("processing.current_stage", current_stage)
            data = parse_data_file(file_path)
            
            current_stage = "transform"
            span.set_data("processing.current_stage", current_stage)
            transformed_data = transform_data(data)
            
            current_stage = "validate"
            span.set_data("processing.current_stage", current_stage)
            validation_result = validate_data(transformed_data)
            span.set_data("validation.errors_count", len(validation_result["errors"]))
            
            current_stage = "export"
            span.set_data("processing.current_stage", current_stage)
            export_result = export_processed_data(transformed_data)
            
            # Record resource utilization
            span.set_data("resource.cpu_percent", psutil.cpu_percent())
            span.set_data("resource.memory_used_mb", 
                             psutil.Process().memory_info().rss / (1024 * 1024))
            
            # Update final job outcome
            span.set_data("outcome.status", "completed")
            span.set_data("outcome.records_processed", len(data))
            span.set_data("outcome.output_size_bytes", 
                             os.path.getsize(export_result["output_path"]))
            
            return {
                "success": True,
                "records_processed": len(data),
                "output_path": export_result["output_path"]
            }
            
        except Exception as e:
            span.set_status(sentry_sdk.SpanStatus.ERROR)
            span.set_data("outcome.status", "failed")
            span.set_data("error.message", str(e))
            # current_stage was tracked locally above, so the failing
            # stage can still be recorded here
            span.set_data("error.stage", current_stage)
            raise

How the Trace Works Together: This example shows both the job submission and processing as a single trace, which is common in Celery/distributed task patterns. The spans track the entire job lifecycle, enabling you to:

  • Monitor task queue health and processing times
  • Track worker resource utilization
  • Identify bottlenecks in specific processing stages
  • Analyze job scheduling efficiency and queue wait times
  • Monitor data throughput and error rates
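
The submission span and the worker span join into a single trace because Sentry's Celery integration propagates the trace context inside the task message. It is enabled automatically when celery is installed; a sketch showing it explicitly (the DSN is a placeholder):

import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration

sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
    traces_sample_rate=1.0,
    integrations=[CeleryIntegration()],
)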

For more information about implementing these examples effectively, see our Span Metrics guide, which includes detailed best practices and implementation guidelines.
