Programare concurentă în Python (continuare)

Obiective

Vom continua în cadrul acestui laborator prezentarea elementelor de sincronizare oferite de Python și nu numai.

Obiecte de sincronizare

Bariera

De multe ori este necesar ca un grup de thread-uri să ajungă toate într-un anumit punct al execuției (ex: fiecare thread a calculat un rezultat intermediar al algoritmului) și numai după aceea să își continue execuția (ex: rezultatele intermediare sunt partajate de toate thread-urile în partea următoare a algoritmului). Mecanismul de sincronizare potrivit pentru asemenea situații este bariera. Python 2.7 nu oferă un obiect de sincronizare care să implementeze comportamentul unei bariere, însă acesta poate fi ușor implementat într-un modul, pe baza celorlalte mecanisme de sincronizare. Începând cu Python 3.2 în modulul threading a fost introdusă clasa Barrier, acesta fiind o barieră reentrantă implementată folosind variabile condiție (cod sursă).

Ce trebuie să ofere o barieră?

  • un mecanism de blocare/deblocare a thread-urilor
  • un contor al numărului de thread-uri care au ajuns/mai trebuie să ajungă la barieră
  • deblocarea tuturor thread-urilor atunci când a ajuns și ultimul dintre ele la barieră

Bariera ne-reentrantă

Putem implementa o barieră folosind un semafor inițializat cu 0 (mecanismul de blocare/deblocare) și un contor al numărului de thread-uri care mai trebuie să ajungă la barieră (contorul), inițializat cu numărul de thread-uri utilizate. Semaforul este folosit pentru a bloca execuția thread-urilor. Contorul este decrementat de fiecare thread care ajunge la barieră și reprezintă numărul de thread-uri care au mai rămas de ajuns. Fiind o variabilă partajată, modificarea lui trebuie bineînțeles protejată de un lock. În momentul în care ultimul thread decrementează contorul, acesta va avea valoarea 0, semnalizând faptul că toate thread-urile au ajuns la barieră. Ultimul thread va incrementa astfel semaforul (deblocarea) și va debloca toate thread-urile blocate.

simple_barrier_semaphore.py
from threading import *
 
class SimpleBarrier():
    def __init__(self, num_threads):
        self.num_threads = num_threads
        self.count_threads = self.num_threads    # contorizeaza numarul de thread-uri ramase
        self.count_lock = Lock()                 # protejeaza accesarea/modificarea contorului
        self.threads_sem = Semaphore(0)          # blocheaza thread-urile ajunse
 
    def wait(self):
        with self.count_lock:
            self.count_threads -= 1
            if self.count_threads == 0:          # a ajuns la bariera si ultimul thread
                for i in range(self.num_threads):
                    self.threads_sem.release()   # incrementarea semaforului va debloca num_threads thread-uri
        self.threads_sem.acquire()               # num_threads-1 threaduri se blocheaza aici
                                                 # contorul semaforului se decrementeaza de num_threads ori
 
class MyThread(Thread):
    def __init__(self, tid, barrier):
        Thread.__init__(self)
        self.tid = tid
        self.barrier = barrier
 
    def run(self):
        print "I'm Thread " + str(self.tid) + " before\n",
        self.barrier.wait()
        print "I'm Thread " + str(self.tid) + " after barrier\n",

De ce nu este reentrantă bariera cu un semafor?

Fie cazul în care avem N thread-uri, iar acestea trebuie sincronizate prin barieră de mai multe ori:

  • N-1 thread-uri vor face acquire pe semafor
  • ultimul thread face release de N de ori pe semafor
    • unul din release-uri este pentru el
  • N-1 thread-uri se deblochează și își continuă execuția; ultimul thread ar trebui să facă acquire și să nu se blocheze, doarece semaforul ar trebui să fie 1
  • rulând în buclă însă, unul din thread-urile deblocate poate ajunge să facă acquire din nou, înainte ca ultimul thread să treacă de acquire
    • ultimul thread va rămâne blocat la acquire, urmând ca și celelalte thread-uri să se blocheze în al doilea acces al barierei; nici un thread nu mai face release, ducând astfel la deadlock

Bariera reentrantă

Barierele reentrante (eng. reusable barrier) sunt utile în prelucrări 'step-by-step' și/sau bucle. Unele aplicații pot necesita ca thread-urile să execute anumite operații în buclă, cu rezultatele tuturor thread-urilor din iterația curentă necesare pentru începerea iterației următoare. În acest caz, după fiecare iterație, se folosește o sincronizare cu barieră reentrantă.

Pentru a adapta bariera din secțiunea anterioară astfel încât să poată fi folosită de mai multe ori, avem nevoie de încă un semafor. Soluția aceasta se bazează pe necesitatea ca toate cele N thread-uri să treacă de acquire() înainte ca vreunul să revină la barieră. Astfel, partea de sincronizare este compusă din două etape, fiecare folosind câte un semafor.

