0%

Topic是Kafka的基本组织单位。

Kafka的消息是以Topic作为基本单位来组织的,而多个Topic之间又是相互独立的。

形象的说,可以将Kafka当成一个队列集合,每一个Topic相当于一个队列

每个Topic又分为多个Partition,也就是分区,分别存储了一部分的消息,也起到了负载均衡的作用。

创建Topic的时候,可以指定partition的个数,最终以文件形式存储在磁盘上。

1
./kafka-topics.sh --create --partitions 3 --topic test_p_3 --zookeeper localhost:2181 --replication-factor 1

kafka-tps.png

每个partition以Topic名称-Partition序号来命名,底下又分为多个Segment,也就是分段

生产消息的时候,需要指定Topic,而Topic根据Partition路由算法,路由到具体Partition。

kafka-p-j

同一个Topic,每一个partition各自处理各自的消息。

ukafka-partition.png

每一个segment包含:日志(用来存储消息数据)和索引(用来记录offset相关映射信息)

通过分段,可以减少文件的大小以及通过稀疏索引进一步实现数据的快速查找。

可以通过配置log.dirs的值来修改数据存储的位置,默认是在/tmp/kafka-logs目录下。

kafka-logs

以相同基准offset命名的日志、索引文件,称为段。

如果一个partition只有一个数据文件的话:

  • 对于写入
    由于是顺序追加写入,时间复杂度为O(1)

  • 对于查找
    要查找某个offset,采用的是顺序查找,时间复杂度达到了O(n)

通过对数据进行分段、加索引,解决了痛点。

在Kafka中,索引文件分为:

name type suffix
偏移量索引 OffsetIndex .index
时间戳索引 TimeIndex .timeindex
事务索引 TransactionIndex .txnindex

日志文件分为:

name suffix desc
数据文件 .log 用于存储消息
交换文件 .swap 用于segment的恢复
延迟待删文件 .deleted 用于标识要删除的文件
快照文件 .snapshot 用于记录producer的事务信息
清理临时文件 .cleaned 用于标识正在删除的文件

分段

假设有100条消息,offset取值范围在:0 - 99,如果分为4个段,那么每一个段各存储25条消息,取值范围分别为:0 - 24、25 - 49、 50 - 74、75 - 99,而每一段的文件以20位起始offset来命名,不足20位,用0补齐,要查找某个offset只需要找到对应范围内的文件,然后再通过二分查找算法去文件查找即可。

1
2
# 通过kafka.tools.DumpLogSegments可以查看log文件的内容
./kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test_p_3-0/00000000000000000000.log

kafka-sg

索引

通过分段可以减少数据文件大小,并且加快查询速度,但仍然存在顺序查找的最差情况。

为了进一步提高数据查找的速度,Kafka通过建立稀疏索引的方式,每隔一定间隔建立一条索引。

1
2
# 通过kafka.tools.DumpLogSegments可以查看index文件的内容
./kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test_p_3-0/00000000000000000000.index

kafka-index

偏移量索引,记录着 < offset , position > 偏移量 - 物理地址

时间戳索引,记录着 < timestamp , offset >时间戳 - 偏移量

事务索引,记录着 < offset , AbortedTxn > 偏移量 - 被中断事务

消息查找:offset、timestamp

在Kafka中,segment通过一个跳跃表来维护。

1
2
/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

每一次,只有一个activeSegment可以进行写入消息,其它的segment是只读的。

1
2
3
4
/**
* The active segment that is currently taking appends
*/
def activeSegment = segments.lastEntry.getValue

基于偏移量查找

OffsetIndex的数据格式为:

1
2
3
offset: 33 position 4158
offset: 66 position 8316
offset: 99 position 12474

在Kafka的IndexEntry.scala中定义为:

1
2
3
4
5
6
7
8
9
/**
* The mapping between a logical log offset and the physical position
* in some log file of the beginning of the message set entry with the
* given offset.
*/
case class OffsetPosition(offset: Long, position: Int) extends IndexEntry {
override def indexKey = offset
override def indexValue = position.toLong
}

如果要查找offset=35的消息,那么就先从segments跳跃表上找到对应应访问的segment文件。

然后通过二分查找在偏移量索引文件找到不大于35的offset,也就是offset: 33 position 4158

解析offset对应的position,从position处顺序查找log文件,找到offset=35的位置。

基于时间戳查找

TimeIndex的数据格式为:

1
2
3
timestamp: 1579167998000 offset 33
timestamp: 1579168197621 offset 66
timestamp: 1579168397242 offset 99

在Kafka的IndexEntry.scala中定义为:

1
2
3
4
5
6
7
8
9
10
/**
* The mapping between a timestamp to a message offset. The entry means that any message whose timestamp is greater
* than that timestamp must be at or after that offset.
* @param timestamp The max timestamp before the given offset.
* @param offset The message offset.
*/
case class TimestampOffset(timestamp: Long, offset: Long) extends IndexEntry {
override def indexKey = timestamp
override def indexValue = offset
}

如果要查找timestamp=15791679981234的消息,那么就先比较每个segment文件的largestTimestamp,找到对应的segment文件。

