Pipeline

Instead of calling fit! and transform! on each transformer in turn to process time series data, we can use the Pipeline transformer, which does this automatically: it iterates through the transformers passed in its argument and calls fit! and transform! on each one.
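The chaining idea can be sketched in plain Julia without TSML. Everything in this sketch (ToyTransformer, AddConstant, Center, toyfit!, toytransform!, runpipeline!) is a hypothetical stand-in, and the fit-then-transform order inside the loop is an assumption about how such a pipeline could work, not a copy of TSML's implementation.

```julia
# Toy stand-ins; none of these names come from TSML.
abstract type ToyTransformer end

# A step with nothing to learn: it adds a fixed constant.
mutable struct AddConstant <: ToyTransformer
    c::Float64
end

# A step that learns the mean during fitting and subtracts it later.
mutable struct Center <: ToyTransformer
    mean::Float64
    Center() = new(0.0)
end

toyfit!(t::AddConstant, x::Vector{Float64}) = nothing
toytransform!(t::AddConstant, x::Vector{Float64}) = x .+ t.c

function toyfit!(t::Center, x::Vector{Float64})
    t.mean = sum(x) / length(x)
    return nothing
end
toytransform!(t::Center, x::Vector{Float64}) = x .- t.mean

# Fit each step on the output of the previous step, then hand the
# transformed data to the next step.
function runpipeline!(steps::Vector{<:ToyTransformer}, x::Vector{Float64})
    current = x
    for step in steps
        toyfit!(step, current)
        current = toytransform!(step, current)
    end
    return current
end

steps = ToyTransformer[AddConstant(10.0), Center()]
out = runpipeline!(steps, [1.0, 2.0, 3.0])
```

Here the second step is fitted on the shifted data, so the caller writes one call instead of a fit/transform pair per step.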

Let's start again by using a function to generate a time series dataframe with some missing data.

X = generateDataWithMissing()
first(X,15)

15 rows × 2 columns

Row  Date                 Value
     Dates…               Float64⍰
1    2014-01-01T00:00:00  missing
2    2014-01-01T00:15:00  missing
3    2014-01-01T00:30:00  missing
4    2014-01-01T00:45:00  missing
5    2014-01-01T01:00:00  missing
6    2014-01-01T01:15:00  missing
7    2014-01-01T01:30:00  missing
8    2014-01-01T01:45:00  0.0521332
9    2014-01-01T02:00:00  0.26864
10   2014-01-01T02:15:00  0.108871
11   2014-01-01T02:30:00  0.163666
12   2014-01-01T02:45:00  0.473017
13   2014-01-01T03:00:00  0.865412
14   2014-01-01T03:15:00  missing
15   2014-01-01T03:30:00  missing

Workflow of Pipeline

Let's use the pipeline transformer to aggregate and impute:

using Dates
using TSML
using TSML.TSMLTypes
using TSML.TSMLTransformers
using TSML: Pipeline
using TSML: DateValgator
using TSML: DateValNNer

dtvalgator = DateValgator(Dict(:dateinterval => Dates.Hour(1)))
dtvalnner = DateValNNer(Dict(:dateinterval => Dates.Hour(1)))

mypipeline = Pipeline(
  Dict( :transformers => [
            dtvalgator,
            dtvalnner
         ]
  )
)

fit!(mypipeline,X)
results = transform!(mypipeline,X)
first(results,10)

10 rows × 2 columns

Row  Date                 Value
     Dates…               Float64⍰
1    2014-01-01T00:00:00  0.108871
2    2014-01-01T01:00:00  0.108871
3    2014-01-01T02:00:00  0.108871
4    2014-01-01T03:00:00  0.473017
5    2014-01-01T04:00:00  0.361194
6    2014-01-01T05:00:00  0.582318
7    2014-01-01T06:00:00  0.918165
8    2014-01-01T07:00:00  0.614255
9    2014-01-01T08:00:00  0.690462
10   2014-01-01T09:00:00  0.92049

Using the Pipeline transformer, processing the time series data becomes straightforward. It also becomes trivial to extend TSML functionality by adding more transformers, as long as each one supports the fit! and transform! interfaces. Any new transformer can then be added to the Pipeline workflow without invasive changes to the existing code.

Extending TSML

To illustrate how simple it is to add a new transformer, the code below extends TSML with a CSVReader transformer, which is then added to the pipeline to process CSV data:

using TSML.TSMLTypes
import TSML.TSMLTypes.fit!
import TSML.TSMLTypes.transform!

using CSV
using DataFrames  # DataFrame, ncol, rename!
using Dates       # DateTime

mutable struct CSVReader <: Transformer
    model
    args
    function CSVReader(args=Dict())
        default_args = Dict(
            :filename => "",
            :dateformat => ""
        )
        new(nothing,mergedict(default_args,args))
    end
end

function fit!(csvrdr::CSVReader,x::T=[],y::Vector=[]) where {T<:Union{DataFrame,Vector,Matrix}}
    fname = csvrdr.args[:filename]
    fmt = csvrdr.args[:dateformat]
    (fname != "" && fmt != "") || error("missing filename or date format")
    # store the validated arguments on the transformer itself
    csvrdr.model = csvrdr.args
end

function transform!(csvrdr::CSVReader,x::T=[]) where {T<:Union{DataFrame,Vector,Matrix}}
    fname = csvrdr.args[:filename]
    fmt = csvrdr.args[:dateformat]
    df = CSV.read(fname)
    ncol(df) == 2 || error("dataframe should have only two columns: Date,Value")
    rename!(df,names(df)[1]=>:Date,names(df)[2]=>:Value)
    df[:Date] = DateTime.(df[:Date],fmt)
    df
end
transform! (generic function with 31 methods)

Instead of passing the table X that contains the time series, we will add an instance of CSVReader at the start of the array of transformers in the pipeline to read the CSV data. The transform! function of CSVReader converts the CSV time series table into a dataframe, which is then consumed by the next transformer in the pipeline.

fname = joinpath(dirname(pathof(TSML)),"../data/testdata.csv")
csvreader = CSVReader(Dict(:filename=>fname,:dateformat=>"d/m/y H:M"))
fit!(csvreader)
csvdata = transform!(csvreader)
first(csvdata,10)

10 rows × 2 columns

Row  Date                 Value
     Dates…               Float64
1    2014-01-01T00:06:00  10.0
2    2014-01-01T00:18:00  10.0
3    2014-01-01T00:29:00  10.0
4    2014-01-01T00:40:00  9.9
5    2014-01-01T00:51:00  9.9
6    2014-01-01T01:02:00  10.0
7    2014-01-01T01:13:00  9.8
8    2014-01-01T01:24:00  10.0
9    2014-01-01T01:35:00  9.8
10   2014-01-01T01:46:00  10.0

Let us now include the newly created CSVReader in the pipeline so that it reads the CSV data, which is then processed by aggregation and imputation.

mypipeline = Pipeline(
  Dict( :transformers => [
            csvreader,
            dtvalgator,
            dtvalnner
         ]
  )
)

fit!(mypipeline)
results = transform!(mypipeline)
first(results,10)

10 rows × 2 columns

Row  Date                 Value
     Dates…               Float64⍰
1    2014-01-01T00:00:00  10.0
2    2014-01-01T01:00:00  9.9
3    2014-01-01T02:00:00  10.0
4    2014-01-01T03:00:00  10.0
5    2014-01-01T04:00:00  10.0
6    2014-01-01T05:00:00  10.0
7    2014-01-01T06:00:00  10.0
8    2014-01-01T07:00:00  9.8
9    2014-01-01T08:00:00  9.85
10   2014-01-01T09:00:00  9.9

Notice that there is no longer any need to pass X as an argument to fit! and transform!, because the data is now passed by the CSVReader instance to the other transformers in the pipeline.
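Why the data argument can be dropped may be easier to see in a small standalone sketch. The names here (ToySource, ToyScaler, toyapply, toyrun) are hypothetical and are not part of TSML; the sketch only assumes that the data argument has an empty default and that the first step ignores it.

```julia
# Hypothetical names throughout; this mimics the idea, not TSML's code.
struct ToySource
    values::Vector{Float64}
end

struct ToyScaler
    factor::Float64
end

# The source step ignores its input and emits its own data.
toyapply(s::ToySource, x=Float64[]) = copy(s.values)

# An ordinary step works on whatever the previous step produced.
toyapply(s::ToyScaler, x=Float64[]) = x .* s.factor

# The data argument defaults to an empty vector, so the chain can be
# started without passing any input.
function toyrun(steps, x=Float64[])
    for step in steps
        x = toyapply(step, x)
    end
    return x
end

result = toyrun([ToySource([1.0, 2.0]), ToyScaler(3.0)])
```

The empty default plays the same role as the x::T=[] default in the CSVReader methods above: the first step supplies the data, and every later step receives it from its predecessor.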