# 多线程读取文件

血崩!! 今天写完一个多线程读取文件，发现多线程反而比但线程慢多了；最后还是改成了多进程版本。多线程向多进程的转换也非常的方便，

> ### 为什么Python多线程反而更慢了？
>
> 原因就在于 GIL ，在 Cpython 解释器（Python语言的主流解释器）中，有一把全局解释锁（Global Interpreter Lock），在解释器解释执行 Python 代码时，先要得到这把锁，意味着，任何时候只可能有一个线程在执行代码，其它线程要想获得 CPU 执行代码指令，就必须先获得这把锁，如果锁被其它线程占用了，那么该线程就只能等待，直到占有该锁的线程释放锁才有执行代码指令的可能。
>
> ​ 因此，这也就是为什么两个线程一起执行反而更加慢的原因，因为同一时刻，只有一个线程在运行，其它线程只能等待，即使是多核CPU，也没办法让多个线程「并行」地同时执行代码，只能是交替执行，因为多线程涉及到上下文切换、锁机制处理（获取锁，释放锁等），所以，多线程执行不快反慢。

## 1.文件分块类

* 定义一个分块类
* 根据线程数对文件进行分块
* 获取每个线程需要读取的坐标

定义类的时候`class PartitionFile(object)`这种方式表示`PartitionFile`继承自`Object`类

```python
## 定义分块类
class PartitionFile(object):
    def __init__(self, fileName, threadNum):
        self.fileName = fileName
        self.blockNum = threadNum
```

定义对应的文件分块方法`partion`，最后返回每个区块文件的指针数组

```bash
   def partion(self):
        fd = open(self.fileName, 'r')
        fd.seek(0, 2)  # 移动文件指针到文件尾,用于获取文件大小
        fileSize = fd.tell()  # 获取文件字符数
        Pos_list = []  # 指针坐标，数组
        blockSize = int(fileSize/self.blockNum)
        start_Pos = 0  # 文件初始指针
        for i in range(self.blockNum):
            if i == self.blockNum-1:
                end_Pos = fileSize-1  # 最后一个文件区块为文件结尾
                Pos_list.append((start_Pos, end_Pos))
                break
            end_Pos = start_Pos+blockSize-1  # 均匀分配每个区块
            # if end_Pos >= fileSize:
            #   end_Pos=fileSize-1
            # if start_Pos >= fileSize:
            #   break
            Pos_list.append((start_Pos, end_Pos))
            start_Pos = end_Pos+1  # 下一个区块，开始坐标
        fd.close()
        return Pos_list
```

## 2.读取文件线程类

类初始化需要传递6个参数

* 线程编号
* 线程所属队列
* 文件名
* 文件区块其实指针
* 文件区块结束指针
* 自定义处理函数

读取文件线程也可以称作**生产者**，继承自Thread类，在初始化是调用，初始化一个线程

```python
class readThread(Thread):  # 这个括号表示继承threading.Thread类
    def __init__(self, thread_name, thread_queue, fileName, start_Pos, end_Pos, processFunction):
        super(readThread, self).__init__()  # 初始化一个线程
        self.name = thread_name
        self.queue = thread_queue
        self.start_Pos = start_Pos
        self.end_Pos = end_Pos
        self.fileName = fileName
        self.processFunction = processFunction
```

类中读取数据的函数`reader`

* 根据文件区块指针，进行一行一行读取
* 将读取的数据交给`processFunction`自定义函数处理，过滤掉一些行或者转化一下格式
* 将过滤后的结果存进队列中

`self.queue.put(tmp, block=True)`主要是当队列中数据已经满了，还来不及取出时，阻塞当前线程，等待队列闲置空间

```python
    def reader(self):
        fd = open(self.fileName, 'r')
        if self.start_Pos != 0:
            fd.seek(self.start_Pos-1)
            if fd.read(1) != '\n':  # 当前初始位置不是行首,移动到下一行行首
                fd.readline()
                self.start_Pos = fd.tell()
        fd.seek(self.start_Pos)  # 将文件指针定位到区块的行首
        while self.start_Pos <= self.end_Pos:  # 开始按行读取文件并且进行操作
            line = fd.readline()
            tmp = self.processFunction(line)
            if tmp:  # 判断数据是否需要放进队列
                self.queue.put(tmp, block=True)  # 阻塞队列等待有位置时就插入
            # else:
            #     self.start_Pos = fd.tell()
            #     continue
            self.start_Pos = fd.tell()  # 读完一行后，自动调整开始位置
        fd.close()
        return
```