然后通过二分查找在时间戳索引文件找到不大于15791679981234timestamp,也就是timestamp: 1579167998000 offset 33

找到offset直接去偏移量索引文件中查找position。

从position处顺序查找log文件,找到timestamp=15791679981234的位置。

通常,应用程序的启动都是基于main函数的,Kafka也不例外。

启动Kafka的时候,通过执行kafka-server-start.sh文件,并且指定配置文件即可。

1
./kafka-server-start.sh ../config/server.properties

kafka-server-start.sh中核心的内容在于启动kafka.Kafka类中的main函数。

1
2
#...省略
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "[email protected]"

主要流程:解析配置文件、创建KafkaServerStartable实例、启动服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def main(args: Array[String]): Unit = {
try {
     //解析配置文件
val serverProps = getPropsFromArgs(args)
     //从配置文件创建实例
val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
//...
//优雅退出
Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {
override def run(): Unit = kafkaServerStartable.shutdown()
})
kafkaServerStartable.startup()//启动应用
kafkaServerStartable.awaitShutdown()//阻塞等待关闭
}
catch {
case e: Throwable =>
fatal("Exiting Kafka due to fatal exception", e)
Exit.exit(1)
}
Exit.exit(0)
}

1. 解析配置文件
读取指定的server.properties文件,进行一系列的解析验证,支持通过命令行传入参数,覆盖配置文件中配置的属性,最后返回Properties实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def getPropsFromArgs(args: Array[String]): Properties = {
val optionParser = new OptionParser(false)
val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file")
.withRequiredArg()
.ofType(classOf[String])

if (args.length == 0) {
CommandLineUtils.printUsageAndDie(optionParser, "USAGE: java [options] %s server.properties [--override property=value]*".format(classOf[KafkaServer].getSimpleName()))
}

val props = Utils.loadProps(args(0)) //加载server.properties

if (args.length > 1) {
val options = optionParser.parse(args.slice(1, args.length): _*)
     //命令行参数个数校验
if (options.nonOptionArguments().size() > 0) {
CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
}
//将命令行配置的属性存入props
props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
}
props
}

2. 创建KafkaServerStartable实例
KafkaServerStartable主要的作用是启动Kafka的Metrics监控以及维护Kafka服务的生命周期。
包括:启动、关闭、闭锁阻塞等待关闭、设置Kafka服务状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
object KafkaServerStartable {
def fromProps(serverProps: Properties) = {
 //启动监控
 val reporters = KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
new KafkaServerStartable(KafkaConfig.fromProps(serverProps, false), reporters)
}
}
class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter]) extends Logging {
 //初始化真正的Kafka服务对象
 private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters)
def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)
 //启动
 def startup() {
try server.startup()
catch {
case _: Throwable =>
// KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
fatal("Exiting Kafka.")
Exit.exit(1)
}
}
 //关闭
 def shutdown() {
try server.shutdown()
catch {
case _: Throwable =>
fatal("Halting Kafka.")
// Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit.
Exit.halt(1)
}
}
 //设置状态,包含:NotRunning、Starting、RecoveringFromUncleanShutdown、RunningAsBroker、PendingControlledShutdown、BrokerShuttingDown
 def setServerState(newState: Byte) {
server.brokerState.newState(newState)
}
 //阻塞进程避免退出,通过CountDownLatch实现
 def awaitShutdown(): Unit = server.awaitShutdown()
}

3. 启动服务
启动服务是通过间接调用KafkaServerstartup()方法来实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def startup() {
try {
info("starting")
     //是否已关闭
     if (isShuttingDown.get)
throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")
     //是否已启动
     if (startupComplete.get)
return
     //是否可以启动
     val canStartup = isStartingUp.compareAndSet(false, true)
     if (canStartup) {//设置broker状态为Starting
       brokerState.newState(Starting)
       //连接ZK,并创建根节点
       initZkClient(time)
       //从ZK获取或创建集群id,规则:UUID的mostSigBits、leastSigBits组合转base64
       _clusterId = getOrGenerateClusterId(zkClient)
info(s"Cluster ID = $clusterId")
       //获取brokerId及log存储路径,brokerId通过zk生成或者server.properties配置broker.id
       //规则:/brokers/seqid的version值 + maxReservedBrokerId(默认1000),保证唯一性
       val (brokerId, initialOfflineDirs) = getBrokerIdAndOfflineDirs
config.brokerId = brokerId
logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
       //配置logger
       this.logIdent = logContext.logPrefix
       //初始化AdminZkClient,支持动态修改配置
config.dynamicConfig.initialize(zkClient)
//初始化定时任务调度器
       kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
kafkaScheduler.startup()

       //创建及配置监控,默认使用JMX及Yammer Metrics
       val reporters = new util.ArrayList[MetricsReporter]
reporters.add(new JmxReporter(jmxPrefix))
val metricConfig = KafkaServer.metricConfig(config)
metrics = new Metrics(metricConfig, reporters, time, true)
_brokerTopicStats = new BrokerTopicStats
       //初始化配额管理器
       quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala)
       //用于保证kafka-log数据目录的存在
       logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)

       //启动日志管理器,kafka的消息以日志形式存储
       logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
       //启动日志清理、刷新、校验、恢复等的定时线程
       logManager.startup()

metadataCache = new MetadataCache(config.brokerId)
       // SCRAM认证方式的token缓存
       tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

       //启动socket,监听9092端口,等待接收客户端请求
socketServer = new SocketServer(config, metrics, time, credentialProvider)
socketServer.startup(startupProcessors = false)

       //启动副本管理器,高可用相关
       replicaManager = createReplicaManager(isShuttingDown)
replicaManager.startup()

       //将broker信息注册到ZK上
       val brokerInfo = createBrokerInfo
zkClient.registerBrokerInZk(brokerInfo)

       //校验broker信息
       checkpointBrokerId(config.brokerId)

       //启动token管理器
       tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
tokenManager.startup()

       //启动Kafka控制器,只有leader会与ZK建连
       kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, tokenManager, threadNamePrefix)
