Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion packages/connection-encrypter-plaintext/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
"@libp2p/interface": "^2.10.5",
"@libp2p/peer-id": "^5.1.8",
"it-protobuf-stream": "^2.0.2",
"it-stream-types": "^2.0.2",
"protons-runtime": "^5.5.0",
"uint8arraylist": "^2.4.8",
"uint8arrays": "^5.1.0"
Expand Down
17 changes: 8 additions & 9 deletions packages/connection-encrypter-plaintext/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
import { pbStream } from 'it-protobuf-stream'
import { equals as uint8ArrayEquals } from 'uint8arrays/equals'
import { Exchange, KeyType } from './pb/proto.js'
import type { ComponentLogger, Logger, MultiaddrConnection, ConnectionEncrypter, SecuredConnection, PrivateKey, SecureConnectionOptions } from '@libp2p/interface'
import type { Duplex } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'
import type { ComponentLogger, Logger, MultiaddrConnection, ConnectionEncrypter, SecuredConnection, PrivateKey, SecureConnectionOptions, SecurableStream } from '@libp2p/interface'

const PROTOCOL = '/plaintext/2.0.0'

Expand All @@ -54,21 +52,22 @@
'@libp2p/connection-encryption'
]

async secureInbound<Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection>(conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
async secureInbound<Stream extends SecurableStream = MultiaddrConnection>(conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
return this._encrypt(conn, options)
}

async secureOutbound<Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection>(conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
async secureOutbound<Stream extends SecurableStream = MultiaddrConnection>(conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
return this._encrypt(conn, options)
}

/**
* Encrypt connection
*/
async _encrypt<Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection>(conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
async _encrypt<Stream extends SecurableStream = MultiaddrConnection>(conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
const log = conn.log?.newScope('plaintext') ?? this.log
const pb = pbStream(conn).pb(Exchange)

this.log('write pubkey exchange to peer %p', options?.remotePeer)
log('write pubkey exchange to peer %p', options?.remotePeer)

const publicKey = this.privateKey.publicKey

Expand Down Expand Up @@ -108,15 +107,15 @@
throw new InvalidCryptoExchangeError('Public key did not match id')
}
} catch (err: any) {
this.log.error(err)
log.error(err)

Check warning on line 110 in packages/connection-encrypter-plaintext/src/index.ts

View check run for this annotation

Codecov / codecov/patch

packages/connection-encrypter-plaintext/src/index.ts#L110

Added line #L110 was not covered by tests
throw new InvalidCryptoExchangeError('Invalid public key - ' + err.message)
}

if (options?.remotePeer != null && !peerId.equals(options?.remotePeer)) {
throw new UnexpectedPeerError()
}

this.log('plaintext key exchange completed successfully with peer %p', peerId)
log('plaintext key exchange completed successfully with peer %p', peerId)

return {
conn: pb.unwrap().unwrap(),
Expand Down
17 changes: 8 additions & 9 deletions packages/connection-encrypter-tls/src/tls.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
import { generateCertificate, verifyPeerCertificate, itToStream, streamToIt } from './utils.js'
import { PROTOCOL } from './index.js'
import type { TLSComponents } from './index.js'
import type { MultiaddrConnection, ConnectionEncrypter, SecuredConnection, Logger, SecureConnectionOptions, CounterGroup, StreamMuxerFactory } from '@libp2p/interface'
import type { Duplex } from 'it-stream-types'
import type { MultiaddrConnection, ConnectionEncrypter, SecuredConnection, Logger, SecureConnectionOptions, CounterGroup, StreamMuxerFactory, SecurableStream } from '@libp2p/interface'
import type { TLSSocketOptions } from 'node:tls'
import type { Uint8ArrayList } from 'uint8arraylist'

export class TLS implements ConnectionEncrypter {
public protocol: string = PROTOCOL
Expand Down Expand Up @@ -77,18 +75,19 @@
'@libp2p/connection-encryption'
]

async secureInbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
async secureInbound <Stream extends SecurableStream = MultiaddrConnection> (conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
return this._encrypt(conn, true, options)
}

async secureOutbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
async secureOutbound <Stream extends SecurableStream = MultiaddrConnection> (conn: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
return this._encrypt(conn, false, options)
}

/**
* Encrypt connection
*/
async _encrypt <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (conn: Stream, isServer: boolean, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
async _encrypt <Stream extends SecurableStream = MultiaddrConnection> (conn: Stream, isServer: boolean, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream>> {
const log = conn.log?.newScope('tls') ?? this.log
let streamMuxer: StreamMuxerFactory | undefined

let streamMuxers: string[] = []
Expand All @@ -112,7 +111,7 @@
'libp2p'
],
ALPNCallback: ({ protocols }) => {
this.log.trace('received protocols %s', protocols)
log.trace('received protocols %s', protocols)
let chosenProtocol: string | undefined

for (const protocol of protocols) {
Expand Down Expand Up @@ -165,7 +164,7 @@

verifyPeerCertificate(remote.raw, options?.remotePeer, this.log)
.then(remotePeer => {
this.log('remote certificate ok, remote peer %p', remotePeer)
log('remote certificate ok, remote peer %p', remotePeer)

// 'libp2p' is a special protocol - if it's sent the remote does not
// support early muxer negotiation
Expand All @@ -175,7 +174,7 @@

if (streamMuxer == null) {
const err = new InvalidCryptoExchangeError(`Selected muxer ${socket.alpnProtocol} did not exist`)
this.log.error(`Selected muxer ${socket.alpnProtocol} did not exist - %e`, err)
log.error(`Selected muxer ${socket.alpnProtocol} did not exist - %e`, err)

Check warning on line 177 in packages/connection-encrypter-tls/src/tls.ts

View check run for this annotation

Codecov / codecov/patch

packages/connection-encrypter-tls/src/tls.ts#L177

Added line #L177 was not covered by tests

if (isAbortable(conn)) {
conn.abort(err)
Expand Down
24 changes: 16 additions & 8 deletions packages/connection-encrypter-tls/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@ describe('tls', () => {

await Promise.all([
encrypter.secureInbound(stubInterface<MultiaddrConnection>({
...inbound
...inbound,
log: defaultLogger().forComponent('inbound')
}), {
remotePeer
}),
encrypter.secureOutbound(stubInterface<MultiaddrConnection>({
...outbound
...outbound,
log: defaultLogger().forComponent('outbound')
}), {
remotePeer: wrongPeer
})
Expand Down Expand Up @@ -81,12 +83,14 @@ describe('tls', () => {

await expect(Promise.all([
encrypter.secureInbound(stubInterface<MultiaddrConnection>({
...inbound
...inbound,
log: defaultLogger().forComponent('inbound')
}), {
remotePeer
}),
encrypter.secureOutbound(stubInterface<MultiaddrConnection>({
...outbound
...outbound,
log: defaultLogger().forComponent('outbound')
}), {
remotePeer: localPeer
})
Expand All @@ -99,12 +103,14 @@ describe('tls', () => {

const result = await Promise.all([
encrypter.secureInbound(stubInterface<MultiaddrConnection>({
...inbound
...inbound,
log: defaultLogger().forComponent('inbound')
}), {
remotePeer: localPeer
}),
encrypter.secureOutbound(stubInterface<MultiaddrConnection>({
...outbound
...outbound,
log: defaultLogger().forComponent('outbound')
}), {
remotePeer: localPeer
})
Expand All @@ -119,13 +125,15 @@ describe('tls', () => {

const result = await Promise.all([
encrypter.secureInbound(stubInterface<MultiaddrConnection>({
...inbound
...inbound,
log: defaultLogger().forComponent('inbound')
}), {
remotePeer: localPeer,
skipStreamMuxerNegotiation: true
}),
encrypter.secureOutbound(stubInterface<MultiaddrConnection>({
...outbound
...outbound,
log: defaultLogger().forComponent('outbound')
}), {
remotePeer: localPeer,
skipStreamMuxerNegotiation: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio
const log = logger.forComponent('libp2p:mock-muxer')

const muxer = muxerFactory.createStreamMuxer({
log,
direction,
onIncomingStream: (muxedStream) => {
try {
Expand Down
4 changes: 2 additions & 2 deletions packages/interface-compliance-tests/src/mocks/muxer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { logger } from '@libp2p/logger'
import { defaultLogger, logger } from '@libp2p/logger'
import { AbstractStream } from '@libp2p/utils/abstract-stream'
import { abortableSource } from 'abortable-iterator'
import map from 'it-map'
Expand Down Expand Up @@ -131,7 +131,7 @@ class MockMuxer implements StreamMuxer {
this.registryInitiatorStreams = new Map()
this.registryRecipientStreams = new Map()
this.log('create muxer')
this.options = init ?? { direction: 'inbound' }
this.options = init ?? { direction: 'inbound', log: defaultLogger().forComponent('mock-muxer') }
this.closeController = new AbortController()
// receives data from the muxer at the other end of the stream
this.source = this.input = pushable({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
const onStreamEndPromise: DeferredPromise<Stream> = defer()

const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
const dialer = dialerFactory.createStreamMuxer({
direction: 'outbound'
})

const listenerFactory = await common.setup()
const listener = listenerFactory.createStreamMuxer({
Expand Down Expand Up @@ -88,7 +90,9 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
})

const listenerFactory = await common.setup()
const listener = listenerFactory.createStreamMuxer({ direction: 'inbound' })
const listener = listenerFactory.createStreamMuxer({
direction: 'inbound'
})

void pipe(p[0], dialer, p[0])
void pipe(p[1], listener, p[1])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
let openedStreams = 0
const expectedStreams = 5
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
const dialer = dialerFactory.createStreamMuxer({
direction: 'outbound'
})

// Listener is echo server :)
const listenerFactory = await common.setup()
Expand Down Expand Up @@ -114,7 +116,9 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
let openedStreams = 0
const expectedStreams = 5
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
const dialer = dialerFactory.createStreamMuxer({
direction: 'outbound'
})

// Listener is echo server :)
const listenerFactory = await common.setup()
Expand Down Expand Up @@ -157,7 +161,9 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
let openedStreams = 0
const expectedStreams = 5
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
const dialer = dialerFactory.createStreamMuxer({
direction: 'outbound'
})

// Listener is echo server :)
const listenerFactory = await common.setup()
Expand Down Expand Up @@ -212,7 +218,9 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {

it('calling newStream after close throws an error', async () => {
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
const dialer = dialerFactory.createStreamMuxer({
direction: 'outbound'
})

await dialer.close()

Expand All @@ -227,7 +235,9 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
it('closing one of the muxed streams doesn\'t close others', async () => {
const p = duplexPair<Uint8Array | Uint8ArrayList>()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
const dialer = dialerFactory.createStreamMuxer({
direction: 'outbound'
})

// Listener is echo server :)
const listenerFactory = await common.setup()
Expand Down Expand Up @@ -280,7 +290,9 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {

const p = duplexPair<Uint8Array | Uint8ArrayList>()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
const dialer = dialerFactory.createStreamMuxer({
direction: 'outbound'
})
const data = [randomBuffer(), randomBuffer()]

const listenerFactory = await common.setup()
Expand Down Expand Up @@ -325,7 +337,9 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {
const deferred = pDefer<Uint8ArrayList[]>()
const p = duplexPair<Uint8Array | Uint8ArrayList>()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
const dialer = dialerFactory.createStreamMuxer({
direction: 'outbound'
})
const data = [randomBuffer(), randomBuffer()].map(d => new Uint8ArrayList(d))
const expected = toBuffer(data.map(d => d.subarray()))

Expand Down Expand Up @@ -391,7 +405,9 @@ export default (common: TestSetup<StreamMuxerFactory>): void => {

const p = duplexPair<Uint8Array | Uint8ArrayList>()
const dialerFactory = await common.setup()
const dialer = dialerFactory.createStreamMuxer({ direction: 'outbound' })
const dialer = dialerFactory.createStreamMuxer({
direction: 'outbound'
})

const listenerFactory = await common.setup()
const listener = listenerFactory.createStreamMuxer({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ export default async (createMuxer: (init?: StreamMuxerInit) => Promise<StreamMux
.catch(err => { stream.abort(err) })
}
})
const dialer = await createMuxer({ direction: 'outbound' })
const dialer = await createMuxer({
direction: 'outbound'
})

void pipe(listenerSocket, listener, listenerSocket)
void pipe(dialerSocket, dialer, dialerSocket)
Expand Down
13 changes: 10 additions & 3 deletions packages/interface/src/connection-encrypter.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { MultiaddrConnection } from './connection.js'
import type { AbortOptions, StreamMuxerFactory } from './index.js'
import type { AbortOptions, Logger, StreamMuxerFactory } from './index.js'
import type { PeerId } from './peer-id.js'
import type { Duplex } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'
Expand All @@ -21,6 +21,13 @@ export interface SecureConnectionOptions extends AbortOptions {
skipStreamMuxerNegotiation?: boolean
}

/**
* A stream with an optional logger
*/
export interface SecurableStream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> {
log?: Logger
}

/**
* A libp2p connection encrypter module must be compliant to this interface
* to ensure all exchanged data between two peers is encrypted.
Expand All @@ -33,14 +40,14 @@ export interface ConnectionEncrypter<Extension = unknown> {
* pass it for extra verification, otherwise it will be determined during
* the handshake.
*/
secureOutbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (connection: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream, Extension>>
secureOutbound <Stream extends SecurableStream = MultiaddrConnection> (connection: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream, Extension>>

/**
* Decrypt incoming data. If the remote PeerId is known,
* pass it for extra verification, otherwise it will be determined during
* the handshake
*/
secureInbound <Stream extends Duplex<AsyncGenerator<Uint8Array | Uint8ArrayList>> = MultiaddrConnection> (connection: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream, Extension>>
secureInbound <Stream extends SecurableStream = MultiaddrConnection> (connection: Stream, options?: SecureConnectionOptions): Promise<SecuredConnection<Stream, Extension>>
}

export interface SecuredConnection<Stream = any, Extension = unknown> {
Expand Down
7 changes: 6 additions & 1 deletion packages/interface/src/stream-muxer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Direction, Stream } from './connection.js'
import type { AbortOptions } from './index.js'
import type { AbortOptions, Logger } from './index.js'
import type { Duplex } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

Expand Down Expand Up @@ -60,4 +60,9 @@ export interface StreamMuxerInit {
* Outbound stream muxers are opened by the local node, inbound stream muxers are opened by the remote
*/
direction?: Direction

/**
* The logger used by the connection
*/
log?: Logger
}
Loading