Trang

Saturday, June 7, 2014

Tìm hiểu Thread trong JAVA

Đồng bộ hóa(Synchronized) Thread
Khi nhiều thread cùng tương tác với một đối tượng, bạn cần phải điều khiển chúng một cách cẩn thận để tránh tranh chấp tài nguyên. Bài này giới thiệu những vấn đề có giới thiệu các lỗi thường gặp trong ứng dụng của bạn, khi nào và làm thế nào để sử dụng từ khóa synchronized để điều khiển việc truy cập vào các đối tượng và các biến cùng một thời điểm.
Việc bổ xung từ khóa synchronized vào khai báo phương thức nhằm đảm bảo chỉ có một thread được phép ở bên trong phương thức tại một thời điểm. Trước khi bạn học cách làm thế nào để định nghĩa một method đồng bộ hóa trong chương trình của bạn, hãy xem những gì xảy ra nếu việc đồng bộ hoá không được sử dụng trong một chương trình.
Ví dụ 1: mô phỏng 2 thread cùng truy cập đồng thời vào một phương thức của cùng một đối tượng.
1: public class BothInMethod extends Object {
2:     private String objID;
3:
4:     public BothInMethod(String objID) {
5:         this.objID = objID;
6:     }
7:
8:     public void doStuff(int val) {
9:         print(“entering doStuff()”);
10:         int num = val * 2 + objID.length();
11:         print(“in doStuff() – local variable num=” + num);
12:
13:         // slow things down to make observations
14:         try { Thread.sleep(2000); }
catch ( InterruptedException x ) { }
15:
16:         print(“leaving doStuff()”);
17:     }
18:
19:     public void print(String msg) {
20:         threadPrint(“objID=” + objID + “ – “ + msg);
21:     }
22:
23:     public static void threadPrint(String msg) {
24:         String threadName = Thread.currentThread().getName();
25:         System.out.println(threadName + “: “ + msg);
26:     }
27:
28:     public static void main(String[] args) {
29:         final BothInMethod bim = new BothInMethod(“obj1”);
30:
31:         Runnable runA = new Runnable() {
32:                 public void run() {
33:                     bim.doStuff(3);
34:                 }
35:             };
36:
37:         Thread threadA = new Thread(runA, “threadA”);
38:         threadA.start();
39:
40:         try { Thread.sleep(200); }
catch ( InterruptedException x ) { }
41:
42:         Runnable runB = new Runnable() {
43:                 public void run() {
44:                     bim.doStuff(7);
45:                 }
46:             };
47:
48:         Thread threadB = new Thread(runB, “threadB”);
49:         threadB.start();
50:     }
51: }
Trong phương thức main(), đối tượng BothInMethod được khởi tạo với một identifier là obj1(dòng 29). Tiếp theo, 2 thread được tạo ra để truy cập đồng thời vào phương thức doStuff(). Thread đầu tiên có tên là threadA, cái thứ hai là threadB. Sau khi threadA bắt đầu(dòng 38), nó gọi doStuff() và truyền vào giá trị 3(dòng 33). Khoảng 200 mili giây sau, threadB được bắt đầu và gọi phương thức doStuff() trên cùng một đối tượng và truyền vào giá trị 7.
Cả threadA và threadB cùng ở trong phương thức doStuff()(từ dòng 8->17) tại cùng một thời điểm; threadA vào trước, sau 200 mili giây thì threadB vào. Bên trong doStuff(), biến cục bộ num được tính toán thông qua tham số val và biến thành viên objID(dòng 10). Bởi vì, threadA và threadB đều gán một biến val khác nhau, nên giá trị biến num sẽ khác nhau cho mỗi thread. Phương thức sleep() được sử dụng trong doStuff() nhằm làm chậm lại để đảm bảo rằng cả hai thread đều ở trong cùng một phương thức của cùng một đối tượng một cách đồng thời.
Lưu ý: Nếu hai hay nhiều thread đều ở trong một phương thức đồng thời, thì mỗi thread phải có mỗi bảo sao chép các biến cục bộ.
Kết quả từ ví dụ trên:
threadA: objID=obj1 – entering doStuff()
threadA: objID=obj1 – in doStuff() – local variable num=10
threadB: objID=obj1 – entering doStuff()
threadB: objID=obj1 – in doStuff() – local variable num=18
threadA: objID=obj1 – leaving doStuff()
threadB: objID=obj1 – leaving doStuff()
Các thread được đồng bộ hoá trong Java sử dụng thông qua một monitor. Hãy nghĩ rằng, một monitor là một object cho phép một thread truy cập vào một tài nguyên. Chỉ có một thread sử dụng một monitor vào bất kỳ một khoảng thời gian nào. Các lập trình viên nói rằng, các thread sở hữu monitor vào thời gian đó. Monitor cũng được gọi là một semaphore.
Như chúng ta đã biết, khái niệm semaphore (monitor được Tony Hoare đề xuất) thường được sử dụng để điều khiển đồng bộ các hoạt động truy cập vào những tài nguyên dùng chung. Một luồng muốn truy cập vào một tài nguyên dùng chung (như biến dữ liệu) thì trước tiên nó phải yêu cầu để có được monitor riêng. Khi có được monitor thì luồng như có được “chìa khóa” để “mở cửa” vào miền “tranh chấp” (tài nguyên dùng chung) để sử dụng những tài nguyên đó.
Cơ chế monitor thực hiện hai nguyên tắc đồng bộ chính:
  • Không một luồng nào khác được phân monitor khi có một luồng đã yêu cầu và đang chiếm giữ. Những luồng có yêu cầu monitor sẽ phải chờ cho đến khi monitor được giải phóng.
  • Khi có một luồng giải phóng (ra khỏi) monitor, một trong số các luồng đang chờ monitor có thể truy cập vào tài nguyên dùng chung tương ứng với monitor đó.
