class getThread(threading.Thread):
def __init__(self, name, queue):
super(getThread, self).__init__()
self.name = name
self.queue = queue
self.out = []
获取队列数据run方法
当线程启动时,就开始向队列取数据
如果在向队列取数据时,等待时间过长,之间关闭当前进程
Queue.get函数默认有一个block=True,的参数;当线程取不到数据时,就一直会进入等待
def run(self):
while True:
try:
self.out.append(self.queue.get(timeout=2)) # 等待2s,读不到数据直接退出
except:
break
4.函数封装
为了实现代码的可重复利用,可以将这几个模块一起封装成一个函数
defaultProcessFunction默认自定义函数,不会对文件中行进行处理
定义生成者线程池
定义消费者生成池
等待所有线程池结束后,调用消费者线程的getData方法,获取所有数据
import time
import threading
from queue import Queue
import re
def defaultProcessFunction(line): # 对行数据不做处理的默认函数
return line
def readFileByThread(fileName, ThreadNum, processFunction=defaultProcessFunction):
out = []
# 设置队列
workQueue = Queue()
# 线程池
readThreads = []
getThreads = []
pos_list = PartitionFile(fileName, ThreadNum).partion()
for i in range(len(pos_list)):
postion = pos_list[i]
mythread = readThread(str(i), workQueue, fileName,
postion[0], postion[1], processFunction) # 初始化线程,设置预处理函数
mythread.start() # 启动线程
getdataThread = getThread(str(i), workQueue)
getdataThread.start()
readThreads.append(mythread) # 添加到线程池
getThreads.append(getdataThread) # 添加到线程池
for i in readThreads:
i.join() # 等待所有线程完成
for i in getThreads:
i.join() # 等待所有线程完成
for i in getThreads:
out += i.getData()
return out
5.性能测试
由于测试文件只有20万行,进程间的调度也会消耗时间,所有有可能出现多进程比单进程慢一丢丢的情况
if __name__ == "__main__":
start_time = time.clock()
readFileByThread(sys.argv[1], int(sys.argv[2]))
end_time = time.clock()
print('Cost Time is {:.2f}'.format(end_time-start_time))
多线程模式下
##单个线程情况
线程0: 开始读取文件...
线程0: 读取完成...
Cost Time is 2.61
## 4个线程情况
线程0: 开始读取文件...
线程1: 开始读取文件...
线程2: 开始读取文件...
线程3: 开始读取文件...
线程2: 读取完成...
线程1: 读取完成...
线程0: 读取完成...
线程3: 读取完成...
Gbar_A01
Cost Time is 17.77