上次在《Ubuntu上Coreseek+php的安装》一文中提到过,我个人建议Coreseek最好采用Python作为数据源,这样灵活性要大得多。这次我就分享一下我写的一个CoreSeek的Python数据源基类。
这个基类的优势在于:对于“分库分表”的MySQL,它支持多进程并发读库,性能超强;而对于Python 2.6以下、不具有多进程(multiprocessing)特性的用户,这个基类会通过线程来模拟进程,对使用者完全透明!
该库已经在生产环境中使用。
需要MySQLdb类包
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# abs class for CoreSeek indexer data source
#
#   By Litrin J. 2011-07
#
from DBConfig import DBConfig
import MySQLdb
import datetime, time, types
try:
    from multiprocessing import Process, Queue
except:
    from threading import Thread as Process
    from Queue import Queue
class CoreSeek(object):
    '''
    Abstract base class for a CoreSeek Python data source.

    A concrete source subclasses this class and overrides the public
    class attributes below (DBName, Field, Scheme, ...).  Connected()
    starts background workers that read every table configured for
    DBName and push the converted rows into a queue; NextDocument()
    pops one row at a time and exposes its columns as attributes of the
    source object (see __getattr__).
    '''
    # Name handed to DBConfig().getDBTableConfig() to list the tables to read.
    DBName = ""
    # Field list, you can use "as" method, just like 'id as uid' to rename
    # the `id` field to uid.
    Field = []
    # Raw SQL condition placed after WHERE.
    WhereCause = "TRUE"
    # Name of the column that is unique within ONE table.  When None, the
    # document id is a running counter maintained by NextDocument().
    UniqId = None
    # Value returned unchanged by GetScheme().
    Scheme = []
    # Value returned unchanged by GetFieldOrder().
    FieldOrder = []
    # Optional GROUP BY expression; empty string disables the clause.
    SQLGroupBy = ""
    # Page size for "LIMIT offset, count" paging; 0 reads a table in one query.
    SQLLimit = 0
    # Debug switch: print the SQL statements and the rows.
    Debug = False
    # Upper bound on the number of concurrent workers.
    MaxProcess = 2
    # Private state.  The queues and the worker pool are created per instance
    # in Connected(); nothing mutable is shared through the class.
    __data = None
    __curItem = []
    __uniqNumber = 0
    __tableList = None
    __processPool = ()
    # Text types whose values are encoded to UTF-8 bytes in getNamedDict().
    # types.StringTypes only exists on Python 2; fall back to str elsewhere.
    __stringTypes = getattr(types, "StringTypes", (str,))

    def __init__(self, conf):
        '''
        Store the configuration object passed by the caller.

        Raises NotImplementedError when CoreSeek itself (rather than a
        subclass) is instantiated.
        '''
        if self.__class__ is CoreSeek:
            raise NotImplementedError("Cannot create object of class CoreSeek!")
        self.conf = conf

    def __del__(self):
        # Intentionally empty; kept so subclasses that chain to it still work.
        pass

    def __getattr__(self, key):
        '''
        Expose the columns of the current row as attributes.

        Only invoked when normal attribute lookup fails.  `id` resolves to
        the running document counter when UniqId is None.  Unknown names
        raise AttributeError, as the attribute protocol (hasattr, pickle,
        copy) expects.
        '''
        if self.UniqId is None and key == 'id':
            return self.__uniqNumber
        try:
            return self.__curItem[key]
        except (KeyError, TypeError, IndexError):
            raise AttributeError(key)

    def __iter__(self):
        '''Yield each row dict until NextDocument() reports the end.'''
        while self.NextDocument():
            yield self.__curItem

    def __str__(self):
        return str(self.__curItem)

    def GetScheme(self):
        '''Return the Scheme class attribute.'''
        return self.Scheme

    def GetFieldOrder(self):
        '''Return the FieldOrder class attribute.'''
        return self.FieldOrder

    def Connected(self):
        '''
        Queue every configured table and start the background workers.

        The number of workers is MaxProcess, lowered to the number of
        tables when there are fewer tables than that (self.MaxProcess is
        updated accordingly).
        '''
        tableList = DBConfig().getDBTableConfig(self.DBName)
        if len(tableList) < self.MaxProcess:
            self.MaxProcess = len(tableList)
        iWorkers = self.MaxProcess

        # One None sentinel per worker follows the table configs so every
        # worker can use a blocking get() and still knows when to stop.
        self.__tableList = Queue(len(tableList) + iWorkers)
        for dConfig in tableList:
            self.__tableList.put(dConfig)
        for i in range(iWorkers):
            self.__tableList.put(None)

        self.__data = Queue()
        self.__processPool = ()
        workers = []
        for i in range(iWorkers):
            worker = Process(target=self.getTableData)
            # start() runs the target in the background; run() would execute
            # it synchronously in the caller and return None.
            worker.start()
            workers.append(worker)
        # Published only after all workers are started, so the object handed
        # to each worker does not carry the handles of its siblings.
        self.__processPool = workers

    def NextDocument(self, err=None):
        '''
        Advance to the next row.

        Returns True when a row was loaded into the current item, False
        when all workers have finished and the queue is drained (or when
        Connected() was never called).  Waits in 10 ms steps while workers
        are still running but no row is available yet.
        '''
        if self.__data is None:
            return False
        while True:
            if self.__takeRow():
                return True
            if not self.__workersAlive():
                # A worker may have queued its last rows between the check
                # above and its exit, so look once more before giving up.
                if self.__takeRow():
                    return True
                break
            time.sleep(0.01)
        self.__tableList = None
        self.__data = None
        self.__processPool = ()
        return False

    def __takeRow(self):
        '''Pop one queued row into the current item; False if none is queued.'''
        if self.__data.empty():
            return False
        self.__curItem = self.__data.get()
        self.__uniqNumber += 1
        if self.Debug:
            print(self)
        return True

    def __workersAlive(self):
        '''Return True while at least one started worker is still running.'''
        for worker in self.__processPool:
            # Older threading.Thread objects only expose isAlive().
            isAlive = getattr(worker, "is_alive", None) or worker.isAlive
            if isAlive():
                return True
        return False

    def getTableData(self):
        '''
        Worker entry point: load tables until the stop sentinel is read.

        Returns True when at least one table was processed, False
        otherwise.  An exception raised while loading a table ends this
        worker; the remaining tables are left for the other workers.
        '''
        if self.__tableList is None:
            return False
        bLoaded = False
        while True:
            dConfig = self.__tableList.get()
            if dConfig is None:
                break
            bLoaded = True
            self.__loadTable(dConfig)
        return bLoaded

    def __loadTable(self, dConfig):
        '''Read one table, in pages of SQLLimit rows when SQLLimit is set.'''
        sSQL = self.getSQL(dConfig["tableName"])
        iPageSize = self.SQLLimit
        if iPageSize == 0:
            self.doLoadData(dConfig, sSQL)
            return
        iStep = 0
        while True:
            sLimit = " LIMIT " + str(iStep * iPageSize) + ", " + str(iPageSize)
            # A short (or failed, i.e. 0-row) page means the table is done.
            if self.doLoadData(dConfig, sSQL + sLimit) < iPageSize:
                break
            iStep += 1

    def doLoadData(self, dConfig, sSQL):
        '''
        Run sSQL against the table described by dConfig and queue its rows.

        dConfig must provide "host", "user", "passwd", "dbName" and
        "tableName"; the last two characters of the table name are parsed
        as a hexadecimal table number (ValueError otherwise).

        Returns the number of rows fetched, or 0 when the query fails.
        The connection is closed on every path.
        '''
        tableNumber = int(dConfig["tableName"][-2:], 16)
        mysqlHandle = MySQLdb.connect(host=dConfig["host"],
                                      user=dConfig["user"],
                                      passwd=dConfig["passwd"],
                                      db=dConfig["dbName"],
                                      charset="UTF8")
        try:
            mysqlCursor = mysqlHandle.cursor()
            if self.Debug:
                print("SQL: " + sSQL)
            try:
                mysqlCursor.execute(sSQL)
                lResultList = mysqlCursor.fetchall()
            except MySQLdb.Error:
                # Best effort: skip the failing query, but say so.
                import sys
                sys.stderr.write("CoreSeek: query failed on %s: %s\n"
                                 % (dConfig["tableName"], sys.exc_info()[1]))
                return 0
            for lRow in lResultList:
                dRow = self.getNamedDict(lRow)
                if self.UniqId is not None:
                    uniqValue = dRow[self.UniqId.lower()]
                    dRow["tablerecord"] = [tableNumber, uniqValue]
                    # Combine the per-table id with the table number so the
                    # document id differs between tables.
                    dRow["id"] = int(uniqValue) * 1000 + tableNumber
                self.__data.put(self.buildIndex(dRow))
            return len(lResultList)
        finally:
            mysqlHandle.close()

    def getSQL(self, sTableName):
        '''Build the SELECT statement for one table from the class attributes.'''
        SQL = "SELECT %s FROM %s WHERE %s " % (", ".join(self.Field),
                                               sTableName, self.WhereCause)
        if self.SQLGroupBy != "":
            SQL += "GROUP BY %s " % self.SQLGroupBy
        return SQL

    def getNamedDict(self, lRow):
        '''
        Convert a result tuple into a dict keyed by lower-cased field name.

        For a field written as "x as y" the key is "y".  Text values are
        encoded to UTF-8, datetime values become integer Unix timestamps
        (local time, via time.mktime); everything else is passed through.
        '''
        result = {}
        for i, sFieldName in enumerate(self.Field):
            sDictKey = sFieldName.lower()
            iAlias = sDictKey.find(" as ")
            if iAlias > 0:
                sDictKey = sDictKey[iAlias + 4:]
            value = lRow[i]
            if isinstance(value, self.__stringTypes):
                value = value.encode("utf-8")
            elif isinstance(value, datetime.datetime):
                value = int(time.mktime(value.timetuple()))
            result[sDictKey] = value
        return result

    def buildIndex(self, dRow):
        '''
        Hook for subclasses to add or rewrite columns of a row dict.

        The default implementation returns the row unchanged.
        '''
        if self.Debug:
            print(dRow)
        return dRow