Cuối cùng, tác nhiệm của việc yêu cầu một monitor xảy ra đằng sau “màn chắn” trong Java. Java xử lý tất cả các chi tiết đó cho bạn. Bạn phải đồng bộ hoá các thread trong chương trình của bạn nếu như có nhiều hơn một thread sử dụng cùng một tài nguyên.
Trong lập trình có hai cách để thực hiện đồng bộ:
  • Các hàm (hàm) được đồng bộ
  • Các khối được đồng bộ
1.Các hàm được đồng bộ(synchronized method)
Nếu method được đồng bộ hoá là một instance method(phân biệt với static method đối với class), thì method được đồng bộ hóa sẽ kích hoạt lock đi kèm với đối tượng của phương thức đó. Ngược lại, nếu method được đồng bộ hóa là static thì nó kích hoạt lock đi kèm với class định nghĩa method được đồng bộ hoá.
Ví dụ 2: Mô phỏng sử dụng từ khóa synchronized, và chỉ có một thread ở trong một phương thức tại một thời điểm.
1: public class OnlyOneInMethod extends Object {
2:     private String objID;
3:
4:     public OnlyOneInMethod(String objID) {
5:         this.objID = objID;
6:     }
7:
8:     public synchronized void doStuff(int val) {
9:         print(“entering doStuff()”);
10:         int num = val * 2 + objID.length();
11:         print(“in doStuff() – local variable num=” + num);
12:
13:         // slow things down to make observations
14:         try { Thread.sleep(2000); }
catch ( InterruptedException x ) { }
15:
16:         print(“leaving doStuff()”);
17:     }
18:
19:     public void print(String msg) {
20:         threadPrint(“objID=” + objID + “ – “ + msg);
21:     }
22:
23:     public static void threadPrint(String msg) {
24:         String threadName = Thread.currentThread().getName();
25:         System.out.println(threadName + “: “ + msg);
26:     }
27:
28:     public static void main(String[] args) {
29:         final OnlyOneInMethod ooim = new OnlyOneInMethod(“obj1”);
30:
31:         Runnable runA = new Runnable() {
32:                 public void run() {
33:                     ooim.doStuff(3);
34:                 }
35:             };
36:
37:         Thread threadA = new Thread(runA, “threadA”);
38:         threadA.start();
39:
40:         try { Thread.sleep(200); }
catch ( InterruptedException x ) { }
41:
42:         Runnable runB = new Runnable() {
43:                 public void run() {
44:                     ooim.doStuff(7);
45:                 }
46:             };
47:
48:         Thread threadB = new Thread(runB, “threadB”);
49:         threadB.start();
50:     }
51: }
Ở ví dụ trên, ta thấy trong phương thức doStuff() chỉ có một thread tại một thời điểm; threadA đi vào(dòng 1) và đi ra(dòng 3) khỏi phương thức doStuff() trước khi threadB được phép vào(dòng 4). Việc sử dụng modifier synchronized bảo vệ phương thức doStuff() và chỉ cho phép một thread ở trong nó tại một thời điểm.
Dưới đây là kết quả của ví dụ 2, bạn có thể so sánh kết quả của ví dụ 1 để thấy sự khác biệt.
threadA: objID=obj1 – entering doStuff()
threadA: objID=obj1 – in doStuff() – local variable num=10
threadA: objID=obj1 – leaving doStuff()
threadB: objID=obj1 – entering doStuff()
threadB: objID=obj1 – in doStuff() – local variable num=18
threadB: objID=obj1 – leaving doStuff()
2.Các khối được đồng bộ(synchronized statement block)
Các khối được đồng bộ có thể được sử dụng khi toàn bộ phương thức không cần synchronized hoặc khi muốn nhận lock trên một đối tượng khác. Cách sử dụng nó như sau:
synchronized(obj){
// block code
}
Ví dụ:
class Client{
BankAccount account;
// …
public void updateTransaction(){
synchronized(account){       // (1) Khối đồng bộ
account.update();        // (2)
}
}
}
Ngoài ra, có thể sử dụng statement block để thay thế cho phương thức được synchronized như sau:
public synchronized void setPoint(int x, int y) {
this.x = x;
this.y = y;
}
Thay thế bằng statement block như sau:
public void setPoint(int x, int y) {
synchronized ( this ) {
this.x = x;
this.y = y;
}
}

