Archivi delle etichette: goroutine

Pool di goroutine

Sommario

Con il linguaggio Go, studieremo come implementare un pool di goroutine che si occupa di eseguire un insieme di compiti ciclicamente prelevando i dati da un unico canale di ingresso ed inserendo i risultati in un secondo canale di uscita.

Impiegati instancabili

Immaginiamo un gruppo di impiegati che in un unico grande ufficio evade una pila di pratiche una alla volta. Il capo deposita le pratiche sempre una alla volta in una pila, gli impiegati facendo la coda prelevano a turno dal fondo della pila una pratica, tornano alla scrivania per lavorarci e quando hanno finito la depositano in una pila di uscita per poi tornare a prelevare una nuova pratica.
Vedremo che le regole di questo ufficio sono alquanto strane: gli impiegati non si riposano mai nemmeno quando le pratiche sono tutte evase e la pila è vuota!

Tante gooutine si mettono in lista e prelevano il lavoro da fare...

Tante gooutine si mettono in lista e prelevano il lavoro da fare…

Pool di goroutine

Ok, niente panico! Passiamo a definire la geometria di un pool di gouroutine che si occupa di eseguire compiti. Ciascuna di esse preleva un dato di ingresso dal canale comune a tutte, ed elabora i risultati, li immette nel canale di uscita e ricomincia da capo.
Per concretizzare, mettiamo il caso in cui si voglia calcolare il quadrato di alcuni numeri.
A questo scopo, creiamo una funzione che spedisca uno alla volta in un canale i dati:

// arriva il capo ufficio!
func sending(data []int) chan int {
    ch := make(chan int, 10)
    go func () {
        for _, n := range data {
            ch <- n
        }
        close(ch)
    }()
    return ch
}

La simpatica funzione sending() restituisce il canale in cui inserisce i numeri tramite una goroutine così che quando il canale non è pronto essa rimarrà nello stato di blocco e non la goroutine principale con il rischio di generare un deadlock, a seconda dell’ordine di setup del pool.
Il deadlock è un errore di runtime: il programma in esecuzione si interrompe quando tutte le goroutine non possono proseguire perché bloccate.
Per sperimentare un deadlock, provate questo programma:

package main

func main() {
    ch := make(chan int)
    ch <- 10 // deadlock!
    println(<-ch)
}

Sembra tutto a posto, ma quando spediamo 10 sul canale, la goroutine principale — nella quale sta girando la funzione main() — non può proseguire all’istruzione successiva: entra in blocco nell’attesa che qualcosa richieda un dato all’uscita del canale.
Se definiamo il canale con una capacità maggiore di zero — canale bufferizzato — invece tutto andrà bene.

Tornando all’esempio, rimane da scrivere la funzione che crea il pool di goroutine, eccola:

// assunzione degli impiegati!
func makepool(in chan int, dim int) chan int {
    out := make(chan int) 
    for i := 0; i < dim; i++ {
        go func() {
            dojob(out, in)
        }()
    }
    return out
}

Ciascuna funzione del pool lavora con un ciclo infinito in cui ogni volta si preleva un numero dal canale d’ingresso — alimentato da sending() — e lo si spedisce su quello d’uscita dopo averlo elevato al quadrato:

// gli implacabili impiegati!
func dojob(out chan int, in chan int) {
    for {
        n := <- in 
        out <- n * n
    }
}

La funzione main() orchestra il tutto, prima costruendo ed alimentando il canale di entrata dei dati, poi costruendo il pool di 10 goroutine di elaborazione e per ultimo, attendendo che tutti i risultati arrivino:

func main() {
    // costruzione dello slice di dati
    n := 100
    data := make([]int, n)
    for i := 0; i < n; i++ {
        data[i] = i + 1
    }
    
    // pool's setup
    in := sending(data)
    out := makepool(in, 10)
    
    // waiting for the ending of entire job
    for i := 0; i < n; i++ {
        fmt.Printf("Result %d\n", <- out)
    }
}

Il codice completo pronto per l’esecuzione lo si può trovare a questo link.

Un difetto sottile

