Concurrency

1. Abstract

2. Patterns

2.1. Fan-In

  • multiplexes multiple input channels onto one output channel
  • 3 components:
    • Sources : set of one or more input channels with the same type, accepted by funnel
    • Destination : an output channel of the same type as Sources. Created and provided by funnel
    • Funnel : Accepts sources and immediately returns Destination. Any input from any Sources will be output by Destination

2.1.1. Code

  • Funnel is variadic on sources (channels of a type (string for this instance))

    package main
    
    import (
            "sync"
            "time"
            "fmt"
    )
    func Funnel(sources ...<-chan string ) <-chan string {
            dest := make(chan string) //the shared destination channel
    
            var wg sync.WaitGroup // to close dest when all sources are closed
    
            wg.Add(len(sources)) // setting size of WaitGroup
    
            for _, ch := range sources {
                    go func(c <-chan string){ // start goroutine for each source
                            defer wg.Done() //notify wg when closing
                            for n := range c {
                                    dest <- n
                            }
                    }(ch)
            }
    
            go func(){ //init goroutine for closing dest
                    wg.Wait() // wati
                    close(dest)
            }()
    
            return dest
    }
    
    func main() {
            sources := make([]<-chan string, 0) //empty channel slice
    
            for i:= 0; i<3; i++ {
                    ch := make(chan string)
                    sources = append(sources, ch)
    
                    go func(chanNum int) { // run goroutine for each channel
                            defer close(ch)
    
                            for i:= 1; i<= 5; i++ {
                                    ch <- fmt.Sprintf("sending %d from chan %d", i, chanNum)
                                    time.Sleep(time.Second )
                            }
                    }(i)
            }
    
            dest := Funnel(sources...)
            for d := range dest {
                    fmt.Println(d)
            }
    }
    
    sending 1 from chan 0
    sending 1 from chan 2
    sending 1 from chan 1
    sending 2 from chan 0
    sending 2 from chan 2
    sending 2 from chan 1
    sending 3 from chan 2
    sending 3 from chan 0
    sending 3 from chan 1
    sending 4 from chan 1
    sending 4 from chan 0
    sending 4 from chan 2
    sending 5 from chan 2
    sending 5 from chan 0
    sending 5 from chan 1
    
  • Note that the sends were from multiple sources but the Prints are from the destination channel

2.2. Fan-Out

  • evenly distributes messages from an input channel to multiple output channels
  • 3 components
    • Source : input channel, accepted by split
    • Destinations : output channels, of the same type as that of source; provided by split
    • split : accepts source and returns destination

2.2.1. Code

  • two possible implementations:
    • single source to destination writer that round-robins across destinations
      • delays, if the current destination is unable to receive
      • needs just one goroutine
    • multiple destination readers that compete for source's output
      • slightly more complex but decreased likelihood of holdups due to a single worker
      • chances of a destination reader starving
      • showing example for this
package main

import (
        "fmt"
        "sync"
)

func Split(source <-chan string, n int) []<-chan string {
        dests := make([]<-chan string, 0) //creating destinations' channel slice

        for i:= 0; i<n ; i ++ {
                ch := make(chan string)// creating n destinations
                dests = append(dests, ch)

                go func(chanNum int) {
                        defer close(ch)
                        // dedicated goroutine for each channel
                        // that competes for reads from the source

                        for val := range source {
                                ch <- fmt.Sprintf("dest chan # %d intook { %s }",i, val)
                        }
                }(i)
        }

        return dests
}


func main(){
        source := make(chan string) //The input channel
        dests := Split(source, 5) // Retrieve 5 output channels

        go func() {
                for i := 1; i<= 10; i++ { //sending into source
                        source <- fmt.Sprintf("%d from source chan", i)
                }
                close(source) //closing when done
        }()

        var wg sync.WaitGroup // wait until all dests close

        wg.Add(len(dests))

        for i, ch := range dests {
                go func(i int, d <- chan string) {
                        defer wg.Done()

                        for val := range d {
                                fmt.Println(val)
                        }
                }(i, ch)
        }
        wg.Wait()
}
dest chan # 4 intook { 5 from source chan }
dest chan # 4 intook { 6 from source chan }
dest chan # 4 intook { 8 from source chan }
dest chan # 1 intook { 2 from source chan }
dest chan # 1 intook { 7 from source chan }
dest chan # 1 intook { 10 from source chan }
dest chan # 4 intook { 9 from source chan }
dest chan # 3 intook { 4 from source chan }
dest chan # 0 intook { 1 from source chan }
dest chan # 2 intook { 3 from source chan }

2.3. Future/Promise/Delays

  • provides a placeholder for a value that is not yet known
  • not frequently used in golang, cause one can just return a channel that will receive the output later on

    func ConcurrentInverse(m Matrix) <-chan Matrix {
            out := make(chan Matrix)
    
            go func () {
                    out <- BlockingInverse(m)
                    close(out)
            }()
    
            return out
    }
    
    func InverseProduct(a, b Matrix) Matrix {
            inva := ConcurrentInverse(a)
            invb := ConcurrentInverse(b)
    
            return Product(<-inva, <-invb)
    }
    
  • has some dowsides, if not careful how you convey the Promise

    return Product(<-ConcurrentInverse(a), <-ConcurrentInverse(b))
    
    • will begin computing inverse of b after the wait on a's output channel is over, i.e. when a has been completely computed, which is not the case in the usage mentioned before that.
    • furthermore, channels being used as futures do not play well with multiple returns (each needs its dedicated receiver) : increased awkwardness.
  • The future pattern encapsulates this
  • 3 components:
    • Future: interface that is received by the consumer to retrieve the eventual result
    • SlowFunction : wrapper function around some function to be asynchronously executed: provides future
    • InnerFuture : Satisfies the Future Interface, includes an attached method that contains the result accesss logic

