Concurrency with Shared Memory

Лекция 7

Максим Иванов

Concurrency with Shared Memory

2

Happens-Before

a := 1
a += 1
3

Race Condition

4

Example Bank

// Package bank implements a bank with only one account.
package bank

var balance int

func Deposit(amount int) { balance = balance + amount }

func Balance() int { return balance }

Работает корректно в одной горутине.

// Alice:
go func() {
    bank.Deposit(200)
    fmt.Println("=", bank.Balance())
}()

// Bob
go bank.Deposit(100)

Ошибка в этом примере называется data race.

5

Data Race breaks memory safety

var x []int
go func() { x = make([]int, 10) }()
go func() { x = make([]int, 1000000) }()
x[999999] = 1 // NOTE: undefined behavior; memory corruption possible!

Программа с data race перестаёт вести себя так, как написано в коде.

Иногда говорят «это безопасная гонка». Не бывает безопасных data race.

6

Детектор гонок

go test -race ./...
go run -race main.go
7

Data Race

Data race случается, когда несколько горутин одновременно работают с одной переменной, и хотя бы одна из них пишет в эту переменную.

Как защититься от data race?

1. Не писать в переменную.
2. Не работать с переменной из нескольких горутин.
3. Не работать с переменной одновременно. (mutex)

8

Не писать в переменную

var icons = make(map[string]image.Image)
func loadIcon(name string) image.Image

// NOTE: not concurrency-safe!
func Icon(name string) image.Image {
    icon, ok := icons[name]
    if !ok {
        icon = loadIcon(name)
        icons[name] = icon
    }
    return icon
}

Правильно:

var icons = map[string]image.Image{
    "spades.png":   loadIcon("spades.png"),
    "hearts.png":   loadIcon("hearts.png"),
    "diamonds.png": loadIcon("diamonds.png"),
    "clubs.png":    loadIcon("clubs.png"),
}

func Icon(name string) image.Image { return icons[name] }
9

Не работать с переменной из нескольких горутин

// Package bank provides a concurrency-safe bank with one account.
package bank

var deposits = make(chan int) // send amount to deposit
var balances = make(chan int) // receive balance

func Deposit(amount int) { deposits <- amount }
func Balance() int       { return <-balances }

func teller() {
    var balance int // balance is confined to teller goroutine
    for {
        select {
        case amount := <-deposits:
            balance += amount
        case balances <- balance:
        }
    }
}

func init() {
    go teller() // start the monitor goroutine
}
10

sync.Mutex

import "sync"

var (
    mu      sync.Mutex // guards balance
    balance int
)

func Deposit(amount int) {
    mu.Lock()
    balance = balance + amount
    mu.Unlock()
}

func Balance() int {
    mu.Lock()
    defer mu.Unlock()
    b := balance
    return b
}
11

sync.RWMutex

var mu sync.RWMutex
var balance int

func Balance() int {
    mu.RLock() // readers lock
    defer mu.RUnlock()
    return balance
}
12

Memory Order

var x, y int
go func() {
    x = 1                   // A1
    fmt.Print("y:", y, " ") // A2
}()
go func() {
    y = 1                   // B1
    fmt.Print("x:", x, " ") // B2
}()
13

Memory Order

var x, y int
go func() {
    x = 1                   // A1
    fmt.Print("y:", y, " ") // A2
}()
go func() {
    y = 1                   // B1
    fmt.Print("x:", x, " ") // B2
}()

Можем ожидать

y:0 x:1
x:0 y:1
x:1 y:1
y:1 x:1

Но реально может произойти

x:0 y:0
y:0 x:0
14

Atomics

var x, y atomic.Int32
go func() {
    x.Store(1)                     // A1
    fmt.Print("y:", y.Load(), " ") // A2
}()
go func() {
    y.Store(1)                     // B1
    fmt.Print("x:", x.Load(), " ") // B2
}()

