当前位置:首页 >> 脚本专栏

Go语言并发模型的2种编程方案

概述

我一直在找一种好的方法来解释 go 语言的并发模型:

不要通过共享内存来通信,相反,应该通过通信来共享内存

但是没有发现一个好的解释来满足我下面的需求:

1.通过一个例子来说明最初的问题
2.提供一个共享内存的解决方案
3.提供一个通过通信的解决方案

这篇文章我就从这三个方面来做出解释。

读过这篇文章后你应该会了解通过通信来共享内存的模型,以及它和通过共享内存来通信的区别,你还将看到如何分别通过这两种模型来解决访问和修改共享资源的问题。

前提

设想一下我们要访问一个银行账号:
复制代码 代码如下:
type Account interface {
  Withdraw(uint)
  Deposit(uint)
  Balance() int
}

type Bank struct {
  account Account
}

func NewBank(account Account) *Bank {
  return &Bank{account: account}
}

func (bank *Bank) Withdraw(amount uint, actor_name string) {
  fmt.Println("[-]", amount, actor_name)
  bank.account.Withdraw(amount)
}

func (bank *Bank) Deposit(amount uint, actor_name string) {
  fmt.Println("[+]", amount, actor_name)
  bank.account.Deposit(amount)
}

func (bank *Bank) Balance() int {
  return bank.account.Balance()
}

因为 Account 是一个接口,所以我们提供一个简单的实现:

复制代码 代码如下:
type SimpleAccount struct{
  balance int
}

func NewSimpleAccount(balance int) *SimpleAccount {
  return &SimpleAccount{balance: balance}
}

func (acc *SimpleAccount) Deposit(amount uint) {
  acc.setBalance(acc.balance + int(amount))
}

func (acc *SimpleAccount) Withdraw(amount uint) {
  if acc.balance >= int(mount) {
    acc.setBalance(acc.balance - int(amount))
  } else {
    panic("杰克穷死")
  }
}

func (acc *SimpleAccount) Balance() int {
  return acc.balance
}

func (acc *SimpleAccount) setBalance(balance int) {
  acc.add_some_latency()  //增加一个延时函数,方便演示
  acc.balance = balance
}

func (acc *SimpleAccount) add_some_latency() {
  <-time.After(time.Duration(rand.Intn(100)) * time.Millisecond)
}

你可能注意到了 balance 没有被直接修改,而是被放到了 setBalance 方法里进行修改。这样设计是为了更好的描述问题。稍后我会做出解释。

把上面所有部分弄好以后我们就可以像下面这样使用它啦:
复制代码 代码如下:
func main() {
  balance := 80
  b := NewBank(bank.NewSimpleAccount(balance))
 
  fmt.Println("初始化余额", b.Balance())
 
  b.Withdraw(30, "马伊琍")
 
  fmt.Println("-----------------")
  fmt.Println("剩余余额", b.Balance())
}

运行上面的代码会输出:

复制代码 代码如下:
初始化余额 80
[-] 30 马伊琍
-----------------
剩余余额 50

没错!

不错在现实生活中,一个银行账号可以有很多个附属卡,不同的附属卡都可以对同一个账号进行存取钱,所以我们来修改一下代码:
复制代码 代码如下:
func main() {
  balance := 80
  b := NewBank(bank.NewSimpleAccount(balance))
 
  fmt.Println("初始化余额", b.Balance())
 
  done := make(chan bool)
 
  go func() { b.Withdraw(30, "马伊琍"); done <- true }()
  go func() { b.Withdraw(10, "姚笛"); done <- true }()
 
  //等待 goroutine 执行完成
  <-done
  <-done
 
  fmt.Println("-----------------")
  fmt.Println("剩余余额", b.Balance())
}

这儿两个附属卡并发的从账号里取钱,来看看输出结果:
复制代码 代码如下:
初始化余额 80
[-] 30 马伊琍
[-] 10 姚笛
-----------------
剩余余额 70

这下把文章高兴坏了:)

