Java Concurrency: Từ Thread truyền thống đến Virtual Threads (Java 21)
Lời mở đầu
Chào mọi người! Sau khi xây dựng Chat Server, hôm nay mình muốn đi sâu vào Java Concurrency - một trong những topics “khó nhằn” nhưng cực kỳ quan trọng.
Đặc biệt, Java 21 vừa ra mắt Virtual Threads - một game changer trong cách chúng ta viết concurrent code. Mình sẽ so sánh từ cách “xưa” đến cách “mới” để các bạn thấy sự tiến bộ!
🧵 Phần 1: Threads truyền thống
1. Thread là gì?
Thread là đơn vị thực thi nhỏ nhất trong một process.
Analogy:
Process = Nhà hàng
Thread = Nhân viên phục vụ
1 thread: 1 nhân viên → Phục vụ tuần tự
Multi-thread: Nhiều nhân viên → Phục vụ song song
2. Tạo Thread trong Java
Cách 1: Extend Thread
class MyThread extends Thread {
@Override
public void run() {
System.out.println("Thread running: " +
Thread.currentThread().getName());
}
}
public class Main {
public static void main(String[] args) {
MyThread t1 = new MyThread();
MyThread t2 = new MyThread();
t1.start(); // Khởi động thread
t2.start();
// ❌ t1.run() - Sai! Sẽ chạy trên main thread
}
}
Cách 2: Implement Runnable (Preferred)
class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("Runnable running");
}
}
public class Main {
public static void main(String[] args) {
Thread t1 = new Thread(new MyRunnable());
t1.start();
// Lambda expression (Java 8+)
Thread t2 = new Thread(() -> {
System.out.println("Lambda thread");
});
t2.start();
}
}
Tại sao Runnable tốt hơn?
- ✅ Tách logic khỏi Thread class
- ✅ Vẫn extends class khác được
- ✅ Reuse Runnable cho nhiều threads
- ✅ Work với ExecutorService
3. Thread Lifecycle
┌──────────┐
│ NEW │ Thread created nhưng chưa start
└────┬─────┘
│ start()
▼
┌──────────┐
│ RUNNABLE │ Đang chạy hoặc sẵn sàng chạy
└────┬─────┘
│ Scheduler picks thread
▼
┌──────────┐ sleep()/wait() ┌───────────┐
│ RUNNING │─────────────────────────▶│ BLOCKED/ │
│ │◀─────────────────────────│ WAITING │
└────┬─────┘ notify()/interrupt └───────────┘
│
│ run() completes
▼
┌──────────┐
│TERMINATED│ Thread kết thúc
└──────────┘
4. Thread Methods
sleep():
try {
Thread.sleep(1000); // Sleep 1 giây
} catch (InterruptedException e) {
e.printStackTrace();
}
join():
Thread t1 = new Thread(() -> {
System.out.println("Task 1");
});
t1.start();
t1.join(); // Đợi t1 kết thúc
System.out.println("Task 2"); // Chạy sau t1
interrupt():
Thread t = new Thread(() -> {
while (!Thread.interrupted()) {
// Do work
}
});
t.start();
Thread.sleep(5000);
t.interrupt(); // Dừng thread
priority:
Thread t1 = new Thread(() -> { });
t1.setPriority(Thread.MAX_PRIORITY); // 10
t1.setPriority(Thread.MIN_PRIORITY); // 1
t1.setPriority(Thread.NORM_PRIORITY); // 5 (default)
5. Thread Synchronization
Race Condition:
class Counter {
private int count = 0;
public void increment() {
count++; // NOT atomic!
// Thực tế: read → add 1 → write
}
public int getCount() {
return count;
}
}
// 2 threads cùng increment
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
counter.increment();
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
counter.increment();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println(counter.getCount());
// Expected: 2000
// Actual: ~1800 (random) ❌
Solution 1: synchronized keyword
class Counter {
private int count = 0;
public synchronized void increment() {
count++; // Now thread-safe ✅
}
public synchronized int getCount() {
return count;
}
}
Solution 2: synchronized block
class Counter {
private int count = 0;
private Object lock = new Object();
public void increment() {
synchronized (lock) {
count++;
}
}
}
Solution 3: AtomicInteger
import java.util.concurrent.atomic.AtomicInteger;
class Counter {
private AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet(); // Atomic operation
}
public int getCount() {
return count.get();
}
}
6. Deadlock
Ví dụ classic:
Object lock1 = new Object();
Object lock2 = new Object();
Thread t1 = new Thread(() -> {
synchronized (lock1) {
System.out.println("T1 has lock1");
Thread.sleep(100);
synchronized (lock2) { // Đợi lock2
System.out.println("T1 has lock2");
}
}
});
Thread t2 = new Thread(() -> {
synchronized (lock2) {
System.out.println("T2 has lock2");
Thread.sleep(100);
synchronized (lock1) { // Đợi lock1
System.out.println("T2 has lock1");
}
}
});
t1.start();
t2.start();
// Deadlock! ❌
// T1 giữ lock1, đợi lock2
// T2 giữ lock2, đợi lock1
Solution: Lock ordering
// Luôn acquire locks theo thứ tự cố định
Thread t1 = new Thread(() -> {
synchronized (lock1) { // Luôn lock1 trước
synchronized (lock2) {
// Work
}
}
});
Thread t2 = new Thread(() -> {
synchronized (lock1) { // Luôn lock1 trước
synchronized (lock2) {
// Work
}
}
});
🎯 Phần 2: ExecutorService
1. Vấn đề với Thread trực tiếp
// Bad practice ❌
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
// Task
}).start();
}
// Vấn đề:
// - Tạo 1000 threads → Tốn RAM
// - Context switching → Chậm
// - Không control được
// - Không reuse threads
2. ExecutorService - Thread Pool
Fixed Thread Pool:
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executor.submit(() -> {
// Task
});
}
executor.shutdown(); // Không nhận task mới
executor.awaitTermination(1, TimeUnit.MINUTES); // Đợi hoàn thành
Lợi ích:
- ✅ Reuse 10 threads cho 1000 tasks
- ✅ Queue tasks tự động
- ✅ Giới hạn resource usage
- ✅ Dễ quản lý
3. Các loại ExecutorService
Fixed Thread Pool:
ExecutorService executor = Executors.newFixedThreadPool(5);
// 5 threads cố định
// Queue không giới hạn
Cached Thread Pool:
ExecutorService executor = Executors.newCachedThreadPool();
// Tạo thread mới khi cần
// Reuse threads idle
// Tự động terminate threads sau 60s idle
Single Thread Executor:
ExecutorService executor = Executors.newSingleThreadExecutor();
// Chỉ 1 thread
// Tasks chạy tuần tự
Scheduled Thread Pool:
ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(5);
// Delay 5 giây
scheduler.schedule(() -> {
System.out.println("Delayed task");
}, 5, TimeUnit.SECONDS);
// Repeat mỗi 10 giây
scheduler.scheduleAtFixedRate(() -> {
System.out.println("Periodic task");
}, 0, 10, TimeUnit.SECONDS);
// Delay 10 giây sau khi task trước hoàn thành
scheduler.scheduleWithFixedDelay(() -> {
System.out.println("Fixed delay task");
}, 0, 10, TimeUnit.SECONDS);
4. Callable vs Runnable
Runnable:
Runnable task = () -> {
System.out.println("No return value");
};
Callable:
Callable<Integer> task = () -> {
return 42; // Can return value
};
Future<Integer> future = executor.submit(task);
Integer result = future.get(); // Blocking
5. Future
Basic usage:
Future<String> future = executor.submit(() -> {
Thread.sleep(2000);
return "Result";
});
System.out.println("Doing other work...");
String result = future.get(); // Block until done
System.out.println(result);
With timeout:
try {
String result = future.get(1, TimeUnit.SECONDS);
} catch (TimeoutException e) {
System.out.println("Timeout!");
future.cancel(true); // Cancel task
}
Check status:
future.isDone(); // Task hoàn thành?
future.isCancelled(); // Task bị cancel?
future.cancel(true); // Cancel task
6. CompletableFuture (Java 8+)
Problem with Future:
// Cannot chain operations
// Cannot combine multiple futures
// Cannot handle errors elegantly
CompletableFuture - Modern approach:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// Async task
return "Hello";
});
future
.thenApply(s -> s + " World") // Transform
.thenApply(String::toUpperCase) // Chain
.thenAccept(System.out::println) // Consume
.exceptionally(ex -> { // Handle error
System.err.println("Error: " + ex);
return null;
});
Combining futures:
CompletableFuture<Integer> future1 =
CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 =
CompletableFuture.supplyAsync(() -> 20);
// Combine results
CompletableFuture<Integer> combined = future1.thenCombine(
future2,
(a, b) -> a + b
);
System.out.println(combined.get()); // 30
All vs Any:
CompletableFuture<?>[] futures = {future1, future2, future3};
// Đợi tất cả hoàn thành
CompletableFuture.allOf(futures).join();
// Đợi bất kỳ 1 cái hoàn thành
CompletableFuture.anyOf(futures).join();
🚀 Phần 3: Virtual Threads (Java 21)
1. Vấn đề với Platform Threads
Platform Threads (threads truyền thống):
Platform Thread = OS Thread
1 Platform Thread = ~2MB RAM
10,000 threads = ~20GB RAM ❌
Context switching:
- Expensive
- Limited by OS
- Practical limit: ~few thousands
Ví dụ thực tế:
// Web server với 10,000 concurrent requests
ExecutorService executor = Executors.newFixedThreadPool(10000);
// Vấn đề:
// - 20GB RAM chỉ cho threads
// - Context switching overhead
// - OS limit
2. Virtual Threads - Game Changer
Virtual Threads:
Virtual Thread ≠ OS Thread
Nhiều Virtual Threads → Ít Platform Threads
1 Virtual Thread ≈ ~1KB RAM
1,000,000 threads = ~1GB RAM ✅
JVM quản lý mapping Virtual → Platform
Tạo Virtual Thread:
// Cách 1: Thread.ofVirtual()
Thread vThread = Thread.ofVirtual().start(() -> {
System.out.println("Virtual thread!");
});
// Cách 2: Thread.startVirtualThread()
Thread.startVirtualThread(() -> {
System.out.println("Another virtual thread!");
});
// Cách 3: ExecutorService
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
executor.submit(() -> {
System.out.println("Virtual thread from executor!");
});
3. So sánh Platform vs Virtual
Platform Threads:
long start = System.currentTimeMillis();
try (ExecutorService executor = Executors.newFixedThreadPool(100)) {
for (int i = 0; i < 10000; i++) {
executor.submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
long end = System.currentTimeMillis();
System.out.println("Time: " + (end - start) + "ms");
// Time: ~100 seconds (10000 tasks / 100 threads)
// RAM: ~200MB
Virtual Threads:
long start = System.currentTimeMillis();
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 10000; i++) {
executor.submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
long end = System.currentTimeMillis();
System.out.println("Time: " + (end - start) + "ms");
// Time: ~1 second (all run concurrently!)
// RAM: ~50MB
4. HTTP Server với Virtual Threads
Platform Threads:
public class PlatformThreadServer {
public static void main(String[] args) throws IOException {
ServerSocket server = new ServerSocket(8080);
ExecutorService executor = Executors.newFixedThreadPool(100);
while (true) {
Socket client = server.accept();
executor.submit(() -> handleClient(client));
}
// Limit: 100 concurrent connections
}
}
Virtual Threads:
public class VirtualThreadServer {
public static void main(String[] args) throws IOException {
ServerSocket server = new ServerSocket(8080);
while (true) {
Socket client = server.accept();
Thread.startVirtualThread(() -> handleClient(client));
}
// No limit! Can handle millions of connections
}
}
5. Structured Concurrency (Java 21)
Problem với unstructured concurrency:
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
Future<String> future1 = executor.submit(() -> fetchUser());
Future<String> future2 = executor.submit(() -> fetchOrders());
// Vấn đề:
// - Future1 fail → Future2 vẫn chạy
// - Phải manually cancel
// - Khó track lifecycle
Structured Concurrency:
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Future<String> user = scope.fork(() -> fetchUser());
Future<String> orders = scope.fork(() -> fetchOrders());
scope.join(); // Đợi tất cả
scope.throwIfFailed(); // Throw nếu có lỗi
// Cả 2 thành công mới đến đây
String userData = user.resultNow();
String ordersData = orders.resultNow();
} // Auto cleanup khi exit scope
ShutdownOnSuccess:
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
scope.fork(() -> fetchFromCache());
scope.fork(() -> fetchFromDB());
scope.fork(() -> fetchFromAPI());
scope.join();
// Lấy kết quả từ cái đầu tiên thành công
String result = scope.result();
}
6. Scoped Values (Java 21)
Problem với ThreadLocal:
// ThreadLocal không work tốt với Virtual Threads
ThreadLocal<String> userId = new ThreadLocal<>();
// Virtual threads rất nhiều → Memory overhead
Scoped Values:
import java.lang.ScopedValue;
public class UserContext {
public static final ScopedValue<String> USER_ID = ScopedValue.newInstance();
public static void main(String[] args) {
ScopedValue.where(USER_ID, "user123")
.run(() -> {
// USER_ID available trong scope này
processRequest();
});
// USER_ID không available ngoài scope
}
static void processRequest() {
String userId = USER_ID.get();
System.out.println("Processing for: " + userId);
}
}
📊 Benchmark: Platform vs Virtual Threads
Test Case: 100,000 HTTP requests
Platform Threads (Pool 200):
Threads: 200 platform threads
Time: 500 seconds
RAM: 400MB
CPU: 60%
Throughput: 200 req/s
Virtual Threads:
Threads: 100,000 virtual threads
Time: 10 seconds
RAM: 150MB
CPU: 70%
Throughput: 10,000 req/s
Kết luận:
- 🚀 50x faster
- 💾 60% less RAM
- 📈 50x higher throughput
🎨 Best Practices
1. Khi nào dùng Virtual Threads?
✅ Dùng Virtual Threads khi:
- I/O-bound tasks (network, database, file)
- Blocking operations
- High concurrency (thousands/millions)
- Web servers, microservices
- Chat servers, real-time apps
❌ KHÔNG dùng Virtual Threads khi:
- CPU-bound tasks (tính toán nặng)
- synchronized blocks (pinning issue)
- Native code (JNI)
- Need fine-grained control
2. Avoid Pinning
Pinning = Virtual thread pin to platform thread
Bad:
synchronized (lock) {
// Blocking I/O trong synchronized → Pinning!
Thread.sleep(1000);
}
Good:
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
// Blocking I/O OK với ReentrantLock
Thread.sleep(1000);
} finally {
lock.unlock();
}
3. Migration từ Platform → Virtual
Before:
ExecutorService executor = Executors.newFixedThreadPool(100);
After:
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
That’s it! 🎉
4. Structured Concurrency Pattern
public class UserService {
public UserProfile getUserProfile(String userId) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// Fork parallel tasks
Future<User> userFuture =
scope.fork(() -> fetchUser(userId));
Future<List<Order>> ordersFuture =
scope.fork(() -> fetchOrders(userId));
Future<Address> addressFuture =
scope.fork(() -> fetchAddress(userId));
// Wait for all
scope.join();
scope.throwIfFailed();
// Combine results
return new UserProfile(
userFuture.resultNow(),
ordersFuture.resultNow(),
addressFuture.resultNow()
);
}
}
}
🎓 Lessons Learned
From Experience
1. Virtual Threads không phải silver bullet
CPU-bound: Platform threads vẫn tốt hơn
GPU tasks: Cần native libraries
2. Monitor với JFR (Java Flight Recorder)
java -XX:StartFlightRecording=filename=recording.jfr MyApp
3. Test với realistic workload
// Không chỉ test với sleep()
// Test với real I/O: DB, HTTP, File
4. Upgrade dependencies
Một số libraries chưa optimize cho Virtual Threads
Check compatibility!
🚀 Real-world Example: Chat Server
Platform Threads Version:
public class PlatformChatServer {
private static ExecutorService executor =
Executors.newFixedThreadPool(100);
public static void main(String[] args) throws IOException {
ServerSocket server = new ServerSocket(5000);
while (true) {
Socket client = server.accept();
executor.submit(() -> handleClient(client));
}
// Giới hạn: 100 concurrent clients
}
}
Virtual Threads Version:
public class VirtualChatServer {
public static void main(String[] args) throws IOException {
ServerSocket server = new ServerSocket(5000);
while (true) {
Socket client = server.accept();
Thread.startVirtualThread(() -> handleClient(client));
}
// Không giới hạn! Có thể handle millions!
}
private static void handleClient(Socket socket) {
try (
BufferedReader in = new BufferedReader(
new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(
socket.getOutputStream(), true)
) {
String message;
while ((message = in.readLine()) != null) {
// Blocking I/O - No problem với Virtual Threads
broadcast(message);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
🎯 Kết luận
Java Concurrency đã tiến hóa rất nhiều:
2004: Java 5
- ExecutorService
- Concurrent collections
- Locks framework
2011: Java 7
- Fork/Join framework
- Parallel streams
2014: Java 8
- CompletableFuture
- Parallel streams
2023: Java 21 🎉
- Virtual Threads
- Structured Concurrency
- Scoped Values
Key Takeaways:
- Platform threads cho CPU-bound
- Virtual threads cho I/O-bound
- ExecutorService vẫn relevant
- CompletableFuture cho async programming
- Structured Concurrency cho code rõ ràng hơn
Tương lai:
- Virtual threads sẽ trở thành default
- Frameworks (Spring, Quarkus) đang adopt
- Java tiếp tục evolve
Resources recommend:
📚 Books:
- Java Concurrency in Practice (Brian Goetz)
- Effective Java (Joshua Bloch)
🌐 Reading:
- JEP 444: Virtual Threads
- Project Loom documentation
🛠️ Practice:
- Migrate existing apps to Virtual Threads
- Build high-throughput server
- Benchmark Platform vs Virtual
Cảm ơn đã đọc series về Java Networking và Concurrency! Hi vọng các bạn học được nhiều điều bổ ích! 🚀
#Java #Concurrency #VirtualThreads #Java21 #Multithreading