Archivi delle etichette: rrency pattern

Pool di goroutine

Sommario

Con il linguaggio Go, studieremo come implementare un pool di goroutine che si occupa di eseguire un insieme di compiti ciclicamente prelevando i dati da un unico canale di ingresso ed inserendo i risultati in un secondo canale di uscita.

Impiegati instancabili

Immaginiamo un gruppo di impiegati che in un unico grande ufficio evade una pila di pratiche una alla volta. Il capo deposita le pratiche sempre una alla volta in una pila, gli impiegati facendo la coda prelevano a turno dal fondo della pila una pratica, tornano alla scrivania per lavorarci e quando hanno finito la depositano in una pila di uscita per poi tornare a prelevare una nuova pratica.
Vedremo che le regole di questo ufficio sono alquanto strane: gli impiegati non si riposano mai nemmeno quando le pratiche sono tutte evase e la pila è vuota!

Tante gooutine si mettono in lista e prelevano il lavoro da fare...

Tante gooutine si mettono in lista e prelevano il lavoro da fare…

Pool di goroutine

Ok, niente panico! Passiamo a definire la geometria di un pool di gouroutine che si occupa di eseguire compiti. Ciascuna di esse preleva un dato di ingresso dal canale comune a tutte, ed elabora i risultati, li immette nel canale di uscita e ricomincia da capo.
Per concretizzare, mettiamo il caso in cui si voglia calcolare il quadrato di alcuni numeri.
A questo scopo, creiamo una funzione che spedisca uno alla volta in un canale i dati:

// arriva il capo ufficio!
func sending(data []int) chan int {
    ch := make(chan int, 10)
    go func () {
        for _, n := range data {
            ch <- n
        }
        close(ch)
    }()
    return ch
}

La simpatica funzione sending() restituisce il canale in cui inserisce i numeri tramite una goroutine così che quando il canale non è pronto essa rimarrà nello stato di blocco e non la goroutine principale con il rischio di generare un deadlock, a seconda dell’ordine di setup del pool.
Il deadlock è un errore di runtime: il programma in esecuzione si interrompe quando tutte le goroutine non possono proseguire perché bloccate.
Per sperimentare un deadlock, provate questo programma:

package main

func main() {
    ch := make(chan int)
    ch <- 10 // deadlock!
    println(<-ch)
}

Sembra tutto a posto, ma quando spediamo 10 sul canale, la goroutine principale — nella quale sta girando la funzione main() — non può proseguire all’istruzione successiva: entra in blocco nell’attesa che qualcosa richieda un dato all’uscita del canale.
Se definiamo il canale con una capacità maggiore di zero — canale bufferizzato — invece tutto andrà bene.

Tornando all’esempio, rimane da scrivere la funzione che crea il pool di goroutine, eccola:

// assunzione degli impiegati!
func makepool(in chan int, dim int) chan int {
    out := make(chan int) 
    for i := 0; i < dim; i++ {
        go func() {
            dojob(out, in)
        }()
    }
    return out
}

Ciascuna funzione del pool lavora con un ciclo infinito in cui ogni volta si preleva un numero dal canale d’ingresso — alimentato da sending() — e lo si spedisce su quello d’uscita dopo averlo elevato al quadrato:

// gli implacabili impiegati!
func dojob(out chan int, in chan int) {
    for {
        n := <- in 
        out <- n * n
    }
}

La funzione main() orchestra il tutto, prima costruendo ed alimentando il canale di entrata dei dati, poi costruendo il pool di 10 goroutine di elaborazione e per ultimo, attendendo che tutti i risultati arrivino:

func main() {
    // costruzione dello slice di dati
    n := 100
    data := make([]int, n)
    for i := 0; i < n; i++ {
        data[i] = i + 1
    }
    
    // pool's setup
    in := sending(data)
    out := makepool(in, 10)
    
    // waiting for the ending of entire job
    for i := 0; i < n; i++ {
        fmt.Printf("Result %d\n", <- out)
    }
}

Il codice completo pronto per l’esecuzione lo si può trovare a questo link.

Un difetto sottile

Le goroutine del pool operano con un ciclo infinito. Ciascuna di esse tenta di ricevere dati dal canale d’ingresso in che ad un certo punto però viene chiuso una volta terminato l’invio del set di dati.
Poiché in Go, quando si riceve un dato da un canale chiuso non si ottiene un errore ma il valore zero relativo al tipo del canale (in questo caso interi quindi il numero 0), cosa impedisce alle goroutine del pool di continuare a lavorare, almeno fino a quando non termina la goroutine principale?

