Translate

Dict -like Python Class works on file

This is code from a few months ago.The initial development of this feature was because I found data types like dict to be very convenient to use and could serve as the foundation for many advanced functionalities. If there was a stable and reliable dictionary class work on file, it would easily solve most data access problems and greatly speed up the development process.
I have always believed that programming skills are not essential. Before starting to type code, one should find the correct method. As long as the method is correct, the code can always be written. The specific steps implemented in different languages may vary slightly, but the underlying method is consistent.

The first pure Python version was just for learning and research purposes and has been deprecated. The version using SQLite3 is still being used and so far no issues have been found, so there haven't been any improvements made.


#FileDict.py

import os

import bisect

import numpy as np

import struct

class FileDict:

    def __init__(self, file_path):

        self.data_file = file_path + '.data'

        self.index_file = file_path + '.index'

        self.ram_index = []

        if not os.path.exists(self.data_file):

            open(self.data_file, 'wb').close()

        if not os.path.exists(self.index_file):

            open(self.index_file, 'wb').close()

        self.load_index(self.index_file)

    def load_index(self, index_file):

        if os.path.getsize(index_file) > 0:

            with open(index_file, 'rb') as f:

                self.ram_index = np.fromfile(f, dtype=np.int64).reshape(-1, 2).tolist()

                self.ram_index.sort()

    def __getitem__(self, key):

        h_key = hash(key)

        i = bisect.bisect_left(self.ram_index, [h_key, 0])

        if i != len(self.ram_index) and self.ram_index[i][0] == h_key:

            with open(self.data_file, 'rb') as f:

                f.seek(self.ram_index[i][1])

                flag, stored_key, value = self.read_record(f)

                if flag != b'D' and stored_key == key:

                    return value

        raise KeyError(key)

    def __setitem__(self, key, value):

        h_key = hash(key)

        i = bisect.bisect_left(self.ram_index, [h_key, 0])

        if i != len(self.ram_index) and self.ram_index[i][0] == h_key:

            with open(self.data_file, 'r+b') as f:

                f.seek(self.ram_index[i][1])

                flag, stored_key, value1 = self.read_record(f)

                if flag == b'V' and stored_key == key:

                    f.seek(self.ram_index[i][1])

                    f.write(b'D')

                    f.truncate()

                    pos=self.append_record(f, key, value)

                    self.ram_index[i]=[h_key,pos ]

                    with open(self.index_file, 'ab') as f:

                        f.write(np.array([[h_key, self.ram_index[i][1]]], dtype=np.int64).tobytes())

                elif flag != b'D' and stored_key == key and value == value1:

                    pass 

                else:

                    f.seek(self.ram_index[i][1])

                    f.truncate()

                    bisect.insort_left(self.ram_index, [h_key, self.append_record(f, key, value)])

                    with open(self.index_file, 'ab') as f:

                        f.write(np.array([[h_key, self.ram_index[i][1]]], dtype=np.int64).tobytes())

            

                    

        else:

            with open(self.data_file, 'ab') as f:

                bisect.insort_left(self.ram_index, [h_key, self.append_record(f, key, value)])

            with open(self.index_file, 'ab') as f:

                f.write(np.array([[h_key, self.ram_index[i][1]]], dtype=np.int64).tobytes())

            

    def __delitem__(self, key):

        h_key = hash(key)

        i = bisect.bisect_left(self.ram_index, [h_key, 0])

        if i != len(self.ram_index) and self.ram_index[i][0] == h_key:

            with open(self.data_file, 'r+b') as f:

                f.seek(self.ram_index[i][1])

                f.write(b'D')

        else:

            raise KeyError(key)

    def __iter__(self):

        with open(self.data_file, 'rb') as f:

            while True:

                flag, key, _ = self.read_record(f)

                if not flag:

                    break

                if flag != b'D':

                    yield key

    def items(self):

        with open(self.data_file, 'rb') as f:

            while True:

                flag, key, value = self.read_record(f)

                if not flag:

                    break

                if flag != b'D':

                    yield key, value

    

    def from_dict(self, dict):

        new_data_file = self.data_file + '.rebuild'

        new_index = []

        with open(new_data_file, 'wb') as dest:

            for key, value in dict.items():

                hash_key = hash(key)

                new_index.append([hash_key, self.append_record(dest, key, value)])

        os.replace(new_data_file, self.data_file)

        self.ram_index = new_index

        with open(self.index_file, 'wb') as f:

            np.array(new_index, dtype=np.int64).tofile(f)

        self.load_index(self.index_file)

    def add_items(self, items):

        temp_file = self.data_file + '.temp'

        with open(temp_file, 'wb') as dest:

            with open(self.data_file, 'rb') as src:

                flag = True

                while flag:

                    flag1, key, value = self.read_record(src)

                    if flag!=b"D":

                        if key in items:

                            self.append_record(dest, key, items[key])

                            

                        else:

                            self.append_record(dest, key, value)

            for key, value in items.items():

                self.append_record(dest, key, value)

        os.replace(temp_file, self.data_file)

        with open(self.index_file, 'wb') as f:

            np.array(self.ram_index, dtype=np.int64).tofile(f)

        self.load_index(self.index_file)    

    def read_record(self, f):

        flag = f.read(1)

        if not flag:

            return None, None, None

        key_len = struct.unpack('Q', f.read(8))[0]

        key = f.read(key_len).decode('utf-8')

        value_len = struct.unpack('Q', f.read(8))[0]

        value = f.read(value_len).decode('utf-8')

        return flag, key, value

    def append_record(self, f, key, value):

        position = f.tell()

        k=key.encode('utf-8')

        v=value.encode('utf-8')

        f.write(b'V')

        f.write(struct.pack('Q', len(k)))

        f.write(k)

        f.write(struct.pack('Q', len(v)))

        f.write(v)

        return position

    def rebuild(self):

        new_data_file = self.data_file + '.rebuild'

        new_index = []

        with open(self.data_file, 'rb') as src, open(new_data_file, 'wb') as dst:

            while True:

                flag, key, value = self.read_record(src)

                if not flag:

                    break

                if flag != b'D':

                    new_index.append([hash(key), self.append_record(dst, key, value)])

        os.replace(new_data_file, self.data_file)

        self.ram_index = new_index

        with open(self.index_file, 'wb') as f:

            np.array(new_index, dtype=np.int64).tofile(f)

        self.load_index(self.index_file)


