Go语言——sync.Map详解
sync.Map是1.9才推荐的并发安全的map,除了互斥量以外,还运用了原子操作,所以在这之前,有必要了解下 Go语言——原子操作
创新互联咨询电话:18982081108,为您提供成都网站建设网页设计及定制高端网站建设服务,创新互联网页制作领域10余年,包括成都建筑动画等多个行业拥有丰富建站经验,选择创新互联,为企业保驾护航!
go1.10\src\sync\map.go
entry分为三种情况:
从read中读取key,如果key存在就tryStore。
注意这里开始需要加锁,因为需要操作dirty。
条目在read中,首先取消标记,然后将条目保存到dirty里。(因为标记的数据不在dirty里)
最后原子保存value到条目里面,这里注意read和dirty都有条目。
总结一下Store:
这里可以看到dirty保存了数据的修改,除非可以直接原子更新read,继续保持read clean。
有了之前的经验,可以猜测下load流程:
与猜测的 区别 :
由于数据保存两份,所以删除考虑:
先看第二种情况。加锁直接删除dirty数据。思考下貌似没什么问题,本身就是脏数据。
第一种和第三种情况唯一的区别就是条目是否被标记。标记代表删除,所以直接返回。否则CAS操作置为nil。这里总感觉少点什么,因为条目其实还是存在的,虽然指针nil。
看了一圈貌似没找到标记的逻辑,因为删除只是将他变成nil。
之前以为这个逻辑就是简单的将为标记的条目拷贝给dirty,现在看来大有文章。
p == nil,说明条目已经被delete了,CAS将他置为标记删除。然后这个条目就不会保存在dirty里面。
这里其实就跟miss逻辑串起来了,因为miss达到阈值之后,dirty会全量变成read,也就是说标记删除在这一步最终删除。这个还是很巧妙的。
真正的删除逻辑:
很绕。。。。
使用Go实现一个数据库连接池
开始本文之前,我们看一段Go连接数据库的代码:
本文内容我们将解释连接池背后是如何工作的,并 探索 如何配置数据库能改变或优化其性能。
转自:
整理:地鼠文档:
那么sql.DB连接池是如何工作的呢?
需要理解的最重要一点是,sql.DB池包含两种类型的连接——“正在使用”连接和“空闲”连接。当您使用连接执行数据库任务(例如执行SQL语句或查询行)时,该连接被标记为正在使用,任务完成后,该连接被标记为空闲。
当您使用Go执行数据库操作时,它将首先检查池中是否有可用的空闲连接。如果有可用的连接,那么Go将重用这个现有连接,并在任务期间将其标记为正在使用。如果在您需要空闲连接时池中没有空闲连接,那么Go将创建一个新的连接。
当Go重用池中的空闲连接时,与该连接有关的任何问题都会被优雅地处理。异常连接将在放弃之前自动重试两次,这时Go将从池中删除异常连接并创建一个新的连接来执行该任务。
连接池有四个方法,我们可以使用它们来配置连接池的行为。让我们一个一个地来讨论。
SetMaxOpenConns()方法允许您设置池中“打开”连接(使用中+空闲连接)数量的上限。默认情况下,打开的连接数是无限的。
一般来说,MaxOpenConns设置得越大,可以并发执行的数据库查询就越多,连接池本身成为应用程序中的瓶颈的风险就越低。
但让它无限并不是最好的选择。默认情况下,PostgreSQL最多100个打开连接的硬限制,如果达到这个限制的话,它将导致pq驱动返回”sorry, too many clients already”错误。
为了避免这个错误,将池中打开的连接数量限制在100以下是有意义的,可以为其他需要使用PostgreSQL的应用程序或会话留下足够的空间。
设置MaxOpenConns限制的另一个好处是,它充当一个非常基本的限流器,防止数据库同时被大量任务压垮。
但设定上限有一个重要的警告。如果达到MaxOpenConns限制,并且所有连接都在使用中,那么任何新的数据库任务将被迫等待,直到有连接空闲。在我们的API上下文中,用户的HTTP请求可能在等待空闲连接时无限期地“挂起”。因此,为了缓解这种情况,使用上下文为数据库任务设置超时是很重要的。我们将在书的后面解释如何处理。
SetMaxIdleConns()方法的作用是:设置池中空闲连接数的上限。缺省情况下,最大空闲连接数为2。
理论上,在池中允许更多的空闲连接将增加性能。因为它减少了从头建立新连接发生概率—,因此有助于节省资源。
但要意识到保持空闲连接是有代价的。它占用了本来可以用于应用程序和数据库的内存,而且如果一个连接空闲时间过长,它也可能变得不可用。例如,默认情况下MySQL会自动关闭任何8小时未使用的连接。
因此,与使用更小的空闲连接池相比,将MaxIdleConns设置得过高可能会导致更多的连接变得不可用,浪费资源。因此保持适量的空闲连接是必要的。理想情况下,你只希望保持一个连接空闲,可以快速使用。
另一件要指出的事情是MaxIdleConns值应该总是小于或等于MaxOpenConns。Go会强制保证这点,并在必要时自动减少MaxIdleConns值。
SetConnMaxLifetime()方法用于设置ConnMaxLifetime的极限值,表示一个连接保持可用的最长时间。默认连接的存活时间没有限制,永久可用。
如果设置ConnMaxLifetime的值为1小时,意味着所有的连接在创建后,经过一个小时就会被标记为失效连接,标志后就不可复用。但需要注意:
理论上,ConnMaxLifetime为无限大(或设置为很长生命周期)将提升性能,因为这样可以减少新建连接。但是在某些情况下,设置短期存活时间有用。比如:
如果您决定对连接池设置ConnMaxLifetime,那么一定要记住连接过期(然后重新创建)的频率。例如,如果连接池中有100个打开的连接,而ConnMaxLifetime为1分钟,那么您的应用程序平均每秒可以杀死并重新创建多达1.67个连接。您不希望频率太大而最终影响性能吧。
SetConnMaxIdleTime()方法在Go 1.15版本引入对ConnMaxIdleTime进行配置。其效果和ConnMaxLifeTime类似,但这里设置的是:在被标记为失效之前一个连接最长空闲时间。例如,如果我们将ConnMaxIdleTime设置为1小时,那么自上次使用以后在池中空闲了1小时的任何连接都将被标记为过期并被后台清理操作删除。
这个配置非常有用,因为它意味着我们可以对池中空闲连接的数量设置相对较高的限制,但可以通过删除不再真正使用的空闲连接来周期性地释放资源。
所以有很多信息要吸收。这在实践中意味着什么?我们把以上所有的内容总结成一些可行的要点。
1、根据经验,您应该显式地设置MaxOpenConns值。这个值应该低于数据库和操作系统对连接数量的硬性限制,您还可以考虑将其保持在相当低的水平,以充当基本的限流作用。
对于本书中的项目,我们将MaxOpenConns限制为25个连接。我发现这对于小型到中型的web应用程序和API来说是一个合理的初始值,但理想情况下,您应该根据基准测试和压测结果调整这个值。
2、通常,更大的MaxOpenConns和MaxIdleConns值会带来更好的性能。但是,效果是逐渐降低的,而且您应该注意,太多的空闲连接(连接没有被复用)实际上会导致性能下降和不必要的资源消耗。
因为MaxIdleConns应该总是小于或等于MaxOpenConns,所以对于这个项目,我们还将MaxIdleConns限制为25个连接。
3、为了降低上面第2点的风险,通常应该设置ConnMaxIdleTime值来删除长时间未使用的空闲连接。在这个项目中,我们将设置ConnMaxIdleTime持续时间为15分钟。
4、ConnMaxLifetime默认设置为无限大是可以的,除非您的数据库对连接生命周期施加了硬限制,或者您需要它协助一些操作,比如优雅地交换数据库。这些都不适用于本项目,所以我们将保留这个默认的无限制配置。
与其硬编码这些配置,不如更新cmd/api/main.go文件通过命令行参数读取配置。
ConnMaxIdleTime值比较有意思,因为我们希望它传递一段时间,最终需要将其转换为Go的time.Duration类型。这里有几个选择:
1、我们可以使用一个整数来表示秒(或分钟)的数量,并将其转换为time.Duration。
2、我们可以使用一个表示持续时间的字符串——比如“5s”(5秒)或“10m”(10分钟)——然后使用time.ParseDuration()函数解析它。
3、两种方法都可以很好地工作,但是在这个项目中我们将使用选项2。继续并更新cmd/api/main.go文件如下:
File: cmd/api/main.go
如何用go语言每分钟处理100万个请求
在Malwarebytes 我们经历了显著的增长,自从我一年前加入了硅谷的公司,一个主要的职责成了设计架构和开发一些系统来支持一个快速增长的信息安全公司和所有需要的设施来支持一个每天百万用户使用的产品。我在反病毒和反恶意软件行业的不同公司工作了12年,从而我知道由于我们每天处理大量的数据,这些系统是多么复杂。
有趣的是,在过去的大约9年间,我参与的所有的web后端的开发通常是通过Ruby on Rails技术实现的。不要错怪我。我喜欢Ruby on Rails,并且我相信它是个令人惊讶的环境。但是一段时间后,你会开始以ruby的方式开始思考和设计系统,你会忘记,如果你可以利用多线程、并行、快速执行和小内存开销,软件架构本来应该是多么高效和简单。很多年期间,我是一个c/c++、Delphi和c#开发者,我刚开始意识到使用正确的工具可以把复杂的事情变得简单些。
作为首席架构师,我不会很关心在互联网上的语言和框架战争。我相信效率、生产力。代码可维护性主要依赖于你如何把解决方案设计得很简单。
问题
当工作在我们的匿名遥测和分析系统中,我们的目标是可以处理来自于百万级别的终端的大量的POST请求。web处理服务可以接收包含了很多payload的集合的JSON数据,这些数据需要写入Amazon S3中。接下来,map-reduce系统可以操作这些数据。
按照习惯,我们会调研服务层级架构,涉及的软件如下:
Sidekiq
Resque
DelayedJob
Elasticbeanstalk Worker Tier
RabbitMQ
and so on…
搭建了2个不同的集群,一个提供web前端,另外一个提供后端处理,这样我们可以横向扩展后端服务的数量。
但是,从刚开始,在 讨论阶段我们的团队就知道我们应该使用Go,因为我们看到这会潜在性地成为一个非常庞大( large traffic)的系统。我已经使用了Go语言大约2年时间,我们开发了几个系统,但是很少会达到这样的负载(amount of load)。
我们开始创建一些结构,定义从POST调用得到的web请求负载,还有一个上传到S3 budket的函数。
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}
type Payload struct {
// [redacted]
}
func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())
bucket := S3Bucket
b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}
// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"
return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
本地Go routines方法
刚开始,我们采用了一个非常本地化的POST处理实现,仅仅尝试把发到简单go routine的job并行化:
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
go payload.UploadToS3() // ----- DON'T DO THIS
}
w.WriteHeader(http.StatusOK)
}
对于中小负载,这会对大多数的人适用,但是大规模下,这个方案会很快被证明不是很好用。我们期望的请求数,不在我们刚开始计划的数量级,当我们把第一个版本部署到生产环境上。我们完全低估了流量。
上面的方案在很多地方很不好。没有办法控制我们产生的go routine的数量。由于我们收到了每分钟1百万的POST请求,这段代码很快就崩溃了。
再次尝试
我们需要找一个不同的方式。自开始我们就讨论过, 我们需要保持请求处理程序的生命周期很短,并且进程在后台产生。当然,这是你在Ruby on Rails的世界里必须要做的事情,否则你会阻塞在所有可用的工作 web处理器上,不管你是使用puma、unicore还是passenger(我们不要讨论JRuby这个话题)。然后我们需要利用常用的处理方案来做这些,比如Resque、 Sidekiq、 SQS等。这个列表会继续保留,因为有很多的方案可以实现这些。
所以,第二次迭代,我们创建了一个缓冲channel,我们可以把job排队,然后把它们上传到S3。因为我们可以控制我们队列中的item最大值,我们有大量的内存来排列job,我们认为只要把job在channel里面缓冲就可以了。
var Queue chan Payload
func init() {
Queue = make(chan Payload, MAX_QUEUE)
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
Queue - payload
}
...
}
接下来,我们再从队列中取job,然后处理它们。我们使用类似于下面的代码:
func StartProcessor() {
for {
select {
case job := -Queue:
job.payload.UploadToS3() // -- STILL NOT GOOD
}
}
}
说实话,我不知道我们在想什么。这肯定是一个满是Red-Bulls的夜晚。这个方法不会带来什么改善,我们用了一个 有缺陷的缓冲队列并发,仅仅是把问题推迟了。我们的同步处理器同时仅仅会上传一个数据到S3,因为来到的请求远远大于单核处理器上传到S3的能力,我们的带缓冲channel很快达到了它的极限,然后阻塞了请求处理逻辑的queue更多item的能力。
我们仅仅避免了问题,同时开始了我们的系统挂掉的倒计时。当部署了这个有缺陷的版本后,我们的延时保持在每分钟以常量增长。
最好的解决方案
我们讨论过在使用用Go channel时利用一种常用的模式,来创建一个二级channel系统,一个来queue job,另外一个来控制使用多少个worker来并发操作JobQueue。
想法是,以一个恒定速率并行上传到S3,既不会导致机器崩溃也不好产生S3的连接错误。这样我们选择了创建一个Job/Worker模式。对于那些熟悉Java、C#等语言的开发者,可以把这种模式想象成利用channel以golang的方式来实现了一个worker线程池,作为一种替代。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
// Job represents the job to be run
type Job struct {
Payload Payload
}
// A buffered channel that we can send work requests on.
var JobQueue chan Job
// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}
// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool - w.JobChannel
select {
case job := -w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}
case -w.quit:
// we have received a signal to stop
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit - true
}()
}
我们已经修改了我们的web请求handler,用payload创建一个Job实例,然后发到JobQueue channel,以便于worker来获取。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
// let's create a job with the payload
work := Job{Payload: payload}
// Push the work onto the queue.
JobQueue - work
}
w.WriteHeader(http.StatusOK)
}
在web server初始化时,我们创建一个Dispatcher,然后调用Run()函数创建一个worker池子,然后开始监听JobQueue中的job。
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
下面是dispatcher的实现代码:
type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i d.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := -JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := -d.WorkerPool
// dispatch the job to the worker job channel
jobChannel - job
}(job)
}
}
}
注意到,我们提供了初始化并加入到池子的worker的最大数量。因为这个工程我们利用了Amazon Elasticbeanstalk带有的docker化的Go环境,所以我们常常会遵守12-factor方法论来配置我们的生成环境中的系统,我们从环境变了读取这些值。这种方式,我们控制worker的数量和JobQueue的大小,所以我们可以很快的改变这些值,而不需要重新部署集群。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
直接结果
我们部署了之后,立马看到了延时降到微乎其微的数值,并未我们处理请求的能力提升很大。
Elastic Load Balancers完全启动后,我们看到ElasticBeanstalk 应用服务于每分钟1百万请求。通常情况下在上午时间有几个小时,流量峰值超过每分钟一百万次。
我们一旦部署了新的代码,服务器的数量从100台大幅 下降到大约20台。
我们合理配置了我们的集群和自动均衡配置之后,我们可以把服务器的数量降至4x EC2 c4.Large实例,并且Elastic Auto-Scaling设置为如果CPU达到5分钟的90%利用率,我们就会产生新的实例。
总结
在我的书中,简单总是获胜。我们可以使用多队列、后台worker、复杂的部署设计一个复杂的系统,但是我们决定利用Elasticbeanstalk 的auto-scaling的能力和Go语言开箱即用的特性简化并发。
我们仅仅用了4台机器,这并不是什么新鲜事了。可能它们还不如我的MacBook能力强大,但是却处理了每分钟1百万的写入到S3的请求。
处理问题有正确的工具。当你的 Ruby on Rails 系统需要更强大的web handler时,可以考虑下ruby生态系统之外的技术,或许可以得到更简单但更强大的替代方案。
golang sync.Pool的用法及实现
正如sycn.Pool的名字所示,这是go中实现的一个对象池,为什么要有这个池呢?首先go是自带垃圾回收机制(也就是通常所说的gc)。gc会带来运行时的开销,对于高频的内存申请与释放,如果将不用的对象存放在一个池子中,用的时候从池子中取出一个对象,用完了再还回去,这样就能减轻gc的压力。
对于池这个概念,之前可能听说过连接池。能否用sync.Pool实现一个连接池呢?答案是不能的。因为对于sync.Pool而言,我们无法保证每次放回去再取出来的对象是与之前一致的,对象的内存存在着呗销毁的可能。因此,这个sync.Pool的存在仅仅是为了减缓gc的压力而生的。
定义sync.Pool的时候只需要设置一个New成员,它是一个函数,类型为func() interface{},当池子中没有空闲的对象时就会调用New函数生成一个。由于pool中对象的数量不可控,因此并没有传递任何与对象数量有关的参数。
然后,调用调用Get函数就可以取出一个对象,调用Put函数就可以将对象归还到池子中。
golang sync.pool对象复用 并发原理 缓存池
在go http每一次go serve(l)都会构建Request数据结构。在大量数据请求或高并发的场景中,频繁创建销毁对象,会导致GC压力。解决办法之一就是使用对象复用技术。在http协议层之下,使用对象复用技术创建Request数据结构。在http协议层之上,可以使用对象复用技术创建(w,*r,ctx)数据结构。这样即可以回快TCP层读包之后的解析速度,也可也加快请求处理的速度。
先上一个测试:
结论是这样的:
貌似使用池化,性能弱爆了???这似乎与net/http使用sync.pool池化Request来优化性能的选择相违背。这同时也说明了一个问题,好的东西,如果滥用反而造成了性能成倍的下降。在看过pool原理之后,结合实例,将给出正确的使用方法,并给出预期的效果。
sync.Pool是一个 协程安全 的 临时对象池 。数据结构如下:
local 成员的真实类型是一个 poolLocal 数组,localSize 是数组长度。这涉及到Pool实现,pool为每个P分配了一个对象,P数量设置为runtime.GOMAXPROCS(0)。在并发读写时,goroutine绑定的P有对象,先用自己的,没有去偷其它P的。go语言将数据分散在了各个真正运行的P中,降低了锁竞争,提高了并发能力。
不要习惯性地误认为New是一个关键字,这里的New是Pool的一个字段,也是一个闭包名称。其API:
如果不指定New字段,对象池为空时会返回nil,而不是一个新构建的对象。Get()到的对象是随机的。
原生sync.Pool的问题是,Pool中的对象会被GC清理掉,这使得sync.Pool只适合做简单地对象池,不适合作连接池。
pool创建时不能指定大小,没有数量限制。pool中对象会被GC清掉,只存在于两次GC之间。实现是pool的init方法注册了一个poolCleanup()函数,这个方法在GC之前执行,清空pool中的所有缓存对象。
为使多协程使用同一个POOL。最基本的想法就是每个协程,加锁去操作共享的POOL,这显然是低效的。而进一步改进,类似于ConcurrentHashMap(JDK7)的分Segment,提高其并发性可以一定程度性缓解。
注意到pool中的对象是无差异性的,加锁或者分段加锁都不是较好的做法。go的做法是为每一个绑定协程的P都分配一个子池。每个子池又分为私有池和共享列表。共享列表是分别存放在各个P之上的共享区域,而不是各个P共享的一块内存。协程拿自己P里的子池对象不需要加锁,拿共享列表中的就需要加锁了。
Get对象过程:
Put过程:
如何解决Get最坏情况遍历所有P才获取得对象呢:
方法1止前sync.pool并没有这样的设置。方法2由于goroutine被分配到哪个P由调度器调度不可控,无法确保其平衡。
由于不可控的GC导致生命周期过短,且池大小不可控,因而不适合作连接池。仅适用于增加对象重用机率,减少GC负担。2
执行结果:
单线程情况下,遍历其它无元素的P,长时间加锁性能低下。启用协程改善。
结果:
测试场景在goroutines远大于GOMAXPROCS情况下,与非池化性能差异巨大。
测试结果
可以看到同样使用*sync.pool,较大池大小的命中率较高,性能远高于空池。
结论:pool在一定的使用条件下提高并发性能,条件1是协程数远大于GOMAXPROCS,条件2是池中对象远大于GOMAXPROCS。归结成一个原因就是使对象在各个P中均匀分布。
池pool和缓存cache的区别。池的意思是,池内对象是可以互换的,不关心具体值,甚至不需要区分是新建的还是从池中拿出的。缓存指的是KV映射,缓存里的值互不相同,清除机制更为复杂。缓存清除算法如LRU、LIRS缓存算法。
池空间回收的几种方式。一些是GC前回收,一些是基于时钟或弱引用回收。最终确定在GC时回收Pool内对象,即不回避GC。用java的GC解释弱引用。GC的四种引用:强引用、弱引用、软引用、虚引用。虚引用即没有引用,弱引用GC但有空间则保留,软引用GC即清除。ThreadLocal的值为弱引用的例子。
regexp 包为了保证并发时使用同一个正则,而维护了一组状态机。
fmt包做字串拼接,从sync.pool拿[]byte对象。避免频繁构建再GC效率高很多。
网页名称:go语言内存池总结 go语言线程池
分享路径:http://scpingwu.com/article/hgesjh.html