Boosting Performance: Multi-Threading with Python

Overview

In today's fast-paced technological landscape, maximizing performance and efficiency is a top priority for developers. One powerful technique to achieve this is multi-threading, which allows Python developers to leverage the full potential of modern CPUs by running multiple threads concurrently. In this article, we delve into the world of multi-threading in Python, exploring its benefits, challenges, and best practices. From improving responsiveness in GUI applications to speeding up computationally-intensive tasks, multi-threading empowers developers to unlock the full potential of their Python programs. Join us on this journey to discover the art of parallelism and take your Python applications to new heights.

Before getting into practical use cases, we should first cover key concepts:

Key Concepts

Overview

Classes

  1. Thread: Represents an independent flow of execution within a program.

  2. Lock: Synchronization primitive used to control access to a shared resource.

  3. Semaphore: Counter-based synchronization primitive that limits concurrency.

  4. Condition: Allows threads to wait for a specific condition to be satisfied before proceeding.

  5. Barrier: Allows threads to synchronize their execution and coordinate their progress, making it useful in scenarios where multiple threads need to reach a common synchronization point before proceeding further.

  6. Event: Enables threads to coordinate their execution and communicate with each other based on the occurrence of an event or condition.

Functionalities

  1. Thread Synchronization: Mechanisms like locks, semaphores, and conditions to ensure thread safety and avoid race conditions.

  2. Thread Communication: Mechanisms for communication between threads, such as events, queues, and thread-local variables.

  3. Daemon Threads: Classification of threads as daemon or non-daemon threads, with daemon threads considered "background" threads.

  4. Thread Pooling: Managing and reusing a limited number of active threads for executing multiple tasks efficiently.

In-Depth Introduction

Thread

import threading
import time

# Basic Functional Implementation
# ==============================
def printFoo(times):
    for i in range(times):
        print("foo")
        time.sleep(1)

tr = threading.Thread(target=printFoo, args=(3, ))
# ==============================

# `.start()` will call the `.run()` method.
tr.start()
for i in range(2):
    print("bar")
    print(tr.is_alive())
    tr.join()
    print(tr.is_alive())
# Output:
# foo <= Before entering Loop
# bar <= Enters the Loop and Prints Once
# True <= Thread is alive
# foo <= After 1 second sleep, it prints another
# foo <= No more Bar is written, because of `.join()`. 
#        Basically, the program's flow will be interrupted
#        as it waits for the thread to stop.
# False <= Thread is finished, so the thread is not alive anymore.
# bar <= The loop will continue.
# False
# False

# Basic Object Oriented Implementation
# ==============================
class myThread(threading.Thread):
    def __init__(self, times):
        self.times = times
        super().__init__()
    # This will override the parent's method
    def run(self):
        for i in range(self.times):
            print("foo")
            time.sleep(1)
# ==============================
# Now calling the following codes, will bear the same results as above:
tr = myThread(3)
tr.start()
for i in range(2):
    print("bar")
    print(tr.is_alive())
    tr.join()
    print(tr.is_alive())

Daemonic Threads

In Python's threading module, threads can be classified as either daemon or non-daemon threads. The concept of daemon threads allows for more flexibility in managing threads within a program.

A daemon thread is a thread that runs in the background and does not prevent the program from exiting, even if it is still running. In other words, when all non-daemon threads have finished their execution, the Python interpreter can exit, regardless of whether any daemon threads are still active. Some key characteristics of daemon threads:

  1. Background Execution: Daemon threads are typically used for background tasks or services that run continuously while the main program performs other operations. These threads execute independently and are not critical for the main program's functionality.

  2. Termination with Main Program: When the main program completes its execution and all non-daemon threads have terminated, any remaining daemon threads are automatically stopped, and their resources are cleaned up. This behavior allows for a smoother and faster program exit.

  3. Inherited Daemon Status: By default, threads created from the main program inherit the daemon status of the parent thread. However, you can explicitly set the daemon status using the setDaemon() method.

  4. Interaction with Non-Daemon Threads: Daemon threads can interact with non-daemon threads without any issues. If a daemon thread creates additional threads, those threads can continue execution even after the parent daemon thread has terminated.

Note

  1. Daemon threads should not perform critical operations or rely on shared resources that could be left in an inconsistent state if abruptly terminated. Daemon threads are primarily suitable for non-critical background tasks or services that do not require explicit cleanup or synchronization.

  2. Threads cannot be abruptly terminated so if such functionality is needed, the run() method (and the loops involved) should be dependent on an external flag.

Lock

import threading

# Shared resource
shared_counter = 0
lock = threading.Lock()

def increment_counter():
    global shared_counter
    # Using `lock` as a context manager, will:
    #     1. At the start of block, calls `lock.acquire()`
    #        This method, blocks the thread until the `lock.release()` 
    #        has been called by other procedures.
    #     2. At the end of block, calls `lock.release()`
    with lock:
        # Critical section: Increment the shared counter
        shared_counter += 1

def decrement_counter():
    global shared_counter
    with lock:
        # Critical section: Decrement the shared counter
        shared_counter -= 1

# Create multiple threads to increment and decrement the counter
threads = []
for _ in range(10):
    t1 = threading.Thread(target=increment_counter)
    t2 = threading.Thread(target=decrement_counter)
    threads.append(t1)
    threads.append(t2)

