The queuing system of CTA is implemented over an Object Store. This is preferred over databases that do not provide a
good modeling of multiple independent queues and objects. Databases also struggle to shrink tables that once contained
lots of entries, which is the fate of a queue. Classical databases are also a single point of failure and contention,
and regularly require downtime for software maintenance.
The targeted implementation is Ceph, which scales horizontally and provides parallel access to objects. A Ceph cluster
also provides excellent resilience against component failures.
The CTA Scheduler object relies on a SchedulerDatabase object to store the queuing-related information.
The techniques employed in the Object Store have several aspects:
\begin{enumerate}
\item The in-memory representation of individual objects and the functions used to serialize and de-serialize data
between memory and Object Store.
\item The connection of the objects together to constitute a multi-object structure. As the Object Store only provides
per-object transactions, safe multi-object operations require usage of a few conventions.
\item Finally, a garbage collector allows resetting objects left behind by crashed processes, by re-queuing requests
and deleting uncommitted objects.
\end{enumerate}
\section{Classes and memory side representation}
The processes of CTA (namely user front end and tape drive) rely on a shared Scheduler object to queue, dequeue and
report about data transfer requests. The Scheduler itself relies on an Object Store-based SchedulerDatabase for
queuing, and a file Catalogue to keep persistent information about stored files.
The OStoreDB implementation of the SchedulerDatabase interface relies on a collection of classes in the cta::objectstore
namespace. Those classes are responsible for providing the high level functionality specific to each object type, on top
of the common methods provided by all objects (lock, fetch, commit, etc.). The common part is inherited from the
template ObjectOps. The parameter to this template is the Google protocol buffer type used to serialize the content of
the object to persistent storage. The commonalities of all the template instances are inherited from a base class
ObjectOpsBase. This base class is used for special operations that can apply to any object type, namely garbage
collection. The memory side class hierarchy is shown in figure~\ref{fig:OStoreClasses}.
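This layering can be sketched as follows in C++; the names (ObjectOpsBase, ObjectOps, the placeholder payload type standing in for the generated protocol buffer class) are simplified assumptions and not the actual cta::objectstore interface.
\begin{verbatim}
// Illustrative sketch only: names are simplified and do not necessarily
// match the actual cta::objectstore API.
#include <string>

namespace sketch {

// Stand-in for a Google protocol buffer message (illustration only).
struct ArchiveQueuePayload {
  std::string SerializeAsString() const { return {}; }
  bool ParseFromString(const std::string&) { return true; }
};

// Common, type-agnostic base: used e.g. by the garbage collector.
class ObjectOpsBase {
public:
  virtual ~ObjectOpsBase() = default;
  virtual void garbageCollect(const std::string& presumedOwner) = 0;
};

// Common, typed operations shared by all object types.
// PayloadT is the protocol buffer type used for (de-)serialization.
template <typename PayloadT>
class ObjectOps : public ObjectOpsBase {
public:
  void lock()   { /* take the per-object lock in the backend */ }
  void fetch()  { m_payload.ParseFromString(readFromStore()); }
  void commit() { writeToStore(m_payload.SerializeAsString()); }
protected:
  PayloadT m_payload;                       // in-memory representation
private:
  std::string readFromStore() { return {}; }
  void writeToStore(const std::string&) {}
};

// A concrete object type adds the high level operations specific to it.
class ArchiveQueue : public ObjectOps<ArchiveQueuePayload> {
public:
  void addJob(const std::string& requestAddress) { (void)requestAddress; }
  void garbageCollect(const std::string& presumedOwner) override { (void)presumedOwner; }
};

} // namespace sketch
\end{verbatim}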
\begin{figure}[h]
\begin{tikzpicture}
...
...
\label{fig:OStoreClasses}
\end{figure}
\section{Data model and Object Store side representation}
To achieve even performance with various amounts of requests queued, the implementation will store the requests into
queues, one per \gls{tapepool} for archival and one per tape for retrieves. The targeted queuing and dequeuing complexity
is $\mathcal{O}(1)$, but higher order complexity is necessary for retrieve queues, where requests are stored in tape
location order and not arrival order.
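As a rough illustration of the two ordering policies, the sketch below contrasts an append-only archive queue with a retrieve queue kept in tape location order; the container choices and the fSeq key are assumptions made for illustration, not the on-store representation.
\begin{verbatim}
// Illustrative sketch of the two ordering policies; the container choices
// are for illustration only, not the actual on-store representation.
#include <cstdint>
#include <deque>
#include <map>
#include <string>

struct QueuedJob { std::string requestAddress; };

// Archive queue: jobs are simply appended, giving O(1) enqueue/dequeue.
class ArchiveQueueSketch {
public:
  void enqueue(const QueuedJob& j) { m_jobs.push_back(j); }
  QueuedJob dequeue() { QueuedJob j = m_jobs.front(); m_jobs.pop_front(); return j; }
private:
  std::deque<QueuedJob> m_jobs;
};

// Retrieve queue: jobs are kept in tape location order (here keyed by the
// file sequence number on tape), so insertion is O(log n) rather than O(1).
class RetrieveQueueSketch {
public:
  void enqueue(uint64_t fSeq, const QueuedJob& j) { m_jobs.emplace(fSeq, j); }
  QueuedJob dequeueLowestFSeq() {
    QueuedJob j = m_jobs.begin()->second;
    m_jobs.erase(m_jobs.begin());
    return j;
  }
private:
  std::multimap<uint64_t, QueuedJob> m_jobs;
};
\end{verbatim}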
The Object Store contains one queue per tape pool for archival and one queue per tape for retrieval. The status of the
drives is also stored, including which tape each drive is working on. A singleton object is used as a lock, as the mount scheduling
is executed one drive at a time. The combination of how much is queued for each tape and tape pool, plus what is
currently being done by other drives is used to determine the next mount for the drive being scheduled.
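A possible shape of that decision is sketched below; the summary structure, the ``most queued bytes first'' rule and all names are hypothetical, and only illustrate how per-queue summaries and the mounts held by other drives could be combined.
\begin{verbatim}
// Hypothetical decision sketch: combine queue summaries with what other
// drives are doing to pick the next mount. Criteria and names are
// illustrative, not the actual scheduler policy.
#include <cstdint>
#include <map>
#include <optional>
#include <set>
#include <string>

struct QueueSummary {
  uint64_t queuedFiles;
  uint64_t queuedBytes;
};

// tape or tape pool name -> summary of what is queued for it
using QueueSummaries = std::map<std::string, QueueSummary>;
// tapes / tape pools currently mounted by other drives
using BusyMounts = std::set<std::string>;

std::optional<std::string> pickNextMount(const QueueSummaries& queues,
                                         const BusyMounts& busy) {
  std::optional<std::string> best;
  uint64_t bestBytes = 0;
  for (const auto& [name, summary] : queues) {
    if (busy.count(name)) continue;            // already served by another drive
    if (summary.queuedBytes > bestBytes) {     // simplistic "most work first" rule
      best = name;
      bestBytes = summary.queuedBytes;
    }
  }
  return best;                                 // nullopt: nothing worth mounting
}
\end{verbatim}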
Finally, each actor on the Object Store is represented as an Agent object, which keeps references to objects created and
worked on by the actor, preventing object leaks. The data model of the Object Store is shown in
figure~\ref{fig:OStoreDatamodel}.
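A minimal sketch of this ownership bookkeeping is shown below, with assumed method names (addToOwnership, removeFromOwnership): the agent records the address of an object it creates or works on, and drops the reference once another owner (such as a queue) has taken over.
\begin{verbatim}
// Minimal sketch of the ownership idea: an agent records the addresses of
// the objects it created or is working on, so a garbage collector can find
// them if the process dies. Method names are illustrative.
#include <set>
#include <string>
#include <utility>

class AgentSketch {
public:
  explicit AgentSketch(std::string address) : m_address(std::move(address)) {}

  // Reference an object as soon as the agent starts creating or working on it.
  void addToOwnership(const std::string& objectAddress) {
    m_ownedObjects.insert(objectAddress);      // then commit the agent object
  }

  // Drop the reference once ownership has been handed over (e.g. to a queue).
  void removeFromOwnership(const std::string& objectAddress) {
    m_ownedObjects.erase(objectAddress);       // then commit the agent object
  }

private:
  std::string m_address;                       // the agent's own object name
  std::set<std::string> m_ownedObjects;        // addresses of owned objects
};
\end{verbatim}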
\begin{figure}[h]
\scalebox{0.9}{
...
...
\end{figure}
\subsection{RootEntry}
The RootEntry is an object with a conventional name in the Object Store. It is the entry point to the object tree, and is the only object which does not require a lookup. It contains references to each queue, the drive register, the agent register and the scheduling lock. It only needs to be modified when a new queue (archive or retrieve) is created or removed.
\subsection{Queues and request objects}
Requests represent a full file request. An archive request is hence composed of one or several transfers --- one for each copy, and all of them should be executed. A retrieve request is also composed of one or several transfers, but only one of them needs to be executed in order for the file to be retrieved.
\subsubsection{The archive request}
\label{subsec:archiveRequest}
...
...
The scheduling global lock is an object used to lock the system globally while a drive is deciding its next mount. This is discussed further in section~\ref{sec:perf}.
The agent register is a list of all the agents operating on the Object Store. The list points to individual agent objects, one per actual process running in the system. This is further discussed in section~\ref{subsec:agentFailure}.
\section{Multi object operations and multi-agent safety}
The Object Store provides per-object locking. The ObjectOps base template validates that proper locking has been taken on a given object before accessing it. The usual sequences are \{ initialise (in memory), modify in memory, insert new object in the store \}, \{ lock, fetch, modify in memory, commit \} and \{ lock, fetch, remove \}.
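These three sequences can be summarised with the sketch below; the method names are simplified stand-ins for the ObjectOps interface, and the unlock calls are an assumption about where the per-object lock is released.
\begin{verbatim}
// Sketch of the three usual per-object access sequences, using illustrative
// lock/fetch/commit/remove calls (simplified from the ObjectOps interface).
#include <string>

struct StoredObjectSketch {
  void initialize(const std::string&) {}  // build the in-memory representation
  void insert() {}                        // create the object in the store
  void lock() {}                          // take the per-object lock
  void fetch() {}                         // read and de-serialize
  void setOwner(const std::string&) {}    // example in-memory modification
  void commit() {}                        // serialize and write back
  void remove() {}                        // delete the object from the store
  void unlock() {}                        // release the per-object lock
};

void usualSequences(StoredObjectSketch& o) {
  // 1) initialise in memory, modify in memory, insert new object in the store
  o.initialize("content");
  o.setOwner("agent-A");
  o.insert();

  // 2) lock, fetch, modify in memory, commit
  o.lock();
  o.fetch();
  o.setOwner("agent-B");
  o.commit();
  o.unlock();

  // 3) lock, fetch, remove
  o.lock();
  o.fetch();
  o.remove();
}
\end{verbatim}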
When a multi-object structure is involved, the process accessing the store must create the object and reference it in a way that is semantically atomic for the other processes. This multi-object access is implemented in the OStoreDB object.
...
...
\label{subsec:agentFailure}
The conventions previously described ensure that objects are always uniquely referenced inside the object tree, either by a queue or by an agent. Several instances of a dedicated process, the garbage collector, monitor those agent entries. The agent entry contains a heartbeat counter, which allows the garbage collector to determine that the process is not active anymore, and triggers the resetting of the owned objects. Garbage collector processes themselves are also represented as agents and own other agents (they cannot watch themselves), so that the crash of a garbage collector is also covered: the watched agents will be picked up by another garbage collector instance, on another system or at another time, as the garbage collector is restarted automatically.
The resetting of the objects is type dependent. Each in-memory object type implements a garbage collect method, which is called by the garbage collector when collecting a dead process. The Object Store representation of objects has a common header indicating the type, schema version number and owner (which is a shared notion). This allows the garbage collector to determine the type dynamically and to call the appropriate garbage collection function. Likewise, the owner in the header allows determining whether the object is actually owned by the agent being garbage collected (in which case the object should be reset), or not (in which case the reference was actually stale).
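The sketch below illustrates this header-driven dispatch with assumed type and field names; it only shows the ownership check and the dynamic selection of the type-specific garbage collection routine.
\begin{verbatim}
// Sketch of the header-driven dispatch: the common header carries the type,
// schema version and owner, which lets the garbage collector instantiate the
// right in-memory class and check ownership before resetting anything.
// Types and names are illustrative.
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <string>

enum class ObjectType { ArchiveRequest, RetrieveRequest, ArchiveQueue };

struct ObjectHeader {
  ObjectType type;
  uint64_t version;
  std::string owner;
};

struct GarbageCollectable {
  virtual ~GarbageCollectable() = default;
  virtual void garbageCollect(const std::string& presumedOwner) = 0;
};

// Factory standing in for "instantiate the in-memory class matching the type".
std::unique_ptr<GarbageCollectable> instantiateFor(ObjectType type) {
  (void)type;  // a real implementation would return the matching object class
  throw std::runtime_error("sketch: no concrete types wired in");
}

void collectObject(const ObjectHeader& header, const std::string& deadAgent) {
  // Stale reference: the object has already been handed over to a new owner.
  if (header.owner != deadAgent) return;
  // Otherwise call the type-specific garbage collection routine.
  instantiateFor(header.type)->garbageCollect(deadAgent);
}
\end{verbatim}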
\subsection{Special case of archive and retrieve requests ownership}
\label{subsec:requestsOwnership}
As mentioned in section~\ref{subsec:archiveRequest}, the archive request is a special case and has several owners, one per tape copy job. This means that determining ownership requires parsing the object content itself instead of just the header. Besides this detail, the re-queuing of the job is identical to the other cases.
\subsection{Object versioning and schema evolution}
The object version, not currently used, is intended for live schema evolution. In order to achieve migration from version A to B of the schema, we need to implement a transitional version of the objects which can read and write both version A and B. After global deployment of this version, a central trigger (configuration file, etc.) changes the write version of the instances from A to B, and all objects previously written with schema A will be written back with schema B on their next update. This method allows a zero downtime schema transition, with the drawback that an active traversal of the structure is necessary to ensure complete transition. This schema evolution mechanism is not yet implemented.
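A minimal sketch of the intended transition is given below, with assumed names and a global write-version switch standing in for the central trigger; as stated above, this mechanism is not implemented.
\begin{verbatim}
// Sketch of the dual-version transition idea: the object can be read in
// schema version A or B, and a central switch decides which version is
// written back. Entirely illustrative; this mechanism is not implemented.
#include <atomic>
#include <cstdint>
#include <stdexcept>
#include <string>

constexpr uint64_t SCHEMA_A = 1;
constexpr uint64_t SCHEMA_B = 2;

// Central trigger (configuration driven in a real system).
std::atomic<uint64_t> g_writeVersion{SCHEMA_A};

struct PayloadSketch {
  uint64_t version = SCHEMA_A;
  std::string data;
};

PayloadSketch readAnyVersion(const PayloadSketch& stored) {
  if (stored.version != SCHEMA_A && stored.version != SCHEMA_B)
    throw std::runtime_error("unsupported schema version");
  return stored;                       // the transitional code understands both
}

PayloadSketch writeCurrentVersion(PayloadSketch p) {
  // On the next update after the switch, A-objects get rewritten as B-objects.
  p.version = g_writeVersion.load();
  return p;
}
\end{verbatim}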
\section{Performance considerations}
\label{sec:perf}
Performance numbers have been extracted from the CASTOR runs of 2015. The per tape pool rate has been measured over 10-minute intervals. The maximum seen was \unit{78}{\hertz}. The initial performance target will hence be \unit{100}{\hertz} per queue and a total of \unit{1}{\kilo\hertz} system wide. The maximum size for a queue will be \power{10}{7}, and the system will instruct the user to back off before crossing this boundary. This limit represents more than a day's worth of requests at the maximum rate. The number of queues existing at a single point in time is estimated to be around \power{10}{3} (several hundred are typically seen in CASTOR).
Using an Object Store allows independent access to each object, so little contention is expected except when accessing queues. As there is one queue per tape pool, cross talk between users of different tape pools should be minimal. The main challenge will hence be to ensure efficient queuing in a given queue when many files get added or dequeued in parallel. As the round trip time to the Object Store will not be compressible, we will have to add many elements to the queue in one go. On the tape server side, this could be implemented with bulk access to the queue, followed by many threads updating the jobs in parallel, and then updating all the entries in one go in the queue. This would allow accessing an arbitrary number of jobs over a fixed number of round trip times.
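The bulk access pattern could look roughly like the following sketch; the queue and job interfaces are assumptions, and a real implementation would bound the number of worker threads.
\begin{verbatim}
// Sketch of the bulk-access idea on the tape server side: take many jobs
// from the queue in one round trip, update them in parallel threads, then
// write all queue-side changes back in a single commit. Illustrative only.
#include <cstddef>
#include <string>
#include <thread>
#include <vector>

struct JobSketch { std::string address; bool done = false; };

struct QueueSketch {
  void lock() {}
  void fetch() {}
  std::vector<JobSketch> takeBatch(std::size_t n) { return std::vector<JobSketch>(n); }
  void recordResults(const std::vector<JobSketch>&) {}
  void commit() {}
  void unlock() {}
};

void processBatch(QueueSketch& queue, std::size_t batchSize) {
  queue.lock();
  queue.fetch();
  std::vector<JobSketch> jobs = queue.takeBatch(batchSize);  // one round trip

  // Update the individual job objects in parallel (one thread per job here;
  // a real implementation would use a bounded thread pool).
  std::vector<std::thread> workers;
  for (auto& job : jobs)
    workers.emplace_back([&job] { job.done = true; /* update job object */ });
  for (auto& w : workers) w.join();

  queue.recordResults(jobs);  // fold all per-job outcomes into the queue object
  queue.commit();             // one more round trip for the whole batch
  queue.unlock();
}
\end{verbatim}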
On the front end side, the fact that each xrootd connection lives in a separate thread can be leveraged by naturally creating the jobs in each thread, and then relying on shared data structures to accumulate elements to queue in one go. This will allow throughput to be increased at the expense of an increased (but bounded) latency for the end user.
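A possible sketch of such a shared accumulator is shown below; the class name, the batch-size threshold and the flushing policy are assumptions (a real implementation would also flush on a time bound so that the added latency stays bounded).
\begin{verbatim}
// Sketch of the front-end batching idea: each xrootd connection thread adds
// its freshly created request to a shared accumulator, and a full batch is
// queued in one go. Thresholds and names are illustrative.
#include <cstddef>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

class RequestBatcherSketch {
public:
  explicit RequestBatcherSketch(std::size_t batchSize) : m_batchSize(batchSize) {}

  // Called from each connection thread; returns the batch to flush when full.
  std::vector<std::string> add(std::string requestAddress) {
    std::lock_guard<std::mutex> lg(m_mutex);
    m_pending.push_back(std::move(requestAddress));
    if (m_pending.size() < m_batchSize) return {};   // keep accumulating
    std::vector<std::string> batch;
    batch.swap(m_pending);
    return batch;                                    // caller queues the whole batch
  }

private:
  std::mutex m_mutex;
  std::size_t m_batchSize;
  std::vector<std::string> m_pending;
};
\end{verbatim}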