Go语言网络爬虫缓冲池工具的实现

缓冲池的基本结构如下:
//数据缓冲池接口的实现类型
type myPool struct {
    //缓冲器的统一容量
    bufferCap uint32
    //缓冲器的最大数量
    maxBufferNumber uint32
    //缓冲器的实际数量
    bufferNumber uint32
    //池中数据的总数
    total uint64
    //存放缓冲器的通道
    bufCh chan Buffer
    //缓冲池的关闭状态:0-未关闭;1-已关闭
    closed uint32
    //保护内部共享资源的读写锁
    rwlock sync.RWMutex
}
前两个字段用于记录创建缓冲池时的参数,它们在缓冲池运行期间用到。bufferNumber 和 total 字段用于记录缓冲数据的实时状况。

注意,bufCh 字段的类型是 chan Buffer,一个元素类型为 Buffer 的通道类型。这与缓冲器中同样是通道类型的 ch 字段联合起来看,就是一个双层通道的设计。在放入或获取数据时,我会先从 bufCh 拿到一个缓冲器,再向该缓冲器放入数据或从该缓冲器获取数据,然后再把它发送回 bufCh。这样的设计有如下几点好处。

1) bufCh 中的每个缓冲器一次只会被一个 goroutine 中的程序(以下简称并发程序)拿到。并且,在放回 bufCh 之前,它对其他并发程序都是不可见的。一个缓冲器每次只会被并发程序放入或取走一个数据。即使同一个程序连续调用多次 Put 方法或 Get 方法,也会这样。缓冲器不至于一下被填满或取空。

2) 更进一步看,bufCh 是 FIFO 的。当把先前拿出的缓冲器归还给 bufCh 时,该缓冲器总会被放在队尾。也就是说,池中缓冲器的操作频率可以降到最低,这也有利于池中数据的均匀分布。

3) 在从 bufCh 拿到缓冲器后,我可以判断是否需要缩减缓冲器的数量。如果需要并且该缓冲器已空,就可以直接把它关掉,并且不还给 bufCh。另一方面,如果在放入数据时发现所有缓冲器都已满并且在一段时间内都没有空位,就可以新建一个缓冲器并放入 bufCh。总之,这让缓冲池自动伸缩功能的实现变得简单了。

4) 最后也最重要的是,bufCh 本身就提供了对并发安全的保障。

大家可能会想到,基于标准库的 container 包中的 List 或 Ring 类型也可以编写出并发安全的缓冲器队列。确实可以。不过,用它们来实现会让你不得不编写更多的代码,因为原本一些现成的操作和功能都需要我们自己去实现,尤其是在保证并发安全性方面。并且,这样的缓冲器队列的运行效率可不一定高。

注意,上述设计会导致缓冲池中的数据不是 FIFO 的。不过,对于网络爬虫框架以及调度器来说,这并不会造成问题。

再看最后一个字段 rwlock。之所以不叫它 closingLock,是因为它不仅仅为了消除缓冲器中的那个与关闭通道有关的竞态条件而存在。大家可以思考一下,怎样并发地向 bufCh 放入新的缓冲器,同时避免池中的缓冲器数量超过最大值。

NewPool 函数用于新建一个缓冲池。它会先检查参数的有效性,再创建并初始化一个 *myPool 类型的值并返回。在为它的 bufCh 字段赋值后,我们需要先向该值放入一个缓冲器。这算是对缓冲池的预热。关于该函数的具体实现,你可以直接查看示例项目中的对应代码。

对于 Pool 接口的 BufferCap> MaxBufferNumber> BufferNumber 和 Total 方法的实现,我也不多说了。myPool 类型中都有相对应的字段。不过需要注意的是,对 bufferNumber 和 total 字段的访问需要使用原子操作。

Put 方法有两个主要的功能。第一个功能显然是向缓冲池放入数据。第二个功能是,在发现所有的缓冲器都已满一段时间后,新建一个缓冲器并将其放入缓冲池。当然,如果当前缓冲池持有的缓冲器已达最大数量,就不能这么做了。

