Java Concurrency: Từ Thread truyền thống đến Virtual Threads (Java 21)


Lời mở đầu

Chào mọi người! Sau khi xây dựng Chat Server, hôm nay mình muốn đi sâu vào Java Concurrency - một trong những topics “khó nhằn” nhưng cực kỳ quan trọng.

Đặc biệt, Java 21 vừa ra mắt Virtual Threads - một game changer trong cách chúng ta viết concurrent code. Mình sẽ so sánh từ cách “xưa” đến cách “mới” để các bạn thấy sự tiến bộ!

🧵 Phần 1: Threads truyền thống

1. Thread là gì?

Thread là đơn vị thực thi nhỏ nhất trong một process.

Analogy:

Process = Nhà hàng
Thread = Nhân viên phục vụ

1 thread: 1 nhân viên → Phục vụ tuần tự
Multi-thread: Nhiều nhân viên → Phục vụ song song

2. Tạo Thread trong Java

Cách 1: Extend Thread

class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("Thread running: " + 
            Thread.currentThread().getName());
    }
}

public class Main {
    public static void main(String[] args) {
        MyThread t1 = new MyThread();
        MyThread t2 = new MyThread();
        
        t1.start();  // Khởi động thread
        t2.start();
        
        // ❌ t1.run() - Sai! Sẽ chạy trên main thread
    }
}

Cách 2: Implement Runnable (Preferred)

class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("Runnable running");
    }
}

public class Main {
    public static void main(String[] args) {
        Thread t1 = new Thread(new MyRunnable());
        t1.start();
        
        // Lambda expression (Java 8+)
        Thread t2 = new Thread(() -> {
            System.out.println("Lambda thread");
        });
        t2.start();
    }
}

Tại sao Runnable tốt hơn?

  • ✅ Tách logic khỏi Thread class
  • ✅ Vẫn extends class khác được
  • ✅ Reuse Runnable cho nhiều threads
  • ✅ Work với ExecutorService

3. Thread Lifecycle

┌──────────┐
│   NEW    │  Thread created nhưng chưa start
└────┬─────┘
     │ start()

┌──────────┐
│ RUNNABLE │  Đang chạy hoặc sẵn sàng chạy
└────┬─────┘
     │ Scheduler picks thread

┌──────────┐     sleep()/wait()      ┌───────────┐
│ RUNNING  │─────────────────────────▶│  BLOCKED/ │
│          │◀─────────────────────────│  WAITING  │
└────┬─────┘    notify()/interrupt   └───────────┘

     │ run() completes

┌──────────┐
│TERMINATED│  Thread kết thúc
└──────────┘

4. Thread Methods

sleep():

try {
    Thread.sleep(1000);  // Sleep 1 giây
} catch (InterruptedException e) {
    e.printStackTrace();
}

join():

Thread t1 = new Thread(() -> {
    System.out.println("Task 1");
});

t1.start();
t1.join();  // Đợi t1 kết thúc

System.out.println("Task 2");  // Chạy sau t1

interrupt():

Thread t = new Thread(() -> {
    while (!Thread.interrupted()) {
        // Do work
    }
});

t.start();
Thread.sleep(5000);
t.interrupt();  // Dừng thread

priority:

Thread t1 = new Thread(() -> { });
t1.setPriority(Thread.MAX_PRIORITY);  // 10
t1.setPriority(Thread.MIN_PRIORITY);  // 1
t1.setPriority(Thread.NORM_PRIORITY); // 5 (default)

5. Thread Synchronization

Race Condition:

class Counter {
    private int count = 0;
    
    public void increment() {
        count++;  // NOT atomic!
        // Thực tế: read → add 1 → write
    }
    
    public int getCount() {
        return count;
    }
}

// 2 threads cùng increment
Thread t1 = new Thread(() -> {
    for (int i = 0; i < 1000; i++) {
        counter.increment();
    }
});

Thread t2 = new Thread(() -> {
    for (int i = 0; i < 1000; i++) {
        counter.increment();
    }
});

