I have always believed that programming skill by itself is not the essential thing. Before starting to type code, one should first find the correct method; as long as the method is correct, the code can always be written. The concrete implementation steps may vary slightly between languages, but the underlying method stays the same.
The first, pure-Python version was written only for learning and research purposes and has been deprecated. The SQLite3-based version is still in use; no issues have been found so far, so no further improvements have been made.
#FileDict.py
import os
import bisect
import numpy as np
import struct
class FileDict:
    """Append-only, file-backed string-to-string mapping.

    Records live in ``<path>.data`` with the layout::

        flag (1 byte: b'V' live / b'D' deleted) |
        key_len (8-byte LE) | key utf-8 | value_len (8-byte LE) | value utf-8

    ``<path>.index`` mirrors the in-RAM sorted index ``ram_index`` as int64
    ``(hash, offset)`` pairs.  Updates never truncate the data file (the old
    code did, destroying every record after the edited one); instead the old
    record is flagged dead and a new one is appended.  Call ``rebuild()`` to
    compact the file.

    NOTE(review): the on-disk index stores ``hash(key)``; Python string
    hashing is randomized per process (PYTHONHASHSEED), so an index written
    by one interpreter run is not valid in the next unless the hash seed is
    pinned — confirm before relying on persistence across runs.
    """

    def __init__(self, file_path):
        self.data_file = file_path + '.data'
        self.index_file = file_path + '.index'
        self.ram_index = []  # sorted list of [hash(key), record offset]
        if not os.path.exists(self.data_file):
            open(self.data_file, 'wb').close()
        if not os.path.exists(self.index_file):
            open(self.index_file, 'wb').close()
        self.load_index(self.index_file)

    def load_index(self, index_file):
        """Load the (hash, offset) pairs from disk into ``ram_index``."""
        if os.path.getsize(index_file) > 0:
            with open(index_file, 'rb') as f:
                self.ram_index = np.fromfile(f, dtype=np.int64).reshape(-1, 2).tolist()
            self.ram_index.sort()

    def _locate(self, key):
        """Yield (index, offset) for every ram_index entry sharing hash(key).

        Scanning the whole run of equal hashes handles hash collisions; the
        original code inspected only the first colliding entry.
        """
        h_key = hash(key)
        i = bisect.bisect_left(self.ram_index, [h_key, 0])
        while i < len(self.ram_index) and self.ram_index[i][0] == h_key:
            yield i, self.ram_index[i][1]
            i += 1

    def _append_index_entry(self, h_key, pos):
        """Append one (hash, offset) pair to the on-disk index file."""
        with open(self.index_file, 'ab') as f:
            f.write(np.array([[h_key, pos]], dtype=np.int64).tobytes())

    def _write_index(self, new_index):
        """Replace the on-disk index with *new_index* (sorted in place)."""
        new_index.sort()
        self.ram_index = new_index
        with open(self.index_file, 'wb') as f:
            np.array(new_index, dtype=np.int64).tofile(f)

    def __getitem__(self, key):
        """Return the value stored for *key*; raise KeyError if absent."""
        for _, pos in self._locate(key):
            with open(self.data_file, 'rb') as f:
                f.seek(pos)
                flag, stored_key, value = self.read_record(f)
            if flag != b'D' and stored_key == key:
                return value
        raise KeyError(key)

    def __setitem__(self, key, value):
        """Insert or update *key*.

        The previous record (if any) is flagged dead and the new record is
        appended at end-of-file — never truncate mid-file, which would wipe
        every record written after the edited one.
        """
        h_key = hash(key)
        for i, pos in self._locate(key):
            with open(self.data_file, 'r+b') as f:
                f.seek(pos)
                flag, stored_key, old_value = self.read_record(f)
                if flag == b'D' or stored_key != key:
                    continue  # dead record or hash collision with another key
                if old_value == value:
                    return  # value unchanged: nothing to write
                f.seek(pos)
                f.write(b'D')  # tombstone the old record
                f.seek(0, 2)   # then append the replacement at EOF
                new_pos = self.append_record(f, key, value)
            self.ram_index[i] = [h_key, new_pos]
            self._append_index_entry(h_key, new_pos)
            return
        # Key not present: plain append.
        with open(self.data_file, 'ab') as f:
            new_pos = self.append_record(f, key, value)
        bisect.insort_left(self.ram_index, [h_key, new_pos])
        self._append_index_entry(h_key, new_pos)

    def __delitem__(self, key):
        """Tombstone *key*'s record; raise KeyError if absent.

        Verifies the stored key before flagging (the original flagged the
        first record with a matching hash, deleting the wrong key on a
        collision).
        """
        for _, pos in self._locate(key):
            with open(self.data_file, 'r+b') as f:
                f.seek(pos)
                flag, stored_key, _ = self.read_record(f)
                if flag != b'D' and stored_key == key:
                    f.seek(pos)
                    f.write(b'D')
                    return
        raise KeyError(key)

    def __iter__(self):
        """Yield every live key by scanning the data file front to back."""
        with open(self.data_file, 'rb') as f:
            while True:
                flag, key, _ = self.read_record(f)
                if not flag:
                    break
                if flag != b'D':
                    yield key

    def items(self):
        """Yield (key, value) for every live record."""
        with open(self.data_file, 'rb') as f:
            while True:
                flag, key, value = self.read_record(f)
                if not flag:
                    break
                if flag != b'D':
                    yield key, value

    def from_dict(self, dict):
        """Replace the entire store with the contents of *dict*."""
        new_data_file = self.data_file + '.rebuild'
        new_index = []
        with open(new_data_file, 'wb') as dest:
            for key, value in dict.items():
                new_index.append([hash(key), self.append_record(dest, key, value)])
        os.replace(new_data_file, self.data_file)
        self._write_index(new_index)

    def add_items(self, items):
        """Merge mapping *items* into the store by rewriting the data file.

        Review fixes: the loop previously tested the stale variable ``flag``
        instead of ``flag1`` (so it never examined the record it had just
        read), wrote superseded keys twice, and left ``ram_index`` and the
        index file pointing at offsets of the replaced data file.
        """
        temp_file = self.data_file + '.temp'
        new_index = []
        with open(temp_file, 'wb') as dest, open(self.data_file, 'rb') as src:
            while True:
                flag, key, value = self.read_record(src)
                if not flag:
                    break  # end of data file
                if flag == b'D' or key in items:
                    continue  # deleted, or about to be superseded below
                new_index.append([hash(key), self.append_record(dest, key, value)])
            for key, value in items.items():
                new_index.append([hash(key), self.append_record(dest, key, value)])
        os.replace(temp_file, self.data_file)
        self._write_index(new_index)

    def read_record(self, f):
        """Read one record at the current offset of *f*.

        Returns (flag, key, value); (None, None, None) at end-of-file.
        """
        flag = f.read(1)
        if not flag:
            return None, None, None
        key_len = struct.unpack('Q', f.read(8))[0]
        key = f.read(key_len).decode('utf-8')
        value_len = struct.unpack('Q', f.read(8))[0]
        value = f.read(value_len).decode('utf-8')
        return flag, key, value

    def append_record(self, f, key, value):
        """Write one live record at the current offset; return its offset."""
        position = f.tell()
        k = key.encode('utf-8')
        v = value.encode('utf-8')
        f.write(b'V')
        f.write(struct.pack('Q', len(k)))
        f.write(k)
        f.write(struct.pack('Q', len(v)))
        f.write(v)
        return position

    def rebuild(self):
        """Compact the data file, dropping tombstones, and rewrite the index."""
        new_data_file = self.data_file + '.rebuild'
        new_index = []
        with open(self.data_file, 'rb') as src, open(new_data_file, 'wb') as dst:
            while True:
                flag, key, value = self.read_record(src)
                if not flag:
                    break
                if flag != b'D':
                    new_index.append([hash(key), self.append_record(dst, key, value)])
        os.replace(new_data_file, self.data_file)
        self._write_index(new_index)
