# 多线程读取文件

血崩!! 今天写完一个多线程读取文件，发现多线程反而比但线程慢多了；最后还是改成了多进程版本。多线程向多进程的转换也非常的方便，

> ### 为什么Python多线程反而更慢了？
>
> 原因就在于 GIL ，在 Cpython 解释器（Python语言的主流解释器）中，有一把全局解释锁（Global Interpreter Lock），在解释器解释执行 Python 代码时，先要得到这把锁，意味着，任何时候只可能有一个线程在执行代码，其它线程要想获得 CPU 执行代码指令，就必须先获得这把锁，如果锁被其它线程占用了，那么该线程就只能等待，直到占有该锁的线程释放锁才有执行代码指令的可能。
>
> ​ 因此，这也就是为什么两个线程一起执行反而更加慢的原因，因为同一时刻，只有一个线程在运行，其它线程只能等待，即使是多核CPU，也没办法让多个线程「并行」地同时执行代码，只能是交替执行，因为多线程涉及到上下文切换、锁机制处理（获取锁，释放锁等），所以，多线程执行不快反慢。

## 1.文件分块类

* 定义一个分块类
* 根据线程数对文件进行分块
* 获取每个线程需要读取的坐标

定义类的时候`class PartitionFile(object)`这种方式表示`PartitionFile`继承自`Object`类

```python
## 定义分块类
class PartitionFile(object):
    def __init__(self, fileName, threadNum):
        self.fileName = fileName
        self.blockNum = threadNum
```

定义对应的文件分块方法`partion`，最后返回每个区块文件的指针数组

```bash
   def partion(self):
        fd = open(self.fileName, 'r')
        fd.seek(0, 2)  # 移动文件指针到文件尾,用于获取文件大小
        fileSize = fd.tell()  # 获取文件字符数
        Pos_list = []  # 指针坐标，数组
        blockSize = int(fileSize/self.blockNum)
        start_Pos = 0  # 文件初始指针
        for i in range(self.blockNum):
            if i == self.blockNum-1:
                end_Pos = fileSize-1  # 最后一个文件区块为文件结尾
                Pos_list.append((start_Pos, end_Pos))
                break
            end_Pos = start_Pos+blockSize-1  # 均匀分配每个区块
            # if end_Pos >= fileSize:
            #   end_Pos=fileSize-1
            # if start_Pos >= fileSize:
            #   break
            Pos_list.append((start_Pos, end_Pos))
            start_Pos = end_Pos+1  # 下一个区块，开始坐标
        fd.close()
        return Pos_list
```

## 2.读取文件线程类

类初始化需要传递6个参数

* 线程编号
* 线程所属队列
* 文件名
* 文件区块其实指针
* 文件区块结束指针
* 自定义处理函数

读取文件线程也可以称作**生产者**，继承自Thread类，在初始化是调用，初始化一个线程

```python
class readThread(Thread):  # 这个括号表示继承threading.Thread类
    def __init__(self, thread_name, thread_queue, fileName, start_Pos, end_Pos, processFunction):
        super(readThread, self).__init__()  # 初始化一个线程
        self.name = thread_name
        self.queue = thread_queue
        self.start_Pos = start_Pos
        self.end_Pos = end_Pos
        self.fileName = fileName
        self.processFunction = processFunction
```

类中读取数据的函数`reader`

* 根据文件区块指针，进行一行一行读取
* 将读取的数据交给`processFunction`自定义函数处理，过滤掉一些行或者转化一下格式
* 将过滤后的结果存进队列中

`self.queue.put(tmp, block=True)`主要是当队列中数据已经满了，还来不及取出时，阻塞当前线程，等待队列闲置空间

```python
    def reader(self):
        fd = open(self.fileName, 'r')
        if self.start_Pos != 0:
            fd.seek(self.start_Pos-1)
            if fd.read(1) != '\n':  # 当前初始位置不是行首,移动到下一行行首
                fd.readline()
                self.start_Pos = fd.tell()
        fd.seek(self.start_Pos)  # 将文件指针定位到区块的行首
        while self.start_Pos <= self.end_Pos:  # 开始按行读取文件并且进行操作
            line = fd.readline()
            tmp = self.processFunction(line)
            if tmp:  # 判断数据是否需要放进队列
                self.queue.put(tmp, block=True)  # 阻塞队列等待有位置时就插入
            # else:
            #     self.start_Pos = fd.tell()
            #     continue
            self.start_Pos = fd.tell()  # 读完一行后，自动调整开始位置
        fd.close()
        return
```

