nsq源码-diskqueue
2023-05-03 10:47:45 # Go # 源码阅读

nsq源码-diskqueue

有兴趣可以看看这篇文章
https://www.cnblogs.com/zhangboyu/p/7457070.html

一、队列存储

队列的特征是先入先出,也就是写入是从后面写入,读取是从前面读取
我们平时写的队列一般是放到内存里面,比如一个大的动态数组
这里如果队列中的数据很大,diskqueue则是将这个动态数组拆成了好多个文件来存储队列中的数据

如果队列是放在内存数组中,那么队列只需要记录两个属性,一个头的位置,一个是尾的位置,
队列大小depth = 头位置 - 尾位置

但是由于diskqueue是将数组保存在多个文件中
所以diskqueue就会有五个属性: 头所在的文件,头在文件中的位置,尾所在的文件,尾在文件中的位置,还有就是depth标识头和尾中间的数据数量
这五个数据作为diskqueue的元数据单独保存在一个文件里面。
所以New一个diskqueue的时候先要这几个元数据读取出来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
func New(name string, dataPath string, maxBytesPerFile int64,
minMsgSize int32, maxMsgSize int32,
syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {
d := diskQueue{
//名称
name: name,
//文件保存路径
dataPath: dataPath,
//每个文件大小最大值,超过要重新开启一个文件
maxBytesPerFile: maxBytesPerFile,
//写入消息最小大小
minMsgSize: minMsgSize,
//写入消息最大大小
maxMsgSize: maxMsgSize,
readChan: make(chan []byte),
writeChan: make(chan []byte),
writeResponseChan: make(chan error),
emptyChan: make(chan int),
emptyResponseChan: make(chan error),
exitChan: make(chan int),
exitSyncChan: make(chan int),
syncEvery: syncEvery,
syncTimeout: syncTimeout,
logf: logf,
}

// 读取队列数据
err := d.retrieveMetaData()
if err != nil && !os.IsNotExist(err) {
d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err)
}

go d.ioLoop()
return &d
}

// 读取队列数据
func (d *diskQueue) retrieveMetaData() error {
var f *os.File
var err error

fileName := d.metaDataFileName()
f, err = os.OpenFile(fileName, os.O_RDONLY, 0600)
if err != nil {
return err
}
defer f.Close()

//队列写入和读取位置中间有多少条数据,也就是队列的大小
var depth int64
//读取队列核心数据
//当前读取文件是哪个,读取位置是哪里
//当前写入的文件是哪个,写入文件位置
_, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
&depth,
&d.readFileNum, &d.readPos,
&d.writeFileNum, &d.writePos)
if err != nil {
return err
}
atomic.StoreInt64(&d.depth, depth)

//下一个读取文件
d.nextReadFileNum = d.readFileNum
//下一个读取位置
d.nextReadPos = d.readPos

return nil
}

二、写入队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
func (d *diskQueue) writeOne(data []byte) error {
var err error

// 当前写入文件是否打开,没有则打开当前写入文件
if d.writeFile == nil {
curFileName := d.fileName(d.writeFileNum)
d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
return err
}

d.logf(INFO, "DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName)

//如果当前写入位置大于0,则将文件位置移动到写入位置点
if d.writePos > 0 {
_, err = d.writeFile.Seek(d.writePos, 0)
if err != nil {
d.writeFile.Close()
d.writeFile = nil
return err
}
}
}

dataLen := int32(len(data))

//判断消息大小是否合法
if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {
return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", dataLen, d.maxMsgSize)
}

//将缓冲区清空
d.writeBuf.Reset()
//将消息大小写入缓冲区
err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
if err != nil {
return err
}

//将消息写入缓冲区
_, err = d.writeBuf.Write(data)
if err != nil {
return err
}

// only write to the file once
//将缓冲区关联到文件
_, err = d.writeFile.Write(d.writeBuf.Bytes())
if err != nil {
d.writeFile.Close()
d.writeFile = nil
return err
}

//计算总大小
totalBytes := int64(4 + dataLen)
d.writePos += totalBytes
//队列消息数量+1
atomic.AddInt64(&d.depth, 1)

//如果写入位置大于了文件最大大小
if d.writePos >= d.maxBytesPerFile {
//将当前写入文件+1
d.writeFileNum++
//当前写入位置重置为0
d.writePos = 0

// sync every time we start writing to a new file
//将缓存数据写入到磁盘
err = d.sync()
if err != nil {
d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
}

if d.writeFile != nil {
d.writeFile.Close()
d.writeFile = nil
}
}

return err
}

