这篇文章主要介绍了python实用代码片段收集贴,本文收集了如获取一个类的所有子类、计算运行时间、SQLAlchemy简单使用、实现类似Java或C中的枚举等实用功能代码,需要的朋友可以参考下
获取一个类的所有子类
复制代码 代码如下:
def itersubclasses(cls, _seen=None):
"""Generator over all subclasses of a given class in depth first order."""
if not isinstance(cls, type):
raise TypeError(_('itersubclasses must be called with '
'new-style classes, not %.100r') % cls)
_seen = _seen or set()
try:
subs = cls.__subclasses__()
except TypeError: # fails only when cls is type
subs = cls.__subclasses__(cls)
for sub in subs:
if sub not in _seen:
_seen.add(sub)
yield sub
for sub in itersubclasses(sub, _seen):
yield sub
简单的线程配合
复制代码 代码如下:
import threading
is_done = threading.Event()
consumer = threading.Thread(
target=self.consume_results,
args=(key, self.task, runner.result_queue, is_done))
consumer.start()
self.duration = runner.run(
name, kw.get("context", {}), kw.get("args", {}))
is_done.set()
consumer.join() #主线程堵塞,直到consumer运行结束
多说一点,threading.Event()也可以被替换为threading.Condition(),condition有notify(), wait(), notifyAll()。解释如下:
复制代码 代码如下:
The wait() method releases the lock, and then blocks until it is awakened by a notify() or notifyAll() call for the same condition variable in another thread. Once awakened, it re-acquires the lock and returns. It is also possible to specify a timeout.
The notify() method wakes up one of the threads waiting for the condition variable, if any are waiting. The notifyAll() method wakes up all threads waiting for the condition variable.
Note: the notify() and notifyAll() methods don't release the lock; this means that the thread or threads awakened will not return from their wait() call immediately, but only when the thread that called notify() or notifyAll() finally relinquishes ownership of the lock.
复制代码 代码如下:
# Consume one item
cv.acquire()
while not an_item_is_available():
cv.wait()
get_an_available_item()
cv.release()
# Produce one item
cv.acquire()
make_an_item_available()
cv.notify()
cv.release()
计算运行时间
复制代码 代码如下:
class Timer(object):
def __enter__(self):
self.error = None
self.start = time.time()
return self
def __exit__(self, type, value, tb):
self.finish = time.time()
if type:
self.error = (type, value, tb)
def duration(self):
return self.finish - self.start
with Timer() as timer:
func()
return timer.duration()
元类
__new__()方法接收到的参数依次是:
当前准备创建的类的对象;
类的名字;
类继承的父类集合;
类的方法集合;
复制代码 代码如下:
class ModelMetaclass(type):
def __new__(cls, name, bases, attrs):
if name=='Model':
return type.__new__(cls, name, bases, attrs)
mappings = dict()
for k, v in attrs.iteritems():
if isinstance(v, Field):
print('Found mapping: %s==>%s' % (k, v))
mappings[k] = v
for k in mappings.iterkeys():
attrs.pop(k)
attrs['__table__'] = name # 假设表名和类名一致
attrs['__mappings__'] = mappings # 保存属性和列的映射关系
return type.__new__(cls, name, bases, attrs)
class Model(dict):
__metaclass__ = ModelMetaclass
def __init__(self, **kw):
super(Model, self).__init__(**kw)
def __getattr__(self, key):
try:
return self[key]
except KeyError:
raise AttributeError(r"'Model' object has no attribute '%s'" % key)
def __setattr__(self, key, value):
self[key] = value
def save(self):
fields = []
params = []
args = []
for k, v in self.__mappings__.iteritems():
fields.append(v.name)
params.append('?')
args.append(getattr(self, k, None))
sql = 'insert into %s (%s) values (%s)' % (self.__table__, ','.join(fields), ','.join(params))
print('SQL: %s' % sql)
print('ARGS: %s' % str(args))
class Field(object):
def __init__(self, name, column_type):
self.name = name
self.column_type = column_type
def __str__(self):
return '<%s:%s>