FileDict3.py is designed specifically for text data. It is optimized for large amounts of data while keeping the data safe. By changing the `table` parameter, several independent dict-like tables can be accessed in a single file, which makes it suitable for more complex situations.
#FileDict3.py
import sqlite3
import time
import atexit
class FileDict:
    """Dict-like string-to-string store backed by SQLite, with write buffering.

    Writes are queued in ``self.buffer`` and flushed once the buffer holds
    ``buffer_size`` operations or ``buffer_idle_time`` seconds have elapsed
    since the last commit.  Every read path flushes first, so reads always
    observe earlier writes (review fix: reads previously used the lazy flush
    and could miss still-buffered updates, making a just-set key raise
    KeyError).

    ``table`` selects which table inside the file acts as the dict, so one
    file can hold several independent mappings.
    """

    def __init__(self, file_path, buffer_size=1000, buffer_idle_time=1, table='filedict'):
        # The table name is interpolated into SQL text (identifiers cannot be
        # bound as parameters), so restrict it to a plain identifier to block
        # SQL injection through this argument.
        if not table.isidentifier():
            raise ValueError('table name must be a valid identifier')
        self.file_path = file_path
        self.table = table
        self.conn = sqlite3.connect(file_path, check_same_thread=False)
        self.conn.execute('CREATE TABLE IF NOT EXISTS {} (key TEXT PRIMARY KEY, value TEXT)'.format(self.table))
        self.buffer = []  # pending ('set'|'update'|'del', key[, value]) ops
        self.buffer_size = buffer_size
        self.last_commit_time = time.time()
        self.buffer_idle_time = buffer_idle_time
        atexit.register(self.close)  # best-effort flush at interpreter exit

    def get(self, key):
        """Return the value for *key*, or None if absent (dict.get style)."""
        try:
            return self[key]
        except KeyError:
            return None

    def __getitem__(self, key):
        """Return the value for *key*; raise KeyError if absent."""
        self._commit()  # flush pending writes so the read is consistent
        cursor = self.conn.execute('SELECT value FROM {} WHERE key = ?'.format(self.table), (key,))
        result = cursor.fetchone()
        if result is None:
            raise KeyError(key)
        return result[0]

    def Tables(self):
        """Return the names of all tables in the database file."""
        cursor = self.conn.execute("SELECT name FROM sqlite_master WHERE type='table';")
        return [t[0] for t in cursor.fetchall()]

    def __setitem__(self, key, value):
        """Queue an insert-or-update for *key*.

        The old ``except sqlite3.IntegrityError`` path was dead code —
        neither ``check_key`` nor ``list.append`` can raise it, and the
        'set' op already maps to INSERT OR REPLACE in ``_commit``.
        """
        self.check_key(key)
        self.buffer.append(('set', key, value))
        self._check_buffer()

    def __delitem__(self, key):
        """Queue a delete for *key* (silently a no-op if the key is absent)."""
        self.buffer.append(('del', key))
        self._check_buffer()

    def __iter__(self):
        """Yield every key, after flushing pending writes."""
        self._commit()
        cursor = self.conn.execute('SELECT key FROM {}'.format(self.table))
        for row in cursor:
            yield row[0]

    def items(self):
        """Return a list of all (key, value) pairs, after flushing."""
        self._commit()
        cursor = self.conn.execute('SELECT key, value FROM {}'.format(self.table))
        return cursor.fetchall()

    def from_dict(self, dict):
        """Replace the table's entire contents with mapping *dict*."""
        self.check_dict(dict)
        self.conn.execute('DROP TABLE IF EXISTS {}'.format(self.table))
        self.conn.execute('CREATE TABLE {} (key TEXT PRIMARY KEY, value TEXT)'.format(self.table))
        self.conn.executemany('INSERT INTO {} (key, value) VALUES (?, ?)'.format(self.table), dict.items())
        self.conn.commit()

    def add_items(self, items):
        """Queue every (key, value) pair from mapping *items*."""
        for key, value in items.items():
            self.check_key(key)
            self.buffer.append(('set', key, value))
            self._check_buffer()
        self._check_buffer()

    def _check_buffer(self):
        """Flush if the buffer is full or has been idle past the threshold."""
        if not self.buffer:
            return
        idle_time = time.time() - self.last_commit_time
        if len(self.buffer) >= self.buffer_size or idle_time >= self.buffer_idle_time:
            self._commit()

    def _commit(self):
        """Apply every buffered operation in order and commit."""
        if not self.buffer:
            return
        cursor = self.conn.cursor()
        for op in self.buffer:
            if op[0] == 'set':
                cursor.execute('INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)'.format(self.table), (op[1], op[2]))
            elif op[0] == 'update':
                # Kept for compatibility with externally-queued ops; nothing
                # in this class enqueues 'update' any more.
                cursor.execute('UPDATE {} SET value = ? WHERE key = ?'.format(self.table), (op[2], op[1]))
            elif op[0] == 'del':
                cursor.execute('DELETE FROM {} WHERE key = ?'.format(self.table), (op[1],))
        self.buffer = []
        self.last_commit_time = time.time()
        self.conn.commit()

    def check_dict(self, dictionary):
        """Validate every key of *dictionary* (see check_key)."""
        for key in dictionary:
            self.check_key(key)

    def check_key(self, key):
        """Raise TypeError/ValueError unless *key* is a non-empty string."""
        if not isinstance(key, str):
            raise TypeError('Keys must be strings.')
        if not key:
            raise ValueError('Keys cannot be empty strings.')

    def search_keys(self, pattern, like=True, values=False):
        """Return keys matching *pattern* (SQL LIKE by default, '=' otherwise).

        NOTE(review): the ``values`` flag is accepted but unused; kept for
        interface compatibility.
        """
        self._commit()
        operator = 'LIKE' if like else '='
        cursor = self.conn.cursor()
        cursor.execute(f"SELECT key FROM {self.table} WHERE key {operator} ?", (pattern,))
        return [row[0] for row in cursor.fetchall()]

    def close(self):
        """Flush and close the connection; safe to call more than once.

        Idempotence matters because ``atexit`` calls this again even after
        an explicit close (the original raised on the second call).
        """
        try:
            self._commit()
            self.conn.commit()
            self.conn.close()
        except sqlite3.ProgrammingError:
            pass  # connection already closed