读取队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
func (d *diskQueue) readOne() ([]byte, error) {
var err error
var msgSize int32

// 当前读取文件是否打开,没有则打开当前文件
if d.readFile == nil {
// 打开读取的文件(也就是当前队列头所在的文件),如果文件大小到达上线maxBytesPerFile,readFileNum加一
curFileName := d.fileName(d.readFileNum)
d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)
if err != nil {
return nil, err
}

d.logf(INFO, "DISKQUEUE(%s): readOne() opened %s", d.name, curFileName)

//当前队列头在当前读取文件的位置
if d.readPos > 0 {
_, err = d.readFile.Seek(d.readPos, 0)
if err != nil {
d.readFile.Close()
d.readFile = nil
return nil, err
}
}

d.reader = bufio.NewReader(d.readFile)
}

// 先读取出消息的大小
err = binary.Read(d.reader, binary.BigEndian, &msgSize)
if err != nil {
d.readFile.Close()
d.readFile = nil
return nil, err
}

//判断消息大小是否合法
if msgSize < d.minMsgSize || msgSize > d.maxMsgSize {
// this file is corrupt and we have no reasonable guarantee on
// where a new message should begin
d.readFile.Close()
d.readFile = nil
return nil, fmt.Errorf("invalid message read size (%d)", msgSize)
}

//读取消息
readBuf := make([]byte, msgSize)
_, err = io.ReadFull(d.reader, readBuf)
if err != nil {
d.readFile.Close()
d.readFile = nil
return nil, err
}

totalBytes := int64(4 + msgSize)

//将下一个要读取的位置往后移
d.nextReadPos = d.readPos + totalBytes
d.nextReadFileNum = d.readFileNum

//判断下一个读取的位置是不是超过了文件大小
if d.nextReadPos > d.maxBytesPerFile {
if d.readFile != nil {
d.readFile.Close()
d.readFile = nil
}
//如果超过了,则当前队列头文件要往后移,且读取位置设置为0
d.nextReadFileNum++
d.nextReadPos = 0
}

return readBuf, nil
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 刷新缓存到磁盘
func (d *diskQueue) sync() error {
if d.writeFile != nil {
// 将缓冲区的数据从内存中拷贝刷新到硬盘中保存
err := d.writeFile.Sync()
if err != nil {
d.writeFile.Close()
d.writeFile = nil
return err
}
}

//保存元数据
err := d.persistMetaData()
if err != nil {
return err
}

d.needSync = false
return nil
}

五、ioLoop循环

这个函数是一个“守护”协程,
暴露的d.writeChan和d.readChan
如果外部有网writeChan里写数据在这里处理
同时,这里的消息也会通过d.readChan将消息不断的从队列中往外推

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
func (d *diskQueue) ioLoop() {
var dataRead []byte
var err error
var count int64
var r chan []byte

//开启一个timer,syncTimeout在系统配置里,默认是2秒一次
syncTicker := time.NewTicker(d.syncTimeout)

for {
// SyncEvery是系统配置,默认值是2500
// 也就是如果现在这段代码中的count的值如果到了2500,则将缓存中的数据保存到磁盘
if count == d.syncEvery {
d.needSync = true
}

// needSync这个字段如果为true,则将缓存中的数据保存到磁盘
if d.needSync {
err = d.sync()
if err != nil {
d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
}
//同步缓存到磁盘成功,将count重置为0
count = 0
}

//如果队列头和尾中间还有数据,则从头部读取数据
if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) {
if d.nextReadPos == d.readPos {
dataRead, err = d.readOne()
if err != nil {
d.logf(ERROR, "DISKQUEUE(%s) reading at %d of %s - %s",
d.name, d.readPos, d.fileName(d.readFileNum), err)
d.handleReadError()
continue
}
}
//这里读取的数据放到readChan这个通道里
r = d.readChan
} else {
r = nil
}

select {
// the Go channel spec dictates that nil channel operations (read or write)
// in a select are skipped, we set r to d.readChan only when there is data to read
//看上面的英文注释,如果r为空,则这个分支会被跳过,这是golang的一个特性
//将读取的消息,丢到d.readChan里面,r.readChan向外部暴露
case r <- dataRead:
count++
// moveForward sets needSync flag if a file is removed
//moveForward()会删除已经没用的file,这个文件中的数据已经全部被读取了,不在队列头和尾之间了
d.moveForward()
case <-d.emptyChan:
d.emptyResponseChan <- d.deleteAllFiles()
count = 0
case dataWrite := <-d.writeChan:
//如果d.writeChan有写入数据,则将消息数据写入到队列
count++
d.writeResponseChan <- d.writeOne(dataWrite)
case <-syncTicker.C:
//这里相当于两秒钟同步一次缓存到磁盘
if count == 0 {
// avoid sync when there's no activity
continue
}
d.needSync = true
case <-d.exitChan:
goto exit
}
}

exit:
d.logf(INFO, "DISKQUEUE(%s): closing ... ioLoop", d.name)
syncTicker.Stop()
d.exitSyncChan <- 1
}