时间:2021-07-01 10:21:17 帮助过:82人阅读
但通过测试,发现多进程时还是容易出现重复写入文件或者打印正常漏写入文件的问题。
我的日志需求比较简单,能够区分文件,正确的写入日志文件。
引入文件锁;日志写入函数封装到一个操作_Logger类中; 日志名称和写入级别封装到一个业务类Logger中。
本范例基于python3实现。本范例20个进程并发,分别写入3个文件,每s每个文件写入超过100行数据,日志文件中没有数据冗余,也没有数据遗漏。
- # -*-coding:utf-8-*-
- """
- Author:yinshunyao
- Date:2017/3/5 0005下午 10:50
- """
- # import logging
- import os
- import time
- # 利用第三方系统锁实现文件锁定和解锁
- if os.name == 'nt':
- import win32con, win32file, pywintypes
- LOCK_EX = win32con.LOCKFILE_EXCLUSIVE_LOCK
- LOCK_SH = 0 # The default value
- LOCK_NB = win32con.LOCKFILE_FAIL_IMMEDIATELY
- __overlapped = pywintypes.OVERLAPPED()
- def lock(file, flags):
- hfile = win32file._get_osfhandle(file.fileno())
- win32file.LockFileEx(hfile, flags, 0, 0xffff0000, __overlapped)
- def unlock(file):
- hfile = win32file._get_osfhandle(file.fileno())
- win32file.UnlockFileEx(hfile, 0, 0xffff0000, __overlapped)
- elif os.name == 'posix':
- from fcntl import LOCK_EX
- def lock(file, flags):
- fcntl.flock(file.fileno(), flags)
- def unlock(file):
- fcntl.flock(file.fileno(), fcntl.LOCK_UN)
- else:
- raise RuntimeError("File Locker only support NT and Posix platforms!")
- class _Logger:
- file_path = '' #初始化日志路径
- @staticmethod
- def init():
- if not _Logger.file_path:
- _Logger.file_path = '%s/Log' % os.path.abspath(os.path.dirname(__file__))
- return True
- @staticmethod
- def _write(messge, file_name):
- if not messge:
- return True
- messge = messge.replace('\t', ',')
- file = '{}/{}'.format(_Logger.file_path, file_name)
- while True:
- try:
- f = open(file, 'a+')
- lock(f, LOCK_EX)
- break
- except:
- time.sleep(0.01)
- continue
- # 确保缓冲区内容写入到文件
- while True:
- try:
- f.write(messge + '\n')
- f.flush()
- break
- except:
- time.sleep(0.01)
- continue
- while True:
- try:
- unlock(f)
- f.close()
- return True
- except:
- time.sleep(0.01)
- continue
- @staticmethod
- def write(message, file_name, only_print=False):
- if not _Logger.init(): return
- print(message)
- if not only_print:
- _Logger._write(message, file_name)
- class Logger:
- def __init__(self, logger_name, file_name=''):
- self.logger_name = logger_name
- self.file_name = file_name # 根据消息级别,自定义格式,生成消息
- def _build_message(self, message, level):
- try:
- return '[%s]\t[%5s]\t[%8s]\t%s' \
- % (time.strftime('%Y-%m-%d %H:%M:%S'), level, self.logger_name, message)
- except Exception as e:
- print('解析日志消息异常:{}'.format(e))
- return ''
- def warning(self, message):
- _Logger.write(self._build_message(message, 'WARN'), self.file_name)
- def warn(self, message):
- _Logger.write(self._build_message(message, 'WARN'), self.file_name)
- def error(self, message):
- _Logger.write(self._build_message(message, 'ERROR'), self.file_name)
- def info(self, message):
- _Logger.write(self._build_message(message, 'INFO'), self.file_name, True)
- def debug(self, message):
- _Logger.write(self._build_message(message, 'DEBUG'), self.file_name)
- # 循环打印日志测试函数
- def _print_test(count):
- logger = Logger(logger_name='test{}'.format(count), file_name='test{}'.format(count % 3))
- key = 0
- while True:
- key += 1
- # print('{}-{}'.format(logger, key))
- logger.debug('%d' % key)
- logger.error('%d' % key)
- if __name__ == '__main__':
- from multiprocessing import Pool, freeze_support
- freeze_support() # 进程池进行测试
- pool = Pool(processes=20)
- count = 0
- while count < 20:
- count += 1
- pool.apply_async(func=_print_test, args=(count,))
- else:
- pool.close()
- pool.join()
以上就是详解python日志打印和写入并发实现代码的详细内容,更多请关注Gxl网其它相关文章!