go Channel原理 (三)

Channel

设计原理

不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。

在主流编程语言中,多个线程传递数据的方式一般都是共享内存。
在这里插入图片描述
Go 可以使用共享内存加互斥锁进行通信,同时也提供了一种不同的并发模型,即通信顺序进程(Communicating sequential processes,CSP)。Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据。在这里插入图片描述
上图中的两个 Goroutine,一个会向 Channel 中发送数据,另一个会从 Channel 中接收数据,它们两者能够独立运行并不存在直接关联,但是能通过 Channel 间接完成通信。

接收数据

两个 Goroutine,一个会向 Channel 中发送数据,另一个会从 Channel 中接收数据,它们两者能够独立运行并不存在直接关联,但是能通过 Channel 间接完成通信。这是一个 生产者 - 消费者 模型,负责接收数据的 goroutine 从 channel 读取一个消息进行消费,channel 起到一个临界区/缓冲区的作用。

// chanrecv 函数接收 channel c 的元素并将其写入 ep 所指向的内存地址。
// 如果 ep 是 nil,说明忽略了接收值。
// 如果 block == false,即非阻塞型接收,在没有数据可接收的情况下,返回 (false, false)
// 否则,如果 c 处于关闭状态,将 ep 指向的地址清零,返回 (true, false)
// 否则,用返回值填充 ep 指向的内存地址。返回 (true, true)
// 如果 ep 非空,则应该指向堆或者函数调用者的栈

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   // 省略 debug 内容 …………

   // 如果是一个 nil 的 channel
   if c == nil {
      // 如果不阻塞,直接返回 (false, false)
      if !block {
         return
      }
      // 否则,接收一个 nil 的 channel,goroutine 挂起
      gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
      // 不会执行到这里
      throw("unreachable")
   }

   // 在非阻塞模式下,快速检测到失败,不用获取锁,快速返回
   // 当我们观察到 channel 没准备好接收:
   // 1. 非缓冲型,等待发送列队 sendq 里没有 goroutine 在等待
   // 2. 缓冲型,但 buf 里没有元素
   // 之后,又观察到 closed == 0,即 channel 未关闭。
   // 因为 channel 不可能被重复打开,所以前一个观测的时候 channel 也是未关闭的,
   // 因此在这种情况下可以直接宣布接收失败,返回 (false, false)
   if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
      c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
      atomic.Load(&c.closed) == 0 {
      return
   }

   var t0 int64
   if blockprofilerate > 0 {
      t0 = cputicks()
   }

   // 加锁
   lock(&c.lock)

   // channel 已关闭,并且循环数组 buf 里没有元素
   // 这里可以处理非缓冲型关闭 和 缓冲型关闭但 buf 无元素的情况
   // 也就是说即使是关闭状态,但在缓冲型的 channel,
   // buf 里有元素的情况下还能接收到元素
   if c.closed != 0 && c.qcount == 0 {
      if raceenabled {
         raceacquire(unsafe.Pointer(c))
      }
      // 解锁
      unlock(&c.lock)
      if ep != nil {
         // 从一个已关闭的 channel 执行接收操作,且未忽略返回值
         // 那么接收的值将是一个该类型的零值
         // typedmemclr 根据类型清理相应地址的内存
         typedmemclr(c.elemtype, ep)
      }
      // 从一个已关闭的 channel 接收,selected 会返回true
      return true, false
   }

   // 等待发送队列里有 goroutine 存在,说明 buf 是满的
   // 1. 非缓冲型的 channel。直接进行内存拷贝(从 sender goroutine -> receiver goroutine)
   // 2. 缓冲型的 channel,但 buf 满了。接收到循环数组头部的元素,并将发送者的元素放到循环数组尾部
   if sg := c.sendq.dequeue(); sg != nil {
      // Found a waiting sender. If buffer is size 0, receive value
      // directly from sender. Otherwise, receive from head of queue
      // and add sender's value to the tail of the queue (both map to
      // the same buffer slot because the queue is full).
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true, true
   }

   // 缓冲型,buf 里有元素,可以正常接收
   if c.qcount > 0 {
      // 直接从循环数组里找到要接收的元素
      qp := chanbuf(c, c.recvx)

      // …………

      // 没有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      // 清理掉循环数组里相应位置的值
      typedmemclr(c.elemtype, qp)
      // 接收游标向前移动
      c.recvx++
      // 接收游标归零
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      // buf 数组里的元素个数减 1
      c.qcount--
      // 解锁
      unlock(&c.lock)
      return true, true
   }

   if !block {
      // 非阻塞接收,解锁。selected 返回 false,因为没有接收到值
      unlock(&c.lock)
      return false, false
   }
   
   // 构造一个 sudog 并设置相应参数
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }

   mysg.elem = ep
   mysg.waitlink = nil
   gp.waiting = mysg
   mysg.g = gp
   mysg.selectdone = nil
   mysg.c = c
   gp.param = nil
   // 进入 channel 的等待接收队列
   c.recvq.enqueue(mysg)
   // 将当前 goroutine 挂起
   goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

   // 被唤醒了,接着从这里继续执行一些扫尾工作
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   closed := gp.param == nil
   gp.param = nil
   mysg.c = nil
   // 释放当前 gorountine 的 sudog
   releaseSudog(mysg)
   return true, !closed
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   // 如果是非缓冲型的 channel
   if c.dataqsiz == 0 {
      if raceenabled {
         racesync(c, sg)
      }
      
      // 未忽略接收的数据 不是 "<- ch",而是 "val <- ch",ep 指向 val
      if ep != nil {
         // 直接拷贝数据,从 sender goroutine -> receiver goroutine
         recvDirect(c.elemtype, sg, ep)
      }
   } else {
      // 缓冲型的 channel,但 buf 已满。
      // 1. 循环数组 buf 队首的元素拷贝到接收数据的地址
      // 2. 将 sender 的数据入队。
      qp := chanbuf(c, c.recvx)
      // …………
      // 将 recvx 处的数据拷贝给接收者
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }

      // sender data -> buf
      typedmemmove(c.elemtype, qp, sg.elem)
      // 更新索引
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.sendx = c.recvx
   }
   sg.elem = nil
   gp := sg.g

   // 解锁
   unlockf()
   gp.param = unsafe.Pointer(sg)
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }

   // 将当前处理器的 runnext 设置成发送数据的 Goroutine,在调度器下一次调度时将阻塞的发送方唤醒。
   goready(gp, skip+1)
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
    // dst is on our stack or the heap, src is on another stack.
    src := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