启动线程函数

当线程启动是调用读取函数进行文件读取

```python
    def run(self):
        print("线程" + self.name+": 开始读取文件...")
        self.reader()
        print("线程" + self.name+": 读取完成...")
        return
```

## 3.获取队列数据线程类

也被称为**消费者**线程，

初始化

* 包含线程名
* 所属队列
* 以及初始化一个用于存放队列数据的数组

```python
class getThread(threading.Thread):
    def __init__(self, name, queue):
        super(getThread, self).__init__()
        self.name = name
        self.queue = queue
        self.out = []
```

获取队列数据`run`方法

* 当线程启动时，就开始向队列取数据
* 如果在向队列取数据时，等待时间过长，之间关闭当前进程

`Queue.get`函数默认有一个`block=True`，的参数；当线程取不到数据时，就一直会进入等待

```python
    def run(self):
        while True:
            try:
                self.out.append(self.queue.get(timeout=2))  # 等待2s，读不到数据直接退出
            except:
                break
```

## 4.函数封装

为了实现代码的可重复利用，可以将这几个模块一起封装成一个函数

* `defaultProcessFunction`默认自定义函数，不会对文件中行进行处理
* 定义生成者线程池
* 定义消费者生成池
* 等待所有线程池结束后，调用消费者线程的`getData`方法，获取所有数据

```python
import time
import threading
from queue import Queue
import re

def defaultProcessFunction(line):  # 对行数据不做处理的默认函数
    return line

def readFileByThread(fileName, ThreadNum, processFunction=defaultProcessFunction):
    out = []
    # 设置队列
    workQueue = Queue()
    # 线程池
    readThreads = []
    getThreads = []
    pos_list = PartitionFile(fileName, ThreadNum).partion()
    for i in range(len(pos_list)):
        postion = pos_list[i]
        mythread = readThread(str(i), workQueue, fileName,
                              postion[0], postion[1], processFunction)  # 初始化线程,设置预处理函数
        mythread.start()  # 启动线程
        getdataThread = getThread(str(i), workQueue)
        getdataThread.start()
        readThreads.append(mythread)  # 添加到线程池
        getThreads.append(getdataThread)  # 添加到线程池
    for i in readThreads:
        i.join()  # 等待所有线程完成
    for i in getThreads:
        i.join()  # 等待所有线程完成
    for i in getThreads:
        out += i.getData()
    return out
```

## 5.性能测试

由于测试文件只有20万行，进程间的调度也会消耗时间，所有有可能出现多进程比单进程慢一丢丢的情况

```python
if __name__ == "__main__":
    start_time = time.clock()
    readFileByThread(sys.argv[1], int(sys.argv[2]))
    end_time = time.clock()
    print('Cost Time is {:.2f}'.format(end_time-start_time))
```

* 多线程模式下

```bash
##单个线程情况
线程0: 开始读取文件...
线程0: 读取完成...
Cost Time is 2.61
## 4个线程情况
线程0: 开始读取文件...
线程1: 开始读取文件...
线程2: 开始读取文件...
线程3: 开始读取文件...
线程2: 读取完成...
线程1: 读取完成...
线程0: 读取完成...
线程3: 读取完成...
Gbar_A01
Cost Time is 17.77
```

## 6.源代码

<https://github.com/BiocottonHub/zpliuCode/blob/master/Hi-c/readFileByThread.py>

## 7.参考

1. [Queue队列](https://blog.csdn.net/GeekLeee/article/details/77883252)
2. [多线程爬取数据](https://blog.csdn.net/aa57255621/article/details/88965975)
3. [super父类](https://www.runoob.com/python/python-func-super.html)
4. [文件指针](https://www.runoob.com/python/file-seek.html)
5. [文件分块](https://blog.csdn.net/onlyforr/article/details/52094581?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2.nonecase\&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2.nonecase)
6. [python继承类](https://fishc.com.cn/thread-115047-1-1.html)
7. [Queue](https://www.cnblogs.com/xiangsikai/p/8185031.html)
8. [Queue取数据](https://www.cnblogs.com/wt11/p/5952500.html)
9. [子进程不结束](https://www.zhihu.com/question/63265466)


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://zpliu.gitbook.io/booknote/python/duo-xian-cheng.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
