Support for Inferentia2 (draft) (#118)
* v1: get it working

* update neuron

* update batch_size

* make format

* update deps

* poetry lock

* fmt

* update

* lock

* update neuron

* fmt

* lint
michaelfeil authored Mar 16, 2024
1 parent 8833f43 commit 7cfa12d
Showing 6 changed files with 220 additions and 56 deletions.
4 changes: 2 additions & 2 deletions libs/infinity_emb/infinity_emb/inference/batch_handler.py
@@ -91,7 +91,7 @@ def __init__(

     async def embed(
         self, sentences: List[str]
-    ) -> tuple[List[EmbeddingReturnType], int]:
+    ) -> Tuple[List[EmbeddingReturnType], int]:
         """Schedule a sentence to be embedded. Awaits until embedded.
         Args:
@@ -113,7 +113,7 @@ async def embed(

     async def rerank(
         self, query: str, docs: List[str], raw_scores: bool = False
-    ) -> tuple[List[float], int]:
+    ) -> Tuple[List[float], int]:
         """Schedule a query to be reranked with documents. Awaits until reranked.
         Args:
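Presumably the motivation for the tuple → Tuple swap above: the built-in generic spelling tuple[...] is only evaluable at runtime on Python 3.9+, while typing.Tuple also works on 3.8. A minimal illustration (not part of the diff):

from typing import List, Tuple

# On Python 3.8, a builtin-generic annotation such as
#     def f() -> tuple[List[float], int]: ...
# fails at definition time with "TypeError: 'type' object is not subscriptable";
# the typing.Tuple spelling used above is the backwards-compatible alternative.
def rerank_result() -> Tuple[List[float], int]:
    return [0.9, 0.1], 2

print(rerank_result())  # ([0.9, 0.1], 2)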
153 changes: 153 additions & 0 deletions libs/infinity_emb/infinity_emb/transformer/embedder/neuron.py
@@ -0,0 +1,153 @@
import copy
import json
import subprocess
from typing import Dict, List, Union

import numpy as np

from infinity_emb.args import EngineArgs
from infinity_emb.primitives import EmbeddingReturnType, PoolingMethod
from infinity_emb.transformer.abstract import BaseEmbedder
from infinity_emb.transformer.utils_optimum import (
    cls_token_pooling,
    mean_pooling,
    normalize,
)

try:
    import torch
    from optimum.neuron import NeuronModelForFeatureExtraction  # type: ignore
    from transformers import AutoConfig, AutoTokenizer  # type: ignore

    OPTIMUM_AVAILABLE = True
except (ImportError, RuntimeError):
    OPTIMUM_AVAILABLE = False

__all__ = [
    "NeuronOptimumEmbedder",
]


def get_nc_count() -> Union[int, None]:
    """Returns the number of neuron cores on the current instance."""
    try:
        cmd = "neuron-ls --json-output"
        result = subprocess.run(cmd, shell=True, capture_output=True)
        print("inferring nc_count from `neuron-ls`")
        print(result.stdout.decode("utf-8"))
        json_output = json.loads(result.stdout)
        count = sum([x["nc_count"] for x in json_output])
        print(f"nc_count={count}")
        return count
    except Exception:
        return None


def pad_up_to_size(desired_max_bs, input_ids):
    """Pad input_ids (a 2D tensor with batch_size on dim=0) up to desired_max_bs.

    The compiled Neuron model only accepts a constant batch size, so smaller
    batches are padded with dummy rows.
    """
    batch_size = input_ids.shape[0]

    if batch_size < desired_max_bs:
        # handle the event of input_ids.shape[0] != batch_size
        # Neuron cores expect a constant batch_size
        input_ids = torch.concat(
            (
                input_ids,
                # append dummy rows to fill up to desired_max_bs
                torch.zeros(
                    [desired_max_bs - batch_size, *input_ids.size()[1:]],
                    dtype=input_ids.dtype,
                    device=input_ids.device,
                ),
            ),
            dim=0,
        )
    elif batch_size > desired_max_bs:
        raise ValueError(
            f"The specified batch_size ({batch_size}) exceeds the model's static batch size ({desired_max_bs})"
        )
    # the forward pass requires this constant batch size
    return input_ids


class NeuronOptimumEmbedder(BaseEmbedder):
    def __init__(self, *, engine_args: EngineArgs):
        if not OPTIMUM_AVAILABLE:
            raise ImportError(
                "optimum.neuron is not installed. "
                "Run this on an AWS instance with Neuron devices (e.g. Inferentia2)."
            )

        self.pooling = (
            mean_pooling
            if engine_args.pooling_method == PoolingMethod.mean
            else cls_token_pooling
        )

        self.tokenizer = AutoTokenizer.from_pretrained(engine_args.model_name_or_path)
        self.config = AutoConfig.from_pretrained(engine_args.model_name_or_path)
        self._infinity_tokenizer = copy.deepcopy(self.tokenizer)

        compiler_args = {"num_cores": get_nc_count(), "auto_cast_type": "fp16"}
        input_shapes = {
            "batch_size": 4,
            "sequence_length": (
                self.config.max_position_embeddings
                if hasattr(self.config, "max_position_embeddings")
                else 512
            ),
        }
        self.model = NeuronModelForFeatureExtraction.from_pretrained(
            model_id=engine_args.model_name_or_path,
            export=True,
            **compiler_args,
            **input_shapes,
        )
        self.batch_size = self.model.neuron_config.input_shapes["batch_size"]

    def encode_pre(self, sentences: List[str]) -> Dict[str, np.ndarray]:
        input_dict = self.tokenizer(
            sentences,
            max_length=self.config.max_position_embeddings,
            padding=True,
            truncation="longest_first",
            return_tensors="pt",
        )
        input_dict.pop("token_type_ids", None)
        return input_dict

    def encode_core(self, input_dict: Dict[str, np.ndarray]) -> dict:
        """requires constant batch size, which is a bit of extra work"""
        for key, tensor in input_dict.items():
            actual_bsize = tensor.shape[0]
            input_dict[key] = pad_up_to_size(self.batch_size, tensor)
        with torch.inference_mode():
            outputs = self.model(**input_dict)
        return {
            "token_embeddings": outputs["last_hidden_state"][:actual_bsize],
            "attention_mask": input_dict["attention_mask"][:actual_bsize],
        }

    def encode_post(self, embedding: dict) -> EmbeddingReturnType:
        embedding = self.pooling(  # type: ignore
            embedding["token_embeddings"].numpy(), embedding["attention_mask"].numpy()
        )

        return normalize(embedding).astype(np.float32)

    def tokenize_lengths(self, sentences: List[str]) -> List[int]:
        if hasattr(self._infinity_tokenizer, "encode_batch"):
            tks = self._infinity_tokenizer.encode_batch(
                sentences,
                padding=False,
                truncation="longest_first",
            )
        else:
            tks = self._infinity_tokenizer(
                sentences, padding=False, truncation="longest_first"
            )

        return [len(t) for t in tks["input_ids"]]
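For orientation, a rough usage sketch of the three-stage flow above (encode_pre → encode_core → encode_post) that the batch handler drives. It assumes an Inferentia2 instance with optimum.neuron installed, that the remaining EngineArgs fields can stay at their defaults, and the model name is only an example — not part of this diff.

from infinity_emb.args import EngineArgs
from infinity_emb.transformer.embedder.neuron import NeuronOptimumEmbedder

# Exports/compiles the model for Neuron on first load (raises ImportError off-instance).
embedder = NeuronOptimumEmbedder(
    engine_args=EngineArgs(model_name_or_path="BAAI/bge-small-en-v1.5")  # illustrative model
)

sentences = ["Inferentia2 support", "for infinity_emb"]
features = embedder.encode_pre(sentences)    # tokenize, drop token_type_ids
outputs = embedder.encode_core(features)     # pad to the static batch size, run on Neuron
embeddings = embedder.encode_post(outputs)   # pool, L2-normalize, cast to float32
print(embeddings.shape)                      # (2, hidden_size)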
17 changes: 2 additions & 15 deletions libs/infinity_emb/infinity_emb/transformer/embedder/optimum.py
@@ -8,8 +8,10 @@
 from infinity_emb.primitives import EmbeddingReturnType, PoolingMethod
 from infinity_emb.transformer.abstract import BaseEmbedder
 from infinity_emb.transformer.utils_optimum import (
+    cls_token_pooling,
     device_to_onnx,
     get_onnx_files,
+    mean_pooling,
     normalize,
     optimize_model,
 )
@@ -23,21 +25,6 @@
     OPTIMUM_AVAILABLE = False


-def mean_pooling(last_hidden_states: np.ndarray, attention_mask: np.ndarray):
-    input_mask_expanded = (np.expand_dims(attention_mask, axis=-1)).astype(float)
-
-    sum_embeddings = np.sum(
-        last_hidden_states.astype(float) * input_mask_expanded, axis=1
-    )
-    mask_sum = np.maximum(np.sum(input_mask_expanded, axis=1), 1e-9)
-
-    return sum_embeddings / mask_sum
-
-
-def cls_token_pooling(model_output, *args):
-    return model_output[:, 0]
-
-
 class OptimumEmbedder(BaseEmbedder):
     def __init__(self, *, engine_args: EngineArgs):
         if not OPTIMUM_AVAILABLE:
4 changes: 4 additions & 0 deletions libs/infinity_emb/infinity_emb/transformer/utils.py
@@ -11,6 +11,7 @@
 )
 from infinity_emb.transformer.embedder.ct2 import CT2SentenceTransformer
 from infinity_emb.transformer.embedder.dummytransformer import DummyTransformer
+from infinity_emb.transformer.embedder.neuron import NeuronOptimumEmbedder
 from infinity_emb.transformer.embedder.optimum import OptimumEmbedder
 from infinity_emb.transformer.embedder.sentence_transformer import (
     SentenceTransformerPatched,
@@ -28,6 +29,7 @@ class EmbedderEngine(Enum):
     ctranslate2 = CT2SentenceTransformer
     debugengine = DummyTransformer
     optimum = OptimumEmbedder
+    neuron = NeuronOptimumEmbedder

     @staticmethod
     def from_inference_engine(engine: InferenceEngine):
@@ -39,6 +41,8 @@ def from_inference_engine(engine: InferenceEngine):
             return EmbedderEngine.debugengine
         elif engine == InferenceEngine.optimum:
             return EmbedderEngine.optimum
+        elif engine == InferenceEngine.neuron:
+            return EmbedderEngine.neuron
         else:
             raise NotImplementedError(f"EmbedderEngine for {engine} not implemented")
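A minimal sketch (not part of the diff) of how the dispatch added above resolves; the InferenceEngine import path is an assumption.

from infinity_emb.primitives import InferenceEngine  # assumed import path
from infinity_emb.transformer.embedder.neuron import NeuronOptimumEmbedder
from infinity_emb.transformer.utils import EmbedderEngine

# The new branch maps the neuron inference engine onto the Neuron embedder class.
embedder_cls = EmbedderEngine.from_inference_engine(InferenceEngine.neuron).value
assert embedder_cls is NeuronOptimumEmbedder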
15 changes: 15 additions & 0 deletions libs/infinity_emb/infinity_emb/transformer/utils_optimum.py
@@ -22,6 +22,21 @@
     torch = None  # type: ignore


+def mean_pooling(last_hidden_states: np.ndarray, attention_mask: np.ndarray):
+    input_mask_expanded = (np.expand_dims(attention_mask, axis=-1)).astype(float)
+
+    sum_embeddings = np.sum(
+        last_hidden_states.astype(float) * input_mask_expanded, axis=1
+    )
+    mask_sum = np.maximum(np.sum(input_mask_expanded, axis=1), 1e-9)
+
+    return sum_embeddings / mask_sum
+
+
+def cls_token_pooling(model_output, *args):
+    return model_output[:, 0]
+
+
 def normalize(input_array, p=2, dim=1, eps=1e-12):
     # Calculate the Lp norm along the specified dimension
     norm = np.linalg.norm(input_array, ord=p, axis=dim, keepdims=True)
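A toy check (not part of the diff) of the pooling helpers now shared from utils_optimum: mean_pooling averages token embeddings over non-masked positions, and normalize scales each row to unit L2 norm.

import numpy as np

from infinity_emb.transformer.utils_optimum import mean_pooling, normalize

last_hidden = np.array([[[1.0, 0.0], [3.0, 4.0]]])  # (batch=1, tokens=2, dim=2)
mask = np.array([[1, 1]])                           # both tokens are real

pooled = mean_pooling(last_hidden, mask)  # -> [[2.0, 2.0]]
unit = normalize(pooled)                  # -> [[0.7071..., 0.7071...]]
print(pooled, unit)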
83 changes: 44 additions & 39 deletions libs/infinity_emb/poetry.lock

Some generated files are not rendered by default.
