当前位置:Gxlcms > 数据库问题 > Java 线程池 +生产者消费者+MySQL读取300 万条数据

Java 线程池 +生产者消费者+MySQL读取300 万条数据

时间:2021-07-01 10:21:17 帮助过:16人阅读

* 线程启动 */ public void update() {
//redis操作类 HashRedisUtil redisUtil
= HashRedisUtil.getInstance(); //生产者消费者 ProducerConsumer pc = new ProducerConsumer(); //数据仓库 Storage s = pc.new Storage(); ExecutorService service = Executors.newCachedThreadPool(); //一个线程进行查询 Producer p = pc.new Producer(s,userMapper); service.submit(p); System.err.println("生产线程正在生产中。。。。。。。。。"); //是个线程进行修改 for(int i=0;i<10;i++){ System.err.println("消费线程"+i+"正在消费中。。。。。。。。。。"); service.submit(pc.new Consumer( redisUtil,userMapper,s)); } }

1.4.2 主要核心类

  1. <span style="color: #0000ff">package</span><span style="color: #000000"> com.ypp.thread;
  2. </span><span style="color: #0000ff">import</span><span style="color: #000000"> java.math.BigDecimal;
  3. </span><span style="color: #0000ff">import</span><span style="color: #000000"> java.util.Calendar;
  4. </span><span style="color: #0000ff">import</span><span style="color: #000000"> java.util.HashMap;
  5. </span><span style="color: #0000ff">import</span><span style="color: #000000"> java.util.List;
  6. </span><span style="color: #0000ff">import</span><span style="color: #000000"> java.util.Map;
  7. </span><span style="color: #0000ff">import</span><span style="color: #000000"> java.util.Set;
  8. </span><span style="color: #0000ff">import</span><span style="color: #000000"> java.util.concurrent.BlockingQueue;
  9. </span><span style="color: #0000ff">import</span><span style="color: #000000"> java.util.concurrent.LinkedBlockingQueue;
  10. </span><span style="color: #0000ff">import</span><span style="color: #000000"> org.apache.commons.lang.StringUtils;
  11. </span><span style="color: #0000ff">import</span><span style="color: #000000"> org.apache.log4j.Logger;
  12. </span><span style="color: #0000ff">import</span><span style="color: #000000"> org.joda.time.LocalDateTime;
  13. </span><span style="color: #0000ff">import</span><span style="color: #000000"> com.alibaba.fastjson.JSONObject;
  14. </span><span style="color: #0000ff">import</span><span style="color: #000000"> com.ypp.constants.Constants;
  15. </span><span style="color: #0000ff">import</span><span style="color: #000000"> com.ypp.mapper.UserMapper;
  16. </span><span style="color: #0000ff">import</span><span style="color: #000000"> com.ypp.model.User;
  17. </span><span style="color: #0000ff">import</span><span style="color: #000000"> com.ypp.model.UserAlis;
  18. </span><span style="color: #0000ff">import</span><span style="color: #000000"> com.ypp.model.UserBaseModel;
  19. </span><span style="color: #0000ff">import</span><span style="color: #000000"> com.ypp.model.UserVip;
  20. </span><span style="color: #0000ff">import</span><span style="color: #000000"> com.ypp.util.HashRedisUtil;
  21. </span><span style="color: #0000ff">import</span><span style="color: #000000"> com.ypp.util.JsonUtils;
  22. </span><span style="color: #0000ff">import</span><span style="color: #000000"> com.ypp.util.PHPSerializer;
  23. </span>
  24. <span style="color: #0000ff">public</span> <span style="color: #0000ff">class</span><span style="color: #000000"> ProducerConsumer {
  25. </span><span style="color: #0000ff">private</span> <span style="color: #0000ff">static</span> Logger logger = Logger.getLogger(ProducerConsumer.<span style="color: #0000ff">class</span><span style="color: #000000">);<br> //这个page 是核心, 全局变量, 当生产者生产一次 ,获取200 个用户, 会把这个page++, 下次获取就是后一个200 条用户了
  26. </span><span style="color: #0000ff">private</span> <span style="color: #0000ff">static</span> Integer page = 0<span style="color: #000000">;
  27. //消费者<br>
  28. </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">class</span> Consumer <span style="color: #0000ff">implements</span><span style="color: #000000"> Runnable {
  29. </span><span style="color: #0000ff">private</span><span style="color: #000000"> HashRedisUtil redisUtil;
  30. </span><span style="color: #0000ff">private</span><span style="color: #000000"> UserMapper userMapper;
  31. </span><span style="color: #0000ff">private</span> Storage s = <span style="color: #0000ff">null</span><span style="color: #000000">;
  32. </span><span style="color: #0000ff">public</span><span style="color: #000000"> Consumer(HashRedisUtil redisUtil, UserMapper userMapper, Storage s) {
  33. </span><span style="color: #0000ff">super</span><span style="color: #000000">();
  34. </span><span style="color: #0000ff">this</span>.redisUtil =<span style="color: #000000"> redisUtil;
  35. </span><span style="color: #0000ff">this</span>.userMapper =<span style="color: #000000"> userMapper;
  36. </span><span style="color: #0000ff">this</span>.s =<span style="color: #000000"> s;
  37. }
  38. </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">void</span><span style="color: #000000"> run() {
  39. </span><span style="color: #0000ff">try</span><span style="color: #000000"> {
  40. </span><span style="color: #0000ff">while</span> (<span style="color: #0000ff">true</span><span style="color: #000000">) {
  41. User users </span>=<span style="color: #000000"> s.pop();
  42. </span><span style="color: #0000ff">long</span> bbb =<span style="color: #000000"> System.currentTimeMillis();
  43. </span><span style="color: #008000">//</span><span style="color: #008000"> 获取一个用户的粉丝列表 并存到redis</span>
  44. <span style="color: #0000ff">try</span><span style="color: #000000"> {
  45. fansUpdate(users.getToken(), users.getUserId(), redisUtil);
  46. } </span><span style="color: #0000ff">catch</span><span style="color: #000000"> (Exception e1) {
  47. e1.printStackTrace();
  48. }
  49. </span><span style="color: #008000">//</span><span style="color: #008000"> 获取一个用户的关注列表, 并存到redis</span>
  50. <span style="color: #0000ff">try</span><span style="color: #000000"> {
  51. followUpdate(users.getToken(), users.getUserId(), redisUtil);
  52. } </span><span style="color: #0000ff">catch</span><span style="color: #000000"> (Exception e) {
  53. e.printStackTrace();
  54. }
  55. </span><span style="color: #008000">//</span><span style="color: #008000"> 获取一个用户的黑名单, 并存到redis</span>
  56. <span style="color: #0000ff">try</span><span style="color: #000000"> {
  57. blackUpdate(users.getToken(), users.getUserId(), redisUtil);
  58. } </span><span style="color: #0000ff">catch</span><span style="color: #000000"> (Exception e) {
  59. e.printStackTrace();
  60. }
  61. </span><span style="color: #008000">//</span><span style="color: #008000"> 用户基本信息</span>
  62. <span style="color: #0000ff">try</span><span style="color: #000000"> {
  63. userbaseUpdate(users.getToken(), users.getUserId(), redisUtil);
  64. } </span><span style="color: #0000ff">catch</span><span style="color: #000000"> (Exception e) {
  65. e.printStackTrace();
  66. }
  67. </span><span style="color: #0000ff">long</span> ccc =<span style="color: #000000"> System.currentTimeMillis();
  68. System.out.println(</span>"用户:" + users.getToken() + " 全部总共耗时:" + (ccc - bbb) + "毫秒"<span style="color: #000000">);
  69. Thread.sleep(</span>500<span style="color: #000000">);
  70. }
  71. } </span><span style="color: #0000ff">catch</span><span style="color: #000000"> (InterruptedException e) {
  72. e.printStackTrace();
  73. }
  74. }
  75. </span><span style="color: #0000ff">public</span> List<User><span style="color: #000000"> getUserInfo(Integer iThread) {
  76. </span><span style="color: #0000ff">return</span> userMapper.findUserInfo((iThread - 1) * 200 + 1<span style="color: #000000">);
  77. }
  78. </span><span style="color: #008000">/**</span><span style="color: #008000">
  79. * 用户基本信息修改
  80. *
  81. * </span><span style="color: #808080">@param</span><span style="color: #008000"> token
  82. * </span><span style="color: #808080">@param</span><span style="color: #008000"> myuserId
  83. * </span><span style="color: #808080">@param</span><span style="color: #008000"> redisUtil
  84. * </span><span style="color: #808080">@throws</span><span style="color: #008000"> Exception
  85. </span><span style="color: #008000">*/</span>
  86. <span style="color: #0000ff">private</span> <span style="color: #0000ff">void</span> userbaseUpdate(String token, String myUserId, HashRedisUtil redisUtil) <span style="color: #0000ff">throws</span><span style="color: #000000"> Exception {
  87. </span>
  88. <span style="color: #000000">
  89. }
  90. </span><span style="color: #008000">/**</span><span style="color: #008000">
  91. * 更新一个用户的黑名单(原来的token改成userID)
  92. *
  93. * </span><span style="color: #808080">@param</span><span style="color: #008000"> token
  94. * </span><span style="color: #808080">@param</span><span style="color: #008000"> string
  95. * </span><span style="color: #808080">@param</span><span style="color: #008000"> redisUtil
  96. * </span><span style="color: #808080">@throws</span><span style="color: #008000"> Exception
  97. </span><span style="color: #008000">*/</span>
  98. <span style="color: #0000ff">private</span> <span style="color: #0000ff">void</span> blackUpdate(String token, String myUserId, HashRedisUtil redisUtil) <span style="color: #0000ff">throws</span><span style="color: #000000"> Exception {
  99. </span><span style="color: #000000">
  100. }
  101. </span><span style="color: #008000">/**</span><span style="color: #008000">
  102. * 获取一个用户的关注
  103. *
  104. * </span><span style="color: #808080">@param</span><span style="color: #008000"> token
  105. * </span><span style="color: #808080">@param</span><span style="color: #008000"> string
  106. * </span><span style="color: #808080">@param</span><span style="color: #008000"> redisUtil
  107. * </span><span style="color: #808080">@throws</span><span style="color: #008000"> Exception
  108. </span><span style="color: #008000">*/</span>
  109. <span style="color: #0000ff">private</span> <span style="color: #0000ff">void</span> followUpdate(String token, String myUserId, HashRedisUtil redisUtil) <span style="color: #0000ff">throws</span><span style="color: #000000"> Exception {
  110. </span><span style="color: #000000">
  111. }
  112. </span><span style="color: #008000">/**</span><span style="color: #008000">
  113. * 获取一个用户的粉丝列表
  114. *
  115. * </span><span style="color: #808080">@param</span><span style="color: #008000"> token
  116. * </span><span style="color: #808080">@param</span><span style="color: #008000"> userId
  117. * </span><span style="color: #808080">@param</span><span style="color: #008000"> redisUtil
  118. * </span><span style="color: #808080">@throws</span><span style="color: #008000"> Exception
  119. </span><span style="color: #008000">*/</span>
  120. <span style="color: #0000ff">private</span> <span style="color: #0000ff">void</span> fansUpdate(String token, String myUserId, HashRedisUtil redisUtil) <span style="color: #0000ff">throws</span><span style="color: #000000"> Exception {</span><span style="color: #000000">
  121. }
  122. </span><span style="color: #000000">
  123. //生产者
  124. </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">class</span> Producer <span style="color: #0000ff">implements</span><span style="color: #000000"> Runnable {
  125. </span><span style="color: #0000ff">private</span> Storage s = <span style="color: #0000ff">null</span><span style="color: #000000">;
  126. </span><span style="color: #0000ff">private</span><span style="color: #000000"> UserMapper mapper ;
  127. </span><span style="color: #0000ff">public</span><span style="color: #000000"> Producer( Storage s, UserMapper mapper) {
  128. </span><span style="color: #0000ff">this</span>.s =<span style="color: #000000"> s;
  129. </span><span style="color: #0000ff">this</span>.mapper =<span style="color: #000000"> mapper;
  130. }
  131. </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">void</span><span style="color: #000000"> run() {
  132. </span><span style="color: #0000ff">try</span><span style="color: #000000"> {
  133. </span><span style="color: #0000ff">while</span> (<span style="color: #0000ff">true</span><span style="color: #000000">) {
  134. System.err.println(</span>"当前分页是:"+page+"****************************************"<span style="color: #000000">);
  135. List</span><User> list=<span style="color: #000000"> mapper.findUserInfo(page);
  136. s.push(list);
  137. page</span>++<span style="color: #000000">;
  138. }
  139. } </span><span style="color: #0000ff">catch</span><span style="color: #000000"> (InterruptedException e1) {
  140. e1.printStackTrace();
  141. }
  142. }
  143. }
  144. <br>//数据仓库
  145. </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">class</span><span style="color: #000000"> Storage {
  146. BlockingQueue</span><User> queues = <span style="color: #0000ff">new</span> LinkedBlockingQueue<User>(200<span style="color: #000000">);
  147. </span><span style="color: #008000">/**</span><span style="color: #008000">
  148. * 生产
  149. *
  150. * </span><span style="color: #808080">@param</span><span style="color: #008000"> p
  151. * 产品
  152. * </span><span style="color: #808080">@throws</span><span style="color: #008000"> InterruptedException
  153. </span><span style="color: #008000">*/</span>
  154. <span style="color: #0000ff">public</span> <span style="color: #0000ff">void</span> push(List<User> p) <span style="color: #0000ff">throws</span><span style="color: #000000"> InterruptedException {
  155. </span><span style="color: #0000ff">for</span><span style="color: #000000">(User user:p){
  156. queues.put(user);
  157. }
  158. }
  159. </span><span style="color: #008000">/**</span><span style="color: #008000">
  160. * 消费
  161. *
  162. * </span><span style="color: #808080">@return</span><span style="color: #008000"> 产品
  163. * </span><span style="color: #808080">@throws</span><span style="color: #008000"> InterruptedException
  164. </span><span style="color: #008000">*/</span>
  165. <span style="color: #0000ff">public</span> User pop() <span style="color: #0000ff">throws</span><span style="color: #000000"> InterruptedException {
  166. </span><span style="color: #0000ff">return</span><span style="color: #000000"> queues.take();
  167. }
  168. }
  169. }</span>

 

Java 线程池 +生产者消费者+MySQL读取300 万条数据

标签:用户   ase   mode   cache   代码   log   shm   nbsp   inf   

人气教程排行