Really Boosting Performance: Multiprocessing with Python

Introduction

In the world of Python programming, optimizing the performance of computationally intensive tasks is a pursuit shared by developers and data scientists alike. As the demand for faster and more efficient processing grows, one well-established solution is multiprocessing. At its core, multiprocessing is the technique of executing multiple tasks simultaneously across multiple processors or CPU cores. It takes advantage of the parallel processing capabilities of modern hardware, allowing Python programs to run several tasks at the same time. This parallel execution lets developers tackle resource-intensive computations, complex data analysis, and heavy data processing considerably faster than a single process could.

💡
Multiprocessing: By leveraging multiple cores, it lets programs execute tasks in parallel, which for CPU-bound work can substantially reduce processing time.

Difference from Multithreading

Threading is useful for I/O-bound tasks, but the Global Interpreter Lock (GIL) prevents it from fully exploiting modern multicore processors for CPU-bound work. Multiprocessing, on the other hand, lets Python programs take full advantage of the available CPU cores. That makes it an excellent choice for CPU-bound tasks and computationally intensive operations.

What's the GIL?

The Global Interpreter Lock (GIL) is a mutex in CPython, the standard Python interpreter, and a critical aspect of its memory management and concurrency model. The GIL is necessary because CPython's memory management relies on reference counting and is not thread-safe. Without the GIL, concurrent access to Python objects by multiple threads could lead to data corruption and inconsistencies. The GIL acts as a safeguard by allowing only one thread to execute Python bytecode at any given moment.

The GIL has a significant impact on both multithreading and multiprocessing in Python. In multithreading, the GIL limits the true parallelism that multiple threads can achieve. Only one thread at a time can execute Python bytecode, which largely cancels out the performance gain from using multiple threads for CPU-bound tasks. Threads can still overlap I/O-bound work effectively. This is because CPython releases the GIL while a thread is blocked waiting on I/O, such as reading a file or a network socket.

In contrast, multiprocessing bypasses the GIL's limitations by creating separate processes, each with its own Python interpreter and memory space. This allows multiple processes to execute Python code independently and truly leverage the capabilities of multi-core processors. As a result, multiprocessing is often preferred for CPU-bound tasks or scenarios where true parallelism is essential.
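To see this difference in practice, here is a minimal sketch that is not part of the original article. It runs the same pure-Python, CPU-bound function through multiprocessing.pool.ThreadPool (threads sharing one GIL) and through multiprocessing.Pool (separate processes). The names cpu_bound and timed_run and the input sizes are illustrative choices. Timings vary by machine and Python version. On a standard CPython build, the process pool is usually noticeably faster for a loop like this, while the results are identical.

```python
import math
import time
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool


def cpu_bound(n):
    # Pure-Python arithmetic loop: the running thread holds the GIL throughout
    total = 0
    for i in range(n):
        total += math.isqrt(i)
    return total


def timed_run(pool_cls, inputs, workers=4):
    # Map cpu_bound over inputs using the given pool class; return (results, seconds)
    start = time.perf_counter()
    with pool_cls(workers) as pool:
        results = pool.map(cpu_bound, inputs)
    return results, time.perf_counter() - start


if __name__ == "__main__":
    inputs = [3_000_000] * 4
    thread_results, thread_secs = timed_run(ThreadPool, inputs)
    process_results, process_secs = timed_run(Pool, inputs)

    # Same answers either way; only the wall-clock time should differ
    assert thread_results == process_results
    print(f"threads:   {thread_secs:.2f}s")
    print(f"processes: {process_secs:.2f}s")
```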

Achieving Multiprocessing

The multiprocessing module in Python provides a high-level interface for creating and managing processes. It offers several classes and functions that facilitate the creation, coordination, and communication between multiple processes. One of the key classes in the multiprocessing module is Process, which represents a separate process with its own Python interpreter.

The multiprocessing module abstracts the complexities of process creation and management. This makes it easier for developers to parallelize their code and leverage multiple CPU cores.

To create a new process using the multiprocessing module, developers can instantiate the Process class, passing the target function that the new process will execute. Once created, processes can be started using the start() method, which launches the process and begins its execution.

import multiprocessing
import time

# Define a target function that the new process will execute
def worker_function(name):
    print(f"Worker {name} is starting...")
    time.sleep(3)  # Simulate some time-consuming task
    print(f"Worker {name} is done!")

if __name__ == "__main__":
    # Create a new process
    process1 = multiprocessing.Process(target=worker_function, args=("Process 1",))

    # Start the process
    process1.start()

    print("Main process is doing some other work while the new process is running...")

    # Wait for the process to complete (optional)
    process1.join()

    print("Main process is continuing after the new process has finished.")

The multiprocessing module also comes with its own implementations of Lock, Event, and Semaphore. Their interfaces and concepts match their counterparts in the threading module, but their underlying implementations differ significantly. The threading versions only need to coordinate threads inside a single process. The multiprocessing versions are built on operating-system synchronization objects that can be shared across process boundaries. This follows from the innate difference between multithreading and multiprocessing.

💡
When using threads, use these primitives from the threading module; when using processes, use them from the multiprocessing module. A threading.Lock, for example, only coordinates threads within one process and cannot synchronize separate processes.
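As a brief illustration, here is a minimal sketch in which a multiprocessing.Event lets the parent release several waiting child processes at once. The waiter function and the one-second "setup" delay are hypothetical. The Event reaches each child as a Process argument.

```python
import multiprocessing
import time


def waiter(event, name):
    # Block until some process (here, the parent) sets the shared event
    print(f"{name} waiting for the signal...")
    event.wait()
    print(f"{name} received the signal")


if __name__ == "__main__":
    # multiprocessing.Event, not threading.Event: it must work across processes
    start_signal = multiprocessing.Event()
    workers = [
        multiprocessing.Process(target=waiter, args=(start_signal, f"Worker {i}"))
        for i in range(3)
    ]
    for p in workers:
        p.start()

    time.sleep(1)        # Simulate some setup work in the parent
    start_signal.set()   # Wake up every waiting child at once

    for p in workers:
        p.join()
    print("All workers finished:", all(p.exitcode == 0 for p in workers))
```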

I won't be diving into details about the use case of each, as I have already covered them in my previous article:

Process Pools

Process pooling is a technique for managing multiple worker processes efficiently. A fixed number of worker processes are created and kept alive in a pool, ready to execute tasks as soon as they are submitted. This avoids the overhead of creating and destroying a process for each individual task, which can be costly.

Python's multiprocessing.Pool class is a convenient tool for creating process pools. It provides methods such as map(), map_async(), and apply_async() to parallelize tasks and distribute them among the worker processes in the pool.

  • map(): It applies a function to each item in an iterable and returns the results in the same order as the inputs. It blocks until all tasks are completed.

  • map_async(): Similar to map(), but it does not block. It immediately returns an AsyncResult object that you can use later to wait for the results, retrieve them with get(), or surface any exception raised in a worker.

  • apply_async(): Submits a single call of a function, with its arguments given as a tuple, to the pool. It also returns an AsyncResult object without blocking.

import multiprocessing
import time

# Function to be executed by worker processes
def task_function(task_id):
    print(f"Task {task_id} started.")
    time.sleep(2)  # Simulate some time-consuming task
    return f"Task {task_id} is done!"

if __name__ == "__main__":
    # Create a process pool with 3 worker processes
    pool = multiprocessing.Pool(processes=3)

    # Example using map()
    tasks = [1, 2, 3, 4, 5]
    results = pool.map(task_function, tasks)
    ## Console Log:
        # Task 1 started.
        # Task 2 started.
        # Task 3 started.
        ## 2 seconds later
        # Task 4 started.
        # Task 5 started.
        ## 2 seconds later
    print(results)
    ## Console Log:
        # ['Task 1 is done!', 'Task 2 is done!', 'Task 3 is done!', 'Task 4 is done!', 'Task 5 is done!']

    # Example using map_async()
    tasks = [6, 7, 8, 9, 10]
    async_results = pool.map_async(task_function, tasks)
    ## Console Log:
        # Task 6 started.
        # Task 7 started.
        # Task 8 started.
        ## 2 seconds later
        # Task 9 started.
        # Task 10 started.
        ## 2 seconds later
    async_results.wait()  # Block until all tasks are complete
    # map_async() itself returned immediately, so the main process was free to do other work before this point.
    # get() also blocks until the results are ready, so the wait() call above is optional here.
    print(async_results.get())
    ## Console Log:
        # ['Task 6 is done!', 'Task 7 is done!', 'Task 8 is done!', 'Task 9 is done!', 'Task 10 is done!']

    # Example using apply_async()
    result = pool.apply_async(task_function, (11,))
    print(result.get())
    ## Console Log:
        # Task 11 started.
        # Task 11 is done!

    # Close the pool and wait for all processes to finish
    pool.close()
    pool.join()
โ—
Closing the pool is important because it allows the worker processes to terminate gracefully after completing their current tasks. If you don't close the pool, the worker processes may continue to run indefinitely, even after the main process has been completed, leading to potential resource leaks and unexpected behavior.
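An alternative to calling close() and join() by hand is to use the pool as a context manager. Here is a minimal sketch, where square is a hypothetical task function. Leaving the with block calls terminate(), not close(), so retrieve every result you need while still inside the block.

```python
import multiprocessing


def square(x):
    return x * x


if __name__ == "__main__":
    # Exiting the with-block calls pool.terminate(), so all results are
    # collected inside the block while the workers are still alive
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.map(square, range(5))
        single = pool.apply_async(square, (10,)).get()

    print(results)  # [0, 1, 4, 9, 16]
    print(single)   # 100
```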

Shared Memory & Interprocess Communication

Shared Memory

  • Shared Memory: Shared memory is a technique where multiple processes can access the same region of memory, allowing them to share data directly. Python's multiprocessing module provides shared memory objects like Value and Array. These store C-typed data (for example 'i' for a C int) in shared memory, so multiple processes can read and write the same locations. By default each object is wrapped with a lock, available through get_lock(). Single reads and writes are therefore safe, but compound operations such as += still require holding that lock explicitly.

  • Manager Objects: Manager objects, created with multiprocessing.Manager(), provide an alternative approach for sharing more complex data structures, such as lists and dictionaries, between processes. They offer methods such as Value(), list(), and dict(), each of which returns a proxy. Every operation on a proxy is forwarded to a separate manager process that owns the real object, so individual operations are applied one at a time.
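Value and Array work as intended when they reach the child by inheritance, for example as arguments to Process. Here is a minimal sketch, where add_to_shared is a hypothetical helper. Note the explicit get_lock() around +=: the lock a Value carries only makes each individual read or write atomic.

```python
import multiprocessing


def add_to_shared(counter, arr, index, n):
    for _ in range(n):
        # "+=" is a read followed by a write, so hold the Value's own lock
        with counter.get_lock():
            counter.value += 1
    # Each process writes only its own slot; item assignment on a
    # synchronized Array also takes the Array's lock internally
    arr[index] = index * 10


if __name__ == "__main__":
    counter = multiprocessing.Value('i', 0)  # 'i' = C int, initial value 0
    arr = multiprocessing.Array('i', 4)      # four C ints, initialised to 0

    # Passing shared objects as Process arguments counts as inheritance
    procs = [
        multiprocessing.Process(target=add_to_shared, args=(counter, arr, i, 1000))
        for i in range(4)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

    print(counter.value)  # 4000
    print(arr[:])         # [0, 10, 20, 30]
```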

To better grasp the above concepts, let's review the code below:

import multiprocessing

def update_shared_value(shared_value):
    shared_value.value += 1

if __name__ == "__main__":
    shared_value = multiprocessing.Value('i', 0)

    # Create a process pool with 5 worker processes
    pool = multiprocessing.Pool(processes=5)

    # Using the map method to distribute tasks to the worker processes
    pool.map(update_shared_value, [shared_value] * 5)

    # Close the pool and wait for all processes to finish
    pool.close()
    pool.join()

    print("Shared value:", shared_value.value)

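If you run the code above, you'll get an error. A Value cannot be pickled and sent to pool workers as a task argument; it can only reach a child process by inheritance, that is, when the child is created: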

RuntimeError: Synchronized objects should only be shared between processes through inheritance

One way to fix this issue is to use Manager objects:

import multiprocessing

def update_shared_value(shared_value, lock):
    # "+=" on a proxy is a separate read and write, so hold the lock to avoid lost updates
    with lock:
        shared_value.value += 1

if __name__ == "__main__":
    ### Changes Begin
    manager = multiprocessing.Manager()
    shared_value = manager.Value('i', 0)
    lock = manager.Lock()
    ### Changes End

    # Create a process pool with 5 worker processes
    pool = multiprocessing.Pool(processes=5)

    # Using starmap so that each task receives both the shared value and the lock
    pool.starmap(update_shared_value, [(shared_value, lock)] * 5)

    # Close the pool and wait for all processes to finish
    pool.close()
    pool.join()

    print("Shared value:", shared_value.value)
    ## Console Log:
        # Shared value: 5
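Another way to avoid the RuntimeError, while keeping the faster shared-memory Value, is to hand the Value to each worker once, at the moment the pool creates that worker. Pool's initializer and initargs parameters do exactly this. In this sketch, init_worker and the module-level shared_value global are illustrative names, not part of the original code.

```python
import multiprocessing

# Set inside each worker process by init_worker() (illustrative global)
shared_value = None


def init_worker(value):
    # Runs once in every worker as it starts, so the Value is inherited
    # at process creation instead of being pickled with each task
    global shared_value
    shared_value = value


def update_shared_value(_):
    # Hold the Value's built-in lock for the read-modify-write
    with shared_value.get_lock():
        shared_value.value += 1


if __name__ == "__main__":
    value = multiprocessing.Value('i', 0)
    with multiprocessing.Pool(processes=5, initializer=init_worker, initargs=(value,)) as pool:
        pool.map(update_shared_value, range(5))

    print("Shared value:", value.value)  # Shared value: 5
```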

So... what's the difference?

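  • multiprocessing.Value: Intended for sharing a single value of a simple C type (such as an int, float, or bool) through fast shared memory. It can be shared with any number of child processes, but only by inheritance. Pass it to Process when creating the child, or to a pool's initializer, but not as a task argument to pool.map() and similar methods.

  • multiprocessing.Manager: Designed for managing shared data structures like lists and dictionaries (as well as values and locks), and suitable for use in process pools because its proxies can be pickled and passed as task arguments. It starts a separate manager process that holds the shared data. Each method call on a proxy is sent to that process, which serializes access to the data but is slower than direct shared memory.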

In the corrected example that used multiprocessing.Manager, the shared variable was handled properly by the manager process, making it work seamlessly with the worker processes created by the process pool. As a result, the RuntimeError was avoided, and the code ran as expected.
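The same Manager approach extends to the lists and dictionaries mentioned above. In this minimal sketch, record_square is a hypothetical task that several pool workers use to write into a manager.dict() and a manager.list(). Each proxy call is a single request handled by the manager process, so no extra lock is needed here. A read-modify-write such as d[k] += 1 would still need one.

```python
import multiprocessing


def record_square(shared_dict, shared_list, n):
    # Each of these is one proxy call, applied by the manager process
    shared_dict[n] = n * n
    shared_list.append(n)


if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        shared_dict = manager.dict()
        shared_list = manager.list()

        with multiprocessing.Pool(processes=3) as pool:
            pool.starmap(record_square, [(shared_dict, shared_list, n) for n in range(5)])

        # Copy the data out before the manager shuts down with its with-block
        squares = shared_dict.copy()
        seen = sorted(shared_list[:])  # append order depends on scheduling

    print(sorted(squares.items()))  # [(0, 0), (1, 1), (2, 4), (3, 9), (4, 16)]
    print(seen)                     # [0, 1, 2, 3, 4]
```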

Synchronization Mechanisms

  • Locks: Locks are synchronization primitives used to enforce mutual exclusion. With the Lock class from the multiprocessing module, you can ensure that only one process at a time executes the critical section that accesses shared data. This prevents race conditions and maintains data integrity.

  • Semaphores: Semaphores are similar to locks but allow multiple processes to access the shared resource concurrently up to a specified limit. They are useful for managing resources with limited capacity.

  • Conditions: Conditions are used for more complex synchronization scenarios. They allow processes to wait for a specific condition to be met before proceeding with their tasks. Conditions are created using the Condition class from the multiprocessing module.
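The Lock and Semaphore described above can be combined in a short sketch. limited_task is a hypothetical function that uses a Semaphore(2) so that at most two processes use a simulated limited resource at once. A Lock guards two shared counters that record how many processes are currently active and the peak that was reached.

```python
import multiprocessing
import time


def limited_task(sem, lock, active, peak):
    # Only as many processes as the semaphore's initial value may enter
    with sem:
        # The lock makes each counter update a single atomic step
        with lock:
            active.value += 1
            peak.value = max(peak.value, active.value)
        time.sleep(0.2)  # Simulate work on the limited resource
        with lock:
            active.value -= 1


if __name__ == "__main__":
    sem = multiprocessing.Semaphore(2)  # At most 2 concurrent holders
    lock = multiprocessing.Lock()       # Guards the two counters below
    # lock=False gives raw shared ints; the explicit Lock above protects them
    active = multiprocessing.Value('i', 0, lock=False)
    peak = multiprocessing.Value('i', 0, lock=False)

    procs = [
        multiprocessing.Process(target=limited_task, args=(sem, lock, active, peak))
        for _ in range(6)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

    print("Peak concurrency:", peak.value)  # Never more than 2
```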

Read more about the practical implementation of these topics here:

Best Practices for Interprocess Communication

  • Avoid Excessive Data Sharing: Minimize the amount of data shared between processes to reduce contention and potential synchronization issues.

  • Use Locks Wisely: Use locks only when necessary, as excessive use of locks can introduce contention and reduce performance gains from parallel processing.

  • Be Mindful of Deadlocks: When using multiple synchronization primitives (locks, semaphores, etc.), be careful to avoid deadlocks, where processes wait indefinitely for each other to release locks.

  • Choose the Right Technique: Select the appropriate data-sharing technique based on the complexity and size of the data being shared. Use shared memory (Value, Array) for simple data types and Manager objects for more complex data structures. Keep in mind that every access through a Manager proxy is a round trip to the manager process and is therefore slower.

  • Exception Propagation: When a child started with a bare Process raises an uncaught exception, the exception is not propagated back to the parent. The traceback is printed to the child's stderr and the child exits with a non-zero exitcode, so the main process might not be aware of the failure. Process pools behave differently. If you submit work with apply_async() (or map_async()), calling get() on the returned AsyncResult object re-raises the worker's exception in the parent, where you can catch and handle it.

      import multiprocessing
    
      def child_process_task():
          # Simulate an error by dividing by zero; the exception is deliberately not caught here
          return 10 / 0
    
      if __name__ == "__main__":
          pool = multiprocessing.Pool(processes=1)
    
          # Use apply_async to submit the task to the process pool
          async_result = pool.apply_async(child_process_task)
    
          try:
              # get() re-raises, in the parent, any exception raised in the worker
              result = async_result.get()
          except Exception as e:
              print(f"Exception in child process: {e}")
              ## Console Log:
                  # Exception in child process: division by zero
    
          pool.close()
          pool.join()
    

Graceful Termination: In case of severe errors or critical exceptions that may render the application unstable, it is crucial to ensure that the child processes terminate gracefully. Abnormal terminations can leave resources unreleased and lead to unintended side effects. Implement mechanisms to ensure that child processes are terminated cleanly in case of any catastrophic errors. For example, use the try-except block to catch exceptions and initiate a clean shutdown of the child process.

import multiprocessing
import time

def child_process_task():
    try:
        # Simulate a task that runs for a while before failing
        time.sleep(2)
        # Simulate an error by dividing by zero
        result = 10 / 0
    except Exception as e:
        # Log the exception and release any resources the task holds here
        print(f"Exception in child process: {e}")
        # Re-raise so the pool reports the failure to the parent via get()
        raise

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=1)

    # Use apply_async to submit the task to the process pool
    async_result = pool.apply_async(child_process_task)

    try:
        # Get the result from the child process (this will raise an exception if any occurred)
        result = async_result.get()
    except Exception as e:
        # Log and handle the exception
        print(f"Exception reported to parent: {e}")

        # Stop the workers immediately instead of waiting for queued tasks
        pool.terminate()
    else:
        # No error: accept no new tasks and let the workers exit normally
        pool.close()

    # Wait for the worker processes to exit
    pool.join()
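💡
time.sleep only blocks the thread that calls it; other threads and other processes keep running. asyncio.sleep is different. It is a coroutine that must be awaited inside an async function running on an event loop, where it suspends only the current task. Calling asyncio.sleep(2) from a regular function without await merely creates a coroutine object that never runs, so it does not pause at all.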
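A bare Process does not hand its exceptions back to the parent, so the parent has to check each child's exitcode instead. Here is a minimal sketch, where risky_task is a hypothetical task that fails on demand.

```python
import multiprocessing


def risky_task(should_fail):
    if should_fail:
        # Uncaught on purpose: the traceback goes to the child's stderr
        # and the child exits with a non-zero exit code
        raise ValueError("something went wrong")


if __name__ == "__main__":
    procs = [
        multiprocessing.Process(target=risky_task, args=(flag,))
        for flag in (False, True, False)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

    # exitcode is 0 on success, 1 after an uncaught exception,
    # and -N if the process was killed by signal N (POSIX)
    failed = [p.name for p in procs if p.exitcode != 0]
    if failed:
        print("Failed children:", failed)
    else:
        print("All children succeeded")
```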

Final Word

I hope this article has shed light on the incredible world of multiprocessing in Python. By harnessing the power of multiple cores, you can significantly improve the performance and efficiency of CPU-bound Python programs. Keep exploring and experimenting with multiprocessing techniques to unlock even more possibilities. Happy multiprocessing, and stay tuned for more exciting articles on our tech blog! 💻🚀
