Yesterday I explained briefly what happens when the recommendation cluster is bootstrapped. Today I thought I would go into a little bit more detail, explaining how a new node figures out what to prioritise.
First node to join is obviously on its own. I assume that a Zookeeper-like service is available to keep track of members of the cluster. In my current setup this is a dummy nameserver of 10 lines of scala+akka code which fulfills the purpose just fine for testing. This service must be running before any node can potentially join the cluster.
The first thing the node does is to register with the nameserver. The nameserver will reply by sending all the currently registered nodes, including the node issuing the request. If there are no other nodes in the cluster it will become responsible for coordinating the bootstrapping procedure of the cluster. This responsibility includes two things:
- Read the file with itemsets’s signatures and assign internal id’s to each. This id is only used internally by the recommendation cluster.
- Load as many itemsets to memory as the node can handle (1 itemset = 1 worker) – this can either be manually restricted or determined from within the system. It is manual for now since that makes it more predictable and easier to evaluate.
Assuming that the first node is not able to maintain all itemsets in memory on its own it will mark those that are not yet loaded and should be given priority.
As the second node joins the it will, similarly to the first node, register with the nameserver and receive a list of existing nodes. When discovering that existing nodes exists it will:
- Replicate the first node’s registry to learn what itemsets are already loaded and where the workers to serve those itemsets are residing. If it receives registers from more than one node the registers are merged. This way they will eventually be consistent on all nodes.
- Retrieve a list of itemsets to load. Once all nodes have replied it will sort the itemsets according to following formula: itemsets not previously loaded will be prioritised first. If all itemsets are loaded it will sort the itemsets according to their popularity (determined by the number of requests each itemset retrieves).
- Load the prioritised itemsets. One worker per itemset is started and it will ensure to register with both the local node on which it resides, as well as on the other nodes in the cluster.
A cluster with a lot of direct links will be expensive to maintain. Well, there are both sides to the argument. Maintaining more links have to be put in perspective to the frequency of expected failures and the cost of fixing/updating the link once a node recovers. I plan to evaluate this more closely in the next few days. An alternative is to add another level of indirection by storing only at which node the worker belongs. I would expect a small (max 5) nodes with a few hundred itemsets each to be sufficient for most cases. Maintaining the links for such a setup may cause a certain spike (increased latency) when a node dies, but overall be manageable since they otherwise rarely change. The time spent routing is possibly capped by the similarity function anyway. If this proves to be a bottleneck there’s need for more efficient datastructures and some further parallelisation. Actually, when I think about it, there is a lot which can be improved. First get it running…
Failure scenarios are plenty. That said, there are some resiliance to node and/or worker failures.
- The nodes establishes monitors for the workers, such that if one dies, it will be removed from the registry. When the worker is restarted by the worker’s supervisor it automatically re-registers with all nodes.
- If a node dies, the same procedure as above happens except that there is nothing to restart the node.
- Nodes can obviously fail during start-up. The case when the first node joining the cluster fails (before all itemsets are loaded at some node) is, however, not handled.
- A node which fails and rejoins continuously could probably overload the system. There is no protection against thrashing, such as throttling the number of join requests.
- If all available replicas for a particular itemset is down it is needed to respawn them from persistent storage (local file system or wherever they are stored). This is still unresolved and deserves a post on its own.
Now it remains to see if this scales sufficiently and what the impact of node join and leave is, and to rewrite this text more formally so that I can use it in the report.