В терминологии модели памяти Go, если эффект атомарной операции A наблюдается атомарной операцией B, то A «synchronizes before» B (синхронизируется-перед B).

Кроме того, все атомарные операции в программе ведут себя так, как если бы выполнялись в некотором последовательно-согласованном порядке (sequential consistency).

Эти гарантии эквивалентны семантике последовательно-согласованных атомиков в C++ и переменных volatile в Java.

15

Atomic Int

type Uint32 struct {}

// Load atomically loads and returns the value stored in x.
func (x *Uint32) Load() uint32

// Store atomically stores val into x.
func (x *Uint32) Store(val uint32)

// Swap atomically stores new into x and returns the previous value.
func (x *Uint32) Swap(new uint32) (old uint32)

// CompareAndSwap executes the compare-and-swap operation for x.
func (x *Uint32) CompareAndSwap(old, new uint32) (swapped bool)

// Add atomically adds delta to x and returns the new value.
func (x *Uint32) Add(delta uint32) (new uint32)
16

Atomic Value

type Value struct {}

// Load atomically loads and returns the value stored in x.
func (x *Value) Load() any

// Store atomically stores val into x.
func (x *Value) Store(val any)

// Swap atomically stores new into x and returns the previous value.
func (x *Value) Swap(new any) (old any)

// CompareAndSwap executes the compare-and-swap operation for x.
func (x *Value) CompareAndSwap(old, new any) (swapped bool)
17

Config update

func main() {
      var config atomic.Value // holds current server configuration
      // Create initial config value and store into config.
      config.Store(loadConfig())
      go func() {
            // Reload config every 10 seconds
            // and update config value with the new version.
            for {
                  time.Sleep(10 * time.Second)
                  config.Store(loadConfig())
            }
      }()
      // Create worker goroutines that handle incoming requests
      // using the latest config value.
      for i := 0; i < 10; i++ {
            go func() {
                  for r := range requests() {
                        c := config.Load()
                        // Handle request r using config c.
                        _, _ = r, c
                  }
            }()
      }
}
18

Ленивая инициализация sync.Once.

var icons map[string]image.Image

func loadIcons() {
    icons = map[string]image.Image{
        "spades.png": loadIcon("spades.png")
    }
}

// NOTE: not concurrency-safe!
func Icon(name string) image.Image {
    if icons == nil {
        loadIcons() // one-time initialization
    }
    return icons[name]
}
19

Ленивая инициализация sync.Once.

var mu sync.Mutex // guards icons
var icons map[string]image.Image

// Concurrency-safe.
func Icon(name string) image.Image {
    mu.Lock()
    defer mu.Unlock()
    if icons == nil {
        loadIcons()
    }
    return icons[name]
}
20

Ленивая инициализация sync.Once.

var loadIconsOnce sync.Once
var icons map[string]image.Image

// Concurrency-safe.
func Icon(name string) image.Image {
    loadIconsOnce.Do(loadIcons)
    return icons[name]
}
21

Concurrent cache

// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/

// See page 272.

//!+

// Package memo provides a concurrency-unsafe
// memoization of a function of type Func.
package memo

// A Memo caches the results of calling a Func.
type Memo struct {
    f     Func
    cache map[string]result
}

// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)

type result struct {
    value interface{}
    err   error
}

// NOTE: not concurrency-safe!
func (memo *Memo) Get(key string) (interface{}, error) {
    res, ok := memo.cache[key]
    if !ok {
        res.value, res.err = memo.f(key)
        memo.cache[key] = res
    }
    return res.value, res.err
}
22

Concurrent cache

// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/

// See page 275.

// Package memo provides a concurrency-safe memoization a function of
// type Func.  Concurrent requests are serialized by a Mutex.
package memo

import "sync"

// Func is the type of the function to memoize.
type Func func(string) (interface{}, error)

type result struct {
	value interface{}
	err   error
}

func New(f Func) *Memo {
	return &Memo{f: f, cache: make(map[string]result)}
}

