Go sync.Map源码解读
2023-05-03 10:47:57 # Go # 源码阅读

Go sync.Map源码解读

内置的map

Go 的内置的 map 是不支持并发写操作的,不是并发安全的。

看下面的代码运行会提示

fatal error: concurrent map writes。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import "time"

var globleMap map[string]interface{}

func MapEdit(key string) {
for i := 0; i < 10000; i++ {
globleMap[key] = i
}
}

func main() {
globleMap = make(map[string]interface{})
go MapEdit("1")
go MapEdit("2")
time.Sleep(time.Second * 30)
}

因此官方在go 1.9版本中给了 sync.Map来满足并发编程中的应用。

这里实际上是一个lock-free,关于lock-free和wait-free可以看看这里:wait-free是指什么?

sync.Map

sync.Map 的实现原理可概括为:

read和dirty
我们可以这么理解,现在将map里面的数据分成两部分保存,我用两个桶来比喻,

  • 一些是干净(read)的,放在白桶
  • 一些是脏(dirty)的,放在脏桶,我这也叫黑桶

另外我还要记录一下,这个脏的程度,怎么判断脏的程度呢,这里我用一个脏值(misses)来标识脏的程度,每次读数据时发现有一个数据(key)不在白桶而是在黑桶,我就将脏值数量(misses)+1,如果发现脏值(misses)居然比黑桶(read)里的数据总数量还多,那就得清洗一下黑桶(dirty)里面的数据了,将清洗完的脏(dirty)数据全部放到白桶(read)那里去,这时候黑桶所有的数据(dirty)被清空了,脏值(misses)也要重置为0

白桶里面还贴了一个字条(amended: 英文释义修正的,改正的),这个字条表示黑桶是不是刚刚被清洗收拾,因为黑桶不是一直被收拾的,每次当黑桶被收拾,脏值清空的时候,这个字条就写上“已收拾”(amended=false)。

写数据

  • 数据在白桶里

一个新的数据key要保存到map里面,首先先查一下这个数据(key)是不是已经在白桶里面,如果数据在白桶里面,直接更新这个数据就行了。

  • 数据在黑桶里

如果这个数据(key)不在白桶里(read),字条还写着“欠收拾”那数据就有可能在黑桶(dirty)里,如果在黑桶(dirty)里找到了,更新这个黑桶(dirty)里面的数据(key),同时更新下脏值(misses)+1,接下来还要判断下脏值的数据量不是已经比白桶(read)里面数据总量还大,如果是,那就得清洗下黑桶(dirty)里面的数据

  • 数据即不在白桶里,也不在黑桶里

数据既不在白桶,也不在黑桶,要分两种情况

白条写着“已收拾”,说明黑桶恰巧刚刚被收拾了,那么现在重新将白桶里面所有的数据又复制一份放到黑桶里面去,白桶里面字条改成“欠收拾”(amended=true),然后直接写入数据到黑桶
白桶里面写着“欠收拾”,说明黑桶里面有脏数据,但是又没到收拾他的时候,直接写入数据到黑桶

读数据

  • 数据在白桶

直接从白桶取数据

  • 数据不在白桶,白条写着“欠收拾”

说明黑桶有白桶不存在的数据,数据可能在黑桶,从黑桶取数据,如果key数据存在,脏值+1,判断是否需要清洗黑桶

  • 数据不在白桶,黑桶也找不到

数据不存在,返回nil

用法问题

看下面的示例,写入map1000个key,但是读取只读了900次key

这时候根本白桶的数据始终都会是空的,因为黑桶只有脏值到了1000才会将数据清洗到白桶。

也就是这种情况,可能效率还没有自己写的使用内置的map,加锁的效率高。

所以sync.map适合少量写,大量读的场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"math/rand"
"testgorm/mymap"
"time"
)

func main() {
var m mymap.Map
//写入1000个key,这时黑桶数据数量是1000
for i := 0; i < 1000; i++ {
rand.Seed(time.Now().UnixNano())
data := rand.Intn(1000)
m.Store(i, data)
}
//读取900次key,这时候脏值为900 < 黑桶数据数量
//白桶的数据一直都是空的,因为黑桶一直没到清洗的条件
for i := 0; i < 900; i++ {
if _, ok := m.Load(i); ok {
}
}
}

源码阅读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
package mymap

import (
"fmt"
"sync"
"sync/atomic"
"unsafe"
)

type Map struct {
//用来锁dirty
mu sync.Mutex
//白桶
read atomic.Value
//黑桶
dirty map[interface{}]*entry
//脏值
misses int
}