FileSQL3.py stores binary data. It automatically splits large files into smaller chunks and, when reading, simulates a file-like object for easy seeking and reading. It also has a caching mechanism, so continuous reads lose no performance. Currently it is used together with FastAPI to serve multimedia files from a server on my home LAN, with speed comparable to accessing them directly at home. Overall it is still not perfect, and it is not suited to storing large files that need frequent modification.
#FileSQL3.py
import os,json
import sqlite3
import uuid
from datetime import datetime
import mimetypes
# Register MIME types missing from some platforms' default tables:
# serve .mkv as video/webm and .flac as audio/flac.
mimetypes.add_type("video/webm" ,'.mkv')
mimetypes.add_type("audio/flac",".flac")
# Default chunk size when splitting stored files: 8 MiB per row in `datas`.
DEFAULT_BLOCK_SIZE = 1024*1024*8
class FileS:
    """Read-only file-like view over one stored file's chunk rows.

    *meta* is a row (as dict) from the ``files`` table; its ``parts`` field
    is a JSON list of {'uuid', 'start', 'end'} chunk descriptors.  Chunk
    blobs are fetched from the ``datas`` table on demand; a one-chunk buffer
    keeps sequential small reads from re-querying the database.

    NOTE(review): read() hardcodes DEFAULT_BLOCK_SIZE in its offset
    arithmetic, while FileSQL3.put() accepts an arbitrary ``block_size`` —
    files stored with a non-default block size look unlikely to read back
    correctly; confirm before using that parameter.
    """
    def __init__(self, meta, conn):
        self.size = meta['length']          # total file length in bytes
        self.create = meta['created']       # ISO timestamps from the files row
        self.modified = meta['modified']
        self.mimetype = meta['mimetype']
        self.encoding = meta['encoding']
        self.parts = json.loads(meta['parts'])  # chunk descriptors, in order
        self.conn = conn
        self.position = 0                   # current absolute read offset
        # [cached chunk bytes, absolute start, absolute end); -1/-1 = empty.
        self.buffer = [b'', -1, -1]
    def read(self, size=-1):
        # Read up to *size* bytes from the current position (rest of the
        # file when size < 0).  May return fewer bytes at end-of-data.
        if size < 0:
            size = self.size - self.position
        data = b''
        while size > 0:
            # Small read fully answerable from the buffered chunk: slice it.
            if size < DEFAULT_BLOCK_SIZE and self.buffer[1] <= self.position < self.buffer[2]:
                chunk = self.buffer[0]
                start = self.position - self.buffer[1]
                end = min(start + size, self.buffer[2] - self.buffer[1])
                data += chunk[start:end]
                size -= end - start
                self.position += end - start
            else:
                # Locate and fetch the chunk row covering the current offset.
                part = self._get_next_part()
                if not part:
                    break  # position is past the last part: end of data
                cur = self.conn.cursor()
                cur.execute('SELECT data FROM datas WHERE uuid=?', (part['uuid'],))
                chunk = cur.fetchone()[0]
                if size >= DEFAULT_BLOCK_SIZE:
                    # Large read: take up to one block directly, skip caching.
                    start = self.position % DEFAULT_BLOCK_SIZE
                    end = start + DEFAULT_BLOCK_SIZE
                    data += chunk[start:end]
                    size -= end - start
                    self.position += end - start
                else:
                    # Small read: cache the block-aligned window around the
                    # position, then serve the requested slice from it.
                    chunk_start = self.position // DEFAULT_BLOCK_SIZE * DEFAULT_BLOCK_SIZE
                    chunk_end = min(chunk_start + DEFAULT_BLOCK_SIZE, part['end'])
                    chunk_pos_start = chunk_start - part['start']
                    chunk_pos_end = chunk_end - part['start']
                    self.buffer = [chunk[chunk_pos_start:chunk_pos_end], chunk_start, chunk_end]
                    start = self.position - chunk_start
                    end = min(start + size, chunk_end - chunk_start)
                    data += chunk[chunk_pos_start+start:chunk_pos_start+end]
                    size -= end - start
                    self.position += end - start
        return data
    def _get_next_part(self):
        # Linear scan for the part whose [start, end) covers self.position;
        # None when the position is outside every part.
        for part in self.parts:
            if self.position < part['end'] and self.position >= part['start']:
                return part
        return None
    def seek(self, position):
        # Absolute seek only (no whence argument); invalidates the buffer.
        self.position = position
        self.buffer = [b'', -1, -1]
    def tell(self):
        # Current absolute read offset, file-object style.
        return self.position