t1.start();
t2.start();
t1.join();
t2.join();

System.out.println(counter.getCount());
// Expected: 2000
// Actual: ~1800 (random) ❌

Solution 1: synchronized keyword

class Counter {
    private int count = 0;
    
    public synchronized void increment() {
        count++;  // Now thread-safe ✅
    }
    
    public synchronized int getCount() {
        return count;
    }
}

Solution 2: synchronized block

class Counter {
    private int count = 0;
    private Object lock = new Object();
    
    public void increment() {
        synchronized (lock) {
            count++;
        }
    }
}

Solution 3: AtomicInteger

import java.util.concurrent.atomic.AtomicInteger;

class Counter {
    private AtomicInteger count = new AtomicInteger(0);
    
    public void increment() {
        count.incrementAndGet();  // Atomic operation
    }
    
    public int getCount() {
        return count.get();
    }
}

6. Deadlock

Ví dụ classic:

Object lock1 = new Object();
Object lock2 = new Object();

Thread t1 = new Thread(() -> {
    synchronized (lock1) {
        System.out.println("T1 has lock1");
        Thread.sleep(100);
        
        synchronized (lock2) {  // Đợi lock2
            System.out.println("T1 has lock2");
        }
    }
});

Thread t2 = new Thread(() -> {
    synchronized (lock2) {
        System.out.println("T2 has lock2");
        Thread.sleep(100);
        
        synchronized (lock1) {  // Đợi lock1
            System.out.println("T2 has lock1");
        }
    }
});

t1.start();
t2.start();

// Deadlock! ❌
// T1 giữ lock1, đợi lock2
// T2 giữ lock2, đợi lock1

Solution: Lock ordering

// Luôn acquire locks theo thứ tự cố định
Thread t1 = new Thread(() -> {
    synchronized (lock1) {  // Luôn lock1 trước
        synchronized (lock2) {
            // Work
        }
    }
});

Thread t2 = new Thread(() -> {
    synchronized (lock1) {  // Luôn lock1 trước
        synchronized (lock2) {
            // Work
        }
    }
});

🎯 Phần 2: ExecutorService

1. Vấn đề với Thread trực tiếp

// Bad practice ❌
for (int i = 0; i < 1000; i++) {
    new Thread(() -> {
        // Task
    }).start();
}

// Vấn đề:
// - Tạo 1000 threads → Tốn RAM
// - Context switching → Chậm
// - Không control được
// - Không reuse threads

2. ExecutorService - Thread Pool

Fixed Thread Pool:

ExecutorService executor = Executors.newFixedThreadPool(10);

for (int i = 0; i < 1000; i++) {
    executor.submit(() -> {
        // Task
    });
}

executor.shutdown();  // Không nhận task mới
executor.awaitTermination(1, TimeUnit.MINUTES);  // Đợi hoàn thành

Lợi ích:

  • ✅ Reuse 10 threads cho 1000 tasks
  • ✅ Queue tasks tự động
  • ✅ Giới hạn resource usage
  • ✅ Dễ quản lý

3. Các loại ExecutorService

Fixed Thread Pool:

ExecutorService executor = Executors.newFixedThreadPool(5);
// 5 threads cố định
// Queue không giới hạn

Cached Thread Pool:

ExecutorService executor = Executors.newCachedThreadPool();
// Tạo thread mới khi cần
// Reuse threads idle
// Tự động terminate threads sau 60s idle

Single Thread Executor:

ExecutorService executor = Executors.newSingleThreadExecutor();
// Chỉ 1 thread
// Tasks chạy tuần tự

Scheduled Thread Pool:

ScheduledExecutorService scheduler = 
    Executors.newScheduledThreadPool(5);

// Delay 5 giây
scheduler.schedule(() -> {
    System.out.println("Delayed task");
}, 5, TimeUnit.SECONDS);

// Repeat mỗi 10 giây
scheduler.scheduleAtFixedRate(() -> {
    System.out.println("Periodic task");
}, 0, 10, TimeUnit.SECONDS);

