时间:2021-07-01 10:21:17 帮助过:14人阅读
只要存在数据库,就会有后台批量处理数据的需求,比如数据表备份、定期清理、数据替换、数据迁移,对于批量处理来说,往往会涉及大量的查询、过滤、归类、聚合计算,在批量脚本中直接查询数据库往往性能太低,甚至会因为一个大型的SQL导致数据库锁表出现线上事故,因此一般采用先导出到文件,在文件上计算然后再导入,比如:
1、使用mysql -e “select * from table” > output.txt的方式,执行SQL,将结果导出到文件中;
2、针对文件,使用各种方式进行聚合、过滤、替换等计算,最后产出成需要使用的格式;
3、发布产出的文件,或者使用load data命令导入到数据库;
由于只是一次性的批量查询数据库导出到文件,然后针对文件进行计算,而不是每次都查询数据库,大量节省了网络的IO耗费,从而提升处理的速度。
然而得到了导出的文件之后,如果文件过大,或者计算逻辑复杂比如大量的调用了耗费CPU的正则匹配、聚合计算,那么单线程的处理会耗费大量的时间,这时候就可以引入并发处理,使得机器的CPU、内存、IO、网络等资源全部充分利用起来,大幅度降低处理时间。
HADOOP的MAP-REDUCE的做法,是先将文件split成小分片文件,然后针对每个分片做计算,最后将每个分片的结果聚合在一起,然而由于HADOOP的调度、集群稳定性等各种原因,对于MB大小级别的文件处理,会发现速度非常慢,有时候甚至比单机单线程处理速度还慢,将单机单线程改成多线程,往往会发现令人惊讶的效果提升。
直观的做法,是使用主线程读取输入的单个大文件,然后将读取的结果分配给子线程处理,然后主线程做整合,这种方式因为多线程共用了单个文件的IO,需要加入对文件的同步机制,最后会发现性能瓶颈在这单个文件的读取同步之上。
可以将大文件分片成小文件,然后每个文件分配给单个线程单独处理,避免线程间的资源同步,每个线程会享用单独的CPU核、内存单元、文件句柄,处理速度能达到最快。
使用这种方式,可以用以下的步骤进行:
1、使用SHELL,将输入文件拆分成预定线程数目的份数,存放到一个目录中;
2、以输入文件的目录路径作为参数,编程语言JAVA/PYTHON读取该目录的所有文件,对于每个文件启动一个处理线程,进行处理;
3、SHELL将输出目录的所有文件,使用cat file* > output_file的方式,得到最终的计算结果
Shell
function run multitask(){ # 开启多个异步线程 SPLITS COUNT=20 # 输入文件总数 sourcefile linescount=
cat ${input_file} | wc -l# 计算出拆分的文件个数 split filelines count=$(( $sourcefile linescount / $SPLITS COUNT )) # 进行文件拆分 split -l $splitfile linescount -a 3 -d ${input file} ${input
dir}/inputFile_
# 执行JAVA程序$JAVA_CMD -classpath $jar_path "net.crazyant.BackTaskMain" "${input_dir}" "${output_dir}" "${output_err_dir}"# 合并文件cat ${output_dir}/* > ${output_file}
}
run multitask
## 将输入文件拆分成多个小文件,启动多线程进行处理,输出结果文件#function run_multi_task(){ # 开启多个异步线程 SPLITS_COUNT=20 # 输入文件总数 source_file_lines_count=`cat ${input_file} | wc -l` # 计算出拆分的文件个数 split_file_lines_count=$(( $source_file_lines_count / $SPLITS_COUNT )) # 进行文件拆分 split -l $split_file_lines_count -a 3 -d ${input_file} ${input_dir}/inputFile_ # 执行JAVA程序 $JAVA_CMD -classpath $jar_path "net.crazyant.BackTaskMain" "${input_dir}" "${output_dir}" "${output_err_dir}" # 合并文件 cat ${output_dir}/* > ${output_file}} run_multi_task
这里注意,拆分文件的时候,不能使用split按照大小进行拆分,因为会把输入文件中的行截断;
对应的JAVA程序,则是通过读取文件夹中文件列表的方法,每个文件单独启动一个线程:
Java
public class BackTaskMain { public static void main(String[] args) { String inputDataDir = args[1]; String outputDataDir = args[2]; String errDataDir = args[3]; File inputDir = new File(inputDataDir); File[] inputFiles = inputDir.listFiles(); // 记录开启的线程 Listthreads = new ArrayList (); for (File inputFile : inputFiles) { if (inputFile.getName().equals(".") || inputFile.getName().equals("..")) { continue; } // 针对每个inputFile,生成对应的outputFile和errFile String outputSrcLiceFpath = outputDataDir + "/" + inputFile.getName() + ".out"; String errorOutputFpath = errDataDir + "/" + inputFile.getName() + ".err"; // 创建Runnable BackRzInterface backRzInterface = new BackRzInterface(); backRzInterface.setInputFilePath(inputFile.getAbsolutePath()); backRzInterface.setOutputFilePath(outputSrcLiceFpath); backRzInterface.setErrorOutputFpath(errorOutputFpath); // 创建Thread,启动线程 Thread singleRunThread = new Thread(backRzInterface); threads.add(singleRunThread); singleRunThread.start(); } for (Thread thread : threads) { try { // 使用thread.join(),等待所有的线程执行完毕 thread.join(); System.out.println(thread.getName() + " has over"); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("proccess all over"); }}
public class BackTaskMain { public static void main(String[] args) { String inputDataDir = args[1]; String outputDataDir = args[2]; String errDataDir = args[3]; FileinputDir = new File(inputDataDir); File[] inputFiles = inputDir.listFiles(); // 记录开启的线程 Listthreads = new ArrayList (); for (FileinputFile : inputFiles) { if (inputFile.getName().equals(".") || inputFile.getName().equals("..")) { continue; } // 针对每个inputFile,生成对应的outputFile和errFile String outputSrcLiceFpath = outputDataDir + "/" + inputFile.getName() + ".out"; String errorOutputFpath = errDataDir + "/" + inputFile.getName() + ".err"; // 创建Runnable BackRzInterfacebackRzInterface = new BackRzInterface(); backRzInterface.setInputFilePath(inputFile.getAbsolutePath()); backRzInterface.setOutputFilePath(outputSrcLiceFpath); backRzInterface.setErrorOutputFpath(errorOutputFpath); // 创建Thread,启动线程 ThreadsingleRunThread = new Thread(backRzInterface); threads.add(singleRunThread); singleRunThread.start(); } for (Threadthread : threads) { try { // 使用thread.join(),等待所有的线程执行完毕 thread.join(); System.out.println(thread.getName() + " has over"); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("proccess all over"); }}
通过这种方式,将大文件拆分成小文件,启动多个线程,每个线程处理一个小文件,最终将每个小文件的结果聚合,就得到了最终产出,性能上却大幅提升。
在上面的代码中,BackRzInterface是每个线程启动时要使用的Runnable对象,可以看到用的是每次new的方式创建:
// 创建RunnableBackRzInterface backRzInterface = new BackRzInterface();
这样每个处理线程依赖的BackRzInterface都是独立的,对这个Runnable代码的使用不会存在同步问题。
如果多线程处理中需要使用外部资源,最好想办法使得每个线程单独使用自己独占的资源,相互之间若不会存在冲突,可以实现最大化并发处理。
其他一些例子,比如:
通过以上这些类似的方法,每次将可能需要同步访问的共享资源,通过复制、分片等手段得到不同份,每个线程单独访问自己那一份,避免同步访问,最终实现性能最优。
如果将文件拆分成了多份,依赖的ID、词典等资源也相应提供了多份,但是发现代码中存在无法解决的代码级别同步,该怎么办呢?
相对于想尽办法解决代码中的同步问题来说,多线程和多进程之间的性能差别微乎其微,我们都知道线程会使用进程的资源,所以导致了线程之间存在竞争进程资源,但是对于进程来说,CPU、内存等硬件资源是完全隔离的,这时候将程序运行在多进程而不是多线程,反而能更好的提升性能。
对于一些支持多线程不好的语言,比如PHP,直接用这种多进程计算的方法,速度并不比支持多线程的JAVA、PYTHON语言差:
Shell
SPLITS_COUNT=20
input splitsdir="${input dir}splits" output splitsdir="${output dir}splits"
source filelines_count=
cat ${input_file} | wc -l
split filelines count=$(( $sourcefile linescount / ${SPLITS_COUNT} ))
split -l $split filelines count -a 3 -d ${inputfile} ${input splitsdir}/inputfile_
process idx=1 for fname in $(ls ${inputsplits dir}); do inputfpath=${input splitsdir}/$fname ouput fpath=${outputsplits dir}/$fname # 后台执行所有进程 php "/php/main.php" "${inputfpath}" "${ouput fpath}" & (( processidx++ )) done
wait
cat $output splitsdir/* > ${output_file}
# 要拆分的文件数,也就是要启动的多进程数SPLITS_COUNT=20 input_splits_dir="${input_dir}_splits"output_splits_dir="${output_dir}_splits"# 输入文件行数source_file_lines_count=`cat ${input_file} | wc -l`# 每个文件拆分的行数=总行数除以要拆分的文件个数(也就是对应进程的个数)split_file_lines_count=$(( $source_file_lines_count / ${SPLITS_COUNT} ))# 执行拆分,注意这里使用-l进行行级别拆分更好split -l $split_file_lines_count -a 3 -d ${input_file} ${input_splits_dir}/inputfile_ process_idx=1for fname in $(ls ${input_splits_dir}); do input_fpath=${input_splits_dir}/$fname ouput_fpath=${output_splits_dir}/$fname # 后台执行所有进程 php "/php/main.php" "${input_fpath}" "${ouput_fpath}" & (( process_idx++ )) done # 等待所有后台进程执行结束wait # 合并文件cat $output_splits_dir/* > ${output_file}
上述代码中,使用shell的&符号,可以在后台同时启动多个进程,使用wait语法,可以实现多线程的Thread.join特性,等待所有的进程执行结束。
对于输入文件的大小、计算的复杂度处于单机和集群计算之间的数据处理,使用并发处理最为合适,但是并发的同步处理却会降低多线程的性能,这时可以借助于输入文件复制拆分、依赖资源复制拆分切片等方法,实现每个线程处理自己的独占资源,从而最大化提升计算速度。而对于一些无法避免的代码同步冲突逻辑,可以退化为多进程处理数据,借助于SHELL的后台进程支持,实现进程级别的资源独占,最终大幅提升处理性能。