Folosind implementarea de mai jos, garantăm că thread-urile ajung să se blocheze din nou pe primul semafor doar după ce toate au trecut în prealabil de acesta:

  • N-1 thread-uri vor face acquire pe semaforul 1
  • ultimul thread face release de N de ori pe semaforul 1
    • unul din release-uri este pt el
  • N-1 thread-uri se deblochează și fac acquire pe semaforul 2
  • ultimul thread face și el acquire pe semaforul 1 și trece de acesta
  • ultimul thread face release de N de ori pe semaforul 2
  • N-1 thread-uri se deblochează și fac acquire pe semaforul 1

s.a.m.d….

reusable_barrier_semaphore.py
from threading import *
 
class ReusableBarrier():
    def __init__(self, num_threads):
        self.num_threads = num_threads
        self.count_threads1 = [self.num_threads]
        self.count_threads2 = [self.num_threads]
        self.count_lock = Lock()                 # protejam accesarea/modificarea contoarelor
        self.threads_sem1 = Semaphore(0)         # blocam thread-urile in prima etapa
        self.threads_sem2 = Semaphore(0)         # blocam thread-urile in a doua etapa
 
    def wait(self):
        self.phase(self.count_threads1, self.threads_sem1)
        self.phase(self.count_threads2, self.threads_sem2)
 
    def phase(self, count_threads, threads_sem):
        with self.count_lock:
            count_threads[0] -= 1
            if count_threads[0] == 0:            # a ajuns la bariera si ultimul thread
                for i in range(self.num_threads):
                    threads_sem.release()        # incrementarea semaforului va debloca num_threads thread-uri
                count_threads[0] = self.num_threads  # reseteaza contorul
        threads_sem.acquire()                    # num_threads-1 threaduri se blocheaza aici
                                                 # contorul semaforului se decrementeaza de num_threads ori
 
class MyThread(Thread):
    def __init__(self, tid, barrier):
        Thread.__init__(self)
        self.tid = tid
        self.barrier = barrier
 
    def run(self):
        for i in xrange(10):
            self.barrier.wait()
            print "I'm Thread " + str(self.tid) + " after barrier, in step " + str(i) + "\n",

Event

Event-urile sunt obiecte simple de sincronizare care permit mai multor thread-uri blocarea voluntară până la apariția unui eveniment semnalat de un alt thread (ex: o condiție a devenit adevărată). Intern, un obiect Event conține un flag setat inițial la valoarea false. El oferă două operații de bază: set(), care setează valoarea flag-ului pe true, și wait(), care blochează execuția thread-urilor apelante până când flag-ul devine true. Dacă flag-ul este deja true în momentul apelării lui wait() aceasta nu va bloca execuția. În momentul setării flag-ului pe true toate thread-urile blocate în wait() vor fi deblocate și își vor continua execuția.

Event-ul mai oferă și alte două operații: clear(), care setează flag-ul intern la valoarea false (resetează evenimentul) și is_set(), care oferă posibilitatea de interogare a valorii curente a flag-ului.

Testarea valorii flag-ului cu is_set() într-o buclă, fără a executa calcule utile în acea buclă sau fără a apela metode blocante, reprezintă o formă de busy-waiting. Această utilizare trebuie evitată, deoarece, ca orice busy-waiting, irosește timp de procesor care ar putea fi folosit de celelate thread-uri.
Refolosirea obiectelor Event pentru a semnaliza un eveniment (cu metoda clear()) trebuie făcută cu grijă. A doua setare a flag-ului pe true poate fi ștearsă de un clear() întârziat, înainte ca thread-ul care dorește să aștepte evenimentul să facă wait() pentru a doua oară. Acest lucru va duce cel mai probabil la deadlock.
O altă problemă care poate apărea în cazul refolosirii unui obiect Event este că un thread care dorește o a doua așteptare și-ar putea continua execuția, fără ca evenimentul să fie semnalizat a doua oară. Această situație apare atunci când resetarea flag-ului cu metoda clear() este întârziată mai mult decât al doilea wait(). Evenimentul va rămâne astfel la valoarea true, iar al doilea wait() nu va bloca execuția în acestă situație precum se dorește.

Capabilitatea obiectului Event de a bloca execuția thread-urilor și de a le debloca pe toate în același timp poate fi folosită în locul semaforului, ca mecanism de blocare/deblocare, la implementarea unei bariere ne-reentrante. Avantajul acestei soluții față de cea cu semafor este claritatea. Event-ul se ocupă de blocarea/deblocarea thread-urilor, iar contorul și lock-ul țin evidența thread-urilor intrate în barieră. Soluția cu semafor conține însă două contoare (unul dat de semaforul însuși), care trebuie să rămână corelate, iar mecanismul de blocare/deblocare este combinat cu contorul, complicând astfel analiza implementării.