结果当然是错误的,剩余余额应该是40而不是70,那么让我们看看到底哪儿出问题了。

问题

当并发访问共享资源时,无效状态有很大可能会发生。

在我们的例子中,当两个附属卡同一时刻从同一个账号取钱后,我们最后得到银行账号(即共享资源)错误的剩余余额(即无效状态)。

我们来看一下执行时候的情况:

复制代码 代码如下:
     处理情况
             --------------
             _马伊琍_|_姚笛_
 1. 获取余额     80  |  80
 2. 取钱       -30  | -10
 3. 当前剩余     50  |  70
                ... | ...
 4. 设置余额     50  "codetitle">复制代码 代码如下:
type LockingAccount struct {
  lock    sync.Mutex
  account *SimpleAccount
}

//封装一下 SimpleAccount
func NewLockingAccount(balance int) *LockingAccount {
  return &LockingAccount{account: NewSimpleAccount(balance)}
}

func (acc *LockingAccount) Deposit(amount uint) {
  acc.lock.Lock()
  defer acc.lock.Unlock()
  acc.account.Deposit(amount)
}

func (acc *LockingAccount) Withdraw(amount uint) {
  acc.lock.Lock()
  defer acc.lock.Unlock()
  acc.account.Withdraw(amount)
}

func (acc *LockingAccount) Balance() int {
  acc.lock.Lock()
  defer acc.lock.Unlock()
  return acc.account.Balance()
}

直接明了!注意 lock sync.Lock,lock.Lock(),lock.Unlock()。

这样每次一个附属卡访问银行账号(即共享资源),这个附属卡会自动获得锁直到最后操作完毕。

我们的 LockingAccount 像下面这样使用:
复制代码 代码如下:
func main() {
  balance := 80
  b := NewBank(bank.NewLockingAccount(balance))
 
  fmt.Println("初始化余额", b.Balance())
 
  done := make(chan bool)
 
  go func() { b.Withdraw(30, "马伊琍"); done <- true }()
  go func() { b.Withdraw(10, "姚笛"); done <- true }()
 
  //等待 goroutine 执行完成
  <-done
  <-done
 
  fmt.Println("-----------------")
  fmt.Println("剩余余额", b.Balance())
}

输出的结果是:
复制代码 代码如下:
初始化余额 80
[-] 30 马伊琍
[-] 10 姚笛
-----------------
剩余余额 40

现在结果正确了!

在这个例子中第一个处理程序加锁后独享共享资源,其它处理程序只能等待它执行完成。

我们接着看一下执行时的情况,假设马伊琍先拿到了锁:
复制代码 代码如下:
处理过程
                        ________________
                        _马伊琍_|__姚笛__
        加锁                   ><
        得到余额            80  |
        取钱               -30  |
        当前余额            50  |
                           ... |
        设置余额            50  |
        解除锁                 <>
                               |
        当前余额                50
                               |
        加锁                   ><
        得到余额                |  50
        取钱                    | -10
        当前余额                |  40
                               |  ...
        设置余额                |  40
        解除锁                  <>
                        ________________
        剩余余额                40

现在我们的处理程序在访问共享资源时相继的产生了正确的结果。

通过通信的解决方案

又叫 “通过通信来共享内存”。

现在账号被命名为 ConcurrentAccount,像下面这样来实现:
复制代码 代码如下:
type ConcurrentAccount struct {
  account     *SimpleAccount
  deposits    chan uint
  withdrawals chan uint
  balances    chan chan int
}

func NewConcurrentAccount(amount int) *ConcurrentAccount{
  acc := &ConcurrentAccount{
    account :    &SimpleAccount{balance: amount},
    deposits:    make(chan uint),
    withdrawals: make(chan uint),
    balances:    make(chan chan int),
  }
  acc.listen()
 
  return acc
}

func (acc *ConcurrentAccount) Balance() int {
  ch := make(chan int)
  acc.balances <- ch
  return <-ch
}

