When it comes to squeezing the last drops of performance out of Google Pub/Sub, its official Python client might not be the right technology for your project. However, the appeal and ease of use of the language might still draw you towards it. It is therefore important to understand where the limitations lie and what can be done to fine-tune your integration with the service.
Threads and processes
from concurrent.futures import TimeoutError

from google.cloud import pubsub_v1

# How long (in seconds) the subscriber should listen for messages before shutting down.
TIMEOUT = 60.0


def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    message.ack()


subscriber = pubsub_v1.SubscriberClient()
streaming_pull_future = subscriber.subscribe("<subscription_path>", callback=callback)

with subscriber:
    try:
        # Without a timeout, result() would block indefinitely.
        streaming_pull_future.result(timeout=TIMEOUT)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
First, it’s worth mentioning that out of the box, the client uses sane defaults for its subscriber settings. What you see above is a basic snippet that can be found across Google’s Pub/Sub documentation. It subscribes to a topic via an existing subscription in Streaming Pull (also known as Asynchronous Pull) mode. Under the hood, a persistent connection is opened and kept alive for back-and-forth communication, and messages are pushed from server to client as soon as they become available. The client receives those messages and dispatches them one by one to a callback that is executed on a thread from a pre-allocated thread pool.
At this point, you probably know where this is going. After all, there can be no Python article without mentioning the infamous Global Interpreter Lock, or GIL for short. As is often the case, being mindful of its existence and limitations is key to understanding where to look for potential bottlenecks.
What does this mean in practice? Threads from the pre-allocated pool will compete for CPU time, so it’s highly advisable to ensure no CPU-heavy tasks run in the threaded callback. If your scenario requires intense computation on the incoming data, consider offloading it to a dedicated process pool, e.g., with the help of the multiprocessing module from the standard library:
import time
from multiprocessing import Pool

from google.cloud import pubsub_v1

PROCESS_POOL_SIZE = 4

pool = Pool(processes=PROCESS_POOL_SIZE)


def cpu_heavy_task(number):
    # Do something CPU-intensive
    time.sleep(0.5)
    return number


def on_message_received(message: pubsub_v1.subscriber.message.Message) -> None:
    number = ...  # Decode actual message

    def on_cpu_heavy_done(result):
        message.ack()

    # apply_async() expects the positional arguments as a tuple.
    pool.apply_async(cpu_heavy_task, (number,), callback=on_cpu_heavy_done)


subscriber = pubsub_v1.SubscriberClient()
streaming_pull_future = subscriber.subscribe(..., callback=on_message_received)
On the other hand, if your callbacks are strictly IO-bound, you may consider increasing the number of threads available in the pool. This can be achieved by passing a ThreadScheduler object alongside a callback to the subscribe() method.
from concurrent.futures import ThreadPoolExecutor

from google.cloud import pubsub_v1

THREAD_POOL_SIZE = 100

scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(
    executor=ThreadPoolExecutor(max_workers=THREAD_POOL_SIZE)
)

subscriber = pubsub_v1.SubscriberClient()
streaming_pull_future = subscriber.subscribe(..., scheduler=scheduler)
Note that the optimal number of threads might depend on other (often external) factors, e.g., the quota limits of an API you call for each message. As with anything, it’s a balancing act that requires careful consideration and a fundamental understanding of how all the pieces interact with each other. Also, keep in mind that both threads and processes, while great at dealing with certain types of workloads, come at a cost. Context-switching and inter-process communication have an overhead that can negate any potential gains in some scenarios.
Flow Control
Another batch of settings that can be used to fine-tune the performance of your consumer is related to Flow Control. This is a mechanism that prevents your subscriber from “biting off more than it can chew” or, to put it simply, from pulling more messages than can be processed in a given timeframe. To limit the number of outstanding elements (received messages that have not yet been acknowledged or negatively acknowledged), use the max_messages option and optionally combine it with max_bytes. Once this threshold is reached, your client will stop pulling new messages until older ones are processed. Note that pulled messages that exceed these limits are kept in an internal buffer and are not lease-managed. This has major implications: from the perspective of the Pub/Sub server, those messages have been dispatched, and failure to acknowledge them within the subscription’s default acknowledgment deadline will trigger a re-delivery. Hence, measuring the average processing time of a message and balancing max_messages against the subscription’s acknowledgment deadline is essential to getting the most out of your subscribers.
Assuming you’ve got those numbers figured out, let’s move on to what happens once a message is released from the buffer and dispatched for processing to one of the threads. At this point, the lease manager kicks in and does its best to ensure that acknowledgment deadlines are not missed. Its behavior can be controlled to some extent by three additional Flow Control settings. Let’s start with max_lease_duration. It allows us to extend the acknowledgment deadline from the default value to a desired one, which is especially useful for long-running tasks or tasks where we have to handle intermittent disruptions in communication with external services. It might represent our “worst case scenario,” where retries with backoff happen or an API takes ages to reply. By default, this value is set to one hour, and while that seems like a long time, it can be extended even further.
Then, there are min_duration_per_lease_extension and max_duration_per_lease_extension. Practically speaking, the former can be safely ignored or set to the same value as the latter. They can both range from 10 to 600 seconds, and they tell the lease manager by how much to extend the current deadline each time, up to max_lease_duration. As a rule of thumb, I recommend setting them to a higher value, as it generally limits the number of calls the client has to make to the server in order to update said deadlines.
MAX_MESSAGES = 100
MAX_BYTES = 100 * 1024  # 100 KiB
MAX_LEASE_DURATION = 2 * 60 * 60  # 2 hours
DURATION_PER_LEASE_EXTENSION = 10 * 60  # 600 seconds, the maximum allowed

...

flow_control = pubsub_v1.types.FlowControl(
    max_messages=MAX_MESSAGES,
    max_bytes=MAX_BYTES,
    max_lease_duration=MAX_LEASE_DURATION,
    min_duration_per_lease_extension=DURATION_PER_LEASE_EXTENSION,
    max_duration_per_lease_extension=DURATION_PER_LEASE_EXTENSION,
)

subscriber = pubsub_v1.SubscriberClient()
streaming_pull_future = subscriber.subscribe(..., flow_control=flow_control)
Data format
Last but not least, choosing the right data format for your application might affect not only the performance but also your overall costs. While there are no practical limitations on the type of information embedded in a message (as long as it’s binary), sending unstructured and uncompressed data might not be the right choice, and the reason is twofold. To start off, the pricing model of Pub/Sub is based on throughput: bytes written to a topic and read from its subscriptions. In other words, the more bytes you push through Pub/Sub, the more you’re going to pay. On a large scale, with payloads containing massive chunks of data, compressing that information is vital to keeping your costs down. Moreover, having a schema in place enforces a contract that any service pushing data to a topic has to abide by: any message that does not follow the specified format will be rejected.
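To make the compression part more concrete, here is a minimal sketch of what it could look like on both ends of the pipe, assuming JSON payloads compressed with zlib from the standard library. The project and topic IDs are placeholders, and the payload handling is made up for illustration.
import json
import zlib

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
# Hypothetical project and topic IDs; substitute your own.
topic_path = publisher.topic_path("<project_id>", "<topic_id>")


def publish_compressed(payload: dict) -> None:
    # Serialize to JSON and compress before handing the bytes over to Pub/Sub.
    data = zlib.compress(json.dumps(payload).encode("utf-8"))
    publisher.publish(topic_path, data=data)


def on_message_received(message: pubsub_v1.subscriber.message.Message) -> None:
    # Reverse the steps on the consumer side: decompress, then parse.
    payload = json.loads(zlib.decompress(message.data))
    ...  # Process the payload
    message.ack()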
Luckily for us, Pub/Sub supports two serialization frameworks that cover the schema side of things: AVRO and Protobuf. However, if neither meets your needs, you can still implement your own solution; as I’ve mentioned before, any binary data will be accepted as a message payload. But keep in mind that decoding and decompression come at a cost, as they require some heavy lifting from the CPU. What exactly is that cost? Of course, it depends. Both the average size of your messages and the actual implementation of the underlying library used for payload handling can have a significant impact on the overall performance of your application. Before making any final decisions, I highly recommend scouting and benchmarking some of the available options, for example along the lines of the sketch below. While there are some rules of thumb that make a good starting point for your research, use them as guidance only.
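A rough benchmark like the following can reveal how a given combination of serialization and compression behaves for your traffic. The sample payload, the iteration count, and the candidate encoders are all made up for illustration and should be replaced with the options you are actually evaluating.
import json
import pickle
import timeit
import zlib

# A made-up payload; substitute a message representative of your traffic.
SAMPLE_PAYLOAD = {"id": 12345, "values": list(range(1_000)), "label": "benchmark"}
ITERATIONS = 10_000


def json_plain() -> bytes:
    return json.dumps(SAMPLE_PAYLOAD).encode("utf-8")


def json_zlib() -> bytes:
    return zlib.compress(json.dumps(SAMPLE_PAYLOAD).encode("utf-8"))


def pickle_plain() -> bytes:
    return pickle.dumps(SAMPLE_PAYLOAD)


for name, encode in [("json", json_plain), ("json+zlib", json_zlib), ("pickle", pickle_plain)]:
    elapsed = timeit.timeit(encode, number=ITERATIONS)
    size = len(encode())
    # Report both encoding time and payload size, since both affect cost and throughput.
    print(f"{name}: {elapsed:.3f}s for {ITERATIONS} runs, {size} bytes per message")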
Conclusion
Obviously, the rabbit hole of optimizations one can apply to get the most out of Python-based Pub/Sub consumers is bottomless, and this post does not aspire to be a comprehensive overview of all of them. What I’ve tried to focus on were the “quick wins,” i.e., those pieces of the puzzle that can most likely yield the best results relative to the time spent on analysis and implementation. Even then, you might consider some of them not worth the effort. Perhaps the increased complexity of combining threads and processes into a single, maintainable pipeline will cost you more than simply spinning up an additional EC2 instance. And even if you try, it is probably unreasonable to expect a night-and-day difference in performance. Scaling horizontally is most likely your best bet for achieving the desired throughput. Nevertheless, I hope the tips I’ve provided will allow you to better understand what choices you have and plan accordingly.