FileDict3.py is specifically used for handling text-based data. It has been optimized for handling large amounts of data while ensuring data security. By changing the table parameter, it is possible to simultaneously access multiple dict classes in a single file, making it suitable for handling complex situations.


#FileDict3.py

import sqlite3

import time

import atexit

class FileDict:

    def __init__(self, file_path, buffer_size=1000, buffer_idle_time=1, table='filedict'):

        self.file_path = file_path

        self.table = table

        self.conn = sqlite3.connect(file_path,check_same_thread=False)

        self.conn.execute('CREATE TABLE IF NOT EXISTS {} (key TEXT PRIMARY KEY, value TEXT)'.format(self.table))

        self.buffer = []

        self.buffer_size = buffer_size

        self.last_commit_time = time.time()

        self.buffer_idle_time = buffer_idle_time

        atexit.register(self.close)

    def get(self, key):

        try:return self.__getitem__(key)

        except KeyError: return None

    

    def __getitem__(self, key):

        self._check_buffer()

        cursor = self.conn.execute('SELECT value FROM {} WHERE key = ?'.format(self.table), (key,))

        result = cursor.fetchone()

        if result is None:

            raise KeyError(key)

        return result[0]

    def Tables(self):

        cursor = self.conn.execute("SELECT name FROM sqlite_master WHERE type='table';")

        tables = cursor.fetchall()

        table_names = [t[0] for t in tables]

        return table_names

    def __setitem__(self, key, value):

        try:

            self.check_key(key)

            self.buffer.append(('set', key, value))

            self._check_buffer()

        except sqlite3.IntegrityError:

            self.buffer.append(('update', key, value))

            self._check_buffer()

    def __delitem__(self, key):

        self.buffer.append(('del', key))

        self._check_buffer()

    def __iter__(self):

        self._check_buffer()

        cursor = self.conn.execute('SELECT key FROM {}'.format(self.table))

        for row in cursor:

            yield row[0]

    def items(self):

        self._check_buffer()

        cursor = self.conn.execute('SELECT key, value FROM {}'.format(self.table))

        return cursor.fetchall()

    def from_dict(self, dict):

        self.check_dict(dict)

        self.conn.execute('DROP TABLE IF EXISTS {}'.format(self.table))

        self.conn.execute('CREATE TABLE {} (key TEXT PRIMARY KEY, value TEXT)'.format(self.table))

        self.conn.executemany('INSERT INTO {} (key, value) VALUES (?, ?)'.format(self.table), dict.items())

        self.conn.commit()

    def add_items(self, items):

        for key, value in items.items():

            try:

                self.check_key(key)

                self.buffer.append(('set', key, value))

                self._check_buffer()

            except sqlite3.IntegrityError:

                self.buffer.append(('update', key, value))

                self._check_buffer()

        self._check_buffer()

    def _check_buffer(self):

        if not self.buffer:

            return

        idle_time = time.time() - self.last_commit_time

        if len(self.buffer) >= self.buffer_size or idle_time >= self.buffer_idle_time:

            self._commit()

    def _commit(self):

        if not self.buffer:

            return 

        cursor = self.conn.cursor()

        for op in self.buffer:

            if op[0] == 'set':

                cursor.execute('INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)'.format(self.table), (op[1], op[2]))

            elif op[0] == 'update':

                cursor.execute('UPDATE {} SET value = ? WHERE key = ?'.format(self.table), (op[2], op[1]))

            elif op[0] == 'del':

                cursor.execute('DELETE FROM {} WHERE key = ?'.format(self.table), (op[1],))

        self.buffer = []

        self.last_commit_time = time.time()

        self.conn.commit()

    def check_dict(self, dictionary):

        for key in dictionary:

            self.check_key(key)

    def check_key(self, key):

        if not isinstance(key, str):

            raise TypeError('Keys must be strings.')

        if not key:

            raise ValueError('Keys cannot be empty strings.')

    def search_keys(self, pattern, like=True, values=False):

        self._check_buffer()

        operator = 'LIKE' if like else '='

        cursor = self.conn.cursor()

        cursor.execute(f"SELECT key FROM {self.table} WHERE key {operator} ?", (pattern,))

        return [row[0] for row in cursor.fetchall()]

    def close(self):       

        self._commit()

        try:

            self.conn.commit()

        except:

            pass

        self.conn.close()


