def fullpath(self, key: str) -> pathlib.Path:
    """Get the full file path for a given key.

    Args:
        key: File key

    Returns:
        Full resolved file path

    Raises:
        ValueError: If the key attempts to escape the base_path
    """
    # Remove leading slashes to prevent path injection
    key = key.lstrip("/")
    # Resolve the base too, so symlinked base directories compare correctly.
    base = self.base_path.resolve()
    full_path = (base / key).resolve()
    # Security check: a plain str.startswith() prefix test would accept
    # sibling paths such as "/base_evil" for base "/base"; relative_to()
    # performs a component-wise containment check instead.
    try:
        full_path.relative_to(base)
    except ValueError:
        raise ValueError(f"Invalid key: {key}") from None
    return full_path
def metadata_path(self, key: str) -> pathlib.Path:
    """Return the path of the sidecar metadata file for a key.

    Args:
        key: File key

    Returns:
        Metadata file path (the data path with the metadata suffix appended)
    """
    data_path = self.fullpath(key)
    return data_path.with_name(data_path.name + self.METADATA_SUFFIX)
async def get_metadata(self, key: str) -> builtins.dict[str, t.Any]:
    """Read the JSON metadata stored alongside a file.

    Args:
        key: File key

    Returns:
        Metadata dictionary (empty if no metadata exists)

    Raises:
        FileNotFoundError: If the file does not exist
        IsADirectoryError: If the path refers to a directory
    """
    target = self.fullpath(key)
    if not target.exists():
        raise FileNotFoundError(f"File not found: {key}")
    if not target.is_file():
        raise IsADirectoryError(f"Path is not a file: {key}")

    sidecar = self.metadata_path(key)
    if not sidecar.exists():
        # Absent metadata is not an error: report an empty mapping.
        return {}

    async with aiofiles.open(sidecar, encoding="utf-8") as handle:
        raw = await handle.read()
    return json.loads(raw)  # type: ignore
async def download(
    self,
    key: str,
    *,
    stream: bool = False,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
    **_kwargs: t.Any,
) -> bytes | t.AsyncIterable[bytes]:
    """Download a file, either fully in memory or as a chunk stream.

    Args:
        key: File key
        stream: Whether to return as a stream
        chunk_size: Size of each chunk in bytes (used if stream is True)
        **_kwargs: Additional arguments (unused)

    Returns:
        File data (bytes or async iterator)

    Raises:
        FileNotFoundError: If the file does not exist
        IsADirectoryError: If the path is a directory
    """
    target = self.fullpath(key)
    if not target.exists():
        raise FileNotFoundError(f"File not found: {key}")
    if not target.is_file():
        raise IsADirectoryError(f"Path is not a file: {key}")

    if stream:
        # Hand back a lazy async iterator; nothing is read yet.
        return self.stream_file(target, chunk_size)

    async with aiofiles.open(target, "rb") as handle:
        return await handle.read()
async def stream_file(
    self,
    file_path: pathlib.Path,
    chunk_size: int = DEFAULT_CHUNK_SIZE,
) -> t.AsyncIterable[bytes]:
    """Stream a file's contents in fixed-size chunks.

    Args:
        file_path: Path to the file
        chunk_size: Size of each chunk in bytes

    Yields:
        File data chunks
    """
    async with aiofiles.open(file_path, "rb") as handle:
        # read() returns b"" at EOF, which terminates the loop.
        while chunk := await handle.read(chunk_size):
            yield chunk
async def delete(self, key: str) -> None:
    """Delete a file and its sidecar metadata, if present.

    Deleting a missing key is a no-op.

    Args:
        key: File key

    Raises:
        IsADirectoryError: If the path refers to a directory
    """
    target = self.fullpath(key)
    if not target.exists():
        # Nothing to do; absent keys are silently ignored.
        return
    if not target.is_file():
        raise IsADirectoryError(f"Path is not a file: {key}")

    await aiofiles.os.remove(target)

    # Remove the companion metadata file when one exists.
    sidecar = self.metadata_path(key)
    if sidecar.exists():
        await aiofiles.os.remove(sidecar)