从 channel 接收消息 的 核心函数是 chanrecv

跟 send 流程差不多:
特殊情况:

  1. 如果 channel 为空,那么会直接调用 runtime.gopark 挂起当前 goroutine。
  2. 如果 channel 已经关闭并且缓冲区没有任何数据,runtime.chanrecv 会直接返回零值。
    正常情况:
  3. 如果 channel 的 sendq 队列中存在挂起的 goroutine,会将 recvx 索引所在的数据拷贝到接收变量所在的内存空间上并将 sendq 队列中 goroutine 的数据拷贝到缓冲区。
  4. 如果 channel 的缓冲区中包含数据,那么直接读取 recvx 索引对应的数据。
  5. 在默认情况下会挂起当前的 goroutine,将 runtime.sudog 结构加入 recvq 队列并陷入休眠等待调度器的唤醒。
    从 channel 接收数据时,会触发 goroutine 调度的两个时机:
  6. 当 channel 为空时。
  7. 当缓冲区中不存在数据并且也不存在数据的发送者时。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/761323.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

使用热力图表示联邦学习场景中的客户端数据分布

用于生成热力图&#xff0c;记录过程&#xff0c;方便之后直接使用。 使用场景&#xff1a;联邦学习中显示客户端数据分布&#xff0c;或者显示数据分布的各类其他场景 文章目录 一、代码hot.py使用方法 二、参数解释三、样图关键词 一、代码 写这段代码时主要考虑联邦学习中显…

阿里云物联网应用层开发:第一部分,项目简介

文章目录 1、物联网应用层简介2、阿里云物联网应用层开发例程主要内容3、需要掌握基础知识 1、物联网应用层简介 应用层是物联网系统的用户界面&#xff0c;它提供了用户与系统交互的接口&#xff0c;这一层是将网络传输层的数据结果以易于理解和使用的方式呈现给用户&#xf…

在AvaotaA1全志T527开发板上使用 SSH 连接开发板

使用 SSH 连接开发板 启动系统 前提条件&#xff1a; 确保已经制作好AvaotaA1系统镜像至TF卡。 ​ 确保开发板电源供电正常&#xff1a;默认SPI显示屏有图案输出。 确保当前环境下有可以正常上网的路由器RJ45网线接口。 获取IP地址 如果想通过ssh去登陆开发板系统&#…

广义加性模型

简要介绍 在统计学中&#xff0c;广义加性模型&#xff08;GAM&#xff09;是一种广义线性模型&#xff0c;其中线性响应变量线性地依赖于一些预测变量的未知光滑函数&#xff0c;并且人们对这些光滑函数的推理很感兴趣。 GAM最初由Trevor Hastie和Robert Tibshirani[1]开发&…

数据写入流程,数据读取流程

理解客户端在HDFS上读、写数据的流程 数据写入流程 1. 客户端向NameNode发起请求 2. NameNode审核权限、剩余空间后&#xff0c;满足条件允许写入&#xff0c;并告知客户端写入的DataNode地址 3. 客户端向指定的DataNode发送数据包 4. 被写入数据的DataNode同时完成数据副本的…

mysql岗位实习----教务系统管理