class FileSQL3:
    """SQLite-backed blob store that splits files into fixed-size chunks.

    ``files`` holds per-file metadata, including a JSON list of chunk
    descriptors; ``datas`` holds the chunk blobs keyed by uuid.
    """

    def __init__(self, db_path):
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.conn.row_factory = sqlite3.Row  # rows usable as dicts (FileS needs this)
        self._init_tables()

    def _init_tables(self):
        """Create the files/datas tables on first use."""
        cur = self.conn.cursor()
        cur.execute('''CREATE TABLE IF NOT EXISTS files (
            path TEXT PRIMARY KEY,
            created TEXT,
            modified TEXT,
            length INTEGER,
            encoding TEXT,
            mimetype TEXT,
            description TEXT,
            parts TEXT)''')
        cur.execute('''CREATE TABLE IF NOT EXISTS datas (
            uuid TEXT PRIMARY KEY,
            data BLOB,
            path TEXT,
            start INTEGER,
            end INTEGER)''')
        self.conn.commit()

    def get(self, file_path):
        """Return a FileS view of the stored file, or None if not stored."""
        cur = self.conn.cursor()
        cur.execute('SELECT * FROM files WHERE path=?', (file_path,))
        meta = cur.fetchone()
        if meta:
            return FileS(dict(meta), self.conn)

    def _delete_parts(self, file_path):
        """Delete the chunk rows referenced by *file_path*'s metadata, if any.

        Does not commit; callers commit when their whole operation is done.
        """
        cur = self.conn.cursor()
        cur.execute('SELECT parts FROM files WHERE path=?', (file_path,))
        row = cur.fetchone()
        if row:
            for part in json.loads(row['parts']):
                cur.execute('DELETE FROM datas WHERE uuid=?', (part['uuid'],))

    def put(self, file_path, p_path=None, description=None, block_size=None):
        """Store *file_path* in chunks under key *p_path* (default: file_path).

        *block_size* defaults to DEFAULT_BLOCK_SIZE.

        Review fix: overwriting an existing path previously replaced only the
        ``files`` row (via the IntegrityError fallback) and left every old
        chunk blob orphaned in ``datas`` — a permanent storage leak.  The old
        chunks and metadata are now removed before the new version is written.
        """
        if block_size is None:
            block_size = DEFAULT_BLOCK_SIZE
        if not p_path:
            p_path = file_path
        # Remove any previous version: chunk blobs first, then metadata.
        self._delete_parts(p_path)
        cur = self.conn.cursor()
        cur.execute('DELETE FROM files WHERE path=?', (p_path,))
        file_size = os.path.getsize(file_path)
        file_created = datetime.fromtimestamp(os.path.getctime(file_path)).isoformat()
        file_modified = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat()
        parts = []
        start = 0
        with open(file_path, "rb") as f:
            while start < file_size:
                end = min(start + block_size, file_size)
                data = f.read(block_size)
                data_uuid = str(uuid.uuid4())
                parts.append({'uuid': data_uuid, 'start': start, 'end': end})
                cur.execute('INSERT INTO datas (uuid, data, path, start, end) VALUES (?, ?, ?, ?, ?)',
                            (data_uuid, data, p_path, start, end))
                start = end
        mt, ec = mimetypes.guess_type(file_path)  # (mimetype, encoding)
        cur.execute('''INSERT INTO files (path, created, modified, length, encoding, mimetype, description, parts)
                       VALUES (?, ?, ?, ?, ?, ?, ?, ?)''',
                    (p_path, file_created, file_modified, file_size, ec, mt, description, json.dumps(parts)))
        self.conn.commit()

    def update_files_table(self, path, **fields):
        """Update metadata columns of the ``files`` row for *path*.

        Field names are interpolated into the SQL text (identifiers cannot be
        bound), so they are restricted to plain identifiers to block SQL
        injection through **fields.
        """
        for name in fields:
            if not name.isidentifier():
                raise ValueError('invalid column name: {}'.format(name))
        cur = self.conn.cursor()
        query = "UPDATE files SET " + ', '.join(f"{k} = ?" for k in fields) + " WHERE path = ?"
        cur.execute(query, (*fields.values(), path))
        self.conn.commit()

    def search(self, search_string):
        """Return stored paths matching the SQL LIKE pattern *search_string*."""
        cur = self.conn.cursor()
        cur.execute('SELECT path FROM files WHERE path LIKE ?', (search_string,))
        return [row['path'] for row in cur.fetchall()]

    def delete(self, file_path):
        """Remove *file_path*'s metadata and all of its chunk blobs."""
        self._delete_parts(file_path)
        cur = self.conn.cursor()
        cur.execute('DELETE FROM files WHERE path=?', (file_path,))
        self.conn.commit()
沒有留言:
發佈留言