async def list(
    self,
    prefix: str = "",
    page_size: int = 10,
    **_kwargs: t.Any,
) -> t.AsyncIterable[builtins.list[str]]:
    """List file keys under a prefix, yielded in pages.

    Args:
        prefix: Key prefix to filter files
        page_size: Number of items per page
        **_kwargs: Additional arguments (unused)

    Returns:
        Pages (lists) of file keys matching the prefix; metadata
        sidecar files are excluded.
    """
    prefix = prefix.lstrip("/")
    root = self.base_path / prefix if prefix else self.base_path

    if not root.exists():
        # Unknown prefix: emit a single empty page.
        yield []
        return

    page: builtins.list[str] = []
    for entry in root.rglob("*"):
        # Skip directories and metadata sidecar files.
        if not entry.is_file() or entry.name.endswith(self.METADATA_SUFFIX):
            continue
        page.append(str(entry.relative_to(self.base_path)))
        if len(page) >= page_size:
            yield page
            page = []

    # Flush any partially filled final page.
    if page:
        yield page
async def exists(self, key: str) -> bool:
    """Check whether a regular file exists for the given key.

    Args:
        key: File key

    Returns:
        True if the key maps to an existing regular file, False otherwise
    """
    target = self.fullpath(key)
    if not target.exists():
        return False
    # Directories and other non-regular entries do not count as files.
    return target.is_file()
async def clear(self, prefix: str = "") -> None:
    """Delete every file under the given prefix.

    Args:
        prefix: Key prefix to filter files
    """
    prefix = prefix.lstrip("/")
    root = self.base_path / prefix if prefix else self.base_path
    if not root.exists():
        return

    if root.is_file():
        # The prefix names a single file: remove it and, when it is a
        # data file (not itself a metadata sidecar), its sidecar too.
        await aiofiles.os.remove(root)
        if not root.name.endswith(self.METADATA_SUFFIX):
            sidecar = root.with_name(root.name + self.METADATA_SUFFIX)
            if sidecar.exists():
                await aiofiles.os.remove(sidecar)
        return

    # Directory prefix: remove every file in the tree (metadata included);
    # empty directories are left in place.
    for entry in root.rglob("*"):
        if entry.is_file():
            await aiofiles.os.remove(entry)
async def copy(self, source_key: str, dest_key: str, **_kwargs: t.Any) -> str:
    """Copy a file and its metadata.

    Args:
        source_key: Source file key
        dest_key: Destination file key
        **_kwargs: Additional arguments (unused)

    Returns:
        Destination file key

    Raises:
        FileNotFoundError: If the source file does not exist
        IsADirectoryError: If the source path is a directory
    """
    source_path = self.fullpath(source_key)
    dest_path = self.fullpath(dest_key)

    if not source_path.exists():
        raise FileNotFoundError(f"Source file not found: {source_key}")
    if not source_path.is_file():
        raise IsADirectoryError(f"Source path is not a file: {source_key}")

    # Ensure parent directory of destination exists
    dest_path.parent.mkdir(parents=True, exist_ok=True)

    await self._copy_file(source_path, dest_path)

    # Copy metadata file if it exists
    source_metadata = self.metadata_path(source_key)
    if source_metadata.exists():
        await self._copy_file(source_metadata, self.metadata_path(dest_key))

    return dest_key

async def _copy_file(self, src: pathlib.Path, dst: pathlib.Path) -> None:
    """Stream-copy src to dst in chunks to bound memory usage."""
    async with (
        aiofiles.open(src, "rb") as src_file,
        aiofiles.open(dst, "wb") as dst_file,
    ):
        # read() returns b"" at EOF, which terminates the loop.
        while chunk := await src_file.read(self.DEFAULT_CHUNK_SIZE):
            await dst_file.write(chunk)