simple_barrier_event.py
from threading import *
 
class SimpleBarrier():
    def __init__(self, num_threads):
        self.num_threads = num_threads
        self.count_threads = self.num_threads    # contorizeaza numarul de thread-uri ramase
        self.count_lock = Lock()                 # protejeaza accesarea/modificarea contorului
        self.threads_event = Event()             # blocheaza thread-urile ajunse
 
    def wait(self):
        with self.count_lock:
            self.count_threads -= 1
            if self.count_threads == 0:          # a ajuns la bariera si ultimul thread
                self.threads_event.set()         # deblocheaza toate thread-urile
        self.threads_event.wait()                # num_threads-1 threaduri se blocheaza aici
                                                 # ultimul thread nu se va bloca deoarece event-ul a fost setat
 
class MyThread(Thread):
    def __init__(self, tid, barrier):
        Thread.__init__(self)
        self.tid = tid
        self.barrier = barrier
 
    def run(self):
        print "I'm Thread " + str(self.tid) + " before\n",
        self.barrier.wait()
        print "I'm Thread " + str(self.tid) + " after barrier\n",

Bariera obținută cu un singur obiect Event este însă ne-reentrantă. Încercări naive de transformare a acestei implementări într-o barieră reentrantă, prin resetarea evenimentului cu metoda clear(), vor duce fie la deadlock, fie la o barieră reentrantă care nu funcționează corect în momentul întârzierii apelului clear() (problemele care pot apărea la reutilizarea a unui obiect Event sunt exemplificate mai sus). O barieră reentrantă poate fi însă ușor implementată cu două obiecte Event, asemănător cu folosirea a două semafoare.

Condition

Condition (sau variabilă condiție) este un obiect de sincronizare care permite mai multor thread-uri blocarea voluntară până la apariția unei condiții semnalate de un alt thread, asemenător Event-urilor. Spre deosebire de acestea însă, un obiect Condition oferă un set de operații diferit și este asociat întotdeauna cu un lock. Lock-ul este creat implicit la instanțierea obiectului Condition sau poate fi pasat prin intermediul constructorului dacă mai multe obiecte Condition trebuie să partajeze același lock.

Un obiect Condition oferă operațiile acquire() și release() care vor bloca, respectiv, elibera lock-ul asociat și operațiile specifice: wait(), notify() și notify_all(). Aceste ultime trei operații trebuie întotdeauna apelate doar după blocarea prealabilă a lock-ului asociat. wait() va cauza blocarea thread-ului apelant până la semnalizarea condiției de către alt thread. Înainte de blocarea thread-ului apelant, wait() va debloca lock-ul asociat, iar după semnalizarea condiției, metoda wait() va aștepta preluarea lock-ului înainte de terminare. Toți acești trei pași ai metodei wait() sunt efectuați în mod atomic, thread-ul apelant fiind lăsat în aceeași stare ca înainte de apel (cu lock-ul asociat blocat). Semnalizarea unei condiții se face cu metodele notify() sau notify_all(). Diferența dintre aceste două metode este numărul de thread-uri deblocate în momentul apelului: notify() va debloca un singur thread, iar notify_all() va debloca toate thread-urile.

Cele trei operații: wait(), notify() și notify_all() vor lăsa lock-ul asociat în starea blocat, deblocarea acestuia făcându-se manual cu metoda release(). De remarcat că după un notify() sau notify_all(), thread-urile blocate în wait() nu vor continua imediat, ele trebuind să aștepte până când lock-ul asociat devine și el disponibil.
Funcționarea unui Condition este asemănătoare cu a monitorului asociat fiecărui obiect Java, metodele wait(), notify() și notify_all() / notifyAll() având aceeași semantică în ambele limbaje. Apelarea metodelor acquire() și release() este înlocuită în Java de blocul synchronize. Pentru o asemănare mai mare cu Java, în Python puteți folosi instrucțiunea with pentru apelarea automată a metodelor acquire() și release(), precum în exemplul următor:
Java Python
synchronize(c) {
    while(!check())
        c.wait();
}
with c:
    while(not check()):
        c.wait()
 

Un obiect Condition este util atunci când pe lângă semnalizarea unei condiții este necesar și un lock pentru a sincroniza accesul la o resursă partajată. În acest caz, un obiect Condition este de preferat unui Event deoarece oferă acest lock în mod implicit, revenirea din wait() în momentul semnalizării condiției făcându-se cu lock-ul blocat.

