Lesson 6 - 實現任意區域的 Func
// 上一節的內容相當多, 而後續緊接著複雜的 multi-stage pipeline// 這節是個插曲, 考量一些簡單的事: 如何在非原點開始的長方形區域做計算
// 定義熟悉的 gradient 函數
Func gradient("gradient");
Var x("x"), y("y");
gradient(x, y) = x + y;
// 打開 tracing 功能, 藉此觀察計算的過程
gradient.trace_stores();
// 先前我們以這樣的方式來實現此 gradient 函數
//
// gradient.realize(8, 8);
//
// 內部做了三件事
// 1) 產生能夠在任何長方形計算 gradient 的程式碼
// 2) 配置新的 8x8 影像
// 3) 執行產生的程式碼來計算所有 x, y 的 gradient
// 從 (0, 0) 到 (7, 7) 並且將結果存入該影像
// 4) 返回新影像作為 realize 呼叫的結果
// 然而當小心地管理記憶體, 且不藉由 Halide 配置新的影像的結果是?
// 使用上能夠以另外的方式呼叫 realize - 能夠傳入想要被填入資料的影像
// 下列程式碼能使用 Func 計算結果並且存入早已配置的影像
printf("Evaluating gradient from (0, 0) to (7, 7)\n");
Buffer result(8, 8);
gradient.realize(result);
// Click to show output ...
// Let's check it did what we expect:
for (int y = 0; y < 8; y++) {
for (int x = 0; x < 8; x++) {
if (result(x, y) != x + y) {
printf("Something went wrong!\n");
return -1;
}
}
}
// 接著在其他位置開始的 5x7 長方形計算中 gradient -- 在 (100, 50) 的位置
// 因此 x, y 涵蓋的範圍將由 (100, 50) 到 (104, 56)
// 首先建立一個代表該長方形的影像
Buffer<int> shifted(5, 7); // 在建構子中告知影像大小
shifted.set_min(100, 50); // 接著告知左上角的起始座標
printf("Evaluating gradient from (100, 50) to (104, 56)\n");
// 請注意並不需要重新編譯新的程式碼,
// 因為已經在第一次呼叫 realize 時產生能計算任意長方形 gradient 的程式碼
gradient.realize(shifted);
// Click to show output ...
// C++ 版本中, 一樣將自 (100, 50) 座標開始存取影像物件
for (int y = 50; y < 57; y++) {
for (int x = 100; x < 105; x++) {
if (shifted(x, y) != x + y) {
printf("Something went wrong!\n");
return -1;
}
}
}
// 'shifted' 影像存放著 Func 自位置 (100, 50) 開始計算的結果,
// 所以要求 shifted(0, 0) 將會產生 out-of-bound 的錯誤, 甚至可能 crash
// 當我們想要以 Func 計算非長方形區域會如何?
// 糟糕, Halide 只能處理長方形 :)
==
Lesson 7 - multi-stage pipeline
// 首先宣告會使用到的變數
Var x("x"), y("y"), c("c");
// 接著會定義用以做模糊影像處理的 multi-stage pipeline
// 流程上先水平後垂直的次序做處理
{
// 輸入為 8-bit 彩色影像}
Buffer<uint8_t> input = load_image("images/rgb.png");
// 將精準度展延到 16-bit, 以避免數理操作上的 overflow
Func input_16("input_16");
input_16(x, y, c) = cast<uint16_t>(input(x, y, c));
// 水平模糊:
Func blur_x("blur_x");
blur_x(x, y, c) = (input_16(x-1, y, c) +
2 * input_16(x, y, c) +
input_16(x+1, y, c)) / 4;
// 垂直模糊:
Func blur_y("blur_y");
blur_y(x, y, c) = (blur_x(x, y-1, c) +
2 * blur_x(x, y, c) +
blur_x(x, y+1, c)) / 4;
// 轉換回 8-bit 精準度
Func output("output");
output(x, y, c) = cast<uint8_t>(blur_y(x, y, c));
// 每個在這 pipeline 中的 Func 會使用類似的函數呼叫語法來呼叫前一級的 Func
// ( Func 物件的 operator() 已 overloaded)
// 一個 Func 可能會呼叫其他已經給予定義的 Func. (champ:重點在於已定義)
// 這限制避免 pipeline 陷入無限迴圈 (champ:也代表著沒有遞迴)
// Halide 的 pipeline 總是個由多個 Func 所組成單向向前的 Graph (champ:也就是 DAG)
// 接著實現它
// Buffer<uint8_t> result = output.realize(input.width(), input.height(), 3);
// 上面這行並沒有動作, 嘗試將註解移除看看會發生什麼
// 由於 blur_x 會在水平方向上超出範圍而 blur_y 會在垂直方向上超出範圍,
// 因此在與輸入影像相同的起始與範圍實現這個 pipeline 的同時
// 將造成輸入影像必須讀入超過範圍的輸入的情形
// Halide 會在開頭注入一段程式碼偵測這問題, 是先計算影響處理所需讀入的輸入
// 在 pipeline 運作之前會先執行這段程式碼, 確認所需輸入是否超出範圍, 若是則停止
// 在內層 loop 並沒有做確認, 因為會造成效能低落
//
// 那該如何處理這問題? 有幾個選擇. 若上下左右都向內縮減一個 pixel,
// 如此就不會遇到要求 Halide 讀取出界. 而這件事在上一節有展示如何達到
Buffer<uint8_t> result(input.width()-2, input.height()-2, 3);
result.set_min(1, 1);
output.realize(result);
// 將結果存下, 可以看到些微模糊的鸚鵡, 並且影像長寬都比輸入影像少2 pixels
save_image(result, "blurry_parrot_1.png");
// 這通常是最快的處理邊界的方法, 避免寫出超出範圍的程式
// 更通常的作法在下一個例子
// 有著輸入邊界條件的相同 pipeline
{
// 輸入為 8-bit 彩色影像}
Buffer<uint8_t> input = load_image("images/rgb.png");
// 這次將輸入以另一個避免讀出界的 Func 包起來
Func clamped("clamped");
// 定義一個將 x 限制於 [0, input.width()-1] 範圍的表示式
Expr clamped_x = clamp(x, 0, input.width()-1);
// clamp(x, a, b) 等義於 max(min(x, b), a).
// 同樣地 clamp y
Expr clamped_y = clamp(y, 0, input.height()-1);
// 藉由限制後的座標來讀取輸入. 這表示無論如何計算, 都不會超出輸入的範圍
// 這是 clamp-to-edge 邊界方式, 為在 Halide 中表示上最簡單的邊界處理
clamped(x, y, c) = input(clamped_x, clamped_y, c);
// 藉由使用自 BoundaryConditions 的輔助函式
// 可以更簡潔地定義 'clamped', 像是:
//
// clamped = BoundaryConditions::repeat_edge(input);
//
// 這對於其他邊界條件是重要的 因為這表示方式 Halide 能夠充份理解與優化
// 若使用得當, 這樣的邊界條件就會如同沒有使用一般的高效率
// 展延到 16-bit 精準度, 如此可以避免數理操作的 overflow.
// 這次我們使用新的 Func 'clampled' 取代直接使用輸入影像
Func input_16("input_16");
input_16(x, y, c) = cast<uint16_t>(clamped(x, y, c));
// 剩下的部份與先前一致
// 水平模糊
Func blur_x("blur_x");
blur_x(x, y, c) = (input_16(x-1, y, c) +
2 * input_16(x, y, c) +
input_16(x+1, y, c)) / 4;
// 垂直模糊
Func blur_y("blur_y");
blur_y(x, y, c) = (blur_x(x, y-1, c) +
2 * blur_x(x, y, c) +
blur_x(x, y+1, c)) / 4;
// 轉回 8-bit
Func output("output");
output(x, y, c) = cast<uint8_t>(blur_y(x, y, c));
// 由於有做邊界處理, 這次計算如同 input 的起始與範圍是安全的
Buffer <uint8_t>result = output.realize(input.width(), input.height(), 3);
// 儲存結果, 看起來應是些微模糊的鸚鵡
// 但這次大小與輸入一致
save_image(result, "blurry_parrot_2.png");
==
Lesson 8 - Multi-stage pipeline 的排程
// 首先定義後續會用到的變數
Var x("x"), y("y");
// 接著來對一個簡單 two-stage pipeline 測試不同的排程選項
// 首先是預設排程:
{
Func producer("producer_default"), consumer("consumer_default");}
// 第一個 stage 是類似於我們熟悉的 gradient 函數的簡單逐點數學運算
// 在座標 (x, y) 的數值是 x 與 y 相乘後代入 sin 函數
producer(x, y) = sin(x * y);
// 接著增加第二個 stage, 動作是將多個來自第一個 stage 的點做數值平均
consumer(x, y) = (producer(x, y) +
producer(x, y+1) +
producer(x+1, y) +
producer(x+1, y+1))/4;
// 對於這兩個 Func 都打開 tracing 功能
consumer.trace_stores();
producer.trace_stores();
// 接著在 4x4 方塊上計算
printf("\nEvaluating producer-consumer pipeline with default schedule\n");
consumer.realize(4, 4);
// Click to show output ...
// 並沒有對於 producer 計算數值的訊息印出
// 這是因為 producer 完全以 inline 方式於 consumer 中
// 就如同以下的撰寫方式:
// consumer(x, y) = (sin(x * y) +
// sin(x * (y + 1)) +
// sin((x + 1) * y) +
// sin((x + 1) * (y + 1))/4);
// 所以有 producer 的呼叫都被置換為 producer 的內容, 並以變數替換相關參數
// 等義的 C code:
float result[4][4];
for (int y = 0; y < 4; y++) {
for (int x = 0; x < 4; x++) {
result[y][x] = (sin(x*y) +
sin(x*(y+1)) +
sin((x+1)*y) +
sin((x+1)*(y+1)))/4;
}
}
printf("\n");
// 若檢查巢狀的 loop 會發現 producer 全然沒出現.
// 它已經被 inline 置入 consumer 中
printf("Pseudo-code for the schedule:\n");
consumer.print_loop_nest();
printf("\n");
// Click to show output ...
// 接著測試下一個簡單的項目 - 在計算任何的 consumer 數值前,
// 事先計算所有在 producer 被需要的數值.
// 這樣的排程稱為 "root"
{
// 首先使用相同的定義}
Func producer("producer_root"), consumer("consumer_root");
producer(x, y) = sin(x * y);
consumer(x, y) = (producer(x, y) +
producer(x, y+1) +
producer(x+1, y) +
producer(x+1, y+1))/4;
// 告知 Halide 在計算 consumer 之前, 要計算所有的 producer 數值
producer.compute_root();
// 打開 tracing 功能
consumer.trace_stores();
producer.trace_stores();
// 編譯與執行
printf("\nEvaluating producer.compute_root()\n");
consumer.realize(4, 4);
// Click to show output ...
// 從輸出可以看出:
// A) 有來自 producer 的寫入
// B) 這些全發生在任何的 consumer 寫入之前
// 以下為視覺化結果
// producer 在左, consumer 在右
// 寫入標示為橘色, 讀取標示為藍色
// 等義的 C code:
float result[4][4];
// 配置一些暫存空間給 producer
float producer_storage[5][5];
// Compute the producer.
for (int y = 0; y < 5; y++) {
for (int x = 0; x < 5; x++) {
producer_storage[y][x] = sin(x * y);
}
}
// 計算 consumer 的部份, 這次忽略印出的訊息
for (int y = 0; y < 4; y++) {
for (int x = 0; x < 4; x++) {
result[y][x] = (producer_storage[y][x] +
producer_storage[y+1][x] +
producer_storage[y][x+1] +
producer_storage[y+1][x+1])/4;
}
}
// 注意 consumer 是計算 4x4 正方, 因此 Halide 將自動推斷 producer 需要 5x5 正方資料
// 這與前一節的邊界推斷的邏輯相同, 主要用來偵測與避免自輸入影像作超出邊界讀取
// 若印出 loop 的巢狀, 會看到很類似於上面的 C code
printf("Pseudo-code for the schedule:\n");
consumer.print_loop_nest();
printf("\n");
// Click to show output ...
// 接著比較上面兩種方法的效率
// Full inlining (預設的排程):
// - 暫時的記憶體配置: 0
// - 讀取: 0
// - 寫入: 16
// - sin呼叫次數: 64
// producer.compute_root():
// - 暫時的記憶體配置: 25 floats
// - 讀取: 64
// - 寫入: 41
// - sin呼叫次數: 25
// 這是一些要考量的妥協, inline 使用了最少的暫時記憶體與記憶體頻寬
// 但使用了大量昂貴數學計算. 在 producer 中大部份的點被計算了4次
// 而第二種排程 producer.compute_root() 有著最少次的 sin 呼叫,
// 但使用較多的暫時記憶體與記憶體頻寬
//
// 任何情況下都很難做出正確的選擇. 若是記憶體頻寬限制或是沒有太多記憶體,
// (例如: 可能在舊手機上執行), 那麼做重複的數學計算有其意義.
// 反過來說 sin 呼叫是昂貴的, 若是計算資源受限, 較少使用 sin 讓程式更快
// 對程式 vectorize 或是多核平行將偏好做重複的計算, 因為多核增加了計算
// 吞吐量, 而不會增加系統頻寬或是容量
// 如此能夠在 full inlining 與 compute_root 中做選擇.
// 接著測試在以每條 scanline 為基礎的方式在 producer 與 consumer 間計算
{
// 首先使用相同的定義}
Func producer("producer_y"), consumer("consumer_y");
producer(x, y) = sin(x * y);
consumer(x, y) = (producer(x, y) +
producer(x, y+1) +
producer(x+1, y) +
producer(x+1, y+1))/4;
// 告知 Halide 在 consumer 的每個 y 座標計算所需的 producer 數值
producer.compute_at(consumer, y);
// equivalent C below.
// 這部分的計算 producer 的 code 就是在 consumer 的 y loop "之中"
// 如同下列等義的 C code.
// 打開 tracing 功能
producer.trace_stores();
consumer.trace_stores();
// 編譯與執行
printf("\nEvaluating producer.compute_at(consumer, y)\n");
consumer.realize(4, 4);
// Click to show output ...
// 下圖為視覺化結果
// 閱讀 log 與查看上圖, 就會了解 producer 與 consumer 間基於每條 scanline 的方式
// 以下為等義的 C code:
float result[4][4];
// 有一個外部的 loop 來掃過 consumer 的 scanline
for (int y = 0; y < 4; y++) {
// 配置空間與計算足夠的 producer 來滿足單一 scanline consumer 的計算
// 這表示一個 5x2 的 producer 長方形
float producer_storage[2][5];
for (int py = y; py < y + 2; py++) {
for (int px = 0; px < 5; px++) {
producer_storage[py-y][px] = sin(px * py);
}
}
// 計算單條 consumer 的 scanline
for (int x = 0; x < 4; x++) {
result[y][x] = (producer_storage[0][x] +
producer_storage[1][x] +
producer_storage[0][x+1] +
producer_storage[1][x+1])/4;
}
}
// 同樣地, 若印出迴圈巢狀結構, 將看到類似上面的 C code.
printf("Pseudo-code for the schedule:\n");
consumer.print_loop_nest();
printf("\n");
// Click to show output ...
// 這樣策略在效能上的特性是介於 inline 與 compute root.
// 依然需要配置暫時記憶體, 但少於 compute root 且有著較佳的 locailty
// (在寫入後會需要很快的載入, 對於較大的影像來說, 數值應還在 cache)
// 依然是需要重複的計算, 但少於 full inlining.
// producer.compute_at(consumer, y):
// - 暫時記憶體: 10 floats
// - 讀取: 64
// - 寫入: 56
// - sin呼叫次數: 40
// 這裡可以繼續研究 producer.compute_at(consumer, x),
// 但這與 full inlining (預設排程)非常相似.
// 因此判斷在不同 loop 中配置 producer 的儲存空間
// 以及在不同的 loop 中實際地計算數值, 這將產生一些優化方向
{
Func producer("producer_root_y"), consumer("consumer_root_y");
producer(x, y) = sin(x * y);
consumer(x, y) = (producer(x, y) +
producer(x, y+1) +
producer(x+1, y) +
producer(x+1, y+1))/4;
// 告知 Halide 在最外層配置存放所有 producer 的空間
producer.store_root();
// ... 但在每個 consumer 的 y 座標才去作計算
producer.compute_at(consumer, y);
producer.trace_stores();
consumer.trace_stores();
printf("\nEvaluating producer.store_root().compute_at(consumer, y)\n");
consumer.realize(4, 4);
// Click to show output ...
// 下圖為視覺化結果
}
// 閱讀 log 與察看上圖會得知
// 這也是基於介於 producer 與 consumer 間每條 scanline 的方式
// 先計算 5x2 producer 的長方形以滿足第一條 consumer scanline 的計算
// 在此之後儘需要計算 5x1 的長方形來計算新的 consumer scanline
//
// Halide 偵測到對於所有的 scanline 除了第一條之外,
// 能夠重複使用在配置給 producer buffer 中既有的數值
// 以下為等義 C code:
float result[4][4];
// producer.store_root() 表示空間在此:
float producer_storage[5][5];
// 這是用來掃過 consumer 每條 scanline 的外層 loop
for (int y = 0; y < 4; y++) {
// 計算足夠的 producer 以滿足這次 consumer scanline 的計算
for (int py = y; py < y + 2; py++) {
// 若所需的 row 是先前已經被計算過的 producer 數值, 則跳過
if (y > 0 && py == y) continue;
for (int px = 0; px < 5; px++) {
producer_storage[py][px] = sin(px * py);
}
}
// 計算 consumer scanline
for (int x = 0; x < 4; x++) {
result[y][x] = (producer_storage[y][x] +
producer_storage[y+1][x] +
producer_storage[y][x+1] +
producer_storage[y+1][x+1])/4;
}
}
printf("Pseudo-code for the schedule:\n");
consumer.print_loop_nest();
printf("\n");
// Click to show output ...
// 這個策略的效能特性很好, 數字上接近 compute_root, 除了 locality 更好.
// 使用了最少次數的 sin 呼叫, 並且在寫入數值後快速的被載入
// 因此可能良好地使用 cache.
// producer.store_root().compute_at(consumer, y):
// - 暫時記憶體配置: 10 floats
// - 讀取: 64
// - 寫入: 39
// - sin呼叫次數: 25
// 注意宣稱的配置記憶體數量與 C code 並不一致
// Halide 在此之下有進一步地優化.
// 它將 producer 轉為大小為 2 scanline 的 circular buffer
// 等義的 code 如下:
{
// 實際上寫入 2 條 scanline 而非 5 條
float producer_storage[2][5];
for (int y = 0; y < 4; y++) {
for (int py = y; py < y + 2; py++) {
if (y > 0 && py == y) continue;
for (int px = 0; px < 5; px++) {
// 透過 y 座標 bit-masked 後存入 producer_storage
producer_storage[py & 1][px] = sin(px * py);
}
}
// 計算 consumer scanline
for (int x = 0; x < 4; x++) {
// 透過 y 座標 bit-masked 後, 自 producer_storage 載入資料
result[y][x] = (producer_storage[y & 1][x] +
producer_storage[(y+1) & 1][x] +
producer_storage[y & 1][x+1] +
producer_storage[(y+1) & 1][x+1])/4;
}
}
}
// 藉由儲存空間放置最外層並將計算移到最內層, 還能做到更好的結果
{
Func producer("producer_root_x"), consumer("consumer_root_x");
producer(x, y) = sin(x * y);
consumer(x, y) = (producer(x, y) +
producer(x, y+1) +
producer(x+1, y) +
producer(x+1, y+1))/4;
// 存放在最外層, 計算在最內層
producer.store_root().compute_at(consumer, x);
producer.trace_stores();
consumer.trace_stores();
printf("\nEvaluating producer.store_root().compute_at(consumer, x)\n");
consumer.realize(4, 4);
// Click to show output ...
// 下圖為視覺化結果
// consumer 與 producer 間的計算是基於每個點的方式}
// 以下為等義 C code:
float result[4][4];
// producer.store_root() 表示存放空間在此
// 能夠將其以 circular buffer 方式減少為 2 scanlines
float producer_storage[2][5];
// 對於每個 consumer 的 pixel
for (int y = 0; y < 4; y++) {
for (int x = 0; x < 4; x++) {
// 計算足夠的 producer 以滿足這次 consumer pixel 的計算
// 但必須忽略已經計算過的數值:
if (y == 0 && x == 0)
producer_storage[y & 1][x] = sin(x*y);
if (y == 0)
producer_storage[y & 1][x+1] = sin((x+1)*y);
if (x == 0)
producer_storage[(y+1) & 1][x] = sin(x*(y+1));
producer_storage[(y+1) & 1][x+1] = sin((x+1)*(y+1));
result[y][x] = (producer_storage[y & 1][x] +
producer_storage[(y+1) & 1][x] +
producer_storage[y & 1][x+1] +
producer_storage[(y+1) & 1][x+1])/4;
}
}
printf("Pseudo-code for the schedule:\n");
consumer.print_loop_nest();
printf("\n");
// Click to show output ...
// 效能上的特性是至今最好的.
// 四分之一所需的 producer 數值依然在暫存器中, 因此不計入讀取數
// producer.store_root().compute_at(consumer, x):
// - 配置的暫時記憶體: 10 floats
// - 讀取: 48
// - 寫入: 56
// - sin呼叫次數: 40
// 因此得知了什麼?
// 為何針對此類型不總是 producer.store_root().compute_at(consumer, x)?
//
// 答案在於 parallelism. 在先前兩個策略中, 已經假設先前已計算的數值等待著被使用
// 這假設了時間上在先前發生的 x 或 y 已經結束. 對於已平行化或向量化的 loop 來說,
// 並不是正確的. 若平行化處理後, 若在 store_at 與 compute_at 之間有著平行 loop
// Halide 不會注入跳過計算的優化. 也不會將儲存空間轉為 circular buffer, 這使得
// store_root 失去意義
// 目前已經試過了所有選項. 還能做的新方式是藉由 splitting.
// 能夠 store_at 或 compute_at 在 consumer 的變數 loop 上
// 或是 split x or y 轉為新的內層與外層的子變數然後對此排程
// 這裡合併使用這方式為 tiles
{
Func producer("producer_tile"), consumer("consumer_tile");}
producer(x, y) = sin(x * y);
consumer(x, y) = (producer(x, y) +
producer(x, y+1) +
producer(x+1, y) +
producer(x+1, y+1))/4;
// 以 4x4 tile 的方式來計算 8x8 consumer
Var x_outer, y_outer, x_inner, y_inner;
consumer.tile(x, y, x_outer, y_outer, x_inner, y_inner, 4, 4);
// 計算每個 tile 所需的 producer
producer.compute_at(consumer, x_outer);
// 注意排程是以終端開始撰寫. 這是因為對於 producer 排程需要使用 x_outer
// 這變數是在將 consumer 分為 tile 的時候才導入.
// 是能夠以其他的次序撰寫, 但程式碼將難以閱讀
// 打開 tracing 功能
producer.trace_stores();
consumer.trace_stores();
printf("\nEvaluating:\n"
"consumer.tile(x, y, x_outer, y_outer, x_inner, y_inner, 4, 4);\n"
"producer.compute_at(consumer, x_outer);\n");
consumer.realize(8, 8);
// Click to show output ...
// 以下為視覺化結果
// producer 與 consumer 間的計算現在是基於每個 tile
// 以下為等義的 C code:
float result[8][8];
// 對於每個 consumer tile
for (int y_outer = 0; y_outer < 2; y_outer++) {
for (int x_outer = 0; x_outer < 2; x_outer++) {
// 計算在此 tile 的 x, y 起始座標
int x_base = x_outer*4;
int y_base = y_outer*4;
// 計算足夠的 producer 以滿足此 consumer tile 的計算
// 一個 4x4 consumer tile 需要一個 5x5 producer tile
float producer_storage[5][5];
for (int py = y_base; py < y_base + 5; py++) {
for (int px = x_base; px < x_base + 5; px++) {
producer_storage[py-y_base][px-x_base] = sin(px * py);
}
}
// 計算此 consumer tile
for (int y_inner = 0; y_inner < 4; y_inner++) {
for (int x_inner = 0; x_inner < 4; x_inner++) {
int x = x_base + x_inner;
int y = y_base + y_inner;
result[y][x] =
(producer_storage[y - y_base][x - x_base] +
producer_storage[y - y_base + 1][x - x_base] +
producer_storage[y - y_base][x - x_base + 1] +
producer_storage[y - y_base + 1][x - x_base + 1])/4;
}
}
}
}
printf("Pseudo-code for the schedule:\n");
consumer.print_loop_nest();
printf("\n");
// Click to show output ...
// 對於會觸及到 x, y 外部的模板問題, 使用 tiling 有其意義.
// 每個 tile 能平行地被獨立計算.
// 而當 tile 夠大時需要重複的工作就不是什麼大問題
// 最後嘗試合併先前做過的 splitting, parallelizing 與 vectoring 的混合策略
// 這方式對於實際上的大型影像運作的不錯
// 若了解這個排程, 就已經掌握了 Halide 中 95% 的排程
{
Func producer("producer_mixed"), consumer("consumer_mixed");
producer(x, y) = sin(x * y);
consumer(x, y) = (producer(x, y) +
producer(x, y+1) +
producer(x+1, y) +
producer(x+1, y+1))/4;
// 將 y 座標以 16-scanline strip 的方式分割
Var yo, yi;
consumer.split(y, yo, yi, 16);
// 使用 thread pool 與 task queeu 來計算這些 strips
consumer.parallel(yo);
// 在 x 方向上作 vectorization
consumer.vectorize(x, 4);
// 將 per-strip producer 資訊存放起來. 這會是 17 條 producer scanlines
// 但希望能折為大小為 2 條 scanline 的 circular buffer
producer.store_at(consumer, yo);
/ 在每個 strip 中計算 consumer 所需的每條 producer scanline,
// 並忽略先前已計算過的 scanline
producer.compute_at(consumer, yi);
// 並且對 producer 作 vectorization (由於在 x86 SSE, sin 是能夠 vectorized 的)
producer.vectorize(x, 4);
// 這次關閉 tracing, 因為需要計算大型影像
// consumer.trace_stores();
// producer.trace_stores();
Buffer halide_result = consumer.realize(160, 160);
// 以下為視覺化結果
// 以下為等義 C code:
float c_result[160][160];
// 對於每個 16-scanline strip (在 Halide 中這層 loop 是平行的)
for (int yo = 0; yo < 160/16 + 1; yo++) {
// 16 無法分割 160, 因此將最後的部分上推
// 讓範圍符合 [0, 159] (見 lesson 05).
int y_base = yo * 16;
if (y_base > 160-16) y_base = 160-16;
// 針對 producer 配置 2-scanline 大小的 circular buffer
float producer_storage[2][161];
// 對於 strip 中 16 條中的每一條 scanline:
for (int yi = 0; yi < 16; yi++) {
int y = y_base + yi;
for (int py = y; py < y+2; py++) {
// 略過在這次工作中已被計算的 scanline
if (yi > 0 && py == y) continue;
// 以 4-wide vector 方式計算這條 producer scanline
for (int x_vec = 0; x_vec < 160/4 + 1; x_vec++) {
int x_base = x_vec*4;
// 4 無法分割 161, 因此將最後的 vector 向左推
// (見 lesson 05).
if (x_base > 161 - 4) x_base = 161 - 4;
// 若在 x86 平台上, Halide 會針對這部分產生 SSE 程式碼:
int x[] = {x_base, x_base + 1, x_base + 2, x_base + 3};
float vec[4] = {sinf(x[0] * py), sinf(x[1] * py),
sinf(x[2] * py), sinf(x[3] * py)};
producer_storage[py & 1][x[0]] = vec[0];
producer_storage[py & 1][x[1]] = vec[1];
producer_storage[py & 1][x[2]] = vec[2];
producer_storage[py & 1][x[3]] = vec[3];
}
}
// 計算這次的 consumer scanline:
for (int x_vec = 0; x_vec < 160/4; x_vec++) {
int x_base = x_vec * 4;
// Again, Halide's equivalent here uses SSE.
int x[] = {x_base, x_base + 1, x_base + 2, x_base + 3};
float vec[] = {
(producer_storage[y & 1][x[0]] +
producer_storage[(y+1) & 1][x[0]] +
producer_storage[y & 1][x[0]+1] +
producer_storage[(y+1) & 1][x[0]+1])/4,
(producer_storage[y & 1][x[1]] +
producer_storage[(y+1) & 1][x[1]] +
producer_storage[y & 1][x[1]+1] +
producer_storage[(y+1) & 1][x[1]+1])/4,
(producer_storage[y & 1][x[2]] +
producer_storage[(y+1) & 1][x[2]] +
producer_storage[y & 1][x[2]+1] +
producer_storage[(y+1) & 1][x[2]+1])/4,
(producer_storage[y & 1][x[3]] +
producer_storage[(y+1) & 1][x[3]] +
producer_storage[y & 1][x[3]+1] +
producer_storage[(y+1) & 1][x[3]+1])/4};
c_result[y][x[0]] = vec[0];
c_result[y][x[1]] = vec[1];
c_result[y][x[2]] = vec[2];
c_result[y][x[3]] = vec[3];
}
}
}
printf("Pseudo-code for the schedule:\n");
consumer.print_loop_nest();
printf("\n");
// Click to show output ...
// 看著程式碼, 讚嘆與絕望
// 檢測 C 與 Halide 的結果
// 藉由這反而發現了 C 實作過程的許多錯誤
// 這應該也能學習到一些
for (int y = 0; y < 160; y++) {
for (int x = 0; x < 160; x++) {
float error = halide_result(x, y) - c_result[y][x];
// It's floating-point math, so we'll allow some slop:
if (error < -0.001f || error > 0.001f) {
printf("halide_result(%d, %d) = %f instead of %f\n",
x, y, halide_result(x, y), c_result[y][x]);
return -1;
}
}
}
}
// 這部分是困難的. 最終是在記憶體頻寬, 重複計算與平行化這三個方向之間的考量
// Halide 無法自動做出正確的決定.
// 相對地它讓探索不同的選項變得簡單, 無需弄亂程式碼.
// 事實上 Halide 保證像是 compute_root 的排程呼叫並不會改變演算本身的意義
// -- 無論如何地排程, 總是會得到相同的結果
// 所以讓自己身經百戰吧 !
// 持續實驗不同的排程並且對效能做紀錄.
// 形成假設並嘗試證明自己錯了
// 不能假設只要以 4-wide vectorization 並且在 8 cores 上執行,
// 以為如此就能得到 32倍的速度, 這樣是不正確的.
// 現代系統複雜到無法不執行程式碼就能可靠正確評估效能
// 建議一開始將所有 stage 以 compute_root 做排程
// 接著自 pipeline 終端往前逐個 stage 作 inline, parallelizing 與 vectorizing
// 如此直到 pipeline 的頭端
// Halide 並不只是將程式碼 vectorizing 與 parallelizing.
// 光是這些優化不足以讓人們走得更遠.
// Halide 重點在於賦予工具能在無需弄亂嘗試得到的結果的情況下
// 快速探索在局部性, 重複計算與平行化等不同方向的考量
//
沒有留言:
張貼留言