Sentiment Analysis 2 - Data Streaming with FileDataStream

In this example, we develop a model similar to the one in the Twitter Data 1 tutorial. Instead of loading the data into pandas, we load it with nimbusml, so the model can be trained directly from the input file name. Rather than holding the whole dataset in memory, nimbusml streams the data by passing a FileDataStream through the training/testing process, which yields a significant speedup.

NimbusML Schema

In this section, instead of using pandas.read_csv to load the input files, we use a FileDataStream to train the model. A FileDataStream can be generated from a csv file directly. The file schema indicates the format of the input file, i.e. with/without header, sep = ',' or '\t', the data type of each column, and other related information.

# Getting input file path from package
import os
from nimbusml.datasets import get_dataset
from nimbusml import FileDataStream 

train_file = get_dataset('gen_twittertrain').as_filepath()
test_file = get_dataset('gen_twittertest').as_filepath()

print(os.path.basename(train_file))
print(os.path.basename(test_file))
train-twitter.gen-sample.tsv
test-twitter.gen-sample.tsv
# Generating file schema
data_stream_train = FileDataStream.read_csv(train_file, sep='\t')
data_stream_test = FileDataStream.read_csv(test_file, sep='\t')
data_stream_train.schema
DataSchema([DataColumn(name='Sentiment', type='TX', pos=0),
    DataColumn(name='Text', type='TX', pos=1),
    DataColumn(name='Label', type='I8', pos=2)], header=True,
    sep='\t')

The interpretation is as follows (an explicit-schema example is shown after the list):

1. There are three columns in the dataset.
2. "name='Sentiment', type='TX', pos=0" indicates that the new column "Sentiment" is of type "TX" (text) and comes from column 0 in the original dataset.
3. "name='Text', type='TX', pos=1" indicates that the new column "Text" is of type "TX" (text) and comes from column 1 in the original dataset.
4. "name='Label', type='I8', pos=2" indicates that the new column "Label" is of type "I8" (integer) and comes from column 2 in the original dataset.
5. "header=True" indicates that the data has a header.
6. "sep='\t'" indicates that the columns are separated by "\t".


Training Pipeline

The pipeline can be developed as in the previous tutorial.

import time
from IPython.display import display
from nimbusml.feature_extraction.text import NGramFeaturizer
from nimbusml.feature_extraction.text.extractor import Ngram
from nimbusml.linear_model import AveragedPerceptronBinaryClassifier
from nimbusml.decomposition import PcaTransformer
from nimbusml import Pipeline, FileDataStream

In NimbusML, for transforms, the user can specify the input column names for each operator. If not specified, all columns from the previous operator or from the original dataset are used. The column syntax of nimbusml is discussed in more detail in Tutorial 2.2.

                                           columns = {"features":"Text"]}

indicates that the operator takes the column "Text" as input and saves the output to the column "features".

                                           columns = ["features"]

indicates that the operator takes the column "features" as input and saves the output back to the same column (overwriting the original column).

For learners, the user can specify the "roles" of different columns, such as feature, weight, label, group_id, etc. An equivalent way to attach these roles with the << operator is sketched after the pipeline below.

t0 = time.time()
ppl = Pipeline([
                NGramFeaturizer(word_feature_extractor=Ngram(weighting='TfIdf',
                                                             ngram_length=2),
                                char_feature_extractor=Ngram(weighting='Tf',
                                                             ngram_length=3),
                                columns={"features": "Text"}),
                PcaTransformer(rank=80, columns="features"),
                AveragedPerceptronBinaryClassifier(l2_regularization=0.3,
                                                   number_of_iterations=3,
                                                   feature=["features"],
                                                   label="Label"),
               ])
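
Equivalently, column roles can be attached to a learner with the << operator instead of constructor arguments. This is a minimal sketch assuming the Role helper exported by nimbusml; it mirrors the feature/label assignment above.

# A minimal sketch: the same role assignment written with the << operator.
# Role.Feature and Role.Label are assumed to correspond to the feature= and
# label= parameters used in the pipeline above.
from nimbusml import Role

learner = AveragedPerceptronBinaryClassifier(
              l2_regularization=0.3,
              number_of_iterations=3) << {Role.Feature: ['features'],
                                          Role.Label: 'Label'}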

Instead of fitting the model using a pandas dataframe, we use the FileDataStream created from the file name, such as 'train-twitter.gen-sample.tsv', as a pointer to the input data source.

The model can then be fitted on the FileDataStream. The data file is loaded by the NimbusML data loader in a streaming fashion as the FileDataStream is passed through the pipeline. In general, this is much faster than loading the whole dataset into memory beforehand (e.g. with pandas), since the loading and processing steps are optimized for performance.

The pipeline can be trained as follows:

ppl.fit(data_stream_train)

print("Training time: "  + str(round(time.time() - t0, 2)))
Automatically adding a MinMax normalization transform, use 'norm=Warn' or 'norm=No' to turn this behavior off.
Training calibrator.
Elapsed time: 00:00:01.4548474
Training time: 5.37

Testing Pipeline

Similarly, the pipeline can be evaluated with the .test() function, using the FileDataStream created from the test_file.

metrics, scores = ppl.test(data_stream_test, output_scores=True)
print("Performance metrics: ")
display(metrics)
print("Individual scores: ")

# Using the file stream, it is hard to visualize intermediate output and the original dataset.
display(scores[0:5])
print("Total runtime: " + str(round(time.time() - t0)))
Performance metrics: 
  Metric                                       Value
  AUC                                          0.573503
  Accuracy                                     0.662791
  Positive precision                           0
  Positive recall                              0
  Negative precision                           0.662791
  Negative recall                              1
  Log-loss                                     1.079578
  Log-loss reduction                           -0.170753
  Test-set entropy (prior Log-Loss/instance)   0.922123
  F1 Score                                     NaN
  AUPRC                                        0.361565
Individual scores: 
   PredictedLabel      Score  Probability
0               0  -0.236972     0.071740
1               0  -0.172740     0.356047
2               0  -0.218403     0.120105
3               0  -0.219571     0.116377
4               0  -0.181157     0.299347
Total runtime: 9
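
As the comment in the cell above notes, with a file stream it is hard to visualize intermediate outputs or the original dataset. For debugging, a few raw rows can be loaded into pandas directly; this is a minimal sketch using standard pandas, where nrows limits how much of the file is read.

import pandas as pd

# Peek at the first few raw rows for debugging, without loading the whole file.
sample = pd.read_csv(train_file, sep='\t', nrows=5)
print(sample)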