Internals
This page documents internal and less commonly used APIs. Some of it will be useful for more advanced use cases, like running graphs in production systems. Other parts shouldn't need to be interacted with regularly, but can be useful to know about.
Identity map
One of the key features of TimeDag is that it avoids duplicating work.
This is primarily achieved by ensuring that we never construct the 'same' node twice. By 'same', we mean two nodes that we can prove will always give the same output.
One easy-to-handle case is a node that has the same parents and op as another. To avoid constructing such duplicates, TimeDag maintains a global identity map, of type TimeDag.IdentityMap.
Currently the only implementation of the identity map is TimeDag.WeakIdentityMap. This holds weak references to nodes, so that we know about all nodes that currently exist without preventing nodes from being garbage collected once they are no longer needed.
In practice, all nodes should be constructed indirectly using TimeDag.obtain_node. This will query the global_identity_map(), and only construct a new node if an equivalent one does not already exist.
TimeDag.IdentityMap (Type)
IdentityMap
An abstract identity map.
Any implementation of this type needs to implement obtain_node!.
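The identity-map contract can be illustrated with a toy version that holds strong references. Everything here (ToyNode, ToyIdentityMap, DictIdentityMap, toy_obtain_node!, toy_global_identity_map) is a hypothetical stand-in, not TimeDag's implementation. Nodes are mutable so that, as with real nodes, identity (===) rather than content tells them apart, and the lookup key is simply the (parents, op) pair.

```julia
# Hypothetical stand-ins for Node, IdentityMap and obtain_node!; not TimeDag's code.
mutable struct ToyNode
    parents::Tuple   # parent ToyNodes, compared by identity
    op::Symbol       # stands in for a NodeOp
end

abstract type ToyIdentityMap end

# A concrete map that holds *strong* references (unlike WeakIdentityMap).
struct DictIdentityMap <: ToyIdentityMap
    nodes::Dict{Any,ToyNode}
end
DictIdentityMap() = DictIdentityMap(Dict{Any,ToyNode}())

# Every ToyIdentityMap subtype must implement this method.
function toy_obtain_node!(id_map::DictIdentityMap, parents::Tuple, op::Symbol)
    # Tuples of mutable objects hash and compare by the identity of their elements,
    # so an existing node is found only if its parents are the very same objects.
    return get!(() -> ToyNode(parents, op), id_map.nodes, (parents, op))
end

# A single global instance, plus a convenience wrapper that uses it.
const TOY_GLOBAL_MAP = DictIdentityMap()
toy_global_identity_map() = TOY_GLOBAL_MAP
toy_obtain_node(parents, op) = toy_obtain_node!(toy_global_identity_map(), parents, op)
```

Asking twice for a node with the same parents and op returns the identical object, so no work is duplicated downstream.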
TimeDag.obtain_node! (Function)
obtain_node!(id_map::IdentityMap, parents::NTuple{N,Node}, op::NodeOp) -> Node
If a node with parents and op doesn't exist inside id_map, create and insert it.
Return either the new or existing node.
TimeDag.global_identity_map (Function)
global_identity_map() -> IdentityMap
Get the global IdentityMap instance used in TimeDag.
TimeDag.WeakNode (Type)
WeakNode(parents, op)
Represent a node-like object that doesn't hold strong references to its parents.
This exists purely so that hash and == allow multiple instances of WeakNode to compare equal if they have the same parents and op.
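One way to get this behaviour, sketched under the assumption that parents are identified by their objectid, is a key type that stores only the parents' ids and defines hash and == over those ids and the op. ToyWeakKey is hypothetical and says nothing about how TimeDag's WeakNode is actually laid out; a real implementation must also cope with an id being reused after its parent has been collected.

```julia
# Hypothetical WeakNode-like key: remembers parents only by objectid,
# so it never keeps a parent alive.
struct ToyWeakKey
    parent_ids::Vector{UInt}
    op::Symbol
end
ToyWeakKey(parents::Tuple, op::Symbol) = ToyWeakKey(collect(UInt, map(objectid, parents)), op)

# Without these methods, two keys holding distinct (but equal) vectors would not
# compare equal. With them, keys built separately from the same parents and op
# are equal and hash alike, so they can be used to look up a Dict.
Base.:(==)(a::ToyWeakKey, b::ToyWeakKey) = a.parent_ids == b.parent_ids && a.op == b.op
Base.hash(k::ToyWeakKey, h::UInt) = hash(k.op, hash(k.parent_ids, h))
```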
TimeDag.WeakIdentityMap (Type)
WeakIdentityMap
Represent a collection of nodes which doesn't hold strong references to any nodes.
This is useful because it makes the cache largely transparent to the user: they only have to hold on to references for the nodes that they care about.
This structure contains nodes, but also weak nodes; these allow us to determine whether we need to create a given node.
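A toy map with this property can be built from Julia's built-in WeakRef. In the sketch below (all names hypothetical, not TimeDag's), the map's values are weak references, so once a node has been garbage collected its entry reads back as nothing and the node is simply rebuilt on demand. The tuple of parent objectids plays roughly the role that WeakNode plays in the real structure.

```julia
# Hypothetical node type and weak-reference identity map; not TimeDag's code.
mutable struct WNode
    parents::Tuple
    op::Symbol
end

# Key: (objectids of parents, op). Value: a weak reference to the node.
const WEAK_MAP = Dict{Any,WeakRef}()

function weak_obtain_node(parents::Tuple, op::Symbol)
    key = (map(objectid, parents), op)
    ref = get(WEAK_MAP, key, nothing)
    if ref !== nothing
        node = ref.value   # becomes `nothing` once the node has been collected
        # Double-check identity of the parents before reusing the node.
        if node !== nothing && all(map(===, node.parents, parents))
            return node::WNode
        end
    end
    node = WNode(parents, op)
    WEAK_MAP[key] = WeakRef(node)   # the map itself does not keep `node` alive
    return node
end
```

A fuller implementation would also prune entries whose WeakRef has been cleared, so that the dictionary does not grow without bound.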
Advanced evaluation
This section goes into more detail about how evaluation works. We explain evaluation state, and discuss how to use the API in a live system.
Evaluation state
Recall from the discussion in Concepts the notion of explicit state. For a particular node, this state will be a subtype of TimeDag.NodeEvaluationState. For consistency, all nodes have an evaluation state; for nodes that are fundamentally stateless, we use TimeDag.EMPTY_NODE_STATE.
A fresh evaluation state is created by a call to TimeDag.create_evaluation_state. When creating a new TimeDag.NodeOp, a new method of this function should be defined to return the appropriate state. The state is subsequently mutated in calls to TimeDag.run_node!, to reflect any changes induced over the time interval.
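The lifecycle can be pictured with a toy model in which dispatch on the op type selects both the state constructor and the batch runner. The names (ToyState, ToyEmptyState, CountState, ToyOp, toy_create_state, toy_run!) are hypothetical, and batches are plain vectors of (time, value) pairs rather than TimeDag Blocks.

```julia
# Hypothetical analogues of NodeEvaluationState, NodeOp,
# create_evaluation_state and run_node!; not TimeDag's code.
abstract type ToyState end
struct ToyEmptyState <: ToyState end          # analogue of EmptyNodeEvaluationState
mutable struct CountState <: ToyState         # state that persists between batches
    count::Int
end

abstract type ToyOp end
struct Negate <: ToyOp end        # stateless
struct RunningCount <: ToyOp end  # stateful: counts knots seen so far

# One method per op returns a fresh state.
toy_create_state(::Negate) = ToyEmptyState()
toy_create_state(::RunningCount) = CountState(0)

# Each call processes one batch, mutating the state where needed.
toy_run!(::Negate, ::ToyEmptyState, batch) = [(t, -v) for (t, v) in batch]
function toy_run!(::RunningCount, state::CountState, batch)
    out = Tuple{Int,Int}[]
    for (t, _) in batch
        state.count += 1
        push!(out, (t, state.count))
    end
    return out
end
```

Because the count lives on the state object, a second batch continues from where the first one stopped.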
TimeDag.NodeEvaluationState (Type)
abstract type NodeEvaluationState end
Represents any and all state that a node must retain between evaluating batches.
Instances of subtypes of NodeEvaluationState are given to run_node!.
TimeDag.EmptyNodeEvaluationState (Type)
EmptyNodeEvaluationState()
A NodeEvaluationState which has no content, to be used by non-stateful nodes.
In practice, the common instance TimeDag.EMPTY_NODE_STATE can be used.
TimeDag.EMPTY_NODE_STATE (Constant)
const EMPTY_NODE_STATE
A singleton instance of TimeDag.EmptyNodeEvaluationState.
TimeDag.create_evaluation_state (Function)
create_evaluation_state(node::Node) -> NodeEvaluationState
create_evaluation_state(parents, node::NodeOp) -> NodeEvaluationState
Create an empty evaluation state for the given node, when starting evaluation at the specified time.
TimeDag.stateless (Function)
stateless(node) -> Bool
stateless(op) -> Bool
Returns true iff op can be assumed to be stateless; that is, if the node evaluation state is TimeDag.EMPTY_NODE_STATE.
TimeDag.run_node! (Function)
run_node!(
    op::NodeOp{T},
    state::NodeEvaluationState,
    time_start::DateTime,
    time_end::DateTime,
    input_blocks::Block...
) -> Block{T}
Evaluate the given node from time_start until time_end, with the initial state. Zero or more blocks will be passed as input; these correspond to the parents of the node, and are passed in the same order as that returned by parents(node).
We return a new Block of output knots from this node.
The implementer of run_node! must ensure:
- No future peeking: no output knot may depend on input knots that occur after it.
- Correct time range: all output timestamps must be in the interval [time_start, time_end).
- Consistency: calling run_node! over a single interval should give the same result as calling it multiple times over a decomposition of that same interval. This ensures that the value returned by evaluate is invariant to the batch_interval kwarg provided.
- Determinism: run_node! should always be fully deterministic. If a pseudo-random number generator is required, it should be held on the evaluation state.
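The consistency requirement is easiest to see with a stateful op. In this sketch (a hypothetical "difference from previous value" op, using vectors of (time, value) pairs in place of Blocks), the only thing carried between batches is the last input value. Because it lives on the state object, splitting the input into two batches gives exactly the same output as a single batch, and each output depends only on the current and earlier inputs, so there is no future peeking.

```julia
# Hypothetical stateful op: emits each value minus the previous input value.
mutable struct DiffState
    prev::Union{Nothing,Float64}   # last input seen, carried between batches
end

function toy_diff!(state::DiffState, knots::Vector{Tuple{Int,Float64}})
    out = Tuple{Int,Float64}[]
    for (t, v) in knots
        if state.prev !== nothing
            push!(out, (t, v - state.prev))   # uses only current and earlier inputs
        end
        state.prev = v
    end
    return out
end

knots = [(1, 1.0), (2, 4.0), (3, 9.0), (4, 16.0)]

# One batch over the whole interval...
whole = toy_diff!(DiffState(nothing), knots)

# ...versus two consecutive batches sharing one state object.
s = DiffState(nothing)
batched = vcat(toy_diff!(s, knots[1:2]), toy_diff!(s, knots[3:4]))
```

Had the previous value been kept in a local variable instead of on the state, the first knot of the second batch would have been dropped, and the result would depend on the batch interval.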
When an evaluation is performed, we need to save the state of all the nodes in the graph. We package this into a TimeDag.EvaluationState instance. This object also retains the output blocks of the nodes we are interested in.
TimeDag.EvaluationState (Type)
EvaluationState
All state necessary for the evaluation of some nodes.
This should be created with start_at.
Fields
- ordered_node_to_children::OrderedDict{Node,Set{Node}}: a map from every node which we need to run, to its children. The ordering of the elements is such that, if evaluated in this order, all dependencies will be evaluated before they are required.
- node_to_state::IdDict{Node,NodeEvaluationState}: maintain the state for every node being evaluated.
- current_time::DateTime: the time to which this state corresponds.
- evaluated_node_to_blocks::IdDict{Node,Vector{Block}}: the output blocks that we care about.
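An ordering with the property described for ordered_node_to_children can be obtained with a depth-first, post-order traversal from the nodes of interest: a node is appended only after all of its parents. The sketch uses a hypothetical toy graph of Symbols and a plain Vector plus Dict instead of an OrderedDict; it is a standard topological sort, not necessarily how TimeDag builds this field.

```julia
# Hypothetical toy graph: each node maps to the list of its parents.
const PARENTS_OF = Dict(:x => Symbol[], :m => [:x], :s => [:x], :z => [:m, :s])

# Post-order DFS from the targets: every node is appended after all its parents,
# and only ancestors of the targets are visited.
function ancestors_in_order(targets, parents_of)
    order = Symbol[]
    seen = Set{Symbol}()
    function visit(n)
        n in seen && return
        push!(seen, n)
        foreach(visit, parents_of[n])
        push!(order, n)
    end
    foreach(visit, targets)
    return order
end

# Invert the parent lists to get, for each node to run, its children.
function children_map(order, parents_of)
    children = Dict(n => Set{Symbol}() for n in order)
    for n in order, p in parents_of[n]
        push!(children[p], n)
    end
    return children
end
```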
Deconstructing evaluation
Whilst evaluate is the primary API for TimeDag, it is in fact a thin wrapper around a lower-level API. Roughly, the steps involved are:
- Call TimeDag.start_at to create a new TimeDag.EvaluationState for a collection of nodes. This will work out all the ancestors of the given nodes that also need to be evaluated.
- Perform one or more calls to TimeDag.evaluate_until!, depending on the batch interval. Each call will update the evaluation state. Internally, this calls TimeDag.run_node! for every ancestor node.
- Once the end of the evaluation interval has been reached, we extract the blocks for the nodes of interest from the evaluation state. These are concatenated and returned to the user.
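The three steps can be mimicked end to end with a toy, single-node "graph". Here toy_start_at, toy_evaluate_until! and toy_evaluate are hypothetical stand-ins for the real functions, times are integers rather than DateTimes, and the node is a running sum over a fixed source. Because the running sum is carried on the state, the concatenated result does not depend on the batch size.

```julia
# Hypothetical toy evaluation state for a one-node "graph": a running sum.
mutable struct ToyEvalState
    current_time::Int
    running_sum::Int
    blocks::Vector{Vector{Tuple{Int,Int}}}   # output blocks for the node of interest
end

# Source knots at integer times 0..9, each with value equal to its time.
const TOY_SOURCE = [(t, t) for t in 0:9]

# Step 1: create the state.
toy_start_at(t0::Int) = ToyEvalState(t0, 0, Vector{Tuple{Int,Int}}[])

# Step 2: advance the state to t_end, recording one output block.
function toy_evaluate_until!(state::ToyEvalState, t_end::Int)
    block = Tuple{Int,Int}[]
    for (t, v) in TOY_SOURCE
        state.current_time <= t < t_end || continue   # only knots in [current_time, t_end)
        state.running_sum += v
        push!(block, (t, state.running_sum))
    end
    push!(state.blocks, block)
    state.current_time = t_end
    return state
end

# The wrapper: batch the interval, then (step 3) concatenate the blocks.
function toy_evaluate(t0::Int, t1::Int, batch::Int)
    state = toy_start_at(t0)
    t = t0
    while t < t1
        t = min(t + batch, t1)
        toy_evaluate_until!(state, t)
    end
    return reduce(vcat, state.blocks)
end
```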
TimeDag.start_at (Function)
start_at(nodes, time_start::DateTime) -> EvaluationState
Create a sufficient EvaluationState for the evaluation of nodes.
Internally, this will determine the subgraph that needs evaluating, i.e. all the ancestors of nodes, and create a NodeEvaluationState for each of these.
TimeDag.evaluate_until! (Function)
evaluate_until!(state::EvaluationState, time_end::DateTime)
Update the evaluation state by performing the evaluation for each node.
Live system
Consider the case where some history of data is available, say in a database, and new data is added continually, e.g. as it is recorded from a sensor. Suppose we have built a TimeDag graph representing the computation we wish to perform on this data. We can perform the following steps:
- Initialise the state with TimeDag.start_at.
- Initialise the model with one (or more) calls to TimeDag.evaluate_until!. This is used to pull through historical data, e.g. to initialise stateful nodes like moving averages.
- In real time, poll for new data with repeated calls to TimeDag.evaluate_until!.
The performance of this setup naturally depends on the complexity of the model being evaluated. However, if models are designed to have efficient online updates, then the underlying overhead of TimeDag is small enough for this to be usable with latencies down to the order of milliseconds.
```julia
using Dates, Statistics, TimeDag

# Some arbitrary data source - here just use random numbers.
x = rand(pulse(Second(1)))

# Compute a rolling mean and standard deviation.
# Corresponds to 24-hour rolling windows, given one data point per second.
n1, n2 = mean(x, 86400), std(x, 86400)

# Initialise state over a long history.
state = TimeDag.start_at([n1, n2], DateTime(2019))
state = TimeDag.evaluate_until!(state, DateTime(2020))

# Simulate an incremental update over a few hours.
@time state = TimeDag.evaluate_until!(state, DateTime(2020, 1, 1, 3))
# Example output:
# 0.004887 seconds (75.23 k allocations: 2.387 MiB)
```
Note that this approach is unlikely to be suitable for lower latency applications (e.g. microseconds). For that case, one may benefit from a "push mode" evaluation, where new data are pushed onto the graph, and only affected nodes are re-evaluated. Such a feature isn't currently planned.
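As an illustration of what an efficient online update means for a rolling mean like the one above, here is a hypothetical RollingMean type (not TimeDag's mean implementation) that keeps a running total and a circular buffer of the last window values. Each new point costs O(1) work regardless of the window length, so the cost of an incremental update scales with the amount of new data rather than with the window size.

```julia
# Hypothetical O(1)-per-update rolling mean; not TimeDag's implementation.
mutable struct RollingMean
    window::Int
    buffer::Vector{Float64}   # circular buffer of the last `window` values
    next::Int                 # index to overwrite next
    count::Int                # number of values held, capped at `window`
    total::Float64            # running sum of the values in the buffer
end

RollingMean(window::Int) = RollingMean(window, zeros(window), 1, 0, 0.0)

# Add one value and return the mean over the current window.
function update!(r::RollingMean, x::Float64)
    if r.count == r.window
        r.total -= r.buffer[r.next]   # evict the oldest value
    else
        r.count += 1
    end
    r.buffer[r.next] = x
    r.total += x
    r.next = mod1(r.next + 1, r.window)
    return r.total / r.count
end
```

One design caveat: a running total accumulates floating-point error over very long runs, so a production version might periodically recompute the sum from the buffer.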
Scheduling
TimeDag currently runs all nodes in a single thread; this may change in the future.
Alignment implementation
If we want to define a new op that follows alignment semantics, it should derive from one of the following types.
TimeDag.UnaryNodeOp (Type)
UnaryNodeOp{T} <: NodeOp{T}
An abstract type representing a node op with a single parent.
TimeDag.BinaryNodeOp (Type)
BinaryNodeOp{T,A<:Alignment} <: NodeOp{T}
An abstract type representing a node op with two parents, and using alignment A.
TimeDag.NaryNodeOp (Type)
NaryNodeOp{N,T,A<:Alignment} <: NodeOp{T}
An abstract type representing a node op with N parents, and using alignment A.
This type should be avoided for N < 3, since in these cases it would be more appropriate to use either TimeDag.UnaryNodeOp or TimeDag.BinaryNodeOp.
Instead of implementing TimeDag.run_node! directly, one implements some of the following functions. The exact alignment logic is then encapsulated, and doesn't need to be dealt with directly.
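The division of labour can be sketched as follows: a generic driver owns the alignment loop, and the op author writes only a per-knot operator. Here the driver implements an intersect-style alignment (emit only at times where both inputs have a knot) by merging two time-sorted inputs. ToyBinaryOp, Add, toy_operator and toy_run_intersect are hypothetical names, not TimeDag's internals.

```julia
# Hypothetical op hierarchy mirroring BinaryNodeOp{T}: T is the output type.
abstract type ToyBinaryOp{T} end
struct Add <: ToyBinaryOp{Int} end

# The op author writes only this per-knot function.
toy_operator(::Add, x::Int, y::Int) = x + y

# Generic driver: owns the intersect-style alignment loop for every binary op.
function toy_run_intersect(op::ToyBinaryOp{T}, left, right) where {T}
    out = Tuple{Int,T}[]
    i, j = 1, 1
    while i <= length(left) && j <= length(right)
        tl, tr = left[i][1], right[j][1]
        if tl == tr                      # both parents tick: apply the operator
            push!(out, (tl, toy_operator(op, left[i][2], right[j][2])))
            i += 1
            j += 1
        elseif tl < tr                   # only the left parent ticks: skip it
            i += 1
        else                             # only the right parent ticks: skip it
            j += 1
        end
    end
    return out
end
```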
TimeDag.operator! (Function)
operator!(op::UnaryNodeOp{T}, (state,), (time,) x) -> T / Maybe{T}
operator!(op::BinaryNodeOp{T}, (state,), (time,) x, y) -> T / Maybe{T}
operator!(op::NaryNodeOp{N,T}, (state,), (time,) x, y, z...) -> T / Maybe{T}
Perform the operation for this node.
When defining a method of this for a new op, follow these rules:
- state should be omitted iff TimeDag.stateless_operator.
- time should be omitted iff TimeDag.time_agnostic.
- All values x, y, z... should be omitted iff TimeDag.value_agnostic.
For stateful operations, this operator should mutate state as required.
The return value out should be of type T iff TimeDag.always_ticks is true; otherwise it should be of type TimeDag.Maybe{T}.
If out is a Maybe{T} and !valid(out), this indicates that we do not wish to emit a knot at this time, and it will be skipped. Otherwise, value(out) will be used as the output value.
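For example, a stateful, time-agnostic op that emits only when its input reaches a new running maximum cannot always tick. The sketch below shows the shape of such an operator and how a driver skips "no value" results. It is hypothetical: to keep it short, nothing stands in for an invalid Maybe{Float64}, whereas a real operator would return TimeDag.Maybe to stay type-stable.

```julia
# Hypothetical operator state: the highest value seen so far.
mutable struct MaxState
    best::Float64
end

# Stateful, time-agnostic operator that does not always tick.
# Returns `nothing` (standing in for an invalid Maybe{Float64}) when no knot should be emitted.
function toy_new_high!(state::MaxState, x::Float64)
    if x > state.best
        state.best = x        # mutate state as required
        return x
    end
    return nothing
end

# Driver: skip the knot whenever the operator reports "no value".
function toy_run_filtering(state::MaxState, knots)
    out = Tuple{Int,Float64}[]
    for (t, x) in knots
        y = toy_new_high!(state, x)
        y === nothing || push!(out, (t, y))
    end
    return out
end
```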
TimeDag.always_ticks (Function)
always_ticks(node) -> Bool
always_ticks(op) -> Bool
Returns true iff the return value from operator! can be assumed to always be valid.
If true, operator!(::Node{T}, ...) should return a T. If false, operator!(::Node{T}, ...) should return a Maybe{T}.
Note that, for sensible performance characteristics, this should be knowable from typeof(op).
TimeDag.stateless_operator (Function)
stateless_operator(node) -> Bool
stateless_operator(op) -> Bool
Returns true iff operator!(op, ...) would never look at or modify the evaluation state.
If this returns true, create_operator_evaluation_state will not be used.
Note that if stateless(op) returns true for an op, then this function should also return true. The default implementation is to return stateless(op), meaning that if one is creating a node that is fully stateless, one need only define stateless.
For optimal performance, this should be knowable from the type of op alone.
TimeDag.time_agnostic (Function)
time_agnostic(node) -> Bool
time_agnostic(op) -> Bool
Returns true iff op does not care about the time of the input knot(s).
For optimal performance, this should be knowable from the type of op alone.
TimeDag.value_agnostic (Function)
value_agnostic(node) -> Bool
value_agnostic(op) -> Bool
Returns true iff op does not care about the value(s) of the input knot(s).
For optimal performance, this should be knowable from the type of op alone.
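The advice that these answers be knowable from the type amounts to defining the traits by dispatch on the op type, so each call resolves to a constant at compile time. A hypothetical sketch with toy trait functions, mirroring the documented default that stateless_operator falls back to stateless:

```julia
# Hypothetical op types and trait functions; not TimeDag's definitions.
abstract type TraitOp end
struct AbsValue <: TraitOp end   # stateless, ignores time, always emits
struct NewHigh <: TraitOp end    # stateful, ignores time, may skip knots
struct Elapsed <: TraitOp end    # stateless, emits a function of the knot time only

# Conservative defaults for any op.
toy_stateless(::TraitOp) = false
toy_stateless_operator(op::TraitOp) = toy_stateless(op)   # default mirrors stateless
toy_time_agnostic(::TraitOp) = false
toy_value_agnostic(::TraitOp) = false
toy_always_ticks(::TraitOp) = false

# Per-type overrides: answers depend only on the type, never on field values.
toy_stateless(::AbsValue) = true
toy_time_agnostic(::AbsValue) = true
toy_always_ticks(::AbsValue) = true

toy_time_agnostic(::NewHigh) = true

toy_stateless(::Elapsed) = true
toy_value_agnostic(::Elapsed) = true
toy_always_ticks(::Elapsed) = true
```

Under the operator! rules, AbsValue's operator would then take only x, NewHigh's would take state and x, and Elapsed's would take only time.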
TimeDag.has_initial_values (Function)
has_initial_values(op) -> Bool
If this returns true, it indicates that initial values for the op's parents are specified.
See the documentation on Initial values for further information.
TimeDag.initial_left (Function)
initial_left(op::BinaryNodeOp) -> L
Specify the initial value to use for the first parent of the given op.
Needs to be defined if has_initial_values returns true, and alignment is UNION. For other alignments it is not required.
TimeDag.initial_right (Function)
initial_right(op::BinaryNodeOp) -> R
Specify the initial value to use for the second parent of the given op.
Needs to be defined if has_initial_values(op) returns true, and alignment is UNION or LEFT. For INTERSECT alignment it is not required.
TimeDag.initial_values (Function)
initial_values(op::NaryNodeOp) -> Tuple
Specify the initial values to use for all parents of the given op.
Needs to be defined for nary ops for which has_initial_values returns true.
TimeDag.create_operator_evaluation_state (Function)
create_operator_evaluation_state(parents, op::NodeOp) -> NodeEvaluationState
Create an empty evaluation state for the given node, when starting evaluation at the specified time.
Note that this is the state that will be passed to operator!. The overall node may additionally wrap this state with further state, if this is necessary for e.g. alignment.
For simple cases, the following node ops can be useful.
Rather than using the structures below directly, you probably want to use TimeDag.apply, wrap, or wrapb.
TimeDag.SimpleUnary (Type)
SimpleUnary{f,TimeAgnostic,T}
Represents a stateless unary operator that will always emit a value.
The value of the TimeAgnostic type parameter is coupled to time_agnostic.
TimeDag.SimpleBinary (Type)
SimpleBinary{f,TimeAgnostic,T,A}
Represents a stateless binary operator that will always emit a value.
The value of the TimeAgnostic type parameter is coupled to time_agnostic.
TimeDag.SimpleBinaryUnionInitial (Type)
SimpleBinaryUnionInitial{f,TimeAgnostic,T,L,R}
Represents a stateless binary operator that will always emit a value.
The value of the TimeAgnostic type parameter is coupled to time_agnostic.
Unlike SimpleBinary, this also contains initial values for its parent nodes. See Initial values for more details.
TimeDag.SimpleBinaryLeftInitial (Type)
SimpleBinaryLeftInitial{f,T,R}
Represents a stateless binary operator that will always emit a value.
The value of the TimeAgnostic type parameter is coupled to time_agnostic.
Unlike SimpleBinary, this also contains initial values for its right parent. See Initial values for more details.
TimeDag.SimpleNary (Type)
SimpleNary{f,TimeAgnostic,N,T,A}
Represents a stateless N-ary operator that will always emit a value.
The value of the TimeAgnostic type parameter is coupled to time_agnostic.
TimeDag.SimpleNaryInitial (Type)
SimpleNaryInitial{f,TimeAgnostic,N,T,A,Types}
Represents a stateless N-ary operator that will always emit a value.
The value of the TimeAgnostic type parameter is coupled to time_agnostic.
Unlike SimpleNary, this also contains initial values. See Initial values for more details.
Maybe
TimeDag.Maybe (Type)
Maybe{T}()
Maybe(value::T)
A structure which can hold a value of type T, or represent the absence of a value.
The API is optimised for speed over memory usage, by allowing a function that may otherwise return Union{T, Nothing} to instead always return Maybe{T}, and hence be type-stable.
TimeDag.valid (Function)
valid(x::Maybe) -> Bool
Return true iff x holds a value.
TimeDag.value (Function)
value(x::Maybe{T}) -> T
Returns the value stored in x, or throws an ArgumentError if !valid(x).
Note that, in a tight loop, it is preferable to use a combination of calls to valid and unsafe_value, as it will generate more optimal code.
TimeDag.unsafe_value (Function)
unsafe_value(x::Maybe{T}) -> T
Returns the value stored in x.
It is "unsafe" when !valid(x), in that the return value of this function is undefined. If T is a reference type, calling this function will result in an UndefRefError being thrown.
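A structure with these semantics can be sketched in a few lines, using an inner constructor that leaves the value field uninitialised when there is no value. ToyMaybe and its helpers are hypothetical, written to mirror the documented behaviour of Maybe, valid, value and unsafe_value rather than copied from TimeDag. Because safe_inv always returns ToyMaybe{Float64}, callers see one concrete return type instead of Union{Float64, Nothing}, and sum_valid shows the valid plus unsafe_value pattern recommended for tight loops.

```julia
# Hypothetical Maybe-like container; not TimeDag's definition.
struct ToyMaybe{T}
    valid::Bool
    value::T
    ToyMaybe{T}() where {T} = new{T}(false)      # `value` left uninitialised
    ToyMaybe(x::T) where {T} = new{T}(true, x)
end

toy_valid(m::ToyMaybe) = m.valid
toy_unsafe_value(m::ToyMaybe) = m.value          # undefined if !toy_valid(m)
function toy_value(m::ToyMaybe)
    toy_valid(m) || throw(ArgumentError("ToyMaybe holds no value"))
    return m.value
end

# Type-stable: always returns ToyMaybe{Float64}.
safe_inv(x::Float64) = iszero(x) ? ToyMaybe{Float64}() : ToyMaybe(1 / x)

# Tight-loop pattern: check validity once, then read without a second check.
function sum_valid(xs::Vector{ToyMaybe{Float64}})
    total = 0.0
    for m in xs
        if toy_valid(m)
            total += toy_unsafe_value(m)
        end
    end
    return total
end
```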
Other
TimeDag.output_type (Function)
output_type(f, arg_types...)
Return the output type of the specified function. Tries to be fast where possible.
This uses Base.promote_op, which is noted to be fragile. The problem is that whilst one might hope that typeof(f(map(oneunit, arg_types)...)) could be used, in practice there are a lot of types which do not define oneunit.
Ultimately this reflects a tension: TimeDag wants to know the output type of a node before it knows the concrete values of its inputs.
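The difference is easy to see with Base.promote_op directly. It asks type inference for the return type without ever calling f, so it works even for a type like the hypothetical Price below, which has no one method and therefore cannot be passed through oneunit. Price and double are illustrative names only.

```julia
# Hypothetical user type with no `one` method, so `oneunit(Price)` is unavailable.
struct Price
    v::Float64
end
double(p::Price) = p.v * 2

# Base.promote_op infers return types from argument *types*, without calling f.
t_plus = Base.promote_op(+, Int, Float64)
t_double = Base.promote_op(double, Price)
```

The price of this approach is that the answer depends on the compiler's inference, which is why it is described as fragile.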
TimeDag.duplicate (Function)
duplicate(x)
Return an object that is equal to x, but fully independent of it.
Note that for any parts of x that TimeDag considers to be immutable (e.g. Blocks), this can return the identical object.
Conceptually this is otherwise very similar to deepcopy(x).
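A hypothetical sketch of this policy: fall back to deepcopy, but return objects treated as immutable (ToyBlock here, standing in for a Block) as-is, and recurse into containers so that immutable parts inside them are shared rather than copied. toy_duplicate and ToyBlock are illustrative names, not TimeDag's.

```julia
# Hypothetical stand-in for a Block of knots, treated as immutable by convention.
struct ToyBlock
    times::Vector{Int}
    values::Vector{Float64}
end

toy_duplicate(x) = deepcopy(x)                       # default: fully independent copy
toy_duplicate(b::ToyBlock) = b                       # never mutated, so sharing is safe
toy_duplicate(xs::Vector) = map(toy_duplicate, xs)   # recurse so blocks inside are shared
```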