func (acc *ConcurrentAccount) Deposit(amount uint) {
  acc.deposits <- amount
}

func (acc *ConcurrentAccount) Withdraw(amount uint) {
  acc.withdrawals <- amount
}

func (acc *ConcurrentAccount) listen() {
  go func() {
    for {
      select {
      case amnt := <-acc.deposits:
        acc.account.Deposit(amnt)
      case amnt := <-acc.withdrawals:
        acc.account.Withdraw(amnt)
      case ch := <-acc.balances:
        ch <- acc.account.Balance()
      }
    }
  }()
}

ConcurrentAccount 同样封装了 SimpleAccount ,然后增加了通信通道

调用代码和加锁版本的一样,这里就不写了,唯一不一样的就是初始化银行账号的时候:
复制代码 代码如下:
b := NewBank(bank.NewConcurrentAccount(balance))

运行产生的结果和加锁版本一样:

复制代码 代码如下:
初始化余额 80
[-] 30 马伊琍
[-] 10 姚笛
-----------------
剩余余额 40

让我们来深入了解一下细节。

通过通信来共享内存是如何工作的

一些基本注意点:

共享资源被封装在一个控制流程中。

结果就是资源成为了非共享状态。没有处理程序能够直接访问或者修改资源。你可以看到访问和修改资源的方法实际上并没有执行任何改变。
复制代码 代码如下:
func (acc *ConcurrentAccount) Balance() int {
    ch := make(chan int)
    acc.balances <- ch
    balance := <-ch
    return balance
  }
  func (acc *ConcurrentAccount) Deposit(amount uint) {
    acc.deposits <- amount
  }

  func (acc *ConcurrentAccount) Withdraw(amount uint) {
    acc.withdrawals <- amount
  }

访问和修改是通过消息和控制流程通信。

在控制流程中任何访问和修改的动作都是相继发生的。

当控制流程接收到访问或者修改的请求后会立即执行相关动作。让我们仔细看看这个流程:

复制代码 代码如下:
func (acc *ConcurrentAccount) listen() {
    // 执行控制流程
    go func() {
      for {
        select {
        case amnt := <-acc.deposits:
          acc.account.Deposit(amnt)
        case amnt := <-acc.withdrawals:
          acc.account.Withdraw(amnt)
        case ch := <-acc.balances:
          ch <- acc.account.Balance()
        }
      }
    }()
  }

select  不断地从各个通道中取出消息,每个通道都跟它们所要执行的操作相一致。

重要的一点是:在 select 声明内部的一切都是相继执行的(在同一个处理程序中排队执行)。一次只有一个事件(在通道中接受或者发送)发生,这样就保证了同步访问共享资源。

领会这个有一点绕。

让我们用例子来看看 Balance() 的执行情况:
复制代码 代码如下:
 一张附属卡的流程      |   控制流程
      ----------------------------------------------

 1.     b.Balance()         |
 2.             ch -> [acc.balances]-> ch
 3.             <-ch        |  balance = acc.account.Balance()
 4.     return  balance <-[ch]<- balance
 5                          |

这两个流程都干了点什么呢?

附属卡的流程

1.调用 b.Balance()
2.新建通道 ch,将 ch 通道塞入通道 acc.balances 中与控制流程通信,这样控制流程也可以通过 ch 来返回余额
3.等待 <-ch 来取得要接受的余额
4.接受余额
5.继续

控制流程

1.空闲或者处理
2.通过 acc.balances 通道里面的 ch 通道来接受余额请求
3.取得真正的余额值
4.将余额值发送到 ch 通道
5.准备处理下一个请求

控制流程每次只处理一个 事件。这也就是为什么除了描述出来的这些以外,第2-4步没有别的操作执行。

总结

这篇博客描述了问题以及问题的解决办法,但那时没有深入去探究不同解决办法的优缺点。

其实这篇文章的例子更适合用 mutex,因为这样代码更加清晰。

最后,请毫无顾忌的指出我的错误!