当前位置:Gxlcms > 数据库问题 > 利用java多线程写的一个工具向MongoDb中存储大量数据

利用 Java 多线程写的一个工具向 MongoDB 中存储大量数据

时间:2021-07-01 10:21:17 帮助过:2人阅读

import java.io.BufferedReader; 2 import java.io.File; 3 import java.io.FileInputStream; 4 import java.io.IOException; 5 import java.io.InputStreamReader; 6 import java.util.LinkedList; 7 import java.util.List; 8 import org.bson.Document; 9 import com.mongodb.MongoClient; 10 import com.mongodb.client.MongoCollection; 11 import com.mongodb.client.MongoDatabase; 12 13 public class Test { 14 public static void main(String[] args) { 15 Pool p = new Pool(); 16 Produce pro = new Produce("p",p);//一个生产者 17 //三个消费者,用来向mogodb中存储数据
       Customer cus = new Customer("c", p); 18 Customer cus2 = new Customer("c2", p); 19 Customer cus3 = new Customer("c3", p); 20 new Thread(pro).start(); 21 new Thread(cus).start(); 22 new Thread(cus2).start(); 23 new Thread(cus3).start(); 24 25 } 26 } 27 28 class Produce implements Runnable{//生产者 29 30 private static final String DIR = "E:\\targets";//扫描文件路径 31 private static final String FILE_SUFFIX = "html";//扫描文件类型 32 private Pool pool=null; 33 private String name = null; 34 public Produce(String name,Pool pool){ 35 this.pool= pool; 36 this.name = name; 37 } 38 @Override 39 public void run() { 40 getFilesInDir(DIR, FILE_SUFFIX); 41 } 42 43 public void getFilesInDir(String dir,String suffix){ 44 if(null!=dir && dir.trim().length()>0){ 45 File file = new File(dir.trim()); 46 if(file.isDirectory()){ 47 File[] flist = file.listFiles(); 48 if(null!=flist){ 49 for(File f:flist){ 50 if(f.isFile()){ 51 if(null==suffix){ 52 pool.putFile(f); 53 } 54 if(null!=suffix &&suffix.trim().length()>0){ 55 if(f.getName().endsWith(suffix.trim())){ 56 pool.putFile(f); 57 System.out.println(name+"pool存东西"+f.getName()); 58 } 59 } 60 }else{ 61 getFilesInDir(f.getAbsolutePath(),suffix); 62 } 63 } 64 } 65 } 66 } 67 } 68 69 } 70 71 class Customer implements Runnable{//消费者 72 private static final String CHARSET = "UTF-8";//文件处理编码格式 73 private static final String HOST = "127.0.0.1";//主机 74 private static final int PORT = 27017;//端口 75 private static final String DATABASE_NAME="mydb";//存储数据库名称 76 private static final String COLLECTION_NAME="mycol";//存储数据库Collection 77 MongoClient client =new MongoClient(HOST,PORT);; 78 private String name=null; 79 private Pool pool=null; 80 public Customer(String name,Pool pool){ 81 this.name = name; 82 this.pool = pool; 83 } 84 @Override 85 public void run() { 86 while(true){ 87 try { 88 File f = pool.fetchFile(); 89 saveToMonGoDb(f); 90 System.out.println(name+"pool取东西"+f.getName()); 91 } catch (Exception e) { 92 e.printStackTrace(); 93 } 94 } 95 } 96 97 
private void saveToMonGoDb(File file){//将文件保存到数据库 98 MongoDatabase dataBase = client.getDatabase(DATABASE_NAME); 99 MongoCollection<Document> collection = dataBase.getCollection(COLLECTION_NAME); 100 String _id = file.getName().substring(0,file.getName().lastIndexOf(".")); 101 String content = readFileContext(file, CHARSET); 102 Document document = new Document("_id",_id).append("content", content); 103 collection.insertOne(document); 104 } 105 106 public static String readFileContext(File file,String charset) { 107 StringBuilder sb; 108 BufferedReader reader=null; 109 try { 110 reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), charset)); 111 String line = null; 112 sb = new StringBuilder(); 113 while(null!=(line = reader.readLine())){ 114 sb.append(line+"\n"); 115 } 116 return sb.toString(); 117 }catch (Exception e) { 118 e.printStackTrace(); 119 }finally{ 120 try { 121 reader.close(); 122 } catch (IOException e) { 123 e.printStackTrace(); 124 } 125 } 126 return null; 127 } 128 129 } 130 131 class Pool{ 132 volatile int size=0; 133 volatile private List<File> files = new LinkedList<File>(); 134 public synchronized void putFile(File file){ 135 while(size>1000){ 136 try { 137 this.wait(); 138 } catch (InterruptedException e) { 139 e.printStackTrace(); 140 } 141 } 142 files.add(file); 143 ++size; 144 notify(); 145 } 146 147 public synchronized File fetchFile(){ 148 while(size<=0){ 149 try { 150 this.wait(); 151 } catch (InterruptedException e) {e.printStackTrace();} 152 } 153 File file = null; 154 if(files.size()>0){ 155 file = files.remove(0); 156 --size; 157 } 158 notify(); 159 return file; 160 } 161 162 public int getSize(){ 163 return this.size; 164 } 165 }

 

利用 Java 多线程写的一个工具向 MongoDB 中存储大量数据

标签:

人气教程排行