教务管理系统 一、DDL CREATE TABLE users (user_id int(11) NOT NULL AUTO_INCREMENT COMMENT 用户ID,username varchar(50) NOT NULL COMMENT 用户名,password varchar(255) NOT NULL COMMENT 密码,gender enum(男,女) NOT NULL COMMENT 性别,email varchar(100) DEFAULT N…

绿色算力|暴雨发布浸没式液冷服务器

随着数字经济的飞速发展和AI创新应用的不断突破&#xff0c;算力规模持续增长&#xff0c;最新发布的《数字中国发展报告&#xff08;2023年&#xff09;》显示&#xff0c;2023年中国算力总规模达到230EFLOPS&#xff0c;居全球第二位。 服务器作为算力基础设施底座&#xff…

spring中集成mybatis,并测试是否成功

首先你要配置pom.xml <!-- 连接 MySQL 数据库的驱动程序 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.16</version></dependency><!-- spring-jdbc -->&…

OSRAM欧司朗XBO短弧氙灯160WHSXLOFR短弧氙灯450W

OSRAM欧司朗XBO短弧氙灯160WHSXLOFR短弧氙灯450W

【python012】Python根据页码处理PDF文件的内容

在日常工作和学习中&#xff0c;需要从PDF文件中提取特定页面的内容&#xff0c;以便进行知识、材料压缩等。 2.欢迎点赞、关注、批评、指正&#xff0c;互三走起来&#xff0c;小手动起来&#xff01; 3.欢迎点赞、关注、批评、指正&#xff0c;互三走起来&#xff0c;小手动起…

Java外卖点餐连锁店餐饮生鲜奶茶外卖店内扫码点餐源码同城外卖校园外卖源码

外卖点餐连锁店餐饮生鲜奶茶外卖小程序&#xff1a;打造一站式便捷服务 &#x1f680; 引领外卖新潮流&#xff1a;小程序的力量 在数字化时代&#xff0c;外卖小程序已成为餐饮行业的新宠。它不仅提供了便捷的点餐服务&#xff0c;还融合了连锁店、餐饮生鲜、奶茶等多种业务…

【机器学习】机器学习与医疗健康在疾病预测中的融合应用与性能优化新探索

文章目录 引言第一章&#xff1a;机器学习在医疗健康中的应用1.1 数据预处理1.1.1 数据清洗1.1.2 数据归一化1.1.3 特征工程 1.2 模型选择1.2.1 逻辑回归1.2.2 决策树1.2.3 随机森林1.2.4 支持向量机1.2.5 神经网络 1.3 模型训练1.3.1 梯度下降1.3.2 随机梯度下降1.3.3 Adam优化…

P4. 微服务: 匹配系统(下)

P4. 微服务: 匹配系统 下 0 概述1 游戏同步系统1.1 游戏同步的设计1.2 游戏同步的实现 2 匹配系统微服务的实现2.1 微服务概述2.2 匹配系统接口url的实现2.3 微服务之间的通信2.4 匹配逻辑的实现2.5 匹配系统的权限控制 3 bug的解决3.1 自己匹配自己3.2 断开连接问题 0 概述 本…

3403两个图像分析引擎差异

1.设置环境变量差异 2.获取数据大小差异 3.ATC差异

【EXCEL技巧】Excel如何将数字前面的0去掉

Excel文件中经常会遇到数据是0001345这种&#xff0c;那么&#xff0c;如何将数字前面的0去掉呢&#xff1f;今天和大家分享方法。 首先&#xff0c;选中一列空的单元格&#xff0c;然后在单元格中输入公式TEXT(D3,0)&#xff0c;这里的D3指的是前面带有0的数据的位置 回车之后…

Elasticsearch:Painless scripting 语言(一)

Painless 是一种高性能、安全的脚本语言&#xff0c;专为 Elasticsearch 设计。你可以使用 Painless 在 Elasticsearch 支持脚本的任何地方安全地编写内联和存储脚本。 Painless 提供众多功能&#xff0c;这些功能围绕以下核心原则&#xff1a; 安全性&#xff1a;确保集群的…

【征服数据结构】:期末通关秘籍

【征服数据结构】&#xff1a;期末通关秘籍 &#x1f498; 数据结构的基本概念&#x1f608; 数据结构的基本概念&#x1f608; 逻辑结构和存储结构的区别和联系&#x1f608; 算法及其特性&#x1f608; 简答题 &#x1f498; 线性表&#xff08;链表、单链表&#xff09;&…

RPC架构基本结构和核心技术

当你在构建一个分布式系统时&#xff0c;势必需要考虑的一个问题是&#xff1a;如何实现服务与服务之间高效调用&#xff1f;当然&#xff0c;你可以使用Dubbo或Spring Cloud等分布式服务框架来完成这个目标&#xff0c;这些框架帮助我们封装了技术实现的复杂性。那么&#xff…

【论文阅读】-- 研究时间序列可视化,提升用户体验

Investigating Time Series Visualisations to Improve the User Experience 摘要1 引言2 相关工作互动技巧视觉编码坐标系 3 用户研究时间序列可视化互动技巧任务实验设计 4 结果交互技术的效果视觉编码的影响坐标系的影响 5 讨论交互技术的效果视觉编码的影响坐标系的影响 6 …

[JS]正则表达式

介绍 正则表达式是定义匹配字符串的规则, 在JS中, 正则表达式也是对象, 通常用于查找或替换符合规则的文本 许多语言都支持正则表达式, 在前端中常见的场景就是表单验证和敏感词替换 语法 正则字面量 / / const str 好好学习,天天向上 // 1.定义规则: const reg /好///…