利用 Java 多线程编写的一个向 MongoDB 批量存储数据的工具
时间:2021-07-01 10:21:17
帮助过:2人阅读
import java.io.BufferedReader;
2 import java.io.File;
3 import java.io.FileInputStream;
4 import java.io.IOException;
5 import java.io.InputStreamReader;
6 import java.util.LinkedList;
7 import java.util.List;
8 import org.bson.Document;
9 import com.mongodb.MongoClient;
10 import com.mongodb.client.MongoCollection;
11 import com.mongodb.client.MongoDatabase;
12
13 public class Test {
14 public static void main(String[] args) {
15 Pool p =
new Pool();
16 Produce pro =
new Produce("p"
,p);//一个生产者
17 //三个消费者,用来向mogodb中存储数据
Customer cus =
new Customer("c"
, p);
18 Customer cus2 =
new Customer("c2"
, p);
19 Customer cus3 =
new Customer("c3"
, p);
20 new Thread(pro).start();
21 new Thread(cus).start();
22 new Thread(cus2).start();
23 new Thread(cus3).start();
24
25 }
26 }
27
28 class Produce
implements Runnable{//生产者
29
30 private static final String DIR = "E:\\targets";
//扫描文件路径
31 private static final String FILE_SUFFIX = "html";
//扫描文件类型
32 private Pool pool=
null;
33 private String name =
null;
34 public Produce(String name,Pool pool){
35 this.pool=
pool;
36 this.name =
name;
37 }
38 @Override
39 public void run() {
40 getFilesInDir(DIR, FILE_SUFFIX);
41 }
42
43 public void getFilesInDir(String dir,String suffix){
44 if(
null!=dir && dir.trim().length()>0
){
45 File file =
new File(dir.trim());
46 if(file.isDirectory()){
47 File[] flist =
file.listFiles();
48 if(
null!=
flist){
49 for(File f:flist){
50 if(f.isFile()){
51 if(
null==
suffix){
52 pool.putFile(f);
53 }
54 if(
null!=suffix &&suffix.trim().length()>0
){
55 if(f.getName().endsWith(suffix.trim())){
56 pool.putFile(f);
57 System.out.println(name+"pool存东西"+
f.getName());
58 }
59 }
60 }
else{
61 getFilesInDir(f.getAbsolutePath(),suffix);
62 }
63 }
64 }
65 }
66 }
67 }
68
69 }
70
71 class Customer
implements Runnable{//消费者
72 private static final String CHARSET = "UTF-8";
//文件处理编码格式
73 private static final String HOST = "127.0.0.1";
//主机
74 private static final int PORT = 27017;
//端口
75 private static final String DATABASE_NAME="mydb";
//存储数据库名称
76 private static final String COLLECTION_NAME="mycol";
//存储数据库Collection
77 MongoClient client =
new MongoClient(HOST,PORT);;
78 private String name=
null;
79 private Pool pool=
null;
80 public Customer(String name,Pool pool){
81 this.name =
name;
82 this.pool =
pool;
83 }
84 @Override
85 public void run() {
86 while(
true){
87 try {
88 File f =
pool.fetchFile();
89 saveToMonGoDb(f);
90 System.out.println(name+"pool取东西"+
f.getName());
91 }
catch (Exception e) {
92 e.printStackTrace();
93 }
94 }
95 }
96
97 private void saveToMonGoDb(File file){//将文件保存到数据库
98 MongoDatabase dataBase =
client.getDatabase(DATABASE_NAME);
99 MongoCollection<Document> collection =
dataBase.getCollection(COLLECTION_NAME);
100 String _id = file.getName().substring(0,file.getName().lastIndexOf("."
));
101 String content =
readFileContext(file, CHARSET);
102 Document document =
new Document("_id",_id).append("content"
, content);
103 collection.insertOne(document);
104 }
105
106 public static String readFileContext(File file,String charset) {
107 StringBuilder sb;
108 BufferedReader reader=
null;
109 try {
110 reader =
new BufferedReader(
new InputStreamReader(
new FileInputStream(file), charset));
111 String line =
null;
112 sb =
new StringBuilder();
113 while(
null!=(line =
reader.readLine())){
114 sb.append(line+"\n"
);
115 }
116 return sb.toString();
117 }
catch (Exception e) {
118 e.printStackTrace();
119 }
finally{
120 try {
121 reader.close();
122 }
catch (IOException e) {
123 e.printStackTrace();
124 }
125 }
126 return null;
127 }
128
129 }
130
131 class Pool{
132 volatile int size=0
;
133 volatile private List<File> files =
new LinkedList<File>
();
134 public synchronized void putFile(File file){
135 while(size>1000
){
136 try {
137 this.wait();
138 }
catch (InterruptedException e) {
139 e.printStackTrace();
140 }
141 }
142 files.add(file);
143 ++
size;
144 notify();
145 }
146
147 public synchronized File fetchFile(){
148 while(size<=0
){
149 try {
150 this.wait();
151 }
catch (InterruptedException e) {e.printStackTrace();}
152 }
153 File file =
null;
154 if(files.size()>0
){
155 file = files.remove(0
);
156 --
size;
157 }
158 notify();
159 return file;
160 }
161
162 public int getSize(){
163 return this.size;
164 }
165 }
利用 Java 多线程编写的一个向 MongoDB 批量存储数据的工具
标签: