Java Concurrency, Part 1: Basic Concepts of Concurrent Programming and the Purpose and Use of Multithreading

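Multithreading is a technique, implemented in software or hardware, for executing multiple threads concurrently. A computer with hardware support for multithreading can execute more than one thread at the same time, which improves overall throughput. Systems with this capability include symmetric multiprocessors, multi-core processors, and chip-level multithreading (CMT) or simultaneous multithreading (SMT) processors.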

1. What is a thread

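A thread is the smallest unit of execution that the operating system can schedule. In most cases it is contained in a process and is the unit that actually does the work inside that process. A thread is a single sequential flow of control within a process. A process can run multiple threads concurrently, each carrying out a different task. In Unix System V and SunOS, threads are also called lightweight processes. However, "lightweight process" more often refers to a kernel thread, while a user thread is simply called a thread.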

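The thread is the basic unit of independent scheduling and dispatch. A thread can take one of three forms:

- A kernel thread, scheduled by the operating system kernel, such as a Win32 thread.
- A user thread, scheduled by the user process itself.
- A thread scheduled jointly by the kernel and the user process (hybrid scheduling), such as a Windows 10 thread.

POSIX threads on Linux are often cited as an example of user threads. Modern Linux, however, implements them with NPTL as kernel threads in a one-to-one model.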

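All threads in the same process share the process's system resources, such as the virtual address space, file descriptors and signal handling. Each thread, however, has its own call stack, its own register context and its own thread-local storage.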

In short, one process can contain many threads, each carrying out its own task.
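To make the shared-versus-private distinction concrete, here is a minimal sketch (Java 8+). The class and method names are illustrative, not from the original article. A static `AtomicInteger` lives on the heap and is shared by every thread. A `ThreadLocal` gives each thread its own independent copy, which is one way Java exposes thread-local storage.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SharedVsThreadLocal {
    // Shared by every thread in the process (one object on the heap).
    static final AtomicInteger shared = new AtomicInteger();

    // Each thread sees its own independent copy, starting at 0.
    static final ThreadLocal<Integer> perThread = ThreadLocal.withInitial(() -> 0);

    // Starts `threads` threads; each adds `increments` to both counters and
    // records the final value of its own ThreadLocal copy.
    static int[] run(int threads, int increments) throws InterruptedException {
        int[] localResults = new int[threads];
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int id = t;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < increments; i++) {
                    shared.incrementAndGet();            // visible to all threads
                    perThread.set(perThread.get() + 1);  // visible only to this thread
                }
                localResults[id] = perThread.get();
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join(); // join() also makes the workers' writes visible here
        }
        return localResults;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] local = run(4, 1000);
        System.out.println("shared = " + shared.get()); // 4000: one counter for everyone
        for (int v : local) {
            System.out.println("thread-local = " + v);  // 1000 in each thread
        }
    }
}
```

All four threads update the same `shared` counter, but each thread's `ThreadLocal` value only counts its own increments.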


2. What is concurrency

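In operating systems, concurrency means that within a period of time several programs have all started but not yet finished, and all of them run on the same processor. At any single instant, however, only one of them is actually running on that processor.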

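Put simply, in a server context "concurrency" is often taken to mean the number of requests that can be processed at the same time. Tomcat's HTTP connector, for example, handles requests on a worker thread pool whose maxThreads defaults to 200, so it can process about 200 requests concurrently. Requests beyond that limit wait in a queue (bounded by acceptCount). Once that queue is full, new connections may be refused, which shows up as growing response latency and dropped connections.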

Concurrency vs. parallelism

Parallelism means that two or more events happen at the same instant.

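Concurrency means that two or more events happen within the same time interval. The term can be understood at a macro level and at a micro level. Take threads as an example: can a computer with a single-core CPU run multiple threads? It can. At the micro level, the CPU runs only one thread at any instant and switches between threads by handing out time slices. Because the switching is fast enough, at the macro level the threads appear to run simultaneously and we hardly notice any pause.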

3. Threads in Java

3.1 The Runnable interface

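Implementing Runnable leaves the class free to extend another class, since Java supports only single inheritance. The Runnable is then passed to a Thread, which runs it:

class OtherClass { } // placeholder for any existing superclass

public class MyThread extends OtherClass implements Runnable {
    @Override
    public void run() {
        System.out.println("MyThread.run()");
    }
}
// Usage: new Thread(new MyThread()).start();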
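Because Runnable has a single abstract method, Java 8+ also lets you write it as a lambda. The minimal sketch below is illustrative; the class name and thread name are made up. It hands a lambda Runnable to a named Thread and uses join() to wait for it. It then reports which thread actually executed the task.

```java
public class RunnableDemo {
    // Runs a Runnable on a new thread and returns the name of the thread that executed it.
    static String runOnNewThread() throws InterruptedException {
        String[] ranOn = new String[1];
        Runnable task = () -> ranOn[0] = Thread.currentThread().getName(); // Runnable as a lambda
        Thread worker = new Thread(task, "runnable-worker");               // hand it to a Thread
        worker.start();
        worker.join(); // wait until run() has finished, so ranOn[0] is set and visible
        return ranOn[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOnNewThread()); // runnable-worker
    }
}
```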

3.2 The Thread class

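public class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("MyThread.run()");
    }

    public static void main(String[] args) {
        MyThread myThread1 = new MyThread();
        MyThread myThread2 = new MyThread();
        myThread1.start(); // each start() launches a new thread, which then calls run()
        myThread2.start();
    }
}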

3.3 Callable/Future: tasks that return a value

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableDemo implements Callable<String> {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        CallableDemo callableDemo = new CallableDemo();
        Future<String> future = executorService.submit(callableDemo);
        System.out.println(future.get()); // get() blocks until call() has returned
        executorService.shutdown();
    }

    @Override
    public String call() throws Exception {
        int a = 1;
        int b = 2;
        System.out.println(a + b);
        return "Result: " + (a + b);
    }
}
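A Callable does not need an ExecutorService. FutureTask implements both Runnable and Future, so a plain Thread can run it. The sketch below illustrates this with hypothetical class and thread names. It also uses the timed get(timeout, unit) overload so the caller never waits forever.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureTaskDemo {
    static String compute() throws InterruptedException, ExecutionException, TimeoutException {
        Callable<String> callable = () -> "Result: " + (1 + 2);
        // FutureTask is a Runnable (so a Thread can run it) and a Future (so we can read the result).
        FutureTask<String> futureTask = new FutureTask<>(callable);
        new Thread(futureTask, "callable-worker").start();
        // Blocks until call() completes, but gives up with TimeoutException after 5 seconds.
        return futureTask.get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(compute()); // Result: 3
    }
}
```

If call() throws, get() rethrows that exception wrapped in an ExecutionException, which is how a Callable reports failures to its caller.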

4. Typical use cases for multithreading

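  • Dispatching network requests (handling many client requests at the same time)
  • Importing files (parsing and storing parts of a large file in parallel)
  • Sending SMS messages (sending a batch of messages concurrently instead of one by one)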
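As a sketch of the SMS scenario, the example below sends a batch of messages on a fixed-size thread pool. Here `sendSms` is a hypothetical stand-in that just sleeps to simulate network latency; a real gateway client would take its place. ExecutorService.invokeAll returns the futures in the same order as the submitted tasks, so the results line up with the input list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BatchSmsDemo {
    // Hypothetical stand-in for a slow remote call such as an SMS gateway request.
    static String sendSms(String phone) throws InterruptedException {
        Thread.sleep(100); // simulated network latency
        return "sent:" + phone;
    }

    // Sends every message on a fixed-size pool and returns the results in input order.
    static List<String> sendAll(List<String> phones, int poolSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String phone : phones) {
                tasks.add(() -> sendSms(phone));
            }
            List<String> results = new ArrayList<>();
            // invokeAll blocks until every task is done; futures come back in task order.
            for (Future<String> f : pool.invokeAll(tasks)) {
                results.add(f.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        List<String> results = sendAll(Arrays.asList("p1", "p2", "p3", "p4"), 4);
        long millis = (System.nanoTime() - start) / 1_000_000;
        // With 4 threads the four 100 ms calls overlap, so this is typically close to 100 ms.
        System.out.println(results + " in " + millis + " ms");
    }
}
```

The pool size caps how many calls run at once, which matters when the remote service has its own rate or connection limits.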

5. Fundamentals of Java concurrency

5.1 Thread lifecycle

A Java thread is always in one of six states, defined by the Thread.State enum: NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING and TERMINATED.

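  • NEW: the initial state. The thread object has been constructed, but start() has not been called yet.
  • RUNNABLE: Java folds the operating system's "ready" and "running" states into this single state.
  • BLOCKED: in Thread.State terms, the thread is waiting to acquire a monitor lock. This happens when it enters a synchronized block or method, or re-enters one after Object.wait(). More informally, "blocked" is often used for any thread that has given up the CPU for some reason, with three cases:

➢ Wait blocking: the running thread calls wait(), and the JVM puts it in the object's wait set. Thread.State reports this as WAITING or TIMED_WAITING.

➢ Synchronization blocking: the running thread tries to acquire an object's monitor that another thread holds, and the JVM puts it in the lock pool (the entry set). This is the BLOCKED state proper.

➢ Other blocking: the running thread calls Thread.sleep or t.join, or issues a blocking I/O request. It resumes when the sleep ends, the joined thread terminates, or the I/O completes. Thread.State reports sleep and join as TIMED_WAITING or WAITING, while a thread blocked in I/O is still reported as RUNNABLE.

  • WAITING: the thread waits indefinitely until another thread performs a particular action, such as notify()/notifyAll() or LockSupport.unpark(). It must be woken explicitly.
  • TIMED_WAITING: the thread waits with a timeout, for example in sleep(ms) or wait(ms). It returns automatically once the timeout expires.
  • TERMINATED: the thread has finished executing.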
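The states can be observed with Thread.getState(). The sketch below (illustrative names, Java 8+) drives threads into each of the six states in turn and records what getState() reports. Because thread scheduling is asynchronous, it polls until the expected state appears, with a timeout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class ThreadStateDemo {
    // Polls t.getState() until it equals `expected` or about 5 seconds pass.
    static Thread.State awaitState(Thread t, Thread.State expected) throws InterruptedException {
        long deadline = System.currentTimeMillis() + 5_000;
        while (t.getState() != expected && System.currentTimeMillis() < deadline) {
            Thread.sleep(5);
        }
        return t.getState();
    }

    static List<Thread.State> observe() throws InterruptedException {
        List<Thread.State> seen = new ArrayList<>();

        // NEW -> RUNNABLE -> TERMINATED
        AtomicBoolean stop = new AtomicBoolean(false);
        Thread spinner = new Thread(() -> { while (!stop.get()) { } }); // busy loop
        seen.add(spinner.getState());                             // NEW: start() not called yet
        spinner.start();
        seen.add(awaitState(spinner, Thread.State.RUNNABLE));     // running or ready to run
        stop.set(true);
        spinner.join();
        seen.add(spinner.getState());                             // TERMINATED

        // TIMED_WAITING: sleeping with a timeout
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                // interrupted below so the demo does not wait 10 s
            }
        });
        sleeper.start();
        seen.add(awaitState(sleeper, Thread.State.TIMED_WAITING));
        sleeper.interrupt();
        sleeper.join();

        // WAITING and BLOCKED on the same monitor
        Object lock = new Object();
        boolean[] released = {false};                             // guarded by lock
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                while (!released[0]) {                            // loop guards against spurious wakeups
                    try {
                        lock.wait();                              // releases the lock, no timeout
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        });
        waiter.start();
        seen.add(awaitState(waiter, Thread.State.WAITING));

        Thread blocked = new Thread(() -> { synchronized (lock) { } });
        synchronized (lock) {                                     // main now holds the monitor
            blocked.start();
            seen.add(awaitState(blocked, Thread.State.BLOCKED));  // cannot enter synchronized
            released[0] = true;
            lock.notifyAll();                                     // waiter resumes once main exits
        }
        waiter.join();
        blocked.join();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        // Typically prints [NEW, RUNNABLE, TERMINATED, TIMED_WAITING, WAITING, BLOCKED]
        System.out.println(observe());
    }
}
```

Note that the main thread keeps holding the monitor while it sleeps inside the synchronized block. Thread.sleep does not release locks, which is exactly what keeps the second thread BLOCKED.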

5.2 Starting a thread

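Compare these two calls:

  • new Thread().start(); // starts a new thread
  • Thread thread = new Thread(); thread.run(); // only calls the instance's run() method on the current thread; no new thread is started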
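A quick way to see the difference between the two calls listed above is to record which thread executes the task body. In this minimal sketch, the class name and the "worker" thread name are illustrative.

```java
public class StartVsRun {
    // Returns the name of the thread that actually executed the task body.
    static String whoRuns(boolean useStart) throws InterruptedException {
        String[] executor = new String[1];
        Thread t = new Thread(() -> executor[0] = Thread.currentThread().getName(), "worker");
        if (useStart) {
            t.start(); // creates a new thread, which then calls run()
            t.join();
        } else {
            t.run();   // an ordinary method call on the current thread
        }
        return executor[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("start(): " + whoRuns(true));  // start(): worker
        System.out.println("run():   " + whoRuns(false)); // run():   main
    }
}
```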

A thread is started by calling start(), not run(). Let's look at the source of start() in the Thread class:


public class Thread implements Runnable {
    /* Make sure registerNatives is the first thing <clinit> does. */
    private static native void registerNatives();
    static {
        registerNatives(); // start0() is registered by this call
    }

    // ... other fields and methods omitted ...

    public synchronized void start() {
        /**
         * This method is not invoked for the main method thread or "system"
         * group threads created/set up by the VM. Any new functionality added
         * to this method in the future may have to also be added to the VM.
         *
         * A zero status value corresponds to state "NEW".
         */
        if (threadStatus != 0)
            throw new IllegalThreadStateException();

        /* Notify the group that this thread is about to be started
         * so that it can be added to the group's list of threads
         * and the group's unstarted count can be decremented. */
        group.add(this);

        boolean started = false;
        try {
            start0(); // the real work happens here; start0() is a native method
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
            }
        }
    }

    private native void start0(); // implemented in native (C/C++) code

    // ...
}

start0() is registered in registerNatives(), whose native implementation is defined in Thread.c. Thread.c defines the thread-related data and operations shared by all operating system platforms:

static JNINativeMethod methods[] = {
    {"start0",           "()V",        (void *)&JVM_StartThread},
    {"stop0",            "(" OBJ ")V", (void *)&JVM_StopThread},
    {"isAlive",          "()Z",        (void *)&JVM_IsThreadAlive},
    {"suspend0",         "()V",        (void *)&JVM_SuspendThread},
    {"resume0",          "()V",        (void *)&JVM_ResumeThread},
    {"setPriority0",     "(I)V",       (void *)&JVM_SetThreadPriority},
    {"yield",            "()V",        (void *)&JVM_Yield},
    {"sleep",            "(J)V",       (void *)&JVM_Sleep},
    {"currentThread",    "()" THD,     (void *)&JVM_CurrentThread},
    {"countStackFrames", "()I",        (void *)&JVM_CountStackFrames},
    {"interrupt0",       "()V",        (void *)&JVM_Interrupt},
    {"isInterrupted",    "(Z)Z",       (void *)&JVM_IsInterrupted},
    {"holdsLock",        "(" OBJ ")Z", (void *)&JVM_HoldsLock},
    {"getThreads",        "()[" THD,   (void *)&JVM_GetAllThreads},
    {"dumpThreads",      "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
};

#undef THD
#undef OBJ
#undef STE

JNIEXPORT void JNICALL
Java_java_lang_Thread_registerNatives(JNIEnv *env, jclass cls)
{
    (*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}

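So what start0() actually executes in native code is JVM_StartThread. This function runs inside the JVM itself, so going further requires the HotSpot source code. In jvm.cpp we find the following (excerpt):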

JVM_ENTRY(void, JVM_StartThread(JNIEnv* env, jobject jthread))
  JVMWrapper("JVM_StartThread");
  JavaThread *native_thread = NULL;
  // ... (rest of the function omitted)

JVM_StartThread goes on to create a JavaThread. The corresponding JavaThread code in thread.cpp is:

JavaThread::JavaThread(ThreadFunction entry_point, size_t stack_sz) :
  Thread()
#if INCLUDE_ALL_GCS
  , _satb_mark_queue(&_satb_mark_queue_set),
  _dirty_card_queue(&_dirty_card_queue_set)
#endif // INCLUDE_ALL_GCS
{
  if (TraceThreadEvents) {
    tty->print_cr("creating thread %p", this);
  }
  initialize();
  _jni_attach_state = _not_attaching_via_jni;
  set_entry_point(entry_point);
  // Create the native thread itself.
  // %note runtime_23
  os::ThreadType thr_type = os::java_thread;
  thr_type = entry_point == &compiler_thread_entry ? os::compiler_thread :
                                                     os::java_thread;
  os::create_thread(this, thr_type, stack_sz);
  _safepoint_visible = false;
  // The _osthread may be NULL here because we ran out of memory (too many threads active).
  // We need to throw and OutOfMemoryError - however we cannot do this here because the caller
  // may hold a lock and all locks must be unlocked before throwing the exception (throwing
  // the exception consists of creating the exception object & initializing it, initialization
  // will leave the VM via a JavaCall and then all locks must be unlocked).
  //
  // The thread is still suspended when we reach here. Thread must be explicit started
  // by creator! Furthermore, the thread must also explicitly be added to the Threads list
  // by calling Threads:add. The reason why this is not done here, is because the thread
  // object must be fully initialized (take a look at JVM_Start)
}

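os::create_thread calls the operating system's thread-creation API to create the native thread that backs the Java thread. Once the thread has been created, it is started by calling Thread::start(Thread* thread) in thread.cpp: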

void Thread::start(Thread* thread) {
  trace("start", thread);
  // Start is different from resume in that its safety is guaranteed by context or
  // being called from a Java method synchronized on the Thread object.
  if (!DisableStartThread) {
    if (thread->is_Java_thread()) {
      // Initialize the thread state to RUNNABLE before starting this thread.
      // Can not set it after the thread started because we do not know the
      // exact thread state at that time. It could be in MONITOR_WAIT or
      // in SLEEPING or some other state.
      java_lang_Thread::set_thread_status(((JavaThread*)thread)->threadObj(),
                                          java_lang_Thread::RUNNABLE);
    }
    os::start_thread(thread);
  }
}

Thread::start calls the platform's os::start_thread, which eventually runs JavaThread::run() in thread.cpp:

// The first routine called by a new Java thread
void JavaThread::run() {
  // initialize thread-local alloc buffer related fields
  this->initialize_tlab();

  // used to test validitity of stack trace backs
  this->record_base_of_stack_pointer();

  // Record real stack base and size.
  this->record_stack_base_and_size();

  // Initialize thread local storage; set before calling MutexLocker
  this->initialize_thread_local_storage();

  this->create_stack_guard_pages();

  this->cache_global_variables();

  // Thread is now sufficient initialized to be handled by the safepoint code as being
  // in the VM. Change thread state from _thread_new to _thread_in_vm
  ThreadStateTransition::transition_and_fence(this, _thread_new, _thread_in_vm);

  assert(JavaThread::current() == this, "sanity check");
  assert(!Thread::current()->owns_locks(), "sanity check");

  DTRACE_THREAD_PROBE(start, this);

  // This operation might block. We call that after all safepoint checks for a new thread has
  // been completed.
  this->set_active_handles(JNIHandleBlock::allocate_block());

  if (JvmtiExport::should_post_thread_life()) {
    JvmtiExport::post_thread_start(this);
  }

  EventThreadStart event;
  if (event.should_commit()) {
    event.set_javalangthread(java_lang_Thread::thread_id(this->threadObj()));
    event.commit();
  }

  // We call another function to do the rest so we are sure that the stack addresses used
  // from there will be lower than the stack base just computed
  thread_main_inner();

  // Note, thread is no longer valid at this point!
}

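To summarize, a Java thread is started along this path:

Thread.start() → start0() (native, registered by registerNatives in Thread.c) → JVM_StartThread (jvm.cpp) → JavaThread constructor → os::create_thread (creates the OS thread) → Thread::start → os::start_thread → JavaThread::run() → thread_main_inner(), which eventually invokes the Java-level run() method.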
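One visible consequence of the threadStatus check at the top of start() is that a Thread object can be started only once. Once the status is no longer 0 ("NEW"), a second start() throws IllegalThreadStateException. The class and method names in this small sketch are illustrative.

```java
public class StartTwice {
    // Returns true if a second start() on the same Thread object is rejected.
    static boolean secondStartRejected() throws InterruptedException {
        Thread t = new Thread(() -> { });
        t.start();
        t.join();
        try {
            t.start(); // threadStatus is no longer 0, so start() throws
            return false;
        } catch (IllegalThreadStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("second start() rejected: " + secondStartRejected()); // true
    }
}
```

To run the same task again, create a new Thread, or submit the task to an ExecutorService that reuses its worker threads.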

5.3 Terminating a thread

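A thread should not be terminated by calling stop(). That method is deprecated because it releases all the thread's monitors abruptly and can leave shared objects in an inconsistent state. Instead, Thread provides interrupt() to ask a thread to stop gracefully.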

The following example shows how to terminate a thread with interrupt() and isInterrupted():

import java.util.concurrent.TimeUnit;

public class InterruptDemo {

    private static int i;

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            // isInterrupted() returns false by default; thread.interrupt() makes it true
            while (!Thread.currentThread().isInterrupted()) {
                i++;
            }
            System.out.println("Num:" + i);
        }, "interruptDemo");

        thread.start();
        TimeUnit.SECONDS.sleep(1);
        thread.interrupt();
    }
}

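interrupt() sets a flag telling the thread that it may terminate. Thread also provides the static method Thread.interrupted(), which resets (clears) the interrupt flag of the current thread.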
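The difference between the two query methods is easy to miss. isInterrupted() only reads the flag. The static Thread.interrupted() reads it and then clears it. This sketch, with illustrative names, interrupts the current thread and samples the flag step by step.

```java
import java.util.Arrays;

public class InterruptFlagDemo {
    // Interrupts the current thread and samples the flag through both methods.
    static boolean[] observe() {
        Thread current = Thread.currentThread();
        current.interrupt();                   // sets the flag on this thread
        boolean a = current.isInterrupted();   // true: reads the flag
        boolean b = current.isInterrupted();   // still true: isInterrupted() does not clear it
        boolean c = Thread.interrupted();      // true: returns the flag and clears it
        boolean d = current.isInterrupted();   // false: the flag has been reset
        return new boolean[] {a, b, c, d};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe())); // [true, true, true, false]
    }
}
```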

5.3.1 Resetting the interrupt flag

Let's modify the example above:

import java.util.concurrent.TimeUnit;

public class InterruptDemo {

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            while (true) {
                if (Thread.currentThread().isInterrupted()) {
                    System.out.println("before:" + Thread.currentThread().isInterrupted()); // true
                    Thread.interrupted(); // reset the flag: true -> false
                    System.out.println("after:" + Thread.currentThread().isInterrupted());  // false
                    break; // leave the loop so the demo terminates
                }
            }
        }, "interruptDemo");

        thread.start();
        TimeUnit.SECONDS.sleep(1);
        thread.interrupt();
    }
}

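Besides resetting the flag actively with Thread.interrupted(), there is also a passive reset: methods that throw InterruptedException. Before throwing InterruptedException, the JVM clears the thread's interrupt flag. Calling isInterrupted() at that point, for example in the catch block, therefore returns false.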
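The passive reset can be observed directly. In this sketch (illustrative names), a thread is interrupted while it sleeps and checks isInterrupted() inside its catch block. The method returns null if the sleep was never interrupted, so a false result really comes from the catch block.

```java
public class SleepInterruptDemo {
    // Interrupts a sleeping thread; returns isInterrupted() as seen in its catch block,
    // or null if sleep() was never interrupted.
    static Boolean flagInsideCatch() throws InterruptedException {
        Boolean[] flag = {null};
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                // The flag was cleared before the exception was thrown.
                flag[0] = Thread.currentThread().isInterrupted();
            }
        });
        sleeper.start();
        sleeper.interrupt();   // whether it is already asleep or about to sleep, sleep() throws
        sleeper.join();
        return flag[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("isInterrupted() in catch: " + flagInsideCatch()); // false
    }
}
```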

5.3.2 Why reset the flag?

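Thread.interrupted() operates on the current thread: it is how a thread responds to an interrupt signal from outside. Calling it acknowledges that the signal has been received without stopping the thread on the spot; the thread itself decides when to stop. After the reset, the outside world sees the thread's interrupt status as false again until the thread actually stops, and any later interrupt() can be recognized as a new request. This is why the flag is reset.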

5.3.3 How interruption works internally

A thread is asked to terminate by calling interrupt(). Let's look at the source of interrupt() in the Thread class:

public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();

    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           // Just to set the interrupt flag
            b.interrupt(this);
            return;
        }
    }
    interrupt0();
}

It ends up calling the native method interrupt0(). Just as with start0(), the methods table in Thread.c maps it to JVM_Interrupt in jvm.cpp:

JVM_ENTRY(void, JVM_Interrupt(JNIEnv* env, jobject jthread))
  JVMWrapper("JVM_Interrupt");

  // Ensure that the C++ Thread and OSThread structures aren't freed before we operate
  oop java_thread = JNIHandles::resolve_non_null(jthread);
  MutexLockerEx ml(thread->threadObj() == java_thread ? NULL : Threads_lock);
  // We need to re-resolve the java_thread, since a GC might have happened during the
  // acquire of the lock
  JavaThread* thr = java_lang_Thread::thread(JNIHandles::resolve_non_null(jthread));
  if (thr != NULL) {
    Thread::interrupt(thr);
  }
JVM_END

The source of Thread::interrupt() in thread.cpp:

void Thread::interrupt(Thread* thread) {
  trace("interrupt", thread);
  debug_only(check_for_dangling_thread_pointer(thread);)
  os::interrupt(thread);
}

Thread::interrupt() calls os::interrupt(), the platform-specific interrupt implementation, which lives in the os_*.cpp files. Taking os_linux.cpp as an example:

void os::interrupt(Thread* thread) {
  assert(Thread::current() == thread || Threads_lock->owned_by_self(),
    "possibility of dangling Thread pointer");
  // get the native (OS-level) thread object
  OSThread* osthread = thread->osthread();
  // only act if the native thread is not already marked as interrupted
  if (!osthread->interrupted()) {
    // set the interrupt status to true
    osthread->set_interrupted(true);
    // More than one thread can get here with the same value of osthread,
    // resulting in multiple notifications. We do, however, want the store
    // to interrupted() to be visible to other threads before we execute unpark().
    // the memory fence makes the interrupted status immediately visible to other threads
    OrderAccess::fence();
    // _SleepEvent is used by Thread.sleep: if the thread is sleeping, unpark() wakes it
    ParkEvent * const slp = thread->_SleepEvent ;
    if (slp != NULL) slp->unpark() ;
  }

  // For JSR166. Unpark even if interrupt status already was set
  // (the Parker backs LockSupport.park)
  if (thread->is_Java_thread())
    ((JavaThread*)thread)->parker()->unpark();
  // _ParkEvent is used by synchronized blocks and Object.wait(); unpark() wakes the thread here too
  ParkEvent * ev = thread->_ParkEvent ;
  if (ev != NULL) ev->unpark() ;

}

set_interrupted(true) calls set_interrupted() in osThread.hpp. OSThread declares a member field volatile jint _interrupted:

volatile jint _interrupted;     // Thread.isInterrupted state

void set_interrupted(bool z) { _interrupted = z ? 1 : 0; }

So thread.interrupt() really does two things. It sets the interrupted flag to true, and it wakes the thread by calling unpark() on its ParkEvent objects and on its Parker.

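  • A thread blocked on synchronized that is woken this way simply tries to acquire the lock again. If that fails, it may be parked again, so an interrupt does not make a thread give up waiting for a monitor.
  • Before a thread parks on a ParkEvent (for example inside sleep or wait), the JVM first checks its interrupt status. If the status is true, the flag is cleared and the method throws InterruptedException instead of blocking.
  • Object.wait, Thread.sleep and Thread.join throw InterruptedException, and all of them are blocking methods. A blocking method returns only when some external event occurs, so interruption gives one thread a way to ask another to stop what it is doing. When a method declares InterruptedException, it is telling its caller: if the thread executing this method is interrupted, it will try to stop what it is doing and return early by throwing InterruptedException.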
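Not every blocking call reacts the same way. Object.wait() responds to an interrupt by throwing InterruptedException with the flag already cleared. LockSupport.park(), backed by the Parker that os::interrupt unparks, simply returns and leaves the flag set. The sketch below contrasts the two; class and method names are illustrative.

```java
import java.util.concurrent.locks.LockSupport;

public class WaitVsParkDemo {
    // Object.wait(): interrupt() wakes the waiter, which gets InterruptedException
    // with the flag already cleared. Returns the flag seen in the catch block.
    static Boolean flagAfterWaitInterrupted() throws InterruptedException {
        Object lock = new Object();
        Boolean[] flag = {null};
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                try {
                    while (true) {
                        lock.wait();   // nobody notifies; only an interrupt ends the loop
                    }
                } catch (InterruptedException e) {
                    flag[0] = Thread.currentThread().isInterrupted();
                }
            }
        });
        waiter.start();
        waiter.interrupt();
        waiter.join();
        return flag[0];
    }

    // LockSupport.park(): with the flag already set it returns at once, without an
    // exception, and leaves the flag set.
    static boolean flagAfterPark() {
        Thread.currentThread().interrupt();
        LockSupport.park();
        return Thread.interrupted();   // true; this call also clears the flag again
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("wait(): flag in catch = " + flagAfterWaitInterrupted()); // false
        System.out.println("park(): flag after return = " + flagAfterPark());       // true
    }
}
```

Code built on park(), such as java.util.concurrent locks, therefore has to check the flag itself after park() returns.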

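InterruptedException means that a blocking operation was interrupted by another thread. When interrupt() is called on a thread blocked in Object.wait, Thread.sleep or a similar method, the thread wakes up and checks the interrupt flag through is_interrupted. If the flag is true, it first clears the flag and then throws InterruptedException. Note that this exception does not mean the thread must terminate. It only tells the thread that an interrupt has occurred, and what happens next is up to the thread itself. For example, it can: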

  • catch the exception and do nothing
  • propagate the exception to the caller
  • stop the current thread and log the exception
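The choice among these options matters because the flag has already been cleared when the exception arrives. The sketch below (illustrative names) shows what each choice leaves behind. Swallowing the exception loses the interrupt entirely, and propagating lets the caller decide. Restoring the flag with Thread.currentThread().interrupt() is a widely used idiom when a method cannot rethrow, for example inside Runnable.run(), before it stops its work.

```java
public class HandleInterruptDemo {
    // Catch and do nothing: the interrupt is silently lost.
    static boolean swallow() {
        Thread.currentThread().interrupt();        // simulate a pending interrupt
        try {
            Thread.sleep(1_000);                   // throws at once and clears the flag
        } catch (InterruptedException e) {
            // nothing here, so callers can no longer tell an interrupt happened
        }
        return Thread.interrupted();               // false: the flag was lost
    }

    // Propagate: declare the exception and let the caller decide.
    static void propagate() throws InterruptedException {
        Thread.sleep(1_000);                       // throws if the thread is interrupted
    }

    // Restore the flag, so code further up the call stack still sees the interrupt.
    static boolean restore() {
        Thread.currentThread().interrupt();
        try {
            Thread.sleep(1_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();    // re-assert the interrupt status
        }
        return Thread.interrupted();               // true (this call clears it again for the demo)
    }

    public static void main(String[] args) {
        System.out.println("swallow: " + swallow()); // swallow: false
        System.out.println("restore: " + restore()); // restore: true
    }
}
```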

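To make this concrete, let's take Thread.sleep as an example. We follow the JDK source to the place where the interrupt flag is cleared and the exception is thrown, ending at is_interrupted(). Thread.sleep is declared as a native method: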

public static native void sleep(long millis) throws InterruptedException;

The source of JVM_Sleep in jvm.cpp (excerpt):

JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))
  JVMWrapper("JVM_Sleep");

  if (millis < 0) {
    THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
  }
  // check and clear the interrupt status; if it was true, throw InterruptedException
  if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {
    THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
  }

  // Save current thread state and restore it at the end of this block.
  // And set new thread state to SLEEPING.
  JavaThreadSleepState jtss(thread);
  // ... (rest of the function omitted)

The source of is_interrupted() in os_linux.cpp:

bool os::is_interrupted(Thread* thread, bool clear_interrupted) {
  assert(Thread::current() == thread || Threads_lock->owned_by_self(),
    "possibility of dangling Thread pointer");

  OSThread* osthread = thread->osthread();
  // read the interrupt status
  bool interrupted = osthread->interrupted();
  // if it is true and the caller asked for it to be cleared
  if (interrupted && clear_interrupted) {
    // reset the interrupt status to false
    osthread->set_interrupted(false);
    // consider thread->_SleepEvent->reset() ... optional optimization
  }

  return interrupted;
}

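That completes the walk through the interrupt flow. In summary:

- Thread.interrupt() → interrupt0() (native) → JVM_Interrupt (jvm.cpp) → Thread::interrupt (thread.cpp) → os::interrupt (os_linux.cpp).
- os::interrupt sets OSThread's volatile _interrupted field to true and unparks the thread's _SleepEvent, Parker and _ParkEvent.
- A blocking method such as Thread.sleep reads the flag through os::is_interrupted(thread, true), which also clears it, and then throws InterruptedException.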
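The check at the top of JVM_Sleep has a visible consequence: if the flag is already set when sleep() is called, sleep() throws immediately instead of sleeping. The excerpt above comes from an older HotSpot; the observable behaviour is the same on current JDKs. This sketch (illustrative names) sets the flag first, then asks for a 10-second sleep. It measures how long the call actually took and whether the flag survived.

```java
public class PreInterruptedSleep {
    static boolean flagAfterCatch = true;

    // Sets the interrupt flag, then calls sleep(10 s).
    // Returns how long sleep() took in ms, or -1 if it was not interrupted.
    static long sleepWithPendingInterrupt() {
        Thread.currentThread().interrupt();              // flag is true before sleep() starts
        long start = System.nanoTime();
        try {
            Thread.sleep(10_000);
            return -1;                                   // not reached when the flag was set
        } catch (InterruptedException e) {
            // sleep() found the flag set, cleared it, and threw without sleeping
            flagAfterCatch = Thread.currentThread().isInterrupted();
            return (System.nanoTime() - start) / 1_000_000;
        }
    }

    public static void main(String[] args) {
        long ms = sleepWithPendingInterrupt();
        System.out.println("sleep returned after " + ms + " ms, flag = " + flagAfterCatch);
    }
}
```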

5.3.4 What interrupt() does

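  • Sets a shared variable (the thread's interrupted flag) to true
  • Wakes the thread up if it is blocked (sleeping, waiting or parked)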
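Both effects are what make a cooperative, graceful stop possible. The sketch below shows a common worker-loop pattern; its class and thread names are illustrative. The loop condition checks the flag (the first effect), and interrupt() cuts the sleep short (the second effect). Because sleep() cleared the flag before throwing, the catch block sets it again so the loop condition sees it.

```java
import java.util.concurrent.TimeUnit;

public class GracefulStopDemo {
    // A worker that repeats a unit of work and exits cooperatively once interrupted.
    static Thread newWorker() {
        return new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {  // effect 1: the flag is checked here
                // ... one unit of work would go here ...
                try {
                    TimeUnit.MILLISECONDS.sleep(50);           // effect 2: interrupt() wakes this sleep
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();        // sleep cleared the flag; set it again
                }
            }
            // release resources here before the thread ends
        }, "graceful-worker");
    }

    // Starts a worker, interrupts it, and reports whether it ended within 2 seconds.
    static boolean stopsPromptly() throws InterruptedException {
        Thread worker = newWorker();
        worker.start();
        TimeUnit.MILLISECONDS.sleep(200);
        worker.interrupt();
        worker.join(2_000);
        return !worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("stopped promptly: " + stopsPromptly());
    }
}
```

A plain volatile boolean flag could replace the first effect, but not the second: it cannot wake a thread that is already sleeping or waiting.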