1 Star 0 Fork 0

pku-min-java/min-browser-http

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
conn.go 12.49 KB
一键复制 编辑 原始数据 按行查看 历史
wzq 提交于 2021-11-02 18:47 +08:00 . fix:去除响应输出
/*
* @Author: hongyu guo
* @Description:
* @Version: 1.0.0
* @Date: 11:34 2021/06/15
* @Copyright: MIN-Group;国家重大科技基础设施——未来网络北大实验室;深圳市信息论与未来网络重点实验室
*/
package minhttp
import (
"bufio"
"context"
"crypto/tls"
"fmt"
"golang.org/x/net/http/httpguts"
"io"
"minhttp/common"
"net"
"runtime"
"sync"
"sync/atomic"
"time"
)
// A conn represents the server side of a single connection handled by this
// package's Server. One conn is driven by one call to (*conn).serve.
type conn struct {
// server is the server on which the connection arrived.
// Immutable; never nil.
server *Server
// cancelCtx cancels the connection-level context.
// It is assigned in serve and also invoked on the first write error.
cancelCtx context.CancelFunc
// DONE: the connection type is meant to be the project's own min-tcp
// implementation, used here through the net.Conn interface.
// minConn is the underlying network connection.
// This is never wrapped by other types and is the value given out
// to CloseNotifier callers.
// NOTE(review): the inherited net/http comment said this is usually
// *net.TCPConn or *tls.Conn; confirm what the MIN listener actually returns.
minConn net.Conn
// remoteAddr is minConn.RemoteAddr().String(). It is not populated synchronously
// inside the Listener's Accept goroutine, as some implementations block.
// It is populated immediately inside the (*conn).serve goroutine.
// This is the value of a Handler's (*Request).RemoteAddr.
remoteAddr string
// TODO: TLS is not implemented in the first version.
// tlsState is the TLS connection state when using TLS.
// nil means not TLS.
tlsState *tls.ConnectionState
// werr is set to the first write error to minConn.
// It is set via checkConnErrorWriter{w}, where bufw writes.
werr error
// r is bufr's read source. It's a wrapper around minConn that provides
// io.LimitedReader-style limiting (while reading request headers)
// and functionality to support CloseNotifier. See *connReader docs.
r *connReader
// bufr reads from r.
bufr *bufio.Reader
// bufw writes to checkConnErrorWriter{c}, which populates werr on error.
bufw *bufio.Writer
// lastMethod is the method of the most recent request
// on this connection, if any.
lastMethod string
// curReq holds the response currently being served; serve stores a nil
// *response once the connection goes idle.
curReq atomic.Value // of *response (which has a Request in it)
// curState is written with atomic.StoreUint64 by setState.
curState struct{ atomic uint64 } // packed (unixtime<<8|uint8(ConnState))
// mu guards hijackedv
mu sync.Mutex
// hijackedv is whether this connection has been hijacked
// by a Handler with the Hijacker interface.
// It is guarded by mu.
hijackedv bool
}
// checkConnErrorWriter is the io.Writer that sits underneath a conn's
// buffered writer: it forwards every write to c.minConn and remembers the
// first write error in c.werr.
// It holds a single pointer field, so it fits in an interface value
// without an extra allocation.
type checkConnErrorWriter struct {
	c *conn
}

