python实用代码片段收集贴
python实用代码片段收集贴
发布时间:2015-06-06 来源:查字典编辑
摘要:这篇文章主要介绍了python实用代码片段收集贴,本文收集了如获取一个类的所有子类、计算运行时间、SQLAlchemy简单使用、实现类似Ja...

这篇文章主要介绍了python实用代码片段收集贴,本文收集了如获取一个类的所有子类、计算运行时间、SQLAlchemy简单使用、实现类似Java或C中的枚举等实用功能代码,需要的朋友可以参考下

获取一个类的所有子类

复制代码 代码如下:

def itersubclasses(cls, _seen=None):

"""Generator over all subclasses of a given class in depth first order."""

if not isinstance(cls, type):

raise TypeError(_('itersubclasses must be called with '

'new-style classes, not %.100r') % cls)

_seen = _seen or set()

try:

subs = cls.__subclasses__()

except TypeError: # fails only when cls is type

subs = cls.__subclasses__(cls)

for sub in subs:

if sub not in _seen:

_seen.add(sub)

yield sub

for sub in itersubclasses(sub, _seen):

yield sub

简单的线程配合

复制代码 代码如下:

import threading

is_done = threading.Event()

consumer = threading.Thread(

target=self.consume_results,

args=(key, self.task, runner.result_queue, is_done))

consumer.start()

self.duration = runner.run(

name, kw.get("context", {}), kw.get("args", {}))

is_done.set()

consumer.join() #主线程堵塞,直到consumer运行结束

多说一点,threading.Event()也可以被替换为threading.Condition(),condition有notify(), wait(), notifyAll()。解释如下:

复制代码 代码如下:

The wait() method releases the lock, and then blocks until it is awakened by a notify() or notifyAll() call for the same condition variable in another thread. Once awakened, it re-acquires the lock and returns. It is also possible to specify a timeout.

The notify() method wakes up one of the threads waiting for the condition variable, if any are waiting. The notifyAll() method wakes up all threads waiting for the condition variable.

Note: the notify() and notifyAll() methods don't release the lock; this means that the thread or threads awakened will not return from their wait() call immediately, but only when the thread that called notify() or notifyAll() finally relinquishes ownership of the lock.

复制代码 代码如下:

# Consume one item

cv.acquire()

while not an_item_is_available():

cv.wait()

get_an_available_item()

cv.release()

# Produce one item

cv.acquire()

make_an_item_available()

cv.notify()

cv.release()

计算运行时间

复制代码 代码如下:

class Timer(object):

def __enter__(self):

self.error = None

self.start = time.time()

return self

def __exit__(self, type, value, tb):

self.finish = time.time()

if type:

self.error = (type, value, tb)

def duration(self):

return self.finish - self.start

with Timer() as timer:

func()

return timer.duration()

元类

__new__()方法接收到的参数依次是:

当前准备创建的类的对象;

类的名字;

类继承的父类集合;

类的方法集合;

复制代码 代码如下:

class ModelMetaclass(type):

def __new__(cls, name, bases, attrs):

if name=='Model':

return type.__new__(cls, name, bases, attrs)

mappings = dict()

for k, v in attrs.iteritems():

if isinstance(v, Field):

print('Found mapping: %s==>%s' % (k, v))

mappings[k] = v

for k in mappings.iterkeys():

attrs.pop(k)

attrs['__table__'] = name # 假设表名和类名一致

attrs['__mappings__'] = mappings # 保存属性和列的映射关系

return type.__new__(cls, name, bases, attrs)

class Model(dict):

__metaclass__ = ModelMetaclass

def __init__(self, **kw):

super(Model, self).__init__(**kw)

def __getattr__(self, key):

try:

return self[key]

except KeyError:

raise AttributeError(r"'Model' object has no attribute '%s'" % key)

def __setattr__(self, key, value):

self[key] = value

def save(self):

fields = []

params = []

args = []

for k, v in self.__mappings__.iteritems():

fields.append(v.name)

params.append('?')

args.append(getattr(self, k, None))

sql = 'insert into %s (%s) values (%s)' % (self.__table__, ','.join(fields), ','.join(params))

print('SQL: %s' % sql)

print('ARGS: %s' % str(args))

class Field(object):

def __init__(self, name, column_type):

self.name = name

self.column_type = column_type

def __str__(self):

return '<%s:%s>

推荐文章
猜你喜欢
附近的人在看
推荐阅读
拓展阅读
相关阅读
网友关注
最新编程语言综合学习
热门编程语言综合学习
编程开发子分类