I need to calculate the profit/loss for a given stock dataset, ensuring that the units bought first are sold first (FIFO).





Consider the following sample dataset.

Date          symbol      qty  price per qty  type
07 July 2022  REL2300PE   200  50             buy
07 July 2022  IDBI2300PE  200  50             sell
15 July 2022  REL2300PE   100  50             buy
15 July 2022  IDBI2300PE  20   50             buy
16 July 2022  REL2300PE   200  35             buy
30 July 2022  IDBI2300PE  60   50             sell
30 July 2022  REL2300PE   450  45             sell
30 July 2022  IDBI2300PE  200  25             sell

Take the symbol 'REL2300PE': 450 units are sold at $45 each, for a total sale value of 450 * $45 = $20,250. The cost price of those 450 units is found by walking through the buy trades for this symbol in date order and summing quantity * price per unit until 450 units are covered: 200 units * $50 + 100 units * $50 + 150 units * $35 = $20,250. Since the sale value and the cost price of the first 450 units are the same ($20,250), the profit/loss for this stock should be 0.
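The arithmetic above can be checked with a small plain-Python sketch (no Spark needed). The helper name `fifo_cost` and the list `rel_buys` are made up for illustration; the numbers are the REL2300PE buy trades from the sample dataset.

```python
from collections import deque

def fifo_cost(buys, sold_qty):
    """Cost of the first `sold_qty` units, consuming buy lots oldest-first.

    `buys` is a chronological list of (qty, price) pairs (hypothetical helper).
    """
    lots = deque(buys)
    remaining, cost = sold_qty, 0
    while remaining > 0:
        if not lots:
            raise ValueError("sold more units than were bought")
        qty, price = lots.popleft()
        used = min(qty, remaining)   # take only what is still needed from this lot
        cost += used * price
        remaining -= used
    return cost

# REL2300PE buy trades from the sample data, oldest first
rel_buys = [(200, 50), (100, 50), (200, 35)]
cost = fifo_cost(rel_buys, 450)
sale_value = 450 * 45
print(cost, sale_value, sale_value - cost)  # 20250 20250 0
```

Only 150 of the 200 units bought at $35 are used, which is why the last term is 150 * $35 rather than 200 * $35.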


You could follow this algorithm:

  • Calculate the cumulative sum of buys and sells over the current row and all previous rows, counting sell quantities as negative. This is the net position after each trade.
  • Calculate the cumulative sum of sells only over the current row and all following rows. This tells you how many units are still to be sold from this point on, so the oldest buys are used up first.
  • Use the difference between these two sums to get the quantity actually consumed from each buy row (the consumed qty), limited to the range 0 to qty.
  • Group by symbol/product and calculate the profit/loss from the consumed quantities.
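Before moving to Spark, here is a minimal plain-Python sketch of the consumed-quantity idea for a single symbol. The function name `consumed_per_buy` and its input shape are assumptions made for this illustration. It keeps running totals instead of window functions, but it implements the same rule: the sells still to come first use up whatever was already held, and only the remainder is taken from the current buy.

```python
def consumed_per_buy(trades):
    """FIFO-consumed quantity for each buy row of ONE symbol.

    `trades` is a chronological list of (qty, type) pairs, type being
    'buy' or 'sell' (hypothetical input shape for this sketch).
    """
    total_sold = sum(qty for qty, kind in trades if kind == "sell")
    consumed = []
    sold_so_far = 0
    position = 0  # net units held before the current row
    for qty, kind in trades:
        if kind == "sell":
            sold_so_far += qty
            position -= qty
        else:
            future_sells = total_sold - sold_so_far
            # Units held earlier are sold first; clamp the rest to 0..qty
            consumed.append(max(0, min(qty, future_sells - position)))
            position += qty
    return consumed

# REL2300PE trades from the question's sample data
rel = [(200, "buy"), (100, "buy"), (200, "buy"), (450, "sell")]
print(consumed_per_buy(rel))  # [200, 100, 150]
```

The last buy contributes only 150 units because the two earlier lots already cover 300 of the 450 units sold.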


from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, when, sum  # note: shadows Python's built-in sum

spark = SparkSession.builder.getOrCreate()

# Dates are kept as strings; they sort correctly here only because every row
# is in July 2022 with a zero-padded day.
data = [
    ('05 July 2022', 'IDBI2300PE', 500, 45, 'buy'),
    ('07 July 2022', 'REL2300PE', 200, 50, 'buy'),
    ('07 July 2022', 'IDBI2300PE', 200, 50, 'sell'),
    ('15 July 2022', 'REL2300PE', 100, 50, 'buy'),
    ('15 July 2022', 'IDBI2300PE', 20, 50, 'buy'),
    ('16 July 2022', 'REL2300PE', 200, 35, 'buy'),
    ('20 July 2022', 'REL2300PE', 200, 45, 'sell'),
    ('30 July 2022', 'IDBI2300PE', 60, 50, 'sell'),
    ('30 July 2022', 'REL2300PE', 250, 45, 'sell'),
    ('31 July 2022', 'IDBI2300PE', 200, 25, 'sell'),
]

df = spark.createDataFrame(data, ["Date", "symbol", "qty", "price", "type"])

Calculate consumed qty:

# Cumulative sum of buys (+) and sells (-) over the current and all previous rows
cum_sum_wind = Window.partitionBy('symbol').orderBy('Date').rangeBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn('cum_sum', sum(when(col('type') == 'sell', -1 * col('qty'))
                                  .otherwise(col('qty'))).over(cum_sum_wind))

# Cumulative sum of sells only, over the current and all following rows
sell_cum_sum_wind = Window.partitionBy('symbol').orderBy('Date').rangeBetween(Window.currentRow, Window.unboundedFollowing)
df = df.withColumn('sell_cum_sum', sum(when(col('type') == 'sell', col('qty'))
                                       .otherwise(0)).over(sell_cum_sum_wind))

# Calculate the actual consumed qty
df = df.withColumn('cons_qty', when(col('type') == 'sell', col('qty'))
                                # Remaining sells exceed the position: the whole buy is consumed
                                .when(col('sell_cum_sum') > col('cum_sum'), col('qty'))
                                # If negative then nothing is consumed from this row
                                .when((col('qty') - (col('cum_sum') - col('sell_cum_sum'))) < 0, 0)
                                .otherwise(col('qty') - (col('cum_sum') - col('sell_cum_sum'))))
df.show()

|        Date|    symbol|qty|price|type|cum_sum|sell_cum_sum|cons_qty|
|05 July 2022|IDBI2300PE|500|   45| buy|    500|         460|     460|
|07 July 2022|IDBI2300PE|200|   50|sell|    300|         460|     200|
|15 July 2022|IDBI2300PE| 20|   50| buy|    320|         260|       0|
|30 July 2022|IDBI2300PE| 60|   50|sell|    260|         260|      60|
|31 July 2022|IDBI2300PE|200|   25|sell|     60|         200|     200|
|07 July 2022| REL2300PE|200|   50| buy|    200|         450|     200|
|15 July 2022| REL2300PE|100|   50| buy|    300|         450|     100|
|16 July 2022| REL2300PE|200|   35| buy|    500|         450|     150|
|20 July 2022| REL2300PE|200|   45|sell|    300|         450|     200|
|30 July 2022| REL2300PE|250|   45|sell|     50|         250|     250|

Calculate global profit/loss per symbol:

# Group by symbol and calculate the profit/loss:
# consumed buys count as cost (negative), sells as revenue (positive)
result = df.groupby('symbol').agg(
    sum(when(col('type') == 'buy', -1 * col('price') * col('cons_qty'))
        .otherwise(col('price') * col('cons_qty'))).alias('profit_loss'))
result.show()

|    symbol|profit_loss|
|IDBI2300PE|      -2700|
| REL2300PE|          0|


Add a new column total_price with withColumn: compute qty * price_per_qty and multiply it by -1 if type is 'buy' and by 1 if type is 'sell'. Then group by symbol and sum total_price.

from pyspark.sql.functions import when, sum

# Signed trade value: buys are negative, sells are positive
df = df.withColumn('total_price', when(df.type == 'buy', df.qty * df.price_per_qty * -1).otherwise(df.qty * df.price_per_qty))
df = df.groupBy('symbol').agg(sum('total_price').alias('profit_loss'))

Please focus on the concept here. The code may contain syntax errors, as I don't have a laptop at hand right now.



If you want to use Hive to solve this problem, you can use the query below to get the desired result:

    select * from (
        select date
        ,symbol
        ,sum(qty * (CASE
                        WHEN type='buy' THEN price_per_qty
                        ELSE (price_per_qty*(-1))
                    END)
            ) over (partition by symbol order by date rows between unbounded preceding and current row) as total_profit
        ,row_number() over (partition by symbol order by date desc) as rn
        from stock1) as running_sum where rn=1;