// Write sends p to the underlying connection and returns that write's result
// unchanged. On the first failed write for the connection it records the
// error in werr and cancels the connection-level context; later failures
// leave the recorded error untouched.
func (w checkConnErrorWriter) Write(p []byte) (n int, err error) {
	owner := w.c
	n, err = owner.minConn.Write(p)
	if err == nil || owner.werr != nil {
		// Either the write succeeded or an earlier error is already recorded.
		return n, err
	}
	owner.werr = err
	owner.cancelCtx()
	return n, err
}
// Buffer pools.
//
// These pools hold bufio readers and writers that finalFlush hands back when
// a connection is done, so later connections can reuse them instead of
// allocating new buffers. Writers are kept in separate pools per buffer size
// (2 KiB and 4 KiB); see bufioWriterPool for the size-to-pool mapping.
var (
bufioReaderPool sync.Pool
bufioWriter2kPool sync.Pool
bufioWriter4kPool sync.Pool
)
// serve runs the request loop for a newly accepted connection. It returns
// when the connection is hijacked, can no longer be reused, or a request
// cannot be read.
//
// For every iteration it reads one request, runs the server's handler in this
// goroutine, finishes the response and, if the connection may be kept alive,
// waits for the next request. When reading a request fails, a minimal
// plain-text error response is written straight to minConn (bypassing bufw),
// except for common network read errors, which get no reply.
//
// ctx is the base context for the connection; the local address is attached
// to it under LocalAddrContextKey. On exit the deferred function logs any
// handler panic other than ErrAbortHandler and closes the connection unless
// it was hijacked.
func (c *conn) serve(ctx context.Context) {
	c.remoteAddr = c.minConn.RemoteAddr().String()
	ctx = context.WithValue(ctx, LocalAddrContextKey, c.minConn.LocalAddr())
	defer func() {
		if err := recover(); err != nil && err != ErrAbortHandler {
			const size = 64 << 10
			buf := make([]byte, size)
			buf = buf[:runtime.Stack(buf, false)]
			c.server.logf("http: panic serving %v: %v\n%s", c.remoteAddr, err, buf)
		}
		// A hijacked connection is left open; everything else is closed here.
		if !c.hijacked() {
			c.close()
			c.setState(c.minConn, StateClosed)
		}
	}()

	ctx, cancelCtx := context.WithCancel(ctx)
	c.cancelCtx = cancelCtx
	defer cancelCtx()

	c.r = &connReader{conn: c}
	c.bufr = newBufioReader(c.r)
	c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)

	for {
		w, err := c.readRequest(ctx)
		if c.r.remain != c.server.initialReadLimitSize() {
			// As soon as any byte has been read, the connection counts as active.
			c.setState(c.minConn, StateActive)
		}
		if err != nil {
			common.LogDebug("err:", err, ",remoteAddr = ", c.remoteAddr)
			// TODO: error handling
			const errorHeaders = "\r\nContent-Type: text/plain; charset=utf-8\r\nConnection: close\r\n\r\n"

			switch {
			case err == errTooLarge:
				// Their HTTP client may or may not be
				// able to read this if we're
				// responding to them and hanging up
				// while they're still writing their
				// request. Undefined behavior.
				const publicErr = "431 Request Header Fields Too Large"
				fmt.Fprintf(c.minConn, "MINHTTP/1.1 "+publicErr+errorHeaders+publicErr)
				c.closeWriteAndWait()
				return

			case isUnsupportedTEError(err):
				// Respond as per RFC 7230 Section 3.3.1 which says,
				// A server that receives a request message with a
				// transfer coding it does not understand SHOULD
				// respond with 501 (Unimplemented).
				code := StatusNotImplemented

				// We purposefully aren't echoing back the transfer-encoding's value,
				// so as to mitigate the risk of cross side scripting by an attacker.
				fmt.Fprintf(c.minConn, "MINHTTP/1.1 %d %s%sUnsupported transfer encoding", code, StatusText(code), errorHeaders)
				return

			case isCommonNetReadError(err):
				return // don't reply

			default:
				if v, ok := err.(statusError); ok {
					fmt.Fprintf(c.minConn, "MINHTTP/1.1 %d %s: %s%s%d %s: %s", v.code, StatusText(v.code), v.text, errorHeaders, v.code, StatusText(v.code), v.text)
					return
				}
				// Declared as a constant so the format string passed to
				// Fprintf is a compile-time constant (keeps go vet quiet).
				const publicErr = "400 Bad Request"
				fmt.Fprintf(c.minConn, "MINHTTP/1.1 "+publicErr+errorHeaders+publicErr)
				return
			}
		}
		// Every error branch above returns, so from here on the request was read successfully.
		common.LogDebug("remoteAddr = ", c.remoteAddr)
		common.LogDebug("finish readRequest")

		req := w.req
		// "Expect: 100-continue" handling is currently disabled:
		//if req.expectsContinue() {
		//	if req.ProtoAtLeast(1, 1) && req.ContentLength != 0 {
		//		// Wrap the Body reader with one that replies on the connection
		//		//req.Body = &expectContinueReader{readCloser: req.Body, resp: w}
		//		w.canWriteContinue.setTrue()
		//	}
		//} else if req.Header.get("Expect") != "" {
		//	w.sendExpectationFailed()
		//	return
		//}

		c.curReq.Store(w)

		if requestBodyRemains(req.Body) {
			// The body has not reached EOF yet: start the background read
			// only once EOF is hit.
			registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
		} else {
			// Nothing left to read from the body: start the background read now.
			w.conn.r.startBackgroundRead()
		}

		// HTTP cannot have multiple simultaneous active requests.[*]
		// Until the server replies to this request, it can't read another,
		// so we might as well run the handler in this goroutine.
		// [*] Not strictly true: HTTP pipelining. We could let them all process
		// in parallel even if their responses need to be serialized.
		// But we're not going to implement HTTP pipelining because it
		// was never deployed in the wild and the answer is HTTP/2.
		serverHandler{c.server}.ServeHTTP(w, w.req)
		w.cancelCtx()
		if c.hijacked() {
			return
		}
		// Complete the response before deciding whether the connection can be reused.
		w.finishRequest()
		if !w.shouldReuseConnection() {
			if w.requestBodyLimitHit || w.closedRequestBodyEarly() {
				c.closeWriteAndWait()
			}
			return
		}
		c.setState(c.minConn, StateIdle)
		c.curReq.Store((*response)(nil))

		if !w.conn.server.doKeepAlives() {
			// We're in shutdown mode. We might've replied
			// to the user without "Connection: close" and
			// they might think they can send another
			// request, but such is life with HTTP/1.1.
			return
		}

		if d := c.server.idleTimeout(); d != 0 {
			// Wait at most d for the first bytes of the next request.
			c.minConn.SetReadDeadline(time.Now().Add(d))
			if _, err := c.bufr.Peek(4); err != nil {
				return
			}
		}
		// Clear the idle deadline; readRequest installs its own deadlines.
		c.minConn.SetReadDeadline(time.Time{})
	}
}
// setState moves the connection into the given state.
//
// It first updates the server's connection tracking (adding the conn on
// StateNew, removing it on StateHijacked and StateClosed), then stores the
// current Unix time together with the state in curState, and finally calls
// the server's ConnState hook, if one is set, with nc and state.
//
// It panics with "internal error" if state does not fit in one byte.
func (c *conn) setState(nc net.Conn, state ConnState) {
	server := c.server
	if state == StateNew {
		server.trackConn(c, true)
	} else if state == StateHijacked || state == StateClosed {
		server.trackConn(c, false)
	}

	if state < 0 || state > 0xff {
		panic("internal error")
	}

	// Upper bits: Unix time in seconds; low byte: the state itself.
	now := time.Now().Unix()
	atomic.StoreUint64(&c.curState.atomic, uint64(now<<8)|uint64(state))

	hook := server.ConnState
	if hook == nil {
		return
	}
	hook(nc, state)
}
// hijacked reports whether the connection has been hijacked.
// The flag is read while holding c.mu.
func (c *conn) hijacked() bool {
	c.mu.Lock()
	taken := c.hijackedv
	c.mu.Unlock()
	return taken
}
// close releases the connection: it flushes pending output and recycles the
// buffers via finalFlush, then closes the underlying minConn.
func (c *conn) close() {
	c.finalFlush()
	// The result of Close is deliberately discarded.
	_ = c.minConn.Close()
}
// finalFlush hands the connection's buffered reader and writer back to their
// pools and clears the corresponding fields. The writer is flushed before it
// is returned. Calling it again afterwards does nothing, because both fields
// are nil by then.
func (c *conn) finalFlush() {
	if reader := c.bufr; reader != nil {
		// Recycle the bufio.Reader for a future connection.
		putBufioReader(reader)
		c.bufr = nil
	}

	writer := c.bufw
	if writer == nil {
		return
	}
	writer.Flush()
	// Recycle the bufio.Writer for a future connection.
	putBufioWriter(writer)
	c.bufw = nil
}
// closeWriteAndWait flushes any outstanding data, closes the write side of
// the connection when the connection supports it (for TCP this sends a FIN,
// signalling that we're done), and then sleeps for rstAvoidanceDelay in the
// hope that the client processes the response before any subsequent RST.
//
// See https://golang.org/issue/3595
func (c *conn) closeWriteAndWait() {
	c.finalFlush()
	halfCloser, ok := c.minConn.(closeWriter)
	if ok {
		halfCloser.CloseWrite()
	}
	time.Sleep(rstAvoidanceDelay)
}
// readRequest reads and validates the next request on the connection and
// returns a response value prepared for it.
//
// It returns ErrHijacked if the connection was hijacked, errTooLarge if the
// read limit was hit while reading the request, the error from the parser
// otherwise, or a bad-request error when the Host header is missing or
// repeated or when a header name or value is invalid.
//
// Read deadlines are derived from the server's header and whole-request
// timeouts; if a write timeout is configured, the write deadline is set when
// this function returns. The Host header is removed from req.Header after
// validation. ctx becomes the parent of the per-request context.
func (c *conn) readRequest(ctx context.Context) (w *response, err error) {
	if c.hijacked() {
		return nil, ErrHijacked
	}

	// Read deadlines; the zero time means "no deadline".
	var headerDeadline, requestDeadline time.Time
	start := time.Now()
	if timeout := c.server.readHeaderTimeout(); timeout != 0 {
		headerDeadline = start.Add(timeout)
	}
	if timeout := c.server.ReadTimeout; timeout != 0 {
		requestDeadline = start.Add(timeout)
	}
	c.minConn.SetReadDeadline(headerDeadline)

	// The write deadline starts counting once reading the request is over.
	if writeTimeout := c.server.WriteTimeout; writeTimeout != 0 {
		defer func() {
			c.minConn.SetWriteDeadline(time.Now().Add(writeTimeout))
		}()
	}

	c.r.setReadLimit(c.server.initialReadLimitSize())
	// TODO: probably unnecessary here — the tolerance for stray CR/LF after a
	// POST (RFC 7230 section 3) is intentionally not applied.
	req, err := readRequest(c.bufr, keepHostHeader)
	if err != nil {
		if c.r.hitReadLimit() {
			err = errTooLarge
		}
		return nil, err
	}
	// TODO: the protocol version of the request is not checked for now.

	c.lastMethod = req.Method
	c.r.setInfiniteReadLimit()

	// TODO: HTTP/2 upgrade is not supported for now, so exactly one Host
	// header is required unconditionally.
	hostValues, hostPresent := req.Header["Host"]
	switch {
	case !hostPresent:
		return nil, badRequestError("missing required Host header")
	case len(hostValues) > 1:
		return nil, badRequestError("too many Host headers")
	}

	// TODO: validate the Host value itself.
	// Validate every header name and value.
	for name, values := range req.Header {
		if !httpguts.ValidHeaderFieldName(name) {
			return nil, badRequestError("invalid header name")
		}
		for i := range values {
			if !httpguts.ValidHeaderFieldValue(values[i]) {
				return nil, badRequestError("invalid header value")
			}
		}
	}
	delete(req.Header, "Host")

	reqCtx, cancel := context.WithCancel(ctx)
	req.ctx = reqCtx
	req.RemoteAddr = c.remoteAddr
	req.TLS = c.tlsState
	// TODO: trailers are not considered for now, so the body is not marked
	// for early close.

	// Switch from the header deadline to the whole-request deadline if they differ.
	if !headerDeadline.Equal(requestDeadline) {
		c.minConn.SetReadDeadline(requestDeadline)
	}

	w = &response{
		conn:          c,
		cancelCtx:     cancel,
		req:           req,
		reqBody:       req.Body,
		handlerHeader: make(Header),
		contentLength: -1,
		closeNotifyCh: make(chan bool, 1),

		// These are captured up front so req.Header is not read after the
		// Handler starts and maybe mutates it (Issue 14940).
		wants10KeepAlive: req.wants10KeepAlive(),
		wantsClose:       req.wantsClose(),
	}
	// HTTP/2 upgrade handling (closeAfterReply) is intentionally omitted.
	w.cw.res = w
	w.w = newBufioWriterSize(&w.cw, bufferBeforeChunkingSize)
	return w, nil
}
// putBufioReader returns br to bufioReaderPool for reuse.
// The reader is reset with a nil source first, so the pooled value no longer
// references the reader it was wrapping.
func putBufioReader(br *bufio.Reader) {
br.Reset(nil)
bufioReaderPool.Put(br)
}
// putBufioWriter returns bw to the pool that matches its buffer size, if
// there is one; writers of any other size are simply dropped.
// The writer is reset with a nil destination first, so Available reports its
// full buffer size and the pooled value no longer references its old writer.
func putBufioWriter(bw *bufio.Writer) {
	bw.Reset(nil)
	pool := bufioWriterPool(bw.Available())
	if pool == nil {
		return
	}
	pool.Put(bw)
}
// bufioWriterPool returns the pool holding bufio.Writers whose buffer is
// exactly size bytes: the 2 KiB pool for 2<<10 and the 4 KiB pool for 4<<10.
// It returns nil for every other size.
func bufioWriterPool(size int) *sync.Pool {
	if size == 2<<10 {
		return &bufioWriter2kPool
	}
	if size == 4<<10 {
		return &bufioWriter4kPool
	}
	return nil
}
// newBufioReader returns a bufio.Reader that reads from r. A reader from
// bufioReaderPool is reused when one is available; otherwise a new reader
// with the default buffer size is allocated.
func newBufioReader(r io.Reader) *bufio.Reader {
	pooled := bufioReaderPool.Get()
	if pooled == nil {
		// Note: if this reader size is ever changed, update
		// TestHandlerBodyClose's assumptions.
		return bufio.NewReader(r)
	}
	br := pooled.(*bufio.Reader)
	br.Reset(r)
	return br
}
// newBufioWriterSize returns a bufio.Writer of the given buffer size that
// writes to w. If a pool exists for that size and holds a writer, the pooled
// writer is reset onto w and reused; in every other case a new writer is
// allocated.
func newBufioWriterSize(w io.Writer, size int) *bufio.Writer {
	pool := bufioWriterPool(size)
	if pool == nil {
		return bufio.NewWriterSize(w, size)
	}
	pooled := pool.Get()
	if pooled == nil {
		return bufio.NewWriterSize(w, size)
	}
	bw := pooled.(*bufio.Writer)
	bw.Reset(w)
	return bw
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/pku-min-java/min-browser-http.git
git@gitee.com:pku-min-java/min-browser-http.git
pku-min-java
min-browser-http
min-browser-http
master

搜索帮助