// Delay 10 giây sau khi task trước hoàn thành
scheduler.scheduleWithFixedDelay(() -> {
    System.out.println("Fixed delay task");
}, 0, 10, TimeUnit.SECONDS);

4. Callable vs Runnable

Runnable:

Runnable task = () -> {
    System.out.println("No return value");
};

Callable:

Callable<Integer> task = () -> {
    return 42;  // Can return value
};

Future<Integer> future = executor.submit(task);
Integer result = future.get();  // Blocking

5. Future

Basic usage:

Future<String> future = executor.submit(() -> {
    Thread.sleep(2000);
    return "Result";
});

System.out.println("Doing other work...");

String result = future.get();  // Block until done
System.out.println(result);

With timeout:

try {
    String result = future.get(1, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    System.out.println("Timeout!");
    future.cancel(true);  // Cancel task
}

Check status:

future.isDone();      // Task hoàn thành?
future.isCancelled(); // Task bị cancel?
future.cancel(true);  // Cancel task

6. CompletableFuture (Java 8+)

Problem with Future:

// Cannot chain operations
// Cannot combine multiple futures
// Cannot handle errors elegantly

CompletableFuture - Modern approach:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // Async task
    return "Hello";
});

future
    .thenApply(s -> s + " World")        // Transform
    .thenApply(String::toUpperCase)      // Chain
    .thenAccept(System.out::println)     // Consume
    .exceptionally(ex -> {               // Handle error
        System.err.println("Error: " + ex);
        return null;
    });

Combining futures:

CompletableFuture<Integer> future1 = 
    CompletableFuture.supplyAsync(() -> 10);

CompletableFuture<Integer> future2 = 
    CompletableFuture.supplyAsync(() -> 20);

// Combine results
CompletableFuture<Integer> combined = future1.thenCombine(
    future2, 
    (a, b) -> a + b
);

System.out.println(combined.get());  // 30

All vs Any:

CompletableFuture<?>[] futures = {future1, future2, future3};

// Đợi tất cả hoàn thành
CompletableFuture.allOf(futures).join();

// Đợi bất kỳ 1 cái hoàn thành
CompletableFuture.anyOf(futures).join();

🚀 Phần 3: Virtual Threads (Java 21)

1. Vấn đề với Platform Threads

Platform Threads (threads truyền thống):

Platform Thread = OS Thread
1 Platform Thread = ~2MB RAM
10,000 threads = ~20GB RAM ❌

Context switching:
- Expensive
- Limited by OS
- Practical limit: ~few thousands

Ví dụ thực tế:

// Web server với 10,000 concurrent requests
ExecutorService executor = Executors.newFixedThreadPool(10000);

// Vấn đề:
// - 20GB RAM chỉ cho threads
// - Context switching overhead
// - OS limit

2. Virtual Threads - Game Changer

Virtual Threads:

Virtual Thread ≠ OS Thread
Nhiều Virtual Threads → Ít Platform Threads

1 Virtual Thread ≈ ~1KB RAM
1,000,000 threads = ~1GB RAM ✅

JVM quản lý mapping Virtual → Platform

Tạo Virtual Thread:

// Cách 1: Thread.ofVirtual()
Thread vThread = Thread.ofVirtual().start(() -> {
    System.out.println("Virtual thread!");
});

// Cách 2: Thread.startVirtualThread()
Thread.startVirtualThread(() -> {
    System.out.println("Another virtual thread!");
});

// Cách 3: ExecutorService
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
executor.submit(() -> {
    System.out.println("Virtual thread from executor!");
});

3. So sánh Platform vs Virtual

Platform Threads:

long start = System.currentTimeMillis();