kafkaController.startup()
       
//admin管理器
       adminManager = new AdminManager(config, metrics, metadataCache, zkClient)

       //启动集群群组协调器
       groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM)
groupCoordinator.startup()

       //启动事务协调器
       transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM)
transactionCoordinator.startup()

       //ACL
       authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
authZ.configure(config.originals())
authZ
}
       //创建拉取管理器
       val fetchManager = new FetchManager(Time.SYSTEM,
new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))

       //初始化KafkaApis,负责核心的请求逻辑处理
       apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager)
       //请求处理池
       requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
config.numIoThreads)

Mx4jLoader.maybeLoad()

config.dynamicConfig.addReconfigurables(this)

       //启动动态配置处理器
       dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
dynamicConfigManager.startup()

       //启动请求处理线程
       socketServer.startProcessors()
brokerState.newState(RunningAsBroker)
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
isStartingUp.set(false)
AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString, metrics)
info("started")
}
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
isStartingUp.set(false)
shutdown()
throw e
}
}

Kafka的背景

kafka是一个分布式高性能高可用可水平拓展发布-订阅式消息队列,更是一个流式处理系统。
对于消息,提供了O(1)时间复杂度的持久化能力,具备高吞吐率,同时支持实时、离线数据处理。

Kafka的架构

KafkaKafka包含了:

  • Broker: Kafka运行所在的服务器。
  • Topic: Kafka生产、消费数据都是需要指定一个Topic的,相当于一个队列。
  • Partition: 一个Topic可划分多个Partition,多机部署,可定义partition路由算法。
  • Segment: 一个Partition被切分为多个Segment,每个Segment包含索引文件和数据文件。
  • Producer: 生产者只需要指定Topic,往里头写数据即可。
  • Consumer: 一个Consumer Group包含多个Consumer,一条消息只被同组中的一个消费。
  • Zookeeper: Zookeeper用来管理Kafka集群。

对于服务器来说,Broker只是一个进程。

Topic则是服务器上的目录,存放在log.dirs指定的路径下,默认是/tmp/kafka-logs,支持路径,逗号分隔,可将数据分散到多个磁盘中,使Kafka吞吐率线性提高。通过指定--partitions数值,可创建多个Partition,命名为TopicName-K,从0开始。

通过指定--replication-factor副本因子数值,可将这些Partition分散、备份在一个或多个可用的Broker中,前提是可用的Broker数要大于等于replication-factor的值,既可数据备份、又可实现高可用、分散负载,提高吞吐量。

每个Partiton目录下存储的是一段段的Segment,包含了索引文件和数据文件,以offset命名。

Kafka采用推拉结合模型

生产消息方面,采用主动推送消息模式,在客户端会累积、压缩、批量发送到Broker。

消费消息方面,采用主动拉取消息模式,由客户端轮询拉取消息,并按照策略提交offset

另外,通过消费者拉消息的方式,可以由消费者自行控制消费的频率。

kafka-ps

此种方式,有利于减轻Broker压力,不需要维护太多状态,可由客户端自定义从哪个offset开始拉取消息。

kafka-offset

内存是操作系统的核心,程序指令只有被加载到内存中才可以被CPU调度执行。

内存,是由字节队列组成,每个字或字节队列都有它自己的地址
CPU,根据程序计数器的值从内存中取指令,可能是从指定内存地址读数据或将数据存入指定内存地址。

系统总线

CPU与内存之间的数据交互通过系统总线来传输。
通常来说,程序是以二进制的形式存储在硬盘上。只有将程序加载到内存中,并构造成进程的形式,才可以被真正的使用。在进程的运行过程中,CPU从内存中获取指令和数据,并服务于进程,而在进程使用完毕,终止之后,将会释放所占用的内存资源。

CPU产生的地址称为:逻辑地址,内存单元的地址称为:物理地址。

逻辑地址,又称为:虚拟地址,程序所产生的所有逻辑地址形成了逻辑地址空间
逻辑地址空间所对应物理地址形成了物理地址空间
有一种物理硬件设备负责将逻辑地址物理地址进行映射,这种设备称为:内存管理单元MMU。
MMU内存管理的核心:将逻辑地址空间绑定到物理地址空间。

内存分配分为:连续内存分配、非连续内存分配。

