diff --git a/relay-p2p/api.js b/relay-p2p/api.js new file mode 100644 index 0000000000000000000000000000000000000000..e077c026370cc85c6bc8b78b73eb617909320455 --- /dev/null +++ b/relay-p2p/api.js @@ -0,0 +1,511 @@ +export default function ({ app, mesh, punch }) { + var currentListens = [] + var currentTargets = {} + + function allEndpoints() { + return mesh.discover() + } + + function allInbound(ep) { + if (ep === app.endpoint.id) { + return getLocalConfig().then( + config => config.inbound + ) + } else { + return requestPeer(ep, new Message( + { + method: 'GET', + path: `/api/inbound`, + } + )).then(res => res ? JSON.decode(res.body) : null) + } + } + + function allOutbound(ep) { + if (ep === app.endpoint.id) { + return getLocalConfig().then( + config => config.outbound + ) + } else { + return requestPeer(ep, new Message( + { + method: 'GET', + path: `/api/outbound`, + } + )).then(res => res ? JSON.decode(res.body) : null) + } + } + + function getInbound(ep, protocol, name) { + if (ep === app.endpoint.id) { + return getLocalConfig().then( + config => { + var inbound = config.inbound.find( + i => i.protocol === protocol && i.name === name + ) || null + var hole = punch.findHole(ep) + } + ) + } else { + return requestPeer(ep, new Message( + { + method: 'GET', + path: `/api/inbound/${protocol}/${name}`, + } + )).then(res => res?.head?.status === 200 ? JSON.decode(res.body) : null) + } + } + + function getOutbound(ep, protocol, name) { + if (ep === app.endpoint.id) { + return getLocalConfig().then( + config => config.outbound.find( + o => o.protocol === protocol && o.name === name + ) || null + ) + } else { + return requestPeer(ep, new Message( + { + method: 'GET', + path: `/api/outbound/${protocol}/${name}`, + } + )).then(res => res?.head?.status === 200 ? JSON.decode(res.body) : null) + } + } + + function setInbound(ep, protocol, name, listens, exits) { + if (ep === app.endpoint.id) { + exits = exits || [] + checkProtocol(protocol) + checkName(name) + checkListens(listens) + checkExits(exits) + return getLocalConfig().then(config => { + var all = config.inbound + var ent = { protocol, name, listens, exits } + var i = all.findIndex(i => i.protocol === protocol && i.name === name) + if (i >= 0) { + all[i] = ent + } else { + all.push(ent) + } + setLocalConfig(config) + applyLocalConfig(config) + }) + } else { + return requestPeer(ep, new Message( + { + method: 'POST', + path: `/api/inbound/${protocol}/${name}`, + }, + JSON.encode({ listens, exits }) + )) + } + } + + function setOutbound(ep, protocol, name, targets, entrances) { + if (ep === app.endpoint.id) { + entrances = entrances || [] + checkProtocol(protocol) + checkName(name) + checkTargets(targets) + checkEntrances(entrances) + return getLocalConfig().then(config => { + var all = config.outbound + var ent = { protocol, name, targets, entrances } + var i = all.findIndex(o => o.protocol === protocol && o.name === name) + if (i >= 0) { + all[i] = ent + } else { + all.push(ent) + } + setLocalConfig(config) + applyLocalConfig(config) + }) + } else { + return requestPeer(ep, new Message( + { + method: 'POST', + path: `/api/outbound/${protocol}/${name}`, + }, + JSON.encode({ targets, entrances }) + )) + } + } + + function deleteInbound(ep, protocol, name) { + if (ep === app.endpoint.id) { + return getLocalConfig().then(config => { + var all = config.inbound + var i = all.findIndex(i => i.protocol === protocol && i.name === name) + if (i >= 0) { + all.splice(i, 1) + setLocalConfig(config) + } + }) + } else { + return requestPeer(ep, new Message( + { + method: 'DELETE', + path: `/api/inbound/${protocol}/${name}`, + } + )) + } + } + + function deleteOutbound(ep, protocol, name) { + if (ep === app.endpoint.id) { + return getLocalConfig().then(config => { + var all = config.outbound + var i = all.findIndex(o => o.protocol === protocol && o.name === name) + if (i >= 0) { + all.splice(i, 1) + setLocalConfig(config) + } + }) + } else { + return requestPeer(ep, new Message( + { + method: 'DELETE', + path: `/api/outbound/${protocol}/${name}`, + } + )) + } + } + + function createHole(ep, role) { + var h = punch.findHole(ep) + if(h) return h + + if(role === 'server') { + return punch.createOutboundHole(ep) + } else if(role === 'client') { + return punch.createInboundHole(ep) + } + } + + function updateHoleInfo(ep, ip, port, cert) { + checkIP(ip) + checkPort(Number.parseInt(port)) + punch.updateHoleInfo(ep, ip, port, cert) + } + + function syncPunch(ep) { + var hole = punch.findHole(ep) + if(!hole) throw `Invalid Hole State for ${ep}` + hole.punch() + } + + function deleteHole(ep, remote) { + punch.deleteHole(ep, remote) + } + + function getLocalConfig() { + return mesh.read('/local/config.json').then( + data => data ? JSON.decode(data) : { inbound: [], outbound: [] } + ) + } + + function setLocalConfig(config) { + mesh.write('/local/config.json', JSON.encode(config)) + } + + function applyLocalConfig(config) { + currentListens.forEach(l => { + var protocol = l.protocol + var ip = l.ip + var port = l.port + if (!config.inbound.some(i => ( + i.protocol === protocol && + i.listens.some(l => l.ip === ip && l.port === port) + ))) { + pipy.listen(`${ip}:${port}`, protocol, null) + app.log(`Stopped ${protocol} listening ${ip}:${port}`) + } + }) + + currentListens = [] + currentTargets = {} + + console.info(`Applying config: ${JSON.encode(config)}`) + + config.inbound.forEach(i => { + var protocol = i.protocol + var name = i.name + var listens = i.listens + var $selectedEP + + var connectPeer = pipeline($=>$ + .connectHTTPTunnel( + new Message({ + method: 'CONNECT', + path: `/api/outbound/${protocol}/${name}`, + }) + ).to($=>$.pipe(() => { + punch.createInboundHole($selectedEP) + var hole = punch.findHole($selectedEP) + if(hole && hole.ready()) { + console.info("Using direct session") + return hole.directSession() + } + console.info("Using hub forwarded session") + return pipeline($=>$ + .muxHTTP().to($=>$ + .pipe(mesh.connect($selectedEP)) + ) + ) + })) + .onEnd(() => app.log(`Disconnected from ep ${$selectedEP} for ${protocol}/${name}`)) + ) + + var pass = null + var deny = pipeline($=>$.replaceStreamStart(new StreamEnd)) + + switch (protocol) { + case 'tcp': + pass = connectPeer + break + case 'udp': + pass = pipeline($=>$ + .replaceData(data => new Message(data)) + .encodeWebSocket() + .pipe(connectPeer) + .decodeWebSocket() + .replaceMessage(msg => msg.body) + ) + break + } + + var p = pipeline($=>$ + .onStart(() => + ((i.exits && i.exits.length > 0) + ? Promise.resolve(i.exits) + : mesh.discover().then(list => list.map(ep => ep.id)) + ).then(exits => Promise.all( + exits.map( + id => getOutbound(id, protocol, name).then( + o => o ? { ep: id, ...o } : null + ) + ) + )).then(list => { + list = list.filter(o => (o && ( + !o.entrances || + o.entrances.length === 0 || + o.entrances.includes(app.endpoint.id) + ))) + if (list.length > 0) { + $selectedEP = list[Math.floor(Math.random() * list.length)].ep + app.log(`Connect to ep ${$selectedEP} for ${protocol}/${name}`) + } else { + app.log(`No exit found for ${protocol}/${name}`) + } + return new Data + }) + ) + .pipe(() => $selectedEP ? pass : deny) + ) + + listens.forEach(l => { + try { + pipy.listen(`${l.ip}:${l.port}`, protocol, p) + currentListens.push({ protocol, ip: l.ip, port: l.port }) + app.log(`Started ${protocol} listening ${l.ip}:${l.port}`) + } catch (err) { + app.log(`Cannot open port ${l.ip}:${l.port}: ${err}`) + } + }) + }) + + config.outbound.forEach(o => { + var key = `${o.protocol}/${o.name}` + currentTargets[key] = new algo.LoadBalancer(o.targets) + }) + } + + function requestPeer(ep, req) { + var $response + return pipeline($=>$ + .onStart(req) + .pipe(() => { + var h = punch.findHole(ep) + if(h && h.ready()) { + return h.directSession() + } + return pipeline($=>$ + .muxHTTP().to($=>$ + .pipe(mesh.connect(ep)) + )) + }) + .replaceMessage(res => { + $response = res + return new StreamEnd + }) + .onEnd(() => { + console.info('Answers in api: ', $response) + return $response + }) + ).spawn() + } + + var matchApiOutbound = new http.Match('/api/outbound/{proto}/{name}') + var response200 = new Message({ status: 200 }) + var response404 = new Message({ status: 404 }) + + var $resource + var $target + var $protocol + + var servePeerInbound = pipeline($=>$ + .acceptHTTPTunnel(req => { + var params = matchApiOutbound(req.head.path) + var proto = params?.proto + var name = params?.name + var key = `${proto}/${name}` + var lb = currentTargets[key] + if (lb) { + $resource = lb.allocate() + var target = $resource.target + var host = target.host + var port = target.port + $target = `${host}:${port}` + $protocol = proto + app.log(`Connect to ${$target} for ${key}`) + return response200 + } + app.log(`No target found for ${key}`) + return response404 + }).to($=>$ + .pipe(() => $protocol, { + 'tcp': ($=>$ + .connect(() => $target) + ), + 'udp': ($=>$ + .decodeWebSocket() + .replaceMessage(msg => msg.body) + .connect(() => $target, { protocol: 'udp' }) + .replaceData(data => new Message(data)) + .encodeWebSocket() + ) + }) + .onEnd(() => { + $resource.free() + app.log(`Disconnected from ${$target}`) + }) + ) + ) + + var tunnelHole = null + var makeRespTunnel = pipeline($=>$ + .onStart(ctx => { + console.info("Making resp tunnel: ", ctx) + var ep = ctx.peer.id + tunnelHole = punch.findHole(ep) + if(!tunnelHole) throw `Invalid Hole State for ${ep}` + return new Data + }) + .pipe(() => { + var p = tunnelHole.makeRespTunnel() + tunnelHole = null + return p + }, () => tunnelHole) + ) + + getLocalConfig().then(applyLocalConfig) + + return { + allEndpoints, + allInbound, + allOutbound, + getInbound, + getOutbound, + setInbound, + setOutbound, + deleteInbound, + deleteOutbound, + createHole, + updateHoleInfo, + makeRespTunnel, + syncPunch, + deleteHole, + servePeerInbound, + } +} + +function checkProtocol(protocol) { + switch (protocol) { + case 'tcp': + case 'udp': + return + default: throw `invalid protocol '${protocol}'` + } +} + +function checkName(name) { + if ( + typeof name !== 'string' || + name.indexOf('/') >= 0 + ) throw `invalid name '${name}'` +} + +function checkIP(ip) { + try { + new IP(ip) + } catch { + throw `malformed IP address '${ip}'` + } +} + +function checkHost(host) { + if ( + typeof host !== 'string' || + host.indexOf(':') >= 0 || + host.indexOf('[') >= 0 || + host.indexOf(']') >= 0 + ) throw `invalid host '${host}'` +} + +function checkPort(port) { + if ( + typeof port !== 'number' || + port < 1 || port > 65535 + ) throw `invalid port number: ${port}` +} + +function checkUUID(uuid) { + if ( + typeof uuid !== 'string' || + uuid.length !== 36 || + uuid.charAt(8) != '-' || + uuid.charAt(13) != '-' || + uuid.charAt(18) != '-' || + uuid.charAt(23) != '-' + ) throw `malformed UUID '${uuid}'` +} + +function checkListens(listens) { + if (!(listens instanceof Array)) throw 'invalid listen array' + listens.forEach(l => { + if (typeof l !== 'object' || l === null) throw 'invalid listen' + checkIP(l.ip) + checkPort(l.port) + }) +} + +function checkTargets(targets) { + if (!(targets instanceof Array)) throw 'invalid target array' + targets.forEach(t => { + if (typeof t !== 'object' || t === null) throw 'invalid target' + checkHost(t.host) + checkPort(t.port) + }) +} + +function checkExits(exits) { + if (!(exits instanceof Array)) throw 'invalid exit array' + exits.forEach(e => checkUUID(e)) +} + +function checkEntrances(entrances) { + if (!(entrances instanceof Array)) throw 'invalid entrance array' + entrances.forEach(e => checkUUID(e)) +} diff --git a/relay-p2p/cli.js b/relay-p2p/cli.js new file mode 100644 index 0000000000000000000000000000000000000000..6f2416dfb07f228201d9b174bbc1367406e0be8e --- /dev/null +++ b/relay-p2p/cli.js @@ -0,0 +1,303 @@ +export default function ({ api, utils }) { + return pipeline($=>$ + .onStart(ctx => main(ctx)) + ) + + function main({ argv, endpoint }) { + var buffer = new Data + + function output(str) { + buffer.push(str) + } + + function error(err) { + output('ztm: ') + output(err.message || err.toString()) + output('\n') + } + + function flush() { + return [buffer, new StreamEnd] + } + + var endpoints = null + + function allEndpoints() { + if (endpoints) return Promise.resolve(endpoints) + return api.allEndpoints().then(list => (endpoints = list)) + } + + function lookupEndpointNames(list) { + return allEndpoints().then(endpoints => ( + list.map(id => { + var ep = endpoints.find(ep => ep.id === id) + return ep ? ep.name : id + }) + )) + } + + function lookupEndpointIDs(list) { + return allEndpoints().then(endpoints => ( + list.flatMap(name => { + if (endpoints.some(ep => ep.id === name)) return name + var list = endpoints.filter(ep => ep.name === name) + if (list.length === 1) return list[0].id + if (list.length === 0) throw `Endpoint '${name}' not found` + return list.map(ep => ep.id) + }) + )) + } + + try { + return utils.parseArgv(['ztm tunnel', ...argv], { + help: text => Promise.resolve(output(text + '\n')), + notes: objectTypeNotes + objectNameNotes, + commands: [ + + { + title: 'List objects of the specified type', + usage: 'get ', + notes: objectTypeNotes, + action: (args) => { + switch (validateObjectType(args, 'get')) { + case 'inbound': return getInbound() + case 'outbound': return getOutbound() + } + } + }, + + { + title: 'Show detailed info of the specified object', + usage: 'describe ', + notes: objectTypeNotes + objectNameNotes, + action: (args) => { + var name = args[''] + switch (validateObjectType(args, 'describe')) { + case 'inbound': return describeInbound(name) + case 'outbound': return describeOutbound(name) + } + } + }, + + { + title: 'Create an object of the specified type', + usage: 'open ', + options: ` + For inbound end: + + --listen <[ip:]port ...> Set local ports to listen on + --exit Select endpoints as the outbound end + + For outbound end: + + --target Set targets to connect to + --entrance Select endpoints as the inbound end + `, + notes: objectTypeNotes + objectNameNotes, + action: (args) => { + var name = args[''] + switch (validateObjectType(args, 'open')) { + case 'inbound': return openInbound(name, args['--listen'], args['--exit']) + case 'outbound': return openOutbound(name, args['--target'], args['--entrance']) + } + } + }, + + { + title: 'Delete the specified object', + usage: 'close ', + notes: objectTypeNotes + objectNameNotes, + action: (args) => { + var name = args[''] + switch (validateObjectType(args, 'close')) { + case 'inbound': return closeInbound(name) + case 'outbound': return closeOutbound(name) + } + } + }, + ] + + }).then(flush).catch(err => { + error(err) + return flush() + }) + + } catch (err) { + error(err) + return Promise.resolve(flush()) + } + + function getInbound() { + return api.allInbound(endpoint.id).then(list => ( + Promise.all(list.map(i => + lookupEndpointNames(i.exits || []).then(exits => ({ + ...i, + exits, + })) + )).then(list => + printTable(list, { + 'NAME': i => `${i.protocol}/${i.name}`, + 'LISTENS': i => i.listens.map(l => `${l.ip}:${l.port}`).join(', '), + 'EXITS': i => i.exits.join(', '), + }) + ) + )) + } + + function getOutbound() { + return api.allOutbound(endpoint.id).then(list => ( + Promise.all(list.map(o => + lookupEndpointNames(o.entrances || []).then(entrances => ({ + ...o, + entrances, + })) + )).then(list => + printTable(list, { + 'NAME': o => `${o.protocol}/${o.name}`, + 'TARGETS': o => o.targets.map(t => `${t.host}:${t.port}`).join(', '), + 'ENTRANCES': o => o.entrances.join(', '), + }) + ) + )) + } + + function describeInbound(tunnelName) { + var obj = validateObjectName(tunnelName) + return api.getInbound(endpoint.id, obj.protocol, obj.name).then(obj => { + if (!obj) return + return lookupEndpointNames(obj.exits || []).then(exits => { + output(`Inbound ${obj.protocol}/${obj.name}\n`) + output(`Endpoint: ${endpoint.name} (${endpoint.id})\n`) + output(`Listens:\n`) + obj.listens.forEach(l => output(` ${l.ip}:${l.port}\n`)) + output(`Exits:\n`) + exits.forEach(e => output(` ${e}\n`)) + if (exits.length === 0) output(` (all endpoints)\n`) + }) + }) + } + + function describeOutbound(tunnelName) { + var obj = validateObjectName(tunnelName) + return api.getOutbound(endpoint.id, obj.protocol, obj.name).then(obj => { + if (!obj) return + return lookupEndpointNames(obj.entrances || []).then(entrances => { + output(`Outbound ${obj.protocol}/${obj.name}\n`) + output(`Endpoint: ${endpoint.name} (${endpoint.id})\n`) + output(`Targets:\n`) + obj.targets.forEach(t => output(` ${t.host}:${t.port}\n`)) + output(`Entrances:\n`) + entrances.forEach(e => output(` ${e}\n`)) + if (entrances.length === 0) output(` (all endpoints)\n`) + }) + }) + } + + function openInbound(tunnelName, listens, exits) { + var obj = validateObjectName(tunnelName) + if (!listens || listens.length === 0) throw `Option '--listen' is required` + listens = listens.map(l => validateHostPort(l)).map(({ host, port }) => ({ ip: host, port })) + return lookupEndpointIDs(exits || []).then( + exits => api.setInbound(endpoint.id, obj.protocol, obj.name, listens, exits) + ) + } + + function openOutbound(tunnelName, targets, entrances) { + var obj = validateObjectName(tunnelName) + if (!targets || targets.length === 0) throw `Option '--target' is required` + targets = targets.map(t => validateHostPort(t)) + return lookupEndpointIDs(entrances || []).then( + entrances => api.setOutbound(endpoint.id, obj.protocol, obj.name, targets, entrances) + ) + } + + function closeInbound(tunnelName) { + var obj = validateObjectName(tunnelName) + return api.deleteInbound(endpoint.id, obj.protocol, obj.name) + } + + function closeOutbound(tunnelName) { + var obj = validateObjectName(tunnelName) + return api.deleteOutbound(endpoint.id, obj.protocol, obj.name) + } + + function validateObjectType(args, command) { + var ot = args[''] + switch (ot) { + case 'inbound': + case 'in': + return 'inbound' + case 'outbound': + case 'out': + return 'outbound' + default: throw `Invalid object type '${ot}'. Type 'ztm tunnel ${command} for help.'` + } + } + + function validateObjectName(name) { + if (!name) return + var segs = name.split('/') + if (segs.length === 2) { + var protocol = segs[0] + if (protocol === 'tcp' || protocol === 'udp') { + if (segs[1]) { + return { protocol, name: segs[1] } + } + } + } + throw `Invalid inbound/outbound name '${name}'` + } + + function validateHostPort(addr) { + var i = addr.lastIndexOf(':') + if (i >= 0) { + var host = addr.substring(0,i) + var port = addr.substring(i+1) + } else { + var host = '' + var port = addr + } + port = Number.parseInt(port) + if (Number.isNaN(port)) throw `Invalid port number in '${addr}'` + if (!host) host = '127.0.0.1' + return { host, port } + } + + function printTable(data, columns, indent) { + var head = ' '.repeat(indent || 0) + var cols = Object.entries(columns) + var colHeaders = cols.map(i => i[0]) + var colFormats = cols.map(i => i[1]) + var colSizes = colHeaders.map(name => name.length) + var rows = data.map(row => colFormats.map( + (format, i) => { + var v = format(row).toString() + colSizes[i] = Math.max(colSizes[i], v.length) + return v + } + )) + output(head) + colHeaders.forEach((name, i) => output(name.padEnd(colSizes[i]) + ' ')) + output('\n') + rows.forEach(row => { + output(head) + row.forEach((v, i) => output(v.padEnd(colSizes[i]) + ' ')) + output('\n') + }) + } + } +} + +var objectTypeNotes = ` + Object Types: + + inbound in Inbound end of a tunnel + outbound out Outbound end of a tunnel +` + +var objectNameNotes = ` + Object Names: + + tcp/ Name for a TCP tunnel + udp/ Name for a UDP tunnel +` diff --git a/relay-p2p/main.js b/relay-p2p/main.js new file mode 100644 index 0000000000000000000000000000000000000000..4089ea68abf89ebffaf74f1ed3ccfd2bc5081d21 --- /dev/null +++ b/relay-p2p/main.js @@ -0,0 +1,222 @@ +import initHole from './punch.js' +import initAPI from './api.js' +import initCLI from './cli.js' + +export default function ({ app, mesh, utils }) { + var punch = initHole({ app, mesh }) + var api = initAPI({ app, mesh, punch }) + var cli = initCLI({ app, mesh, utils, api }) + + var $ctx + + var gui = new http.Directory(os.path.join(app.root, 'gui')) + var response = utils.createResponse + var responder = utils.createResponder + + var serveUser = utils.createServer({ + '/cli': { + 'CONNECT': utils.createCLIResponder(cli), + }, + + '/api/appinfo': { + 'GET': responder(() => Promise.resolve(response(200, { + name: app.name, + provider: app.provider, + username: app.username, + endpoint: app.endpoint, + }))) + }, + + '/api/endpoints': { + 'GET': responder(() => api.allEndpoints().then( + ret => ret ? response(200, ret) : response(404) + )) + }, + + '/api/endpoints/{ep}/inbound': { + 'GET': responder(({ ep }) => api.allInbound(ep).then( + ret => ret ? response(200, ret) : response(404) + )) + }, + + '/api/endpoints/{ep}/outbound': { + 'GET': responder(({ ep }) => api.allOutbound(ep).then( + ret => ret ? response(200, ret) : response(404) + )) + }, + + '/api/endpoints/{ep}/inbound/{proto}/{name}': { + 'GET': responder(({ ep, proto, name }) => { + return api.getInbound(ep, proto, name).then( + ret => ret ? response(200, ret) : response(404) + ) + }), + + 'POST': responder(({ ep, proto, name }, req) => { + var obj = JSON.decode(req.body) + var listens = obj.listens + var exits = obj.exits || null + return api.setInbound(ep, proto, name, listens, exits).then(response(201)) + }), + + 'DELETE': responder(({ ep, proto, name }) => { + return api.deleteInbound(ep, proto, name).then(response(204)) + }), + + }, + + '/api/endpoints/{ep}/outbound/{proto}/{name}': { + 'GET': responder(({ ep, proto, name }) => { + return api.getOutbound(ep, proto, name).then( + ret => ret ? response(200, ret) : response(404) + ) + }), + + 'POST': responder(({ ep, proto, name }, req) => { + var obj = JSON.decode(req.body) + var targets = obj.targets + var entrances = obj.entrances || null + return api.setOutbound(ep, proto, name, targets, entrances).then(response(201)) + }), + + 'DELETE': responder(({ ep, proto, name }) => { + return api.deleteOutbound(ep, proto, name).then(response(204)) + }), + }, + + // '/api/punch/{destEp}': { + // 'GET': responder(({destEp}) => { + // return api.createHole(destEp) + // }), + + // 'DELETE': responder(({destEp}) => { + // api.deleteHole(destEp) + // return response(204) + // }) + // }, + + '*': { + 'GET': responder((_, req) => { + return Promise.resolve(gui.serve(req) || response(404)) + }) + }, + }) + + var servePeer = utils.createServer({ + '/api/inbound': { + 'GET': responder(() => api.allInbound(app.endpoint.id).then( + ret => ret ? response(200, ret) : response(404) + )) + }, + + '/api/outbound': { + 'GET': responder(() => api.allOutbound(app.endpoint.id).then( + ret => ret ? response(200, ret) : response(404) + )) + }, + + '/api/inbound/{proto}/{name}': { + 'GET': responder(({ proto, name }) => api.getInbound(app.endpoint.id, proto, name).then( + ret => ret ? response(200, ret) : response(404) + )), + + 'POST': responder(({ proto, name }, req) => { + var obj = JSON.decode(req.body) + var listens = obj.listens + var exits = obj.exits || null + return api.setInbound(app.endpoint.id, proto, name, listens, exits).then(response(201)) + }), + + 'DELETE': responder(({ proto, name }) => { + return api.deleteInbound(app.endpoint.id, proto, name).then(response(204)) + }), + }, + + '/api/outbound/{proto}/{name}': { + 'GET': responder(({ proto, name }) => api.getOutbound(app.endpoint.id, proto, name).then( + ret => ret ? response(200, ret) : response(404) + )), + + 'POST': responder(({ proto, name }, req) => { + var obj = JSON.decode(req.body) + var targets = obj.targets + var entrances = obj.entrances || null + return api.setOutbound(app.endpoint.id, proto, name, targets, entrances).then(response(201)) + }), + + 'DELETE': responder(({ proto, name }) => { + return api.deleteOutbound(app.endpoint.id, proto, name).then(response(204)) + }), + + 'CONNECT': api.servePeerInbound, + }, + + '/api/ping': { + 'GET': responder(() => Promise.resolve(response(200))) + }, + + '/api/punch/{action}': { + 'GET': responder(({action}) => { + var ep = $ctx.peer.id + var ip = $ctx.peer.ip + var port = $ctx.peer.port + + console.log(`Punch Event: ${action} from ${ep} ${ip} ${port}`) + switch(action) { + case 'leave': + api.deleteHole(ep, true) + break + default: + return Promise.resolve(response(500, "Unknown punch action")) + } + return Promise.resolve(response(200)) + }), + + 'POST': responder(({action}, req) => { + var obj = JSON.decode(req.body) + var ep = $ctx.peer.id + var ip = $ctx.peer.ip + var port = $ctx.peer.port + + console.log(`Punch Event: ${action} from ${ep} ${ip} ${port}`) + console.log("Punch req: ", obj) + switch(action) { + case 'request': + api.createHole(ep, 'server') + api.updateHoleInfo(ep, ip, port, obj.cert) + api.syncPunch(ep) + // var certs = hole.signPeerCert(new crypto.PublicKey(obj.pkey)) + // return Promise.resolve(response(200, cert)) + break + case 'accept': + api.updateHoleInfo(ep, ip, port, obj.cert) + api.syncPunch(ep) + break + default: + return Promise.resolve(response(500, "Unknown punch action")) + } + return Promise.resolve(response(200)) + }), + + 'CONNECT': pipeline($=>$.pipe(api.makeRespTunnel, () => $ctx)) + }, + }) + + punch.setService((ctx) => { + // Tricky callback to set ctx, + // expecting everything in hole works + // just like it's coming from hub. + $ctx = ctx + return servePeer + }) + + return pipeline($=>$ + .onStart(c => void ($ctx = c)) + .pipe(() => { + switch ($ctx.source) { + case 'user': return serveUser + case 'peer': return servePeer + } + }) + ) +} diff --git a/relay-p2p/punch.js b/relay-p2p/punch.js new file mode 100644 index 0000000000000000000000000000000000000000..60c6867e4d50b038a7118c03b6d513a29097e0f7 --- /dev/null +++ b/relay-p2p/punch.js @@ -0,0 +1,521 @@ +export default function ({ app, mesh }) { + // Only available for symmetric NAT + function Hole(ep) { + // (idle) (handshake) (punching connected closed) (left fail) + var state = 'idle' + var bound = '0.0.0.0:' + randomPort() + var destIP = null + var destPort = null + var role = null + var session = null + var rtt = null + + var tlsOptions = { + certificate: null, + trusted: null + } + + var pHub = new pipeline.Hub + var $connection = null + var $response + + + // Check if ep is self. + console.info(`Creating hole to peer ${ep}, bound ${bound}`) + if (ep === app.endpoint.id) { + throw 'Must not create a hole to self' + } + + function directSession() { + if (!role || !destIP || !destPort) throw 'Hole not init correctly' + if (session) return session + + var retryTimes = 0 + + var buildCtx = () => { + return { + source: 'direct', + self: { + id: app.endpoint.id, + }, + peer: { + id: ep, + ip: destIP, + port: destPort, + } + } + } + + if (role === 'client') { + var reverseTunnel = null + var reverseTunnelStarted = false + + // make session to server side directly + session = pipeline($ => $ + .muxHTTP(() => ep + "direct", { version: 2 }).to($ => $ + // .connectTLS({ + // ...tlsOptions, + // onState: tls => { + // console.info('TLS State: ', tls) + // if($connection.state === 'connected' && tls.state === 'connected') { + // app.log(`Connected TLS to peer ${destIP}:${destPort}`) + // state = 'connected' + // retryTimes = 0 + + // if (!reverseTunnelStarted) { + // reverseTunnel.spawn() + // reverseTunnelStarted = true + // } + // } + // } + // }).to($ => $ + .connect(() => `${destIP}:${destPort}`, { + bind: bound, + onState: function (conn) { + console.info("Conn Info: ", conn) + + if (conn.state === 'open') { + conn.socket.setRawOption(1, 15, new Data([1, 0, 0, 0])) + } else if (conn.state === 'connected') { + app.log(`Connected to peer ${destIP}:${destPort}`) + $connection = conn + state = 'connected' + retryTimes = 0 + + if (!reverseTunnelStarted) { + reverseTunnel.spawn() + reverseTunnelStarted = true + } + } else if (conn.state === 'closed') { + app.log(`Disconnected from peer ${destIP}:${destPort}`) + $connection = null + state = 'closed' + retryTimes += 1 + } + + // Max Retry set to 10 + if (retryTimes > 10 || state === 'fail') { + console.info(`Retry limit exceeded, punch failed.`) + state = 'fail' + updateHoles() + } + }, + }) + .handleStreamEnd(evt => console.info('Hole connection end, retry: ', retryTimes + 1, ' reason: ', evt?.error)) + ) + // ) + ) + + // reverse server for receiving requests + reverseTunnel = pipeline($ => $ + .onStart(new Data) + .repeat(() => new Timeout(1).wait().then(() => { + return state != 'fail' && state != 'left' + })).to($ => $ + .loop($ => $ + .connectHTTPTunnel( + new Message({ + method: 'CONNECT', + path: `/api/punch/tunnel`, + }) + ) + .to(session) + .pipe(() => svc(buildCtx())) + ) + ) + ) + + // Forced Heartbeats + // Do a PCR to the hole. + pacemaker() + + } else if (role === 'server') { + var $msg = null + var listen = pipeline($ => $ + // .acceptTLS({ + // ...tlsOptions, + // onState: tls => console.info('TLS State: ', tls) + // }).to($ => $ + .handleMessage(msg => { + console.info('Server Received: ', msg) + $msg = msg + return new Data + }).pipe(() => svc(buildCtx())), () => $msg + // ) + ) + + console.info("Direct Server Listening...") + pipy.listen(bound, 'tcp', listen) + + session = pipeline($ => $ + .muxHTTP(() => ep + "direct", { version: 2 }).to($ => $ + .swap(() => pHub) + ) + ) + } + return session + } + + function request(req, callback) { + var store = req + return pipeline($ => $ + .onStart(req) + .muxHTTP().to($ => $.pipe( + mesh.connect(ep, { + bind: bound, + onState: conn => { + if (conn.state === 'open') + conn.socket.setRawOption(1, 15, new Data([1, 0, 0, 0])) + } + }) + )) + .print() + .replaceMessage(res => { + $response = res + return new StreamEnd + }) + .onEnd(() => { + console.info('Answers in hole: ', $response, store) + if (callback) + callback($response) + return $response + }) + ).spawn() + } + + // use THE port sending request to hub. + function requestPunch() { + role = 'client' + state = 'handshake' + var start = Date.now() + + console.info("Requesting punch") + request(new Message({ + method: 'POST', + path: '/api/punch/request', + }, JSON.encode({ + timestamp: Date.now(), + cert: genCert() + })), (resp) => { + var end = Date.now() + rtt = (end - start) / 2000 + console.info('Estimated RTT: ', rtt) + + if (resp.head.status != 200) { + app.log(`Failed on requesting`) + state = 'fail' + updateHoles() + } + }) + new Timeout(60).wait().then(connectOrFail) + } + + function acceptPunch() { + role = 'server' + state = 'handshake' + var start = Date.now() + + console.info("Accepting punch") + request(new Message({ + method: 'POST', + path: '/api/punch/accept', + }, JSON.encode({ + timestamp: Date.now(), + cert: genCert() + })), (resp) => { + var end = Date.now() + rtt = (end - start) / 2000 + console.info('Estimated RTT: ', rtt) + + if (!resp || resp.head.status != 200) { + app.log(`Failed on accepting`) + state = 'fail' + updateHoles() + } + }) + + new Timeout(60).wait().then(connectOrFail) + } + + // Locally generate certificate. + // The handshake process and hub + // will ensure peer is trustworthy + function genCert() { + var key = new crypto.PrivateKey({ type: 'rsa', bits: 2048 }) + var pKey = new crypto.PublicKey(key) + var cert = new crypto.Certificate({ + subject: { CN: role }, + publicKey: pKey, + privateKey: key, + days: 365, + }) + + tlsOptions = { + certificate: { + cert: cert, + key: key, + } + } + + return cert.toPEM().toString() + } + + function addPeerCert(cert) { + var peerCert = new crypto.Certificate(cert) + console.info("TLS: ", tlsOptions) + tlsOptions['trusted'] = [peerCert] + } + + function updateNatInfo(ip, port) { + console.info(`Peer NAT Info: ${ip}:${port}`) + destIP = ip + destPort = port + } + + // Punch when: + // 1. Server accept message got 200 OK + // 2. Client receive accept + function punch() { + state = 'punching' + + console.info(`Punching to ${destIP}:${destPort} (${ep})`) + if (role === 'server') { + makeFakeCall(destIP, destPort) + } + directSession() + } + + function makeRespTunnel() { + console.info("Created Resp Tunnel") + state = 'connected' + + return pipeline($ => $ + .acceptHTTPTunnel(() => new Message({ status: 200 })).to($ => $ + .onStart(new Data) + .swap(() => pHub) + .onEnd(() => console.info(`Direct Connection from ${ep} lost`)) + ) + ) + } + + function connectOrFail() { + if (state === 'left') { + // Be quiet when left. + // The hole has been released. + return + } else if (state != 'connected') { + console.info(`Current state ${state}, made the hole failed`) + state = 'fail' + updateHoles() + } + } + + // send a SYN to dest, expect no return. + // this will cheat the firewall to allow inbound connection from peer. + function makeFakeCall(destIP, destPort) { + console.info("Making fake call") + pipeline($ => $ + .onStart(new Data).connect(`${destIP}:${destPort}`, { + bind: bound, + onState: function (conn) { + // Socket Option: REUSEPORT + if (conn.state === 'open') conn.socket.setRawOption(1, 15, new Data([1, 0, 0, 0])) + + // abort this connection. + if (conn.state === 'connecting') { + console.info('Performing early close') + conn.close() + } + } + }) + ).spawn() + } + + // Send something to server from time to time + // So the firewall and NAT rule should be held. + // + // Params: + // - pacemaker: whether called from pacemaker function + // + function heartbeat(pacemaker) { + if (state === 'fail' || state === 'left') return + if (role === 'server') return + + var heart = pipeline($ => $ + .onStart(new Message({ + method: 'GET', + path: '/api/ping' + })) + .pipe(session) + .replaceMessage(res => { + if (res.head.status != 200 && !pacemaker) + app.log("Cardiac Arrest happens, hole: ", ep) + if (pacemaker) return res + return new StreamEnd + }) + ) + + if (pacemaker) + return heart + + // if not called from pacemaker + // the heart should beat automatically :) + heart.spawn() + new Timeout(10).wait().then(() => heartbeat(false)) + } + + // Used on direct connection setup. + // To urge the connect filter try to call the peer + function pacemaker() { + rtt ??= 0.02 + + var timeout = [rtt, rtt, rtt, rtt, rtt, 2 * rtt, 3 * rtt, 5 * rtt, 8 * rtt, 13 * rtt] + var round = 0 + var cont = true + + console.info('Pacemaking......') + pipeline($ => $ + .onStart(new Data) + .repeat(() => { + if(round < 10) + return new Timeout(timeout[round]).wait().then(() => cont) + return false + }) + .to($ => $ + .pipe(() => heartbeat(true)) + .replaceMessage(resp => { + round += 1 + if (resp.head.status == 200) { + cont = false + heartbeat(false) + } + console.info('Pacemaker: ', resp) + return new StreamEnd + }) + ) + ).spawn() + } + + function leave(remote) { + if (role === 'server') { + pipy.listen(bound, 'tcp', null) + } + + if ($connection) { + $connection?.close() + } + $connection = null + if (state != 'fail') state = 'left' + if (!remote) { + request(new Message({ + method: 'GET', + path: '/api/punch/leave' + })) + } else app.log("Hole closed by peer ", ep) + } + + return { + role: () => role, + state: () => state, + ready: () => state === 'connected', + requestPunch, + acceptPunch, + updateNatInfo, + addPeerCert, + punch, + makeRespTunnel, + directSession, + leave, + } + } // End of Hole + + var holes = new Map + var fails = {} + var svc = null + + function updateHoles() { + holes.forEach((key, hole) => { + fails[key] ??= 0 + if (hole.state() === 'fail' || hole.state() === 'left') { + hole.leave() + holes.delete(key) + fails[key] += 1 + } + }) + console.info(`Holes after updating: `, holes) + } + + function createInboundHole(ep) { + updateHoles() + if (findHole(ep)) return + if (fails[ep] && fails[ep] >= 3) { + console.info(`Won't create hole to ${ep}, too many fails!`) + return + } + console.info(`Creating Inbound Hole to ${ep}`) + try { + var hole = Hole(ep) + hole.requestPunch() + holes.set(ep, hole) + } catch (err) { + updateHoles() + app.log('Failed to create Inbound Hole, Error: ', err) + } + + return hole + } + + function createOutboundHole(ep, natIp, natPort) { + updateHoles() + if (findHole(ep)) return + console.info(`Creating Outbound Hole to ${ep}`) + try { + var hole = Hole(ep) + hole.acceptPunch() + holes.set(ep, hole) + } catch (err) { + updateHoles() + app.log('Failed to create Outbound Hole, Error: ', err) + } + + return hole + } + + function updateHoleInfo(ep, natIp, natPort, cert) { + var hole = findHole(ep) + if (!hole) throw `No hole to update, ep ${ep}` + + hole.updateNatInfo(natIp, natPort) + hole.addPeerCert(cert) + } + + function deleteHole(ep, remote) { + var sel = findHole(ep) + if (!sel) return + sel.leave(remote) + updateHoles() + } + + function findHole(ep) { + return holes.get(ep) + } + + function setService(srvPeer) { + svc = srvPeer + } + + function randomPort() { + return Number.parseInt(Math.random() * (65535 - 1024)) + 1024 + } + + return { + getHoles: () => holes, + createInboundHole, + createOutboundHole, + updateHoleInfo, + deleteHole, + findHole, + setService, + randomPort, + } +}