Concurrency
1. Abstract
- Concurrency is an important aspect when optimizing the design of a system.
2. Patterns
2.1. Fan-In
- multiplexes multiple input channels onto one output channel
- 3 components:
- Sources : a set of one or more input channels of the same type; accepted by Funnel
- Destination : an output channel of the same type as the Sources; created and provided by Funnel
- Funnel : accepts the Sources and immediately returns the Destination; any input from any Source is output on the Destination
2.1.1. Code
Funnel is variadic over its sources (channels of a single type, string in this instance)
package main import ( "sync" "time" "fmt" ) func Funnel(sources ...<-chan string ) <-chan string { dest := make(chan string) //the shared destination channel var wg sync.WaitGroup // to close dest when all sources are closed wg.Add(len(sources)) // setting size of WaitGroup for _, ch := range sources { go func(c <-chan string){ // start goroutine for each source defer wg.Done() //notify wg when closing for n := range c { dest <- n } }(ch) } go func(){ //init goroutine for closing dest wg.Wait() // wati close(dest) }() return dest } func main() { sources := make([]<-chan string, 0) //empty channel slice for i:= 0; i<3; i++ { ch := make(chan string) sources = append(sources, ch) go func(chanNum int) { // run goroutine for each channel defer close(ch) for i:= 1; i<= 5; i++ { ch <- fmt.Sprintf("sending %d from chan %d", i, chanNum) time.Sleep(time.Second ) } }(i) } dest := Funnel(sources...) for d := range dest { fmt.Println(d) } }
sending 1 from chan 0
sending 1 from chan 2
sending 1 from chan 1
sending 2 from chan 0
sending 2 from chan 2
sending 2 from chan 1
sending 3 from chan 2
sending 3 from chan 0
sending 3 from chan 1
sending 4 from chan 1
sending 4 from chan 0
sending 4 from chan 2
sending 5 from chan 2
sending 5 from chan 0
sending 5 from chan 1
- Note that the sends came from multiple sources, but all of the prints are reads from the single destination channel
2.2. Fan-Out
- evenly distributes messages from an input channel to multiple output channels
- 3 components:
- Source : the input channel; accepted by Split
- Destinations : output channels of the same type as the Source; created and provided by Split
- Split : accepts the Source and returns the Destinations
2.2.1. Code
- two possible implementations:
- a single source-to-destinations writer that round-robins across the destinations
- delays if the current destination is unable to receive
- needs just one goroutine
- a sketch of this variant follows the example's output below
- multiple destination readers that compete for the source's output
- slightly more complex, but less likely to hold everything up on one slow worker
- a given destination reader can starve, though
- the example below implements this competing-readers approach
package main import ( "fmt" "sync" ) func Split(source <-chan string, n int) []<-chan string { dests := make([]<-chan string, 0) //creating destinations' channel slice for i:= 0; i<n ; i ++ { ch := make(chan string)// creating n destinations dests = append(dests, ch) go func(chanNum int) { defer close(ch) // dedicated goroutine for each channel // that competes for reads from the source for val := range source { ch <- fmt.Sprintf("dest chan # %d intook { %s }",i, val) } }(i) } return dests } func main(){ source := make(chan string) //The input channel dests := Split(source, 5) // Retrieve 5 output channels go func() { for i := 1; i<= 10; i++ { //sending into source source <- fmt.Sprintf("%d from source chan", i) } close(source) //closing when done }() var wg sync.WaitGroup // wait until all dests close wg.Add(len(dests)) for i, ch := range dests { go func(i int, d <- chan string) { defer wg.Done() for val := range d { fmt.Println(val) } }(i, ch) } wg.Wait() }
dest chan # 4 intook { 5 from source chan }
dest chan # 4 intook { 6 from source chan }
dest chan # 4 intook { 8 from source chan }
dest chan # 1 intook { 2 from source chan }
dest chan # 1 intook { 7 from source chan }
dest chan # 1 intook { 10 from source chan }
dest chan # 4 intook { 9 from source chan }
dest chan # 3 intook { 4 from source chan }
dest chan # 0 intook { 1 from source chan }
dest chan # 2 intook { 3 from source chan }
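For contrast, here is a minimal sketch of the first variant: a single writer that round-robins across the destinations. This is not from the example above; the name SplitRoundRobin and its details are illustrative assumptions:

package main

import "fmt"

// SplitRoundRobin: one goroutine distributes source's values
// across n destinations in fixed rotating order
func SplitRoundRobin(source <-chan string, n int) []<-chan string {
	dests := make([]chan string, n)
	for i := range dests {
		dests[i] = make(chan string)
	}

	go func() { // just one writer goroutine
		defer func() { // close every destination once source is drained
			for _, d := range dests {
				close(d)
			}
		}()
		idx := 0
		for val := range source {
			dests[idx] <- val // delays here if this destination cannot receive yet
			idx = (idx + 1) % n
		}
	}()

	out := make([]<-chan string, n) // receive-only views for the caller
	for i, d := range dests {
		out[i] = d
	}
	return out
}

func main() {
	source := make(chan string)
	dests := SplitRoundRobin(source, 3)

	go func() {
		defer close(source)
		for i := 1; i <= 6; i++ {
			source <- fmt.Sprintf("%d from source chan", i)
		}
	}()

	// receive in the same round-robin order the writer sends in
	for i := 0; i < 6; i++ {
		fmt.Println(<-dests[i%3])
	}
}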
2.3. Future/Promise/Delays
- provides a placeholder for a value that is not yet known
- not frequently used as an explicit pattern in Go, since one can simply return a channel that will receive the output later on
func ConcurrentInverse(m Matrix) <-chan Matrix {
	out := make(chan Matrix)
	go func() {
		out <- BlockingInverse(m)
		close(out)
	}()
	return out
}

func InverseProduct(a, b Matrix) Matrix {
	inva := ConcurrentInverse(a)
	invb := ConcurrentInverse(b)
	return Product(<-inva, <-invb)
}
- this has some downsides if one is not careful about how the promise is conveyed:
return Product(<-ConcurrentInverse(a), <-ConcurrentInverse(b))
- the inverse of b only begins computing after the receive on a's output channel completes, i.e. once a's inverse has been fully computed; Go evaluates call arguments left to right, so ConcurrentInverse(b) is not even called until then. The earlier usage avoids this by starting both computations before either receive.
- furthermore, channels used as futures do not play well with functions that have multiple return values (each value needs its own channel and its own dedicated receive) : increased awkwardness; see the sketch below
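A minimal sketch of that awkwardness (the name concurrentHalve and its shape are illustrative assumptions, not from the text): two logical return values require two channels, and the caller must perform a dedicated receive on each, in the order the producer sends:

package main

import "fmt"

// concurrentHalve "returns" two values, so it needs two channels
func concurrentHalve(n int) (<-chan int, <-chan error) {
	resCh := make(chan int, 1) // buffered so the producer never blocks
	errCh := make(chan error, 1)
	go func() {
		if n%2 != 0 {
			resCh <- 0
			errCh <- fmt.Errorf("%d is odd", n)
			return
		}
		resCh <- n / 2
		errCh <- nil
	}()
	return resCh, errCh
}

func main() {
	resCh, errCh := concurrentHalve(42)
	res := <-resCh // one dedicated receive for the result...
	err := <-errCh // ...and another for the error
	fmt.Println(res, err)
}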
- The Future pattern encapsulates this complexity
- 3 components:
- Future: interface that is received by the consumer to retrieve the eventual result
- SlowFunction : a wrapper around some function to be executed asynchronously; provides the Future
- InnerFuture : satisfies the Future interface; includes an attached method that contains the result-access logic
2.3.1. Code
- SlowFunction is responsible for all the bookkeeping needed to maintain the InnerFuture
package main import ( "fmt" "sync" "context" "time" ) type Future interface{ Result() (string, error) } type InnerFuture struct { once sync.Once wg sync.WaitGroup res string err error resCh <-chan string errCh <-chan error } func (f *InnerFuture) Result() (string, error) { f.once.Do(func(){ f.wg.Add(1) defer f.wg.Done() f.res = <-f.resCh f.err = <-f.errCh }) f.wg.Wait() return f.res, f.err } func SlowFunction(ctx context.Context) Future { resCh := make(chan string) errCh := make(chan error) go func() { select { case <- time.After(time.Second * 2): resCh <- "This slept for 2 seconds" errCh <- nil case <-ctx.Done(): resCh <- "" errCh <- ctx.Err() } }() return &InnerFuture{resCh: resCh, errCh: errCh} } func main() { ctx := context.Background() future := SlowFunction(ctx) fmt.Printf("Obtained Future at %v\n", time.Now()) fmt.Printf("This routine is free to do other stuff now\n") res, err := future.Result() if err != nil { fmt.Println("error:", err) return } fmt.Printf("Obtained Promised value at %v\n", time.Now()) fmt.Println(res) }
2.4. Sharding
- splits a large data structure into multiple partitions to localize the effects of read/write locks
- helps distribute load and provide redundancy
- Usually employed natively by databases : horizontal sharding
- the sort of issues that do arise:
- waiting on the mutexes might become the bottleneck : lock contention
- can be resolved by scaling the instances, at the cost of increased complexity and latency
- reads can be easily distributed but writes need to be consistent
- Alternative strategy : vertical sharding
- the larger data structure is partitioned into multiple structures each representing a portion of the whole.
- only the relevant portion needs to be locked when reading/writing there : decreased lock contention
- This doesn't scale well though
- Might consider using a hybrid adaptive strategy:
- initially shard vertically based on access density (more requests to a region : create more shards there)
- then adaptively shard the vertically sharded portions horizontally
- two components:
- ShardedMap : wraps multiple Shards, abstracting access into that of a single map
- Shard : individually lockable collection representing a single data partition
2.4.1. Code
- starting out with an unsharded, globally locked map wrapper
package main

import "sync"

var items = struct {
	sync.RWMutex
	m map[string]int
}{m: make(map[string]int)}

func ThreadSafeRead(key string) int {
	items.RLock() // multiple readers may hold the lock at once
	value := items.m[key]
	items.RUnlock()
	return value
}

func ThreadSafeWrite(key string, value int) {
	items.Lock() // a writer takes the lock exclusively
	items.m[key] = value
	items.Unlock()
}

func main() {
	ThreadSafeWrite("alpha", 1)
	_ = ThreadSafeRead("alpha")
}
- vertically sharding : each shard locks independently, so a read/write only blocks its own partition
package main

import (
	"crypto/sha1"
	"fmt"
	"sync"
)

type Shard struct {
	sync.RWMutex // compose the lock into the shard
	m map[string]interface{}
}

type ShardedMap []*Shard

func NewShardedMap(nshards int) ShardedMap {
	shards := make(ShardedMap, nshards)
	for i := 0; i < nshards; i++ {
		shard := make(map[string]interface{})
		shards[i] = &Shard{m: shard}
	}
	return shards
}

func (m ShardedMap) getShardIndex(key string) int {
	checksum := sha1.Sum([]byte(key)) // an arbitrary hash function works here
	// a uint8 can only address 256 shards;
	// for more shards, combine bytes, e.g.:
	//   hash := int(checksum[13])<<8 | int(checksum[17])
	// using bytes 13 and 17 makes 2^16 shards possible
	hash := int(checksum[17])
	return hash % len(m)
}

func (m ShardedMap) getShard(key string) *Shard {
	return m[m.getShardIndex(key)]
}

func (m ShardedMap) Get(key string) interface{} {
	shard := m.getShard(key)
	shard.RLock()
	defer shard.RUnlock()
	return shard.m[key]
}

func (m ShardedMap) Set(key string, val interface{}) {
	shard := m.getShard(key)
	shard.Lock()
	defer shard.Unlock()
	shard.m[key] = val
}

func (m ShardedMap) Keys() []string {
	keys := make([]string, 0)
	mutex := sync.Mutex{} // guards appends to keys
	wg := sync.WaitGroup{}
	wg.Add(len(m))

	for _, shard := range m {
		go func(s *Shard) { // scan each shard concurrently
			defer wg.Done()
			s.RLock()
			defer s.RUnlock()
			for key := range s.m {
				mutex.Lock()
				keys = append(keys, key)
				mutex.Unlock()
			}
		}(shard)
	}

	wg.Wait()
	return keys
}

func main() {
	shardedMap := NewShardedMap(5)

	shardedMap.Set("alpha", 1)
	shardedMap.Set("beta", 2)
	shardedMap.Set("gamma", 3)

	fmt.Println(shardedMap.Get("alpha"))
	fmt.Println(shardedMap.Get("beta"))
	fmt.Println(shardedMap.Get("gamma"))

	keys := shardedMap.Keys()
	for _, k := range keys {
		fmt.Println(k)
	}
}
1
2
3
gamma
beta
alpha
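Note that the key order in this output varies between runs: Keys() scans the shards concurrently, and Go randomizes map iteration order, so no ordering is guaranteed.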