多进程

在使用多线程并没有实质性的提速后,果断放弃了多线程;最后还是使用多进程的方式处理这个50G的大文件

多线程和多进程区别

一. 两者区别 进程是分配资源的基本单位;线程是系统调度和分派的基本单位。 属于同一进程的线程,堆是共享的,栈是私有的。 属于同一进程的所有线程都具有相同的地址空间。

多进程的优点:

①编程相对容易;通常不需要考虑锁和同步资源的问题。 ②更强的容错性:比起多线程的一个好处是一个进程崩溃了不会影响其他进程。 ③有内核保证的隔离:数据和错误隔离。 对于使用如C/C++这些语言编写的本地代码,错误隔离是非常有用的:采用多进程架构的程序一般可以做到一定程度的自恢复;(master守护进程监控所有worker进程,发现进程挂掉后将其重启)。

多线程的优点:

①创建速度快,方便高效的数据共享 共享数据:多线程间可以共享同一虚拟地址空间;多进程间的数据共享就需要用到共享内存、信号量等IPC技术。 ②较轻的上下文切换开销 - 不用切换地址空间,不用更改寄存器,不用刷新TLB。 ③提供非均质的服务。如果全都是计算任务,但每个任务的耗时不都为1s,而是1ms-1s之间波动;这样,多线程相比多进程的优势就体现出来,它能有效降低“简单任务被复杂任务压住”的概率。

1.文件分块类

通过初始化一个文件分块类,并调用partion函数,获得每个进程所需要读取的区间

pos_list = PartitionFile(fileName, ProcessNum).partion() # *存放所有文件指针坐标*

class PartitionFile(object):
    def __init__(self, fileName, jobsNum):
        self.fileName = fileName
        self.blockNum = jobsNum

    def partion(self):
        fd = open(self.fileName, 'r')
        fd.seek(0, 2)  # 移动文件指针到文件尾,用于获取文件大小
        fileSize = fd.tell()  # 获取文件字符数
        Pos_list = []  # 指针坐标,数组
        blockSize = int(fileSize/self.blockNum)
        start_Pos = 0  # 文件初始指针
        for i in range(self.blockNum):
            if i == self.blockNum-1:
                end_Pos = fileSize-1  # 最后一个文件区块为文件结尾
                Pos_list.append((start_Pos, end_Pos))
                break
            end_Pos = start_Pos+blockSize-1  # 均匀分配每个区块
            Pos_list.append((start_Pos, end_Pos))
            start_Pos = end_Pos+1  # 下一个区块的开始坐标
        fd.close()
        return Pos_list

2.读取进程

因为python中字典和列表都是引用类型数据,所以使用函数对其直接操作,可以改变原字典;这里只有文件的读取,不涉及子进程间的通信,所以不使用Queue

  • 进程类中定义一个self.outData用于存储每行结果

  • processFunction函数会对每行结果进行处理后存进self.outData

    def reader(self):
        fd = open(self.fileName, 'r')
        if self.start_Pos != 0:
            fd.seek(self.start_Pos-1)
            if fd.read(1) != '\n':  # 当前初始位置不是行首,移动到下一行行首
                fd.readline()
                self.start_Pos = fd.tell()
        fd.seek(self.start_Pos)  # 将文件指针定位到区块的行首
        while self.start_Pos <= self.end_Pos:  # 开始按行读取文件并且进行操作
            tmp = fd.readline()
            processFunction(tmp, self.outData)  # 将每行结果存进字典内
            self.start_Pos = fd.tell()  # 读完一行后,自动调整开始位置
        fd.close()
        return

子进程写入文件

这里没有直接使用生产者-消费者模型,是因为基本没有子进程间的通信;只有主进程和子进程的通信;我当时还直接将所有的子进程读到的文件数据写入Queue,然后再从主进程中读取,结果最后子进程一直被挂起了。原因可能是,队列容量达到系统上限,生成者太多了,消费者只有主进程一个

参考:https://www.zhihu.com/question/63265466

  • 遍历self.outData数据,将结果写入文件

  • 由于涉及到多个进程对文件的写,这里使用了文件锁

  • 将写入的文件名通过Queue传递给主进程

    def run(self):
        print("Process: " + self.name+": reading file...")
        self.reader()
        print("Process: " + self.name+": begin to write temporary data to file...")
        for key in self.outData.keys():
            self.queue.put(key, block=True, timeout=None)
            with open(path+"/"+key, 'a') as File:
                fcntl.flock(File.fileno(), fcntl.LOCK_EX)
                for item in self.outData[key]:
                    File.write(item[0]+" "+key+" "+" ".join(item[1:])+"\n")
        print("Process: " + self.name+": completed write...")

3.排序进程

  • 排序进程的文件名是,主进程从队列中获取得到的

    def sortFile(self):
        with open(path+"/"+self.fileName, 'r') as File:
            for line in File.readlines():
                line = line.split(" ")
                try:
                    # [tmpDict[line[3]], line[1],line[2], '0', tmpDict[line[6]], line[4], line[5], '0']
                    # 都是同一条染色体对应的Chr1-Chr2 Chr1-Chr3
                    self.outData[line[5]].append(
                        [line[0], line[2], line[3], line[4], line[6], line[7]])
                except KeyError:
                    self.outData[line[5]] = [
                        [line[0], line[2], line[3], line[4], line[6], line[7]]]
        with open(path+"/"+self.fileName+"_sorted", 'w') as File:
            sortKey = sorted(self.outData)
            for key in sortKey:
                for item in self.outData[key]:
                    File.write(item[0]+" "+self.fileName+" " +
                               " ".join(item[1:4])+" "+key+" "+item[-2]+" "+item[-1])

4.封装后的函数

  • 主进程通过队列的方式从子进程中获取染色体编号

    path = 'tmp'+str(int(time.time()))
    mkdir(path)
    workQueue = Queue()  # 用于存放子进程文件数据
    read_jobs = []
    sort_jobs = []
    chrosomes = []
    pos_list = PartitionFile(fileName, ProcessNum).partion()  # 存放所有文件指针坐标
    for i in range(ProcessNum):
        position = pos_list[i]
        myprocess = readProcess(
            str(i), fileName, workQueue, position[0], position[1], processFunction)
        myprocess.start()
        read_jobs.append(myprocess)
    for i in read_jobs:
        i.join()
    while True:
        try:
            chrosomes.append(workQueue.get(block=True, timeout=1))  # 获取子进程数据
        except:
            break
    for i in list(set(chrosomes)):
        myprocess = sortProcess(str(i), i)  # 排序进程
        myprocess.start()
        sort_jobs.append(myprocess)
    for i in sort_jobs:
        i.join()

5.性能测试

  • 单进程

Process: 0: reading file...
Process: 0: begin to write temporary data to file...
Process: 0: completed write...
sorting chrosome: Gbar_A01...
chrosome: Gbar_A01ok...
merge chrosomes to a single file...
completed!
there are some temporary file in directory: <./tmp1593916264>
if you can remove it by yourself!
Cost Time is 46.96
  • 四个进程·

Process: 1: reading file...
Process: 0: reading file...
Process: 3: reading file...
Process: 2: reading file...
Process: 3: begin to write temporary data to file...
Process: 1: begin to write temporary data to file...
Process: 3: completed write...
Process: 2: begin to write temporary data to file...
Process: 0: begin to write temporary data to file...
Process: 1: completed write...
Process: 2: completed write...
Process: 0: completed write...
sorting chrosome: Gbar_A01...
chrosome: Gbar_A01  ok...
merge chrosomes to a single file...
completed!
there are some temporary file in directory: <./tmp1593916319>
if you can remove it by yourself!
Cost Time is 18.70

6.源代码

https://github.com/BiocottonHub/zpliuCode/blob/master/Hi-c/HiCProTojuicer.py

7.参考

Last updated