Le goroutine del pool operano con un ciclo infinito. Ciascuna di esse tenta di ricevere dati dal canale d’ingresso in che ad un certo punto però viene chiuso una volta terminato l’invio del set di dati.
Poiché in Go, quando si riceve un dato da un canale chiuso non si ottiene un errore ma il valore zero relativo al tipo del canale (in questo caso interi quindi il numero 0), cosa impedisce alle goroutine del pool di continuare a lavorare, almeno fino a quando non termina la goroutine principale?

Nel listato il canale di output out non è bufferizzato, quindi in main() una volta ricevuti i dati attesi viene bloccato. Una goroutine del pool può ancora fare in tempo a calcolare un quadrato ed a richiedere l’invio nel canale di uscita.

Se invece il canale di uscita ha una capacità maggiore di zero, il pool può ancora riempirlo lavorando inutilmente con i valori zero provenienti dal canale di ingresso ormai chiuso, ammettendo che la funzione main() sia ancora impegnata in qualche altro compito, lasciando quindi tempo al pool.

Per riordinare le idee con del codice effettivo, proviamo a verificare cosa succede se continuamo a prelevare dati da un canale chiuso:

package main

func main() {
    ch := make(chan int, 2)
    ch <- 100
    ch <- 200
    close(ch)
    println(<-ch) // stampa 100
    println(<-ch) // stampa 200
    println(<-ch) // stampa 0
}

Possiamo accorgerci però che il canale è chiuso perché ricevere da esso comporta in realtà ottenere non uno ma due valori: il dato utile ed un valore booleano che è vero se il canale è aperto, falso viceversa, esattamente come nel caso della richiesta di una valore in una mappa. Nel seguente programma prima di stampare il valore ci chiediamo se il canale è chiuso e non stiamo per caso ricevendo anziché un dato effettivo solo il valore zero del tipo.
In Go ogni variabile è SEMPRE inizializzata al valore zero corrispondente al tipo, e questa è una grande differenza rispetto ai liguaggi dinamici tipo Lua o Python.

package main

func main() {
    ch := make(chan int, 2)
    ch <- 100
    ch <- 200
    close(ch)
    for val, isOpen:=<-ch;isOpen;val, isOpen = <-ch{
        println(val)
    }
}

Adesso modifichiamo il codice principale nell’intento di rilevare il lavoro ‘imprevisto’ delle goroutine. Mettiamo in pausa per qualche millisecondo la funzione principale prima che termini per dare un po’ di tempo alle goroutine del pool di riempire il canale di uscita di cui è stata aumentata la capacità.
Poi un nuovo canale di interi cycles ci servirà per contare il numero di cicli effettivi compiuti dal pool:

// Simple goroutine pool
package main

import (
    "fmt"
    "time"
)

func main() {
    // costruzione dello slice di dati
    n := 100
    data := make([]int, n)
    for i := 0; i < n; i++ {
        data[i] = i + 1
    }

    // pool's setup
    in := sending(data)
    out := makepool(in, 10)

    // waiting for the ending of job
    for i := 0; i < n; i++ {
        fmt.Printf("Result %d\n", <-out)
    }
    time.Sleep(200 * time.Millisecond)
}

func makepool(in chan int, dim int) chan int {
    out := make(chan int, dim)
    cycles := make(chan int, dim)
    c := 0
    go func() {
        for {
            c += <-cycles
            fmt.Println(c)
        }
    }()
    for i := 0; i < dim; i++ {
        go func() {
            for {
                n := <-in
                out <- n * n
                cycles <- 1
            }
        }()
    }
    return out
}

func sending(data []int) chan int {
    ch := make(chan int, 10)
    go func() {
        for _, n := range data {
            ch <- n
        }
        close(ch)
    }()
    return ch
}

L’esecuzione di questa prova stampa i numeri dei cicli fino a 110. Il pool ha continuato a lavorare fino a saturare il canale di uscita dei risultati, tutti pari a zero. Se richiedessimo la stampa di ulteriori dati dal canale out per controllare che i valori siano zero, si libererebbo dei posti che il pool riempirebbe di nuovo con valori zero, sempre se solo la funzione main() non termini prima.

Soluzione

