Internals

This page documents internal and less commonly used API. Some of it will be useful for more advanced use-cases, like running graphs in production systems. Other parts shouldn't need to be interacted with regularly, but can be useful to know about.

Identity map

One of the key features of TimeDag is avoiding duplicating work.

This is primarily achieved by ensuring that we never construct the 'same' node twice. By 'same', here we mean two nodes that we can prove will always give the same output.

One easy-to-handle case is that of a node that has identical parents & op to another. To avoid constructing such duplicates, TimeDag maintains a global identity map, of type TimeDag.IdentityMap.

Currently the only implementation of the identity map is TimeDag.WeakIdentityMap. This contains weak references to nodes, to ensure that we know about all nodes that currently exist, but don't unnecessarily prevent nodes from being garbage collected when we no longer want them.

In practice, all nodes should be constructed indirectly using TimeDag.obtain_node. This will query the global_identity_map(), and only construct a new node if an equivalent one does not already exist.

TimeDag.obtain_node! - Function
obtain_node!(id_map::IdentityMap, parents::NTuple{N,Node}, op::NodeOp) -> Node

If a node with parents and op doesn't exist inside id_map, create and insert it.

Return either the new or existing node.

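The lookup-or-create behaviour described above can be sketched in plain Julia. This is a toy illustration only: `ToyNode`, `ToyIdentityMap` and `toy_obtain_node!` are hypothetical names, ops are reduced to symbols, and the dictionary holds strong references (unlike TimeDag.WeakIdentityMap).

```julia
# Toy stand-ins for illustration only; these are not TimeDag's types.
mutable struct ToyNode
    parents::Tuple
    op::Symbol
end

# Map from (parents, op) to the single node built from them.
const ToyIdentityMap = Dict{Tuple{Tuple,Symbol},ToyNode}

# Return the existing node for (parents, op), or create and insert a new one.
function toy_obtain_node!(id_map::ToyIdentityMap, parents::Tuple, op::Symbol)
    return get!(() -> ToyNode(parents, op), id_map, (parents, op))
end

id_map = ToyIdentityMap()
root = toy_obtain_node!(id_map, (), :source)
a = toy_obtain_node!(id_map, (root,), :negate)
b = toy_obtain_node!(id_map, (root,), :negate)

println(a === b)         # true: the second call returned the existing node
println(length(id_map))  # 2: one entry for `root`, one for the negate node
```

Because `a` and `b` are the same object, any work attached to that node only needs to be done once.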
TimeDag.WeakNode - Type
WeakNode(parents, op)

Represent a node-like object that doesn't hold strong references to its parents.

This exists purely so that hash and == allow multiple instances of WeakNode to compare equal if they have the same parents and op.

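To make the role of hash and == concrete, here is a minimal sketch of a key type that refers to its parents only through `WeakRef`, yet compares equal whenever the parents and op match. `ToyParent`, `ToyWeakKey` and `toy_weak_key` are hypothetical names; this is not TimeDag's implementation, and it ignores what happens once a parent has been garbage collected.

```julia
# Toy types for illustration only; not TimeDag's WeakNode.
mutable struct ToyParent
    name::String
end

struct ToyWeakKey
    parents::Vector{WeakRef}  # weak: does not keep the parents alive
    op::Symbol
end

toy_weak_key(parents::Tuple, op::Symbol) =
    ToyWeakKey(WeakRef[WeakRef(p) for p in parents], op)

# Two keys are equal if they have the same op and identical parent objects.
function Base.:(==)(a::ToyWeakKey, b::ToyWeakKey)
    a.op == b.op || return false
    length(a.parents) == length(b.parents) || return false
    return all(x.value === y.value for (x, y) in zip(a.parents, b.parents))
end

# The hash must agree with ==, so it is built from the same ingredients.
function Base.hash(k::ToyWeakKey, h::UInt)
    h = hash(k.op, h)
    for p in k.parents
        h = hash(objectid(p.value), h)
    end
    return h
end

x = ToyParent("x")
k1 = toy_weak_key((x,), :negate)
k2 = toy_weak_key((x,), :negate)

println(k1 == k2)                   # true
println(hash(k1) == hash(k2))       # true
println(haskey(Dict(k1 => 1), k2))  # true: k2 finds the entry stored under k1
```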
TimeDag.WeakIdentityMap - Type
WeakIdentityMap

Represent a collection of nodes which doesn't hold strong references to any nodes.

This is useful, as it allows the existence of this cache to be somewhat transparent to the user, and they only have to care about holding on to references for nodes that they care about.

This structure contains nodes, but also weak nodes. The latter allow us to determine whether we ought to create a given node.


Advanced evaluation

This section goes into more detail about how evaluation works. We explain evaluation state, and discuss how to use the API in a Live system.

Evaluation state

Recall from the discussion in Concepts that we have the concept of Explicit state. For a particular node, this state will be a subtype of TimeDag.NodeEvaluationState. For consistency, all nodes have an evaluation state — for nodes that are fundamentally stateless, we use TimeDag.EMPTY_NODE_STATE.

A fresh evaluation state is created by a call to TimeDag.create_evaluation_state. When creating a new TimeDag.NodeOp, a new method of this function should be defined to return the appropriate state. The state is subsequently mutated in calls to TimeDag.run_node!, to reflect any changes induced over the time interval.

TimeDag.NodeEvaluationState - Type
abstract type NodeEvaluationState end

Represents any and all state that a node must retain between evaluating batches.

Instances of subtypes of NodeEvaluationState are given to run_node!.

TimeDag.create_evaluation_state - Function
create_evaluation_state(node::Node) -> NodeEvaluationState
create_evaluation_state(parents, node::NodeOp) -> NodeEvaluationState

Create an empty evaluation state for the given node, when starting evaluation at the specified time.

TimeDag.run_node! - Function
run_node!(
    op::NodeOp{T},
    state::NodeEvaluationState,
    time_start::DateTime,
    time_end::DateTime,
    input_blocks::Block...
) -> Block{T}

Evaluate the given node from time_start until time_end, with the initial state. Zero or more blocks will be passed as an input; these correspond to the parents of a node, and are passed in the same order as that returned by parents(node).

We return a new Block of output knots from this node.

Warning

The implementer of run_node! must ensure:

  • No future peeking occurs: i.e. that no output knot is dependent on input knots that occur subsequently.
  • Correct time range: all output timestamps must be in the interval [time_start, time_end).
  • Consistency: Calling run_node! over a single interval should give the same result as calling it multiple times over a decomposition of that same interval. This ensures that the value returned by evaluate is invariant to the batch_interval kwarg provided.
  • Determinism: run_node! should always be fully deterministic. If a pseudo-random number generator is required, it should be held on the evaluation state.
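The consistency requirement can be illustrated without TimeDag at all. The sketch below uses a hypothetical running-sum "node" (`ToySumState` and `toy_run!` are invented for this example) and checks that evaluating one batch gives the same output as evaluating two consecutive batches that share a state.

```julia
# Toy stand-in for a stateful node; not TimeDag's API.
mutable struct ToySumState
    total::Float64
end

# Process one batch of values: emit the running sum, carrying `total` across batches.
function toy_run!(state::ToySumState, xs::Vector{Float64})
    out = similar(xs)
    for (i, v) in enumerate(xs)
        state.total += v
        out[i] = state.total
    end
    return out
end

xs = [1.0, 2.0, 3.0, 4.0, 5.0]

# One call over the whole interval.
single = toy_run!(ToySumState(0.0), xs)

# Two calls over a decomposition of the same interval, sharing one state.
state = ToySumState(0.0)
batched = vcat(toy_run!(state, xs[1:2]), toy_run!(state, xs[3:5]))

println(single == batched)  # true
```

The property holds here because everything the second batch needs from the first is stored on the state, which is the same discipline a run_node! implementation has to follow.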

When an evaluation is performed, we need to save the state of all the nodes in the graph. We package this into a TimeDag.EvaluationState instance. This object also retains the blocks from the nodes in whose output we're interested.

TimeDag.EvaluationState - Type
EvaluationState

All state necessary for the evaluation of some nodes.

This should be created with start_at.

Fields

  • ordered_node_to_children::OrderedDict{Node,Set{Node}}: a map from every node which we need to run, to its children. The ordering of the elements is such that, if evaluated in this order, all dependencies will be evaluated before they are required.
  • node_to_state::IdDict{Node,NodeEvaluationState}: maintain the state for every node being evaluated.
  • current_time::DateTime: the time to which this state corresponds.
  • evaluated_node_to_blocks::IdDict{Node,Vector{Block}}: the output blocks that we care about.

Deconstructing evaluation

Whilst evaluate is the primary API for TimeDag, it is in fact a thin wrapper around a lower level API. Roughly, the steps involved are:

  1. Call TimeDag.start_at to create a new TimeDag.EvaluationState for a collection of nodes. This will work out all the ancestors of the given nodes that also need to be evaluated.
  2. Perform one or more calls to TimeDag.evaluate_until!, depending on the batch interval. Each call will update the evaluation state. Internally, this calls TimeDag.run_node! for every ancestor node.
  3. Once the end of the evaluation interval has been reached, we extract the blocks for the nodes of interest from the evaluation state. These are concatenated and returned to the user.
TimeDag.start_at - Function
start_at(nodes, time_start::DateTime) -> EvaluationState

Create a sufficient EvaluationState for the evaluation of nodes.

Internally, this will determine the subgraph that needs evaluating, i.e. all the ancestors of nodes, and create a NodeEvaluationState for each of these.

TimeDag.evaluate_until! - Function
evaluate_until!(state::EvaluationState, time_end::DateTime)

Update the evaluation state by performing the evaluation for each node.

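The three steps listed earlier in this section can be sketched as a small loop. This is a rough illustration built only from the signatures and fields documented on this page; `toy_evaluate_in_batches` is a hypothetical helper, not part of TimeDag, and the real evaluate may differ in how it handles batching and concatenation.

```julia
using Dates
using TimeDag

# Hypothetical helper (not part of TimeDag): evaluate `node` over
# [time_start, time_end) in batches of length `batch_interval`.
function toy_evaluate_in_batches(
    node, time_start::DateTime, time_end::DateTime, batch_interval::Period
)
    # Step 1: create state for `node` and all of its ancestors.
    state = TimeDag.start_at([node], time_start)

    # Step 2: advance the state one batch at a time.
    t = time_start
    while t < time_end
        t = min(t + batch_interval, time_end)
        TimeDag.evaluate_until!(state, t)
    end

    # Step 3: pull out the retained blocks for the node of interest.
    # `evaluate` would additionally concatenate these before returning.
    return state.evaluated_node_to_blocks[node]
end

blocks = toy_evaluate_in_batches(
    pulse(Second(1)), DateTime(2020), DateTime(2020, 1, 1, 0, 1), Second(20)
)
```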

Live system

Consider the case where some history of data is available, say in a database, and new data is added continually, e.g. as it is recorded from a sensor. Suppose we have built a TimeDag graph representing the computation we wish to perform on this data. We can perform the following steps:

  1. Initialise the state with TimeDag.start_at.
  2. Initialise the model with one (or more) calls to TimeDag.evaluate_until!. This is used to pull through historical data, e.g. to initialise stateful nodes like moving averages.
  3. In real time, poll for new data with repeated calls to TimeDag.evaluate_until!.

The performance of this setup naturally depends upon the complexity of the model being evaluated. However, if models are appropriately designed to have efficient online updates, then the underlying overhead of TimeDag is small enough for this to be usable with latencies down to the order of milliseconds.

# Assumed setup: load Dates (for Second and DateTime) and TimeDag.
using Dates, TimeDag

# Some arbitrary data source - here just use random numbers.
x = rand(pulse(Second(1)))

# Compute a rolling mean and standard deviation.
# Corresponds to 24-hour rolling windows, given one data point per second.
n1, n2 = mean(x, 86400), std(x, 86400)

# Initialise state over a long history.
state = TimeDag.start_at([n1, n2], DateTime(2019))
state = TimeDag.evaluate_until!(state, DateTime(2020))

# Simulate an incremental update over a few hours.
@time state = TimeDag.evaluate_until!(state, DateTime(2020, 1, 1, 3))
  0.007820 seconds (75.24 k allocations: 2.387 MiB)

Note that this approach is unlikely to be suitable for lower latency applications (e.g. microseconds). For that case, one may benefit from a "push mode" evaluation, where new data are pushed onto the graph, and only affected nodes are re-evaluated. Such a feature isn't currently planned.

Scheduling

TimeDag currently runs all nodes in a single thread; however, this is subject to change in the future.

Alignment implementation

If we want to define a new op that follows alignment semantics, it should derive from one of the following types.

TimeDag.BinaryNodeOp - Type
BinaryNodeOp{T,A<:Alignment} <: NodeOp{T}

An abstract type representing a node op with two parents, and using alignment A.


Rather than implementing TimeDag.run_node! directly, one implements some of the following functions. The exact alignment logic is then encapsulated, and doesn't need to be dealt with directly.

TimeDag.operator! - Function
operator!(op::UnaryNodeOp{T}, (state,), (time,) x) -> T / Maybe{T}
operator!(op::BinaryNodeOp{T}, (state,), (time,) x, y) -> T / Maybe{T}
operator!(op::NaryNodeOp{N,T}, (state,), (time,) x, y, z...) -> T / Maybe{T}

Perform the operation for this node.

When defining a method of this for a new op, follow these rules:

For stateful operations, this operator should mutate state as required.

The return value out should be of type T iff TimeDag.always_ticks is true, otherwise it should be of type TimeDag.Maybe{T}.

If out is a Maybe{T} and !valid(out), this indicates that we do not wish to emit a knot at this time, and it will be skipped. Otherwise, value(out) will be used as the output value.

TimeDag.always_ticks - Function
always_ticks(node) -> Bool
always_ticks(op) -> Bool

Returns true iff the return value from operator! can be assumed to always be valid.

If true, operator!(::Node{T}, ...) should return a T. If false, operator!(::Node{T}, ...) should return a Maybe{T}.

Note that, for sensible performance characteristics, this should be knowable from typeof(op).

TimeDag.stateless_operator - Function
stateless_operator(node) -> Bool
stateless_operator(op) -> Bool

Returns true iff operator!(op, ...) would never look at or modify the evaluation state.

If this returns true, create_operator_evaluation_state will not be used.

Note that if an op has stateless(op) returning true, then it necessarily should also return true here. The default implementation is to return stateless(op), meaning that if one is creating a node that is fully stateless, one need only define stateless.

For optimal performance, this should be knowable from the type of op alone.

TimeDag.time_agnostic - Function
time_agnostic(node) -> Bool
time_agnostic(op) -> Bool

Returns true iff op does not care about the time of the input knot(s).

For optimal performance, this should be knowable from the type of op alone.

TimeDag.value_agnostic - Function
value_agnostic(node) -> Bool
value_agnostic(op) -> Bool

Returns true iff op does not care about the value(s) of the input knot(s).

For optimal performance, this should be knowable from the type of op alone.

TimeDag.initial_left - Function
initial_left(op::BinaryNodeOp) -> L

Specify the initial value to use for the first parent of the given op.

Needs to be defined if has_initial_values returns true, and alignment is UNION. For other alignments it is not required.

TimeDag.initial_right - Function
initial_right(op::BinaryNodeOp) -> R

Specify the initial value to use for the second parent of the given op.

Needs to be defined if has_initial_values(op) returns true, and alignment is UNION or LEFT. For INTERSECT alignment it is not required.

TimeDag.create_operator_evaluation_state - Function
create_operator_evaluation_state(parents, op::NodeOp) -> NodeEvaluationState

Create an empty evaluation state for the given node, when starting evaluation at the specified time.

Note that this is the state that will be passed to operator!. The overall node may additionally wrap this state with further state, if this is necessary for e.g. alignment.

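Putting the functions above together, a stateless unary op might look roughly like the following. This is a sketch under assumptions, not a verified recipe: `ToyNegate` and `toy_negate` are hypothetical names, `TimeDag.UnaryNodeOp{T}` is taken from the operator! signature above, and the two-argument form of operator! assumes that both state and time are omitted when the op is stateless and time-agnostic.

```julia
using Dates
using TimeDag

# Hypothetical op for illustration: negate each Float64 input value.
struct ToyNegate <: TimeDag.UnaryNodeOp{Float64} end

# Emits a knot for every input knot, so operator! returns a plain Float64.
TimeDag.always_ticks(::ToyNegate) = true
# operator! never looks at or modifies evaluation state.
TimeDag.stateless_operator(::ToyNegate) = true
# operator! does not need the time of the input knot.
TimeDag.time_agnostic(::ToyNegate) = true

# Assumed signature: with no state and no time, only the input value is passed.
TimeDag.operator!(::ToyNegate, x) = -x

# Construct through the identity map, so that equivalent nodes are shared.
function toy_negate(x)
    return TimeDag.obtain_node!(TimeDag.global_identity_map(), (x,), ToyNegate())
end

x = rand(pulse(Second(1)))
y = toy_negate(x)
```

For an op this simple, the tip below about TimeDag.apply, wrap, or wrapb is likely the better route; writing the op by hand mainly pays off once state or custom ticking behaviour is needed.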

For simple cases, the following node ops can be useful.

Tip

Rather than using the structures below directly, you probably want to use TimeDag.apply, wrap, or wrapb.

TimeDag.SimpleUnary - Type
SimpleUnary{f,TimeAgnostic,T}

Represents a stateless unary operator that will always emit a value.

The value of the TimeAgnostic type parameter is coupled to time_agnostic.

TimeDag.SimpleBinary - Type
SimpleBinary{f,TimeAgnostic,T,A}

Represents a stateless binary operator that will always emit a value.

The value of the TimeAgnostic type parameter is coupled to time_agnostic.

TimeDag.SimpleNary - Type
SimpleNary{f,TimeAgnostic,N,T,A}

Represents a stateless Nary operator that will always emit a value.

The value of the TimeAgnostic type parameter is coupled to time_agnostic.


Maybe

TimeDag.Maybe - Type
Maybe{T}()
Maybe(value::T)

A structure which can hold a value of type T, or represent the absence of a value.

The API is optimised for speed over memory usage, by allowing a function that may otherwise return Union{T, Nothing} to instead always return Maybe{T}, and hence be type-stable.

TimeDag.value - Function
value(x::Maybe{T}) -> T

Returns the value stored in x, or throws an ArgumentError if !valid(x).

Note that, in a tight loop, it is preferable to use a combination of calls to valid and unsafe_value, as it will generate more optimal code.

TimeDag.unsafe_value - Function
unsafe_value(x::Maybe{T}) -> T

Returns the value stored in x.

It is "unsafe" when !valid(x), in that the return value of this function is undefined. If T is a reference type, calling this function will result in an UndefRefError being thrown.

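The type-stability argument can be seen with a toy re-implementation. `ToyMaybe`, `toy_valid`, `toy_value` and `toy_safe_sqrt` are hypothetical names used only for this sketch; TimeDag.Maybe may differ in its details.

```julia
# Toy re-implementation for illustration; TimeDag.Maybe may differ in detail.
struct ToyMaybe{T}
    valid::Bool
    value::T
    # Absent value: `value` is deliberately left uninitialised.
    ToyMaybe{T}() where {T} = new{T}(false)
    # Present value.
    ToyMaybe{T}(v) where {T} = new{T}(true, v)
end
ToyMaybe(v::T) where {T} = ToyMaybe{T}(v)

toy_valid(m::ToyMaybe) = m.valid

# Checked access: throw if there is no value to return.
function toy_value(m::ToyMaybe)
    return m.valid ? m.value : throw(ArgumentError("no value present"))
end

# Always returns a ToyMaybe{Float64}, whichever branch is taken,
# whereas returning `nothing` would give Union{Float64,Nothing}.
toy_safe_sqrt(x::Float64) = x >= 0 ? ToyMaybe(sqrt(x)) : ToyMaybe{Float64}()

m = toy_safe_sqrt(4.0)
println(toy_valid(m), " ", toy_value(m))  # true 2.0
println(toy_valid(toy_safe_sqrt(-1.0)))   # false
```

The cost is that every instance carries storage for a `T` even when no value is present, which is the speed-over-memory trade-off mentioned above.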

Other

TimeDag.output_type - Function
output_type(f, arg_types...)

Return the output type of the specified function. Tries to be fast where possible.

Warning

This uses Base.promote_op, which is noted to be fragile. The problem is that whilst one might hope that typeof(f(map(oneunit, arg_types)...)) could be used, in practice there are a lot of types which do not define oneunit.

Ultimately this reflects a tension in TimeDag's design: it needs to know the output type of a node before any concrete input values are available.

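For reference, this is how `Base.promote_op` behaves on its own: it asks the compiler what a function would return for the given argument types, without any values. The helper `toy_ratio` is a hypothetical function defined just for this example, and the snippet shows only the underlying Base call, not how output_type wraps it.

```julia
# Hypothetical user function, defined only for this example.
toy_ratio(x, y) = x / y

# Inferred return types, obtained from argument types alone.
println(Base.promote_op(+, Int, Float64))      # Float64
println(Base.promote_op(toy_ratio, Int, Int))  # Float64
```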
TimeDag.duplicate - Function
duplicate(x)

Return an object that is equal to x, but fully independent of it.

Note that for any parts of x that TimeDag considers to be immutable (e.g. Blocks), this can return the identical object.

Conceptually this is otherwise very similar to deepcopy(x).
