sqlachmeyをmultiprocessingで使う

マルチコア環境で、バッチ処理を早くするため、sqlaclhemyでmultiprocessingを行いたい。

実験環境

  1. ubuntu9.04
  2. python2.6.2
  3. sqlalchemy 0.55

ポイント

  1. sqlalcemyは、Sessionではなく、scoped_sessionを使う
  2. multiprocessingでは、Lockを使う。

良く分からない。

  1. sqlalchemyのsession,scoped_sessionの区別が良く分からない。

ソースコード

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import *
from sqlalchemy import *
from sqlalchemy.orm import *
import time
##engineとmetadata作る
def mkMeta(url,echo=False):
    engine=create_engine(url,echo=echo)
    return MetaData(bind=engine),engine
#table_test テーブルmapping用の、TableTestオブジェクト
class TableTest(object):
    def __init__(self,name):
        self.name=name
#テーブルテスト定義,実際には、nameしか使ってない。
def table_def(meta):
    table_test=Table('table_test',meta,
                     Column('id', Integer, primary_key=True),
                     Column('name', String(40), nullable=False),
                     Column('created_at', DateTime, nullable=False, default=func.now()),
                     Column('updated_at', DateTime, nullable=False, default=func.now(), onupdate=func.now()),
                     )
    #マッピングします。
    mapper(TableTest,table_test)
    if not table_test.exists():
        table_test.create()

def mksession(engine):
    Session=sessionmaker(bind=engine)
    #Sessionを作ったあと、scoped_sessionを作る。
    return scoped_session(Session)
    #return Session

db_uri='mysql://root@localhost/ss_test'
def createData(sess,lock,pid,offset=0,limit=0):
    '''
Processで実行される関数
offsetとlimit,session,lockを受け取る。
sess,lockは事前に作ってある。 
最初に検索して、無かったらレコードを追加する処理を書いた。
    '''
    t1=time.time()
    print 'start pid=%d sess=%s' % (pid,str(sess))
    for i in range(offset,limit):
        nm='pid=%d,i=%d' % (pid,i)
        lock.acquire()
        ql=sess.query(TableTest).filter(TableTest.name==nm)
        c=ql.count()
        lock.release()
        if c==0:
            t=TableTest(nm)
            lock.acquire()
            sess.add(t)
            lock.release()

    t1=time.time()-t1
    print 'end pid=%d time=%d ms' % (pid,t1*1000)
def main():
    t1=time.time()
    m,e=mkMeta(db_uri,echo=False)
    table_def(m)
    sz=3000
    Session=mksession(e)
    sess=Session()
    lock=Lock()
    pl=[]
    #プロセスを生成、事前に生成した、scopped_sessionとlockを引き渡す。
    for i in range(10):
        p=Process(target=createData,args=(sess,lock,i,i*sz,(i+1)*sz))
        p.start()
        pl.append(p)
    #生成したプロセスが終わるのを待ちます。
    for p in pl:
        print 'pre join',p,p.is_alive()
        p.join()
        print 'post join',p,p.is_alive()
    #いろいろと終わったので、処理します。
    print 'sess start'
    sess.flush()
    print 'done sess flush'
    sess.commit()
    print 'done sess commit'
    sess.close()
    print 'done sess close'
    t1=time.time()-t1
    print 'time=%d ms'  % (t1*1000)
if __name__=='__main__':main()

トップ   差分 バックアップ リロード   一覧 単語検索 最終更新   ヘルプ   最終更新のRSS
Last-modified: 2015-02-01 (日) 14:38:23 (933d)