O altă utilizare a obiectului Condition poate fi văzută în implementarea barierei reentrante, ca mecanism de blocare/deblocare. Bariera poate fi implementată cu un singur obiect deoarece prezența implicită a lock-ului în operațiile obiectului Condition, împreună cu funcționarea atomică a metodei wait() ne permit evitarea problemelor ce apar la refolosirea obiectelor Event. Pe lângă notificarea thread-urilor de îndeplinirea condiție barierei, putem folosi obiectul Condition și pentru protejarea resursei partajate (contorul de thread-uri blocate), eliminând astfel necesitatea unui Lock separat.

reusable_barrier_condition.py
from threading import *
 
class ReusableBarrier():
    def __init__(self, num_threads):
        self.num_threads = num_threads
        self.count_threads = self.num_threads    # contorizeaza numarul de thread-uri ramase
        self.cond = Condition()                  # blocheaza/deblocheaza thread-urile
                                                 # protejeaza modificarea contorului
 
    def wait(self):
        self.cond.acquire()                      # intra in regiunea critica
        self.count_threads -= 1;
        if self.count_threads == 0:
            self.cond.notify_all()               # deblocheaza toate thread-urile
            self.count_threads = self.num_threads    # reseteaza contorul
        else:
            self.cond.wait();                    # blocheaza thread-ul eliberand in acelasi timp lock-ul
        self.cond.release();                     # iese din regiunea critica
 
 
class MyThread(Thread):
    def __init__(self, tid, barrier):
        Thread.__init__(self)
        self.tid = tid
        self.barrier = barrier
 
    def run(self):
        for i in xrange(10):
            self.barrier.wait()
            print "I'm Thread " + str(self.tid) + " after barrier, in step " + str(i) + "\n",

Queue

Cozile sincronizate sunt implementate în Python în modulul Queue în clasele Queue, LifoQueue și PriorityQueue. Obiectele de aceste tipuri sunt folosite pentru implementarea comunicării între threaduri, după modelul producători-consumatori.

Metodele oferite de aceste clase permit adăugarea și scoaterea de elemente într-un mod sincronizat (put(item) și get()) și interogarea stării cozii (empty(), qsize() și full()). În plus față de acestea, putem implementa ușor modelul master-worker folosind metodele task_done() și join(), ca în exemplul din documentație.

Exerciții

  1. (2p) Pornind de la exemplul de folosire a obiectelor Event din fișierul event.py modificați metodele get_work și get_result astfel încât fiecare metodă să afișeze numele ei, obiectul self cu care a fost apelată și thread-ul curent pe care rulează. Există vreo corelație între obiectul self și thread-ul curent? Justificați.
  2. (4p) Implementați în fișierul gossiping.py o propagare ciclică de tip gossiping folosind bariere.
    • Se consideră N noduri (obiecte de tip Thread), cu indecși 0…N-1.
    • Fiecare nod ține o valoare generată random.
    • Calculați valoarea minimă folosind următorul procedeu:
      • nodurile rulează în cicluri
      • într-un ciclu, fiecare nod comunică cu un subset de alte noduri pentru a obține valoarea acestora și a o compara cu a sa
        • ca subset considerați random 3 noduri din lista de noduri primită în constructor și obțineți valoarea acestora (metoda get_value)
      • după terminarea unui ciclu, fiecare nod va avea ca valoare minimul obținut în ciclul anterior
      • la finalul iterației toate nodurile vor conține valoarea minimă
    • Folosiți una din barierele reentrante din modulul barrier din scheletul de laborator.
    • Notă: pentru a simplifica implementarea, este folosit un număr predefinit de cicluri, negarantând astfel convergența tuturor nodurilor la același minim.
  3. (4p) Pornind de la fișierul event.py, înlocuiți obiectele Event work_available și result_available cu o singură variabilă condiție, păstrând funcționalitatea programului intactă.
  4. (2p) Rulați fișierul broken-event.py. Încercați diferite valori pentru sleep()-ul de la linia 47. Încercați eliminarea apelului sleep(). Ce observați? Precizați secvența _minimă_ de intercalare a apelurilor set și wait pe cele două obiecte Event care generează comportamentul observat.
    • Folosiți Ctrl+\ pentru a opri un program blocat.
    • Folosiți sleep() pentru a forța diferite intercalări ale thread-urilor.
    • Folosiți instrucțiuni print înaintea metodelor care lucrează cu Event-uri pentru a avea o idee asupra ordinii operațiilor.
    • :!: Datorită intercalărilor thread-urilor este posibil ca print-urile să nu reflecte ordinea exactă a operațiilor. Rulați de mai multe ori pentru a putea prinde problema.

Resurse

Referințe

asc/lab3/index.txt · Last modified: 2019/03/04 22:29 by mihai.volmer
CC Attribution-Share Alike 4.0 International
www.chimeric.de Valid CSS Driven by DokuWiki do yourself a favour and use a real browser - get firefox!! Recent changes RSS feed Valid XHTML 1.0