启动线程函数

当线程启动是调用读取函数进行文件读取

```python
    def run(self):
        print("线程" + self.name+": 开始读取文件...")
        self.reader()
        print("线程" + self.name+": 读取完成...")
        return
```

## 3.获取队列数据线程类

也被称为**消费者**线程，

初始化

* 包含线程名
* 所属队列
* 以及初始化一个用于存放队列数据的数组

```python
class getThread(threading.Thread):
    def __init__(self, name, queue):
        super(getThread, self).__init__()
        self.name = name
        self.queue = queue
        self.out = []
```

获取队列数据`run`方法

* 当线程启动时，就开始向队列取数据
* 如果在向队列取数据时，等待时间过长，之间关闭当前进程

`Queue.get`函数默认有一个`block=True`，的参数；当线程取不到数据时，就一直会进入等待

```python
    def run(self):
        while True:
            try:
                self.out.append(self.queue.get(timeout=2))  # 等待2s，读不到数据直接退出
            except:
                break
```

## 4.函数封装

为了实现代码的可重复利用，可以将这几个模块一起封装成一个函数

* `defaultProcessFunction`默认自定义函数，不会对文件中行进行处理
* 定义生成者线程池
* 定义消费者生成池
* 等待所有线程池结束后，调用消费者线程的`getData`方法，获取所有数据

```python
import time
import threading
from queue import Queue
import re

def defaultProcessFunction(line):  # 对行数据不做处理的默认函数
    return line

def readFileByThread(fileName, ThreadNum, processFunction=defaultProcessFunction):
    out = []
    # 设置队列
    workQueue = Queue()
    # 线程池
    readThreads = []
    getThreads = []
    pos_list = PartitionFile(fileName, ThreadNum).partion()
    for i in range(len(pos_list)):
        postion = pos_list[i]
        mythread = readThread(str(i), workQueue, fileName,
                              postion[0], postion[1], processFunction)  # 初始化线程,设置预处理函数
        mythread.start()  # 启动线程
        getdataThread = getThread(str(i), workQueue)
        getdataThread.start()
        readThreads.append(mythread)  # 添加到线程池
        getThreads.append(getdataThread)  # 添加到线程池
    for i in readThreads:
        i.join()  # 等待所有线程完成
    for i in getThreads:
        i.join()  # 等待所有线程完成
    for i in getThreads:
        out += i.getData()
    return out
```

## 5.性能测试

由于测试文件只有20万行，进程间的调度也会消耗时间，所有有可能出现多进程比单进程慢一丢丢的情况

```python
if __name__ == "__main__":
    start_time = time.clock()
    readFileByThread(sys.argv[1], int(sys.argv[2]))
    end_time = time.clock()
    print('Cost Time is {:.2f}'.format(end_time-start_time))
```

* 多线程模式下

```bash
##单个线程情况
线程0: 开始读取文件...
线程0: 读取完成...
Cost Time is 2.61
## 4个线程情况
线程0: 开始读取文件...
线程1: 开始读取文件...
线程2: 开始读取文件...
线程3: 开始读取文件...
线程2: 读取完成...
线程1: 读取完成...
线程0: 读取完成...
线程3: 读取完成...
Gbar_A01
Cost Time is 17.77
```

## 6.源代码

<https://github.com/BiocottonHub/zpliuCode/blob/master/Hi-c/readFileByThread.py>

## 7.参考

1. [Queue队列](https://blog.csdn.net/GeekLeee/article/details/77883252)
2. [多线程爬取数据](https://blog.csdn.net/aa57255621/article/details/88965975)
3. [super父类](https://www.runoob.com/python/python-func-super.html)
4. [文件指针](https://www.runoob.com/python/file-seek.html)
5. [文件分块](https://blog.csdn.net/onlyforr/article/details/52094581?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2.nonecase\&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2.nonecase)
6. [python继承类](https://fishc.com.cn/thread-115047-1-1.html)
7. [Queue](https://www.cnblogs.com/xiangsikai/p/8185031.html)
8. [Queue取数据](https://www.cnblogs.com/wt11/p/5952500.html)
9. [子进程不结束](https://www.zhihu.com/question/63265466)