3.Static synchronized method:
Ngoài lock ở cấp độ đối tượng(object-level) cho mỗi instance của lớp, còn có lock ở cấp độ lớp (class-level) được chia sẽ cho tất cả instance của một lớp cụ thể. Mỗi lớp nạp bởi JavaVM có đúng một lock class-level. Nếu một phương thức được khai báo cả 2 từ khóa static và synchronized, một thread phải có được quyền truy cập vào lock class-level trước khi đi vào phương thức.
Lock class-level có thể được sử dụng để truy cập độc quyền vào các biến thành viên static. Cũng như lock object-level cần được ngăn chặn sự sửa đổi dữ liệu của các biến thành viên non-static, lock class-level cũng cần được ngăn chặn sự sửa đổi của các biến thành viên static. Thậm chí là khi không có biến nào liên quan, thì modifier synchronized cũng có thể được sử dụng trên các phương thức static nhằm đảm bảo chỉ có một thread nằm bên trong một phương thức tại một thời điểm.
Để hiểu rõ thêm về vấn đề này, ta xét ví dụ sau:
1: public class StaticNeedSync extends Object {
2:     private static int nextSerialNum = 10001;
3:
4:     public static int getNextSerialNum() {
5:         int sn = nextSerialNum;
6:
7:         // Simulate a delay that is possible if the thread
8:         // scheduler chooses to swap this thread off the
9:         // processor at this point. The delay is exaggerated
10:         // for demonstration purposes.
11:         try { Thread.sleep(1000); }
12:         catch ( InterruptedException x ) { }
13:
14:         nextSerialNum++;
15:         return sn;
16:     }
17:
18:     private static void print(String msg) {
19:         String threadName = Thread.currentThread().getName();
20:         System.out.println(threadName + “: “ + msg);
21:     }
22:
23:     public static void main(String[] args) {
24:         try {
25:             Runnable r = new Runnable() {
26:                     public void run() {
27:                         print(“getNextSerialNum()=” +
28:                                 getNextSerialNum());
29:                     }
30:                 };
31:
32:             Thread threadA = new Thread(r, “threadA”);
33:             threadA.start();
34:
35:             Thread.sleep(1500);
36:
37:             Thread threadB = new Thread(r, “threadB”);
38:             threadB.start();
39:
40:             Thread.sleep(500);
41:
42:             Thread threadC = new Thread(r, “threadC”);
43:             threadC.start();
44:
45:             Thread.sleep(2500);
46:
47:             Thread threadD = new Thread(r, “threadD”);
48:             threadD.start();
49:         } catch ( InterruptedException x ) {
50:             // ignore
51:         }
52:     }
53: }
Lớp StaticNeedSync có một biến thành viên nextSerialNum với khai báo private static, được dùng để lưu giá trị các số tiếp theo sẽ được tạo ra (dòng 2). Phương thức getNextSerialNum ()  (dòng 4-16) là được khai báo public và static. Khi gọi phương thức này, nó có giá trị hiện tại của nextSerialNum và lưu trữ nó trong một biến cục bộ sn (dòng 5). Sau đó, thread gọi phương thức này tạm sleep 1 giây(dòng 11) để một thread thứ 2 thực hiện. Khi các thread nhận được một cơ hội để chạy lại, nó tăng nextSerialNum lên 1 biến để chuẩn bị cho cuộc gọi tiếp theo (dòng 14).
Main thread có 4 thread tương tác với phương thức getNextSerialNum(). Cả 4 thread đều sử dụng cùng một đối tượng Runnable(dòng 25-30). Main thread bắt đầu với threadA(dòng 33) và sleep khoảng 1,5 giây. Thời gian này đủ để threadA vào và trả về từ phương thức getNextSerialNum(). Tiếp theo, main Thread tiếp tục bắt đầu với threadB(dòng 38), sau đó nó sleep khoảng 0,5 giây(dòng 40) trước khi nó bắt đầu threadC(dòng 43). Cả threadB và threadC cùng nằm trong phương thức getNextSerialNum(), và điều này làm nảy sinh một vài vấn đề. Sau khi chờ 2,5 giây (nhiều thời gian cho threadB và threadC trả về), main thread bắt đầu threadD (dòng 45-48). threadD gọi phương thức getNextSerialNum () lần cuối cùng. Kết quả dưới đây mô tả ví dụ trên:
threadA: getNextSerialNum()=10001
threadB: getNextSerialNum()=10002
threadC: getNextSerialNum()=10002
threadD: getNextSerialNum()=10004
Để giải quyết vấn đề trên, ta chỉ cần thêm vào từ khóa synchronized vào phương thức getNextSerialNum().
1: public class StaticSync extends Object {
2:     private static int nextSerialNum = 10001;
3:
4:     public static synchronized int getNextSerialNum() {
5:         int sn = nextSerialNum;
6:
7:         // Simulate a delay that is possible if the thread
8:         // scheduler chooses to swap this thread off the
9:         // processor at this point. The delay is exaggerated
10:         // for demonstration purposes.
11:         try { Thread.sleep(1000); }
12:         catch ( InterruptedException x ) { }
13:
14:         nextSerialNum++;
15:         return sn;
16:     }
17:
18:     private static void print(String msg) {
19:         String threadName = Thread.currentThread().getName();
20:         System.out.println(threadName + “: “ + msg);
21:     }
22:
23:     public static void main(String[] args) {
24:         try {
25:             Runnable r = new Runnable() {
26:                     public void run() {
27:                         print(“getNextSerialNum()=” +
28:                                 getNextSerialNum());
29:                     }
30:                 };
31:
32:             Thread threadA = new Thread(r, “threadA”);
33:             threadA.start();
34:
35:             Thread.sleep(1500);
36:
37:             Thread threadB = new Thread(r, “threadB”);
38:             threadB.start();
39:
40:             Thread.sleep(500);
41:
42:             Thread threadC = new Thread(r, “threadC”);
43:             threadC.start();
44:
45:             Thread.sleep(2500);
46:
47:             Thread threadD = new Thread(r, “threadD”);
48:             threadD.start();
49:         } catch ( InterruptedException x ) {
50:             // ignore
51:         }
52:     }
53: }
Kết quả từ ví dụ trên:
threadA: getNextSerialNum()=10001
threadB: getNextSerialNum()=10002
threadC: getNextSerialNum()=10003
threadD: getNextSerialNum()=10004
Sở dĩ vấn đề trên được giải quyết là do khi threadC vào phương thức getNextSerialNum(), nó lập tức rơi vào trạng thái block và đợi threadB kết thúc nó mới được phép tiếp tục thực hiện trong phương thức getNextSerialNum().
4.Sử dụng Class-level lock trong synchronized statement.
Để sử dụng synchronized statement trong Class-level lock, ta sử dụng theo cú pháp sau:
synchronized ( ClassName.class ) {
// body
}
Ví dụ: ta tạo lớp StaticBlock để mô phỏng kỹ thuật trên như sau:
1: public class StaticBlock extends Object {
2:     public static synchronized void staticA() {
3:         System.out.println(“entering staticA()”);
4:
5:         try { Thread.sleep(5000); }
6:         catch ( InterruptedException x ) { }
7:
8:         System.out.println(“leaving staticA()”);
9:     }
10:
11:     public static void staticB() {
12:         System.out.println(“entering staticB()”);
13:
14:         synchronized ( StaticBlock.class ) {
15:             System.out.println(
16:                     “in staticB() – inside sync block”);
17:
18:             try { Thread.sleep(2000); }
19:             catch ( InterruptedException x ) { }
20:         }
21:
22:         System.out.println(“leaving staticB()”);
23:     }
24:
25:     public static void main(String[] args) {
26:         Runnable runA = new Runnable() {
27:                 public void run() {
28:                     StaticBlock.staticA();
29:                 }
30:             };
31:
32:         Thread threadA = new Thread(runA, “threadA”);
33:         threadA.start();
34:
35:         try { Thread.sleep(200); }
36:         catch ( InterruptedException x ) { }
37:
38:         Runnable runB = new Runnable() {
39:                 public void run() {
40:                     StaticBlock.staticB();
41:                 }
42:             };
43:
44:         Thread threadB = new Thread(runB, “threadB”);
45:         threadB.start();
46:     }
47: }
Trong lớp StaticBlock, phương thức staticA() được khai báo synchronized và static. Phương thức staticB() được khai báo static và có chứa một block synchronized(dòng 14-20). Các đối tượng sử dụng để kiểm soát truy cập vào block này là đối tượng Class cho StaticBlock và được tìm thấy bằng cách sử StaticBlock.class(dòng 14).
Trong phương thức main, threadA được bắt đầu và gọi phương thức staticA()(dòng 28). Sau một khoảng thời gian 200 mili giây, threadB bắt đầu và gọi phương thức staticB(). Trong khi threadA sleep trong phương thức staticA(), threadB vào phương thức staticB(), in một message và đi vào block static synchronized(dòng 14).
Khi threadA trả về từ staticA(), threadB nhận class-level lock và hoàn tất phương thức staticB().
Sau đây là kết quả của ví dụ trên:
1: entering staticA()
2: entering staticB()
3: leaving staticA()
4: in staticB() – inside sync block
5: leaving staticB()
Lưu ý rằng, mặc dù threadB có thể vào phương thức staticB()(dòng 2) nhưng nó không thể vào để thực hiện block synchronized(dòng 4) cho đến khi threadA trả về từ staticA()(dòng 3). ThreadB sẽ rơi vào trạng thái block cho tới khi threadA giải phóng class-level lock.
5.Deadlocks:
Khi sử dụng nhiều thread truy cập vào các đối tượng có giữ lock, nếu không cẩn thận thì rất có thể xảy ra tình trạng deadlocks. Đó là khi một threadA nắm giữ lock1, một threadB nắm giữ lock2. Trong khi threadA nắm giữ lock1, nó lại muốn nắm giữ thêm lock2, nhưng vì threadB đang nắm giữ lock2 nên threadA sẽ rơi vào trạng thái block, và nó phải đợi cho tới khi threadB giải phóng lock2. Tuy nhiên, vào thời điểm đó, trong khi threadB đang nắm giữ lock2, nó lại muốn tiếp tục nắm giữ lock1, và lock1 đang được nắm giữ bởi threadA, nên threadB lại rơi vào trạng thái block. Lúc này, cả threadA và threadB rơi vào trạng thái block mãi mãi vì phải đợi thread kia giải phóng lock. Đây chính là trường hợp deadlocks.
Ví dụ:
1: public class Deadlock extends Object {
2:     private String objID;
3:
4:     public Deadlock(String id) {
5:         objID = id;
6:     }
7:
8:     public synchronized void checkOther(Deadlock other) {
9:         print(“entering checkOther()”);
10:
11:         // simulate some lengthy process
12:         try { Thread.sleep(2000); }
13:         catch ( InterruptedException x ) { }
14:
15:         print(“in checkOther() – about to “ +
16:                 “invoke ‘other.action()’”);
17:         other.action();
18:
19:         print(“leaving checkOther()”);
20:     }
21:
22:     public synchronized void action() {
23:         print(“entering action()”);
24:
25:         // simulate some work here
26:         try { Thread.sleep(500); }
27:         catch ( InterruptedException x ) { }
28:
29:         print(“leaving action()”);
30:     }
31:
32:     public void print(String msg) {
33:         threadPrint(“objID=” + objID + “ – “ + msg);
34:     }
35:
36:     public static void threadPrint(String msg) {
37:         String threadName = Thread.currentThread().getName();
38:         System.out.println(threadName + “: “ + msg);
39:     }
40:
41:     public static void main(String[] args) {
42:         final Deadlock obj1 = new Deadlock(“obj1”);
43:         final Deadlock obj2 = new Deadlock(“obj2”);
44:
45:         Runnable runA = new Runnable() {
46:                 public void run() {
47:                     obj1.checkOther(obj2);
48:                 }
49:             };
50:
51:         Thread threadA = new Thread(runA, “threadA”);
52:         threadA.start();
53:
54:         try { Thread.sleep(200); }
55:         catch ( InterruptedException x ) { }
56:
57:         Runnable runB = new Runnable() {
58:                 public void run() {
59:                     obj2.checkOther(obj1);
60:                 }
61:             };
62:
63:         Thread threadB = new Thread(runB, “threadB”);
64:         threadB.start();
65:
66:         try { Thread.sleep(5000); }
67:         catch ( InterruptedException x ) { }
68:
69:         threadPrint(“finished sleeping”);
70:
71:         threadPrint(“about to interrupt() threadA”);
72:         threadA.interrupt();
73:
74:         try { Thread.sleep(1000); }
75:         catch ( InterruptedException x ) { }
76:
77:         threadPrint(“about to interrupt() threadB”);
78:         threadB.interrupt();
79:
80:         try { Thread.sleep(1000); }
81:         catch ( InterruptedException x ) { }
82:
83:         threadPrint(“did that break the deadlock?”);
84:     }
85: }
Trong phương thức main(), có hai instance của Deadlock được tạo ra và hai thread được bắt đầu, mỗi thread chạy trên một instance; instance thứ nhất được thiết lập là obj1, cái kia là obj2(dòng 42-43). ThreadA được start và gọi phương thức checkOther() của obj1 và gán một tham chiếu đến obj2(dòng 47), sau khi sleep khoảng 200 mili giây, threadB được start và gọi phương thức checkOther() của obj2 và nó gán một tham chiếu tới obj1. Phương thức checkOther() là synchronized nên thread truy cập vào nó sẽ nhận lock. Cả threadA và threadB sẽ không bao giờ thực hiện được phương thức action(), vì khi threadA đang ở trong phương thức checkOther() nó nắm giữ lock trên obj1, thì threadB cũng ở trong phương thức checkOther() và nắm giữ lock của obj2. Khi threadA cố gắng gọi phương thức action() trong obj2, nó sẽ bị block do threadB đang nắm giữ lock trên obj2, và nó sẽ đợi cho tới khi threadB giải phóng lock trên obj2. Sau đó, threadB gọi phương thức action() trên obj1 và cũng rơi vào trạng thái block do lock trên obj1 đã bị threadA chiếm giữ; main thread sleep trong 5 giây khi deadlock được tạo ra(dòng 66), khi tỉnh dậy, nó cố gắng phá vỡ deadlock bằng cách interrupt threadA(dòng 72), sau 1 giây nó tiếp tục cố gắng interrupt threadB. Thật không may, điều này không phá vỡ deadlock, bởi khi một thread đang bị block vì đợi để có được lock, nó không đáp ứng với interrupt.
Kết quả từ ví dụ trên:
threadA: objID=obj1 – entering checkOther()
threadB: objID=obj2 – entering checkOther()
threadA: objID=obj1 – in checkOther() – about to invoke ‘other.action()’
threadB: objID=obj2 – in checkOther() – about to invoke ‘other.action()’
main: finished sleeping
main: about to interrupt() threadA
main: about to interrupt() threadB
main: did that break the deadlock?

No comments:

Post a Comment