2.3.1. Code

  • Slow function is responsible for all the book-keeping needed to maitain InnerFuture
package main


import (
        "fmt"
        "sync"
        "context"
        "time"
)

type Future interface{
        Result() (string, error)
}

type InnerFuture struct {
        once sync.Once
        wg sync.WaitGroup

        res string
        err error
        resCh <-chan string
        errCh <-chan error
}

func (f *InnerFuture) Result() (string, error) {
        f.once.Do(func(){
                f.wg.Add(1)
                defer f.wg.Done()
                f.res = <-f.resCh
                f.err = <-f.errCh
        })

        f.wg.Wait()

        return f.res, f.err
}
func SlowFunction(ctx context.Context) Future {
        resCh := make(chan string)
        errCh := make(chan error)

        go func() {
                select {
                case <- time.After(time.Second * 2):
                        resCh <- "This slept for 2 seconds"
                        errCh <- nil
                case <-ctx.Done():
                        resCh <- ""
                        errCh <- ctx.Err()
                }
        }()

        return &InnerFuture{resCh: resCh, errCh: errCh}
}

func main() {
        ctx := context.Background()

        future := SlowFunction(ctx)
        fmt.Printf("Obtained Future at %v\n", time.Now())

        fmt.Printf("This routine is free to do other stuff now\n")

        res, err := future.Result()
        if err != nil {
                fmt.Println("error:", err)
                return
        }
        fmt.Printf("Obtained Promised value at %v\n", time.Now())

        fmt.Println(res)
}

2.4. Sharding

  • splits a large data structure into multiple partitions to localize the effects of read/write locks
    • helps distribute load and provide redundancy
  • Usually employed natively by databases : horizontal sharding
    • the sort of issues that do arise:
      • waiting on the mutexes might become the bottleneck : lock contention
        • can be resolved by scaling the instances, at the cost of increased complexity and latency
          • reads can be easily distributed but writes need to be consistent
  • Alternative strat : Vertical sharding
    • the larger data structure is partitioned into multiple structures each representing a portion of the whole.
    • this only calls for a portion being locked when reading/writing from there : decreasing lock contention
    • This doesn't scale well though
  • Might consider using a hybrid adaptive strategy:
    • initiate vertically shards based on access density (more requests to a region : create more shards )
    • finally horizontally shard the vertically sharded portions adaptively
  • two components:
    • ShardedMap : wrapping over multiple shards abstracting access into that of a single map
    • Shard : individually lockable collection representing a single data partition

2.4.1. Code

  • starting out with unsharded global wrap
var items = struct {
        sync.RWMutex
        m map[string]int
}(m: make(map[string]int))

func ThreadSafeRead(key string) int {
        items.RLock()
        value := items.m[key]
        items.RUnlock()
        return value
}

func ThreadSafeWrite(key string, value int) {
        items.Lock()
        items.m[key] = value
        items.Unlock()
}
  • vertically sharding

    package main
    
    import (
            "fmt"
            "sync"
            "crypto/sha1"
    )
    
    type Shard struct{
            sync.RWMutex //composing the lock in
            m map[string]interface{}
    }
    
    type ShardedMap []*Shard
    
    func NewShardedMap(nshards int) ShardedMap{
            shards := make(ShardedMap, nshards)
    
            for i:= 0; i< nshards; i++  {
                    shard := make(map[string]interface{})
                    shards[i] = &Shard{m: shard}
            }
    
            return shards
    }
    
    func (m ShardedMap) getShardIndex(key string) int {
            checksum := sha1.Sum([]byte(key))
            // choose an arbitrary hasher's input
            // a uint8 will only be able to handle 256 shards
            // for more shards : can use something like
            // hash := int(checksum[13]) << 8 |  int(checksum[17])
            // for byte 13 and 17 : exp(2, 16) shards possible now
            hash:= int(checksum[17])
            return hash % len(m)
    }
    
    func (m ShardedMap) getShard(key string) *Shard {
            return m[m.getShardIndex(key)]
    }
    
    func(m ShardedMap) Get(key string) interface{} {
            shard := m.getShard(key)
            shard.RLock()
            defer shard.RUnlock()
            return shard.m[key]
    }
    
    func(m ShardedMap) Set(key string, val interface{}) {
            shard := m.getShard(key)
            shard.Lock()
            defer shard.Unlock()
    
            shard.m[key] = val
    }
    
    func(m ShardedMap) Keys() []string {
            keys := make([]string,0)
    
            mutex := sync.Mutex{}
    
            wg := sync.WaitGroup{}
            wg.Add(len(m))
    
            for _,shard := range m {
                    go func(s *Shard) {
                            s.RLock()
    
                            for key:= range s.m {
                                    mutex.Lock()
                                    keys = append(keys, key)
                                    mutex.Unlock()
                            }
                            s.RUnlock()
                            wg.Done()
                    }(shard)
            }
    
            wg.Wait()
    
            return keys
    }
    
    func main() {
            shardedMap := NewShardedMap(5)
    
            shardedMap.Set("alpha",1)
            shardedMap.Set("beta",2)
            shardedMap.Set("gamma",3)
    
            fmt.Println(shardedMap.Get("alpha"))
            fmt.Println(shardedMap.Get("beta"))
            fmt.Println(shardedMap.Get("gamma"))
    
            keys := shardedMap.Keys()
    
            for _,k := range keys{
                    fmt.Println(k)
            }
    }
    
    1
    2
    3
    gamma
    beta
    alpha
    

4. Resources

4.1. BOOK: Cloud Native Go : Chapter 4

Tags::programming: