Channel กับการสื่อสารระหว่าง Goroutines
Natcha Luangaroonchai
Go มี built-in concurrency ที่เรียกว่า Goroutines การทำงานคล้ายกับ Thread ที่อยู่ใน Java แต่มีขนาดเล็กและเบากว่า ซึ่ง Go เคลมว่าสามารถรันได้ถึงหนึ่งแสน Goroutines
It is practical to create hundreds of thousands of goroutines in the same address space.
แต่สิ่งที่จะเล่าถึงในบล็อกนี้คือ Goroutines, สิ่งที่ใช้ในการสื่อสารกันระหว่าง Goroutines เรียกว่าตัวแปรแชนแนล มาดูว่าแชนแนลใน Go สามารถใช้อะไรได้บ้าง
ตัวแปรแชนแนล
แชนแนลสามารถสร้างจากตัวแปรประเภทไหนก็ได้โดยใส่คีย์เวิร์ด chan
ข้างหน้าและใช้คำสั่ง make
แบบนี้
bufferedCh := make(chan string, 10)
unbufferedCh := make(chan struct{})
ตัวแปรแชนแนลสามารถสร้างได้สองรูปแบบคือมีบัฟเฟอร์และไม่มีบัฟเฟอร์ ตัวอย่างเช่นตัวแปร bufferedCh
ตอนสร้างระบุประเภทเป็น string
มีขนาดบัฟเฟอร์ 10 ในขณะที่ unbufferedCh
เป็นประเภท struct{}
และไม่ระบุบัฟเฟอร์
ความแตกต่างระหว่างแชนแนลที่มีกับไม่มีบัฟเฟอร์?
แชนแนลที่มีบัฟเฟอร์ให้นึกภาพร้านซูชิจานเวียนที่เชฟสามารถปั้นซูชิทีละหลาย ๆ คำและใส่ลงมาบนสายพานโดยที่จำนวนของซูชิที่อยู่บนสายพานได้ขึ้นอยู่กับความยาวของสายพาน ตราบเท่าที่สายพานยังมีที่ว่างเซฟก็สามารถปั้นซูชิใส่ลงมาได้เรื่อย ๆ เหมือนกันกับแชนแนลที่มีบัฟเฟอร์ถ้ายังมีที่ว่างเราก็ยังสามารถใส่ค่าเข้าไปในแชนแนลได้เรื่อย ๆ โดยไม่ต้องมีคนรอรับ
แชนแนลที่ไม่มีบัฟเฟอร์ให้นึกภาพร้านอาหารญี่ปุ่นโอมากาเสะ (Omakase) ตามปกติของการทานร้านโอมากาเสะเซฟจะปั้นซูชิคำถัดไปเมื่อลูกค้าทานคำที่เสิร์ฟในจานหมดก่อนเพื่อรักษาความสดอร่อยของซูชิ เหมือนกันกับแชนแนลที่ไม่มีบัฟเฟอร์เราจะไม่สามารถส่งข้อมูลถัดไปเข้าตัวแปรแชนแนลได้ถ้าข้อมูลก่อนหน้าไม่ถูกดึงออกไปก่อน
ตัวอย่างโค้ดด้านล่างนี้แสดงให้เห็นดึงการเกิด deadlock เพราะไม่มีการดึงข้อมูลออกจากตัวแปร unbufferedCh
ก่อนส่งค่าเข้าไป
package main
func main() {
unbufferedCh := make(chan struct{})
// deadlock!
unbufferedCh <- struct{}{}
}
แผนภาพด้านล่างนี้แสดงถึงการรับส่งข้อมูลระหว่าง Goroutines ผ่านตัวแปรแชนแนลที่ไม่มีบัฟเฟอร์ สังเกตว่าถ้าเป็นการรับ-ส่งข้อมูลผ่านแชนแนลที่ไม่มีบัฟเฟอร์แต่อยู่ต่าง Goroutines กันจะไม่เกิด deadlock
ลองแก้ไขโค้ดตัวอย่างข้างบนให้สามารถรับ-ส่งข้อมูลได้โดยไม่เกิด deadlock ได้แบบนี้
package main
func main() {
unbufferedCh := make(chan struct{})
go func(receiveOnlyCh <-chan struct{}) {
<-receiveOnlyCh
}(unbufferedCh)
unbufferedCh <- struct{}{}
}
ในขณะที่ตัวแปรแชนแนลที่มีบัฟเฟอร์สามารถนั้นสามารถส่งข้อมูลเข้าไปได้ก่อนโดยไม่ต้องรอให้มีการรับใน Goroutines เดียวกันได้เลยแบบนี้
package main
func main() {
bufferedCh := make(chan struct{}, 10)
bufferedCh <- struct{}{}
bufferedCh <- struct{}{}
<-bufferedCh
<-bufferedCh
}
แต่ถ้าเกิดว่ามีการรอรับมากกว่าที่ส่งเข้าไปใน Goroutines เดียวกันก็ยังเกิด deadlock ได้
package main
func main() {
bufferedCh := make(chan struct{}, 10)
bufferedCh <- struct{}{}
bufferedCh <- struct{}{}
<-bufferedCh
<-bufferedCh
<-bufferedCh
}
แผนภาพแสดงตัวอย่างการรับ-ส่งข้อมูลผ่านแชนแนลที่มีบัฟเฟอร์
ทิศทางของแชนแนล
ตามปกติแล้วตัวแปลแชนแนลสามารถรับ-ส่งข้อมูลได้ทั้งสองทาง แต่ถ้าต้องการให้รับหรือส่งได้ทางเดียวสามารถประกาศทิศทางของแชนแนลได้โดยการระบุด้วยเครื่องหมาย <-
ที่ด้านหน้าหรือหลังคำว่า chan
แบบนี้
package main
func main() {
unbufferdCh := make(chan struct{})
receiveOnly(unbufferdCh)
sendOnly(unbufferdCh)
}
// receiveOnly accepts read-only channel.
func receiveOnly(receiveOnlyCh <-chan struct{}) {}
// sendOnly accepts write-only channel.
func sendOnly(sendOnlyCh chan<- struct{}) {}
ถ้าพยายามจะส่งส่งข้อมูลเข้าแชนแนลที่รับทางเดียว Go จะตรวจเจอตั้งแต่ตอนคอมไพล์และฟ้องข้อผิดพลาดแบบนี้
// receiveOnly accepts read-only channel.
func receiveOnly(receiveOnlyCh <-chan struct{}) {
// try to send data to read-only channel
receiveOnlyCh <- struct{}{}
}
./prog.go:12:2: invalid operation: cannot send to receive-only channel receiveOnlyCh (variable of type <-chan struct{})
Go build failed.
Fan-out, fan-in
หนึ่งในรูปแบบการใช้งานแชนแนลโดยส่งตัวแปรแชนแนลผ่านแต่ละฟังก์ชันเพื่อทำงานกับข้อมูลที่อยู่ข้างในแชนแนล
โดยต้นทางฟังก์ชันที่สร้างแชนแนลและส่งออกเรียกว่า fan-out ส่วนอีกทางฟังก์ชันที่รับแชนแนลเข้ามาและรวมผลลัพธ์ให้เหลือตัวเดียวเรียกว่า fan-in
ลองดูตัวอย่างโค้ดด้านล่างนี้
package main
import (
"log"
"sync"
)
func main() {
input := generate(2, 3, 4)
output1 := square(input)
output2 := square(input)
final := merge(output1, output2)
for n := range final {
log.Println(n)
}
}
func generate(numbers ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range numbers {
out <- n
}
close(out)
}()
return out
}
func square(input <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range input {
out <- n * n
}
close(out)
}()
return out
}
func merge(outputs ...<-chan int) <-chan int {
out := make(chan int)
wg := sync.WaitGroup{}
wg.Add(len(outputs))
go func() {
wg.Wait()
close(out)
}()
for _, output := range outputs {
go func(output <-chan int) {
for n := range output {
out <- n
}
wg.Done()
}(output)
}
return out
}
ฟังก์ชัน generate
ทำหน้านี้รับ []int
เข้ามาแล้วส่งต่อให้กับตัวแปรแชนแนล out
จากนั้นฟังก์ชัน square
รับตัวแปรแชนแนล out
จาก generate
แล้วนำค่าที่ได้มายกกำลังสองและรีเทิร์นกลับเป็นแชนแนลเหมือนกัน ณ ตรงนี้ฟังก์ชัน square
จะถูกเรียกสองครั้งโดยแต่ละตัวจะแทนหนึ่ง Goroutines ซึ่งหมายความว่าในการรันแต่ละครั้งจำนวนอินพุตที่ square
ได้รับอาจจะแตกต่างกันไป
สุดท้ายคือฟังก์ชัน merge
ที่ทำหน้าที่เอาผลลัพธ์ที่ได้จาก square
มารวมกันให้ออกเพียงหนึ่งแชนแนลเท่านั้นและส่งไป log.Println
ที่ main
ถ้าลองรันโค้ดดูจะพบว่าลำดับของตัวเลขที่แสดงผลออกมานั้นไม่ได้เรียงกันตามอินพุตที่ใส่เข้าไป สาเหตุมาจากว่าตอนที่ส่งไปยัง square
นั้นตัวเลขได้ถูกสุ่มส่งให้กับ output1
หรือ output2
และทำงานจบไม่พร้อมกันทำให้ผลลัพธ์สุดท้ายอาจจะไม่ได้เรียงกันตามอินพุตที่ใส่ไป
ถ้าจะให้เห็นภาพมากขึ้นลองแก้ไขโปรแกรมให้รับอินพุตเป็นตัวเลขจำนวนหลาย ๆ ตัวแล้วสังเกตผลลัพธ์ที่ได้จากการรันในแต่ละครั้งดู
แผนภาพด้านล่างแสดงให้เห็นว่าการส่งข้อมูลไปยัง square
อินพุตสามารถถูกแบ่งออกเป็นสองกลุ่มคือ 2, 3
และ 4
โดยกลุ่มแรกจะได้ออกมาเป็น output1
และกลุ่มสองจะได้ออกมาเป็น output2
Futures
ฟิวเจอร์เป็นรูปแบบของการใช้งาน Goroutines ที่คาดหวังว่าจะได้ผลลัพธ์กลับมาในอนาคตซึ่งในระหว่างที่รอผลลัพธ์สามารถทำงานอย่างอื่นรอได้ รูปแบบของฟิวเจอร์จะพบได้ใน Scala หรือ Promise ใน JavaScript
ตัวอย่างโค้ดด้านล่างนี้มีการเรียกไปยังเว็บไซต์ JSONPlaceholder เพื่อเอาข้อมูลโพสต์กลับมา โดยฟังก์ชัน request
จะยังไม่ได้รีเทิร์น data
กลับมาทันทีแต่เป็น chan data
แทน
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
)
type data struct {
body struct {
UserID int `json:"userId"`
ID int `json:"id"`
Title string `json:"title"`
Body string `json:"body"`
}
err error
}
func main() {
future, err := request(http.MethodGet, "https://jsonplaceholder.typicode.com/posts/1", nil)
if err != nil {
panic(err)
}
// Do other things
d := <-future
if d.err != nil {
panic(d.err)
}
fmt.Printf("%#v\n", d.body)
}
func request(method string, url string, body io.Reader) (<-chan data, error) {
r, err := http.NewRequest(method, url, body)
if err != nil {
return nil, err
}
future := make(chan data)
go func() {
rs, err := http.DefaultClient.Do(r)
if err != nil {
future <- data{err: err}
return
}
defer rs.Body.Close()
var d data
d.err = json.NewDecoder(rs.Body).Decode(&d.body)
future <- d
close(future)
}()
return future, nil
}
แผนภาพด้านล่างแสดงให้เห็นถึงฟังก์ชัน request
ที่รีเทิร์นแชนแนลกลับมาให้ main
ก่อนเพื่อให้ main
สามารถทำงานอย่างอื่นระหว่างรอผลลัพธ์จาก JSONPlaceholder
อ้างอิง