Abbiamo studiato nei dettagli il problema di un pool di goroutine che tenta di ricevere dati da un canale anche se chiuso. La soluzione è semplicemente quella di modificare il ciclo infinito per tener conto dello stato del canale: nel momento in cui una goroutine del pool scopre che è il canale è chiuso allora può terminare:

func dojob(out chan int, in chan int) {
    for n := range in {
        out <- n * n
    }
}

Alla prossima…
Un saluto.
R.

Giocando a ping pong in Go

Il sommario

Approfondiremo il funzionamento delle goroutine del linguaggio Go studiando l’esempio del ping pong, ovvero due goroutine che si rimbalzano dati sullo stesso canale.
L’argomento è particolarmente interessante perché il supporto diretto alla programmazione concorrente offerto dal Go tramite goroutine e canali, promette di rendere semplice accedere alla potenza di elaborazione dei moderni dispositivi multicore.
L’esempio del ping pong — presentato recentemente anche al Google I/O 2013 — è da una parte semplice in modo che si possono capire i concetti della dinamica di esecuzione concorrente e, dall’altra, offre la possibilità di studiare in dettaglio il comportamento del programma evidenziandone le sottigliezze.

Il codice del ping pong

Riporto subito il codice in Go (tra l’altro già presentato anche da Juhan in una variante): nella funzione main() vengono lanciate due goroutine che eseguono indipendentemente la funzione player() non appena creato il canale di comunicazione tra le due. A questo punto sia la prima goroutine chiamata Ann, sia la seconda chiamata Bob sono bloccate perché nessun dato è ancora disponibile nel canale. Ad iniziare il gioco ci pensa l’istruzione successiva che spedisce il numero 1 nel canale.
Da questo momento cosa accade?

package main

import (
    "fmt"
    "time"
)

func main() {
    ball := make(chan int)
    go player("Ann", ball)
    go player("Bob", ball)

    ball <- 1 // start the match
    time.Sleep(10 * time.Second)
    <-ball // stop the match
}

func player(name string, ball chan int) {
    for {
        touch := <- ball
        fmt.Printf("Player %s: %d\n", name, touch)
        touch++
        ball <- touch
        time.Sleep(100 * time.Millisecond)
    }
}

Quello che accade è riportato nello schema seguente: le due linee verticali rappresentano lo stato delle due goroutine Ann e Bob, con lo scorrerere del tempo, ipotizzando che le istruzioni vengano eseguite in un tempo zero. Una croce sulla linea significa che la goroutine è bloccata, un tratto spesso significa che la goroutine è in attesa.
Le frecce indicano dati inviati sul canale ed infine a destra è riportata la stampa in console prodotta dal programma.

Schema dinamico per goroutine

Schema dinamico per le goroutine che giocano a ping pong… i tratti spessi sono i periodi in cui la goroutine è in idle, i tratti con la crocetta sono periodi in cui la goroutine è bloccata.

Lo schema risponde alle domande spontanee: perché al tempo zero entrambi i giocatori danno un tocco alla palla per poi darne uno alternativamente ogni 100 millisecondi?
Come fare per ottenere invece un gioco regolare fin da subito?
Ed ancora, perché se nella funzione principale prelevo un dato dal canale il gioco si ferma?

Risposte spontanee

Quando Ann riceve il primo numero (1) seguendo il codice si ricava che essa stampa 1 e spedisce 2 sul canale e poi si mette in attesa per 100 miliisecondi. Bob è in attesa sul canale, riceve immediatamente (o quasi) il 2, lo stampa ma si blocca all’istruzione che invia 3 sul canale perché Ann sta dormendo. Finalmente Ann si svegli per t=100ms, Bob può quindi spedire il dato ed entrare in letargo per i prossimi 100ms.
Nel frattempo Ann ha già stampato 3 ma non può preseguire perché il canale è bloccato almeno fino a t=200ms. E così via.
All’inizio, come potete notare eseguendo il programma, ci sono due rimbalzi ma poi il gioco si fa regolare per effetto del blocco del canale quando l’altra goroutine dorme…

Per ottenere un gioco regolare fin dall’inizio basta anticipare la messa in attesa della funzione rispetto all’invio sul canale, così (fate per esercizio lo schema dinamico corrispondente e confermatene la correttezza eseguendo il programma):