Nel listato il canale di output out non è bufferizzato, quindi in main() una volta ricevuti i dati attesi viene bloccato. Una goroutine del pool può ancora fare in tempo a calcolare un quadrato ed a richiedere l’invio nel canale di uscita.

Se invece il canale di uscita ha una capacità maggiore di zero, il pool può ancora riempirlo lavorando inutilmente con i valori zero provenienti dal canale di ingresso ormai chiuso, ammettendo che la funzione main() sia ancora impegnata in qualche altro compito, lasciando quindi tempo al pool.

Per riordinare le idee con del codice effettivo, proviamo a verificare cosa succede se continuamo a prelevare dati da un canale chiuso:

package main

func main() {
    ch := make(chan int, 2)
    ch <- 100
    ch <- 200
    close(ch)
    println(<-ch) // stampa 100
    println(<-ch) // stampa 200
    println(<-ch) // stampa 0
}

Possiamo accorgerci però che il canale è chiuso perché ricevere da esso comporta in realtà ottenere non uno ma due valori: il dato utile ed un valore booleano che è vero se il canale è aperto, falso viceversa, esattamente come nel caso della richiesta di una valore in una mappa. Nel seguente programma prima di stampare il valore ci chiediamo se il canale è chiuso e non stiamo per caso ricevendo anziché un dato effettivo solo il valore zero del tipo.
In Go ogni variabile è SEMPRE inizializzata al valore zero corrispondente al tipo, e questa è una grande differenza rispetto ai liguaggi dinamici tipo Lua o Python.

package main

func main() {
    ch := make(chan int, 2)
    ch <- 100
    ch <- 200
    close(ch)
    for val, isOpen:=<-ch;isOpen;val, isOpen = <-ch{
        println(val)
    }
}

Adesso modifichiamo il codice principale nell’intento di rilevare il lavoro ‘imprevisto’ delle goroutine. Mettiamo in pausa per qualche millisecondo la funzione principale prima che termini per dare un po’ di tempo alle goroutine del pool di riempire il canale di uscita di cui è stata aumentata la capacità.
Poi un nuovo canale di interi cycles ci servirà per contare il numero di cicli effettivi compiuti dal pool:

// Simple goroutine pool
package main

import (
    "fmt"
    "time"
)

func main() {
    // costruzione dello slice di dati
    n := 100
    data := make([]int, n)
    for i := 0; i < n; i++ {
        data[i] = i + 1
    }

    // pool's setup
    in := sending(data)
    out := makepool(in, 10)

    // waiting for the ending of job
    for i := 0; i < n; i++ {
        fmt.Printf("Result %d\n", <-out)
    }
    time.Sleep(200 * time.Millisecond)
}

func makepool(in chan int, dim int) chan int {
    out := make(chan int, dim)
    cycles := make(chan int, dim)
    c := 0
    go func() {
        for {
            c += <-cycles
            fmt.Println(c)
        }
    }()
    for i := 0; i < dim; i++ {
        go func() {
            for {
                n := <-in
                out <- n * n
                cycles <- 1
            }
        }()
    }
    return out
}

func sending(data []int) chan int {
    ch := make(chan int, 10)
    go func() {
        for _, n := range data {
            ch <- n
        }
        close(ch)
    }()
    return ch
}

L’esecuzione di questa prova stampa i numeri dei cicli fino a 110. Il pool ha continuato a lavorare fino a saturare il canale di uscita dei risultati, tutti pari a zero. Se richiedessimo la stampa di ulteriori dati dal canale out per controllare che i valori siano zero, si libererebbo dei posti che il pool riempirebbe di nuovo con valori zero, sempre se solo la funzione main() non termini prima.

Soluzione

Abbiamo studiato nei dettagli il problema di un pool di goroutine che tenta di ricevere dati da un canale anche se chiuso. La soluzione è semplicemente quella di modificare il ciclo infinito per tener conto dello stato del canale: nel momento in cui una goroutine del pool scopre che è il canale è chiuso allora può terminare:

func dojob(out chan int, in chan int) {
    for n := range in {
        out <- n * n
    }
}

Alla prossima…
Un saluto.
R.