Saturday, 25 October 2008

Brainstorming a cleaner ETL API for FDO Toolbox

Something I've been brainstorming is a cleaner API for doing ETL (Extract, Transform, Load) operations in FDO Toolbox.

Currently, you have a bunch of classes that each perform a pre-defined transformation task (e.g. Bulk Copy, Join with Database, etc.). There is very little room for customisation, and the current API has no real way to "chain" tasks together.

Also, every new type of transformation requires writing yet another transformation class.

The proposed solution

The revised ETL API is heavily influenced by the pipes-and-filters pattern. A bulk copy task, for example, would now be done like this:


//Setup
IConnection src = ...;
IConnection dest = ...;

//daKlass is the feature class being copied (defined elsewhere)
ISelect select = src.CreateCommand(CommandType.CommandType_Select) as ISelect;
select.SetFeatureClass(daKlass);
IFeatureReader reader = select.Execute();

//Create the ETL pipeline

IFdoInput input = new FdoFeatureInput(reader);
IFdoOutput output = new FdoFeatureOutput(dest, daKlass);

FdoTransformationPipeline pipeline = new FdoTransformationPipeline();

pipeline.RegisterOperation(input); //Input source
pipeline.RegisterOperation(output); //Output source

pipeline.Execute();


This connects an input source to an output source: every feature read from the input source is written to the output source.
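One way to picture what Execute() does is as a chain: each registered operation wraps the feature stream produced by the operation before it, and nothing moves until the final stream is pulled. The C# sketch below is not FDO Toolbox code. IFeatureOperation, ListInput, ListOutput and Pipeline are hypothetical names, and a plain Dictionary<string, object> stands in for a feature so the example runs without the FDO assemblies. It assumes lazy iterators (yield return), so features stream through one at a time instead of being loaded into memory first.

```csharp
using System.Collections.Generic;
using System.Linq;
using Feature = System.Collections.Generic.Dictionary<string, object>;

// Hypothetical stand-in for IFdoOperation; a Feature here is a plain
// property-name -> value dictionary rather than an FdoFeature.
public interface IFeatureOperation
{
    IEnumerable<Feature> Process(IEnumerable<Feature> features);
}

// Hypothetical input: ignores its (empty) upstream and yields stored features.
public class ListInput : IFeatureOperation
{
    private readonly List<Feature> _source;

    public ListInput(List<Feature> source) { _source = source; }

    public IEnumerable<Feature> Process(IEnumerable<Feature> features)
    {
        foreach (var feature in _source)
            yield return feature;
    }
}

// Hypothetical output: "inserts" each feature into an in-memory list.
public class ListOutput : IFeatureOperation
{
    public List<Feature> Written { get; } = new List<Feature>();

    public IEnumerable<Feature> Process(IEnumerable<Feature> features)
    {
        foreach (var feature in features)
        {
            Written.Add(feature);
            yield return feature;
        }
    }
}

// Hypothetical pipeline: chains operations in registration order.
public class Pipeline
{
    private readonly List<IFeatureOperation> _operations = new List<IFeatureOperation>();

    public void RegisterOperation(IFeatureOperation operation)
    {
        _operations.Add(operation);
    }

    public void Execute()
    {
        // Each operation wraps the lazy stream produced by the one before it.
        IEnumerable<Feature> stream = Enumerable.Empty<Feature>();
        foreach (var operation in _operations)
            stream = operation.Process(stream);

        // Nothing actually runs until the final stream is enumerated.
        foreach (var feature in stream) { }
    }
}
```

Because the input is just the first operation in the chain, it can simply ignore the empty stream it is handed.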

Pluggability

The revised API gives you much finer control over the ETL process by letting you "plug" additional transformation steps into the pipeline.

For example, copying data from one source to another sometimes requires renaming properties. Modifying the above sample:


IFdoInput input = new FdoFeatureInput(reader);
IFdoOutput output = new FdoFeatureOutput(dest, daKlass);
IFdoRenameFilter ren = new FdoRenameFilter();
ren.AddRenameRule("NAME", "OWNER_NAME");

FdoTransformationPipeline pipeline = new FdoTransformationPipeline();

pipeline.RegisterOperation(input); //Input source
pipeline.RegisterOperation(ren); //Rename filter
pipeline.RegisterOperation(output); //Output source

pipeline.Execute();


This renames the "NAME" property of every feature passing through to "OWNER_NAME".
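A rename filter fits naturally into this model. Below is a minimal sketch, assuming a hypothetical IFeatureOperation interface with the same shape as IFdoOperation and a dictionary standing in for FdoFeature. RenameFilter is an illustrative name, and this version mutates each feature in place rather than cloning it.

```csharp
using System.Collections.Generic;
using Feature = System.Collections.Generic.Dictionary<string, object>;

// Hypothetical stand-in for IFdoOperation.
public interface IFeatureOperation
{
    IEnumerable<Feature> Process(IEnumerable<Feature> features);
}

// Sketch of a rename filter: each rule maps an old property name to a new one.
public class RenameFilter : IFeatureOperation
{
    private readonly Dictionary<string, string> _rules = new Dictionary<string, string>();

    public void AddRenameRule(string propertyName, string newPropertyName)
    {
        _rules[propertyName] = newPropertyName;
    }

    public IEnumerable<Feature> Process(IEnumerable<Feature> features)
    {
        foreach (var feature in features)
        {
            foreach (var rule in _rules)
            {
                // Only rename properties the feature actually has.
                if (feature.TryGetValue(rule.Key, out var value))
                {
                    feature.Remove(rule.Key);
                    feature[rule.Value] = value;
                }
            }
            yield return feature;
        }
    }
}
```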

Sometimes the property values themselves need to be substituted. Modifying the same sample as above:


IFdoInput input = new FdoFeatureInput(reader);
IFdoOutput output = new FdoFeatureOutput(dest, daKlass);
IFdoSubstitutionFilter sub = new FdoSubstitutionFilter();
sub.AddSubstitution("NAME", "SMITH", "BLOGGS");

FdoTransformationPipeline pipeline = new FdoTransformationPipeline();

pipeline.RegisterOperation(input); //Input source
pipeline.RegisterOperation(sub); //Substitution filter
pipeline.RegisterOperation(output); //Output source

pipeline.Execute();


This copies from input to output while replacing every instance of "SMITH" with "BLOGGS" in any "NAME" property the substitution filter encounters.
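A substitution filter has the same shape. This sketch assumes exact-value matching (a NAME of "SMITH" becomes "BLOGGS", but "SMITHERS" is left alone); substring replacement would be an equally valid reading of the idea. SubstitutionFilter and IFeatureOperation are hypothetical stand-ins, not the FDO Toolbox classes, and a dictionary again stands in for FdoFeature.

```csharp
using System.Collections.Generic;
using Feature = System.Collections.Generic.Dictionary<string, object>;

// Hypothetical stand-in for IFdoOperation.
public interface IFeatureOperation
{
    IEnumerable<Feature> Process(IEnumerable<Feature> features);
}

// Sketch of a substitution filter using exact-value matching.
public class SubstitutionFilter : IFeatureOperation
{
    private readonly List<(string Property, object Find, object Replace)> _rules =
        new List<(string Property, object Find, object Replace)>();

    public void AddSubstitution(string propertyName, object oldValue, object newValue)
    {
        _rules.Add((propertyName, oldValue, newValue));
    }

    public IEnumerable<Feature> Process(IEnumerable<Feature> features)
    {
        foreach (var feature in features)
        {
            foreach (var rule in _rules)
            {
                // Replace the value only when it equals the rule's target value.
                if (feature.TryGetValue(rule.Property, out var value) && object.Equals(value, rule.Find))
                    feature[rule.Property] = rule.Replace;
            }
            yield return feature;
        }
    }
}
```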

Splitting/Forking Pipelines

The IFdoFork interface allows you to split the pipeline into multiple processing pipelines. Say we wanted to segregate all features where NAME = 'SMITH'; the code snippet above would then become:


IFdoInput input = new FdoFeatureInput(reader);
IFdoOutput output = new FdoFeatureOutput(dest, daKlass);

//Create fork
IFdoFork fork = new FdoFork();
//Create hell
IFdoOperation hell = new FdoNullOperation();

//Set the default pipeline
fork.SetDefaultPipeline("Default");
//Add the hell pipeline
fork.RegisterPipeline("Hell", hell);
//Send all features where NAME = 'SMITH' to hell. Send others through to the default pipeline
fork.RegisterAttributeCondition("NAME = 'SMITH'", "Hell");

FdoTransformationPipeline pipeline = new FdoTransformationPipeline();

pipeline.RegisterOperation(input); //Input source
pipeline.RegisterOperation(fork); //Fork, all smiths go to hell, others go to output source
pipeline.RegisterOperation(output); //Output source

pipeline.Execute();


This reads all features from the input source, sends every feature where NAME = 'SMITH' to the FdoNullOperation (like sending data to /dev/null on *nix), and sends the rest on their merry way to the output source.
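A fork can be sketched as an operation that routes each feature by the first condition it matches. To keep the example self-contained, conditions here are C# predicates rather than FDO filter strings like "NAME = 'SMITH'", and the "default pipeline" is assumed to be simply whatever comes next in the main pipeline. Fork, NullOperation, RegisterCondition and IFeatureOperation are hypothetical names, with a dictionary standing in for FdoFeature.

```csharp
using System;
using System.Collections.Generic;
using Feature = System.Collections.Generic.Dictionary<string, object>;

// Hypothetical stand-in for IFdoOperation.
public interface IFeatureOperation
{
    IEnumerable<Feature> Process(IEnumerable<Feature> features);
}

// Discards everything it receives, like piping to /dev/null.
public class NullOperation : IFeatureOperation
{
    public int Discarded { get; private set; }

    public IEnumerable<Feature> Process(IEnumerable<Feature> features)
    {
        foreach (var feature in features)
            Discarded++;
        yield break;
    }
}

// Hypothetical fork: a feature matching a condition is diverted to the named
// branch; anything unmatched continues down the main (default) pipeline.
public class Fork : IFeatureOperation
{
    private readonly Dictionary<string, IFeatureOperation> _branches =
        new Dictionary<string, IFeatureOperation>();
    private readonly List<(Func<Feature, bool> Condition, string Branch)> _conditions =
        new List<(Func<Feature, bool> Condition, string Branch)>();

    public void RegisterPipeline(string name, IFeatureOperation branch)
    {
        _branches[name] = branch;
    }

    // A predicate stands in for an FDO filter string such as "NAME = 'SMITH'".
    public void RegisterCondition(Func<Feature, bool> condition, string branchName)
    {
        _conditions.Add((condition, branchName));
    }

    public IEnumerable<Feature> Process(IEnumerable<Feature> features)
    {
        foreach (var feature in features)
        {
            // First matching condition wins.
            string target = null;
            foreach (var rule in _conditions)
            {
                if (rule.Condition(feature)) { target = rule.Branch; break; }
            }

            if (target == null)
            {
                yield return feature; // continue down the default pipeline
            }
            else
            {
                // Push the diverted feature through its branch and drain the result.
                foreach (var ignored in _branches[target].Process(new[] { feature })) { }
            }
        }
    }
}
```

Each diverted feature is pushed through its branch immediately, so nothing is buffered; the trade-off is one Process() call per diverted feature.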

Extensibility

This is where the revised API is much better. All pipeline operations implement a simple interface called IFdoOperation:


/// <summary>
/// FDO feature operation interface. An operation does a processing task on each
/// feature that is passed to it.
/// </summary>
public interface IFdoOperation
{
    /// <summary>
    /// Processes the set of features
    /// </summary>
    /// <param name="features">The set of features to process</param>
    /// <returns>The processed set of features</returns>
    IEnumerable<FdoFeature> Process(IEnumerable<FdoFeature> features);
}


Thus, to create your own transformation operation, you simply implement this interface and provide your own implementation of Process().
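As an illustration of a custom operation, here is a sketch of a filter that drops features whose given property is missing or null. It assumes a hypothetical IFeatureOperation interface with the same shape as IFdoOperation, with a dictionary standing in for FdoFeature; RequirePropertyFilter is an illustrative name. Because Process() takes and returns an IEnumerable, an operation is free to emit fewer (or more) features than it receives.

```csharp
using System.Collections.Generic;
using Feature = System.Collections.Generic.Dictionary<string, object>;

// Hypothetical stand-in for IFdoOperation.
public interface IFeatureOperation
{
    IEnumerable<Feature> Process(IEnumerable<Feature> features);
}

// Hypothetical custom operation: drops any feature whose given property is
// missing or null, and passes everything else through unchanged.
public class RequirePropertyFilter : IFeatureOperation
{
    private readonly string _propertyName;

    public RequirePropertyFilter(string propertyName)
    {
        _propertyName = propertyName;
    }

    public IEnumerable<Feature> Process(IEnumerable<Feature> features)
    {
        foreach (var feature in features)
        {
            // yield return keeps this lazy: features flow through one at a time.
            if (feature.TryGetValue(_propertyName, out var value) && value != null)
                yield return feature;
        }
    }
}
```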

You might be wondering what on earth an FdoFeature is. FdoFeature is the fundamental unit in the ETL pipeline. It encapsulates what is read from the input source's FDO IFeatureReader and what gets inserted into the output source.

Finally, it is the thing that travels through each operation in the pipeline, getting manipulated along the way. The interface of FdoFeature currently looks like this (conceptually):


public class FdoFeature : ICloneable
{
    public FdoFeature();
    public void Clear();
    public OSGeo.FDO.Expression.ValueExpression this[string propertyName] { get; set; }
    public void Rename(string propertyName, string newPropertyName);
    public void Remove(string propertyName);
    public object Clone();
    public OSGeo.FDO.Commands.PropertyValueCollection ToValueCollection();
}


In effect, it is a dictionary of ValueExpressions keyed by property name.
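To make the dictionary idea concrete, here is a sketch of a feature type along the lines of the FdoFeature outline above. It is a stand-in under several assumptions: values are plain objects rather than ValueExpressions, ToValueCollection() is left out because it needs the FDO assemblies, and Contains() and Count are hypothetical helpers added for the example.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for FdoFeature: a property-name -> value map.
public class Feature : ICloneable
{
    private readonly Dictionary<string, object> _values = new Dictionary<string, object>();

    public object this[string propertyName]
    {
        get { return _values[propertyName]; }
        set { _values[propertyName] = value; }
    }

    public int Count { get { return _values.Count; } }

    public bool Contains(string propertyName)
    {
        return _values.ContainsKey(propertyName);
    }

    public void Clear()
    {
        _values.Clear();
    }

    // Throws KeyNotFoundException if propertyName is missing;
    // overwrites newPropertyName if it already exists.
    public void Rename(string propertyName, string newPropertyName)
    {
        object value = _values[propertyName];
        _values.Remove(propertyName);
        _values[newPropertyName] = value;
    }

    public void Remove(string propertyName)
    {
        _values.Remove(propertyName);
    }

    // Shallow copy: the property map is duplicated, the values are shared.
    public object Clone()
    {
        var copy = new Feature();
        foreach (var pair in _values)
            copy._values[pair.Key] = pair.Value;
        return copy;
    }
}
```

The clone matters once pipelines fork: a branch that modifies a cloned feature cannot affect the copy travelling down the other branch.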

Conclusion

As you can see, the revised ETL API is much cleaner and more modular than the current one. There are still a few things to iron out conceptually (e.g. how do you merge pipelines? Is that even possible?)

There is also the minor matter that these code samples currently live in my head and not in a Visual Studio project :-) So watch this space.