连续内存分配,包含有:多分区分配方法固定分区机制

  • 多分区分配方法
    当一个区被释放时,就从输入队列中取出一个进程,将其载入空闲区中,当该进程结束运行时,该区又可以被其它进程使用。不过这种方法早年前已经被弃用了。

  • 固定分区机制
    在操作系统内部保留一个用来标识哪块内存可用,哪块内存被占用,当一个进程到达时,就在表中根据首先适应最佳适应最差适应等策略为其分配一个合适的分区。如果分区过大,那么将可以被多个进程使用,如果无法使用就浪费了。当进程终止,就释放内存分区交还给系统,继续为其它进程服务。如果分区过小,可以合并使用。

以上2种方式,都存在着碎片问题,另外,维护内存分区及移动分区的代价过高

非连续内存分配,包含有:分页机制分段机制

  • 分页机制
    分页机制是一种非连续的内存管理策略,相比连续的内存管理策略来说,不需要再为找不到一块完整的、连续的内存而苦恼。在分页机制中,不会产生外部碎片,因为允许不连续,每一块内存都会被利用,但会产生内部碎片。
    物理内存分成固定大小的块,称为:,将逻辑内存也分成固定大小的块,称为:。程序执行的时候,页就从后备存储器装入到有效物理帧中。
    CPU执行的时候,产生的地址包含:页号页偏移
    逻辑内存是连续的,根据页号页偏移计算得到物理内存具体位置。
    memmem在程序的内存视觉中,内存都是单调递增的。
    通过页号可以索引到页表页表包含了物理内存每个页基地址,也就是帧号
    基地址乘以页面大小,再加上页偏移,就得到物理内存的具体地址。
    每个物理帧都在帧表中存储着,包含:分配了哪些帧、哪些帧空闲、共有多少帧。
    如果被分配了,那么会记录分配给了哪些进程
    由于通过页表这种方式计算出物理内存地址的方式需要访问2次内存,效率较,也就产生了TLB,这是一种较小的快速查找硬件高速缓冲,只通过页号就可以迅速查找到帧号。如果没有查找到,那就通过页表去查找。
    TLB由于现在的计算机支持的逻辑空间较大,在2^32 ~ 2^64之间,基于页表这种机制,这样会导致每一个进程需要创建至少4MB物理空间给页表使用。
    通过两级分页算法页表再次分页,分为:页号页表页号偏移量
    page利用这种方式,可以有效减少物理内存的使用量,甚至还可以使用三级分页再次减少内存使用量。

  • 分段机制
    分页机制不同,分段机制采用一段一段划分的,每一段的大小可以不一样,并且可以动态伸缩。在段表中记录的是二维地址,包含:< 段号 - 偏移量 >
    通过二维地址段表中查找计算出具体的物理地址segment虽然,分段机制不是连续的,但是每一段大小不一致,可能有些段过大,暂时难以找到合适的位置,需要等待且外部碎片会比较多。另外,通过分段,可以针对某些段设置为只读、共享,可以有效的保护内存段不被修改且节约内存。

CPU,中央处理器,是计算机系统的运算和控制中心,是信息处理、程序运行的最终执行单元。

CPU

程序,最终都会变成计算机指令,然后被CPU调度执行

CPU分为:运算器、控制器、高速缓冲存储器。

运算器,包含:算术运算器逻辑运算器,是用来实现加减乘除、与或非、移位、异或等的运算器。

控制器,包含:指令寄存器程序计数器操作控制器,是用来指挥各个部件,按照计算机指令的要求协调工作的部件,是计算机的神经中枢指挥中心
指令寄存器:用来保存当前执行或者即将要执行的指令的寄存器。
程序计数器:用来指明程序下一次要执行的指令的地址。
操作控制器:根据指令操作码和时序信号,产生各自操作信号,以便正确地创建数据链路,从而完成取指令和执行指令的控制。

高速缓冲存储器,位于主存和CPU之间的一级缓存,容量小,但是速度比主存快。

有了CPU之后,那么就需要利用调度算法去让CPU调度程序了。

CPU调度是多道程序操作系统的基础,通过在进程间转换CPU,操作系统可以提高计算机的生产力。

为了极尽CPU所能,操作系统不会让CPU闲着,一旦就绪队列中有新的进程,将被CPU调度,然后移交CPU使用权执行,而CPU如何从就绪队列中获取要执行的进程任务,就依赖于调度算法

CPU也不是随便就能调度的,需要等待时机

  • 当进程从运行状态转换为等待状态。
  • 当进程从运行状态转换为就绪状态。
  • 当进程从等待状态转换为就绪状态。
  • 当进程终止的时候。

衡量CPU调度算法也是有指标的。

  • CPU利用率
  • 吞吐量
  • 周转时间
  • 等待时间
  • 响应时间

理想的情况是,CPU利用率、吞吐量最大化,周转时间、等待时间、响应时间最小化

cpu-mark

围绕以上指标,产生了一些CPU调度算法

CPU调度,决定了哪个进程会被从就绪队列中拿出来,并被分配CPU资源。

CPU就绪队列作为一个队列,可以通过FIFO优先级无序来实现。

