
Building a Minimal Distributed Job Scheduling System with Python's multiprocessing

Date: 2021-07-01 10:21:17

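multiprocessing manages processes much the way threading manages threads, and that is the core of the module. Its API closely resembles that of threading, but it makes far better use of multi-core CPUs, because each process runs its own interpreter and is not constrained by CPython's global interpreter lock.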

Introduction

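Python's multiprocessing module supports more than local multi-processing: its managers submodule can also spread processes across multiple machines. A server process acts as the scheduler and distributes tasks over the network to multiple processes on other machines.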
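As a minimal sketch of that idea, the snippet below shares one queue over a TCP socket through BaseManager. To stay self-contained it assumes Python 3, runs the manager server in a background thread of the same process, and binds to an ephemeral port on 127.0.0.1. The names QueueManager and get_job_queue and the key b'demo' are illustrative assumptions, not part of this article's code; in a real deployment the server and the clients would be separate processes, possibly on different machines.

```python
import threading
from multiprocessing.managers import BaseManager
from queue import Queue

# The real queue lives on the "server" side.
job_queue = Queue()


class QueueManager(BaseManager):
    """Hypothetical manager subclass used only for this demo."""


# Expose the queue under a method name that clients can call.
QueueManager.register('get_job_queue', callable=lambda: job_queue)

# Server side: port 0 asks the OS for any free port.
server_manager = QueueManager(address=('127.0.0.1', 0), authkey=b'demo')
server = server_manager.get_server()
threading.Thread(target=server.serve_forever, daemon=True).start()

# Client side: connect over TCP to the address the server actually bound.
client = QueueManager(address=server.address, authkey=b'demo')
client.connect()
remote_queue = client.get_job_queue()  # a proxy, not the queue itself

# Putting through the proxy makes the item appear in the real queue.
remote_queue.put('job-1')
received = job_queue.get(timeout=5)
print(received)
```

The client never touches job_queue directly; every call on remote_queue is forwarded to the server over the socket, which is the mechanism the master and slaves below rely on.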

That raised the question of whether this module could be used to build a simple job scheduling system.

Implementation

Job

First, create a Job class. To keep the test simple, it has a single attribute, the job id.

job.py

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-


    class Job:
        def __init__(self, job_id):
            # The only state a job carries in this test
            self.job_id = job_id
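Objects placed on a manager queue travel between processes in pickled form, so Job must be picklable, and job.py must be importable on every machine that receives jobs. A quick local check, assuming nothing beyond the standard pickle module (the Job class is repeated here only to keep the snippet self-contained):

```python
import pickle


class Job:
    def __init__(self, job_id):
        self.job_id = job_id


# Serialize a job to bytes, as happens when it is sent through a manager queue,
# then rebuild it, as the receiving process does.
payload = pickle.dumps(Job(7))
restored = pickle.loads(payload)
print(restored.job_id)
```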

Master

The master dispatches jobs and prints information about the jobs that have finished running.

master.py

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    try:
        from queue import Queue  # Python 3
    except ImportError:
        from Queue import Queue  # Python 2
    from multiprocessing.managers import BaseManager

    from job import Job


    class Master:

        def __init__(self):
            # Queue of jobs that have been dispatched
            self.dispatched_job_queue = Queue()
            # Queue of jobs that have finished
            self.finished_job_queue = Queue()

        def get_dispatched_job_queue(self):
            return self.dispatched_job_queue

        def get_finished_job_queue(self):
            return self.finished_job_queue

        def start(self):
            # Expose the dispatch queue and the finished queue over the network.
            # Registering bound methods like this assumes the "fork" start
            # method, as on the Linux setup used in this article.
            BaseManager.register('get_dispatched_job_queue', callable=self.get_dispatched_job_queue)
            BaseManager.register('get_finished_job_queue', callable=self.get_finished_job_queue)

            # Listen on the port and start the server
            # (authkey must be bytes on Python 3)
            manager = BaseManager(address=('0.0.0.0', 8888), authkey=b'jobs')
            manager.start()

            # Fetch the queues through the methods registered above
            dispatched_jobs = manager.get_dispatched_job_queue()
            finished_jobs = manager.get_finished_job_queue()

            # Dispatch 10 jobs at a time; once the dispatch queue has been
            # drained, dispatch the next 10
            job_id = 0
            try:
                while True:
                    for i in range(0, 10):
                        job_id = job_id + 1
                        job = Job(job_id)
                        print('Dispatch job: %s' % job.job_id)
                        dispatched_jobs.put(job)

                    # Report finished jobs until every dispatched job has been picked up
                    while not dispatched_jobs.empty():
                        job = finished_jobs.get(timeout=60)
                        print('Finished Job: %s' % job.job_id)
            finally:
                # Stop the manager process when the loop is interrupted
                manager.shutdown()


    if __name__ == "__main__":
        master = Master()
        master.start()
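Note that the inner loop in master.py stops as soon as the dispatch queue is empty. That happens when the last job of a batch is picked up, not when it finishes, so some results can still be outstanding when the next batch is dispatched. If a strict "wait for the whole batch" is wanted, one option is to count results instead. The sketch below shows that variant with a local queue.Queue and worker threads standing in for the manager queues and the slaves; run_batch, worker, and the batch contents are hypothetical names chosen for illustration, and Python 3 is assumed.

```python
import queue
import threading


def worker(dispatched, finished):
    # Stand-in for a slave: take a job id, "run" it, report it back.
    while True:
        job_id = dispatched.get()
        if job_id is None:  # sentinel value: stop this worker
            break
        finished.put(job_id)


def run_batch(dispatched, finished, job_ids, timeout=5):
    # Dispatch every job, then block until exactly len(job_ids) results arrive.
    for job_id in job_ids:
        dispatched.put(job_id)
    return [finished.get(timeout=timeout) for _ in job_ids]


dispatched, finished = queue.Queue(), queue.Queue()
workers = [threading.Thread(target=worker, args=(dispatched, finished)) for _ in range(2)]
for t in workers:
    t.start()

done = run_batch(dispatched, finished, list(range(1, 11)))
print(sorted(done))

# Shut the workers down cleanly: one sentinel per worker.
for _ in workers:
    dispatched.put(None)
for t in workers:
    t.join()
```

Counting results ties the end of a batch to completed work rather than to the state of the dispatch queue, at the cost of stalling the whole batch if one worker never reports back.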

Slave

The slave runs the jobs dispatched by the master and returns the results.

slave.py

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import time
    try:
        from queue import Queue  # Python 3
    except ImportError:
        from Queue import Queue  # Python 2
    from multiprocessing.managers import BaseManager

    from job import Job  # job.py must be importable so received jobs can be unpickled


    class Slave:

        def __init__(self):
            # Queue of jobs that have been dispatched
            self.dispatched_job_queue = Queue()
            # Queue of jobs that have finished
            self.finished_job_queue = Queue()

        def start(self):
            # Register only the method names; the queues themselves live on the master
            BaseManager.register('get_dispatched_job_queue')
            BaseManager.register('get_finished_job_queue')

            # Connect to the master (authkey must be bytes on Python 3)
            server = '127.0.0.1'
            print('Connect to server %s...' % server)
            manager = BaseManager(address=(server, 8888), authkey=b'jobs')
            manager.connect()

            # Fetch the queues through the methods registered above
            dispatched_jobs = manager.get_dispatched_job_queue()
            finished_jobs = manager.get_finished_job_queue()

            # Run jobs and return the results. The run is only simulated here,
            # so the result sent back is the job that was received.
            while True:
                job = dispatched_jobs.get(timeout=1)
                print('Run job: %s ' % job.job_id)
                time.sleep(1)
                finished_jobs.put(job)


    if __name__ == "__main__":
        slave = Slave()
        slave.start()
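Because slave.py calls get(timeout=1), a full second with no pending job makes the call raise queue.Empty (Queue.Empty on Python 2), and the slave exits with a traceback. One way to tolerate idle periods is to catch the exception and poll again. The sketch below uses a local queue.Queue in place of the manager proxy and assumes Python 3; poll_jobs and max_idle_polls are hypothetical names, and stopping after a fixed number of empty polls is just one possible policy.

```python
import queue


def poll_jobs(job_queue, handle, timeout=0.1, max_idle_polls=3):
    """Handle jobs until the queue has been empty for max_idle_polls polls in a row."""
    idle = 0
    handled = 0
    while idle < max_idle_polls:
        try:
            job = job_queue.get(timeout=timeout)
        except queue.Empty:
            # Nothing to do right now: count the idle poll and try again.
            idle += 1
            continue
        idle = 0  # a job arrived, so reset the idle counter
        handle(job)
        handled += 1
    return handled


jobs = queue.Queue()
for job_id in (1, 2, 3):
    jobs.put(job_id)

seen = []
count = poll_jobs(jobs, seen.append)
print(count, seen)
```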

Testing

Open three Linux terminals: run the master in the first, and run a slave in each of the second and third. The results are as follows.

master

    $ python master.py
    Dispatch job: 1
    Dispatch job: 2
    Dispatch job: 3
    Dispatch job: 4
    Dispatch job: 5
    Dispatch job: 6
    Dispatch job: 7
    Dispatch job: 8
    Dispatch job: 9
    Dispatch job: 10
    Finished Job: 1
    Finished Job: 2
    Finished Job: 3
    Finished Job: 4
    Finished Job: 5
    Finished Job: 6
    Finished Job: 7
    Finished Job: 8
    Finished Job: 9
    Dispatch job: 11
    Dispatch job: 12
    Dispatch job: 13
    Dispatch job: 14
    Dispatch job: 15
    Dispatch job: 16
    Dispatch job: 17
    Dispatch job: 18
    Dispatch job: 19
    Dispatch job: 20
    Finished Job: 10
    Finished Job: 11
    Finished Job: 12
    Finished Job: 13
    Finished Job: 14
    Finished Job: 15
    Finished Job: 16
    Finished Job: 17
    Finished Job: 18
    Dispatch job: 21
    Dispatch job: 22
    Dispatch job: 23
    Dispatch job: 24
    Dispatch job: 25
    Dispatch job: 26
    Dispatch job: 27
    Dispatch job: 28
    Dispatch job: 29
    Dispatch job: 30

slave1

    $ python slave.py
    Connect to server 127.0.0.1...
    Run job: 1
    Run job: 2
    Run job: 3
    Run job: 5
    Run job: 7
    Run job: 9
    Run job: 11
    Run job: 13
    Run job: 15
    Run job: 17
    Run job: 19
    Run job: 21
    Run job: 23

slave2

    $ python slave.py
    Connect to server 127.0.0.1...
    Run job: 4
    Run job: 6
    Run job: 8
    Run job: 10
    Run job: 12
    Run job: 14
    Run job: 16
    Run job: 18
    Run job: 20
    Run job: 22
    Run job: 24

That covers a minimal distributed job scheduling system built with Python's multiprocessing module. I hope it is useful.
