Concurrency with Shared Memory
Лекция 7
Максим Иванов
Максим Иванов
a := 1 happens before a += 1.a := 1 a += 1
// Package bank implements a bank with only one account.
package bank
var balance int
func Deposit(amount int) { balance = balance + amount }
func Balance() int { return balance }Работает корректно в одной горутине.
// Alice:
go func() {
bank.Deposit(200)
fmt.Println("=", bank.Balance())
}()
// Bob
go bank.Deposit(100)
Ошибка в этом примере называется data race.
5var x []int
go func() { x = make([]int, 10) }()
go func() { x = make([]int, 1000000) }()
x[999999] = 1 // NOTE: undefined behavior; memory corruption possible!Программа с data race перестаёт вести себя так, как написано в коде.
Иногда говорят «это безопасная гонка». Не бывает безопасных data race.
6-race:go test -race ./... go run -race main.go
Data race случается, когда несколько горутин одновременно работают с одной переменной, и хотя бы одна из них пишет в эту переменную.
Как защититься от data race?
1. Не писать в переменную.
2. Не работать с переменной из нескольких горутин.
3. Не работать с переменной одновременно. (mutex)
var icons = make(map[string]image.Image)
func loadIcon(name string) image.Image
// NOTE: not concurrency-safe!
func Icon(name string) image.Image {
icon, ok := icons[name]
if !ok {
icon = loadIcon(name)
icons[name] = icon
}
return icon
}Правильно:
var icons = map[string]image.Image{
"spades.png": loadIcon("spades.png"),
"hearts.png": loadIcon("hearts.png"),
"diamonds.png": loadIcon("diamonds.png"),
"clubs.png": loadIcon("clubs.png"),
}
func Icon(name string) image.Image { return icons[name] }// Package bank provides a concurrency-safe bank with one account.
package bank
var deposits = make(chan int) // send amount to deposit
var balances = make(chan int) // receive balance
func Deposit(amount int) { deposits <- amount }
func Balance() int { return <-balances }
func teller() {
var balance int // balance is confined to teller goroutine
for {
select {
case amount := <-deposits:
balance += amount
case balances <- balance:
}
}
}
func init() {
go teller() // start the monitor goroutine
}import "sync"
var (
mu sync.Mutex // guards balance
balance int
)
func Deposit(amount int) {
mu.Lock()
balance = balance + amount
mu.Unlock()
}
func Balance() int {
mu.Lock()
defer mu.Unlock()
b := balance
return b
}var mu sync.RWMutex
var balance int
func Balance() int {
mu.RLock() // readers lock
defer mu.RUnlock()
return balance
}var x, y int
go func() {
x = 1 // A1
fmt.Print("y:", y, " ") // A2
}()
go func() {
y = 1 // B1
fmt.Print("x:", x, " ") // B2
}()var x, y int
go func() {
x = 1 // A1
fmt.Print("y:", y, " ") // A2
}()
go func() {
y = 1 // B1
fmt.Print("x:", x, " ") // B2
}()Можем ожидать
y:0 x:1 x:0 y:1 x:1 y:1 y:1 x:1
Но реально может произойти
x:0 y:0 y:0 x:0
var x, y atomic.Int32
go func() {
x.Store(1) // A1
fmt.Print("y:", y.Load(), " ") // A2
}()
go func() {
y.Store(1) // B1
fmt.Print("x:", x.Load(), " ") // B2
}()В терминологии модели памяти Go, если эффект атомарной операции A наблюдается атомарной операцией B, то A «synchronizes before» B (синхронизируется-перед B).
Кроме того, все атомарные операции в программе ведут себя так, как если бы выполнялись в некотором последовательно-согласованном порядке (sequential consistency).
Эти гарантии эквивалентны семантике последовательно-согласованных атомиков в C++ и переменных volatile в Java.
15type Uint32 struct {}
// Load atomically loads and returns the value stored in x.
func (x *Uint32) Load() uint32
// Store atomically stores val into x.
func (x *Uint32) Store(val uint32)
// Swap atomically stores new into x and returns the previous value.
func (x *Uint32) Swap(new uint32) (old uint32)
// CompareAndSwap executes the compare-and-swap operation for x.
func (x *Uint32) CompareAndSwap(old, new uint32) (swapped bool)
// Add atomically adds delta to x and returns the new value.
func (x *Uint32) Add(delta uint32) (new uint32)type Value struct {}
// Load atomically loads and returns the value stored in x.
func (x *Value) Load() any
// Store atomically stores val into x.
func (x *Value) Store(val any)
// Swap atomically stores new into x and returns the previous value.
func (x *Value) Swap(new any) (old any)
// CompareAndSwap executes the compare-and-swap operation for x.
func (x *Value) CompareAndSwap(old, new any) (swapped bool)func main() {
var config atomic.Value // holds current server configuration
// Create initial config value and store into config.
config.Store(loadConfig())
go func() {
// Reload config every 10 seconds
// and update config value with the new version.
for {
time.Sleep(10 * time.Second)
config.Store(loadConfig())
}
}()
// Create worker goroutines that handle incoming requests
// using the latest config value.
for i := 0; i < 10; i++ {
go func() {
for r := range requests() {
c := config.Load()
// Handle request r using config c.
_, _ = r, c
}
}()
}
}var icons map[string]image.Image
func loadIcons() {
icons = map[string]image.Image{
"spades.png": loadIcon("spades.png")
}
}
// NOTE: not concurrency-safe!
func Icon(name string) image.Image {
if icons == nil {
loadIcons() // one-time initialization
}
return icons[name]
}var mu sync.Mutex // guards icons
var icons map[string]image.Image
// Concurrency-safe.
func Icon(name string) image.Image {
mu.Lock()
defer mu.Unlock()
if icons == nil {
loadIcons()
}
return icons[name]
}var loadIconsOnce sync.Once
var icons map[string]image.Image
// Concurrency-safe.
func Icon(name string) image.Image {
loadIconsOnce.Do(loadIcons)
return icons[name]
}// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/
// See page 272.
//!+
// Package memo provides a concurrency-unsafe
// memoization of a function of type Func.
package memo
// A Memo caches the results of calling a Func.
type Memo struct { f Func cache map[string]result } // Func is the type of the function to memoize. type Func func(key string) (interface{}, error) type result struct { value interface{} err error } // NOTE: not concurrency-safe! func (memo *Memo) Get(key string) (interface{}, error) { res, ok := memo.cache[key] if !ok { res.value, res.err = memo.f(key) memo.cache[key] = res } return res.value, res.err }
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/
// See page 275.
// Package memo provides a concurrency-safe memoization a function of
// type Func. Concurrent requests are serialized by a Mutex.
package memo
import "sync"
// Func is the type of the function to memoize.
type Func func(string) (interface{}, error)
type result struct {
value interface{}
err error
}
func New(f Func) *Memo {
return &Memo{f: f, cache: make(map[string]result)}
}
//!+
type Memo struct { f Func mu sync.Mutex // guards cache cache map[string]result } // Get is concurrency-safe. func (memo *Memo) Get(key string) (value interface{}, err error) { memo.mu.Lock() res, ok := memo.cache[key] if !ok { res.value, res.err = memo.f(key) memo.cache[key] = res } memo.mu.Unlock() return res.value, res.err }
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/
// See page 276.
// Package memo provides a concurrency-safe memoization a function of
// type Func. Requests for different keys run concurrently.
// Concurrent requests for the same key result in duplicate work.
package memo
import "sync"
type Memo struct {
f Func
mu sync.Mutex // guards cache
cache map[string]result
}
type Func func(string) (interface{}, error)
type result struct {
value interface{}
err error
}
func New(f Func) *Memo {
return &Memo{f: f, cache: make(map[string]result)}
}
func (memo *Memo) Get(key string) (value interface{}, err error) { memo.mu.Lock() res, ok := memo.cache[key] memo.mu.Unlock() if !ok { res.value, res.err = memo.f(key) // Between the two critical sections, several goroutines // may race to compute f(key) and update the map. memo.mu.Lock() memo.cache[key] = res memo.mu.Unlock() } return res.value, res.err }
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/
// See page 276.
// Package memo provides a concurrency-safe memoization a function of
// a function. Requests for different keys proceed in parallel.
// Concurrent requests for the same key block until the first completes.
// This implementation uses a Mutex.
package memo
import "sync"
// Func is the type of the function to memoize.
type Func func(string) (interface{}, error)
type result struct {
value interface{}
err error
}
//!+
func New(f Func) *Memo {
return &Memo{f: f, cache: make(map[string]*entry)}
}
type Memo struct {
f Func
mu sync.Mutex // guards cache
cache map[string]*entry
}
type entry struct { res result ready chan struct{} // closed when res is ready } func (memo *Memo) Get(key string) (value interface{}, err error) { memo.mu.Lock() e := memo.cache[key] if e == nil { // This is the first request for this key. // This goroutine becomes responsible for computing // the value and broadcasting the ready condition. e = &entry{ready: make(chan struct{})} memo.cache[key] = e memo.mu.Unlock() e.res.value, e.res.err = memo.f(key) close(e.ready) // broadcast ready condition } else { // This is a repeat request for this key. memo.mu.Unlock() <-e.ready // wait for ready condition } return e.res.value, e.res.err }
Что происходит, когда Go совершает приведение типа из interface{}?
==.var v interface{}
x, ok := v.(int)var v interface{}
x, ok := v.(io.Reader)
// Конкретный тип v должен иметь метод Read([]byte) (int, error)var cache map[typeConversion]conversionResult
var cache map[typeConversion]conversionResult
Как защитить cache? sync.Mutex? Кеш из раннего примера?
Какой паттерн нагрузки?
Чего хотим?
map без лока.var cache sync.Map
func convertType(from, to typ) *conversionResult {
key := typeConversion{from: from, to: to}
res, ok := cache.Load(key)
if ok {
return res.(*conversionResult)
}
res = doConversion(from, to)
cache.Store(key, res)
return res.(*conversionResult)
}sync.Map хранит две map внутри. clean и dirty.clean всегда происходит без лока.dirty требует лока.dirty повышается до clean.package synconce
import "sync"
var cache sync.Map
type result struct{} func do(key string) *result { return new(result) } type entry struct { res *result sync.Once } func get(key string) *result { myEntry := &entry{} old, loaded := cache.LoadOrStore(key, myEntry) if loaded { myEntry = old.(*entry) } myEntry.Do(func() { myEntry.res = do(key) }) return myEntry.res }
type Once struct {
done, running bool
mu sync.Mutex
cond *sync.Cond
}
func (once *Once) Do(f func()) {
once.mu.Lock()
defer once.mu.Unlock()
if once.done {
return
}
if once.running {
once.cond.Wait() // releases and acquires mutex
return
}
once.running = true
once.mu.Unlock()
f()
once.mu.Lock()
once.done = true
once.running = false
once.cond.Broadcast()
}Пакеты в golang.org/x содержат код, который не вошёл в стандартную библиотеку.
sync.WaitGroup со встроенной обработкой ошибокtype Context interface {
// Возвращает время, когда операция будет оповещена о необходимости завершения
Deadline() (deadline time.Time, ok bool)
// Возвращает канал, который будет закрыт при необходимости завершить операцию
// Служит в качестве инструмента оповещения об отмене
Done() <-chan struct{}
// Если Done не закрыт - возвращает nil.
// Если Done закрыт, Err ошибку с объяснением причины:
// - Canceled - контекст был отменен
// - DeadlineExceeded - наступил дедлайн.
// После возвращения не nil ошибки Err всегда возвращает данную ошибку.
Err() error
// Позволяет получить произвольный объект из контекста
Value(key interface{}) interface{}
}Типы контекстов:
// root context
todo := context.TODO()
ctx := context.Background()
// manual cancel
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// manual cancel with explicit cause
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(fmt.Errorf("job not needed"))
// cancel by timeout
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second))
defer cancel()func SimpleCancelation() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { time.Sleep(5 * time.Second) cancel() }() if err := doSlowJob(ctx); err != nil { panic(err) } }
func doSlowJob(ctx context.Context) error { for { select { case <-ctx.Done(): return context.Cause(ctx) default: // perform a portion of slow job time.Sleep(1 * time.Second) } } }
func SimpleTimeout() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := doSlowJob(ctx); err != nil { panic(err) } }
func doSlowJob(ctx context.Context) error { for { select { case <-ctx.Done(): return context.Cause(ctx) default: // perform a portion of slow job time.Sleep(1 * time.Second) } } }
По соглашению Context всегда передается первым параметром в функции, обычно именуясь ctx.
database/sql.(*DB).QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error)
database/sql.(*DB).ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error)
net/http.NewRequestWithContext(ctx context.Context, method, url string, body io.Reader) (*Request, error)
golang.org/x/sync/errgroup.WithContext(ctx context.Context) (*Group, context.Context)
...Быстрый пример:
ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Second)
defer cancel()
req, _ := http.NewRequestWithContext(ctx, "GET", "http://loremipsum.com", nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
// возможно тут будет DeadlineExceeded
}type myKey struct{} // use private type to restrict access to this package func WithUser(ctx context.Context, user string) context.Context { return context.WithValue(ctx, myKey{}, user) } // Export type-safe interface for users of this value func ContextUser(ctx context.Context) (string, bool) { v := ctx.Value(myKey{}) s, ok := v.(string) return s, ok }
func main() { ctx := context.Background() user, ok := ContextUser(ctx) fmt.Println(ok, user) ctx = WithUser(ctx, "petya") user, ok = ContextUser(ctx) fmt.Println(ok, user) }
Максим Иванов