try (ExecutorService executor = Executors.newFixedThreadPool(100)) {
    for (int i = 0; i < 10000; i++) {
        executor.submit(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

long end = System.currentTimeMillis();
System.out.println("Time: " + (end - start) + "ms");
// Time: ~100 seconds (10000 tasks / 100 threads)
// RAM: ~200MB

Virtual Threads:

long start = System.currentTimeMillis();

try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
    for (int i = 0; i < 10000; i++) {
        executor.submit(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

long end = System.currentTimeMillis();
System.out.println("Time: " + (end - start) + "ms");
// Time: ~1 second (all run concurrently!)
// RAM: ~50MB

4. HTTP Server với Virtual Threads

Platform Threads:

public class PlatformThreadServer {
    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(8080);
        ExecutorService executor = Executors.newFixedThreadPool(100);
        
        while (true) {
            Socket client = server.accept();
            executor.submit(() -> handleClient(client));
        }
        
        // Limit: 100 concurrent connections
    }
}

Virtual Threads:

public class VirtualThreadServer {
    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(8080);
        
        while (true) {
            Socket client = server.accept();
            Thread.startVirtualThread(() -> handleClient(client));
        }
        
        // No limit! Can handle millions of connections
    }
}

5. Structured Concurrency (Java 21)

Problem với unstructured concurrency:

ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

Future<String> future1 = executor.submit(() -> fetchUser());
Future<String> future2 = executor.submit(() -> fetchOrders());

// Vấn đề:
// - Future1 fail → Future2 vẫn chạy
// - Phải manually cancel
// - Khó track lifecycle

Structured Concurrency:

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    Future<String> user = scope.fork(() -> fetchUser());
    Future<String> orders = scope.fork(() -> fetchOrders());
    
    scope.join();           // Đợi tất cả
    scope.throwIfFailed();  // Throw nếu có lỗi
    
    // Cả 2 thành công mới đến đây
    String userData = user.resultNow();
    String ordersData = orders.resultNow();
    
} // Auto cleanup khi exit scope

ShutdownOnSuccess:

try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
    scope.fork(() -> fetchFromCache());
    scope.fork(() -> fetchFromDB());
    scope.fork(() -> fetchFromAPI());
    
    scope.join();
    
    // Lấy kết quả từ cái đầu tiên thành công
    String result = scope.result();
}

6. Scoped Values (Java 21)

Problem với ThreadLocal:

// ThreadLocal không work tốt với Virtual Threads
ThreadLocal<String> userId = new ThreadLocal<>();

// Virtual threads rất nhiều → Memory overhead

Scoped Values:

import java.lang.ScopedValue;

public class UserContext {
    public static final ScopedValue<String> USER_ID = ScopedValue.newInstance();
    
    public static void main(String[] args) {
        ScopedValue.where(USER_ID, "user123")
            .run(() -> {
                // USER_ID available trong scope này
                processRequest();
            });
        
        // USER_ID không available ngoài scope
    }
    
    static void processRequest() {
        String userId = USER_ID.get();
        System.out.println("Processing for: " + userId);
    }
}

📊 Benchmark: Platform vs Virtual Threads

Test Case: 100,000 HTTP requests

Platform Threads (Pool 200):

Threads: 200 platform threads
Time: 500 seconds
RAM: 400MB
CPU: 60%
Throughput: 200 req/s

Virtual Threads:

Threads: 100,000 virtual threads
Time: 10 seconds
RAM: 150MB
CPU: 70%
Throughput: 10,000 req/s

Kết luận:

  • 🚀 50x faster
  • 💾 60% less RAM
  • 📈 50x higher throughput

🎨 Best Practices

1. Khi nào dùng Virtual Threads?

✅ Dùng Virtual Threads khi:

  • I/O-bound tasks (network, database, file)
  • Blocking operations
  • High concurrency (thousands/millions)
  • Web servers, microservices
  • Chat servers, real-time apps

❌ KHÔNG dùng Virtual Threads khi:

  • CPU-bound tasks (tính toán nặng)
  • synchronized blocks (pinning issue)
  • Native code (JNI)
  • Need fine-grained control

2. Avoid Pinning

Pinning = Virtual thread pin to platform thread

Bad:

synchronized (lock) {
    // Blocking I/O trong synchronized → Pinning!
    Thread.sleep(1000);
}

Good:

ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
    // Blocking I/O OK với ReentrantLock
    Thread.sleep(1000);
} finally {
    lock.unlock();
}

