Add the concurrent programming design doc #6394
# Design Doc: Concurrent Programming with Fluid

With PaddlePaddle Fluid, users describe a program rather than a model. The program is a [`ProgramDesc`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/framework.proto) protobuf message. TensorFlow/MXNet/Caffe2 applications generate protobuf messages too, but their protobuf messages represent the model, a graph of operators, rather than the program that trains/uses the model.

Many know that when we program TensorFlow, we can specify the device on which each operator runs. This allows us to create a concurrent/parallel AI application. An interesting question is: **how does a `ProgramDesc` represent a concurrent program?**

The answer relies on the fact that a `ProgramDesc` is similar to an abstract syntax tree (AST) that describes a program. So users can write a concurrent program with Fluid just as they would with any concurrent programming language, e.g., [Go](https://golang.org).
## An Analogy

The following table compares concepts in Fluid and Go:

| Go | Fluid |
|----|-------|
| user-defined functions | [layers](https://github.com/PaddlePaddle/Paddle/tree/develop/python/paddle/v2/fluid) |
| control-flow and built-in functions | [intrinsics/operators](https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/operators) |
| goroutines, channels | [class ThreadPool](https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/framework/thread_pool.h) |
| runtime | [class Executor](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/executor.h) |

> **Review thread (on the analogy with Go):**
> - The concepts in this table are not specific to Go; all programming languages have them, and readers may not be familiar with Go.
> - Other languages don't have goroutines.
> - I see. You could use the concept "coroutine" instead. But never mind, either is OK.
> - No, a coroutine is completely different from a goroutine; nothing similar indeed. Goroutines are preemptively scheduled, while a coroutine has to give up its OS thread before another coroutine can run. Goroutines are implemented on a thread pool in the Go runtime library, whereas a coroutine is a special data type -- a stack of stacks.

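To make the Go side of the analogy concrete, here is a minimal, self-contained Go program that uses a goroutine and channels. It is purely illustrative and independent of Fluid; the mapping to `ThreadPool`/`Executor` mentioned in the comments refers only to the analogy above, not to any actual API.

```go
package main

import "fmt"

// double reads integers from `in`, doubles them, and writes results to `out`.
// In the analogy above, a user-defined function like this corresponds to a layer.
func double(in <-chan int, out chan<- int) {
	for v := range in {
		out <- v * 2 // built-in operations correspond to intrinsics/operators
	}
	close(out)
}

func main() {
	in := make(chan int)
	out := make(chan int, 3) // buffered so the sender never blocks in this toy example

	// The goroutine corresponds to a thread scheduled by Fluid's ThreadPool;
	// the Go runtime itself corresponds to Fluid's Executor.
	go double(in, out)

	for i := 1; i <= 3; i++ {
		in <- i
	}
	close(in)

	for v := range out {
		fmt.Println(v) // prints 2, 4, 6
	}
}
```
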
## An Example Concurrent Program

To review all of the above concepts in an example, let us take a simple program and write its distributed version.

Suppose that we want to parallelize a naive Fluid program (written in Go and calling Fluid's Go binding) that multiplies two tensors.

```go
import "fluid"

func paddlepaddle() {
  X = fluid.read(...)
  W = fluid.Tensor(...)
  Y = fluid.mult(X, W)
}
```

Please be aware that Fluid's Go binding provides the default `main` function, which calls the `paddlepaddle` function defined above; running it creates the following `ProgramDesc` message.

```protobuf
message ProgramDesc {
  block[0] = Block {
    vars = [X, W, Y],
    ops = [
      read(output = X)
      assign(input = ..., output = W)
      mult(input = {X, W}, output = Y)
    ],
  }
}
```

Then, the default `main` function calls `fluid.run()`, which creates an instance of [`class Executor`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/executor.h) and calls `Executor.Run(block[0])`, where `block[0]` is the first and only block defined in the above `ProgramDesc` message.

The default `main` function is defined as follows:

```go
func main() {
  paddlepaddle()
  fluid.run()
}
```

## The Concurrent Version

By parallelizing the above program, we could support a very big tensor X by splitting it into small pieces {x_1, x_2, ...} and sending each piece to a worker process/node for parallel multiplication.

In this case, we can write a transpiler that takes a `ProgramDesc` message representing the above example program and outputs two `ProgramDesc` messages, one for running on the master process/node, and the other one for worker processes/nodes. A sketch of such a transpiler's interface is given below.

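The transpiler itself is not specified in this document. The following self-contained Go sketch only illustrates its shape -- a function from one `ProgramDesc` to a master/worker pair -- with plain structs and op-name strings standing in for the real protobuf messages; all names here are hypothetical.

```go
package main

import "fmt"

// Block and ProgramDesc are illustrative stand-ins for the protobuf messages
// of the same names in paddle/framework/framework.proto.
type Block struct {
	Parent int
	Vars   []string
	Ops    []string
}

type ProgramDesc struct {
	Blocks []Block
}

// transpile is a hypothetical entry point: given the single-process program,
// it emits one program for the master and one for the workers. A real
// transpiler would rewrite the ops of `prog`; here we only sketch the shape
// of the two outputs.
func transpile(prog ProgramDesc) (master, worker ProgramDesc) {
	master = ProgramDesc{Blocks: []Block{
		{Vars: []string{"X", "L", "Y"},
			Ops: []string{"read", "kube_get_workers_addrs", "tensor_array", "parallel_for"}},
		{Parent: 0, Vars: []string{"x", "y", "index"},
			Ops: []string{"slice", "send", "recv", "assign"}},
	}}
	worker = ProgramDesc{Blocks: []Block{
		{Vars: []string{"W", "input", "output"},
			Ops: []string{"listen_and_do", "mult"}},
	}}
	return master, worker
}

func main() {
	single := ProgramDesc{Blocks: []Block{
		{Vars: []string{"X", "W", "Y"}, Ops: []string{"read", "assign", "mult"}},
	}}
	m, w := transpile(single)
	fmt.Println("master ops:", m.Blocks[0].Ops)
	fmt.Println("worker ops:", w.Blocks[0].Ops)
}
```
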
### The Master Program

The master program could look like the following:

```protobuf
message ProgramDesc {
  block[0] = Block {
    vars = [X, L, Y],
    ops = [
      read(output = X)
      kube_get_workers_addrs(output = L)
      Y = tensor_array(len(L))
      parallel_for(input = X, output = Y,
                   attrs = {L, block_id(1)}) # referring to block 1
    ]
  }
  block[1] = Block {
    parent = 0,
    vars = [x, y, index],
    ops = [
      slice(input = [X, index], output = x) # index is initialized by parallel_for
      send(input = x, attrs = L[index])
      recv(outputs = y, attrs = L[index])
      assign(input = y, output = Y[index])
    ]
  }
}
```

> **Review threads (on the master program):**
> - This looks like a transpiler converting the program into a multi-node version; the multi-threaded version may be quite different. -- Why would the output of a transpiler differ from that of a human programmer? -- This example is itself multi-threaded: both `ParallelFor` and `ListenAndDo` create threads. It doesn't use multiple GPUs on the same node, but it could easily be changed to do so: the worker program can use `ParallelFor` to further split the `x` it receives across multiple GPUs. That could go into an additional section.
> - Several `parallel_for`s could also be nested under a `parallel.do`; in Fluid's Python API this might look like `pd = fluid.parallel.do()` with one `pd.block()` (each containing a `fluid.parallel.for()`) per `parallel_for`.
> - How do we deal with variable synchronization: how does the next op learn that a variable is ready, and how does it wait for that condition? Do we need a low-level synchronization mechanism?
> - Block 1 is a sub-block of block 0, so it can refer to `X` defined in block 0; the variable `index` is created in block 1's scope.
> - Could we write directly into a tensor of the tensor array instead of using a separate `assign` op? -- Great idea! It would be great if we can do this.

The equivalent Fluid program (calling the Go binding) is:

```go
func main() { //// block 0
  X = fluid.read(...)
  L = fluid.k8s.get_worker_addrs()
  Y = fluid.tensor_array(len(L))
  fluid.parallel_for(X, L,
    func(index int) { //// block 1
      x = X[index]
      fluid.send(L[index], x)
      y = fluid.recv(L[index])
      Y[index] = y
    })
}
```

An explanation of the above program:

- `fluid.k8s` is a package that provides access to the Kubernetes API.
- `fluid.k8s.get_worker_addrs` returns the list of IPs and ports of all pods of the current job except for the current one (the master pod).
- `fluid.tensor_array` creates a [tensor array](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/lod_tensor_array.h). `fluid.parallel_for` creates a `ParallelFor` intrinsic, which, when executed,
  1. creates `len(L)` scopes, each for the concurrent running of the sub-block (block 1 in this case), and initializes a variable named "index" in the scope to an integer value in the range `[0, len(L)-1]`, and
  2. creates `len(L)` threads by calling into the `ThreadPool` singleton, each thread
     1. creates an Executor instance, and
     2. calls `Executor.Run(block)`, where `block` is block 1 as explained above.
  3. Please be aware that block 1 is a sub-block of block 0, so ops in block 1 can refer to variables defined in block 0. (A sketch of this execution flow is given below.)

> **Review thread (on the parallel sub-block):** The sub-graph differs between GPU and CPU parallelism: GPU parallelism needs Broadcast/All-Reduce ops to distribute and synchronize the parameters, while CPU parallelism only needs to fetch different data indices in different threads. So maybe we should allow users to describe the program themselves, and also offer a transpiler that converts the CPU sub-graph into a GPU sub-graph.

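The following is a Go-flavored sketch of the execution flow described above. The real `ParallelFor` would be a C++ operator in paddle/framework; here a map-backed `Scope`, a Go closure in place of `Executor.Run(block 1)`, and goroutines in place of the `ThreadPool` are illustrative stand-ins.

```go
package main

import (
	"fmt"
	"sync"
)

// Scope is an illustrative stand-in for Fluid's variable scope; it only
// holds integer variables here.
type Scope struct {
	vars   map[string]int
	parent *Scope
}

func newScope(parent *Scope) *Scope {
	return &Scope{vars: map[string]int{}, parent: parent}
}

// parallelFor sketches what the ParallelFor intrinsic might do when run:
// one child scope and one thread per worker, each running the sub-block
// (block 1) with its own "index" variable.
func parallelFor(n int, parent *Scope, block func(*Scope)) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		scope := newScope(parent) // one scope per concurrent run of the sub-block
		scope.vars["index"] = i   // "index" is initialized to a value in [0, n-1]
		wg.Add(1)
		go func(s *Scope) { // in Fluid, a thread obtained from the ThreadPool singleton
			defer wg.Done()
			block(s) // in Fluid, Executor.Run(block 1) with this scope
		}(scope)
	}
	wg.Wait() // parallel_for finishes only after every sub-block run finishes
}

func main() {
	parent := newScope(nil)
	parallelFor(3, parent, func(s *Scope) {
		fmt.Println("running block 1 with index", s.vars["index"])
	})
}
```
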
### The Worker Program

The worker program looks like:

```go
func main() {
  W = Tensor(...)
  x = fluid.listen_and_do(
    fluid.k8s.self_addr(),
    func(input Tensor) {
      output = fluid.mult(input, W)
    })
}
```

> **Review threads (on the worker program):**
> - Can we also give the example protobuf of the worker program? -- Adding that.
> - Does `listen_and_do` both receive and send values? -- Exactly. Let me make that clear.
> - How will we support mixed devices, e.g., a block that contains ops on both GPU and CPU?

where

- `fluid.listen_and_do` creates a `ListenAndDo` intrinsic (sketched below), which, when executed,
  1. listens on the current pod's IP address, as returned by `fluid.k8s.self_addr()`,
  2. once a connection is established,
     1. creates a scope with two parameters, "input" and "output",
     2. reads a [Fluid variable](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/variable.h) and saves it into "input",
     3. creates an Executor instance and calls `Executor.Run(block)`, where the block is generated by running the lambda specified as the second parameter of `fluid.listen_and_do`.

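The sketch below illustrates the steps above in plain Go: a hypothetical `listenAndDo` accepts connections, reads an "input" value, runs the given block, and sends the "output" back. Strings and a newline-delimited TCP protocol stand in for Fluid variables and for whatever wire format Fluid's `send`/`recv` would actually use.

```go
package main

import (
	"bufio"
	"fmt"
	"net"
)

// listenAndDo listens on addr and, for every connection, reads an "input"
// value, runs the given block on it, and writes the "output" back.
// In Fluid, input/output would be variables in a fresh scope and the block
// would be run by an Executor; here a Go closure stands in for both.
func listenAndDo(addr string, block func(input string) (output string)) error {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}
	for {
		conn, err := ln.Accept()
		if err != nil {
			return err
		}
		go func(c net.Conn) { // one thread per incoming request
			defer c.Close()
			input, err := bufio.NewReader(c).ReadString('\n') // read the "input" variable
			if err != nil {
				return
			}
			fmt.Fprintln(c, block(input)) // run the block and send back the "output"
		}(conn)
	}
}

func main() {
	// A toy worker: instead of a real mult, it echoes the request with a prefix.
	_ = listenAndDo("127.0.0.1:9000", func(input string) string {
		return "result of mult on " + input
	})
}
```
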
## Summarization

From the above example, we see that:

1. Fluid enables the imperative programming paradigm by:
   1. letting users describe a program, but not a model (a sequence of layers, or a graph of operators), and
   2. letting them call the `fluid.run` function, which runs the program implicitly.
2. The program is described as a `ProgramDesc` protobuf message.
3. Function `Executor.Run` takes a block, instead of a `ProgramDesc`, as its parameter.
4. `fluid.run` calls `Executor.Run` to run the first block in the `ProgramDesc` message.
5. `Executor.Run`'s implementation is extremely simple -- it doesn't plan the execution nor create threads; instead, it runs on the current thread and executes intrinsics/operators' `Run` methods sequentially as they appear in the `Block.ops` array (see the sketch below).
6. Intrinsics/operators' `Run` methods might create threads. For example, the `ListenAndDo` operator creates a thread to handle each incoming request.
7. Threads are not necessarily OS threads; instead, they could be [green threads](https://en.wikipedia.org/wiki/Green_threads) managed by ThreadPool. Multiple green threads might run on the same OS thread. An example of green threads is Go's [goroutines](https://tour.golang.org/concurrency/1).

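A minimal sketch of the sequential execution described in point 5, with illustrative `Op` and `Block` stand-ins (the real `Executor` is a C++ class in paddle/framework):

```go
package main

import "fmt"

// Op stands in for an intrinsic/operator; the real ones are C++ classes
// exposing a Run method.
type Op interface {
	Run()
}

type printOp struct{ msg string }

func (p printOp) Run() { fmt.Println(p.msg) }

// Block stands in for a BlockDesc: just an ordered list of ops.
type Block struct{ ops []Op }

// Run sketches Executor.Run: no planning, no threads -- it simply runs each
// op on the current thread, in the order they appear in Block.ops.
// Individual ops (e.g., ParallelFor, ListenAndDo) may create threads themselves.
func Run(b Block) {
	for _, op := range b.ops {
		op.Run()
	}
}

func main() {
	Run(Block{ops: []Op{printOp{"read"}, printOp{"assign"}, printOp{"mult"}}})
}
```
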
> **Review comment (on item 7):** Maybe we can't call these green threads; this model is also described as hybrid threading.

> **Review discussion (scope of the design):**
> - I understand the `parallel_do` design and it is great. But out of curiosity, is this design a concurrent-programming use case just for distributed computation (data distribution), or does it include the future design for whole concurrent-programming support? If the latter, we need some other concepts in addition to `parallel_for`, such as `waiting_for` or the low-level `mutex` and `condition_variable`. And what is the capacity of Fluid's concurrent programming -- fully supporting low-level concurrency concepts (very flexible) such as `mutex`, or following some classical framework (easy to use) such as map/reduce?
> - A layer is the user-defined function; it looks like we need a higher-order function `map` to do parallel training and inference.
> - @ChunweiYan and @PaddlePaddle/paddle, thanks for the question. This design aims for the future. I completely understand that we are going to have a stack of abstractions for concurrency. I will try to complete this document to include the following: `fluid.parallel.do` and `fluid.parallel.for`; `SelectOp`; `Channel`, a new data type in `VarDesc`; and `GoOp`, where `GoOp` creates a (green) thread in PaddlePaddle's thread pool. `SelectOp`, `Channel`, and `GoOp` are the BSP primitives. It is mathematically proved that given these primitives we no longer need semaphores and mutexes, and in practice it is difficult to write concurrent programs that deadlock using them.
> - It seems Go follows CSP.
> - Plus, a comparison of parallel programming models: https://en.wikipedia.org/wiki/Parallel_programming_model. By the way, "model parallelism" is a kind of "task parallelism" on that page.

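Since the discussion above refers to `Channel`, `GoOp`, and `SelectOp` as counterparts of Go's primitives, here is a minimal Go program showing those primitives (`go`, channels, and `select`) coordinating two senders without any mutex or semaphore. It only illustrates the model the design intends to mirror, not any Fluid API.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	fast := make(chan string) // Channel analogue
	slow := make(chan string)

	// Each `go` statement is the analogue of the proposed GoOp: it starts a
	// concurrent sender without any explicit locking.
	go func() { fast <- "from fast worker" }()
	go func() {
		time.Sleep(10 * time.Millisecond)
		slow <- "from slow worker"
	}()

	for i := 0; i < 2; i++ {
		select { // SelectOp analogue: proceed with whichever channel is ready first
		case m := <-fast:
			fmt.Println(m)
		case m := <-slow:
			fmt.Println(m)
		}
	}
}
```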