func player(name string, ball chan int) {
    for {
        touch := <- ball
        fmt.Printf("Player %s: %d\n", name, touch)
        time.Sleep(100 * time.Millisecond)
        touch++
        ball <- touch
    }
}

Infine, se è la funzione main() a prelevare il dato dal canale sia Ann che Bob si metteranno in attesa di nuovo sulla prima istruzione del ciclo for infinito. La funzione principale viene eseguita essa stessa in una goroutine, ed esegue una vera e propria intercettazione della palla.
Per dimostrare con un programma questo meccanismo consideriamo il seguente codice:

// ping pong test
package main

import (
    "fmt"
    "time"
)

func main() {
    ball := make(chan int)
    go player("Ann", ball)
    go player("Bob", ball)
    
    fmt.Println("Start the match")
    ball <- 1
    time.Sleep(10 * time.Second)
    fmt.Println("Pause the match for five seconds")
    tmp := <-ball
    time.Sleep(5 * time.Second)
    fmt.Println("Ok. Go again now")
    ball <- tmp
    time.Sleep(10 * time.Second)
    fmt.Println("Stop!")
    <-ball
}


func player( name string, ball chan int) {
    for {
        touch := <- ball
        fmt.Printf("Player %s: %d\n", name, touch)
        time.Sleep(1*time.Second)
        touch++
        ball <- touch
    }
}

Ok, un esercizio

Cosa succede se faccio giocare tre giocatori invece che due?
Anche in questo caso uno schema temporale di esecuzione come quello proposto chiarisce il comportamento del programma: ad ogni intervallo (per esempio i soliti 100ms iniziali) ci sono due giocatori che fanno un rimbalzo quindi il conteggio è doppio rispetto al caso precedente.
Fate la prova!

Ping pong multigiocatori

Se volessimo far giocare 10 giocatori l’idea potrebbe essere quella di collegarli tramite canali a formare un cerchio. Ciascun giocatore riceve la palla da quello alla sua destra e la rimanda a quello alla sua sinistra.
Ecco il curioso programma (ovvio che vi lascio verificare il risultato):

// ping pong test
package main

import (
    "fmt"
    "time"
)

const p = 10

func main() {
    // creo p canali
    var chs [p]chan int
    for i := 0; i < p; i++ {
        chs[i] = make(chan int)
    }
    // players in action
    for i := 1; i < p; i++ {
        go player(i, chs[i-1], chs[i])
    }
    go player(p, chs[p-1], chs[0])
    
    fmt.Println("Start the match")
    chs[0] <- 1
    time.Sleep(10 * time.Second)
    
    <-chs[0]
    fmt.Println("Stop!")
}

func player( name int, ball, pass chan int) {
    for {
        touch := <- ball
        fmt.Printf("Player %d: %d\n", name, touch)
        time.Sleep(500*time.Millisecond)
        touch++
        pass <- touch
    }
}

Attenzione però, perchè stranamente il programma non termina subito quando la funzione principale chiede di ricevere il numero da un canale. Quello che accade è che comunque la palla compie un giro fino ad arrivare al canale in cui attende la main().
Il motivo di questo comportamento a mio parere è che nel momento in cui desidereremo interrompere il gioco prelevando un numero da uno dei canali, due goroutine sono in competizione: quella della funzione principale e quella a cui il giocatore precedente vorrebbe inviare il numero, ed in questa competizione vince la prima volta la goroutine del giocatore, e la seconda quella della funzione principale.
In altre parole, la palla va al giocatore successivo e non all’arbitro che comunque avrà successo al passaggio successivo.
Su questo punto sarebbe interessante conoscere la vostra interpretazione. La soluzione che propongo io è quella di creare un canale a parte in cui da main() si spedisce un segnale di interruzione. La funzione player() va modificata con un istruzione select che per prima cosa tenta di ricevere dal canale di interruzione partita, altrimenti spedisce il numero di tocco al giocatore vicino:

// ping pong test
package main

import (
    "fmt"
    "time"
)

const p = 3

