说明
有时候有一些东西需要迭代处理(比如临时需要采集一些数据),有时候中间会失败,就需要创建一个可持久化的索引。
因此简单的写入到 txt 文件进行存储。好几次之后,经常需要很重复写一些代码,变得很不好看。
所以花了一点点时间抽出来这些重复代码。
Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
|
import os.path
class Pointer:
def __init__(self, file_path: str, pad: int = 20, only_item: bool = False): """ 创建一个可持久化的记录可迭代对象的索引的文件, 从第0行开始,每次返回一个索引对应的item,达到步长,就会写入磁盘持久化索引。
【存储的是上一次返回的索引】
:param file_path: 需要操作的索引文件路径 :param pad: 步长,每过几个索引则写入磁盘 :param only_item: 迭代时,只返回item,不返回index """ self.file_path = file_path self.pad = pad self.only_item = only_item
def read(self) -> int: if os.path.exists(self.file_path): with open(self.file_path, encoding="utf-8", mode="rt") as f: return int(f.read()) else: return -1
def save(self, pointer_num: int): with open(self.file_path, encoding="utf-8", mode="wt") as f: f.write(str(pointer_num)) return pointer_num
def iter(self, iterable): """ 迭代一个可迭代对象,会从已存储索引+1开始迭代。
:param iterable: 一个可迭代对象 """ wrote_index = self.read() continue_index = wrote_index + 1 should_query_flag = False
index = 0 for index, item in enumerate(iterable): if not should_query_flag: if index < continue_index: continue else: should_query_flag = True
if (index + 1) % self.pad == 0: wrote_index = self.save(index) if self.only_item: yield item else: yield index, item
if wrote_index != index: self.save(index)
if __name__ == '__main__': pointer = Pointer("./test_pointer.txt") for i in pointer.iter(range(105)): print(i)
|
使用示例:
1 2 3 4 5 6
| df = pd.read_csv(input_txt) pinter = Pointer(pointer_txt, only_item=True)
logger.info(f"程序启动") for index, row in pinter.iter(df.iterrows()): logger.info(f"读取第几行:{index + 1}")
|
这样就可以实现,程序中断后,再次运行可以继续迭代执行了,不再需要管内部逻辑。
通过设置 pad
步长参数可以减少写入磁盘的次数,但是如果在步长期间发生异常,那么就会丢失期间的已使用索引。
可以自行捕获异常,使用 pointer.save(num)
保存当时已使用的索引。或者直接把步长设为 1,每迭代一个都写入磁盘。