调度算法分为:

  • 先来先服务算法
    按照进入队列的顺序,一个一个调度。
  • 短作业优先算法
    本质上也是优先调度算法,根据CPU Burst排序。
  • 优先调度算法
    为每一进程配置优先级,根据优先级排序。
  • 轮转调度算法
    分配时间片,循环调度队列中的进程。
  • 多级队列调度算法
    将反馈队列分为多个队列,采用不同调度算法。
  • 多级队列反馈调度算法
    根据CPU Burst调整进程应该所处带有不同优先级的队列。

进程,是指运行中的程序,也称为作业。

早期的计算机,只允许运行 一个 程序,这个程序被分配了计算机所能提供的所有资源。
现代的计算机,允许并行运行 多个 程序,系统资源也就被这些程序瓜分。
每一个程序也就成为了一个 工作单元,将这些 工作单元 称呼为:进程
因此,进程 也是计算机系统分配资源的基本单位。

程序,是静态的代码,而进程是动态的实体。

程序,也就是 代码,在硬盘中存放着,经过 主存储器(内存) 的加载,被封装成 PCB 数据结构,在 内核队列 中排队,等待 CPU调用 执行。相同的一份代码,可以被多次加载,形成多个独立的进程,对于系统来说,不过是队列中多了一个成员,当然,资源也是需要被瓜分的。

操作系统通过进程控制块PCB来表示进程。  

PCB

进程控制块,也就是PCB,是操作系统内部的一种用来表示进程数据结构,记载着和进程相关的一些信息。
包括:内存指针、进程状态、进程号、程序计数器、寄存器、内存限制说明、I/O状态信息等。

内存指针,进程内存数据相关的指针。
进程状态,包含:新建、就绪、运行、阻塞、停止。
进程号,用来标识唯一进程的标识符,也就是PID。
程序计数器,用来标识程序要执行的下一条指令的地址。
寄存器,用于发生CPU中断时临时存储信息。
内存限制说明,包含了内存管理系统的一些信息,比如:页表、段表。
I/O状态信息,包含了进程打开的文件列表及分配给进程的I/O设备。

进程的一生都在队列之间徘徊,一旦被CPU调用,那么就出队列,改变状态,读取PCB中记录的信息,恢复现场(也就是CPU上下文切换),相同状态的PCB会形成链表,从程序计数器记录的地址开始执行,直到时间片用完或者被中断,回到队列之中或者执行结束,释放资源。

如果,所有的进程都是CPU繁忙型,那么等待队列(CPU都在忙着)几乎都是空的。
如果,所有的进程都是I/O繁忙型,那么就绪队列(CPU都在闲着)几乎都是空的。

那么,就需要合理分配进程组合,避免内存需求过多或者设备过度空闲。

一个进程,在运行期间可以创建多个子进程。

创建进程的进程称为父进程,通过fork来实现,而子进程也可以再创建子进程,形成一个进程树
创建进程是需要分配系统资源的,子进程所需要的资源可以直接从系统获取,也可以从父进程获取,父进程所拥有的数据也可以传送给子进程,父子进程也可以共享同样的资源。
子进程也不是无限制的创建的,需要受限于父进程,否则会负载过高。

进程之间,可以通过多种方式相互通信。

进程的通信可以分为:直接通信、间接通信。
具体的方式包含:管道、信号、消息队列、Socket、共享内存。

管道,一种半双工的通信方式,数据只能在父子进程之间单向流动。
信号,一种异步通信方式,通过监听、中断来实现,如:SIGINT。
消息队列,是一种保存在内核中的消息的链表,进程之间通过读写消息队列通信。
Socket,通过TCP、UDP协议进行通信。
共享内存,多个进程读取同一块共享的内存。

最后,进程是系统分配资源的基本单位,是执行中的程序,被封装为PCB数据结构,由CPU来进行调度,每次调度将会加载进程数据,切换上下文,通过此方式切换进程,成本也是过高。

因此,诞生了线程

线程,本质上是轻量级进程,且共享了进程所拥有的数据和资源。

线程,建立在进程的基础上,一个进程可以拥有多个线程,每一个线程共享进程的数据和资源,并且可以拥有自己独享的数据,是CPU执行的基本单位。通过线程这种方式,CPU无需进行复杂的进程上下文切换,只需要切换线程即可,内存数据在同一个进程内是共享的,相比切换进程来说,成本明显降低许多。

thread

一个线程,包含:代码、数据、打开文件列表、寄存区、程序计数器、堆栈。
多个线程之间,处于并行状态。

举例,在我们打开浏览器的时候,一个线程负责拉取数据,一个线程负责加载页面,一个线程负责显示图像等,这几个线程之间互不影响,换做进程的话,需要来回切换。

RPC的场景中,每来一个请求,就单独使用一个线程来处理,比起单线程来说,并行的方式显然提高了处理的效率。当然,线程也不是无限开辟的,可创建的线程数受限于所拥有的资源。虽然创建线程的成本比创建进程的成本低,但也不是没有成本的。为了降低这种成本,基于线程池,可以快速创建进程,充分利用资源。

线程的实现方式有:一对一、多对一、多对多。

