记得在zipsite项目中,对于文本文件本身就没有序列化的问题,但对于富媒体文件的序列化,我采用的方法是pickle。后来我忽然觉得使用pickle是可以实现简单的分布任务处理的。
废话不表,上代码:
首先是一个“有限事务机”,讲白一点就是所有的任务都必须通过executeNextStep()方法按step1,step2, finish的顺序执行。但增加了两个魔术方法,稍后再说。
process.py
import pickle
class process:
_nextStep = None
_lock = False
name = None
def __init__(self, name):
self.name = name
self.setNextStep('step 1')
def setNextStep(self, nextStep):
if self._lock : return
self._lock = True
if nextStep == 'step 1' : self._nextStepHandle = self.step1
elif nextStep == 'step 2' : self._nextStepHandle = self.step2
elif nextStep == 'finish' : self._nextStepHandle = self.finish
else:
self._nextStepHandle = None
return
self._nextStep = nextStep
self._lock = False
def step1(self):
print "Processing step 1, %s " % self.name
self.setNextStep('step 2')
def step2(self):
print "Processing step 2, %s " % self.name
self.setNextStep('finish')
def finish(self):
print "Processing finished"
self.setNextStep(None)
def executeNextStep(self, *arge, **args):
if self._nextStepHandle is not None:
self._nextStepHandle(*arge, **args)
def __getstate__(self):
return self._nextStep
def __setstate__(self, state):
self.setNextStep(state)
if __name__ == '__main__':
p = process('Litrin')
p.executeNextStep()
p.executeNextStep()
p.executeNextStep()
执行结果为:
Processing step 1, Litrin
Processing step 2, Litrin
Processing finished
服务器端,Server.py
# Echo server program
import socket
import sys
import pickle
from process import process
HOST = None # Symbolic name meaning all available interfaces
PORT = 50007 # Arbitrary non-privileged port
s = None
for res in socket.getaddrinfo(HOST, PORT, socket.AF_UNSPEC,
socket.SOCK_STREAM, 0, socket.AI_PASSIVE):
af, socktype, proto, canonname, sa = res
try:
s = socket.socket(af, socktype, proto)
except socket.error as msg:
s = None
continue
try:
s.bind(sa)
s.listen(1)
except socket.error as msg:
s.close()
s = None
continue
break
if s is None:
print 'could not open socket'
sys.exit(1)
while True:
conn, addr = s.accept()
while 1:
data = conn.recv(64)
if not data: break
Object = pickle.loads(data)
Object.executeNextStep()
conn.send(pickle.dumps(Object))
conn.close()
客户端,client.py
import socket
import time
import pickle
from process import process
class Client(object):
handle = None
host = '127.0.0.1'
port = 50007
def __init__(self):
if self.handle is None:
self.connect()
def __del__(self):
if self.handle is not None:
self.close()
def close(self):
self.handle.close()
def connect(self):
address = (self.host, self.port)
self.handle = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#try:
self.handle.connect(address)
#except:
# self.error("can't connect to %s" % self.hostName)
# exit()
def send(self):
p = process('litrin')
p.executeNextStep()
tcpMessage = pickle.dumps(p)
self.handle.send(tcpMessage)
tcpRespones = ''
while True:
d = self.handle.recv(16)
tcpRespones += d
if len(d) < 16:
break
# except:
# self.error("can't get date from %s" % self.hostName)
# return False
self.respones = pickle.loads(tcpRespones)
self.respones.executeNextStep()
self.respones.executeNextStep()
if __name__ == '__main__':
a = Client()
a.connect()
a.send()
a.close()
执行之后服务器端后,再执行客户端显示:
Processing step 1, Litrin
Processing finished
而服务器端显示:
Processing step 2, Litrin
这也就意味着process对象在两台主机上完成了分布事务管理。
pickle是python提供的一个对象序列化工具,简单的来说就是会将一个对象用字符序列化以方便保存和传输。pickle依赖两个魔术方法:
- __getstate__(self):返回对象的当前状态描述
- __setstate__(self, state): 将状态导回到对象中
理论上说,并不一定需要服务器端的代码和客户端的代码保持一致就可以实现分布式处理,感觉对于很多需要加密验证的方法上这一招很有效。
以上仅仅只是个Demo,现实中由于网络的传输存在不确定因素,对传输的文本进行校验回调是需要考虑的,同时由于pickle的方式往往会依次向父类递归,状况也会复杂很多。
转载请注明:爱开源 » Python实现简单分布式处理