终止阻塞的线程

线程状态

我们知道,一个线程可以处于以下四种状态之一:

1. 新建(New):当线程被创建时,它只会短暂地处于这种状态。此时它已经分配了必须的系统资源,并执行了初始化。此刻线程已经有资格获取CPU时间了,之后调度器将把这个线程转变为可运行状态或阻塞状态。

2. 就绪(Runnable):在这种状态下,只要调度器将CPU时间片分给线程,线程就可以运行。也就是说,在任意时刻,线程可以运行也可以不运行。

3. 阻塞(Blocked):线程能够运行,但有某个或多个条件阻止它运行。当线程处于阻塞状态时,调度器将忽略线程,不会分配给线程任何CPU时间片。直到线程重新进入了就绪状态,它才有可能执行操作。

4. 死亡(Dead):处于死亡或终止状态的线程将不再是可调度的,并且再也不会得到CPU时间,它的任务已经结束,或不再是可运行的。任务死亡的通常方式是从run()方法返回,但是任务的线程还可以不被中断。

进入线程状态

而一个任务进入阻塞状态,可能由以下原因造成:

1. 通过调用sleep(milliseconds)方法使任务进入休眠状态,在这种情况下,任务在指定的时间内不会运行。

2. 通过调用wait()方法使线程挂起。直到线程得到了notify()或notifyAll()消息(或者在JavaSE5的java.util.concurrent类库中等价的signal()活signalAll()消息),线程才会进入就绪状态。

3. 任务在等待某个I/O操作完成。

4. 任务试图在某个对象上调用其同步控制方法,但是对象锁不可用,因为另一个任务已经获取了这个锁。

在较早的代码中,也可能会看到用suspend()和resume()方法来阻塞和唤醒线程,但是在Java新版本中这些方法被废弃了,因为它们可能导致死锁。stop()方法也已经被废弃了,因为它不释放线程获得的锁,并且如果线程处于不一致的状态,其他任务可以在这种状态下浏览并修改它们。

现在我们需要查看的问题是:有事你希望能够终止处于阻塞状态的任务。如果对于阻塞装填的任务,你不能等待其到达代码中可以检查其状态值的某一点,因而决定让它主动终止,那么你就必须强制这个任务跳出阻塞状态。

中断

正如你所想象的,在Runnable.run()方法的中间打断它,与到达程序员准备好离开该方法的其他一些地方相比,要复杂得多。因为当你打断被阻塞的任务时,可能需要清理资源。正因为这一点,在任务的run()方法中间打断,更像是抛出的异常,因此在Java线程中的这种类型的异常中断中用到了异常。为了在以这种方式终止任务时返回良好的状态,你必须仔细考虑代码的执行路径,并仔细编写catch字句以便正确的清楚所有事物。

Thread类包含了interrupt()方法,因此你可以终止被阻塞的任务,这个方法将设置线程的中断状态。如果一个线程已经被阻塞,或者试图执行一个阻塞操作,那么设置这个线程的中断状态将抛出InterruptedException。当抛出该异常或者该任务调用Thread.interrupted()时,中断状态将被复位。正如你将看到的,Thread.interrupted()提供了离开run()循环而不抛出异常的第二种方式。

为了调用interrupt(),你必须持有Thread对象。你可能已经注意到了,新的concurrent类库似乎在避免对Thread对象上的直接操作,转而尽量的通过Executor来执行所有操作。如果你在Executor上调用shutdownNow(),那么它将发送一个interrupt()调用给它启动的线程。这么做是有意义的,因为当你完成工程中的某个部分或者整个程序时,通常会希望同时关闭某个特定Executor的所有任务。然而,你有时也会希望只中断某个单一任务。如果使用Executor,那么通过调用submit()方法而不是execute()方法来启动任务,就可以持有该任务的上下文。submit()将返回一个泛型Future<?>,其中有一个未修饰的参数,因为你永远都不会在其上调用get()——持有这种Future的关键在于你可以在其上调用cancel(),并因此可以使用它来中断某个特定任务。如果你将true传递给cancel(),那么它就会拥有在该线程上调用interrupt()以停止这个线程的能力。因此,cancel是一种中断由Executor启动的单个线程的方式。

下面的示例使用Executor展示了基本的interrupt()用法:

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class SleepBlocked implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            System.out.println("InterruptedException");
        }
        System.out.println("Exiting SleepBlocked.run()");
    }
}

class IOBlocked implements Runnable {
    private InputStream in;
    public IOBlocked(InputStream is) {
        in = is;
    }
    @Override
    public void run() {
        try {
            System.out.println("Waiting for read():");
            in.read();
        } catch (IOException e) {
            if (Thread.currentThread().isInterrupted()) {
                System.out.println("Interrupted from Blocked I/O");
            } else {
                throw new RuntimeException(e);
            }
        }
        System.out.println("Exiting IOBlocked.run()");
    }
}

class SynchronizedBlocked implements Runnable {
    public synchronized void f() {
        while(true) {
            //永不释放获得的锁
            Thread.yield();
        }
    }
    public SynchronizedBlocked() {
        //在构造的时候就获取该对象的锁
        new Thread(){
            @Override
            public void run() {
                f();
            }
        };
    }

    @Override
    public void run() {
        System.out.println("Trying to call f()");
        f();
        System.out.println("Exiting SynchronizedBlocked.run()");
    }
}

public class Interrupting {
    private static ExecutorService exec = Executors.newCachedThreadPool();
    static void test(Runnable r) throws InterruptedException {
        Future<?> future = exec.submit(r);
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Interrupting " + r.getClass().getName());
        future.cancel(true);//如果在运行的话,中断该线程。
        System.out.println("Interrupting sent to " + r.getClass().getName());
    }

    public static void main(String[] args) throws Exception {
        test(new SleepBlocked());
        test(new IOBlocked(System.in));
        test(new SynchronizedBlocked());
        TimeUnit.SECONDS.sleep(3);
        System.out.println("Aborting with System.exit(0);");
        //强行停止退出
        System.exit(0);
    }
}

执行结果:

Interrupting SleepBlocked
Interrupting sent to SleepBlocked
InterruptedException
Exiting SleepBlocked.run()
Waiting for read():
Interrupting IOBlocked
Interrupting sent to IOBlocked
Trying to call f()
Interrupting SynchronizedBlocked
Interrupting sent to SynchronizedBlocked
Aborting with System.exit(0);

上面的每个任务都表示了一种不同类型的阻塞。SleepBlock是可中断的阻塞示例,而IOBlocked和SynchronizedBlocked是不可中断的阻塞示例。这个程序证明I/O和在synchronized块上的等待是不可中断的,但是通过浏览代码,你也可以预见到这一点——无论是I/O还是尝试调用synchronized方法,都不需要任何InterruptedException处理器。

两个雷很简单直观:在第一个类中run()方法调用了sleep(),在第二个类中调用了read()。但是为了掩饰SynchronizedBlock,我们必须首先获得锁。这是通过在构造器中创建匿名的Thread类的实例来实现的,这个匿名Thread类的对象通过调用f()获得了对象锁(这个线程必须有别于为启动SynchronizedBlock.run()的线程,因为同一个线程可以多次获得某个对象锁,你将在稍后看见)。由于f()永远都不反回,因此这个锁永远不会释放,而SynchronizedBlock.run()在试图调用f(),并阻塞以等待这个锁被释放。

从输出中可以看到,你能够中断对sleep()的调用(或者任何要求抛出InterruptedException的调用)。但是你不能中断正在试图获取synchronized锁或者正在试图执行I/O操作的线程。这有点令人烦恼,特别是在创建执行I/O任务时,因为这意味着I/O具有锁住你的多线程程序的潜在可能。特别是对于急于Web的程序,这更是关乎厉害。

对于这类问题,有一个略显笨拙但是确实行之有效的解决方案,那就是关闭任务在其上发生阻塞的资源

import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CloseResource {
    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newCachedThreadPool();
        ServerSocket server = new ServerSocket(8080);
        InputStream stream = new Socket("localhost", 8080).getInputStream();
        service.execute(new IOBlocked(stream));
        TimeUnit.MILLISECONDS.sleep(100);
        System.out.println("Shutting down all threads");
        service.shutdownNow();//尝试停止所有正在执行的任务
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Closing " + stream.getClass().getName());
        stream.close();//通过关闭线程操作的资源来释放阻塞的线程
    }
}

执行结果:

Waiting for read():
Shutting down all threads
Closing java.net.SocketInputStream
Interrupted from Blocked I/O
Exiting IOBlocked.run()

在shutdownNow()被调用之后以及在输入流上调用close()之前的延迟强调的是一旦底层资源被关闭,任务将解除阻塞。

幸运的是,各种NIO类提供了更人性化的I/O中断。被阻塞的nio通道会自动地响应中断

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class NIOBlocked implements Runnable {
    private final SocketChannel channel;

    public NIOBlocked(SocketChannel channel) {
        this.channel = channel;
    }

    @Override
    public void run() {
        try {
            System.out.println("Waiting for read() in " + this);
            channel.read(ByteBuffer.allocate(1));//阻塞当前任务
        } catch (ClosedByInterruptException e) {
            System.out.println("ClosedByInterruptException");
        } catch (AsynchronousCloseException e) {
            System.out.println("AsynchronousCloseException");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        System.out.println("Exiting NIOBlocked.run() " + this);
    }
}

public class NIOInterruption {
    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newCachedThreadPool();
        ServerSocket server = new ServerSocket(8080);
        InetSocketAddress isa = new InetSocketAddress("localhost", 8080);
        SocketChannel sc1 = SocketChannel.open(isa);
        SocketChannel sc2 = SocketChannel.open(isa);
        Future<?> f = service.submit(new NIOBlocked(sc1));
        service.execute(new NIOBlocked(sc2));
        //尝试关闭任务,但由于任务处于阻塞状态,关闭不了。
        service.shutdown();
        TimeUnit.SECONDS.sleep(1);
        // 通过在channel1上调用cancel来产生中断
        f.cancel(true);
        TimeUnit.SECONDS.sleep(1);
        // 释放channel2
        sc2.close();
    }
}

执行结果:

Waiting for read() in NIOBlocked@18b8914
Waiting for read() in NIOBlocked@1d49247
ClosedByInterruptException
Exiting NIOBlocked.run() NIOBlocked@18b8914
AsynchronousCloseException
Exiting NIOBlocked.run() NIOBlocked@1d49247

如你所见,你还可以关闭底层资源以释放锁,尽管这种做法一般不是必须的。注意,使用execute()来启动两个任务,并调用service.shutdownNow()将可以很容易的终止所有事物,而对于捕获上面示例中的Future,只有在将中断发送给一个线程,同时不发送给另一个线程时才是必须的。



相关文章

发表评论

Comment form

(*) 表示必填项

还没有评论。

跳到底部
返回顶部