线程分为用户线程内核线程,用户线程由内核线程来实现。

一对一模型情况下,一个用户线程由一个内核线程来实现。这种方式使得线程之间并行化,不会因为一个用户线程的阻塞导致其它用户线程的阻塞,但是资源消耗就比较大了。

多对一模型情况下,将多个用户线程映射到一个内核线程。这种方式使得线程管理变得方便了,但一次只能执行一个线程,无法并行化,如果一个用户线程阻塞,将导致所有由此内核线程实现的用户线程阻塞。

多对多模型情况下,多个用户线程映射到多个内核线程上,处于交叉状态,做到多路复用,一个内核线程阻塞的情况下,可以切换到其它的内核线程,避免了上面两种模型的缺点。

操作系统是被设计用来管理计算机硬件和应用程序的系统程序。

试想,一台计算机本身是一堆零件拼凑而成,而这些零件本身也具备了可编程能力,每一个应用程序要运行,必然要去操作调用这些硬件的接口,而多个应用程序之间各自以各自的方式去调用这些硬件,要这些应用程序开发商按照规矩去操作这些硬件,那是很困难的。

为了解决这个问题,操作系统就诞生了,将操作这些硬件的方式封装在操作系统当中,并给这些应用程序开发商提供统一的接口去调用,并管理控制这些应用程序能够有条不紊、无冲突地分配和使用系统资源。

操作系统

简单的说,操作系统相当于一个中介,帮助应用程序去调用硬件资源,同时也管理着应用程序。

操作系统是一个从始至终都运行在计算机中的程序,俗称内核

操作系统要正常运行,也需要底层硬件的支持,而硬件之间也要相互配合。

计算机底层的硬件包含:CPU、内存、磁盘、磁带、打印机等。

这些硬件通过一条公共总线连接到一块,程序指令及任务由CPU来负责调度,由内存和磁盘来存储资源,所有的硬件共同争抢总线资源,为了保证有序使用内存资源,由内存控制器来统一分配。程序只有被加载到内存当中,才能够被CPU执行。

早期计算机一次只能执行一个任务,为了提高处理速度,相似的任务会被分批执行。由于CPU的运行速度远远大于I/O设备的处理速度,为了使得CPU总有任务可以运行,不至于过于空闲,产生了作业系统,也就是多道程序设计,将作业放入底层作业队列中,由CPU空闲时从队列中取出任务然后执行。再后来,为了提高系统吞吐量,一个CPU已经无法满足需求了,一台计算机被植入了多个CPU,大大提高了计算机的处理能力。

系统总线

计算机从开机到启动操作系统需要经过一个初始化过程。

当点击开机按钮的时候,计算机通电,主板BIOS开始进行初始化固件操作,CPU开始运转。

计算机首先会进行一个自检操作,检查硬件是否正常,如果出现了异常,就发出声响或者关机、蓝屏、显示错误信息等。

自检通过以后,读取第一块磁盘的第一个扇区(主引导扇区),开始加载主引导记录MBR,计算机支持多系统的话,通常会有多个引导记录。引导记录是在磁盘格式化的时候写在磁盘上的。系统启动时,自动将它装入内存并用于加载操作系统的其它部分。

接着,启动Boot Loader 引导加载器,通常使用的是GRUB多操作系统启动程序。如果计算机安装了多个系统的话,可以在这个选择要进入的操作系统,同时会在这个阶段进行内存的初始化。

操作系统选择完毕之后,计算机的控制器就转移给了操作系统,操作系统的内核会被装载到系统内存之中,然后执行初始化操作。

初始化的时候,会调用系统底下的一个init方法,如:/sbin/init,执行后续的一些初始化及系统服务的启动,根据传入的参数,给用户展示的界面可以是命令行(通常是服务端),也可以是图形交互界面(通常是客户端)。

执行完以上操作,操作系统就启动了。

系统启动

应用层,建立在传输层的基础上,规定了应用程序的数据格式。

我们所使用的软件都在应用层上工作,每一个应用具有自己的数据格式,也就是要有共同方言。

只有规定好了数据格式,应用程序才可以和服务端正常交互,用户也才能正常使用这些应用程序。

这些数据格式,也被约定俗成为一些通用协议,也可以自定义协议。

比如:Email、HTTP、FTP都属于应用层协议。

应用程序通过实现这些协议,将应用层数据封装成协议规定的格式,然后由TCP或者UDP来进行传输。

七层数据

每一层对应的封装如下:

七层数据

常见的应用层协议有:DNS、HTTP。

DNS域名解析协议

IP地址,即使采用十进制点分法来标识,要记住也是比较困难的,因此产生了域名

域名,相比IP地址来说,会比较容易记住,如:www.baidu.com.

DNS协议是用来解析域名,获取真实IP地址的一种协议。

通过nslookup + 域名,再使用wireshark抓包,可以看到DNS查询的时候,采用的是UDP协议,而DNS服务器之间进行数据推送的时候,会采用TCP协议。

dns

HTTP超文本传输协议

HTTP协议,将数据以明文的方式传输,所有的www文件都采用这种方式。

浏览器为客户端,以TCP为底层传输方式。

