Difference between revisions of "Python Multithreading/Multiprocessing Examples"

From dftwiki3
Jump to: navigation, search
(Computing Pi, Multithreaded Version)
(Computing Pi, Multiprocessing Version)
 
(3 intermediate revisions by the same user not shown)
Line 104: Line 104:
 
<br />
 
<br />
 
::<source lang="python">
 
::<source lang="python">
# ComputingPy.py
+
# ComputingPyMultiThreaded.py
 
# D. Thiebaut
 
# D. Thiebaut
 
#
 
#
Line 195: Line 195:
 
</source>
 
</source>
 
<br />
 
<br />
 +
=Multiprocessing in Python=
 
<br />
 
<br />
 +
Note, these examples do not work in Idle.  Instead, run them from the command line in a Terminal/Console window.
 
<br />
 
<br />
 +
==Example 1==
 
<br />
 
<br />
 +
::<source lang="python">
 +
# PythonMultiprocessing_Example1.py
 +
# D. Thiebaut
 +
# Taken and adapted from
 +
# https://pymotw.com/2/threading/
 +
#
 +
# Python program illustrating how threads can
 +
# be created and started in Python
 +
 +
from __future__ import print_function
 +
import multiprocessing
 +
import time
 +
 +
 +
def worker( Id ):
 +
    """
 +
    Process worker function.  It receives an Id,
 +
    and prints a message indicating it's starting.
 +
    waits 0.5 seconds, prints another message, and dies.
 +
    """
 +
    print( 'Worker %d started\n' % Id )
 +
    time.sleep( 2.5 )
 +
    print( 'Worker %d finished\n' % Id )
 +
    return
 +
 +
def main():
 +
    """
 +
    Spawns 5 processes that will run the worker function.
 +
    Waits for the threads to finish, then stops.
 +
    """
 +
    jobs = []
 +
    for i in range(5):
 +
        p = multiprocessing.Process( target=worker, args=(i,) )
 +
        jobs.append(p)
 +
        p.start()
 +
       
 +
    print( "Main has spawn all the threads" )
 +
 +
    # wait for all the threads to finish
 +
    for p in jobs:
 +
        p.join()
 +
       
 +
    print( "Done!" )
 +
   
 +
main()
 +
 +
</source>
 
<br />
 
<br />
 +
==Computing Pi, Multiprocessing Version==
 +
<br />
 +
::<source lang="python">
 +
# ComputingPyMultiThreaded.py
 +
# D. Thiebaut
 +
#
 +
# Computes an approximation of Pi by summing up
 +
# a series of terms.  The more terms, the closer
 +
# the approximation.
 +
#
 +
from __future__ import print_function
 +
import multiprocessing
 +
 +
 +
def f( x ):
 +
    """
 +
    The function being integrated, and which returns
 +
    an approximation of Pi when summed up over an interval
 +
    of integers.
 +
    """
 +
    return 4.0 / ( 1 + x*x )
 +
 +
class WorkerProcess(multiprocessing.Process):
 +
    """
 +
    That's the main worker Process, as a class.
 +
    It gets initialized via __init__ with the interval
 +
    and deltaX it sums up on.  It saves the accumulated
 +
    sum in self.sum
 +
    """
 +
   
 +
    def __init__( self, args ):
 +
        """
 +
        args must contain n1, n2, and deltaX, in that order.
 +
        """
 +
        multiprocessing.Process.__init__( self, args=args )
 +
        self.n1 = args[0]
 +
        self.n2 = args[1]
 +
        self.deltaX = args[2]
 +
        self.q  = args[3]
 +
 +
    def run( self ):
 +
        """
 +
        This will be called automatically by start() in
 +
        main.  It's the method that does all the work.
 +
        """
 +
        sum = 0.0
 +
        for i in range( self.n1, self.n2 ):
 +
            sum += f( i * self.deltaX )
 +
        self.q.put( sum )
 +
       
 +
   
 +
