Pipeline
Instead of calling fit! and transform! on each transformer by hand to process time series data, we can use the Pipeline transformer, which does this automatically: it iterates over the transformers in its argument and calls fit! and transform! on each one in turn, feeding the output of one transformer to the next.
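The mechanic is easy to picture with a toy sketch. The code below is not TSML's implementation; it is a self-contained illustration (with made-up `AddOne` and `Double` transformers) of how a pipeline chains fit! and transform! calls:

```julia
# Toy sketch of the pipeline mechanic (not TSML code):
# each transformer is fitted on the current data, then its
# transform! output becomes the input of the next transformer.
abstract type ToyTransformer end

mutable struct AddOne <: ToyTransformer end
mutable struct Double <: ToyTransformer end

fit!(t::ToyTransformer, x) = nothing        # nothing to learn in this sketch
transform!(t::AddOne, x) = x .+ 1
transform!(t::Double, x) = x .* 2

function fit_transform_chain(transformers, x)
    current = x
    for t in transformers
        fit!(t, current)                    # fit on the intermediate data
        current = transform!(t, current)    # pass the result downstream
    end
    current
end

fit_transform_chain([AddOne(), Double()], [1, 2, 3])  # -> [4, 6, 8]
```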
Let's start again by using a function to generate a time series dataframe with some missing data.
julia> X = generateDataWithMissing();
julia> first(X,15)
15×2 DataFrame
│ Row │ Date │ Value │
│ │ DateTime │ Float64? │
├─────┼─────────────────────┼───────────┤
│ 1 │ 2014-01-01T00:00:00 │ missing │
│ 2 │ 2014-01-01T00:15:00 │ missing │
│ 3 │ 2014-01-01T00:30:00 │ missing │
│ 4 │ 2014-01-01T00:45:00 │ missing │
│ 5 │ 2014-01-01T01:00:00 │ missing │
│ 6 │ 2014-01-01T01:15:00 │ missing │
│ 7 │ 2014-01-01T01:30:00 │ missing │
│ 8 │ 2014-01-01T01:45:00 │ 0.0521332 │
│ 9 │ 2014-01-01T02:00:00 │ 0.26864 │
│ 10 │ 2014-01-01T02:15:00 │ 0.108871 │
│ 11 │ 2014-01-01T02:30:00 │ 0.163666 │
│ 12 │ 2014-01-01T02:45:00 │ 0.473017 │
│ 13 │ 2014-01-01T03:00:00 │ 0.865412 │
│ 14 │ 2014-01-01T03:15:00 │ missing │
│ 15  │ 2014-01-01T03:30:00 │ missing   │

Workflow of Pipeline
Let's use the pipeline transformer to aggregate and impute:
using TSML
dtvalgator = DateValgator(Dict(:dateinterval => Dates.Hour(1)))
dtvalnner = DateValNNer(Dict(:dateinterval => Dates.Hour(1)))
mypipeline = @pipeline dtvalgator |> dtvalnner
results = fit_transform!(mypipeline,X)

julia> first(results,10)
10×2 DataFrame
│ Row │ Date │ Value │
│ │ DateTime │ Float64? │
├─────┼─────────────────────┼──────────┤
│ 1 │ 2014-01-01T00:00:00 │ 0.108871 │
│ 2 │ 2014-01-01T01:00:00 │ 0.108871 │
│ 3 │ 2014-01-01T02:00:00 │ 0.108871 │
│ 4 │ 2014-01-01T03:00:00 │ 0.473017 │
│ 5 │ 2014-01-01T04:00:00 │ 0.361194 │
│ 6 │ 2014-01-01T05:00:00 │ 0.582318 │
│ 7 │ 2014-01-01T06:00:00 │ 0.918165 │
│ 8 │ 2014-01-01T07:00:00 │ 0.614255 │
│ 9 │ 2014-01-01T08:00:00 │ 0.690462 │
│ 10  │ 2014-01-01T09:00:00 │ 0.92049  │

Using the Pipeline transformer, processing the time series data becomes straightforward. It also becomes trivial to extend TSML's functionality by adding more transformers, as long as each one supports the fit! and transform! interfaces. Any new transformer can then be added to a Pipeline workflow without invasive changes to the existing code.
Extending TSML
To illustrate how simple it is to add a new transformer, the code below extends TSML with a CSVReader transformer, which is then added to the pipeline to process CSV data:
using TSML
using CSV, DataFrames, Dates
import AutoMLPipeline.AbsTypes.fit!
import AutoMLPipeline.AbsTypes.transform!
mutable struct CSVReader <: Transformer
    model
    args
    function CSVReader(args=Dict())
        default_args = Dict(
            :filename => "",
            :dateformat => ""
        )
        new(nothing, mergedict(default_args, args))
    end
end
function fit!(csvrdr::CSVReader, x::DataFrame=DataFrame(), y::Vector=[])
    fname = csvrdr.args[:filename]
    fmt = csvrdr.args[:dateformat]
    (fname != "" && fmt != "") || error("missing filename or date format")
    csvrdr.model = csvrdr.args
end
function transform!(csvrdr::CSVReader, x::DataFrame=DataFrame())
    fname = csvrdr.model[:filename]
    fmt = csvrdr.model[:dateformat]
    df = CSV.read(fname, DataFrame)
    ncol(df) == 2 || error("dataframe should have only two columns: Date,Value")
    rename!(df, names(df)[1] => :Date, names(df)[2] => :Value)
    df.Date = DateTime.(df.Date, fmt)
    df
end

Instead of passing the table X that contains the time series, we add an instance of CSVReader at the start of the pipeline's array of transformers to read the CSV data. CSVReader's transform! function converts the CSV time series table into a dataframe, which is then consumed by the next transformer in the pipeline for processing.
fname = joinpath(dirname(pathof(TSML)),"../data/testdata.csv")
csvreader = CSVReader(Dict(:filename=>fname,:dateformat=>"d/m/y H:M"))
fit!(csvreader)
csvdata = transform!(csvreader)

julia> first(csvdata,10)
10×2 DataFrame
│ Row │ Date │ Value │
│ │ DateTime │ Float64 │
├─────┼─────────────────────┼─────────┤
│ 1 │ 2014-01-01T00:06:00 │ 10.0 │
│ 2 │ 2014-01-01T00:18:00 │ 10.0 │
│ 3 │ 2014-01-01T00:29:00 │ 10.0 │
│ 4 │ 2014-01-01T00:40:00 │ 9.9 │
│ 5 │ 2014-01-01T00:51:00 │ 9.9 │
│ 6 │ 2014-01-01T01:02:00 │ 10.0 │
│ 7 │ 2014-01-01T01:13:00 │ 9.8 │
│ 8 │ 2014-01-01T01:24:00 │ 10.0 │
│ 9 │ 2014-01-01T01:35:00 │ 9.8 │
│ 10  │ 2014-01-01T01:46:00 │ 10.0    │

Let us now include the newly created CSVReader in the pipeline to read the CSV data and process it with aggregation and imputation.
mypipeline = @pipeline csvreader |> dtvalgator |> dtvalnner
results = fit_transform!(mypipeline)

julia> first(results,10)
10×2 DataFrame
│ Row │ Date │ Value │
│ │ DateTime │ Float64? │
├─────┼─────────────────────┼──────────┤
│ 1 │ 2014-01-01T00:00:00 │ 10.0 │
│ 2 │ 2014-01-01T01:00:00 │ 9.9 │
│ 3 │ 2014-01-01T02:00:00 │ 10.0 │
│ 4 │ 2014-01-01T03:00:00 │ 10.0 │
│ 5 │ 2014-01-01T04:00:00 │ 10.0 │
│ 6 │ 2014-01-01T05:00:00 │ 10.0 │
│ 7 │ 2014-01-01T06:00:00 │ 10.0 │
│ 8 │ 2014-01-01T07:00:00 │ 9.8 │
│ 9 │ 2014-01-01T08:00:00 │ 9.85 │
│ 10  │ 2014-01-01T09:00:00 │ 9.9      │

Notice that there is no longer any need to pass X as an argument to fit! and transform! because the data is now supplied by the CSVReader instance to the other transformers in the pipeline.
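The reason no X is needed can be seen in a small standalone sketch (again toy code, not TSML internals): a transformer at the head of the chain simply ignores its empty input and emits its own data, which the downstream transformers then consume.

```julia
# Toy sketch (not TSML code): a reader-like transformer at the head of a
# chain ignores its input and acts as the data source for the rest.
mutable struct ToySource
    rows::Vector{Float64}
end
transform!(s::ToySource, x=nothing) = s.rows   # ignores x; supplies the data

mutable struct ToyScaler end
transform!(::ToyScaler, x) = x .* 10           # a downstream consumer

function run_chain(transformers)
    data = nothing                             # caller passes no data in
    for t in transformers
        data = transform!(t, data)
    end
    data
end

run_chain([ToySource([1.0, 2.0]), ToyScaler()])  # -> [10.0, 20.0]
```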