所以,这里我们首先需要建立一个发现和触发追加缓冲器操作的机制。我规定当对池中所有缓冲器的操作的失败次数都达到 5 次时,就追加一个缓冲器入池。其实这方面的控制可以做得很细,也可以新增参数并把问题抛给使用方。不过这里先用这个简易版本。如果你觉得这确实有必要,可以自己编写一个改进的版本。

以下是我编写的 Put 方法的实现:
func (pool *myPool) Put(datum interface{}) (err error) {
    if pool.Closed() {
        return ErrClosedBufferPool
    }
    var count uint32
    maxCount := pool.BufferNumber() * 5
    var ok bool
    for buf := range pool.bufCh {
        ok, err = pool.putData(buf, datum, &count, maxCount)
        if ok || err != nil {
            break
        }
    }
    return
}
实际上,放入操作的核心逻辑在 myPool 类型的 putData 方法中。Put 方法本身做的主要是不断地取岀池中的缓冲器,并持有一个统一的“已满”计数。请注意 count 和 maxCount 变量的初始值,并体会它们的关系。

下面来看 putData 方法,其声明如下:
//用于向给定的缓冲器放入数据,并在必要时把缓冲器归还给池
func (pool *myPool) putData(
    buf Buffer, datum interface{}, count *uint32, maxCount uint32) (ok bool, err error) {
    //省略部分代码
}
由于这个方法比较长,所以会分段讲解。第一段,putData 为了及时响应缓冲池的关闭,需要在一开始就检查缓冲池的状态。并且在方法执行结束前还要检查一次,以便及时释放资源。代码如下所示:
if pool.Closed() {
    return false, ErrClosedBufferPool
}
defer func() {
    pool.rwlock.RLock()
    if pool.Closed() {
        atomic.AddUint32(&pool.bufferNumber, ^uint32(O))
        err = ErrClosedBufferPool
    } else {
        pool.bufCh <- buf
    }
    pool.rwlock.RUnlock()
}()
在 defer 语句中,我用到了 rwlock 的读锁,因为这其中包含了向 bufCh 发送值的操作。如果在方法即将执行结束时,发现缓冲池已关闭,那么就不会归还拿到的缓冲器,同时把对应的错误值赋给结果变量 err。注意,不归还缓冲器时,一定要递减 bufferNumber 字段的值。

第二段,执行向拿到的缓冲器放入数据的操作,并在必要时增加“已满”计数:
ok, err = buf.Put(datum)
if ok {
    atomic.AddUint64(&pool.total, 1)
    return
}
if err != nil {
    return
}
//若因缓冲器已满而未放入数据,就递增计数
(*count)++
请注意那两条 return 语句以及最后的 (*count)++。在试图向缓冲器放入数据后,我们需要立即判断操作结果。如果 ok 的值是 true,就说明放入成功,此时就可以在递增 total 字段的值后直接返回。

如果 err 的值不为 nil,就是说缓冲器已关闭,这时就不需要再执行后面的语句了。除了这两种情况,我们就需要递增 count 的值。因为这时说明缓冲器已满。如果你忘了 myBuffer 的 Put 方式是怎样实现的,可以现在回顾一下。

这里的 count 值递增操作与第三段代码息息相关,这涉及对追加缓冲器的操作的触发。下面是第三段代码:
//如果尝试向缓冲器放入数据的失败次数达到阈值,
//并且池中缓冲器的数量未达到最大值,
//那么就尝试创建一个新的缓冲器,先放入数据再把它放入池
if *count >= maxCount && pool.BufferNumber() < pool.MaxBufferNumber() {
    pool.rwlock.Lock()
    if pool.BufferNumber() < pool.MaxBufferNumber() {
        if pool.Closed() {
            pool.rwlock.Uniock()
            return
        }
        newBuf, _ := NewBuffer(pool.bufferCap)
        newBuf.Put(datum)
        pool.bufCh <- newBuf
        atomic.AddUint32(&pool.bufferNumber, 1)
        atomic.AddUint64(&pool.total, 1)
        ok = true
    }
    pool.rwlock.Uniock()
    *count = 0
}
return
在这段代码中,我用到了双检锁。如果第一次条件判断通过,就会立即再做一次条件判断。不过这之前,我会先锁定 rwlock 的写锁。这有两个作用:
  • 第一:防止向已关闭的缓冲池追加缓冲器。
  • 第二:防止缓冲器的数量超过最大值。在确保这两种情况不会发生后,把一个已放入那个数据的缓冲器追加到缓冲池中。

