`
jandyfish
  • 浏览: 15657 次
社区版块
存档分类
最新评论

python增量计算

阅读更多
    在rsync同步数据后,因业务需要以及文件大小限制,需要对数据做增量分析,而每次都需要拿出文件的增量部分。
    linux有差异计算命令diff以及补丁工具patch,都不是很符合预期。这两种工具都是需要对两个文件进行对比,而若如此做的话,其一计算增量耗时,其二需要有一个原数据的副本文件。多余的副本文件会导致额外的存储开销,以及数据移动成本。
    又因同步过来的数据是多个application的数据,需要针对不同的app进行增量计算,提交给相应的计算任务。希望每次在rsync后直接做增量计算,这里就考虑到直接使用python脚本编写。
   
#! /usr/bin/env python
# -*- coding: utf-8 -*-

import os
import getopt
import sys
import fnmatch  

from datetime import datetime,timedelta,date

def is_newday_begin(app,daystr,storage_dir='/var/local/diff_file_storage'):
    """Tell whether the day recorded for ``app`` differs from ``daystr``.

    Data files are stored one per day; the job scheduled shortly after
    midnight uses this check to learn that the previous day's data still
    needs an incremental pass.

    Args:
        app: Application name; the marker file is ``<storage_dir>/day_<app>``.
        daystr: Day to compare against the stored marker (the caller in this
            file passes ``'%Y-%m-%d'`` strings).
        storage_dir: Directory holding the per-app day marker files.

    Returns:
        True when the marker file exists and its content is not exactly
        ``daystr``; False when it matches or when no marker file exists.
    """
    file_day_file = os.path.join(storage_dir, 'day_' + app)
    if not os.path.exists(file_day_file):
        return False
    # ``with`` closes the handle on every path and, unlike a try/finally
    # around a name bound inside the try, cannot raise NameError when the
    # open itself fails.
    with open(file_day_file, 'r') as day_file:
        return day_file.read() != daystr
	
def read_position(app,daystr,file,storage_dir='/var/local/diff_file_storage'):
    """Return the saved read offset for ``file`` on ``daystr``.

    Also maintains the per-app day marker ``<storage_dir>/day_<app>``:
    if the marker is missing it is created with ``daystr``; if it holds a
    different day it is overwritten with ``daystr`` and the offset for
    ``file`` on ``daystr`` is reset to 0 through ``write_position``.

    Args:
        app: Application name used to locate the day marker file.
        daystr: Day string; it is appended to ``file`` to form the offset
            file name ``<file>_<daystr>``.
        file: Base path of the offset file (the name shadows the Python 2
            builtin but is kept for caller compatibility).
        storage_dir: Directory holding the per-app day marker files.

    Returns:
        The integer stored in ``<file>_<daystr>``, or 0 when that file does
        not exist.

    Raises:
        ValueError: if the offset file exists but does not contain an integer.
    """
    file_day_file = os.path.join(storage_dir, 'day_' + app)
    if os.path.exists(file_day_file):
        with open(file_day_file, 'r') as day_file:
            oldday = day_file.read()
        if oldday != daystr:
            # A new day has started: record it and restart from offset 0.
            with open(file_day_file, 'w') as day_file:
                day_file.write(daystr)
            write_position(daystr, file, 0)
    else:
        with open(file_day_file, 'w') as day_file:
            day_file.write(daystr)

    position_file = file + '_' + daystr
    if os.path.exists(position_file):
        with open(position_file, 'r') as open_file:
            # int() promotes to long automatically on Python 2 and, unlike
            # long(), also exists on Python 3.
            position = int(open_file.read())
    else:
        position = 0
    # Single pre-formatted argument: prints the same text under the
    # Python 2 print statement and the Python 3 print function.
    print('read from  %s' % position_file)
    return position

def write_position(daystr,file,position):
    """Persist ``position`` as the read offset for ``file`` on ``daystr``.

    The offset is stored as its ``str()`` form in ``<file>_<daystr>``,
    replacing any previous content.

    Args:
        daystr: Day string appended to ``file`` to form the offset file name.
        file: Base path of the offset file (the name shadows the Python 2
            builtin but is kept for caller compatibility).
        position: Offset value to store.
    """
    position_file = file + '_' + daystr
    # Write-only access is all that is needed; ``with`` guarantees the close.
    with open(position_file, 'w') as open_file:
        open_file.write(str(position))
    # Single pre-formatted argument: prints the same text under the
    # Python 2 print statement and the Python 3 print function.
    print('write to  %s' % position_file)

def read_diff(app,day,outpath,times):
    """Append the not-yet-processed tail of ``app``'s files for ``day`` to ``outpath``.

    Source files are looked up in ``/data/dc_files/<app>/<YYYY>/<MM>`` and
    selected by the name prefix ``<app>_<YYYY-MM-DD>_``. For each one, the
    offset saved by the previous run is loaded with ``read_position``, every
    byte after it is appended to ``outpath``, and the new offset is stored
    with ``write_position``.

    When ``is_newday_begin`` reports that the recorded day differs from
    ``day`` and ``times <= 0``, the previous calendar day is processed first
    through one recursive call made with ``times + 1``.

    Args:
        app: Application name.
        day: Object providing ``strftime`` and supporting ``+ timedelta``
            (a ``date`` or ``datetime``).
        outpath: File that receives the incremental data (opened for append).
        times: Recursion guard; pass 0 to allow the one-day look-back.

    Raises:
        OSError: if the source directory cannot be listed.
        IOError: if a source or output file cannot be opened.
    """
    daystr = day.strftime('%Y-%m-%d')
    filepath = '/data/dc_files/' + app + day.strftime('/%Y/%m')
    if is_newday_begin(app, daystr) and times <= 0:
        read_diff(app, day + timedelta(days=-1), outpath, times + 1)
        # Each print below passes a single pre-formatted string so the output
        # is the same under the Python 2 statement and the Python 3 function.
        print('read_diff old day :  %s' % day)

    file_position_dir = '/var/local/diff_file_storage/position/'
    filenames = os.listdir(filepath)
    if not filepath.endswith('/'):
        filepath = filepath + '/'
    filepre = app + '_' + daystr + '_'

    # Offsets are byte counts, so both files are opened in binary mode: no
    # newline translation or decoding can make len() disagree with seek().
    with open(outpath, 'ab') as target_file:
        for name in filenames:
            if not name.startswith(filepre):
                continue
            print('read_diff: flush file  %s' % name)
            # Strip only the leading prefix; str.replace would also remove
            # any later occurrence of the same text inside the name.
            file_position_path = file_position_dir + app + '_' + name[len(filepre):]

            # Load the offset reached by the previous run.
            oldposition = read_position(app, daystr, file_position_path)
            position = oldposition
            print('read_diff: old position is  %s' % position)

            # Copy everything after the saved offset, counting bytes copied.
            with open(filepath + name, 'rb') as source_file:
                try:
                    source_file.seek(oldposition)
                    for chunk in source_file:
                        target_file.write(chunk)
                        position += len(chunk)
                finally:
                    if position == oldposition:
                        print('%s  empty data' % outpath)

            # Persist the new offset for the next run.
            write_position(daystr, file_position_path, position)
            print('read_diff: new position is  %s' % position)
    

    如上所示,是diff计算的demo代码。对每次增量计算的位移量进行保存,下次再进行增量计算时,就可以从位移量处直接读取。
    比之用diff、patch,少了副本的相应开销,速度也可观。增加main后也可以直接在shell中调用。


    实现为module,则如下代码所示:
   
    #! /usr/bin/env python
# Filename:diffmodule.py
# -*- coding: utf-8 -*-

import os
import getopt
import sys
import fnmatch

#read diff from sfile to ofile by position_file cached
def read_diff(position_file,sfile,ofile):
    """Append the part of ``sfile`` not yet processed to ``ofile``.

    The offset reached by the previous call is loaded from ``position_file``
    via ``read_position``; everything in ``sfile`` after that offset is
    appended to ``ofile``. If anything was copied, the advanced offset is
    stored back via ``write_position``.

    Args:
        position_file: Path of the file caching the last processed offset.
        sfile: Source file to read the increment from.
        ofile: Output file; the increment is appended to it.

    Returns:
        Number of bytes copied by this call (0 when there was nothing new).

    Raises:
        IOError: if ``sfile`` or ``ofile`` cannot be opened.
    """
    old_position = read_position(position_file)
    copied = 0
    # Offsets are byte counts, so both files are opened in binary mode: no
    # newline translation or decoding can make len() disagree with seek().
    with open(sfile, 'rb') as source_file:
        source_file.seek(old_position)
        with open(ofile, 'ab') as target_file:
            for chunk in source_file:
                target_file.write(chunk)
                copied += len(chunk)
    # Leave the cached offset untouched when nothing new was read.
    if copied:
        write_position(position_file, copied + old_position)
    return copied

# read position from file
def read_position(position_file):
    """Return the offset stored in ``position_file``, or 0 if it is absent.

    Args:
        position_file: Path of the file caching the last processed offset.

    Returns:
        The integer parsed from the file content, or 0 when the file does
        not exist.

    Raises:
        ValueError: if the file exists but does not contain an integer.
    """
    if not os.path.exists(position_file):
        return 0
    with open(position_file, 'r') as open_file:
        # int() promotes to long automatically on Python 2 and, unlike
        # long(), also exists on Python 3.
        return int(open_file.read())

#write position to file
def write_position(position_file,position):
    """Store ``position`` (as text) in ``position_file``, replacing its content.

    Args:
        position_file: Path of the file caching the last processed offset.
        position: Offset value; written as ``str(position)``.

    Returns:
        True once the value has been written and the file closed. Errors
        from opening or writing propagate to the caller.
    """
    with open(position_file,'w+') as handle:
        handle.write(str(position))
    return True
    


   如上代码可供参考,在python 2.7.3下测试OK。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics