
Nutch2: An Analysis of How a WebPage Is Written to the Database

Time: 2021-07-01 10:21:17

    //file: org/apache/nutch/storage/WebPage.java
    //line: 42
    public class WebPage extends PersistentBase {
      public static final Schema _SCHEMA = Schema.parse("{\"type\":\"record\",\"name\":\"WebPage\",\"namespace\":\"org.apache.nutch.storage\",\"fields\":[{\"name\":\"baseUrl\",\"type\":\"string\"},{\"name\":\"status\",\"type\":\"int\"},{\"name\":\"fetchTime\",\"type\":\"long\"},{\"name\":\"prevFetchTime\",\"type\":\"long\"},{\"name\":\"fetchInterval\",\"type\":\"int\"},{\"name\":\"retriesSinceFetch\",\"type\":\"int\"},{\"name\":\"modifiedTime\",\"type\":\"long\"},{\"name\":\"prevModifiedTime\",\"type\":\"long\"},{\"name\":\"protocolStatus\",\"type\":{\"type\":\"record\",\"name\":\"ProtocolStatus\",\"fields\":[{\"name\":\"code\",\"type\":\"int\"},{\"name\":\"args\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"lastModified\",\"type\":\"long\"}]}},{\"name\":\"content\",\"type\":\"bytes\"},{\"name\":\"contentType\",\"type\":\"string\"},{\"name\":\"prevSignature\",\"type\":\"bytes\"},{\"name\":\"signature\",\"type\":\"bytes\"},{\"name\":\"title\",\"type\":\"string\"},{\"name\":\"text\",\"type\":\"string\"},{\"name\":\"parseStatus\",\"type\":{\"type\":\"record\",\"name\":\"ParseStatus\",\"fields\":[{\"name\":\"majorCode\",\"type\":\"int\"},{\"name\":\"minorCode\",\"type\":\"int\"},{\"name\":\"args\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}},{\"name\":\"score\",\"type\":\"float\"},{\"name\":\"reprUrl\",\"type\":\"string\"},{\"name\":\"headers\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"outlinks\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"inlinks\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"markers\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"metadata\",\"type\":{\"type\":\"map\",\"values\":\"bytes\"}},{\"name\":\"batchId\",\"type\":\"string\"}]}");
      //..
      public Schema getSchema() { return _SCHEMA; }
      //...
    }

The schema is a JSON string, and Avro is responsible for parsing it.

2. Passing the Schema
This step happens during initialization, before the job is submitted.

    //file: org/apache/nutch/crawl/InjectorJob.java
    //InjectorJob.run(Map<String,Object>) line: 221
    {
        DataStore<String, WebPage> store = StorageUtils.createWebStore(currentJob.getConfiguration(),
          String.class, WebPage.class);
    }


The persistentClass is passed down layer by layer:

    //file: gora-core-0.2.1/org/apache/gora/store/DataStoreFactory.java
    //DataStoreFactory.createDataStore(Class<D>, Class<K>, Class<T>, Configuration, String) line: 135
    {
        return createDataStore(dataStoreClass, keyClass, persistent, conf, createProps(), schemaName);
    }
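The delegation between overloads can be hard to picture from a single stack frame. The following is a minimal sketch, not Gora's code: `SketchStore`, `InMemorySketchStore`, and `SketchStoreFactory` are hypothetical names, and it assumes the factory creates the store reflectively and then hands it the key class and persistent class through an `initialize` call, which matches the call shown in the next step.

```java
import java.util.Properties;

// Hypothetical stand-in for a data store interface; not a Gora type.
interface SketchStore<K, T> {
    void initialize(Class<K> keyClass, Class<T> persistentClass, Properties props);
    Class<T> getPersistentClass();
}

// Hypothetical store that only remembers what it was initialized with.
class InMemorySketchStore<K, T> implements SketchStore<K, T> {
    private Class<K> keyClass;
    private Class<T> persistentClass;

    @Override
    public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties props) {
        this.keyClass = keyClass;
        this.persistentClass = persistentClass;
    }

    @Override
    public Class<T> getPersistentClass() {
        return persistentClass;
    }
}

class SketchStoreFactory {
    // Short overload: supplies default properties and delegates to the full one.
    static <K, T> SketchStore<K, T> createDataStore(
            Class<?> storeClass, Class<K> keyClass, Class<T> persistentClass) {
        return createDataStore(storeClass, keyClass, persistentClass, new Properties());
    }

    // Full overload: instantiates the store reflectively, then passes the classes on.
    @SuppressWarnings("unchecked")
    static <K, T> SketchStore<K, T> createDataStore(
            Class<?> storeClass, Class<K> keyClass, Class<T> persistentClass, Properties props) {
        try {
            SketchStore<K, T> store =
                (SketchStore<K, T>) storeClass.getDeclaredConstructor().newInstance();
            store.initialize(keyClass, persistentClass, props);
            return store;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot create store " + storeClass, e);
        }
    }
}
```

Calling `SketchStoreFactory.createDataStore(InMemorySketchStore.class, String.class, StringBuilder.class)` returns a store whose `getPersistentClass()` is `StringBuilder.class`, which is the only point of the sketch: the persistent class travels unchanged from the caller to the store.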


Gora calls WebPage.getSchema() to obtain the Schema:

    //file: gora-core-0.2.1/org/apache/gora/store/DataStoreBase.java
    //SqlStore<K,T>(DataStoreBase<K,T>).initialize(Class<K>, Class<T>, Properties) line: 81
    {
        schema = this.beanFactory.getCachedPersistent().getSchema();
        fieldMap = AvroUtils.getFieldMap(schema);
    }
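To make the role of the schema and the field map concrete, here is a simplified sketch under stated assumptions. `SketchPage` and `SketchFieldMap` are hypothetical; the real code works with Avro `Schema` and `Field` objects, whereas this sketch reduces the schema to an ordered list of field names and the field map to a name-to-position lookup.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for a schema-backed persistent class.
class SketchPage {
    // Field order defines each field's positional index, as in a record schema.
    static final List<String> SCHEMA_FIELDS = Arrays.asList("baseUrl", "status", "fetchTime");

    private final Object[] values = new Object[SCHEMA_FIELDS.size()];

    List<String> getSchemaFields() {
        return SCHEMA_FIELDS;
    }

    // Positional access, in the spirit of persistent.get(i).
    Object get(int index) {
        return values[index];
    }

    void put(int index, Object value) {
        values[index] = value;
    }
}

class SketchFieldMap {
    // Builds a name -> position lookup from the ordered field list.
    static Map<String, Integer> getFieldMap(List<String> fields) {
        Map<String, Integer> map = new LinkedHashMap<>();
        for (int i = 0; i < fields.size(); i++) {
            map.put(fields.get(i), i);
        }
        return map;
    }
}
```

Because the store reads the schema once from a cached instance, every later read or write can address fields by position rather than by reflection on the bean.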



3. Passing and serializing the data
This step happens in the Map phase.

The map method creates a WebPage (row) and, at the end, writes it to the context:

    //file: org/apache/nutch/crawl/InjectorJob.java
    //InjectorJob$UrlMapper.map(LongWritable, Text, Mapper<LongWritable,Text,String,Contex>) line: 191
    {
          context.write(reversedUrl, row);
    }


Hadoop core passes the WebPage down layer by layer:

    //file: hadoop-src/org/apache/hadoop/mapred/MapTask.java
    //MapTask$NewDirectOutputCollector<K,V>.write(K, V) line: 638
    {
          reporter.progress();
          long bytesOutPrev = getOutputBytes(fsStats);
          out.write(key, value);
    }

The out object above is of type GoraRecordWriter:

    //file: gora-core-0.2.1/org/apache/gora/mapreduce/GoraRecordWriter.java
    //GoraRecordWriter<K,T>.write(K, T) line: 60
    {
        store.put(key, (Persistent) value);
    }


The actual type of the store object is SqlStore, which extends gora-core's DataStoreBase class and handles reads and writes against MySQL. K is the primary key and T is a WebPage object; the record is first written to a cache.

    //file: gora-sql-0.1.1-incubating/org/apache/gora/sql/store/SqlStore.java
    //SqlStore<K,T>.put(K, T) line: 616

      public void put(K key, T persistent)
      {
          List<Field> fields = schema.getFields();

          // bind each schema field, by position, to its mapped column
          for (int i = 0; i < fields.size(); i++) {
            Field field = fields.get(i);
            Column column = mapping.getColumn(field.name());
            insertStatement.setObject(persistent.get(i), field.schema(), column);
          }

          //jdbc already should cache the ps
          PreparedStatement insert = insertStatement.toStatement(connection);
          // add the statement to the write cache
          synchronized (writeCache) {
            writeCache.add(insert);
          }

      }
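The put() excerpt combines two ideas: binding each field to a column by position, and queuing the resulting statement. The sketch below illustrates both without JDBC. `SketchInsert` and `SketchWriteCache` are hypothetical, the `id` key column is an assumption made for illustration, and a plain map of column values stands in for a `PreparedStatement`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a prepared INSERT: column name -> bound value.
class SketchInsert {
    final Map<String, Object> columns = new LinkedHashMap<>();
}

class SketchWriteCache {
    private final List<String> fieldNames;           // schema field order
    private final Map<String, String> fieldToColumn; // field name -> column name
    private final List<SketchInsert> writeCache = new ArrayList<>();

    SketchWriteCache(List<String> fieldNames, Map<String, String> fieldToColumn) {
        this.fieldNames = fieldNames;
        this.fieldToColumn = fieldToColumn;
    }

    // Binds the key and every field value to its mapped column, then queues the insert.
    void put(String key, Object[] record) {
        SketchInsert insert = new SketchInsert();
        insert.columns.put("id", key); // assumed key column name
        for (int i = 0; i < fieldNames.size(); i++) {
            String column = fieldToColumn.get(fieldNames.get(i));
            insert.columns.put(column, record[i]);
        }
        // The cache may be shared between threads, so guard every access to it.
        synchronized (writeCache) {
            writeCache.add(insert);
        }
    }

    int pending() {
        synchronized (writeCache) {
            return writeCache.size();
        }
    }

    SketchInsert get(int index) {
        synchronized (writeCache) {
            return writeCache.get(index);
        }
    }
}
```

Note that put() here never touches a database: after one call, `pending()` is 1 and nothing has been sent anywhere, which mirrors the "written to a cache first" behavior described above.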


toStatement() calls setField() internally. The serialization itself is implemented by Avro and is not explored further here.

    //file: gora-sql-0.1.1-incubating/org/apache/gora/sql/store/SqlStore.java
    //SqlStore<K,T>.setField(PreparedStatement, Column, Schema, int, Object) line: 718
    {
       IOUtils.serialize(os, datumWriter, schema, object);
    }
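For readers curious what the serialized bytes look like, the following sketch hand-encodes a long and a string the way the Avro specification describes its binary encoding: a long is zig-zag encoded and written as a variable-length integer, and a string is its UTF-8 byte length (as a long) followed by the bytes. It assumes the field is written in Avro's binary encoding; it does not use the Avro library, `AvroEncodingSketch` is a hypothetical name, and it is not the code path Gora runs.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

class AvroEncodingSketch {
    // Writes a long using zig-zag plus variable-length encoding.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63); // zig-zag: small magnitudes become small unsigned values
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // low 7 bits, continuation bit set
            z >>>= 7;
        }
        out.write((int) z); // final byte, continuation bit clear
    }

    // Writes a string as its UTF-8 length followed by the UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    static byte[] encodeLong(long n) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(out, n);
        return out.toByteArray();
    }

    static byte[] encodeString(String s) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, s);
        return out.toByteArray();
    }
}
```

With this sketch, `encodeString("foo")` yields the bytes `06 66 6f 6f` and `encodeLong(64)` yields `80 01`, the same values the Avro specification gives as examples.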



4. The flush operation

    //file: hadoop-src/org/apache/hadoop/mapred/MapTask.java
    //MapTask.runNewMapper(JobConf, TaskSplitIndex, TaskUmbilicalProtocol, TaskReporter) line: 767
    {
        output.close(mapperContext);
    }

    //file: gora-core-0.2.1/org/apache/gora/mapreduce/GoraRecordWriter.java
    //GoraRecordWriter<K,T>.close(TaskAttemptContext) line: 55
    {
        store.close();
    }

Below is the flush() method called inside SqlStore.close():

    //file: gora-sql-0.1.1-incubating/org/apache/gora/sql/store/SqlStore.java
    //SqlStore<K,T>.flush() line: 342
    {
        connection.commit();
    }

At this point, the WebPage has been written to the MySQL database (underneath, this goes through JDBC).
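The whole write path (write, put, close, flush, commit) can be summarized in a small sketch. `SketchSqlStore` and `SketchRecordWriter` are hypothetical classes playing the roles of SqlStore and GoraRecordWriter; a second list stands in for the database, and moving rows into it stands in for executing the cached statements and calling `connection.commit()`. How the real flush() executes the cached statements is not shown in the excerpts above, so that part is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical store: put() only queues a row, flush() makes queued rows durable.
class SketchSqlStore {
    private final List<String> writeCache = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();

    void put(String key, String row) {
        synchronized (writeCache) {
            writeCache.add(key + "=" + row);
        }
    }

    void flush() {
        synchronized (writeCache) {
            // Stands in for executing the cached statements and committing.
            committed.addAll(writeCache);
            writeCache.clear();
        }
    }

    // Closing the store flushes whatever is still cached.
    void close() {
        flush();
    }

    List<String> committedRows() {
        return new ArrayList<>(committed);
    }
}

// Hypothetical writer in the role of GoraRecordWriter: it only delegates.
class SketchRecordWriter {
    private final SketchSqlStore store;

    SketchRecordWriter(SketchSqlStore store) {
        this.store = store;
    }

    void write(String key, String row) {
        store.put(key, row);
    }

    void close() {
        store.close();
    }
}
```

In this sketch a row written through `SketchRecordWriter.write` does not appear in `committedRows()` until `close()` is called, which is why the map task's final `output.close(...)` is the moment the data actually reaches the database.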

