'''
Usage:
python3.6 $0 inputfile outputfile coreNumber
'''
import sys
import multiprocessing
from scipy import stats
import time
def BioTest(TestData, prob=0.006, cutoff=1e-5):
    """Score tab-separated count rows with a binomial PMF and keep the small ones.

    Each input line is split on tabs; column 2 is used as the number of
    successes and column 2 + column 3 as the number of trials.  The value
    computed is ``stats.binom.pmf(successes, trials, prob)`` -- i.e. the
    point probability of exactly that count, which the rest of the script
    labels "p-value".

    Args:
        TestData: sequence of raw text lines.  It is NOT modified.
        prob: per-trial success probability passed to the binomial PMF.
        cutoff: a row is kept only when its PMF value is strictly below this.

    Returns:
        A list of rows (each a list of the split string fields with the PMF
        value appended as the last element), in input order, restricted to
        rows whose PMF value is below ``cutoff``.

    Raises:
        IndexError / ValueError: for a non-blank line that has fewer than
        four columns or non-integer counts.
    """
    # Parse into fresh lists so the caller's sequence is left untouched.
    # Whitespace-only lines (e.g. a trailing empty line) are skipped.
    rows = [raw.strip("\n").split("\t") for raw in TestData if raw.strip()]
    if not rows:
        return []

    successes = [int(row[2]) for row in rows]
    trials = [hits + int(row[3]) for hits, row in zip(successes, rows)]

    # One vectorised call instead of one scipy call per row: the per-call
    # overhead of scipy's distribution machinery dominates for scalar inputs.
    p_values = stats.binom.pmf(successes, trials, prob)

    output = []
    for row, p_value in zip(rows, p_values):
        if p_value < cutoff:
            row.append(p_value)
            output.append(row)
    return output
if __name__ == "__main__":
    # Script entry point: read the input file, split its lines into
    # `coreNumber` contiguous chunks, run BioTest on every chunk in a process
    # pool, and write the surviving rows to the output file in input order.
    if len(sys.argv) < 4:
        sys.exit("Usage: python3.6 %s inputfile outputfile coreNumber" % sys.argv[0])

    startTime = time.time()

    ProcessNum = int(sys.argv[3])
    if ProcessNum < 1:
        # 0 would divide by zero below; a negative value would dispatch nothing.
        sys.exit("coreNumber must be a positive integer")

    with open(sys.argv[1], 'r') as inputfiel:
        TestData = inputfiel.readlines()

    # Lines per chunk; the last chunk also takes the remainder.
    average = len(TestData) // ProcessNum

    # Size the pool from the command-line argument instead of a fixed 8, and
    # use the context manager so the workers are cleaned up on any error.
    with multiprocessing.Pool(ProcessNum) as p:
        pending = []
        for processId in range(ProcessNum):
            start = processId * average
            if processId == ProcessNum - 1:
                end = len(TestData)
            else:
                end = start + average
            pending.append(p.apply_async(BioTest, (TestData[start:end],)))
        # get() blocks until each chunk is done and re-raises worker errors.
        output = [task.get() for task in pending]

    with open(sys.argv[2], 'w') as OUTPUT:
        for chunk in output:
            for line in chunk:
                OUTPUT.write("\t".join(line[:4] + [str(line[4])]) + "\n")

    end_Time = time.time()
    print("运行时间为%.2f" % (end_Time - startTime))