func main() {
    // creo p canali
    var chs [p]chan int
    stopsignal := make(chan int)
    for i := 0; i < p; i++ {
        chs[i] = make(chan int)
    }
    // players in action
    for i := 1; i < p; i++ {
        go player(i, chs[i-1], chs[i], stopsignal)
    }
    go player(p, chs[p-1], chs[0], stopsignal)

    fmt.Println("Start the match")
    chs[0] <- 1
    time.Sleep(10 * time.Second)
    fmt.Println("Stop!")
    stopsignal <- 1
    time.Sleep(3 * time.Second)
}

func player(name int, ball, pass, stop chan int) {
    for {
        touch := <-ball
        fmt.Printf("Player %d: %d\n", name, touch)
        time.Sleep(500 * time.Millisecond)
        touch++
        select {
            case <- stop:
                stop <- touch
            default:
                pass <- touch
        }
    }
}

Conclusioni

Ma ci devono sempre essere le conclusioni? Io vorrei continuare a giocare per esempio ma vi lancio volentieri la palla…
Alla prossima.
R.

Go Fibonacci!

Sommario

Creeremo un programma per la generazione dei numeri di Fibonacci per poi entrare nel campo dell’esecuzione concorrente in Go.

Fibonacci

La sequenza di Fibonacci si genera sommando i precendenti due numeri della serie. Questa regola necessita di definire i primi due numeri e questi sono semplicemente assunti pari a 0 ed 1.
In Go il calcolo dell’ennesimo numero della sequenza può essere ottenuto con il codice seguente sfruttando direttamente la definizione della serie e l’assegnazione multipla (linea 8) tra l’altro disponibile anche nei linguaggi Lua e Python:

package main

// trova l'ennesimo numero della serie
// di Fibonacci
func fibonacci(n int) int {
    var a, b int = 0, 1
    for i := 0; i < n-1; i++ {
        a, b = b, a+b
    }
    return a
}

func main() {
    for i := 1; i < 10; i++ {
        print(fibonacci(i), " ")
    }
    println()
}

Calcoli indipendenti

Il supporto alla programmazione concorrente del Go è probabilmente — in un mondo multiprocessore — la principale caratteristica per la sua diffusione, e pensare che nel linguaggio vi sono pochissimi costrutti sintattici per implementarla (caso mai la semplicità fosse un vantaggio).
Li abbiamo visti già tutti all’opera su Ok, panico!, grazie ai post di Juhan ;-).

Come forse avrete intuito, proveremo a calcolare molti numeri di Fibonacci in modo indipendente. Questa parola è importante perché la programmazione concorrente non è altro che un insieme di esecuzioni che si svolgono indipendentemente una dall’altra — come sottolinea Robert Pike. La distinzione è dovuta al fatto che ci si può sbagliare usando per questa modalità di esecuzione il termine parallelismo, che invece indica un insieme di esecuzioni che avvengono contemporamente. Nei moderni pc multicore, l’esecuzione concorrente può avvicinarsi al parallelismo.

Fibonacci independente

Un semplice schema per l’esecuzione concorrente di più funzioni di Fibonacci, è quello di avviarne l’esecuzione in una goroutine ed attenderne in quella principale i risultati provenienti da un canale.

Ecco il codice in cui si deve intendere che la funzione mancante fibonacci() sia quella del listato precedente:

func fibonacci(n int) int64 {
    // as before with int64 return value
}

var num = []int{50, 36, 80, 93, 66}

func main() {
    ans := make(chan int, len(num)) // buffered channel
    for _, f := range num {
        go func(f int) {
            ans <- fibonacci(f)
        }(f)
    }
    
    // stampo i risultati provenienti dal canale
    for i := 0; i < len(num); i++ {
        fib := <-ans
        fmt.Printf("Fibonacci(%d)=%d\n", num[i], fib)
    }
}

Nella funzione main() dopo aver creato un canale, avviamo tante goroutine quanti sono i numeri della serie da calcolare iterando sugli elementi di uno slice (num). Al termine del ciclo avremo nel nostro caso 5 goroutine in esecuzione indipendente da quella principale.
La prima diversità dalla programmazione classica è che l’istruzione go avvia una nuova linea di esecuzione senza attendere che questa termini. Quasi immediatamente raggiungiamo quindi il secondo ciclo for che preleva in sequenza i dati dal canale.