type readOnly struct {
//封装的map
m map[interface{}]*entry
//白条
amended bool
}

//expunged是一个指针指向一个空对象,也就是指向interface{}
var expunged = unsafe.Pointer(new(interface{}))

//entry就是一个指针
type entry struct {
p unsafe.Pointer
}

func newEntry(i interface{}) *entry {
return &entry{p: unsafe.Pointer(&i)}
}

//从sync.map读数据
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
//先看白桶里面有没有key
read, _ := m.read.Load().(readOnly)
//有则直接返回
e, ok := read.m[key]
//如果白桶里没有这个key,而且白条写着“欠收拾”,说明黑桶里面可能有key
if !ok && read.amended {
m.mu.Lock()
//上面白桶获取时没有加锁,上锁后再检查一次
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
//再检查一次也没有,则直接从黑桶里面读取数据
if !ok && read.amended {
e, ok = m.dirty[key]
//脏值+1, 如果脏值比黑桶数据数量还大,则收拾黑桶
m.missLocked()
}
m.mu.Unlock()
}
//如果白桶黑桶里都没有数据,返回nil
if !ok {
return nil, false
}
//返回entry指针对应的值
return e.load()
}

//获取entry指针对应的值
func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*interface{})(p), true
}

//往sync.map写数据
func (m *Map) Store(key, value interface{}) {
//先看白桶里面有没有key
read, _ := m.read.Load().(readOnly)
//白桶存在key直接更新白桶里面的数据
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}

m.mu.Lock()
//上面白桶获取没有加锁,上锁后再写入一次白桶
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
//白桶存在key直接更新白桶里面的数据
m.dirty[key] = e
}
//将黑桶里面的key也指向value,相当于黑桶和白桶都指向同一份数据
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
//如果白桶里面没有这个key,但是黑桶里面有,直接更新黑桶里面的数据
e.storeLocked(&value)
} else {
//白桶和黑桶都没有这个key
if !read.amended {
//如果黑桶刚刚被收拾过,白条“已收拾”,则需要将白桶里的数据,复制一份到黑桶
m.dirtyLocked()
//设置白条为“欠收拾”
m.read.Store(readOnly{m: read.m, amended: true})
}
//将新数据丢到黑桶
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
}

//tryStore就是一个基于CAS的原子写操作
func (e *entry) tryStore(i *interface{}) bool {
for {
p := atomic.LoadPointer(&e.p)
//如果原先key对应的值是一个interface{}空对象,返回失败
if p == expunged {
return false
}
//CAS原子写入entity
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}

//判断entry是否=expunged(interface{}),如果是将entry设置为nil
func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

//原子写入entry
func (e *entry) storeLocked(i *interface{}) {
atomic.StorePointer(&e.p, unsafe.Pointer(i))
}

//参照Store
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
actual, loaded, ok := e.tryLoadOrStore(value)
if ok {
return actual, loaded
}
}

m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
m.dirty[key] = e
}
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key]; ok {
actual, loaded, _ = e.tryLoadOrStore(value)
m.missLocked()
} else {
if !read.amended {
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()

return actual, loaded
}

func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}

ic := i
for {
if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
return i, false, true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*interface{})(p), true, true
}
}
}

func (m *Map) Delete(key interface{}) {
read, _ := m.read.Load().(readOnly)
//数据在白桶,直接删除白桶里面的数据
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
//数据不在白桶,黑桶“欠收拾”
if !ok && read.amended {
//直接黑桶清除数据
delete(m.dirty, key)
}
m.mu.Unlock()
}
if ok {
e.delete()
}
}

func (e *entry) delete() (hadValue bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return true
}
}
}

func (m *Map) Range(f func(key, value interface{}) bool) {
read, _ := m.read.Load().(readOnly)
if read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}

for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}

//脏值+1,
func (m *Map) missLocked() {
//脏值+1,
m.misses++
//如果脏值比黑桶数据数量还大,则收拾黑桶
if m.misses < len(m.dirty) {
return
}
m.read.Store(readOnly{m: m.dirty})
//将黑桶map清空
m.dirty = nil
//将脏值清零
m.misses = 0
}

func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}

//黑桶已经被清洗过了,这里要重新初始化黑桶
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
//将白桶里面的复制一份到黑桶
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}

//这里就是判断entry是不是指向空对象或者是nil
//如果entry=是nil,赋值为空对象,也返回true
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
//如果entry指向nil,给entry赋值空对象interface{},返回true
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}