Within your Flow, you may want to perform data science, data analysis, or data manipulation on a user-uploaded file. You can use a DataSource element to provide users with an interface for uploading files. Additionally, you can reference the data uploaded by users in subsequent Steps through the store_interface, allowing you to use Python packages like pandas to manipulate your data as needed.
Creating a DataSource Element
To create the DataSource element, you'll first need to create a DataUpload class that subclasses Step:
class DataUpload(Step):
    def run(self, flow_metadata):
        # No processing is needed here; the DataSource element on the
        # page handles the file upload itself.
        pass
Next, you'll create a Card, Section, Page, and Step for the Data Upload:
data_upload_card = Card("Upload", [DataSource("Dataset", ["csv"], description="Upload the csv")])
data_upload_content = Section("Upload your data", [data_upload_card])
data_upload_page = Page("Data Upload", PageType.INPUT, {}, [data_upload_content])
data_upload_step = DataUpload("Data Upload", "Upload Dataset.", "Data Ingestion", StepType.INPUT, data_upload_page)
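The second argument to DataSource appears to be the list of accepted file extensions. If the element supports formats beyond CSV (an assumption; only ["csv"] is shown in this guide), you could hypothetically accept several at once:

# Hypothetical variant: accept multiple file types. Whether extensions
# other than "csv" are supported is an assumption, not confirmed here.
flexible_source = DataSource("Dataset", ["csv", "xlsx"], description="Upload your data file")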
You'll then add the Data Upload Step to the list of Steps:
predict_steps = [
    data_upload_step,
    model_step
]
Within the run method of the Step class where you want to access the file, add the following code:
store_interface = StoreInterface(**flow_metadata)
# Get current page
current_page = store_interface.get_page()
# Fetch input data
data = store_interface.get_element_value(data_upload_step.name, "Dataset")
And now, you can access the uploaded CSV as a pandas DataFrame using the variable data.
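For instance, you could immediately inspect it with standard pandas operations (a minimal sketch; the cleanup step is purely illustrative, and logger is the configured logger from the full example below):

# `data` is already a pandas DataFrame, so the usual API applies
logger.info("Columns: {}".format(list(data.columns)))
logger.info("First rows:\n{}".format(data.head()))
# Illustrative cleanup: drop incomplete rows before further analysis
cleaned = data.dropna()

Putting everything together, the complete example looks like this: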
from predict_backend.flow.step import Step, StepType
from predict_backend.flow.flow import Flow
from predict_backend.page import Card, Page, PageType, Section
from predict_backend.page.elements import DataSource
from predict_backend.store.store_interface import StoreInterface
import logging

logger = logging.getLogger(__name__)

class DataUpload(Step):
    def run(self, flow_metadata):
        pass

class Model(Step):
    def run(self, flow_metadata):
        store_interface = StoreInterface(**flow_metadata)
        # Get current page
        current_page = store_interface.get_page()
        # Fetch input data
        data = store_interface.get_element_value(data_upload_step.name, "Dataset")
        logger.info("Dataset contains {} timesteps!".format(len(data)))
        # Use the DataFrame here however you need!

system_failure_flow = Flow("System Failure Prediction", "Predict the time to failure.")
data_upload_card = Card("Upload", [DataSource("Dataset", ["csv"], description="Upload the csv")])
data_upload_content = Section("Upload your data", [data_upload_card])
data_upload_page = Page("Data Upload", PageType.INPUT, {}, [data_upload_content])
data_upload_step = DataUpload("Data Upload", "Upload Dataset.", "Data Ingestion", StepType.INPUT, data_upload_page)
model_content = Section("Model", [])
model_page = Page("Model Training", PageType.RESULTS, {}, [model_content])
model_step = Model("Model Training and Results", "Train Model", "Model Analysis", StepType.RESULTS, model_page)
# Put all steps together to build the flow
predict_steps = [
    data_upload_step,
    model_step
]
system_failure_flow.chain(predict_steps)
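As the final comment in Model.run suggests, that method is where your own analysis goes. Below is a minimal sketch of what a fleshed-out run might look like, assuming nothing about the dataset beyond what pandas infers (the dropna and sort choices are purely illustrative):

class Model(Step):
    def run(self, flow_metadata):
        store_interface = StoreInterface(**flow_metadata)
        data = store_interface.get_element_value(data_upload_step.name, "Dataset")
        # Illustrative manipulation: keep complete rows and sort by the
        # first column (the sort key is arbitrary, chosen for the example)
        cleaned = data.dropna().sort_values(by=data.columns[0])
        logger.info("Kept {} of {} rows after dropping NAs".format(len(cleaned), len(data)))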
What to Expect
Validation of Data Upload:
- Run your Flow and access it at localhost:3000.
- You should see a data upload Page. Upload your dataset.
- If successful, you should see “Upload Complete” pop up in the bottom corner.
Validation of Accessing DataFrame:
- In the terminal output, you should see “Dataset contains {number} timesteps!” printed. The number should match the number of rows in your dataset.
- Try running simple pandas manipulations within the class you are using to access your DataFrame (see the snippet below for a few ideas).
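A few simple checks you might try (all standard pandas, nothing specific to this Flow):

logger.info(data.dtypes)        # column types inferred from the CSV
logger.info(data.isna().sum())  # missing values per column
logger.info(data.describe())    # summary statistics for numeric columns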
Additional Resources
- What is a DataFrame?
- What is the Pandas Python Package?