FileSQL3.py is used for storing binary data. It automatically splits large files into smaller chunks and when reading, it simulates a File-like object for easy positioning and reading. It also has a caching mechanism, so there is no loss in performance during continuous reading. Currently, it is mainly used in conjunction with FastAPI to access a server on my home LAN network and play multimedia files. The speed is comparable to accessing it directly at home. However, it is still not perfect overall and is not suitable for storing large files that require frequent modifications.


#FileSQL3.py

import os,json

import sqlite3

import uuid

from datetime import datetime

import mimetypes

mimetypes.add_type("video/webm" ,'.mkv')

mimetypes.add_type("audio/flac",".flac")

   

DEFAULT_BLOCK_SIZE = 1024*1024*8 

class FileS:

    def __init__(self, meta, conn):

        self.size = meta['length']

        self.create = meta['created']

        self.modified = meta['modified']

        self.mimetype = meta['mimetype']

        self.encoding = meta['encoding']

        self.parts = json.loads(meta['parts'])

        self.conn = conn

        self.position = 0

        self.buffer = [b'', -1, -1]

    def read(self, size=-1):

        if size < 0:

            size = self.size - self.position

        data = b''

        while size > 0:

            if size < DEFAULT_BLOCK_SIZE and self.buffer[1] <= self.position < self.buffer[2]:

                chunk = self.buffer[0]

                start = self.position - self.buffer[1]

                end = min(start + size, self.buffer[2] - self.buffer[1])

                data += chunk[start:end]

                size -= end - start

                self.position += end - start

            else:

                

                part = self._get_next_part()

                if not part:

                    break

                cur = self.conn.cursor()

                cur.execute('SELECT data FROM datas WHERE uuid=?', (part['uuid'],))

                chunk = cur.fetchone()[0]

                if size >= DEFAULT_BLOCK_SIZE:

                    start = self.position % DEFAULT_BLOCK_SIZE

                    end = start + DEFAULT_BLOCK_SIZE

                    data += chunk[start:end]

                    size -= end - start

                    self.position += end - start

                else:

                    

                    chunk_start = self.position // DEFAULT_BLOCK_SIZE * DEFAULT_BLOCK_SIZE

                    chunk_end = min(chunk_start + DEFAULT_BLOCK_SIZE, part['end'])

                    chunk_pos_start = chunk_start - part['start']

                    chunk_pos_end = chunk_end - part['start']

                    self.buffer = [chunk[chunk_pos_start:chunk_pos_end], chunk_start, chunk_end]

                    start = self.position - chunk_start

                    end = min(start + size, chunk_end - chunk_start)

                    data += chunk[chunk_pos_start+start:chunk_pos_start+end]

                    size -= end - start

                    self.position += end - start

        return data

    def _get_next_part(self):

        for part in self.parts:

            if self.position < part['end'] and self.position >= part['start']:

                return part

        return None

    def seek(self, position):

        self.position = position

        self.buffer = [b'', -1, -1]

    def tell(self):

        return self.position

