Core
The DSI Core middleware defines the Terminal and Sync concepts. An instantiated Terminal is the human/machine DSI interface that connects Reader/Writer plugins and DSI backends. An instantiated Sync provides data movement between local and remote locations and captures metadata documentation.
Core: Terminal
The Terminal class is a structure through which users can interact with Plugins (Readers/Writers) and Backends as “module” objects. Each reader/writer/backend can be “loaded” to be ready for use and users can interact with backends by ingesting, querying, processing, or finding data, as well as generating an interactive notebook of the data.
All relevant functions are listed below for clarity. The Examples section displays various workflows using the Terminal class.
- Notes for users:
All loaded plugin writers must be followed by a call to transload() to execute them. Readers are executed automatically upon loading.
Terminal.load_module: to relate tables of data from a plugin reader under a common name, use the target_table_prefix input to specify a prefix.
Note that when accessing data from these tables, the table names will include the specified prefix. Ex: collection1__math, collection1__english
Terminal.artifact_handler: the ‘notebook’ interaction_type stores data from the first loaded backend, not the existing DSI abstraction, in a new notebook file
Terminal find functions only access the first loaded backend
Terminal.unload_module: removes the last loaded backend of the specified mod_name. Ex: if two Sqlite backends are loaded, the second one is unloaded
Terminal handles errors from any loaded DSI or user-written modules (plugins/backends). If writing an external plugin/backend, return a caught error as a tuple (error, error_message_string) rather than printing it in the new class; see the sketch after these notes
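A minimal sketch of this error-handling convention, assuming a hypothetical external reader (the class name, parsing logic, and exact contents of the returned tuple are illustrative):

from dsi.plugins.file_reader import FileReader

class SafeTextFile(FileReader):
    def __init__(self, filenames, **kwargs):
        super().__init__(filenames, **kwargs)
        self.text_files = [filenames] if isinstance(filenames, str) else filenames

    def add_rows(self) -> None:
        for filename in self.text_files:
            try:
                open(filename).read()  # placeholder for actual parsing
            except OSError as error:
                # Hand the caught error back to Terminal instead of printing it
                return (error, f"SafeTextFile could not read {filename}")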
- class dsi.core.Terminal(debug=0, backup_db=False, runTable=False)
An instantiated Terminal is the DSI human/machine interface.
Terminals are a home for Plugins and an interface for Backends. Backends may be back-reads or back-writes. Plugins may be writers or readers. See documentation for more information.
- __init__(debug=0, backup_db=False, runTable=False)
Initialization helper function to pass through optional parameters for DSI core.
Optional flags can be set and defined:
debug: {0: off, 1: user debug log, 2: user + developer debug log}
When set to 1 or 2, debug info will write to a local debug.log text file with various benchmarks.
backup_db: default False. If set to True, this creates a backup of the database before committing new changes.
runTable: default False. When new metadata is ingested, a ‘runTable’ is created, appended to, and timestamped as the database is incremented. Recommended for in-situ use-cases.
- add_external_python_module(mod_type, mod_name, mod_path)
Adds an external Python module (i.e. one not from the DSI repo) to the module_collection. Afterwards, load_module can be used to load a DSI module from the added Python module.
Note: mod_type is needed because each Python module only implements plugins or backends.
See Example 7 in Core: Examples below for how to use this function.
- artifact_handler(interaction_type, query=None, **kwargs)
Interact with loaded DSI backends by ingesting or retrieving data from them.
interaction_type:
‘ingest’ or ‘put’: ingests the active DSI abstraction into ALL loaded BACK-WRITE backends (BACK-READ backends are ignored).
If the backup_db flag is True in a local Core, a backup is created prior to ingesting data into each loaded backend.
‘query’ or ‘get’: retrieves data from first loaded backend based on a specified ‘query’
‘notebook’ or ‘inspect’: generates an interactive Python notebook with all data from first loaded backend
‘process’ or ‘read’: overwrites current DSI abstraction with all data from first loaded BACK-READ backend
query: default None. Specify if interaction_type = ‘query’ and query_artifact function in backend file requires an input
A DSI Core Terminal may load zero or more Backends with storage functionality.
- close()
Immediately closes all active modules: backends, plugin writers, plugin readers
Clears out the current DSI abstraction
NOTE - This step cannot be undone.
- find(query_object)
Find-all function that searches for all instances of ‘query_object’ in the first loaded backend. Searches among all tables, columns, and cells.
return: List of backend-specific objects that each contain details of a match for ‘query_object’
check file of the first backend loaded to understand the structure of the objects in this list
- find_cell(query_object, row=False)
Find cell function that searches for all cells which match the ‘query_object’ in first loaded backend.
row: default False.
If True, then full row of data where a cell matches ‘query_object’ is included in return
If False, then the value of the cell that matches ‘query_object’ is included in return
return: List of backend-specific objects that each contain value of a cell/full row where a cell matches ‘query_object’
check file of the first backend loaded to understand the structure of the objects in this list
- find_column(query_object, range=False)
Find column function that searches for all columns whose names match the ‘query_object’ in the first loaded backend.
range: default False.
If True, then data-range of all numerical columns which match ‘query_object’ is included in return
If False, then data for each column that matches ‘query_object’ is included in return
return: List of backend-specific objects that each contain data/numerical range about a column matching ‘query_object’.
check file of the first backend loaded to understand the structure of the objects in this list
- find_helper(query_object, return_object, start, find_type)
Users should not call this externally; it is only to be used by internal core functions.
Helper function to print/log information for all core find functions: find(), find_table(), find_column(), find_cell()
- find_table(query_object)
Find table function that searches for all tables whose names match the ‘query_object’ in the first loaded backend.
return: List of backend-specific objects that each contain all data from a table matching ‘query_object’.
check file of the first backend loaded to understand the structure of the objects in this list
- get_current_abstraction(table_name=None)
Returns the current DSI abstraction as a nested Ordered Dict, where keys are table names and values are the table’s data as an Ordered Dict
The inner table Ordered Dict has column names as keys and list of column data as the values.
table_name: default None. If specified, the return will only be that table’s Ordered Dict, not a nested one.
return: nested Ordered Dict if table_name is None. single Ordered Dict if table_name is not None
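A minimal sketch of reading the abstraction after a reader has been loaded; the ‘text_file’ table and ‘Name’ column follow Example 7 and are illustrative:

abstraction = term.get_current_abstraction()            # nested Ordered Dict of all tables
text_table = term.get_current_abstraction('text_file')  # single table's Ordered Dict
print(text_table['Name'])                               # column data is stored as a list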
- list_available_modules(mod_type)
List available DSI modules of an arbitrary module type.
This method is useful for Core Terminal setup. Plugin and Backend type DSI modules are supported, but this getter can be extended to support any new DSI module types which are added.
Note: self.VALID_MODULES refers to DSI Modules; however, DSI Modules are classes, hence the naming idiosyncrasies below.
- list_loaded_modules()
List DSI modules which have already been loaded.
These Plugins and Backends are active or ready to execute a post-processing task.
- load_module(mod_type, mod_name, mod_function, **kwargs)
Load a DSI module from the available Plugin and Backend module collection.
DSI modules may be loaded which are not explicitly listed by list_available_modules. This flexibility ensures that advanced users can access higher level abstractions. We expect most users will work with module implementations rather than templates, but all high level class abstractions are accessible with this method.
If a loaded module has mod_type=’plugin’ & mod_function=’reader’, it is automatically activated and then unloaded as well. Therefore, a user does not have to activate it separately with transload() (only used by plugin writers) or call unload_module()
- transload(**kwargs)
Transloading signals to the DSI Core Terminal that Plugin set up is complete.
Activates all loaded plugin writers by generating all their various output files such as an ER Diagram or an image of a table plot
All loaded plugin writers will be unloaded after activation, so there is no need to separately call unload_module() for them
- unload_module(mod_type, mod_name, mod_function)
Unloads a specific DSI module from the active_modules collection.
Mostly to be used for unloading backends, as plugin readers and writers are auto unloaded elsewhere.
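A minimal sketch, assuming a Sqlite back-write backend was loaded earlier as in Example 2:

# Unloads the most recently loaded Sqlite back-write backend
terminal_ingest.unload_module('backend', 'Sqlite', 'back-write')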
- update_abstraction(table_name, table_data)
Updates the DSI abstraction by overwriting the specified table_name with the input table_data
table_name: name of table that must be in the current abstraction
table_data: table data that must be stored as an Ordered Dict where column names are keys and column data is a list stored as values.
return: None
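A minimal sketch that modifies a column and writes the table back; the ‘text_file’ table and ‘Age’ column follow Example 7 and the new values are illustrative:

from collections import OrderedDict

table = term.get_current_abstraction('text_file')
table['Age'] = [26, 31, 23, 29, 36]                # overwrite one column's data list
term.update_abstraction('text_file', OrderedDict(table))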
Core: Sync
The DSI Core middleware also defines data management functionality in Sync. The purpose of Sync is to provide file metadata documentation and data movement capabilities when moving data to/from local and remote locations. The purpose of data documentation is to capture and archive metadata (i.e. the location of the local file structure, access permissions, file sizes, and creation/access/modification dates) and track data movement to the remote location for future access. The primary functions, Copy, Move, and Get, serve as mechanisms to copy, move, or retrieve data from remote locations by creating a DSI database in the process, or retrieving an existing DSI database that contains the location(s) of the target data.
- class dsi.core.Sync(project_name='test')
A class defined to assist in data management activities for DSI
Sync is where data movement functions such as copy (to remote location) and sync (local filesystem with remote) exist.
- __init__(project_name='test')
- copy(local_loc, remote_loc, isVerbose=False)
Helper function to stage the location, gather filesystem information, and copy data over using a preferred API
- dircrawl(filepath)
Crawls the root ‘filepath’ directory and returns files
filepath: source filepath to be crawled
return: returns crawled file-list
- get()
Helper function that searches remote location based on project name, and retrieves DSI database
- populate(local_loc, remote_loc, isVerbose=False)
Helper function to gather filesystem information and the local and remote locations, to create a filesystem entry in a new or existing database
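A minimal sketch of using Sync to document and copy a local directory to a remote location; the project name and paths are illustrative:

from dsi.core import Sync

archive = Sync(project_name='my_project')
# Gather filesystem metadata and copy the data to the remote location
archive.copy('/path/to/local/data', '/path/to/remote/archive', isVerbose=True)
# Later, retrieve this project's DSI database from the remote location
archive.get()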
Examples
Before interacting with the plugins and backends, they must each be loaded.
Examples below display various ways users can incorporate DSI into their data science workflows.
They can be found and run in examples/core/
Example 1: Intro use case
Baseline use of DSI to list Modules
from dsi.core import Terminal
base_terminal = Terminal(debug=0, backup_db = False, runTable=False)
base_terminal.list_available_modules('plugin')
# ['GitInfo', 'Hostname', 'SystemKernel', 'Bueno', 'Csv']
base_terminal.list_available_modules('backend')
# ['Gufi', 'Sqlite', 'Parquet']
print(base_terminal.list_loaded_modules())
# {'writer': [],
# 'reader': [],
# 'back-read': [],
# 'back-write': []}
Example 2: Ingest data
Ingesting data from a Reader to a backend
from dsi.core import Terminal
terminal_ingest = Terminal(debug=0, backup_db = False, runTable=True)
terminal_ingest.load_module('plugin', 'Schema', 'reader', filename="../data/example_schema.json", target_table_prefix = "student")
terminal_ingest.load_module('plugin', 'YAML1', 'reader', filenames=["../data/student_test1.yml", "../data/student_test2.yml"], target_table_prefix = "student")
terminal_ingest.load_module('plugin', 'TOML1', 'reader', filenames=["../data/results.toml", "../data/results1.toml"], target_table_prefix = "results")
terminal_ingest.load_module('backend','Sqlite','back-write', filename='data.db')
terminal_ingest.artifact_handler(interaction_type='ingest')
Example 3: Query data
Querying data from a backend
from dsi.core import Terminal
terminal_query = Terminal(debug=0, backup_db = False, runTable=True)
# Run Example 2 so data.db is not empty and data can be queried
terminal_query.load_module('backend','Sqlite','back-write', filename='data.db')
data = terminal_query.artifact_handler(interaction_type='query', query = "SELECT * FROM runTable;")
print(data)
Example 4: Process data
Processing data from a backend to generate an Entity Relationship diagram using a Writer
from dsi.core import Terminal
terminal_process = Terminal(debug=0, backup_db = False, runTable=True)
# Run Example 2 so data.db is not empty and data can be processed back into the abstraction
terminal_process.load_module('backend','Sqlite','back-read', filename='data.db')
terminal_process.artifact_handler(interaction_type="process")
# runTable is in ER Diagram since flag was set to True in Example 2 when creating data.db
terminal_process.load_module('plugin', 'ER_Diagram', 'writer', filename = 'er_diagram.png')
# After loading a plugin WRITER, need to call transload() to execute it
terminal_process.transload()
Example 5: Generate notebook
Generating a Python notebook file (a Jupyter notebook) from a backend to view data interactively
from dsi.core import Terminal
terminal_notebook = Terminal(debug=0, backup_db = False, runTable=True)
# Run Example 2 so data.db is not empty and data can be outputted to a Python notebook
terminal_notebook.load_module('backend','Sqlite','back-write', filename='data.db')
terminal_notebook.artifact_handler(interaction_type="notebook")
Example 6: Find data
Finding data from a backend - tables, columns, cells, or all matches
from dsi.core import Terminal
terminal_find = Terminal(debug=0, backup_db = False, runTable=False)
# Run Example 2 so data.db is not empty and data can then be found here
terminal_find.load_module('backend','Sqlite','back-write', filename='data.db')
## TABLE match - return matching table data
data = terminal_find.find_table("people")
for val in data:
    print(val.t_name, val.c_name, val.value, val.row_num, val.type)
## COLUMN match - return matching column data
data = terminal_find.find_column("a")
for val in data:
    print(val.t_name, val.c_name, val.value, val.row_num, val.type)
## RANGE match (range = True) - return [min, max] of matching cols
data = terminal_find.find_column("avg", range = True)
for val in data:
    print(val.t_name, val.c_name, val.value, val.row_num, val.type)
## CELL match - return the cells which match the search term
data = terminal_find.find_cell(5.5)
for val in data:
    print(val.t_name, val.c_name, val.value, val.row_num, val.type)
## ROW match (row = True) - return the rows where cells match the search term
data = terminal_find.find_cell(5.9, row = True)
for val in data:
    print(val.t_name, val.c_name, val.value, val.row_num, val.type)
## ALL match - find all instances where the search term is found: table, column, cell
data = terminal_find.find("a")
for val in data:
    print(val.t_name, val.c_name, val.value, val.row_num, val.type)
Example 7: External plugin
Loading an external python plugin reader from a separate file:
from dsi.core import Terminal
term = Terminal(debug=0, backup_db = False, runTable=False)
# Second input is name of plugin class in the other file
# Third input is name of the python file where the Reader/Writer is written
term.add_external_python_module('plugin', 'TextFile', 'text_file_reader.py')
print(term.list_available_modules('plugin')) # includes TextFile at end of list
term.load_module('plugin', 'TextFile', 'reader', filenames = "../data/test.txt")
print(term.get_current_abstraction())
# OrderedDict({'text_file':
# OrderedDict({'Name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
# 'Age': [25, 30, 22, 28, 35],
# 'Location': ['New York', 'Dallas', 'Chicago', 'Miami', 'Boston']})})
text_file_reader.py:
from collections import OrderedDict
from pandas import DataFrame, read_csv, concat
from dsi.plugins.file_reader import FileReader

class TextFile(FileReader):
    """
    External Plugin to read in an individual or a set of text files.
    Assuming all text files have data for same table
    """
    def __init__(self, filenames, **kwargs):
        """
        `filenames`: one text file or a list of text files to be ingested
        """
        super().__init__(filenames, **kwargs)
        if isinstance(filenames, str):
            self.text_files = [filenames]
        else:
            self.text_files = filenames
        self.text_file_data = OrderedDict()

    def add_rows(self) -> None:
        """
        Parses text file data and creates an ordered dict whose keys are table names and values are an ordered dict for each table.
        """
        total_df = DataFrame()
        for filename in self.text_files:
            temp_df = read_csv(filename)
            total_df = concat([total_df, temp_df], axis=0, ignore_index=True)
        self.text_file_data["text_file"] = OrderedDict(total_df.to_dict(orient='list'))
        self.set_schema_2(self.text_file_data)