Pipeline
Instead of calling fit! and transform! manually for each transformer that processes the time series data, we can use the Pipeline transformer, which does this automatically: it iterates over the transformers in its argument and calls fit! and transform! on each one in turn.
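The chaining logic can be sketched in a few lines of plain Julia. The sketch below is hypothetical and independent of TSML (run it in a fresh session): the names runpipeline, Doubler, and AddOne are illustrative only, but the loop mirrors the idea that each transformer's output becomes the next transformer's input.

```julia
# Minimal, hypothetical sketch of what a pipeline does; not TSML's implementation.
abstract type Transformer end

mutable struct Doubler <: Transformer end
fit!(t::Doubler, x) = nothing            # nothing to learn for this toy transformer
transform!(t::Doubler, x) = 2 .* x       # double every value

mutable struct AddOne <: Transformer end
fit!(t::AddOne, x) = nothing
transform!(t::AddOne, x) = x .+ 1        # shift every value by one

# Iterate through the transformers, feeding each output to the next stage.
function runpipeline(transformers::Vector, x)
    current = x
    for tr in transformers
        fit!(tr, current)
        current = transform!(tr, current)
    end
    current
end

runpipeline([Doubler(), AddOne()], [1, 2, 3])  # -> [3, 5, 7]
```

In TSML, Pipeline plays the role of runpipeline while also conforming to the same fit!/transform! interface, so pipelines themselves can be nested like any other transformer.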
Let's start again by using a function to generate a time series dataframe with some missing data.
X = generateDataWithMissing()
first(X,15)

| | Date | Value |
|---|---|---|
| | Dates… | Float64⍰ |
| 1 | 2014-01-01T00:00:00 | missing |
| 2 | 2014-01-01T00:15:00 | missing |
| 3 | 2014-01-01T00:30:00 | missing |
| 4 | 2014-01-01T00:45:00 | missing |
| 5 | 2014-01-01T01:00:00 | missing |
| 6 | 2014-01-01T01:15:00 | missing |
| 7 | 2014-01-01T01:30:00 | missing |
| 8 | 2014-01-01T01:45:00 | 0.0521332 |
| 9 | 2014-01-01T02:00:00 | 0.26864 |
| 10 | 2014-01-01T02:15:00 | 0.108871 |
| 11 | 2014-01-01T02:30:00 | 0.163666 |
| 12 | 2014-01-01T02:45:00 | 0.473017 |
| 13 | 2014-01-01T03:00:00 | 0.865412 |
| 14 | 2014-01-01T03:15:00 | missing |
| 15 | 2014-01-01T03:30:00 | missing |
Workflow of Pipeline
Let's use the pipeline transformer to aggregate and impute:
using Dates
using TSML
using TSML.TSMLTypes
using TSML.TSMLTransformers
using TSML: Pipeline
using TSML: DateValgator
using TSML: DateValNNer
dtvalgator = DateValgator(Dict(:dateinterval => Dates.Hour(1)))
dtvalnner = DateValNNer(Dict(:dateinterval => Dates.Hour(1)))
mypipeline = Pipeline(
Dict( :transformers => [
dtvalgator,
dtvalnner
]
)
)
fit!(mypipeline,X)
results = transform!(mypipeline,X)
first(results,10)

| | Date | Value |
|---|---|---|
| | Dates… | Float64⍰ |
| 1 | 2014-01-01T00:00:00 | 0.108871 |
| 2 | 2014-01-01T01:00:00 | 0.108871 |
| 3 | 2014-01-01T02:00:00 | 0.108871 |
| 4 | 2014-01-01T03:00:00 | 0.473017 |
| 5 | 2014-01-01T04:00:00 | 0.361194 |
| 6 | 2014-01-01T05:00:00 | 0.582318 |
| 7 | 2014-01-01T06:00:00 | 0.918165 |
| 8 | 2014-01-01T07:00:00 | 0.614255 |
| 9 | 2014-01-01T08:00:00 | 0.690462 |
| 10 | 2014-01-01T09:00:00 | 0.92049 |
Using the Pipeline transformer, processing the time series data becomes straightforward. It also makes it trivial to extend TSML's functionality: add more transformers, making sure each supports the fit! and transform! interfaces. Any new transformer can then be easily added to the Pipeline workflow without invasively changing the existing code.
Extending TSML
To illustrate how simple it is to add a new transformer, let us extend TSML with a CSVReader transformer and add it to the pipeline to process CSV data:
using TSML.TSMLTypes
import TSML.TSMLTypes.fit!
import TSML.TSMLTypes.transform!
using CSV
mutable struct CSVReader <: Transformer
model
args
function CSVReader(args=Dict())
default_args = Dict(
:filename => "",
:dateformat => ""
)
new(nothing,mergedict(default_args,args))
end
end
function fit!(csvrdr::CSVReader,x::T=[],y::Vector=[]) where {T<:Union{DataFrame,Vector,Matrix}}
fname = csvrdr.args[:filename]
fmt = csvrdr.args[:dateformat]
(fname != "" && fmt != "") || error("missing filename or date format")
model = csvrdr.args
end
function transform!(csvrdr::CSVReader,x::T=[]) where {T<:Union{DataFrame,Vector,Matrix}}
fname = csvrdr.args[:filename]
fmt = csvrdr.args[:dateformat]
df = CSV.read(fname)
ncol(df) == 2 || error("dataframe should have only two columns: Date,Value")
rename!(df,names(df)[1]=>:Date,names(df)[2]=>:Value)
df[:Date] = DateTime.(df[:Date],fmt)
df
end

Instead of passing a table X that contains the time series, we will add an instance of the CSVReader at the start of the array of transformers in the pipeline to read the CSV data. The CSVReader transform! function converts the CSV time series table into a dataframe, which is then consumed by the next transformer in the pipeline for processing.
fname = joinpath(dirname(pathof(TSML)),"../data/testdata.csv")
csvreader = CSVReader(Dict(:filename=>fname,:dateformat=>"d/m/y H:M"))
fit!(csvreader)
csvdata = transform!(csvreader)
first(csvdata,10)

| | Date | Value |
|---|---|---|
| | Dates… | Float64 |
| 1 | 2014-01-01T00:06:00 | 10.0 |
| 2 | 2014-01-01T00:18:00 | 10.0 |
| 3 | 2014-01-01T00:29:00 | 10.0 |
| 4 | 2014-01-01T00:40:00 | 9.9 |
| 5 | 2014-01-01T00:51:00 | 9.9 |
| 6 | 2014-01-01T01:02:00 | 10.0 |
| 7 | 2014-01-01T01:13:00 | 9.8 |
| 8 | 2014-01-01T01:24:00 | 10.0 |
| 9 | 2014-01-01T01:35:00 | 9.8 |
| 10 | 2014-01-01T01:46:00 | 10.0 |
Let us now include the newly created CSVReader in the pipeline so that it reads the CSV data, which is then processed by aggregation and imputation.
mypipeline = Pipeline(
Dict( :transformers => [
csvreader,
dtvalgator,
dtvalnner
]
)
)
fit!(mypipeline)
results = transform!(mypipeline)
first(results,10)

| | Date | Value |
|---|---|---|
| | Dates… | Float64⍰ |
| 1 | 2014-01-01T00:00:00 | 10.0 |
| 2 | 2014-01-01T01:00:00 | 9.9 |
| 3 | 2014-01-01T02:00:00 | 10.0 |
| 4 | 2014-01-01T03:00:00 | 10.0 |
| 5 | 2014-01-01T04:00:00 | 10.0 |
| 6 | 2014-01-01T05:00:00 | 10.0 |
| 7 | 2014-01-01T06:00:00 | 10.0 |
| 8 | 2014-01-01T07:00:00 | 9.8 |
| 9 | 2014-01-01T08:00:00 | 9.85 |
| 10 | 2014-01-01T09:00:00 | 9.9 |
Notice that there is no longer a need to pass X as an argument to fit! and transform! because the data is now supplied by the CSVReader instance to the other transformers in the pipeline.
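The same idea can be sketched outside TSML: a "source" transformer at the head of the chain ignores its input and emits data, so the caller never has to pass X. The sketch below is hypothetical (run it in a fresh session, independent of TSML); ConstantSource, Scaler, and runpipeline are illustrative names, not TSML API.

```julia
# Hypothetical sketch: a source transformer supplies the data, so the caller
# does not pass X. Not TSML's implementation.
abstract type Transformer end

mutable struct ConstantSource <: Transformer
    data::Vector{Float64}
end
fit!(t::ConstantSource, x=nothing) = nothing
transform!(t::ConstantSource, x=nothing) = t.data  # ignores x, emits stored data

mutable struct Scaler <: Transformer
    factor::Float64
end
fit!(t::Scaler, x) = nothing
transform!(t::Scaler, x) = t.factor .* x

# Same chaining loop as before, but x defaults to nothing:
# the first transformer in the chain is expected to produce the data.
function runpipeline(transformers, x=nothing)
    current = x
    for tr in transformers
        fit!(tr, current)
        current = transform!(tr, current)
    end
    current
end

runpipeline([ConstantSource([9.9, 10.0]), Scaler(2.0)])  # no X needed
```

CSVReader plays the ConstantSource role in the TSML pipeline above: its transform! ignores the (empty) input and returns the dataframe it reads from disk.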