class CoreSeekUtility:
    '''Stateless string helpers for building index text.'''

    @staticmethod
    def literallyCut(string, sChar=" ", CharSet="utf-8"):
        '''
        Insert sChar between every character of an encoded string.

        string is decoded with CharSet, split into single characters,
        joined with sChar and encoded back with CharSet.  Input of at most
        one character is returned unchanged.
        '''
        uString = string.decode(CharSet)
        if len(uString) <= 1:
            return string
        return sChar.join(list(uString)).encode(CharSet)

    @staticmethod
    def dictIndex(dRecord):
        '''
        Render a dict as "KEY=value " pairs (each pair ends with a space).

        Keys are upper-cased in the output only; the value is looked up
        with the original key, so lower-case keys no longer raise KeyError.
        '''
        return "".join([key.upper() + "=" + str(dRecord[key]) + " "
                        for key in dRecord])
数据源示例:
from CoreSeek import CoreSeek, CoreSeekUtility
import time
class Topic(CoreSeek):
    '''
    Example data source reading the tables configured under "Topic".

    Adds two derived columns to every row: `index` (title and description
    joined by a space) and `index_uid` (a copy of `uid`).
    '''
    DBName = "Topic"

    # Columns selected from each table; row keys are the lower-cased names.
    Field = ["topicId", "uid", "title", 'body', 'privacy', 'description', 'type']

    # Returned to the caller by GetScheme().
    Scheme = [
        ('id', {'docid': True}),
        ('index', {'type': 'text'}),
        ('index_uid', {'type': 'text'}),
        ('topicid', {'type': 'integer'}),
        ('type', {'type': 'integer'}),
        ('privacy', {'type': 'integer'}),
        ('body', {'type': 'string'}),
        ('title', {'type': 'string'}),
        ('uid', {'type': 'string'}),
        ('description', {'type': 'string'}),
    ]

    # Returned to the caller by GetFieldOrder().
    FieldOrder = [('index', 'index_uid')]

    def buildIndex(self, dRow):
        '''Attach the derived `index_uid` and `index` columns to the row.'''
        sUid = dRow['uid']
        dRow['index_uid'] = sUid
        sSearchText = "%s %s" % (dRow['title'], dRow['description'])
        dRow['index'] = sSearchText
        return dRow
class TopicDelta(Topic):
    '''Topic source restricted to rows whose createTime is within the last hour.'''
    # The cutoff is computed once, when this class statement is executed.
    WhereCause = "createTime > " + str(time.time() - 3600) + " "
if __name__ == "__main__":
    # Manual smoke run: load every Topic row and print it.
    source = Topic({})
    source.Connected()
    # Iterating the source calls NextDocument() and yields the current row.
    for document in source:
        print(document)
转载请注明:爱开源 » CoreSeek Python数据源的基类