同样,及时更新计数也很重要。一旦第一次条件判断通过,即使最后没有追加缓冲器也应该清零 count 的值。及时清零“已满”计数可以有效减少不必要的操作和资源消耗。另外,一旦追加缓冲器成功,就一定要递增 bufferNumber 和 total 的值。

Get 方法的总体流程与 Put 方法基本一致:
func (pool *myPool) Get() (datum interfaced, err error) {
    if pool.Closed() {
        return nil, ErrClosedBufferPool
    }
    var count uint32
    maxCount := pool.BufferNumber() * 10
    for buf := range pool.bufCh {
        datum, err = pool.getData(buf, &count, maxCount)
        if datum != nil || ^rr != nil {
            break
        }
    }
    return
}
把“已空”计数的上限 maxCount 设为缓冲器数量的 10 倍。也就是说,若在遍历所有缓冲器 10 次之后仍无法获取到数据,Get 方法就会从缓冲池中去掉一个空的缓冲器。getData 方法的声明如下:
//用于从给定的缓冲器获取数据,并在必要时把缓冲器归还给池
func (pool *myPool) getData(
    buf Buffer, count *uint32, maxCount uint32) (datum interface!}, err error) {
    //省略部分代码
}
getData 方法的实现稍微简单一些,可分为两段。第一段代码的关键仍然是状态检查和资源释放:
if pool.Closed() {
    return rdl, ErrClosedBufferPool
}
defer func() {
    //如果尝试从缓冲器获取数据的失败次数达到阈值,
    //同时当前缓冲器已空且池中缓冲器的数量大于1,
    //那么就直接关掉当前缓冲器,并不归还给池
    if *count >= maxCount &&
        buf.Len() == 0 &&
        pool.BufferNumber() > 1 {
        buf.Close()
        atomic.AddUint32(&pool.bufferNumber, ^uint32(0))
        *count = 0
        return
    }
    pool.rwlock.RLock()
    if pool.Closed() {
        atomic.AddUint32(Spool.bufferNumber, ^uint32(0))
        err = ErrClosedBufferPool
    } else {
        pool.bufCh <- buf
    }
    pool.rwlock.RUnlock()
}()
defer 语句中第一条 if 语句的作用是,当不归还当前缓冲器的所有条件都已满足时,我们就在关掉当前缓冲器和更新计数后直接返回。只有条件不满足时,才在确认缓冲池未关闭之后再把它归还给缓冲池。注意,这时候需要锁定 rwlock 的读锁,以避免向已关闭的 bufCh 发送值。

第二段代码的作用是试图从当前缓冲器获取数据。在成功取出数据时,必须递减 total 字段的值。同时,如果取出失败且没有发现错误,就会递增“已空”计数。相关代码如下:
datum, err = buf.Get()
if datum != nil {
    atomic.AddUint64(&pool.total, ^uint64(0))
    return
}
if err != nil {
    return
}
//若因缓冲器已空未取出数据,就递增计数
(*count)++
return
putData 和 getData 方法中对 rwlock 的读锁或写锁的锁定就是为了预防关闭 bufCh 可能引发的运行时恐慌。显然,这些操作能够起作用的前提是 Close 方法对 rwlock 的合理使用,该方法的代码如下:
func (pool *myPool) Close() bool {
    if !atomic.CompareAndSwapUint32(&pool.closed, 0, 1) {
        return false
    }
    pool.rwlock.Lock()
    defer pool.rwlock.Unlock()
    close(pool.bufCh)
    for buf := range pool.bufCh {
        buf.Close()
    }
    return true
}
以上就是对缓冲池实现主要部分的展示和说明。

编程帮,一个分享编程知识的公众号。跟着站长一起学习,每天都有进步。

通俗易懂,深入浅出,一篇文章只讲一个知识点。

文章不深奥,不需要钻研,在公交、在地铁、在厕所都可以阅读,随时随地涨姿势。

文章不涉及代码,不烧脑细胞,人人都可以学习。

当你决定关注「编程帮」,你已然超越了90%的程序员!

编程帮二维码
微信扫描二维码关注