def main():
 +
    """
 +
    Gets a number of terms from the user, then sums
 +
    up the series and prints the resulting sum, which
 +
    should approximate Pi.
 +
    """
 +
 +
    # get the number of terms
 +
    N = 10000000  #int( input( "# steps > " ) )
 +
    P = 10        #int( input( "# processes? > " ) )
 +
   
 +
    deltaX = 1.0 / N    # the interval
 +
    deltaN = int( N/P ) # the number of samples per thread
 +
 +
    # start T
 +
    n1 = 0
 +
    jobs = []
 +
    q = multiprocessing.Queue()
 +
   
 +
    for i in range( P ):
 +
        n2 = n1 + deltaN
 +
        p = WorkerProcess( args=(n1, n2, deltaX, q) )
 +
        p.start()
 +
        jobs.append( p )
 +
        n1 = n2
 +
 +
    # wait for the threads to be done
 +
    #for p in jobs:
 +
    #    p.join()
 +
 +
    # gather the sums
 +
    sum = 0
 +
    for i in range( P ):
 +
        sum += q.get()
 +
 +
    # print result
 +
    print( "sum = %1.9f" % ( sum * deltaX ) )
 +
 +
main()
 +
 +
</source>
 
<br />
 
<br />
 
<br />
 
<br />

Latest revision as of 21:06, 25 January 2017

--D. Thiebaut (talk) 15:27, 25 January 2017 (EST)


Multithreading in Python


Warning! The multithreading module in Python does not provide true parallelism. Read up on the Global Interactive Lock if you want to learn more about this problem.

A good collection of examples can be found here: https://pymotw.com/2/threading/.

Example 1


# PythonThreading_Example1.py
# D. Thiebaut
# Taken and adapted from
# https://pymotw.com/2/threading/
#
# Python program illustrating how threads can
# be created and started in Python

from __future__ import print_function
import threading
import time


def worker( Id ):
    """
    thread worker function.  It receives an Id,
    and prints a message indicating it's starting.
    waits 0.5 seconds, prints another message, and dies.
    """
    print( 'Worker %d started\n' % Id )
    time.sleep( 0.5 )
    print( 'Worker %d finished\n' % Id )
    return

def main():
    """
    Spawns 5 threads that will run the worker function.
    Waits for the threads to finish, then stops.
    """
    threads = []
    for i in range(5):
        t = threading.Thread( target=worker, args=(i,) )
        threads.append(t)
        t.start()
    print( "Main has spawn all the threads" )

    # wait for all the threads to finish
    for t in threads:
        t.join()
        
    print( "Done!" )
    
main()


Computing Pi, Serial Version


# ComputingPy.py
# D. Thiebaut
#
# Computes an approximation of Pi by summing up
# a series of terms.  The more terms, the closer
# the approximation.
#

def f( x ):
    """
    The function being integrated, and which returns
    an approximation of Pi when summed up over an interval
    of integers.
    """
    return 4.0 / ( 1 + x*x )


def main():
    """
    Gets a number of terms from the user, then sums
    up the series and prints the resulting sum, which
    should approximate Pi.
    """

    # get the number of terms
    N = int( input( "> " ) )

    sum = 0.0          # where we sum up the individual
                       # intervals
    deltaX = 1.0 / N   # the interval

    # sum over N terms
    for i in range( N ):
        sum += f( i * deltaX )

    # display results
    print( "sum = %1.9f" % ( sum * deltaX ) )

main()


Computing Pi, Multithreaded Version


# ComputingPyMultiThreaded.py
# D. Thiebaut
#
# Computes an approximation of Pi by summing up
# a series of terms.  The more terms, the closer
# the approximation.
#
from __future__ import print_function
import threading


def f( x ):
    """
    The function being integrated, and which returns
    an approximation of Pi when summed up over an interval
    of integers.
    """
    return 4.0 / ( 1 + x*x )

class WorkerThread(threading.Thread):
    """
    That's the main worker thread, as a class.
    It gets initialized via __init__ with the interval
    and deltaX it sums up on.  It saves the accumulated
    sum in self.sum
    """
    
    def __init__( self, args ):
        """
        args must contain n1, n2, and deltaX, in that order.
        """
        threading.Thread.__init__( self, args=args )
        self.n1 = args[0]
        self.n2 = args[1]
        self.deltaX = args[2]

    def run( self ):
        """
        This will be called automatically by start() in
        main.  It's the method that does all the work.
        """
        self.sum = 0.0
        for i in range( self.n1, self.n2 ):
            self.sum += f( i * self.deltaX )

    def getSum( self ):
        """
        gets the computed sum.
        """
        return self.sum
    