Questo schema è piuttosto semplice. Non conosciamo l’ordine con cui i dati arrivano e dobbiamo ricordarci che l’istruzione

        fib := <-ans

comporta il blocco dell’esecuzione della goroutine principale (quella in cui gira la funzione main()), che attende fino all’arrivo di un dato, assicurandoci che vengano attesi cinque valori dal canale altrimenti la funzione main() terminerà prima che le goroutine di calcolo abbiano portato a termine il lavoro.
La goroutine infatti non sanno niente di quello che stanno facendo le altre eventuali goroutine in esecuzione e se main() termina, termineranno forzatamente tutte.
Dal punto di vista della singola goroutine al termine del calcolo l’invio sul canale del risultato è immediato, essendo questo dotato di capacità pari al numero dei risultati che vi saranno inviati (buffered channel), altrimenti essa avrebbe dovuto attendere che la gorountine main() fosse pronta a ricevere un dato (sincronizzazione del mandante con il ricevente).

Per capire la concorrenza in Go conviene quindi immaginare il funzionamento delle cose in modo dinamico tenendo conto del blocco o meno dell’invio o della ricezione dei dati dai canali.

Prestazioni

Ho fatto alcune prove variando la quantità dei numeri da calcolare. Sulla mia macchina Linux dotata di un processore con un unico core, le prestazioni migliorano solo di alcuni punti percentuali, ed addirittura peggiorano quando crescono i numeri da calcolare in num.
Evidentemente il costo per la creazione delle goroutine — sia pure piccolo — non è compensato su una macchina ad un unico core da vantaggi particolari.
Oltre a capire sperimentando il codice, quello che importa adesso non sono le prestazioni ma che occorre considerare con precisione la natura del problema per poter scegliere o meno una soluzione a calcolo indipentente.

Si tratta di un argomento affascinante!

Difetto

Il codice precedente ha un difetto: è necessario attendere che tutte le goroutine siano state create e lanciate prima di passare a raccogliere i risultati. Per esempio se per creare una goroutine accorresse 1 millisecondo e ciascuna mediamente richiedesse 50ms di esecuzione concorrente, allora i risultati dovrebbero attendere stipati nel canale se le goroutine fossero circa più di 50.

La soluzione è quella di inserire il ciclo di creazione delle goroutine esso stesso all’interno di una goroutine:

func main() {
    // Use all the machine's cores
    runtime.GOMAXPROCS(runtime.NumCPU()) 
    res := make(chan int64)
    
    go func() {
        for _, f := range num {
            go func(f int) {
                res <- fibonacci(f)
            }(f)
        }
    }()
    
    for i := 0; i < len(num); i++ {
        <-res
    }
}

Altro simpatico esempio elegante

Questa volta spediamo sul canale la serie di Fibonacci da una goroutine separata basata su un ciclo for infinito (a terminare il programma sarà brutalmente il termine della funzione main() nella quale chiederemo la stampa dei primi dieci numeri della serie):

package main

import "fmt"

func main() {
    ch := make(chan int)
    go func(a1, a2 int) {
        for {
            ch <- a1
            a1, a2 = a2, a1 + a2
        }
    }(0, 1)
    
    for i := 0; i<10; i++ {
        fmt.Print(<- ch, " ")
    }
    fmt.Println()
}

Sfida…

Invito i visitatori del blog a presentare nuovi schemi di calcolo concorrente o semplicemente solo i risultati ottenuti con i vostri megacalcolatori.

Quello che serve è una installazione di Go, e magari sapere che esistono comode funzioni nel pacchetto time che misurano con precisione il tempo macchina, come nel seguente esempio:

package main

import (
    "fmt"
    "math"
    "time"
)

func main() {
    multiPi := make([]float64, 10000)
    t := time.Now()
    for i := 0; i < 10000; i++ {
        multiPi[i] = math.Pi * float64(i)
    }
    fmt.Printf("Executin time: %v\n", time.Since(t))
}

Un saluto.
R.