'''
Usage:
python3.6 $0 inputfile outputfile coreNumber
Input data format (tab-separated):
chromosomeID position count1 count2
Output data format (tab-separated):
chromosomeID position count1 count2 p-value
'''
import sys
import multiprocessing
from scipy import stats
import time
import fcntl
def BioTest(TestData, processId, prob=0.006, cutoff=1e-5):
    """Score tab-separated count rows with a binomial PMF and keep the small ones.

    Each input line is split on tabs; column 2 is used as the number of
    successes and column 2 + column 3 as the number of trials.  The value
    computed is ``stats.binom.pmf(successes, trials, prob)`` -- i.e. the
    point probability of exactly that count, which the rest of the script
    labels "p-value".

    Args:
        TestData: sequence of raw text lines.  It is NOT modified.
        processId: identifier of the chunk/worker; returned unchanged so the
            receiver of the result can tell which chunk it belongs to.
        prob: per-trial success probability passed to the binomial PMF.
        cutoff: a row is kept only when its PMF value is strictly below this.

    Returns:
        ``[output, processId]`` where ``output`` is the list of kept rows
        (split string fields with the PMF value appended), in input order.

    Raises:
        IndexError / ValueError: for a non-blank line that has fewer than
        four columns or non-integer counts.
    """
    # Parse into fresh lists so the caller's sequence is left untouched.
    # Whitespace-only lines (e.g. a trailing empty line) are skipped.
    rows = [raw.strip("\n").split("\t") for raw in TestData if raw.strip()]
    output = []
    if rows:
        successes = [int(row[2]) for row in rows]
        trials = [hits + int(row[3]) for hits, row in zip(successes, rows)]

        # One vectorised call instead of one scipy call per row: the per-call
        # overhead of scipy's distribution machinery dominates for scalars.
        p_values = stats.binom.pmf(successes, trials, prob)

        for row, p_value in zip(rows, p_values):
            if p_value < cutoff:
                row.append(p_value)
                output.append(row)
    return [output, processId]
def WtriteFile(callbackData):
    """Append one result batch to the output file named by ``sys.argv[2]``.

    Args:
        callbackData: a two-item sequence ``[rows, worker_id]``.  Every row
            must hold four string fields followed by a value that is written
            via ``str()``.

    A progress message tagged with ``worker_id`` is printed before and after
    the rows are written.
    """
    with open(sys.argv[2], 'a') as OUTPUT:
        # Exclusive advisory file lock.  There is no explicit unlock; the
        # file is closed when this `with` block ends.
        fcntl.flock(OUTPUT.fileno(), fcntl.LOCK_EX)
        worker_tag = str(callbackData[1])
        print(worker_tag + " 进程开始写入文件!")
        OUTPUT.writelines(
            "\t".join((row[0], row[1], row[2], row[3], str(row[4]))) + "\n"
            for row in callbackData[0]
        )
        print(worker_tag + " 进程写入完成!")
if __name__ == "__main__":
    # Script entry point: read the input file, split its lines into
    # `coreNumber` contiguous chunks, run BioTest on every chunk in a process
    # pool, and let the WtriteFile callback append each finished chunk to the
    # output file.  Chunks are written in completion order.
    if len(sys.argv) < 4:
        sys.exit("Usage: python3.6 %s inputfile outputfile coreNumber" % sys.argv[0])

    startTime = time.time()

    ProcessNum = int(sys.argv[3])
    if ProcessNum < 1:
        # 0 would divide by zero below; a negative value would dispatch nothing.
        sys.exit("coreNumber must be a positive integer")

    with open(sys.argv[1], 'r') as inputfiel:
        TestData = inputfiel.readlines()

    # The callback only ever appends, so start from an empty file; otherwise
    # rows from an earlier run would be kept and duplicated.
    with open(sys.argv[2], 'w'):
        pass

    # Lines per chunk; the last chunk also takes the remainder.
    average = len(TestData) // ProcessNum

    # At most 20 worker processes; chunks beyond that wait in the pool's queue.
    with multiprocessing.Pool(min(ProcessNum, 20)) as p:
        pending = []
        for processId in range(ProcessNum):
            start = processId * average
            if processId == ProcessNum - 1:
                end = len(TestData)
            else:
                end = start + average
            # Hand the slice to a worker; WtriteFile receives BioTest's result.
            pending.append(
                p.apply_async(BioTest, (TestData[start:end], processId),
                              callback=WtriteFile)
            )
        p.close()  # no more tasks will be submitted
        p.join()   # wait for every worker to finish
        # Without this, an exception raised inside a worker would vanish and
        # the output would be silently incomplete; get() re-raises it here.
        for task in pending:
            task.get()

    end_Time = time.time()
    print("运行时间为%.2f" % (end_Time - startTime))