def main():
    """
    Gets a number of terms from the user, then sums
    up the series and prints the resulting sum, which
    should approximate Pi.
    """

    # get the number of terms
    N = 1000000000 #int( input( "# steps > " ) )
    T = 1000       #int( input( "# threads? > " ) )
    
    deltaX = 1.0 / N    # the interval
    deltaN = int( N/T ) # the number of samples per thread

    # start T 
    n1 = 0
    threads = []
    for i in range( T ):
        n2 = n1 + deltaN
        t = WorkerThread( args=(n1, n2, deltaX,) )
        t.start()
        threads.append( t )
        n1 = n2

    # wait for the threads to be done
    for t in threads:
        t.join()

    # gather the sums
    sum = 0
    for t in threads:
        sum += t.getSum()

    # print result
    print( "sum = %1.9f" % ( sum * deltaX ) )

main()


Multiprocessing in Python


Note, these examples do not work in Idle. Instead, run them from the command line in a Terminal/Console window.

Example 1


# PythonMultiprocessing_Example1.py
# D. Thiebaut
# Taken and adapted from
# https://pymotw.com/2/threading/
#
# Python program illustrating how threads can
# be created and started in Python

from __future__ import print_function
import multiprocessing
import time


def worker( Id ):
    """
    Process worker function.  It receives an Id,
    and prints a message indicating it's starting.
    waits 0.5 seconds, prints another message, and dies.
    """
    print( 'Worker %d started\n' % Id )
    time.sleep( 2.5 )
    print( 'Worker %d finished\n' % Id )
    return

def main():
    """
    Spawns 5 processes that will run the worker function.
    Waits for the threads to finish, then stops.
    """
    jobs = []
    for i in range(5):
        p = multiprocessing.Process( target=worker, args=(i,) )
        jobs.append(p)
        p.start()
        
    print( "Main has spawn all the threads" )

    # wait for all the threads to finish
    for p in jobs:
        p.join()
        
    print( "Done!" )
    
main()


Computing Pi, Multiprocessing Version


# ComputingPyMultiThreaded.py
# D. Thiebaut
#
# Computes an approximation of Pi by summing up
# a series of terms.  The more terms, the closer
# the approximation.
#
from __future__ import print_function
import multiprocessing


def f( x ):
    """
    The function being integrated, and which returns
    an approximation of Pi when summed up over an interval
    of integers.
    """
    return 4.0 / ( 1 + x*x )

class WorkerProcess(multiprocessing.Process):
    """
    That's the main worker Process, as a class.
    It gets initialized via __init__ with the interval
    and deltaX it sums up on.  It saves the accumulated
    sum in self.sum
    """
    
    def __init__( self, args ):
        """
        args must contain n1, n2, and deltaX, in that order.
        """
        multiprocessing.Process.__init__( self, args=args )
        self.n1 = args[0]
        self.n2 = args[1]
        self.deltaX = args[2]
        self.q  = args[3]

    def run( self ):
        """
        This will be called automatically by start() in
        main.  It's the method that does all the work.
        """
        sum = 0.0
        for i in range( self.n1, self.n2 ):
            sum += f( i * self.deltaX )
        self.q.put( sum )
        
    
def main():
    """
    Gets a number of terms from the user, then sums
    up the series and prints the resulting sum, which
    should approximate Pi.
    """

    # get the number of terms
    N = 10000000   #int( input( "# steps > " ) )
    P = 10         #int( input( "# processes? > " ) )
    
    deltaX = 1.0 / N    # the interval
    deltaN = int( N/P ) # the number of samples per thread

    # start T 
    n1 = 0
    jobs = []
    q = multiprocessing.Queue()
    
    for i in range( P ):
        n2 = n1 + deltaN
        p = WorkerProcess( args=(n1, n2, deltaX, q) )
        p.start()
        jobs.append( p )
        n1 = n2

    # wait for the threads to be done
    #for p in jobs:
    #    p.join()

    # gather the sums
    sum = 0
    for i in range( P ):
        sum += q.get()

    # print result
    print( "sum = %1.9f" % ( sum * deltaX ) )

main()