class FileSQL3:

    def __init__(self, db_path):

        self.conn = sqlite3.connect(db_path,check_same_thread=False)

        self.conn.row_factory = sqlite3.Row

        self._init_tables()

    def _init_tables(self):

        cur = self.conn.cursor()

        cur.execute('''CREATE TABLE IF NOT EXISTS files (

                        path TEXT PRIMARY KEY,

                        created TEXT,

                        modified TEXT,

                        length INTEGER,

                        encoding TEXT,

                        mimetype TEXT,

                        description TEXT,

                        parts TEXT)''')

        cur.execute('''CREATE TABLE IF NOT EXISTS datas (

                        uuid TEXT PRIMARY KEY,

                        data BLOB,

                        path TEXT,

                        start INTEGER,

                        end INTEGER)''')

        self.conn.commit()

    def get(self, file_path):

        cur = self.conn.cursor()

        cur.execute('SELECT * FROM files WHERE path=?', (file_path,))

        meta = cur.fetchone()

        if meta:

            return FileS(dict(meta), self.conn)

    def put(self, file_path, p_path=None, description=None, block_size=DEFAULT_BLOCK_SIZE):

        if not p_path:

            p_path = file_path

        

        with open(file_path, "rb") as f:

            file_size = os.path.getsize(file_path)

            file_created = datetime.fromtimestamp(os.path.getctime(file_path)).isoformat()

            file_modified = datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat()

            parts = []

            start = 0

            while start < file_size:

                end = min(start + block_size, file_size)

                data = f.read(block_size)

                data_uuid = str(uuid.uuid4())

                parts.append({'uuid': data_uuid, 'start': start, 'end': end})

                cur = self.conn.cursor()

                cur.execute('INSERT INTO datas (uuid, data, path, start, end) VALUES (?, ?, ?, ?, ?)',

                            (data_uuid, data, p_path, start, end))

                start = end

            parts_json = json.dumps(parts)

            try:

                cur = self.conn.cursor()

                mt, ec = mimetypes.guess_type(file_path)

                

                cur.execute('''INSERT INTO files (path, created, modified, length, encoding, mimetype, description, parts)

                            VALUES (?, ?, ?, ?, ?, ?, ?, ?)''',

                            (p_path, file_created, file_modified, file_size, ec, mt, description, parts_json))

            except sqlite3.IntegrityError:

                cur.execute('DELETE FROM files WHERE path=?', (p_path,))

                cur.execute('''INSERT INTO files (path, created, modified, length, encoding, mimetype, description, parts)

                            VALUES (?, ?, ?, ?, ?, ?, ?, ?)''',

                            (p_path, file_created, file_modified, file_size, ec, mt, description, parts_json))

            self.conn.commit()

    def update_files_table(self, path, **fields):

        cur = self.conn.cursor()

        query = "UPDATE files SET "

        query += ', '.join([f"{k} = ?" for k in fields.keys()])

        query += " WHERE path = ?"

        cur.execute(query, (*fields.values(), path))

        self.conn.commit()

    def search(self, search_string):

        cur = self.conn.cursor()

        cur.execute('SELECT path FROM files WHERE path LIKE ?', (search_string ,))

        return [row['path'] for row in cur.fetchall()]

    def delete(self, file_path):

        cur = self.conn.cursor()

        cur.execute('SELECT parts FROM files WHERE path=?', (file_path,))

        parts = cur.fetchone()

        if parts:

            for part in json.loads(parts['parts']):

                cur.execute('DELETE FROM datas WHERE uuid=?', (part['uuid'],))

            cur.execute('DELETE FROM files WHERE path=?', (file_path,))

            self.conn.commit()


沒有留言:

發佈留言