# Start the threads
for thread in threads:
    thread.start()

# Wait for all threads to complete
for thread in threads:
    thread.join()

# Print the final value of the shared counter
print("Final value of the shared counter:", shared_counter)
# Output:
# 0

Please note that the output value (0) is irrelevant and unimportant. What's important, is the isolative accessibility to a shared resource, as demonstrated above.

Semaphore

Semaphore is exactly like a Lock with the difference being that a Lock allows just one thread to access a resource, while Semaphore is initialized with a maximum counter, that allows up to that many threads to access the resources.

import threading
import time
from database import Database # Hypothetical Class

# Create a Semaphore with a limit of 3 concurrent connections, equal to the amount of safe concurrency limit of the Database.
database_semaphore = threading.Semaphore(3)
database = Database()  # Instantiate the Database class

def write_to_database(data):
    with database_semaphore:
        # Acquire a permit from the semaphore
        # Critical section: Write data to the database
        database.write(data)
        print(f"Data '{data}' written to the database.")
        time.sleep(2)  # Simulate some processing time

# Create multiple threads to write data to the database concurrently
data = ['Data1', 'Data2', 'Data3', 'Data4', 'Data5']
threads = []
for d in data:
    t = threading.Thread(target=write_to_database, args=(d,))
    threads.append(t)

# Start the threads
for thread in threads:
    thread.start()

# Wait for all threads to complete
for thread in threads:
    thread.join()

print("All data written to the database.")

Lock is best used for file access, while Semaphore could be used for API and Database calls that allow limited concurrency.

Condition

Building upon the Lock class, it can pause a thread until a condition is met by making use of the Lock.acquire and Lock.release methods, internally.

import threading
import random
from database import Database # Hypothetical Class

# Instantiate a Lock object
lock = threading.Lock()

# Encapsulate it with a Condition
condition = threading.Condition(lock)
data = list(range(10))
database = Database()  # Instantiate the Database class
random.shuffle(data)

def consumer():
    with condition:
        print("Consumer is waiting for data...")
        # Pauses the threading operation
        condition.wait()
        # Store Processed Data
        database.write(data)
        print("Consumer received data!")

def producer():
    with condition:
        print("Producer is producing data...")
        # Transform Data
        data.sort()
        print("Producer produced data!")
        # Wakes up ONE thread that is waiting for the condition's lock
        # to be released.
        condition.notify(n=1)
        # Wakes up all threads
        # condition.notify_all()

# Create consumer and producer threads
consumer_thread = threading.Thread(target=consumer)
producer_thread = threading.Thread(target=producer)

# Start the threads
consumer_thread.start()
producer_thread.start()

# Wait for the threads to complete
consumer_thread.join()
producer_thread.join()

print("Program completed.")

Note

Condition is very much like Lock that provides more flexible synchronization strategies to be implemented using notify(n) to wake up threads in order, and on demand.

Barrier

A synchronization tool that can be used like a dam to block a total number of threads at bay, until the number of threads has reached the prespecified limit.

import threading

# Initialize Barrier to synchronize 3 threads
barrier = threading.Barrier(parties=3)

def worker(n):
    print(f"Worker thread #{n} started.")
    # Waits until all parties have reached the `barrier.wait()` point
    barrier.wait()
    print("Worker thread passed the barrier.")

# Create worker threads
threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]

# Start the threads
for thread in threads:
    thread.start()

# Wait for the threads to complete
for thread in threads:
    thread.join()

print("All worker threads completed.")
# Output
# Worker thread #1 started.
# Worker thread #2 started.
# Worker thread #3 started.
# Worker thread passed the barrier.
# Worker thread passed the barrier.
# Worker thread passed the barrier.
# All worker threads completed.

Event

import threading
import time

event = threading.Event()

def subscriber():
    # Unset the Event, and hold all threads on-pause
    # if they come across `event.wait()` statement,
    # until the `.set()` method is called once-again.
    event.clear()
    print("Waiter thread is waiting...")
    # Wait for the specific event's `is_set()` returns True.
    event.wait()
    print("Waiter thread is done waiting!")

def publisher():
    print("Setter thread is setting the event...")
    # Fire the event, waking up all threads that are
    # waiting on this particular even to be `set`.
    event.set()
    print("Setter thread is done setting the event.")

# Create waiter and setter threads
subscriber_thread = threading.Thread(target=subscriber)
publisher_thread = threading.Thread(target=publisher)

# Start the threads
subscriber_thread.start()
publisher_thread.start()

# Wait for the threads to complete
subscriber_thread.join()
publisher_thread.join()

print("Program completed.")

Worth Noting

The Python Global Interpreter Lock (GIL) affects the performance of multithreading:

  1. It limits the execution of Python bytecodes to one thread at a time. This can significantly limit the performance of programs that rely on heavy multiprocessing or concurrent processing.

  2. Secondly, because only one thread can execute bytecode at a time, switching between threads can become an overhead for the interpreter. This overhead can result in longer execution times than for single-threaded programs.

So for CPU-intensive operations, the use of multithreading using the threading package will negatively impact the results. In order to perform parallel computations, we need to use a concept called multiprocessing in Python, which I'll cover in a future article.


Final Note

I hope you learned the basics of multi-threading in Python. I retouched some subjects while writing this article, and enjoyed myself a lot.

More articles are on the way.