通过浏览器向服务端发送请求,服务端向客户端浏览器返回响应。

http

简单的说,传输层就是将两个端口连接起来通信的介质。

网络层IP协议为非同一网络上的主机之间的精准通信提供了高效的手段。

数据链路层为同一网络下的主机之间的通信提供了基于MAC地址的身份识别手段。

物理层为数据信号在信道上传输、信号转换等提供了方案。

然而,同一台计算机内部,同一网络中,甚至是全球网络中的两台计算机的两个应用程序之间要相互通信,光靠以上三层是无法实现的,也因此引入了端口TCP协议UDP协议

传输层的主要任务是

  • 建立端口到端口的通信。
  • 通过传输层协议进行数据的传输。
端口

网络层通过IP协议,区分了子网。

以太网通过MAC地址,区分了主机。

传输层通过端口,区分了应用程序。

同一个IP下、同一个MAC地址下的同一台计算机,运行着许许多多的应用程序(进程),每一个应用程序通过端口来做唯一标识。

端口的范围为:0 - 65535,其中0 - 1023为系统占用端口,共16位, 2^16。

有了端口、IP、MAC地址之后,那么数据就可以找到传输的入口了。

传输层的传输方式包含:TCP(可靠)、UDP(不可靠)。

无论是TCP,还是UDP,目的都是为了发送IP数据包。

二者的区别主要是:有无连接、是否可靠、传输效率。

TCP-UDP

UDP,用户数据报协议。

UDP,不需要和目标主机建立连接,直接发送数据包即可。(无连接)

无论网络问题或者目标主机问题都不管,无法保证发送的分组一定到达,只能通过ICMP来回应异常报告,不会对分组进行超时重试、丢失重发、流量控制、乱序调整等。(不可靠)

正常情况,发出去的数据报是可以被成功接收的,并且省了建立及维护连接通道的成本。(高效)

UDP

UDP抓包情况如下:

UDP

UDP包含了:源IP地址目的IP地址协议号源端口目的端口 五元组,当一台计算机的进程收到多个进程的数据报时候,以此五元组来做区分。

TCP,传输层控制协议。

TCP,需要先和目标主机通过三次握手建立连接,然后才能进行通信。(有连接)

无论网络问题或者目标主机问题都要管,保证发送的分组可靠到达,但不是100%。当发送分组超时了,将会重试,丢失分组了,也会重发,为了防止网络崩溃,启用了滑动窗口机制来限制发送速率,通过ack机制及包编号实现了有序性。(可靠)

由于发送数据包之前需要先建立连接,用完了还要四次挥手断开连接,再用再建立连接,如此反复,成本较大。(低效)

TCP

TCP抓包情况如下:

TCP

序列号、确认号

由于TCP是面向 字节流 的,每一个字节都会被标注一个序列号。

比如:一段数据100字节。

第1次发送50个字节,序列号 seq 为1-50,第2次序列号从51开始。

当目标主机收到数据包之后,会回复确认号ack

控制位

TCP包含的控制位有:URGRSTPSHACKSYNFIN

TCP

TCP

URG,紧急指针,用于标识该数据包应该被优先接收。

RST,复位,当连接断了,再继续发送包会报这个错。

PSH,推送,表示数据包被成功推送。

SYN,同步,三次握手的时候会用到。

ACK,确认,三次握手和四次挥手的时候用到,确认。

FIN,终止,四次挥手的时候用到。

窗口大小

滑动窗口,Window Size Value 随着发送端和接收端的情况动态调整。

TCP需要先经过三次握手,才能实现可靠连接。

三次握手

三次握手的流程为

  • 客户端主动打开连接,通过向服务端发送SYN包,并发送自己的序列号seq = x,服务端端口监听,接收到之后,会将SYN放到Sync Queue半连接队列中,处理完后向客户端发送 SYN + ACK及客户端的确认号+1,ack = x + 1 ,表明已经收到了SYN连接请求,并发送自己的序列号 seq = y

  • 客户端收到服务端发送的 SYN + ACK 后,回复一个 ACK,并将对方的确认号+1,ack = y + 1

  • 如果客户端没有收到服务端发送的 SYN + ACK,那么服务端会再继续重发,直到超时或者成功。DDos中有一种 SYN Flood洪水攻击,通过伪造大量IP端口,并发送SYN,然后消失,导致服务端一直等不到SYN + ACK之后的ACK,导致稀缺的资源被占用,甚至宕机。

  • 最终,双方相互确认之后,正式建立连接,连接会在 Accept Queue 连接队列中维护,用来控制服务端 最大连接数

三次握手的原因

可以防止已经失效的连接请求报文又被突然传到服务端,导致错误产生。

一个因为网络延迟到达的SYN连接请求报文被服务端接收了,没有二次确认的话,服务端将会和客户端直接建立连接,否则客户端会将服务端回应的报文直接忽略

类似地,TCP需要先经过四次挥手,才能实现可靠断连。

四次挥手