3. Migration từ Platform → Virtual

Before:

ExecutorService executor = Executors.newFixedThreadPool(100);

After:

ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

That’s it! 🎉

4. Structured Concurrency Pattern

public class UserService {
    public UserProfile getUserProfile(String userId) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            // Fork parallel tasks
            Future<User> userFuture = 
                scope.fork(() -> fetchUser(userId));
            Future<List<Order>> ordersFuture = 
                scope.fork(() -> fetchOrders(userId));
            Future<Address> addressFuture = 
                scope.fork(() -> fetchAddress(userId));
            
            // Wait for all
            scope.join();
            scope.throwIfFailed();
            
            // Combine results
            return new UserProfile(
                userFuture.resultNow(),
                ordersFuture.resultNow(),
                addressFuture.resultNow()
            );
        }
    }
}

🎓 Lessons Learned

From Experience

1. Virtual Threads không phải silver bullet

CPU-bound: Platform threads vẫn tốt hơn
GPU tasks: Cần native libraries

2. Monitor với JFR (Java Flight Recorder)

java -XX:StartFlightRecording=filename=recording.jfr MyApp

3. Test với realistic workload

// Không chỉ test với sleep()
// Test với real I/O: DB, HTTP, File

4. Upgrade dependencies

Một số libraries chưa optimize cho Virtual Threads
Check compatibility!

🚀 Real-world Example: Chat Server

Platform Threads Version:

public class PlatformChatServer {
    private static ExecutorService executor = 
        Executors.newFixedThreadPool(100);
    
    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(5000);
        
        while (true) {
            Socket client = server.accept();
            executor.submit(() -> handleClient(client));
        }
        
        // Giới hạn: 100 concurrent clients
    }
}

Virtual Threads Version:

public class VirtualChatServer {
    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(5000);
        
        while (true) {
            Socket client = server.accept();
            Thread.startVirtualThread(() -> handleClient(client));
        }
        
        // Không giới hạn! Có thể handle millions!
    }
    
    private static void handleClient(Socket socket) {
        try (
            BufferedReader in = new BufferedReader(
                new InputStreamReader(socket.getInputStream()));
            PrintWriter out = new PrintWriter(
                socket.getOutputStream(), true)
        ) {
            String message;
            while ((message = in.readLine()) != null) {
                // Blocking I/O - No problem với Virtual Threads
                broadcast(message);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

🎯 Kết luận

Java Concurrency đã tiến hóa rất nhiều:

2004: Java 5

  • ExecutorService
  • Concurrent collections
  • Locks framework

2011: Java 7

  • Fork/Join framework
  • Parallel streams

2014: Java 8

  • CompletableFuture
  • Parallel streams

2023: Java 21 🎉

  • Virtual Threads
  • Structured Concurrency
  • Scoped Values

Key Takeaways:

  1. Platform threads cho CPU-bound
  2. Virtual threads cho I/O-bound
  3. ExecutorService vẫn relevant
  4. CompletableFuture cho async programming
  5. Structured Concurrency cho code rõ ràng hơn

Tương lai:

  • Virtual threads sẽ trở thành default
  • Frameworks (Spring, Quarkus) đang adopt
  • Java tiếp tục evolve

Resources recommend:

📚 Books:

  • Java Concurrency in Practice (Brian Goetz)
  • Effective Java (Joshua Bloch)

🌐 Reading:

  • JEP 444: Virtual Threads
  • Project Loom documentation

🛠️ Practice:

  • Migrate existing apps to Virtual Threads
  • Build high-throughput server
  • Benchmark Platform vs Virtual

Cảm ơn đã đọc series về Java Networking và Concurrency! Hi vọng các bạn học được nhiều điều bổ ích! 🚀

#Java #Concurrency #VirtualThreads #Java21 #Multithreading