//!+

type Memo struct {
    f     Func
    mu    sync.Mutex // guards cache
    cache map[string]result
}

// Get is concurrency-safe.
func (memo *Memo) Get(key string) (value interface{}, err error) {
    memo.mu.Lock()
    res, ok := memo.cache[key]
    if !ok {
        res.value, res.err = memo.f(key)
        memo.cache[key] = res
    }
    memo.mu.Unlock()
    return res.value, res.err
}
23

Concurrent cache

// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/

// See page 276.

// Package memo provides a concurrency-safe memoization a function of
// type Func.  Requests for different keys run concurrently.
// Concurrent requests for the same key result in duplicate work.
package memo

import "sync"

type Memo struct {
	f     Func
	mu    sync.Mutex // guards cache
	cache map[string]result
}

type Func func(string) (interface{}, error)

type result struct {
	value interface{}
	err   error
}

func New(f Func) *Memo {
	return &Memo{f: f, cache: make(map[string]result)}
}

func (memo *Memo) Get(key string) (value interface{}, err error) {
    memo.mu.Lock()
    res, ok := memo.cache[key]
    memo.mu.Unlock()
    if !ok {
        res.value, res.err = memo.f(key)

        // Between the two critical sections, several goroutines
        // may race to compute f(key) and update the map.
        memo.mu.Lock()
        memo.cache[key] = res
        memo.mu.Unlock()
    }
    return res.value, res.err
}
24

Concurrent cache

// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/

// See page 276.

// Package memo provides a concurrency-safe memoization a function of
// a function.  Requests for different keys proceed in parallel.
// Concurrent requests for the same key block until the first completes.
// This implementation uses a Mutex.
package memo

import "sync"

// Func is the type of the function to memoize.
type Func func(string) (interface{}, error)

type result struct {
	value interface{}
	err   error
}

//!+

func New(f Func) *Memo {
	return &Memo{f: f, cache: make(map[string]*entry)}
}

type Memo struct {
	f     Func
	mu    sync.Mutex // guards cache
	cache map[string]*entry
}

type entry struct {
    res   result
    ready chan struct{} // closed when res is ready
}

func (memo *Memo) Get(key string) (value interface{}, err error) {
    memo.mu.Lock()
    e := memo.cache[key]
    if e == nil {
        // This is the first request for this key.
        // This goroutine becomes responsible for computing
        // the value and broadcasting the ready condition.
        e = &entry{ready: make(chan struct{})}
        memo.cache[key] = e
        memo.mu.Unlock()
        e.res.value, e.res.err = memo.f(key)
        close(e.ready) // broadcast ready condition
    } else {
        // This is a repeat request for this key.
        memo.mu.Unlock()
        <-e.ready // wait for ready condition
    }
    return e.res.value, e.res.err
}
25

sync.Map

Что происходит, когда Go совершает приведение типа из interface{}?

var v interface{}
x, ok := v.(int)
var v interface{}
x, ok := v.(io.Reader)

// Конкретный тип v должен иметь метод Read([]byte) (int, error)
var cache map[typeConversion]conversionResult
26

sync.Map

var cache map[typeConversion]conversionResult

Как защитить cache? sync.Mutex? Кеш из раннего примера?

Какой паттерн нагрузки?

Чего хотим?

27

sync.Map

var cache sync.Map

func convertType(from, to typ) *conversionResult {
    key := typeConversion{from: from, to: to}
    res, ok := cache.Load(key)
    if ok {
        return res.(*conversionResult)
    }

    res = doConversion(from, to)
    cache.Store(key, res)
    return res.(*conversionResult)
}
28

Как сделать sync.Map без двойного вычисления?

package synconce

import "sync"

var cache sync.Map

type result struct{}

func do(key string) *result { return new(result) }

type entry struct {
    res *result
    sync.Once
}

