Source code for fsspec_proxy.file_manager

from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
import fsspec.utils
import io
import os
import yaml
import logging

logger = logging.getLogger("fsspec_proxy")
default_config = b"""sources:
 - name: inmemory
   path: memory://mytests
 - name: local
   path: file:///Users
   readonly: true
 - name: "Conda Stats"
   path: "s3://anaconda-package-data/conda/hourly/"
   kwargs:
     anon: True
 - name: "MyAnaconda"
   path: "anaconda://my/"
allow_reload: true
"""

[docs] class FileSystemManager: """ Holds configured fsspec instances """ def __init__(self, config_path=None): """ :param config_path: if NONE, loads config from FSSPEC_PROXY_CONFIG """ self.filesystems = {} config_path = config_path or os.getenv("FSSPEC_PROXY_CONFIG", None) self.config = self.load_config(config_path) self.initialize_filesystems()
[docs] def load_config(self, config_path=None): """Reset the config""" if config_path is None: data = default_config elif not os.path.exists(config_path): return {} else: with open(config_path, "rb") as file: data = file.read() config_content = yaml.safe_load(io.BytesIO(data)) logger.info("new config: %s", config_content) return config_content
[docs] def initialize_filesystems(self): """Create filesystem instances""" new_filesystems = {} for fs_config in self.config.get("sources", []): key = fs_config["name"] fs_path = fs_config["path"] kwargs = fs_config.get("kwargs", {}) try: fs, url2 = fsspec.url_to_fs(fs_path, **kwargs) except Exception: # or we could still list show their names but not the contents logger.error("Instantiating filesystem failed") continue if not fs.async_impl: fs = AsyncFileSystemWrapper(fs) new_filesystems[key] = { "instance": fs, "path": url2, } logger.info("new filesystems: %s", new_filesystems) self.filesystems = new_filesystems
[docs] def get_filesystem(self, key): """Get filesystem instance by configured name""" return self.filesystems.get(key)