Pipeline

Instead of calling fit! and transform! on each transformer individually to process time series data, we can use the Pipeline transformer, which does this automatically: it iterates through the list of transformers in its argument, calling fit! and transform! on each one in turn.
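The manual workflow that Pipeline automates can be sketched as follows (t1, t2, and X are placeholder names for two constructed transformers and an input dataframe, not part of the TSML API):

```julia
# Manual chaining of two transformers -- exactly what Pipeline
# does internally for every transformer in its list.
fit!(t1, X)            # learn any state t1 needs from X
X1 = transform!(t1, X) # apply t1, producing an intermediate dataframe
fit!(t2, X1)           # fit the next transformer on t1's output
X2 = transform!(t2, X1) # final output of the two-stage chain
```

With Pipeline, this fit-then-transform handoff between stages happens in a single fit_transform! call.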
Let's start again by using a function to generate a time series dataframe with some missing data.
```julia
julia> X = generateDataWithMissing();

julia> first(X,15)
15×2 DataFrame
 Row │ Date                 Value
     │ DateTime             Float64?
─────┼─────────────────────────────────
   1 │ 2014-01-01T00:00:00      0.9063
   2 │ 2014-01-01T00:15:00  missing
   3 │ 2014-01-01T00:30:00  missing
   4 │ 2014-01-01T00:45:00  missing
   5 │ 2014-01-01T01:00:00  missing
   6 │ 2014-01-01T01:15:00      0.334152
   7 │ 2014-01-01T01:30:00  missing
   8 │ 2014-01-01T01:45:00  missing
   9 │ 2014-01-01T02:00:00  missing
  10 │ 2014-01-01T02:15:00  missing
  11 │ 2014-01-01T02:30:00  missing
  12 │ 2014-01-01T02:45:00      0.136551
  13 │ 2014-01-01T03:00:00  missing
  14 │ 2014-01-01T03:15:00  missing
  15 │ 2014-01-01T03:30:00  missing
```
Workflow of Pipeline
Let's use the pipeline transformer to aggregate and impute:
```julia
using TSML

dtvalgator = DateValgator(Dict(:dateinterval => Dates.Hour(1)))
dtvalnner = DateValLinearImputer(Dict(:dateinterval => Dates.Hour(1)))

mypipeline = dtvalgator |> dtvalnner
results = fit_transform!(mypipeline, X)
```
```julia
julia> first(results,10)
10×2 DataFrame
 Row │ Date                 Value
     │ DateTime             Float64?
─────┼───────────────────────────────
   1 │ 2014-01-01T00:00:00  0.9063
   2 │ 2014-01-01T01:00:00  0.334152
   3 │ 2014-01-01T02:00:00  0.235352
   4 │ 2014-01-01T03:00:00  0.136551
   5 │ 2014-01-01T04:00:00  0.478823
   6 │ 2014-01-01T05:00:00  0.305311
   7 │ 2014-01-01T06:00:00  0.131798
   8 │ 2014-01-01T07:00:00  0.450484
   9 │ 2014-01-01T08:00:00  0.360583
  10 │ 2014-01-01T09:00:00  0.571645
```
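The |> syntax used above is shorthand for constructing a Pipeline. The same pipeline can also be built explicitly by passing the transformer list in the argument dictionary (a sketch assuming Pipeline accepts a :transformers key, consistent with the argument-dictionary convention the other transformers use):

```julia
using TSML

dtvalgator = DateValgator(Dict(:dateinterval => Dates.Hour(1)))
dtvalnner = DateValLinearImputer(Dict(:dateinterval => Dates.Hour(1)))

# Explicit construction; equivalent to `dtvalgator |> dtvalnner`
mypipeline = Pipeline(Dict(:transformers => [dtvalgator, dtvalnner]))
results = fit_transform!(mypipeline, X)
```

The explicit form is convenient when the transformer list is built programmatically, e.g. assembled from a configuration.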
Using the Pipeline transformer, processing time series data becomes straightforward. It also becomes trivial to extend TSML: add more transformers, making sure each one supports the fit! and transform! interfaces. Any new transformer can then be easily added to a Pipeline workflow without invasively changing the existing code.
Extending TSML

To illustrate how simple it is to add a new transformer, the code below extends TSML with a CSVReader transformer, which is then added to the pipeline to process CSV data:
```julia
using TSML
using CSV
import TSML.AbsTypes.fit!
import TSML.AbsTypes.transform!

mutable struct CSVReader <: Transformer
    filename::String
    model::Dict{Symbol,Any}

    function CSVReader(args=Dict())
        default_args = Dict(
            :filename => "",
            :dateformat => ""
        )
        margs = nested_dict_merge(default_args, args)
        new(margs[:filename], margs)
    end
end

function fit!(csvrdr::CSVReader, x::DataFrame=DataFrame(), y::Vector=[])
    fname = csvrdr.model[:filename]
    fmt = csvrdr.model[:dateformat]
    (fname != "" && fmt != "") || error("missing filename or date format")
end

function transform!(csvrdr::CSVReader, x::DataFrame=DataFrame())
    fname = csvrdr.model[:filename]
    fmt = csvrdr.model[:dateformat]
    df = CSV.read(fname, DataFrame)  # CSV.read requires a sink argument in recent CSV.jl
    ncol(df) == 2 || error("dataframe should have only two columns: Date,Value")
    rename!(df, names(df)[1] => :Date, names(df)[2] => :Value)
    df.Date = DateTime.(df.Date, fmt)
    df
end
```
Instead of passing the table X that contains the time series, we add an instance of CSVReader at the start of the pipeline's array of transformers to read the CSV data. The transform! function of CSVReader converts the CSV time series table into a dataframe, which is then consumed by the next transformer in the pipeline for processing.
```julia
fname = joinpath(dirname(pathof(TSML)),"../data/testdata.csv")
csvreader = CSVReader(Dict(:filename=>fname,:dateformat=>"d/m/y H:M"))
fit!(csvreader)
csvdata = transform!(csvreader)
```
```julia
julia> first(csvdata,10)
10×2 DataFrame
 Row │ Date                 Value
     │ DateTime             Float64
─────┼──────────────────────────────
   1 │ 2014-01-01T00:06:00     10.0
   2 │ 2014-01-01T00:18:00     10.0
   3 │ 2014-01-01T00:29:00     10.0
   4 │ 2014-01-01T00:40:00      9.9
   5 │ 2014-01-01T00:51:00      9.9
   6 │ 2014-01-01T01:02:00     10.0
   7 │ 2014-01-01T01:13:00      9.8
   8 │ 2014-01-01T01:24:00     10.0
   9 │ 2014-01-01T01:35:00      9.8
  10 │ 2014-01-01T01:46:00     10.0
```
Let us now include the newly created CSVReader in the pipeline to read the CSV data and process it by aggregation and imputation.

```julia
mypipeline = csvreader |> dtvalgator |> dtvalnner
results = fit_transform!(mypipeline)
```
```julia
julia> first(results,10)
10×2 DataFrame
 Row │ Date                 Value
     │ DateTime             Float64?
─────┼───────────────────────────────
   1 │ 2014-01-01T00:00:00    10.0
   2 │ 2014-01-01T01:00:00     9.9
   3 │ 2014-01-01T02:00:00    10.0
   4 │ 2014-01-01T03:00:00    10.0
   5 │ 2014-01-01T04:00:00    10.0
   6 │ 2014-01-01T05:00:00    10.0
   7 │ 2014-01-01T06:00:00    10.0
   8 │ 2014-01-01T07:00:00     9.8
   9 │ 2014-01-01T08:00:00     9.85
  10 │ 2014-01-01T09:00:00     9.9
```
Notice that there is no longer any need to pass X as an argument to fit! and transform! because the data is now supplied by the CSVReader instance to the other transformers in the pipeline.
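The same pattern works at the other end of a pipeline. Below is a hypothetical CSVWriter sink transformer (not part of TSML; the name, its :filename option, and its pass-through behavior are illustrative assumptions) showing that persisting results needs nothing beyond the same fit! and transform! interface:

```julia
using TSML
using CSV
import TSML.AbsTypes.fit!
import TSML.AbsTypes.transform!

# Hypothetical sink transformer: writes the incoming dataframe to a CSV
# file and passes it through unchanged.
mutable struct CSVWriter <: Transformer
    filename::String
    model::Dict{Symbol,Any}

    function CSVWriter(args=Dict())
        default_args = Dict(:filename => "")
        margs = nested_dict_merge(default_args, args)
        new(margs[:filename], margs)
    end
end

function fit!(csvwtr::CSVWriter, x::DataFrame=DataFrame(), y::Vector=[])
    csvwtr.model[:filename] != "" || error("missing filename")
end

function transform!(csvwtr::CSVWriter, x::DataFrame=DataFrame())
    CSV.write(csvwtr.model[:filename], x)  # persist the processed table
    x  # pass the dataframe through so later transformers can still consume it
end

# Usage sketch: read, aggregate, impute, then save the result.
# mypipeline = csvreader |> dtvalgator |> dtvalnner |>
#              CSVWriter(Dict(:filename => "output.csv"))
# fit_transform!(mypipeline)
```

Returning x from transform! keeps the sink composable: it can sit in the middle of a pipeline as a checkpoint, not only at the end.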