func get(key string) *result {
    myEntry := &entry{}

    old, loaded := cache.LoadOrStore(key, myEntry)
    if loaded {
        myEntry = old.(*entry)
    }

    myEntry.Do(func() {
        myEntry.res = do(key)
    })

    return myEntry.res
}
29

sync.Cond

type Once struct {
    done, running bool
    mu            sync.Mutex
    cond          *sync.Cond
}

func (once *Once) Do(f func()) {
    once.mu.Lock()
    defer once.mu.Unlock()
    if once.done {
        return
    }
    if once.running {
        once.cond.Wait() // releases and acquires mutex
        return
    }

    once.running = true
    once.mu.Unlock()
    f()
    once.mu.Lock()
    once.done = true
    once.running = false
    once.cond.Broadcast()
}
30

golang.org/x/sync

Пакеты в golang.org/x содержат код, который не вошёл в стандартную библиотеку.

31

context

type Context interface {
  // Возвращает время, когда операция будет оповещена о необходимости завершения
  Deadline() (deadline time.Time, ok bool)

  // Возвращает канал, который будет закрыт при необходимости завершить операцию
  // Служит в качестве инструмента оповещения об отмене
  Done() <-chan struct{}

  // Если Done не закрыт - возвращает nil.
  // Если Done закрыт, Err ошибку с объяснением причины:
  // - Canceled - контекст был отменен
  // - DeadlineExceeded - наступил дедлайн.
  // После возвращения не nil ошибки Err всегда возвращает данную ошибку.
  Err() error

  // Позволяет получить произвольный объект из контекста
  Value(key interface{}) interface{}
}
32

context

Типы контекстов:

// root context
todo := context.TODO()
ctx := context.Background()

// manual cancel
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// manual cancel with explicit cause
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(fmt.Errorf("job not needed"))

// cancel by timeout
ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()

ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second))
defer cancel()
33

Отменяем операции

func SimpleCancelation() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go func() {
        time.Sleep(5 * time.Second)
        cancel()
    }()

    if err := doSlowJob(ctx); err != nil {
        panic(err)
    }
}
func doSlowJob(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return context.Cause(ctx)
        default:
            // perform a portion of slow job
            time.Sleep(1 * time.Second)
        }
    }
}
34

Отменяем операции

func SimpleTimeout() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    if err := doSlowJob(ctx); err != nil {
        panic(err)
    }
}
func doSlowJob(ctx context.Context) error {
    for {
        select {
        case <-ctx.Done():
            return context.Cause(ctx)
        default:
            // perform a portion of slow job
            time.Sleep(1 * time.Second)
        }
    }
}
35

context в библиотеках Go

По соглашению Context всегда передается первым параметром в функции, обычно именуясь ctx.

database/sql.(*DB).QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error)
database/sql.(*DB).ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error)
net/http.NewRequestWithContext(ctx context.Context, method, url string, body io.Reader) (*Request, error)
golang.org/x/sync/errgroup.WithContext(ctx context.Context) (*Group, context.Context)
...

Быстрый пример:

ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Second)
defer cancel()

req, _ := http.NewRequestWithContext(ctx, "GET", "http://loremipsum.com", nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
  // возможно тут будет DeadlineExceeded
}
36

context и передача значений

type myKey struct{} // use private type to restrict access to this package

func WithUser(ctx context.Context, user string) context.Context {
    return context.WithValue(ctx, myKey{}, user)
}

// Export type-safe interface for users of this value
func ContextUser(ctx context.Context) (string, bool) {
    v := ctx.Value(myKey{})
    s, ok := v.(string)
    return s, ok
}
func main() {
    ctx := context.Background()

    user, ok := ContextUser(ctx)
    fmt.Println(ok, user)

    ctx = WithUser(ctx, "petya")
    user, ok = ContextUser(ctx)
    fmt.Println(ok, user)
}
37

Thank you

Максим Иванов

Use the left and right arrow keys or click the left and right edges of the page to navigate between slides.
(Press 'H' or navigate to hide this message.)