Pipeline

Instead of calling fit! and transform! by hand on each transformer to process time series data, we can use the Pipeline transformer. It iterates through the transformers passed to it and calls fit! and transform! on each one in turn, so that the output of one transformer is consumed by the next.
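The iteration described above can be sketched in a few lines of plain Julia. This is only an illustration of the pattern, not TSML's implementation: ToyTransformer, Scale, Center, toy_fit!, toy_transform! and run_pipeline! are hypothetical names invented for this example.

```julia
# Hypothetical stand-ins for illustration only; none of these are TSML types.
abstract type ToyTransformer end

mutable struct Scale <: ToyTransformer
    factor::Float64
end

mutable struct Center <: ToyTransformer
    mean::Float64
end
Center() = Center(0.0)

# Scale has nothing to learn; Center learns the mean of its input.
toy_fit!(t::Scale, x::Vector{Float64}) = nothing
function toy_fit!(t::Center, x::Vector{Float64})
    t.mean = sum(x) / length(x)
    return nothing
end

toy_transform!(t::Scale, x::Vector{Float64}) = x .* t.factor
toy_transform!(t::Center, x::Vector{Float64}) = x .- t.mean

# Fit each transformer on the current data, then pass its output along.
function run_pipeline!(transformers::Vector{<:ToyTransformer}, x::Vector{Float64})
    current = x
    for t in transformers
        toy_fit!(t, current)
        current = toy_transform!(t, current)
    end
    return current
end

out = run_pipeline!([Scale(2.0), Center()], [1.0, 2.0, 3.0])
```

Here Center is fitted on the already scaled data, which is the point of chaining: each stage only ever sees the output of the stage before it.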

Let's start again by using a function to generate a time series dataframe with some missing data.

julia> X = generateDataWithMissing();
julia> first(X,15)
15×2 DataFrame
 Row │ Date                 Value
     │ DateTime             Float64?
─────┼─────────────────────────────────────
   1 │ 2014-01-01T00:00:00        0.9063
   2 │ 2014-01-01T00:15:00  missing
   3 │ 2014-01-01T00:30:00  missing
   4 │ 2014-01-01T00:45:00  missing
   5 │ 2014-01-01T01:00:00  missing
   6 │ 2014-01-01T01:15:00        0.334152
   7 │ 2014-01-01T01:30:00  missing
   8 │ 2014-01-01T01:45:00  missing
   9 │ 2014-01-01T02:00:00  missing
  10 │ 2014-01-01T02:15:00  missing
  11 │ 2014-01-01T02:30:00  missing
  12 │ 2014-01-01T02:45:00        0.136551
  13 │ 2014-01-01T03:00:00  missing
  14 │ 2014-01-01T03:15:00  missing
  15 │ 2014-01-01T03:30:00  missing

Workflow of Pipeline

Let's use the pipeline transformer to aggregate and impute:

using TSML
using Dates   # provides Dates.Hour used below

dtvalgator = DateValgator(Dict(:dateinterval => Dates.Hour(1)))
dtvalnner = DateValLinearImputer(Dict(:dateinterval => Dates.Hour(1)))

mypipeline = dtvalgator |> dtvalnner

results = fit_transform!(mypipeline,X)
julia> first(results,10)
10×2 DataFrame
 Row │ Date                 Value
     │ DateTime             Float64?
─────┼───────────────────────────────
   1 │ 2014-01-01T00:00:00  0.9063
   2 │ 2014-01-01T01:00:00  0.334152
   3 │ 2014-01-01T02:00:00  0.235352
   4 │ 2014-01-01T03:00:00  0.136551
   5 │ 2014-01-01T04:00:00  0.478823
   6 │ 2014-01-01T05:00:00  0.305311
   7 │ 2014-01-01T06:00:00  0.131798
   8 │ 2014-01-01T07:00:00  0.450484
   9 │ 2014-01-01T08:00:00  0.360583
  10 │ 2014-01-01T09:00:00  0.571645
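The imputed rows in this output are consistent with straight-line interpolation between neighbouring known values: row 3 (0.235352) is the midpoint of rows 2 and 4. The sketch below reproduces that arithmetic in plain Julia. It is a simplified illustration under that assumption, not the code DateValLinearImputer actually runs, and toy_linear_impute is a hypothetical helper name.

```julia
# Hypothetical helper, not part of TSML: fill interior runs of `missing`
# by linear interpolation between the nearest known neighbours.
function toy_linear_impute(v::Vector{Union{Missing,Float64}})
    out = copy(v)
    known = findall(!ismissing, v)          # indices of observed values
    for (lo, hi) in zip(known[1:end-1], known[2:end])
        for i in lo+1:hi-1
            w = (i - lo) / (hi - lo)        # relative position inside the gap
            out[i] = (1 - w) * v[lo] + w * v[hi]
        end
    end
    return out                              # leading/trailing gaps stay missing
end

# Hourly values for 01:00, 02:00 and 03:00 taken from the table above,
# with the 02:00 slot still missing after aggregation.
hourly = Union{Missing,Float64}[0.334152, missing, 0.136551]
filled = toy_linear_impute(hourly)
```

The interpolated middle value is 0.2353515, which rounds to the 0.235352 shown for 02:00.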

With the Pipeline transformer, processing time series data becomes straightforward. It also becomes trivial to extend TSML functionality by adding more transformers, as long as each one supports the fit! and transform! interfaces. Any new transformer can then be added to the Pipeline workflow without invasive changes to the existing code.

Extending TSML

To illustrate how simple it is to add a new transformer, the code below extends TSML with a CSVReader transformer, which is then added to the pipeline to process CSV data:

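using TSML
using CSV, DataFrames, Dates   # CSV.read, DataFrame/rename!/ncol, DateTime
# Import the generic functions so that new methods can be added to them.
import TSML.AbsTypes.fit!
import TSML.AbsTypes.transform!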

mutable struct CSVReader <: Transformer
    filename::String
    model::Dict{Symbol,Any}

    function CSVReader(args=Dict())
        default_args = Dict(
            :filename => "",
            :dateformat => ""
        )
        margs = nested_dict_merge(default_args, args)
        new(margs[:filename],margs)
    end
end

function fit!(csvrdr::CSVReader,x::DataFrame=DataFrame(),y::Vector=[])
    fname = csvrdr.model[:filename]
    fmt = csvrdr.model[:dateformat]
    (fname != "" && fmt != "") || error("missing filename or date format")
end

function transform!(csvrdr::CSVReader,x::DataFrame=DataFrame())
    fname = csvrdr.model[:filename]
    fmt = csvrdr.model[:dateformat]
    df = CSV.read(fname, DataFrame)  # recent CSV.jl versions require an explicit sink type
    ncol(df) == 2 || error("dataframe should have only two columns: Date,Value")
    rename!(df,names(df)[1]=>:Date,names(df)[2]=>:Value)
    df.Date = DateTime.(df.Date,fmt)
    df
end

Instead of passing the table X that contains the time series, we add an instance of CSVReader at the start of the pipeline to read the CSV data. The transform! function of CSVReader converts the CSV time series table into a dataframe, which is then consumed by the next transformer in the pipeline.

fname = joinpath(dirname(pathof(TSML)),"../data/testdata.csv")
csvreader = CSVReader(Dict(:filename=>fname,:dateformat=>"d/m/y H:M"))
fit!(csvreader)
csvdata = transform!(csvreader)
julia> first(csvdata,10)
10×2 DataFrame
 Row │ Date                 Value
     │ DateTime             Float64
─────┼──────────────────────────────
   1 │ 2014-01-01T00:06:00     10.0
   2 │ 2014-01-01T00:18:00     10.0
   3 │ 2014-01-01T00:29:00     10.0
   4 │ 2014-01-01T00:40:00      9.9
   5 │ 2014-01-01T00:51:00      9.9
   6 │ 2014-01-01T01:02:00     10.0
   7 │ 2014-01-01T01:13:00      9.8
   8 │ 2014-01-01T01:24:00     10.0
   9 │ 2014-01-01T01:35:00      9.8
  10 │ 2014-01-01T01:46:00     10.0

Let us now include the newly created CSVReader in the pipeline to read the csv data and process it by aggregation and imputation.

mypipeline = csvreader |> dtvalgator |> dtvalnner

results = fit_transform!(mypipeline)
julia> first(results,10)
10×2 DataFrame
 Row │ Date                 Value
     │ DateTime             Float64?
─────┼───────────────────────────────
   1 │ 2014-01-01T00:00:00     10.0
   2 │ 2014-01-01T01:00:00      9.9
   3 │ 2014-01-01T02:00:00     10.0
   4 │ 2014-01-01T03:00:00     10.0
   5 │ 2014-01-01T04:00:00     10.0
   6 │ 2014-01-01T05:00:00     10.0
   7 │ 2014-01-01T06:00:00     10.0
   8 │ 2014-01-01T07:00:00      9.8
   9 │ 2014-01-01T08:00:00      9.85
  10 │ 2014-01-01T09:00:00      9.9

Notice that there is no longer any need to pass X as an argument to fit! and transform!, because the CSVReader instance now supplies the data to the other transformers in the pipeline.
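The reason the data argument can be dropped is that the first stage produces the data itself and ignores whatever it is handed, in the same way the CSVReader transform! method above defaults x to an empty DataFrame. A minimal plain-Julia sketch of that idea follows; ToyStage, ToySource, ToyDoubler, toy_apply and toy_run are hypothetical names, and vectors stand in for dataframes.

```julia
# Hypothetical stand-ins for illustration only; none of these are TSML types.
abstract type ToyStage end

struct ToySource <: ToyStage
    values::Vector{Float64}   # plays the role of the CSV file contents
end

struct ToyDoubler <: ToyStage end

# The source ignores its input, so an empty default is enough to start the chain.
toy_apply(s::ToySource, x::Vector{Float64}=Float64[]) = copy(s.values)
toy_apply(::ToyDoubler, x::Vector{Float64}=Float64[]) = 2 .* x

# The data argument is optional because the first stage may supply it.
function toy_run(stages, x::Vector{Float64}=Float64[])
    for s in stages
        x = toy_apply(s, x)
    end
    return x
end

result = toy_run([ToySource([1.0, 2.0]), ToyDoubler()])
```

Calling toy_run without a data argument works only because a source stage comes first; with ToyDoubler alone, the empty default would simply pass through as an empty result.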