四次挥手的流程为

  • 客户端发送 FIN主动关闭连接,并发送序列号seq = x,同时可能会发送ACK,用来确认之前接收的报文,然后进入FIN_WAIT_1状态,等待服务端回应,并且不会再发送数据了。

  • 服务端接收到客户端的FIN控制指令,此时还有未发送完的数据,那么就先回复一个ACK以及确认号ack = x + 1,表示自己已经接收到了客户端的FIN

  • 服务端发送完剩下的数据后,向客户端发送FIN,同样也会附带一个ACK,用来确认之前的指令,随后服务端进入CLOSE_WAIT半关闭状态,等待客户端ACK

  • 客户端接收到FIN之后,双方确认数据已经传输完毕,并立即回复一个ACK,服务端收到后,立即进入closed状态。

  • 但是,客户端需要进入TIME_WAIT状态,防止发出去的ACK没有被服务端接收。服务器端如果没有接收到ACK,将会重新发送FIN给客户端,直到超时或者成功。当然,也是因为这个特性,DDos攻击也会存在于这个阶段。

  • 最后,客户端真正关闭连接。

TCP通过拥塞控制防止过多数据注入网络,造成过载。

数据链路层中存在着流量控制,当收发数据的时候,双方能知道对方的剩余可用缓冲区大小,然后调整发送频率及数据量。

TCP当中,存在着比数据链路层更高级的拥塞控制,需要考虑双方的收发能力及链路的通畅程度。

TCP通过 滑动窗口慢开始快恢复 算法来实现拥塞控制。

当存在数据包没有被ACK,那么就会被认定为网络堵塞,然后调小 滑动窗口 大小,且门限阈值降低为原来的 一半

拥塞控制

拥塞避免,为了防止窗口增加速度过快,设置了一个慢开始门限,窗口每次增加1。

  • 当窗口大小小于门限的时候,采用慢开始算法。(指数增大)
  • 当窗口大小大于门限的时候,采用拥塞避免算法。(加法增大)
  • 当窗口大小大于门限的时候,两种算法都可以。

对于不同的广播域,要通信靠全球广播是不可行的,因此引入了网络层。

一个局域网称为一个广播域,在一个广播域中要进行通信,只需要向所有计算机发送请求,目标计算机在同一广播域中,收到请求后,响应即可。

不同的局域网就是不同的广播域,跨广播域通信,理论上可以向所有广播域发送请求,等待目标计算机响应即可,但是全球计算机数量过于庞大,一台计算机能接收到全世界计算机发送的包,纯靠广播容易产生网络风暴以及低效。

引入一套新的地址来区分不同的广播域、子网,这套地址称为:网络地址

网络层引入了IP、路由,跨广播域通信只能通过路由转发。

给计算机提供IP地址,经过路由器的转发,寻找到目标广播域,由目标广播域内部再进行广播,找到目标计算机即可。

跨广播域

简单的说,网络层 就是在 数据链路层 的基础上进一步管理网络中的数据通信。

广播的方式转为跳跃若干个中间节点的方式来完成数据通信。

跨广播域通信只能通过 路由转发

网络层包含了:IP、ARP、RARP、ICMP、IGMP、路由选择、拥塞控制。

IP协议,定义网络地址的协议。

通过IP地址,为网络上的计算机提供一个逻辑地址及编号。

IP协议,分为:IPv4、IPv6。

IPv4,由32位的二进制数组成,用点分隔,因为可读性差,通常写成4个十进制数。

范围: 0.0.0.0 - 255.255.255.255

通常,家庭里拨号上网,ISP就会给分配一个IP,每次拨号获得的IP都可能是不一致的。

网络上的其它计算机要通信可以通过分配到的IP找到家庭里的计算机。

IP地址 = 网络地址 + 主机地址

通过 子网掩码,可以很方便的将IP地址划分为网络部分主机部分

比如:

  • IP地址 172.16.10.1
  • 子网掩码 255.255.255.0

分别将二者转为二进制,再做一个与运算,得到 172.16.10.0,这部分就是网络部分,而主机部分则可以取值为 0.0.0.1 - 0.0.0.254,主机部分不能全为0,也不能全为1。

2个IP地址,通过与子网掩码做一个与运算,就可以计算出这两个IP是否在同一个子网中。不同子网中的2个IP地址是无法直接通信的,则需要通过 网关路由器来间接通信。

通过 tracert 或者 tracerouter 命令可以查看到IP在路由过程中跳跃的路径。

百度tracert

IP数据包作为数据链路层帧的数据部分。

IP数据包分为:头部和数据部分。

头部:大小在20到60字节之间。

数据部分:最长为65515字节。

超过下层数据链路帧限制的 MTU 1500字节的话,将需要分片传输。

数据包

网络层IP数据包的结构如下:

数据包

IP数据包抓包如下:

数据包

由于IP协议,无连接,不可靠,引入了ICMP。

发送IP数据包的时候,即使是丢包了,那么对于IP协议来说也是未知的,因此,通过ICMP(Internet控制报文协议)可以得知网络通不通、路由是否可达、是否超时等信息。

ICMP是一种无连接的面向无连接的控制报文协议,用来传输出错控制报文信息。

常用的就是ping命令。

正常情况下的ping如下
ping

异常的ping如下
ping

wireshark抓包情况如下
ping

正常情况下,每一个request都会收到一个reply,否则会收到no response found之类的响应。