This doc introduces a fine-grained, low-level scaling interface for the streaming system, which provides the most fundamental capability of scaling the streaming graph. It can be called directly by the cloud manager for maximal flexibility, or we may introduce another middleware that implements high-level scaling strategies on top of it.

The Semantics of Parallel Unit

A parallel unit is a logical unit of parallel computing capacity on a worker node (or, in aggregate, on the whole cluster), and is tightly coupled with the number of actors in a single fragment. Specifically,

  • Each compute node reports its maximum number of parallel units n to the meta service on start-up. The meta service then allocates n globally-unique Parallel Unit IDs in the cluster and records them. Therefore, given a parallel unit ID, we're able to find which worker node it belongs to.
  • Each parallelism (i.e., actor) of a fragment occupies one parallel unit. Thus, the maximum parallelism of a single fragment is the total parallel unit count in the cluster. However, a parallel unit occupied by fragment A is still available to other fragments; that is, occupation of parallel units is a per-fragment concept, which can be modeled as the fragment to parallel-unit matrix shown below.
  • The parallelism reported by a compute node is the number of CPU cores by default, or can be specified via command-line arguments when running in a shared container. We may improve this in the future to achieve better resource utilization based on statistics of our streaming executors. However, it shouldn't be fewer than the number of CPU cores, considering that a single fragment may become the bottleneck of the streaming graph and stall the pipeline.

[Figure: the fragment to parallel-unit matrix. The color shows which parallel unit each fragment is scheduled on.]

Since the parallel unit is a logical concept, there's no difference between putting the singleton Fragment 1 on parallel unit 1 or parallel unit 2: either way, it's scheduled as an async task on worker 1. Assigning an ID to each parallel unit only simplifies the management of scaling.
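To make this concrete, here is a minimal sketch (in Rust, with hypothetical type aliases rather than the actual meta-service code) of the two mappings described above: parallel unit to worker, and fragment to occupied parallel units.

use std::collections::HashMap;

// Hypothetical ID aliases, for illustration only.
type WorkerId = u32;
type ParallelUnitId = u32;
type FragmentId = u32;

fn main() {
    // Suppose worker 1 reports 3 parallel units and worker 2 reports 2:
    // the meta service allocates the globally-unique IDs 1..=5.
    let parallel_unit_to_worker: HashMap<ParallelUnitId, WorkerId> =
        HashMap::from([(1, 1), (2, 1), (3, 1), (4, 2), (5, 2)]);

    // Occupation is per-fragment: fragment 2 occupies all 5 parallel units,
    // while the singleton fragment 1 occupies only parallel unit 1, which is
    // therefore shared by both fragments.
    let fragment_occupation: HashMap<FragmentId, Vec<ParallelUnitId>> =
        HashMap::from([(1, vec![1]), (2, vec![1, 2, 3, 4, 5])]);

    // Given a parallel unit ID, we can always find the worker node it belongs to.
    for (fragment, units) in &fragment_occupation {
        for unit in units {
            let worker = parallel_unit_to_worker[unit];
            println!("fragment {fragment}: one actor on parallel unit {unit} (worker {worker})");
        }
    }
}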

The cloud manager can query the above matrix periodically via the GetClusterInfo RPC and make scaling decisions based on it. After it launches a new worker node and registers it to the cluster, the new parallel units will be reflected here automatically. When and how to reschedule the fragments can be fully managed by the cloud (except for creating a new materialized view), using the interfaces presented in the next sections.

message GetClusterInfoRequest {}
message GetClusterInfoResponse {
  // Wrapper message, since protobuf does not allow `repeated` values in a map.
  message ParallelUnitIds {
    repeated uint32 parallel_unit_ids = 1;
  }
  repeated TableFragment table_fragments = 1;
  // Parallel unit ID -> the worker node it belongs to.
  map<uint32, uint32> parallel_units_mapping = 2;
  // Fragment ID -> the parallel units occupied by that fragment.
  map<uint32, ParallelUnitIds> fragment_parallelism = 3;
}
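For illustration, here is a sketch of how the cloud manager could interpret this response on its side, e.g. to find the parallel units that a given fragment does not occupy yet and that are therefore candidates for scaling it out. The struct below stands in for the generated protobuf type and is an assumption, not the real client API; only the field names follow the message above.

use std::collections::{HashMap, HashSet};

type WorkerId = u32;
type ParallelUnitId = u32;
type FragmentId = u32;

// Stand-in for the generated GetClusterInfoResponse type (illustrative only).
struct GetClusterInfoResponse {
    parallel_units_mapping: HashMap<ParallelUnitId, WorkerId>,
    fragment_parallelism: HashMap<FragmentId, Vec<ParallelUnitId>>,
}

/// Parallel units in the cluster that the given fragment does not occupy yet,
/// i.e. candidates for `added_parallel_units` in a scale-out request.
fn scale_out_candidates(
    info: &GetClusterInfoResponse,
    fragment: FragmentId,
) -> Vec<ParallelUnitId> {
    let occupied: HashSet<ParallelUnitId> = info
        .fragment_parallelism
        .get(&fragment)
        .map(|units| units.iter().copied().collect())
        .unwrap_or_default();
    info.parallel_units_mapping
        .keys()
        .copied()
        .filter(|unit| !occupied.contains(unit))
        .collect()
}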

The Fragment Rescheduling Request

Based on the concept of the logical parallel unit, we're able to model different tasks like scale-in, scale-out, and actor migration as a unified fragment rescheduling request. The RPC should look like the following.

message RescheduleFragmentRequest {
  message Reschedule {
    repeated uint32 added_parallel_units = 1;
    repeated uint32 removed_parallel_units = 2;
  }
  // Fragment ID -> the reschedule plan for that fragment.
  map<uint32, Reschedule> reschedules = 1;
}
message RescheduleFragmentResponse {
  bool success = 1;
}
  • For scaling out a fragment, one should specify the added_parallel_units based on the cluster info queried before and leave the removed_parallel_units empty, and vice versa for scaling in.
  • For actor migration, one should ensure that the length of added_parallel_units is the same as that of removed_parallel_units.

The caller should batch all rescheduling requests of a single fragment into one RPC rather than splitting them across multiple calls. Since the vnode remapping is done on the meta service, batching the requests gives it better knowledge of the changes and allows it to keep the data locality as good as possible.
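As a concrete sketch, the three cases from the list above can be expressed in a single request as follows. The struct literals stand in for the generated protobuf types; apart from the field names in the message above, everything here is an assumption for illustration.

use std::collections::HashMap;

type ParallelUnitId = u32;
type FragmentId = u32;

// Stand-ins for the generated protobuf types (illustrative only).
#[derive(Default)]
struct Reschedule {
    added_parallel_units: Vec<ParallelUnitId>,
    removed_parallel_units: Vec<ParallelUnitId>,
}
struct RescheduleFragmentRequest {
    reschedules: HashMap<FragmentId, Reschedule>,
}

fn main() {
    let request = RescheduleFragmentRequest {
        reschedules: HashMap::from([
            // Scale out fragment 2 onto parallel units 4 and 5.
            (2, Reschedule { added_parallel_units: vec![4, 5], ..Default::default() }),
            // Scale in fragment 3 by removing it from parallel unit 1.
            (3, Reschedule { removed_parallel_units: vec![1], ..Default::default() }),
            // Migrate the singleton fragment 1 from parallel unit 1 to parallel unit 4:
            // the added and removed lists have the same length.
            (1, Reschedule { added_parallel_units: vec![4], removed_parallel_units: vec![1] }),
        ]),
    };
    assert_eq!(request.reschedules.len(), 3);
}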

To serve this request, the meta service will…

  1. Create actors on the specified parallel units asynchronously (if required) while keeping them disconnected from their upstreams and downstreams, just like what we do when creating materialized views.
  2. Pause the stream and issue a new barrier with an UpdateMutation that carries the newly-calculated vnode mapping and the rescheduling requests. This will be used to update the dispatchers and mergers, and also to update the vnode bitmaps of the state tables of all actors in this fragment (see the sketch after this list).
  3. Update the fragment and cluster info in the meta store and resume the stream.
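As an illustration of the vnode remapping in step 2, the sketch below shows one possible strategy, assuming a fixed number of 256 vnodes: every parallel unit in the new set receives roughly its fair share of vnodes, while moving as few vnodes as possible so that data locality is preserved for the untouched parallel units. This is a simplified sketch under assumed names, not the actual meta-service algorithm.

use std::collections::{HashMap, HashSet};

type ParallelUnitId = u32;

// Assumed fixed number of virtual nodes used for hash distribution.
const VNODE_COUNT: usize = 256;

/// Illustrative only: rebalance the vnode -> parallel-unit mapping after a
/// reschedule, keeping vnodes in place whenever their unit survives and is
/// under quota, and dealing the rest onto under-loaded (including new) units.
fn remap_vnodes(
    old_mapping: &[ParallelUnitId; VNODE_COUNT],
    added: &[ParallelUnitId],
    removed: &[ParallelUnitId],
) -> [ParallelUnitId; VNODE_COUNT] {
    let removed: HashSet<ParallelUnitId> = removed.iter().copied().collect();

    // Parallel units owning vnodes after rescheduling: the survivors plus the added ones.
    let mut new_units: Vec<ParallelUnitId> = old_mapping
        .iter()
        .chain(added.iter())
        .copied()
        .filter(|unit| !removed.contains(unit))
        .collect();
    new_units.sort_unstable();
    new_units.dedup();
    assert!(!new_units.is_empty(), "a fragment must keep at least one parallel unit");

    // Each unit should own at most ceil(VNODE_COUNT / n) vnodes.
    let quota = (VNODE_COUNT + new_units.len() - 1) / new_units.len();
    let mut owned: HashMap<ParallelUnitId, usize> = HashMap::new();
    let mut new_mapping = *old_mapping;

    // Keep a vnode in place only if its unit survives and is still under quota.
    let mut to_reassign = Vec::new();
    for vnode in 0..VNODE_COUNT {
        let unit = new_mapping[vnode];
        let count = owned.entry(unit).or_insert(0);
        if removed.contains(&unit) || *count >= quota {
            to_reassign.push(vnode);
        } else {
            *count += 1;
        }
    }

    // Deal the remaining vnodes onto units that are still under quota,
    // which includes the newly added (currently empty) parallel units.
    let mut candidates = new_units.iter().copied().cycle();
    for vnode in to_reassign {
        let unit = loop {
            let u = candidates.next().unwrap();
            let count = owned.entry(u).or_insert(0);
            if *count < quota {
                *count += 1;
                break u;
            }
        };
        new_mapping[vnode] = unit;
    }
    new_mapping
}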

It should be noted that all of the parallel unit IDs in the request will be validated before being applied to the streaming graph. The meta service is not responsible for starting or shutting down compute nodes; that should be handled by the cloud manager itself before or after the rescheduling request. Even so, we may allow a compute node to mark itself as RemovedSoon so that the meta service will never schedule newly-created materialized views on it (a possible filter is sketched below). However, the actor migration or dropping should still be managed by the external service. (See also: Step 1: Mark to-remove CNs as REMOVED_SOON: ScaleService in Meta for scheduler operator)
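For instance, a hypothetical sketch (not the actual scheduler code) of how the scheduler on the meta service could skip such nodes when placing a newly-created materialized view:

use std::collections::HashMap;

type WorkerId = u32;
type ParallelUnitId = u32;

// Hypothetical worker state; only `RemovedSoon` is relevant here.
#[derive(PartialEq)]
enum WorkerState {
    Running,
    RemovedSoon,
}

/// Parallel units eligible for scheduling a newly-created materialized view:
/// everything except units on workers that asked to be removed soon.
fn schedulable_units(
    parallel_unit_to_worker: &HashMap<ParallelUnitId, WorkerId>,
    worker_states: &HashMap<WorkerId, WorkerState>,
) -> Vec<ParallelUnitId> {
    let mut units = Vec::new();
    for (unit, worker) in parallel_unit_to_worker {
        // Skip parallel units on workers marked as RemovedSoon.
        if worker_states.get(worker) == Some(&WorkerState::RemovedSoon) {
            continue;
        }
        units.push(*unit);
    }
    units
}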

See Also