---
title: "Instrument Queues"
description: "Learn how to manually instrument your code to use Sentry's Queues module. "
url: https://docs.sentry.io/platforms/python/tracing/instrumentation/custom-instrumentation/queues-module/
---

# Instrument Queues | Sentry for Python

Sentry comes with a [queue-monitoring dashboard](https://sentry.io/orgredirect/organizations/:orgslug/insights/backend/queues/) that can be auto-instrumented for popular Python queue setups (like [Celery](https://docs.sentry.io/platforms/python/integrations/celery.md)).

In case yours isn't supported, you can still instrument custom spans and transactions around your queue producers and consumers to ensure that you have performance data about your messaging queues.

## [Producer Instrumentation](https://docs.sentry.io/platforms/python/tracing/instrumentation/custom-instrumentation/queues-module.md#producer-instrumentation)

To start capturing performance metrics, use the `sentry_sdk.start_span()` function to wrap your queue producer events. Your span `op` must be set to `queue.publish`. Include the following span data to enrich your producer spans with queue metrics:

| Data Attribute                | Type   | Description                       |
| ----------------------------- | ------ | --------------------------------- |
| `messaging.message.id`        | string | The message identifier            |
| `messaging.destination.name`  | string | The queue or topic name           |
| `messaging.message.body.size` | int    | Size of the message body in bytes |

Your `queue.publish` span must exist inside a transaction in order to be recognized as a producer span. If you are using a [supported web framework](https://docs.sentry.io/platforms/python/integrations.md#web-frameworks), the transaction is created by the integration. If you use plain Python, you can start a new one using `sentry_sdk.start_transaction()`.

You must also include trace headers (`sentry-trace` and `baggage`) in your message so that your consumers can continue your trace once your message is picked up.

```python
from datetime import datetime, timezone

import sentry_sdk
import my_custom_queue

# Initialize Sentry
sentry_sdk.init(...)

connection = my_custom_queue.connect()

# The message you want to send to the queue
queue = "messages"
message = "Hello World!"
message_id = "abc123"

# Create transaction
# If you are using a web framework, the framework integration
# will create this for you and you can omit this.
with sentry_sdk.start_transaction(
    op="function",
    name="queue_producer_transaction",
):
    # Create the span
    with sentry_sdk.start_span(
        op="queue.publish",
        name="queue_producer",
    ) as span:
        # Set span data
        span.set_data("messaging.message.id", message_id)
        span.set_data("messaging.destination.name", queue)
        span.set_data("messaging.message.body.size", len(message.encode("utf-8")))

        # Publish the message to the queue (including trace information and current time stamp)
        now = int(datetime.now(timezone.utc).timestamp())
        connection.publish(
            queue=queue,
            body=message,
            timestamp=now,
            headers={
                "sentry-trace": sentry_sdk.get_traceparent(),
                "baggage": sentry_sdk.get_baggage(),
            },
        )
```

## [Consumer Instrumentation](https://docs.sentry.io/platforms/python/tracing/instrumentation/custom-instrumentation/queues-module.md#consumer-instrumentation)

To start capturing performance metrics, use the `sentry_sdk.start_span()` function to wrap your queue consumers. Your span `op` must be set to `queue.process`. Include the following span data to enrich your consumer spans with queue metrics:

| Data Attribute                      | Type   | Description                                                         |
| ----------------------------------- | ------ | ------------------------------------------------------------------- |
| `messaging.message.id`              | string | The message identifier                                              |
| `messaging.destination.name`        | string | The queue or topic name                                             |
| `messaging.message.body.size`       | number | Size of the message body in bytes                                   |
| `messaging.message.retry.count`     | number | The number of times a message was attempted to be processed         |
| `messaging.message.receive.latency` | number | The time in milliseconds that a message awaited processing in queue |

Your `queue.process` span must exist inside a transaction in order to be recognized as a consumer span. If you are using a [supported web framework](https://docs.sentry.io/platforms/python/integrations.md#web-frameworks), the transaction is created by the integration. If you use plain Python, you can start a new one using `sentry_sdk.start_transaction()`.

Use `sentry_sdk.continue_trace()` to connect your consumer spans to their associated producer spans, and `transaction.set_status()` to mark the trace of your message as success or failed.

```python
from datetime import datetime, timezone

import sentry_sdk
import my_custom_queue

# Initialize Sentry
sentry_sdk.init(...)

connection = my_custom_queue.connect()

# Pick up message from queues
queue = "messages"
message = connection.consume(queue=queue)

# Calculate latency (optional, but valuable)
now = datetime.now(timezone.utc)
message_time = datetime.fromtimestamp(message["timestamp"], timezone.utc)
latency = now - message_time

# Continue the trace started in the producer
# If you are using a web framework, the framework integration
# will create this for you and you can omit this.
transaction = sentry_sdk.continue_trace(
    message["headers"],
    op="function",
    name="queue_consumer_transaction",
)
with sentry_sdk.start_transaction(transaction):
    # Create the span
    with sentry_sdk.start_span(
        op="queue.process",
        name="queue_consumer",
    ) as span:
        # Set span data
        span.set_data("messaging.message.id", message["message_id"])
        span.set_data("messaging.destination.name", queue)
        span.set_data("messaging.message.body.size", message["body"])
        span.set_data("messaging.message.receive.latency", latency)
        span.set_data("messaging.message.retry.count", 0)

        try:
            # Process the message
            process_message(message)
        except Exception:
            # In case of an error set the status to "internal_error"
            transaction.set_status("internal_error")
```
